Source code for immuneML.ml_methods.classifiers.AtchleyKmerMILClassifier

import copy
import logging
import random
from pathlib import Path

import numpy as np
import pandas as pd
import torch
import yaml

from immuneML.data_model.EncodedData import EncodedData
from immuneML.environment.Label import Label
from immuneML.ml_methods.classifiers.MLMethod import MLMethod
from immuneML.ml_methods.pytorch_implementations.PyTorchLogisticRegression import PyTorchLogisticRegression
from immuneML.ml_methods.util.Util import Util
from immuneML.util.PathBuilder import PathBuilder


[docs] class AtchleyKmerMILClassifier(MLMethod): """ A binary Repertoire classifier which uses the data encoded by :ref:`AtchleyKmer` encoder to predict the repertoire label. The original publication: Ostmeyer J, Christley S, Toby IT, Cowell LG. Biophysicochemical motifs in T cell receptor sequences distinguish repertoires from tumor-infiltrating lymphocytes and adjacent healthy tissue. Cancer Res. Published online January 1, 2019:canres.2292.2018. `doi:10.1158/0008-5472.CAN-18-2292 <https://cancerres.aacrjournals.org/content/79/7/1671>`_ . **Specification arguments:** - iteration_count (int): max number of training iterations - threshold (float): loss threshold at which to stop training if reached - evaluate_at (int): log model performance every 'evaluate_at' iterations and store the model every 'evaluate_at' iterations if early stopping is used - use_early_stopping (bool): whether to use early stopping - learning_rate (float): learning rate for stochastic gradient descent - random_seed (int): random seed used - zero_abundance_weight_init (bool): whether to use 0 as initial weight for abundance term (if not, a random value is sampled from normal distribution with mean 0 and variance 1 / total_number_of_features - number_of_threads: number of threads to be used for training - initialization_count (int): how many times to repeat the fitting procedure from the beginning before choosing the optimal model (trains the model with multiple random initializations) - pytorch_device_name (str): The name of the pytorch device to use. This name will be passed to torch.device(pytorch_device_name). **YAML specification:** .. indent with spaces .. code-block:: yaml definitions: ml_methods: my_kmer_mil_classifier: AtchleyKmerMILClassifier: iteration_count: 100 evaluate_at: 15 use_early_stopping: False learning_rate: 0.01 random_seed: 100 zero_abundance_weight_init: True number_of_threads: 8 threshold: 0.00001 initialization_count: 4 """ MIN_SEED_VALUE = 1 MAX_SEED_VALUE = 100000 def __init__(self, iteration_count: int = None, threshold: float = None, evaluate_at: int = None, use_early_stopping: bool = None, random_seed: int = None, learning_rate: float = None, zero_abundance_weight_init: bool = None, number_of_threads: int = None, initialization_count: int = None, pytorch_device_name: str = None): super().__init__() self.logistic_regression = None self.random_seed = random_seed self.iteration_count = iteration_count self.threshold = threshold self.evaluate_at = evaluate_at self.use_early_stopping = use_early_stopping self.learning_rate = learning_rate self.zero_abundance_weight_init = zero_abundance_weight_init self.number_of_threads = number_of_threads self.input_size = 0 self.initialization_count = initialization_count self.pytorch_device_name = pytorch_device_name def _make_log_reg(self): return PyTorchLogisticRegression(in_features=self.input_size, zero_abundance_weight_init=self.zero_abundance_weight_init) def _fit(self, encoded_data: EncodedData, cores_for_training: int): mapped_y = Util.map_to_new_class_values(encoded_data.labels[self.label.name], self.class_mapping) self.logistic_regression = None min_loss = np.inf for initialization in range(self.initialization_count): random.seed(self.random_seed) random_seed = random.randint(AtchleyKmerMILClassifier.MIN_SEED_VALUE, AtchleyKmerMILClassifier.MAX_SEED_VALUE) Util.setup_pytorch(self.number_of_threads, random_seed, self.pytorch_device_name) self.input_size = encoded_data.examples.shape[1] log_reg = self._make_log_reg() loss = np.inf state = {"loss": loss, "model": None} loss_func = torch.nn.BCEWithLogitsLoss(reduction='mean') optimizer = torch.optim.SGD(log_reg.parameters(), lr=self.learning_rate) for iteration in range(self.iteration_count): # reset gradients optimizer.zero_grad() # compute predictions only for k-mers with max score max_logit_indices = self._get_max_logits_indices(encoded_data.examples, log_reg) example_count = encoded_data.examples.shape[0] examples = torch.from_numpy(encoded_data.examples).float()[torch.arange(example_count).long(), :, max_logit_indices] logits = log_reg(examples) # compute the loss loss = loss_func(logits, torch.tensor(mapped_y).float()) # perform update loss.backward() optimizer.step() # log current score and keep model for early stopping if specified if iteration % self.evaluate_at == 0 or iteration == self.iteration_count - 1: logging.info(f"AtchleyKmerMILClassifier: log loss at iteration {iteration + 1}/{self.iteration_count}: {loss}.") if state["loss"] < loss and self.use_early_stopping: state = {"loss": loss.numpy(), "model": copy.deepcopy(log_reg)} if loss < self.threshold: break logging.warning(f"AtchleyKmerMILClassifier: the logistic regression model did not converge.") if loss > state['loss'] and self.use_early_stopping: log_reg.load_state_dict(state["model"]) if min_loss > loss: self.logistic_regression = log_reg min_loss = loss def _get_max_logits_indices(self, data, log_reg=None): with torch.no_grad(): if log_reg: logits = log_reg(torch.from_numpy(np.swapaxes(data, 1, 2).reshape(data.shape[0] * data.shape[2], -1))) else: logits = self.logistic_regression(torch.from_numpy(np.swapaxes(data, 1, 2).reshape(data.shape[0] * data.shape[2], -1))) logits = torch.reshape(logits, (data.shape[0], data.shape[2])) max_logits_indices = torch.argmax(logits, dim=1) return max_logits_indices.long() def _predict(self, encoded_data: EncodedData): predictions_proba = self._predict_proba(encoded_data) return {self.label.name: [self.class_mapping[val] for val in (predictions_proba[self.label.name][self.label.positive_class] > 0.5).tolist()]}
[docs] def store(self, path: Path): PathBuilder.build(path) torch.save(copy.deepcopy(self.logistic_regression).state_dict(), str(path / "log_reg.pt")) custom_vars = copy.deepcopy(vars(self)) coefficients_df = pd.DataFrame(custom_vars["logistic_regression"].linear.weight.detach().numpy(), columns=self.feature_names) coefficients_df["bias"] = custom_vars["logistic_regression"].linear.bias.detach().numpy() coefficients_df.to_csv(path / "coefficients.csv", index=False) del custom_vars["result_path"] del custom_vars["logistic_regression"] del custom_vars["label"] if self.label: custom_vars["label"] = self.label.get_desc_for_storage() params_path = self._get_custom_params_path(path) with params_path.open('w') as file: yaml.dump(custom_vars, file)
[docs] def load(self, path): params_path = self._get_custom_params_path(path) with params_path.open("r") as file: custom_params = yaml.load(file, Loader=yaml.SafeLoader) for param, value in custom_params.items(): if hasattr(self, param): if param == "label": setattr(self, "label", Label(**value)) else: setattr(self, param, value) self.logistic_regression = self._make_log_reg() self.logistic_regression.load_state_dict(torch.load(str(path / "log_reg.pt")))
def _get_custom_params_path(self, path): return path / "custom_params.yaml"
[docs] def get_params(self): params = copy.deepcopy(vars(self)) params["logistic_regression"] = copy.deepcopy(self.logistic_regression).state_dict() return params
def _predict_proba(self, encoded_data: EncodedData): self.logistic_regression.eval() example_count = encoded_data.examples.shape[0] max_logit_indices = self._get_max_logits_indices(encoded_data.examples) with torch.no_grad(): data = torch.from_numpy(encoded_data.examples).float()[torch.arange(example_count).long(), :, max_logit_indices] predictions = torch.sigmoid(self.logistic_regression(data)).numpy() return {self.label.name: {self.label.positive_class: predictions, self.label.get_binary_negative_class(): 1 - predictions}}
[docs] def can_predict_proba(self) -> bool: return True
[docs] def can_fit_with_example_weights(self) -> bool: return False
[docs] def get_compatible_encoders(self): from immuneML.encodings.atchley_kmer_encoding.AtchleyKmerEncoder import AtchleyKmerEncoder return [AtchleyKmerEncoder]