import pandas as pd
import numpy as np
import os, sys, json
import warnings
warnings.filterwarnings("ignore")
from .utils import fuzzy_search
from .metadata import evaluator_name, distribution_oracles
try:
from sklearn.metrics import (
roc_auc_score,
f1_score,
average_precision_score,
precision_score,
recall_score,
accuracy_score,
precision_recall_curve,
)
from sklearn.metrics import (
mean_squared_error,
mean_absolute_error,
r2_score,
cohen_kappa_score,
auc,
roc_curve,
)
except:
ImportError(
"Please install sklearn by 'conda install -c anaconda scikit-learn' or 'pip install scikit-learn '! "
)
[docs]def avg_auc(y_true, y_pred):
scores = []
for i in range(np.array(y_true).shape[0]):
scores.append(roc_auc_score(y_true[i], y_pred[i]))
return sum(scores) / len(scores)
[docs]def rmse(y_true, y_pred):
return np.sqrt(mean_squared_error(y_true, y_pred))
[docs]def recall_at_precision_k(y_true, y_pred, threshold=0.9):
pr, rc, thr = precision_recall_curve(y_true, y_pred)
if len(np.where(pr >= threshold)[0]) > 0:
return rc[np.where(pr >= threshold)[0][0]]
else:
return 0.0
[docs]def precision_at_recall_k(y_true, y_pred, threshold=0.9):
pr, rc, thr = precision_recall_curve(y_true, y_pred)
if len(np.where(rc >= threshold)[0]) > 0:
return pr[np.where(rc >= threshold)[0][-1]]
else:
return 0.0
[docs]def pcc(y_true, y_pred):
return np.corrcoef(y_true, y_pred)[1, 0]
[docs]def range_logAUC(true_y, predicted_score, FPR_range=(0.001, 0.1)):
"""
Author: Yunchao "Lance" Liu (lanceknight26@gmail.com)
Calculate logAUC in a certain FPR range (default range: [0.001, 0.1]).
This was used by previous methods [1] and the reason is that only a
small percentage of samples can be selected for experimental tests in
consideration of cost. This means only molecules with very high
predicted score can be worth testing, i.e., the decision
threshold is high. And the high decision threshold corresponds to the
left side of the ROC curve, i.e., those FPRs with small values. Also,
because the threshold cannot be predetermined, the area under the curve
is used to consolidate all possible thresholds within a certain FPR
range. Finally, the logarithm is used to bias smaller FPRs. The higher
the logAUC[0.001, 0.1], the better the performance.
A perfect classifer gets a logAUC[0.001, 0.1] ) of 1, while a random
classifer gets a logAUC[0.001, 0.1] ) of around 0.0215 (See [2])
References:
[1] Mysinger, M.M. and B.K. Shoichet, Rapid Context-Dependent Ligand
Desolvation in Molecular Docking. Journal of Chemical Information and
Modeling, 2010. 50(9): p. 1561-1573.
[2] Liu, Yunchao, et al. "Interpretable Chirality-Aware Graph Neural
Network for Quantitative Structure Activity Relationship Modeling in
Drug Discovery." bioRxiv (2022).
:param true_y: numpy array of the ground truth. Values are either 0
(inactive) or 1(active).
:param predicted_score: numpy array of the predicted score (The
score does not have to be between 0 and 1)
:param FPR_range: the range for calculating the logAUC formated in
(x, y) with x being the lower bound and y being the upper bound
:return: a numpy array of logAUC of size [1,1]
"""
# FPR range validity check
if FPR_range == None:
raise Exception("FPR range cannot be None")
lower_bound = FPR_range[0]
upper_bound = FPR_range[1]
if lower_bound >= upper_bound:
raise Exception("FPR upper_bound must be greater than lower_bound")
fpr, tpr, thresholds = roc_curve(true_y, predicted_score, pos_label=1)
tpr = np.append(tpr, np.interp([lower_bound, upper_bound], fpr, tpr))
fpr = np.append(fpr, [lower_bound, upper_bound])
# Sort both x-, y-coordinates array
tpr = np.sort(tpr)
fpr = np.sort(fpr)
# Get the data points' coordinates. log_fpr is the x coordinate, tpr is the y coordinate.
log_fpr = np.log10(fpr)
x = log_fpr
y = tpr
lower_bound = np.log10(lower_bound)
upper_bound = np.log10(upper_bound)
# Get the index of the lower and upper bounds
lower_bound_idx = np.where(x == lower_bound)[-1][-1]
upper_bound_idx = np.where(x == upper_bound)[-1][-1]
# Create a new array trimmed at the lower and upper bound
trim_x = x[lower_bound_idx : upper_bound_idx + 1]
trim_y = y[lower_bound_idx : upper_bound_idx + 1]
area = auc(trim_x, trim_y) / (upper_bound - lower_bound)
return area
## code source check here https://github.com/charnley/rmsd
[docs]def centroid(X):
"""
Centroid is the mean position of all the points in all of the coordinate
directions, from a vectorset X.
https://en.wikipedia.org/wiki/Centroid
C = sum(X)/len(X)
Parameters
----------
X : array
(N,D) matrix, where N is points and D is dimension.
Returns
-------
C : float
centroid
"""
C = X.mean(axis=0)
return C
[docs]def rmsd(V, W):
"""
Calculate Root-mean-square deviation from two sets of vectors V and W.
Parameters
----------
V : array
(N,D) matrix, where N is points and D is dimension.
W : array
(N,D) matrix, where N is points and D is dimension.
Returns
-------
rmsd : float
Root-mean-square deviation between the two vectors
"""
diff = np.array(V) - np.array(W)
N = len(V)
return np.sqrt((diff * diff).sum() / N)
[docs]def kabsch(P, Q):
"""
Using the Kabsch algorithm with two sets of paired point P and Q, centered
around the centroid. Each vector set is represented as an NxD
matrix, where D is the the dimension of the space.
The algorithm works in three steps:
- a centroid translation of P and Q (assumed done before this function
call)
- the computation of a covariance matrix C
- computation of the optimal rotation matrix U
For more info see http://en.wikipedia.org/wiki/Kabsch_algorithm
Parameters
----------
P : array
(N,D) matrix, where N is points and D is dimension.
Q : array
(N,D) matrix, where N is points and D is dimension.
Returns
-------
U : matrix
Rotation matrix (D,D)
"""
# Computation of the covariance matrix
C = np.dot(np.transpose(P), Q)
# Computation of the optimal rotation matrix
# This can be done using singular value decomposition (SVD)
# Getting the sign of the det(V)*(W) to decide
# whether we need to correct our rotation matrix to ensure a
# right-handed coordinate system.
# And finally calculating the optimal rotation matrix U
# see http://en.wikipedia.org/wiki/Kabsch_algorithm
V, S, W = np.linalg.svd(C)
d = (np.linalg.det(V) * np.linalg.det(W)) < 0.0
if d:
S[-1] = -S[-1]
V[:, -1] = -V[:, -1]
# Create Rotation matrix U
U = np.dot(V, W)
return U
[docs]def kabsch_rmsd(P, Q, W=None, translate=False):
"""
Rotate matrix P unto Q using Kabsch algorithm and calculate the RMSD.
An optional vector of weights W may be provided.
Parameters
----------
P : array
(N,D) matrix, where N is points and D is dimension.
Q : array
(N,D) matrix, where N is points and D is dimension.
W : array or None
(N) vector, where N is points.
translate : bool
Use centroids to translate vector P and Q unto each other.
Returns
-------
rmsd : float
root-mean squared deviation
"""
if translate:
Q = Q - centroid(Q)
P = P - centroid(P)
if W is not None:
return kabsch_weighted_rmsd(P, Q, W)
P = kabsch_rotate(P, Q)
return rmsd(P, Q)
[docs]def kabsch_rotate(P, Q):
"""
Rotate matrix P unto matrix Q using Kabsch algorithm.
Parameters
----------
P : array
(N,D) matrix, where N is points and D is dimension.
Q : array
(N,D) matrix, where N is points and D is dimension.
Returns
-------
P : array
(N,D) matrix, where N is points and D is dimension,
rotated
"""
U = kabsch(P, Q)
# Rotate P
P = np.dot(P, U)
return P
[docs]def kabsch_weighted_rmsd(P, Q, W=None):
"""
Calculate the RMSD between P and Q with optional weighhts W
Parameters
----------
P : array
(N,D) matrix, where N is points and D is dimension.
Q : array
(N,D) matrix, where N is points and D is dimension.
W : vector
(N) vector, where N is points
Returns
-------
RMSD : float
"""
R, T, w_rmsd = kabsch_weighted(P, Q, W)
return w_rmsd
[docs]def kabsch_weighted(P, Q, W=None):
"""
Using the Kabsch algorithm with two sets of paired point P and Q.
Each vector set is represented as an NxD matrix, where D is the
dimension of the space.
An optional vector of weights W may be provided.
Note that this algorithm does not require that P and Q have already
been overlayed by a centroid translation.
The function returns the rotation matrix U, translation vector V,
and RMS deviation between Q and P', where P' is:
P' = P * U + V
For more info see http://en.wikipedia.org/wiki/Kabsch_algorithm
Parameters
----------
P : array
(N,D) matrix, where N is points and D is dimension.
Q : array
(N,D) matrix, where N is points and D is dimension.
W : array or None
(N) vector, where N is points.
Returns
-------
U : matrix
Rotation matrix (D,D)
V : vector
Translation vector (D)
RMSD : float
Root mean squared deviation between P and Q
"""
# Computation of the weighted covariance matrix
CMP = np.zeros(3)
CMQ = np.zeros(3)
C = np.zeros((3, 3))
if W is None:
W = np.ones(len(P)) / len(P)
W = np.array([W, W, W]).T
# NOTE UNUSED psq = 0.0
# NOTE UNUSED qsq = 0.0
iw = 3.0 / W.sum()
n = len(P)
for i in range(3):
for j in range(n):
for k in range(3):
C[i, k] += P[j, i] * Q[j, k] * W[j, i]
CMP = (P * W).sum(axis=0)
CMQ = (Q * W).sum(axis=0)
PSQ = (P * P * W).sum() - (CMP * CMP).sum() * iw
QSQ = (Q * Q * W).sum() - (CMQ * CMQ).sum() * iw
C = (C - np.outer(CMP, CMQ) * iw) * iw
# Computation of the optimal rotation matrix
# This can be done using singular value decomposition (SVD)
# Getting the sign of the det(V)*(W) to decide
# whether we need to correct our rotation matrix to ensure a
# right-handed coordinate system.
# And finally calculating the optimal rotation matrix U
# see http://en.wikipedia.org/wiki/Kabsch_algorithm
V, S, W = np.linalg.svd(C)
d = (np.linalg.det(V) * np.linalg.det(W)) < 0.0
if d:
S[-1] = -S[-1]
V[:, -1] = -V[:, -1]
# Create Rotation matrix U, translation vector V, and calculate RMSD:
U = np.dot(V, W)
msd = (PSQ + QSQ) * iw - 2.0 * S.sum()
if msd < 0.0:
msd = 0.0
rmsd_ = np.sqrt(msd)
V = np.zeros(3)
for i in range(3):
t = (U[i, :] * CMQ).sum()
V[i] = CMP[i] - t
V = V * iw
return U, V, rmsd_
[docs]class Evaluator:
"""evaluator to evaluate predictions
Args:
name (str): the name of the evaluator function
"""
def __init__(self, name):
"""create an evaluate object"""
self.name = fuzzy_search(name, evaluator_name)
self.assign_evaluator()
[docs] def assign_evaluator(self):
"""obtain evaluator function given the evaluator name"""
if self.name == "roc-auc":
self.evaluator_func = roc_auc_score
elif self.name == "f1":
self.evaluator_func = f1_score
elif self.name == "pr-auc":
self.evaluator_func = average_precision_score
elif self.name == "rp@k":
self.evaluator_func = recall_at_precision_k
elif self.name == "pr@k":
self.evaluator_func = precision_at_recall_k
elif self.name == "precision":
self.evaluator_func = precision_score
elif self.name == "recall":
self.evaluator_func = recall_score
elif self.name == "accuracy":
self.evaluator_func = accuracy_score
elif self.name == "mse":
self.evaluator_func = mean_squared_error
elif self.name == "rmse":
self.evaluator_func = rmse
elif self.name == "mae":
self.evaluator_func = mean_absolute_error
elif self.name == "r2":
self.evaluator_func = r2_score
elif self.name == "pcc":
self.evaluator_func = pcc
elif self.name == "spearman":
try:
from scipy import stats
except:
ImportError("Please install scipy by 'pip install scipy'! ")
self.evaluator_func = stats.spearmanr
elif self.name == "micro-f1":
self.evaluator_func = f1_score
elif self.name == "macro-f1":
self.evaluator_func = f1_score
elif self.name == "kappa":
self.evaluator_func = cohen_kappa_score
elif self.name == "avg-roc-auc":
self.evaluator_func = avg_auc
elif self.name == "novelty":
from .chem_utils import novelty
self.evaluator_func = novelty
elif self.name == "diversity":
from .chem_utils import diversity
self.evaluator_func = diversity
elif self.name == "validity":
from .chem_utils import validity
self.evaluator_func = validity
elif self.name == "uniqueness":
from .chem_utils import uniqueness
self.evaluator_func = uniqueness
elif self.name == "kl_divergence":
from .chem_utils import kl_divergence
self.evaluator_func = kl_divergence
elif self.name == "fcd_distance":
from .chem_utils import fcd_distance
self.evaluator_func = fcd_distance
elif self.name == "range_logAUC":
self.evaluator_func = range_logAUC
elif self.name == "rmsd":
self.evaluator_func = rmsd
elif self.name == "kabsch_rmsd":
self.evaluator_func = kabsch_rmsd
def __call__(self, *args, **kwargs):
"""call the evaluator function on targets and predictions
Args:
*args: targets, predictions, and other information
**kwargs: other auxilliary inputs for some evaluators
Returns:
float: the evaluator output
"""
if self.name in distribution_oracles:
return self.evaluator_func(*args, **kwargs)
# #### evaluator for distribution learning, e.g., diversity, validity
y_true = kwargs["y_true"] if "y_true" in kwargs else args[0]
y_pred = kwargs["y_pred"] if "y_pred" in kwargs else args[1]
if len(args) <= 2 and "threshold" not in kwargs:
threshold = 0.5
else:
threshold = kwargs["threshold"] if "threshold" in kwargs else args[2]
### original __call__(self, y_true, y_pred, threshold = 0.5)
y_true = np.array(y_true)
y_pred = np.array(y_pred)
if self.name in ["precision", "recall", "f1", "accuracy"]:
y_pred = [1 if i > threshold else 0 for i in y_pred]
if self.name in ["micro-f1", "macro-f1"]:
return self.evaluator_func(y_true, y_pred, average=self.name[:5])
if self.name in ["rp@k", "pr@k"]:
return self.evaluator_func(y_true, y_pred, threshold=threshold)
if self.name == "spearman":
return self.evaluator_func(y_true, y_pred)[0]
return self.evaluator_func(y_true, y_pred)