Source code for immuneML.ml_methods.ProbabilisticBinaryClassifier

import copy
import pickle
import warnings
from pathlib import Path
from typing import Tuple

import numpy as np
import yaml
from scipy.special import beta as beta_func
from scipy.special import betaln as beta_func_ln
from scipy.special import digamma
from scipy.stats import betabinom as beta_binomial

from immuneML.data_model.encoded_data.EncodedData import EncodedData
from immuneML.environment.Label import Label
from immuneML.ml_methods.MLMethod import MLMethod
from immuneML.ml_methods.util.Util import Util
from immuneML.util.FilenameHandler import FilenameHandler
from immuneML.util.PathBuilder import PathBuilder


[docs] class ProbabilisticBinaryClassifier(MLMethod): """ ProbabilisticBinaryClassifier predicts the class assignment in binary classification case based on encoding examples by number of successful trials and total number of trials. It models this ratio by one beta distribution per class and predicts the class of the new examples using log-posterior odds ratio with threshold at 0. ProbabilisticBinaryClassifier is based on the paper (details on the classification can be found in the Online Methods section): Emerson, Ryan O., William S. DeWitt, Marissa Vignali, Jenna Gravley, Joyce K. Hu, Edward J. Osborne, Cindy Desmarais, et al. ‘Immunosequencing Identifies Signatures of Cytomegalovirus Exposure History and HLA-Mediated Effects on the T Cell Repertoire’. Nature Genetics 49, no. 5 (May 2017): 659–65. `doi.org/10.1038/ng.3822 <https://doi.org/10.1038/ng.3822>`_. Arguments: max_iterations (int): maximum number of iterations while optimizing the parameters of the beta distribution (same for both classes) update_rate (float): how much the computed gradient should influence the updated value of the parameters of the beta distribution likelihood_threshold (float): at which threshold to stop the optimization (default -1e-10) YAML specification: .. indent with spaces .. code-block:: yaml my_probabilistic_classifier: # user-defined name of the ML method ProbabilisticBinaryClassifier: # method name max_iterations: 1000 update_rate: 0.01 """ SMALL_POSITIVE_NUMBER = 1e-15 def __init__(self, max_iterations: int = None, update_rate: float = None, likelihood_threshold: float = None): super().__init__() self.max_iterations = max_iterations self.update_rate = update_rate self.N_0 = None self.N_1 = None self.alpha_0 = None self.alpha_1 = None self.beta_0 = None self.beta_1 = None self.likelihood_threshold = likelihood_threshold if likelihood_threshold is not None else -1e-10 self.class_mapping = None self.label = None self.feature_names = None
[docs] def fit(self, encoded_data: EncodedData, label: Label, cores_for_training: int = 2): self.feature_names = encoded_data.feature_names X = encoded_data.examples assert X.shape[1] == 2, "ProbabilisticBinaryClassifier: the shape of the input is not compatible with the classifier. " \ "The classifier is defined when examples are encoded by two counts: the number of successful trials " \ "and the total number of trials. If this is not targeted use-case and the encoding, please consider using " \ "another classifier." self.class_mapping = Util.make_binary_class_mapping(encoded_data.labels[label.name], label.positive_class) self.label = label self.N_0 = int(np.sum(np.array(encoded_data.labels[label.name]) == self.class_mapping[0])) self.N_1 = int(np.sum(np.array(encoded_data.labels[label.name]) == self.class_mapping[1])) self.alpha_0, self.beta_0 = self._find_beta_distribution_parameters( X[np.nonzero(np.array(encoded_data.labels[self.label.name]) == self.class_mapping[0])], self.N_0) self.alpha_1, self.beta_1 = self._find_beta_distribution_parameters( X[np.nonzero(np.array(encoded_data.labels[self.label.name]) == self.class_mapping[1])], self.N_1)
[docs] def fit_by_cross_validation(self, encoded_data: EncodedData, number_of_splits: int = 5, label: Label = None, cores_for_training: int = -1, optimization_metric=None): warnings.warn("ProbabilisticBinaryClassifier: cross-validation on this classifier is not defined: fitting one model instead...") self.fit(encoded_data, label)
[docs] def predict(self, encoded_data: EncodedData, label: Label): """ Predict the class assignment for examples in X (where X is validation or test set - examples not seen during training). .. math:: \\widehat{c} \\, (k, n) = \\left\\{\\begin{matrix} 0, & F(k, n) \\leq 0\\\\ 1, & F(k, n) > 0 \\end{matrix}\\right Arguments: encoded_data (EncodedData): EncodedData object with examples attribute which is a design matrix of shape [number of examples x number of features], where number of features is 2 (the first feature is the number of disease-associated sequences and the second is the total number of sequences per example) label (str): the label used for classification (e.g. CMV) Returns: a dictionary of the following format: {label_name: predictions} where predictions is a list of predicted classes for each example """ X = encoded_data.examples self._check_labels(label.name) predictions_list = [] for example in X: k, n = example[0], example[1] F = self._compute_log_posterior_odds_ratio(k, n) predicted_class = int(F > 0) predictions_list.append(self.class_mapping[predicted_class]) return {self.label.name: predictions_list}
[docs] def predict_proba(self, encoded_data: EncodedData, label: Label): """ Predict the probability of the class for examples in X. .. math:: \\widehat{c} \\, (k, n) = '\\left\\{\\begin{matrix} 0, & F(k, n) \\leq 0\\ 1, & F(k, n) > 0 \\end{matrix}\\right Arguments: encoded_data (EncodedData): EncodedData object with examples attribute which is a design matrix of shape, where number of features is 2 (the first feature is the number of disease-associated sequences and the second is the total number of sequences per example) label (str): the label used for classification (e.g. CMV) Returns: class probabilities for all examples in X """ self._check_labels(label.name) X = encoded_data.examples class_probabilities = np.zeros((X.shape[0], len(list(self.class_mapping.keys()))), dtype=float) for index, example in enumerate(X): k, n = example[0], example[1] posterior_class_probabilities = self._compute_posterior_class_probability(k, n) class_probabilities[index] = posterior_class_probabilities return {label.name: {self.class_mapping[i]: class_probabilities[:, i] for i in range(class_probabilities.shape[1])}}
def _find_beta_distribution_parameters(self, X, N_l: int) -> Tuple[float, float]: """ Function implementing gradient ascent to find parameters of the beta distribution for the given class. It maximizes the following log-likelihood: .. math:: l_l (\\alpha, \\beta) = - N_l \\, log \\, B (\\alpha, \\beta) + \\sum_{i: c_i = l} log \\, B(k_i + \\alpha, n_i - k_i + \\beta), l = 0, 1 Arguments: X: design matrix of shape [number of examples x number of features], where number of features is 2 (the first feature is the number of disease-associated sequences and the second is the total number of sequences per example) N_l: number of examples in the given class Returns: estimated values of alpha and beta for the given class """ k_is, n_is = X[:, 0], X[:, 1] alpha, beta = self._initialize_beta_distribution_parameters(k_is, n_is) k_is, n_is = self._perform_laplace_smoothing(k_is, n_is) for iteration in range(self.max_iterations): log_likelihood = - N_l * beta_func(alpha, beta) + np.sum(beta_func_ln(k_is + alpha, n_is - k_is + beta)) if np.isnan(log_likelihood): raise RuntimeError(f"ProbabilisticBinaryClassifier: while estimating beta distribution parameters, " f"log_likelihood became nan in iteration {iteration}. \nalpha: {alpha}, beta: {beta}") elif log_likelihood > self.likelihood_threshold: break grad_alpha, grad_beta = self._compute_alpha_beta_gradients(N_l, alpha, beta, k_is, n_is) alpha = max(alpha + self.update_rate * grad_alpha, ProbabilisticBinaryClassifier.SMALL_POSITIVE_NUMBER) beta = max(beta + self.update_rate * grad_beta, ProbabilisticBinaryClassifier.SMALL_POSITIVE_NUMBER) return alpha, beta def _initialize_beta_distribution_parameters(self, k_is, n_is) -> Tuple[float, float]: """ Function using the method of moments to initialize the parameters of the beta distribution (estimating initial values for population from sample values) if variance is not 0, otherwise initializes both alpha and beta to 1 making all values in the domain of the distribution to have equal density. Initial parameter values as per the method of moments: .. math:: \\alpha = \\frac{E[X]^2 * (1-E[X])}{V[X]}-E[X] \\beta = (\\frac{E[X](1-E[X])}{V[X]} - 1) * (1 - E[X]) Arguments: k_is: number of disease-associated sequences per example n_is: total number of sequences per example Returns: initial values of parameters alpha and beta """ binomial_proportions_p = k_is / n_is mean = binomial_proportions_p.mean() variance = binomial_proportions_p.var() if variance != 0: alpha = np.square(mean) * (1 - mean) / variance - mean beta = (mean * (1 - mean) / variance - 1) * (1 - mean) else: alpha, beta = 1, 1 return alpha, beta def _compute_alpha_beta_gradients(self, N_l, alpha, beta, k_is, n_is) -> Tuple[float, float]: """ Function computing the gradients of alpha and beta parameters of the beta distribution to maximize log-likelihood: .. math:: \\frac{\\partial l_l}{\\partial \\alpha} = - N_l (\\Psi (\\alpha) - \\Psi (\\alpha + \\beta)) + \\sum_{i:c_i=l}^{} (\\Psi(k_i + \\alpha) - \\Psi(n_i + k_i + \\alpha + \\beta)) \\frac{\\partial l_l}{\\partial \\beta} = - N_l (\\Psi(\\beta) - \\Psi(\\alpha + \\beta)) + \\sum_{i:c_i=l} (\\Psi(n_i - k_i + \\beta) - \\Psi(n_i + k_i + \\alpha + \\beta)) Arguments: N_l: number of examples in the current class alpha: alpha parameter of beta distribution beta: beta parameter of beta distribution k_is: array of numbers of disease-associated sequences per training example n_is: array of total numbers of sequences per training example Returns: gradients for alpha and beta """ grad_alpha = - N_l * (digamma(alpha) - digamma(alpha + beta)) \ + np.sum([digamma(k_is[i] + alpha) - digamma(n_is[i] + k_is[i] + alpha + beta) for i in range(k_is.shape[0])]) grad_beta = - N_l * (digamma(beta) - digamma(alpha + beta)) \ + np.sum([digamma(n_is[i] - k_is[i] + beta) - digamma(n_is[i] + k_is[i] + alpha + beta) for i in range(k_is.shape[0])]) return grad_alpha, grad_beta def _perform_laplace_smoothing(self, k_is, n_is) -> Tuple[np.array, np.array]: """ Function performing Laplace smoothing of data, where it uses the most deeply sampled example in the class (example with maximum n) by adding the ratio of number of disease-associated sequences and total number of sequences for the example with maximum n to the number of disease-associated sequences for all examples, and 1 to the total number of sequences for all examples, thus regularizing the likelihood computed from these values and potentially avoiding numerical instabilities. If n_max is the total number of sequences in the example with the largest number of total sequences, and k_max is the number of disease-associated sequences for that same example, then the smoothing is performed in the following way for each example in the training dataset: .. math:: k_i' = k_i + k_{max} / n_{max} n_i' = n_i + 1 Arguments: k_is: array of numbers of disease-associated sequences per training example n_is: array of total numbers of sequences per training example Returns: Laplace-smoothed values of k_i and n_i """ regularizer_index = np.argmax(n_is) # index of max n regularizer_k = k_is[regularizer_index] # k corresponding to max n regularizer_n = n_is[regularizer_index] # max n regularized_k_is = copy.deepcopy(k_is) regularized_n_is = copy.deepcopy(n_is) regularized_k_is = regularized_k_is + regularizer_k / regularizer_n regularized_n_is = regularized_n_is + 1 return regularized_k_is, regularized_n_is def _compute_posterior_class_probability(self, k, n) -> Tuple[float, float]: """ For given parameters of beta distributions for both classes, computes the posterior class probabilities: .. math:: p(c' = x | n', k')= \\binom{n'}{k'} \\frac{B(k'+\\alpha_x, n' - k' + \\beta_x)}{B(\\alpha_x, \\beta_x)} \\frac{N_x + 1}{N + 2}, x=0,1 Arguments: k: number of disease-associated sequences n: total number of sequences Returns: a tuple of probabilities for negative class and positive class for given example, normalized to sum to 1 """ predicted_probability_0 = beta_binomial.pmf(k, n, self.alpha_0, self.beta_0) * (self.N_0 + 1) / (self.N_0 + self.N_1 + 2) predicted_probability_1 = beta_binomial.pmf(k, n, self.alpha_1, self.beta_1) * (self.N_1 + 1) / (self.N_0 + self.N_1 + 2) normalization_const = predicted_probability_0 + predicted_probability_1 if np.isnan(normalization_const): raise ValueError(f"{ProbabilisticBinaryClassifier.__name__}: encountered nan in predicted posterior class probabilities." f"\nprobability of class 0: {predicted_probability_0}\nprobability of class 1: {predicted_probability_1}\n" f"alpha 0: {self.alpha_0}, beta 0: {self.beta_0}\nalpha 1: {self.alpha_1}, beta 1: {self.beta_1}\n" f"positive example count: {self.N_1}, negative example count: {self.N_0}") elif normalization_const == 0: warnings.warn(f"{ProbabilisticBinaryClassifier.__name__}: posterior class probabilities for both classes are 0 (k={k}, n={n}). Returning " f"normalized values to indicate that the example could not be classified, by setting both probabilities to 0.5.", RuntimeWarning) return 0.5, 0.5 return predicted_probability_0 / normalization_const, predicted_probability_1 / normalization_const def _compute_log_posterior_odds_ratio(self, k, n): """ Function computing log-posterior odds ratio for class assignment for new example with parameters k and n: .. math:: F(k, n) = log \\, p (c=1|k,n) - log \\, p(c=0|k,n)) = log (N_1 + 1) - log(N_0 + 1) + log \\, B(\\alpha_0, \\beta_0) - log \\, B(\\alpha_1, \\beta_1) + log \\, B(k + \\alpha_1, n - k + \\beta_1) - log \\, B(k + \\alpha_0, n-k + \\beta_0) Arguments: k: number of disease-associated sequences n: total number of sequences Returns: log-posterior odds ratio for class assignment """ return np.log(self.N_1 + 1) - np.log(self.N_0 + 1) \ + beta_func_ln(self.alpha_0, self.beta_0) - beta_func_ln(self.alpha_1, self.beta_1) \ + beta_func_ln(k + self.alpha_1, n - k + self.beta_1) \ - beta_func_ln(k + self.alpha_0, n - k + self.beta_0) def _convert_object_to_dict(self): content = vars(self) result = {} for key, value in content.items(): if isinstance(value, np.ndarray): result[key] = value.tolist() elif value is None or isinstance(value, str) or isinstance(value, dict) or isinstance(value, list) or isinstance(value, Path): result[key] = value elif isinstance(value, Label): result[key] = value.name else: result[key] = float(value) return result
[docs] def store(self, path: Path, feature_names=None, details_path=None): content = self._convert_object_to_dict() PathBuilder.build(path) file_path = path / FilenameHandler.get_filename(self.__class__.__name__, "pickle") with file_path.open("wb") as file: pickle.dump(content, file) if details_path is None: params_path = path / FilenameHandler.get_filename(self.__class__.__name__, "yaml") else: params_path = details_path with params_path.open("w") as file: desc = {self.label.name: { **content, "feature_names": feature_names, "classes": list(self.class_mapping.values()) }} if self.label is not None: desc["label"] = self.label.get_desc_for_storage() yaml.dump(desc, file)
[docs] def load(self, path: Path): keys = list(vars(self).keys()) file_path = path / FilenameHandler.get_filename(self.__class__.__name__, "pickle") if file_path.is_file(): with file_path.open("rb") as file: content = pickle.load(file) assert all( key in keys for key in content.keys()), f"ProbabilisticBinaryClassifier: error while loading from {file_path}: " \ f"object attributes from file and from the class do not match.\n" \ f"Attributes from file: {list(content.keys())}\n" \ f"Attributes for object of class ProbabilisticBinaryClassifier: {keys}" for key in content: if key == "label": setattr(self, "label", Label(**content[key])) else: setattr(self, key, content[key]) else: raise FileNotFoundError(f"{self.__class__.__name__} model could not be loaded from {file_path}. " f"Check if the path to the {file_path.name} file is properly set.")
[docs] def get_params(self): return vars(self)
[docs] def check_if_exists(self, path): vals = vars(self).values() if any(val is None for val in vals): return False else: return True
def _check_labels(self, label_name): assert label_name == self.label.name, f"ProbabilisticBinaryClassifier: classifier cannot predict the labels " \ f"on which it was not trained: got: {label_name}, expected: {self.label.name}."
[docs] def get_label_name(self): return self.label.name
[docs] def get_package_info(self) -> str: return Util.get_immuneML_version()
[docs] def get_feature_names(self) -> list: return self.feature_names
[docs] def can_predict_proba(self) -> bool: return True
[docs] def get_classes(self) -> list: return list(self.class_mapping.values())
[docs] def get_class_mapping(self) -> dict: return self.class_mapping
[docs] def get_compatible_encoders(self): from immuneML.encodings.abundance_encoding.SequenceAbundanceEncoder import SequenceAbundanceEncoder from immuneML.encodings.abundance_encoding.CompAIRRSequenceAbundanceEncoder import CompAIRRSequenceAbundanceEncoder from immuneML.encodings.abundance_encoding.KmerAbundanceEncoder import KmerAbundanceEncoder return [SequenceAbundanceEncoder, CompAIRRSequenceAbundanceEncoder, KmerAbundanceEncoder]