# Source code for tdc.benchmark_group.base_group

# -*- coding: utf-8 -*-
# Author: TDC Team
# License: MIT

import json
import os
import sys
import warnings

import numpy as np
import pandas as pd

from ..evaluator import Evaluator
from ..metadata import (
    get_task2category,
    bm_metric_names,
    benchmark_names,
    bm_split_names,
    docking_target_info,
)
from ..utils import bm_group_load, print_sys, fuzzy_search
from ..utils import (
    create_fold,
    create_combination_split,
    create_group_split,
    create_scaffold_split,
)

class BenchmarkGroup:
    """Boilerplate of benchmark group class.

    It downloads, processes, and loads a set of benchmark classes along with
    their splits. It also provides evaluators and train/valid splitters.
    """

    def __init__(self, name, path='./data', file_format='csv'):
        """Create a benchmark group class object.

        Args:
            name (str): the name of the benchmark group class
            path (str, optional): the path to save/load the benchmark group
                dataset
            file_format (str, optional): designated file format for each
                dataset in the benchmark group ('csv' or 'pkl')
        """
        # bm_group_load's return value is used as the group name: it names the
        # data sub-directory and keys the metadata tables below.
        self.name = bm_group_load(name, path)
        self.path = os.path.join(path, self.name)
        self.datasets = benchmark_names[self.name]
        self.file_format = file_format
        # Flatten the {task: [dataset, ...]} mapping into one ordered list.
        self.dataset_names = [
            dataset
            for datasets in self.datasets.values()
            for dataset in datasets
        ]

    def _read_split(self, dataset, split):
        """Load one split file of a benchmark as a DataFrame.

        Args:
            dataset (str): normalized benchmark name (sub-directory of
                ``self.path``)
            split (str): file stem to load, i.e. 'train_val' or 'test'

        Returns:
            pd.DataFrame: content of ``<self.path>/<dataset>/<split>.<ext>``

        Raises:
            ValueError: ``self.file_format`` is neither 'csv' nor 'pkl'
        """
        data_path = os.path.join(self.path, dataset)
        if self.file_format == 'csv':
            return pd.read_csv(os.path.join(data_path, split + '.csv'))
        if self.file_format == 'pkl':
            # NOTE(review): unpickling executes arbitrary code; only load
            # benchmark files that come from a trusted source.
            return pd.read_pickle(os.path.join(data_path, split + '.pkl'))
        raise ValueError(
            "Unsupported file_format '" + str(self.file_format)
            + "'; expected 'csv' or 'pkl'"
        )

    @staticmethod
    def _score(metric_name, y_true, y_pred):
        """Evaluate one metric and return ``{metric_name: rounded value}``."""
        evaluator = Evaluator(name=metric_name)
        return {metric_name: round(evaluator(y_true, y_pred), 3)}

    def __iter__(self):
        """Iterator implementation to iterate over all benchmarks in the
        benchmark group.

        Returns:
            BenchmarkGroup: self
        """
        self.index = 0
        self.num_datasets = len(self.dataset_names)
        return self

    def __next__(self):
        """Iterator implementation to define the next benchmark.

        Returns:
            dict: a dictionary of key values in a benchmark, namely the
                train_val file, test file and benchmark name

        Raises:
            StopIteration: stop when exceed the number of benchmarks
        """
        if self.index >= self.num_datasets:
            raise StopIteration

        dataset = self.dataset_names[self.index]
        print_sys('--- ' + dataset + ' ---')

        # Kept from the original: make sure the benchmark folder exists
        # before trying to read from it.
        data_path = os.path.join(self.path, dataset)
        if not os.path.exists(data_path):
            os.mkdir(data_path)

        train = self._read_split(dataset, 'train_val')
        test = self._read_split(dataset, 'test')

        self.index += 1
        return {'train_val': train, 'test': test, 'name': dataset}

    def get_train_valid_split(self, seed, benchmark, split_type='default'):
        """Obtain training and validation split given a split type from the
        train_val file.

        Args:
            seed (int): the random seed of the data split
            benchmark (str): name of the benchmark
            split_type (str, optional): name of the split; 'default' looks up
                the split registered for this benchmark in ``bm_split_names``

        Returns:
            tuple: the training and validation splits (``out['train']``,
                ``out['valid']``) produced by the chosen split helper

        Raises:
            NotImplementedError: split method not implemented
            ValueError: unsupported ``file_format``
        """
        print_sys('generating training, validation splits...')
        dataset = fuzzy_search(benchmark, self.dataset_names)
        train_val = self._read_split(dataset, 'train_val')

        if split_type == 'default':
            split_method = bm_split_names[self.name][dataset]
        else:
            split_method = split_type

        # train / valid / test fractions; the test share is 0 because the
        # test set is stored in its own file.
        frac = [0.875, 0.125, 0.0]

        if split_method == 'scaffold':
            out = create_scaffold_split(train_val, seed, frac=frac, entity='Drug')
        elif split_method == 'random':
            out = create_fold(train_val, seed, frac=frac)
        elif split_method == 'combination':
            out = create_combination_split(train_val, seed, frac=frac)
        elif split_method == 'group':
            out = create_group_split(train_val, seed, holdout_frac=0.2, group_column='Year')
        else:
            raise NotImplementedError(
                "split method '" + str(split_method) + "' is not implemented"
            )
        return out['train'], out['valid']

    def get(self, benchmark):
        """Get individual benchmark.

        Args:
            benchmark (str): benchmark name

        Returns:
            dict: a dictionary of train_val, test dataframes and normalized
                name of the benchmark

        Raises:
            ValueError: unsupported ``file_format``
        """
        dataset = fuzzy_search(benchmark, self.dataset_names)
        train = self._read_split(dataset, 'train_val')
        test = self._read_split(dataset, 'test')
        return {'train_val': train, 'test': test, 'name': dataset}

    def evaluate(self, pred, testing=True, benchmark=None, save_dict=True, true=None):
        """Automatic evaluation function.

        Args:
            pred (dict or array): in testing mode, a dictionary of benchmark
                name as the key and prediction array as the value; in
                validation mode, the prediction array itself
            testing (bool, optional): evaluate using testing set mode or
                validation set mode
            benchmark (str, optional): name of the benchmark (required in
                validation mode)
            save_dict (bool, optional): whether or not to save the evaluation
                result (accepted for compatibility; not used by this method)
            true (array, optional): ground-truth labels, required in
                validation mode because no label file is read there

        Returns:
            dict: in testing mode, a dictionary with key the benchmark name
                and value a dictionary of metric name to metric value; in
                validation mode, a dictionary of metric name to metric value

        Raises:
            ValueError: benchmark name or ground truth missing in validation
                mode, or unsupported ``file_format``
        """
        if not testing:
            # validation set evaluation
            if benchmark is None:
                raise ValueError('Please specify the benchmark name for us to retrieve the standard metric!')
            if true is None:
                raise ValueError('Please provide the ground truth labels via `true` for validation set evaluation!')
            data_name = fuzzy_search(benchmark, self.dataset_names)
            metric_dict = bm_metric_names[self.name]
            return self._score(metric_dict[data_name], true, pred)

        # test set evaluation
        metric_dict = bm_metric_names[self.name]
        out = {}
        for data_name, pred_ in pred.items():
            data_name = fuzzy_search(data_name, self.dataset_names)
            test = self._read_split(data_name, 'test')
            out[data_name] = self._score(metric_dict[data_name], test.Y.values, pred_)

            # If reporting accuracy across target classes
            if 'target_class' in test.columns:
                test['pred'] = pred_
                for c in test['target_class'].unique():
                    data_name_subset = data_name + '_' + c
                    test_subset = test[test['target_class'] == c]
                    out[data_name_subset] = self._score(
                        metric_dict[data_name_subset],
                        test_subset.Y.values,
                        test_subset.pred.values,
                    )
        return out

    def evaluate_many(self, preds, save_file_name=None, results_individual=None):
        """Return the data in the format needed to submit to the Leaderboard.

        Args:
            preds (list of dict): list of dictionary of predictions, each item
                is the input to the evaluate function.
            save_file_name (str, optional): file name to save the result
                (accepted for compatibility; not used by this method)
            results_individual (list of dictionary, optional): if you already
                have results generated for each run, simply input here so that
                this function won't call the evaluation function again

        Returns:
            dict: a dictionary where key is the benchmark name and value is a
                list ``[mean, std]`` of the first metric over all runs.

        Raises:
            ValueError: fewer than 5 runs were supplied in ``preds``
        """
        min_requirement = 5
        if len(preds) < min_requirement:
            # Raise (not return) so callers cannot mistake the error for a result.
            raise ValueError("Must have predictions from at least " + str(min_requirement) + " runs for leaderboard submission")

        if results_individual is None:
            individual_results = [self.evaluate(pred) for pred in preds]
        else:
            individual_results = results_individual

        aggregated_results = {}
        # The first run defines which datasets are reported.
        for dataset_name in individual_results[0]:
            # Each per-dataset entry is {metric_name: value}; take its first value.
            my_results = [
                list(individual_result[dataset_name].values())[0]
                for individual_result in individual_results
            ]
            u = np.mean(my_results)
            std = np.std(my_results)
            aggregated_results[dataset_name] = [round(u, 3), round(std, 3)]
        return aggregated_results