# -*- coding: utf-8 -*-
# Author: TDC Team
# License: MIT
import pandas as pd
import numpy as np
import os, sys, json
import warnings
warnings.filterwarnings("ignore")
from ..utils import bm_group_load, print_sys, fuzzy_search
from ..utils import (
    create_fold,
    create_fold_setting_cold,
    create_combination_split,
    create_fold_time,
    create_scaffold_split,
    create_group_split,
)
from ..metadata import get_task2category, bm_metric_names, benchmark_names, bm_split_names, docking_target_info
from ..evaluator import Evaluator
class BenchmarkGroup:
"""Boilerplate of benchmark group class. It downloads, processes, and loads a set of benchmark classes along with their splits. It also provides evaluators and train/valid splitters.
"""
def __init__(self, name, path = './data', file_format='csv'):
"""create a benchmark group class object
Args:
name (str): the name of the benchmark group class
            path (str, optional): the path to save/load the benchmark group dataset
file_format (str, optional): designated file format for each dataset in the benchmark group
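
        Example:
            A hypothetical call; any group name registered in ``benchmark_names``
            works, and ``file_format`` must match the files shipped with the group::

                group = BenchmarkGroup(name='ADMET_Group', path='./data', file_format='csv')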
"""
self.name = bm_group_load(name, path)
self.path = os.path.join(path, self.name)
self.datasets = benchmark_names[self.name]
self.dataset_names = []
self.file_format = file_format
for task, datasets in self.datasets.items():
for dataset in datasets:
self.dataset_names.append(dataset)
def __iter__(self):
"""iterator implementation to iterate over all benchmarks in the benchmark group
Returns:
BenchmarkGroup: self
"""
self.index = 0
self.num_datasets = len(self.dataset_names)
return self
def __next__(self):
"""iterator implementation to define the next benchmark
Returns:
            dict: a dictionary containing the benchmark's train_val DataFrame, test DataFrame, and name
Raises:
            StopIteration: raised once all benchmarks in the group have been visited
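
        Example:
            A sketch of iterating over a group (contents depend on the group)::

                for benchmark in group:
                    train_val, test = benchmark['train_val'], benchmark['test']
                    print(benchmark['name'], len(train_val), len(test))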
"""
if self.index < self.num_datasets:
dataset = self.dataset_names[self.index]
print_sys('--- ' + dataset + ' ---')
data_path = os.path.join(self.path, dataset)
if not os.path.exists(data_path):
os.mkdir(data_path)
if self.file_format == 'csv':
train = pd.read_csv(os.path.join(data_path, 'train_val.csv'))
test = pd.read_csv(os.path.join(data_path, 'test.csv'))
elif self.file_format == 'pkl':
train = pd.read_pickle(os.path.join(data_path, 'train_val.pkl'))
test = pd.read_pickle(os.path.join(data_path, 'test.pkl'))
self.index += 1
return {'train_val': train, 'test': test, 'name': dataset}
else:
raise StopIteration
    def get_train_valid_split(self, seed, benchmark, split_type = 'default'):
"""obtain training and validation split given a split type from train_val file
Args:
seed (int): the random seed of the data split
benchmark (str): name of the benchmark
split_type (str, optional): name of the split
Returns:
            tuple of pd.DataFrame: the training and the validation DataFrames
Raises:
NotImplementedError: split method not implemented
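
        Example:
            A sketch of generating a split; ``'Caco2_Wang'`` is an example
            benchmark name, not guaranteed to exist in every group::

                train, valid = group.get_train_valid_split(seed=1,
                                                           benchmark='Caco2_Wang',
                                                           split_type='default')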
"""
print_sys('generating training, validation splits...')
dataset = fuzzy_search(benchmark, self.dataset_names)
data_path = os.path.join(self.path, dataset)
if self.file_format == 'csv':
train_val = pd.read_csv(os.path.join(data_path, 'train_val.csv'))
elif self.file_format == 'pkl':
train_val = pd.read_pickle(os.path.join(data_path, 'train_val.pkl'))
if split_type == 'default':
split_method = bm_split_names[self.name][dataset]
else:
split_method = split_type
frac = [0.875, 0.125, 0.0]
if split_method == 'scaffold':
out = create_scaffold_split(train_val, seed, frac = frac, entity = 'Drug')
elif split_method == 'random':
out = create_fold(train_val, seed, frac = frac)
elif split_method == 'combination':
out = create_combination_split(train_val, seed, frac=frac)
elif split_method == 'group':
out = create_group_split(train_val, seed, holdout_frac = 0.2, group_column = 'Year')
else:
raise NotImplementedError
return out['train'], out['valid']
    def get(self, benchmark):
"""get individual benchmark
Args:
benchmark (str): benchmark name
Returns:
            dict: a dictionary with the train_val DataFrame, the test DataFrame, and the normalized benchmark name
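
        Example:
            A sketch of fetching one benchmark; ``'Caco2_Wang'`` is an example
            benchmark name::

                benchmark = group.get('Caco2_Wang')
                train_val, test, name = benchmark['train_val'], benchmark['test'], benchmark['name']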
"""
dataset = fuzzy_search(benchmark, self.dataset_names)
data_path = os.path.join(self.path, dataset)
if self.file_format == 'csv':
train = pd.read_csv(os.path.join(data_path, 'train_val.csv'))
test = pd.read_csv(os.path.join(data_path, 'test.csv'))
elif self.file_format == 'pkl':
train = pd.read_pickle(os.path.join(data_path, 'train_val.pkl'))
test = pd.read_pickle(os.path.join(data_path, 'test.pkl'))
return {'train_val': train, 'test': test, 'name': dataset}
    def evaluate(self, pred, testing = True, benchmark = None, save_dict = True, true = None):
"""automatic evaluation function
Args:
            pred (dict): a dictionary with benchmark names as keys and prediction arrays as values
            testing (bool, optional): evaluate on the test set (True) or on a user-supplied validation set (False)
            benchmark (str, optional): name of the benchmark; required when testing is False
            save_dict (bool, optional): whether or not to save the evaluation result
            true (array, optional): ground-truth labels; required when testing is False
Returns:
            dict: a dictionary mapping each benchmark name to a dictionary of metric names and values
Raises:
ValueError: benchmark name not found
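
        Example:
            A sketch of both modes; ``y_pred_test``, ``y_pred_valid`` and ``y_valid``
            are placeholders for model predictions and validation labels::

                # test-set mode: predictions keyed by benchmark name
                results = group.evaluate({'Caco2_Wang': y_pred_test})

                # validation mode: pass the ground-truth labels explicitly
                results = group.evaluate(y_pred_valid, testing=False,
                                         benchmark='Caco2_Wang', true=y_valid)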
"""
if testing:
# test set evaluation
metric_dict = bm_metric_names[self.name]
out = {}
for data_name, pred_ in pred.items():
data_name = fuzzy_search(data_name, self.dataset_names)
data_path = os.path.join(self.path, data_name)
if self.file_format == 'csv':
test = pd.read_csv(os.path.join(data_path, 'test.csv'))
elif self.file_format == 'pkl':
test = pd.read_pickle(os.path.join(data_path, 'test.pkl'))
y = test.Y.values
                evaluator = Evaluator(name = metric_dict[data_name])
out[data_name] = {metric_dict[data_name]: round(evaluator(y, pred_), 3)}
                # report per-target-class metrics when the test set is stratified by target class
if 'target_class' in test.columns:
test['pred'] = pred_
for c in test['target_class'].unique():
data_name_subset = data_name + '_' + c
test_subset = test[test['target_class']==c]
y_subset = test_subset.Y.values
pred_subset = test_subset.pred.values
                        evaluator = Evaluator(name = metric_dict[data_name_subset])
                        out[data_name_subset] = {
                            metric_dict[data_name_subset]: round(evaluator(y_subset, pred_subset), 3)
                        }
return out
else:
# validation set evaluation
            if benchmark is None:
                raise ValueError('Please specify the benchmark name for us to retrieve the standard metric!')
            if true is None:
                raise ValueError('Please provide the ground-truth labels via the `true` argument for validation-set evaluation!')
            data_name = fuzzy_search(benchmark, self.dataset_names)
            metric_dict = bm_metric_names[self.name]
            evaluator = Evaluator(name = metric_dict[data_name])
            return {metric_dict[data_name]: round(evaluator(true, pred), 3)}
    def evaluate_many(self, preds, save_file_name = None, results_individual = None):
"""
This function returns the data in a format needed to submit to the Leaderboard
Args:
preds (list of dict): list of dictionary of predictions, each item is the input to the evaluate function.
save_file_name (str, optional): file name to save the result
            results_individual (list of dict, optional): precomputed per-run results; if provided, the evaluate function is not called again
Returns:
            dict: a dictionary mapping each benchmark name to a list [mean, std] of the benchmark's metric across runs
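
        Example:
            A sketch of the five-run leaderboard workflow; ``train_model`` and
            ``predict`` are placeholders for user code::

                predictions_list = []
                for seed in [1, 2, 3, 4, 5]:
                    benchmark = group.get('Caco2_Wang')
                    train, valid = group.get_train_valid_split(seed=seed, benchmark=benchmark['name'])
                    model = train_model(train, valid)            # user-defined
                    y_pred = predict(model, benchmark['test'])   # user-defined
                    predictions_list.append({benchmark['name']: y_pred})

                results = group.evaluate_many(predictions_list)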
"""
min_requirement = 5
if len(preds) < min_requirement:
return ValueError("Must have predictions from at least " + str(min_requirement) + " runs for leaderboard submission")
if results_individual is None:
individual_results = []
for pred in preds:
retval = self.evaluate(pred)
individual_results.append(retval)
else:
individual_results = results_individual
given_dataset_names = list(individual_results[0].keys())
aggregated_results = {}
for dataset_name in given_dataset_names:
my_results = []
for individual_result in individual_results:
my_result = list(individual_result[dataset_name].values())[0]
my_results.append(my_result)
u = np.mean(my_results)
std = np.std(my_results)
aggregated_results[dataset_name] = [round(u, 3), round(std, 3)]
return aggregated_results