Source code for tdc.benchmark_group.base_group

# -*- coding: utf-8 -*-
# Author: TDC Team
# License: MIT

import pandas as pd
import numpy as np
import os, sys, json
import warnings

warnings.filterwarnings("ignore")

from ..utils import bm_group_load, print_sys, fuzzy_search
from ..utils import (
    create_fold,
    create_fold_setting_cold,
    create_combination_split,
    create_fold_time,
    create_scaffold_split,
    create_group_split,
)
from ..metadata import (
    get_task2category,
    bm_metric_names,
    benchmark_names,
    bm_split_names,
    docking_target_info,
)
from ..evaluator import Evaluator


class BenchmarkGroup:
    """Base class of a benchmark group. It downloads, processes, and loads a set
    of benchmarks along with their splits, and provides evaluators and
    train/valid splitters."""

    def __init__(self, name, path="./data", file_format="csv"):
        """Create a benchmark group class object.

        Args:
            name (str): the name of the benchmark group class
            path (str, optional): the path to save/load the benchmark group dataset
            file_format (str, optional): designated file format for each dataset in the benchmark group
        """
        self.name = bm_group_load(name, path)
        self.path = os.path.join(path, self.name)
        self.datasets = benchmark_names[self.name]
        self.dataset_names = []
        self.file_format = file_format

        for task, datasets in self.datasets.items():
            for dataset in datasets:
                self.dataset_names.append(dataset)

    def __iter__(self):
        """Iterator implementation to iterate over all benchmarks in the benchmark group.

        Returns:
            BenchmarkGroup: self
        """
        self.index = 0
        self.num_datasets = len(self.dataset_names)
        return self

    def __next__(self):
        """Iterator implementation to return the next benchmark.

        Returns:
            dict: a dictionary of key values in a benchmark, namely the
                train_val file, test file, and benchmark name

        Raises:
            StopIteration: stop when the number of benchmarks is exceeded
        """
        if self.index < self.num_datasets:
            dataset = self.dataset_names[self.index]
            print_sys("--- " + dataset + " ---")

            data_path = os.path.join(self.path, dataset)
            if not os.path.exists(data_path):
                os.mkdir(data_path)
            if self.file_format == "csv":
                train = pd.read_csv(os.path.join(data_path, "train_val.csv"))
                test = pd.read_csv(os.path.join(data_path, "test.csv"))
            elif self.file_format == "pkl":
                train = pd.read_pickle(os.path.join(data_path, "train_val.pkl"))
                test = pd.read_pickle(os.path.join(data_path, "test.pkl"))
            self.index += 1
            return {"train_val": train, "test": test, "name": dataset}
        else:
            raise StopIteration
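    # Usage sketch for the iterator protocol above (illustrative only, not part of
    # the original module; "ADMET_Group" is an assumed group name resolved by
    # bm_group_load, and the column layout depends on the specific benchmark):
    #
    #   group = BenchmarkGroup(name="ADMET_Group", path="./data")
    #   for benchmark in group:
    #       train_val = benchmark["train_val"]   # DataFrame with train + valid rows
    #       test = benchmark["test"]             # DataFrame with held-out test rows
    #       print(benchmark["name"], len(train_val), len(test))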
    def get_train_valid_split(self, seed, benchmark, split_type="default"):
        """Obtain a training/validation split of the train_val file, given a split type.

        Args:
            seed (int): the random seed of the data split
            benchmark (str): name of the benchmark
            split_type (str, optional): name of the split

        Returns:
            pd.DataFrame: the training and validation sets

        Raises:
            NotImplementedError: split method not implemented
        """
        print_sys("generating training, validation splits...")
        dataset = fuzzy_search(benchmark, self.dataset_names)
        data_path = os.path.join(self.path, dataset)
        if self.file_format == "csv":
            train_val = pd.read_csv(os.path.join(data_path, "train_val.csv"))
        elif self.file_format == "pkl":
            train_val = pd.read_pickle(os.path.join(data_path, "train_val.pkl"))

        if split_type == "default":
            split_method = bm_split_names[self.name][dataset]
        else:
            split_method = split_type

        frac = [0.875, 0.125, 0.0]

        if split_method == "scaffold":
            out = create_scaffold_split(train_val, seed, frac=frac, entity="Drug")
        elif split_method == "random":
            out = create_fold(train_val, seed, frac=frac)
        elif split_method == "combination":
            out = create_combination_split(train_val, seed, frac=frac)
        elif split_method == "group":
            out = create_group_split(
                train_val, seed, holdout_frac=0.2, group_column="Year"
            )
        else:
            raise NotImplementedError
        return out["train"], out["valid"]
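    # Usage sketch for get_train_valid_split (illustrative; "Caco2_Wang" is an
    # assumed benchmark name, and the split method is looked up in bm_split_names
    # unless split_type overrides it):
    #
    #   train, valid = group.get_train_valid_split(seed=1, benchmark="Caco2_Wang")
    #   # with frac = [0.875, 0.125, 0.0], valid holds 12.5% of the train_val rows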
    def get(self, benchmark):
        """Get an individual benchmark.

        Args:
            benchmark (str): benchmark name

        Returns:
            dict: a dictionary of the train_val and test DataFrames and the
                normalized name of the benchmark
        """
        dataset = fuzzy_search(benchmark, self.dataset_names)
        data_path = os.path.join(self.path, dataset)
        if self.file_format == "csv":
            train = pd.read_csv(os.path.join(data_path, "train_val.csv"))
            test = pd.read_csv(os.path.join(data_path, "test.csv"))
        elif self.file_format == "pkl":
            train = pd.read_pickle(os.path.join(data_path, "train_val.pkl"))
            test = pd.read_pickle(os.path.join(data_path, "test.pkl"))
        return {"train_val": train, "test": test, "name": dataset}
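    # Usage sketch for get (illustrative; fuzzy_search lets the query be an
    # approximate benchmark name):
    #
    #   benchmark = group.get("caco2_wang")
    #   train_val, test, name = benchmark["train_val"], benchmark["test"], benchmark["name"]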
    def evaluate(self, pred, testing=True, benchmark=None, save_dict=True, true=None):
        """Automatic evaluation function.

        Args:
            pred (dict): a dictionary with the benchmark name as the key and the
                prediction array as the value
            testing (bool, optional): evaluate in testing-set mode or validation-set mode
            benchmark (str, optional): name of the benchmark
            save_dict (bool, optional): whether or not to save the evaluation result
            true (array, optional): ground-truth labels, required when testing is
                False (validation-set mode)

        Returns:
            dict: a dictionary with the benchmark name as the key and a
                dictionary of metric name to metric value as the value

        Raises:
            ValueError: benchmark name not found
        """
        if testing:
            # test set evaluation
            metric_dict = bm_metric_names[self.name]
            out = {}
            for data_name, pred_ in pred.items():
                data_name = fuzzy_search(data_name, self.dataset_names)
                data_path = os.path.join(self.path, data_name)
                if self.file_format == "csv":
                    test = pd.read_csv(os.path.join(data_path, "test.csv"))
                elif self.file_format == "pkl":
                    test = pd.read_pickle(os.path.join(data_path, "test.pkl"))
                y = test.Y.values
                evaluator = Evaluator(name=metric_dict[data_name])
                out[data_name] = {
                    metric_dict[data_name]: round(evaluator(y, pred_), 3)
                }

                # if reporting accuracy across target classes
                if "target_class" in test.columns:
                    test["pred"] = pred_
                    for c in test["target_class"].unique():
                        data_name_subset = data_name + "_" + c
                        test_subset = test[test["target_class"] == c]
                        y_subset = test_subset.Y.values
                        pred_subset = test_subset.pred.values

                        evaluator = Evaluator(name=metric_dict[data_name_subset])
                        out[data_name_subset] = {
                            metric_dict[data_name_subset]: round(
                                evaluator(y_subset, pred_subset), 3
                            )
                        }
            return out
        else:
            # validation set evaluation
            if benchmark is None:
                raise ValueError(
                    "Please specify the benchmark name for us to retrieve the standard metric!"
                )
            if true is None:
                raise ValueError(
                    "Please provide the ground-truth labels (true) for validation evaluation!"
                )
            data_name = fuzzy_search(benchmark, self.dataset_names)
            metric_dict = bm_metric_names[self.name]
            evaluator = Evaluator(name=metric_dict[data_name])
            return {metric_dict[data_name]: round(evaluator(true, pred), 3)}
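    # Usage sketch for evaluate (illustrative; y_pred_test and y_valid are
    # hypothetical arrays aligned with the corresponding DataFrames):
    #
    #   results = group.evaluate({"Caco2_Wang": y_pred_test})
    #   # e.g. {"caco2_wang": {"mae": 0.312}} -- the metric comes from bm_metric_names
    #
    #   # validation-set mode needs the benchmark name and the ground truth:
    #   results = group.evaluate(y_pred_valid, testing=False,
    #                            benchmark="Caco2_Wang", true=y_valid)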
    def evaluate_many(self, preds, save_file_name=None, results_individual=None):
        """Return the evaluation results in the format required for a leaderboard submission.

        Args:
            preds (list of dict): list of prediction dictionaries; each item is
                an input to the evaluate function.
            save_file_name (str, optional): file name to save the result
            results_individual (list of dict, optional): if you already have
                results generated for each run, pass them here so this function
                does not call the evaluation function again

        Returns:
            dict: a dictionary where the key is the benchmark name and the value
                is another dictionary mapping the metric name to a list [mean, std].
        """
        min_requirement = 5
        if len(preds) < min_requirement:
            raise ValueError(
                "Must have predictions from at least "
                + str(min_requirement)
                + " runs for leaderboard submission"
            )
        if results_individual is None:
            individual_results = []
            for pred in preds:
                retval = self.evaluate(pred)
                individual_results.append(retval)
        else:
            individual_results = results_individual

        given_dataset_names = list(individual_results[0].keys())
        aggregated_results = {}
        for dataset_name in given_dataset_names:
            my_results = []
            for individual_result in individual_results:
                my_result = list(individual_result[dataset_name].values())[0]
                my_results.append(my_result)
            u = np.mean(my_results)
            std = np.std(my_results)
            aggregated_results[dataset_name] = [round(u, 3), round(std, 3)]
        return aggregated_results
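
# --------------------------------------------------------------------------
# End-to-end usage sketch (not part of the original module): the five-seed
# leaderboard workflow. "ADMET_Group" and "Caco2_Wang" are assumed names, and
# the mean-of-training "model" below is a placeholder for a real predictor.
# --------------------------------------------------------------------------
if __name__ == "__main__":
    group = BenchmarkGroup(name="ADMET_Group", path="./data")
    benchmark = group.get("Caco2_Wang")
    name, train_val, test = benchmark["name"], benchmark["train_val"], benchmark["test"]

    predictions_list = []
    for seed in [1, 2, 3, 4, 5]:
        train, valid = group.get_train_valid_split(seed=seed, benchmark=name)
        # placeholder model: predict the training-set mean label for every test row
        y_pred_test = np.full(len(test), train.Y.mean())
        predictions_list.append({name: y_pred_test})

    # aggregated {benchmark: {metric: [mean, std]}} across the five runs
    print(group.evaluate_many(predictions_list))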