Source code for immuneML.encodings.abundance_encoding.AbundanceEncoderHelper

import pickle

import fisher
import numpy as np

from immuneML.caching.CacheHandler import CacheHandler
from immuneML.environment.Label import Label
from immuneML.environment.LabelConfiguration import LabelConfiguration


class AbundanceEncoderHelper:
    """
    Static helper routines for abundance encoding.

    The helpers validate the label configuration, build a per-sequence contingency table
    (presence/absence of a sequence versus positive/negative class), compute a Fisher's exact test
    p-value per sequence, select the sequences whose p-value is below a threshold, and build the
    final (relevant abundance, total abundance) matrix per repertoire.
    """

    # Sentinel returned by _fisher_test instead of a p-value when row[0] + row[1] <= 1.
    # Since 2.0 is larger than any probability, such sequences are never selected by
    # `p_values < p_value_threshold` for a threshold that is a valid probability.
    INVALID_P_VALUE = 2.0

    # Default file name for the pickled relevant-sequence indices.
    _RELEVANT_INDICES_FILE_NAME = 'relevant_sequence_indices.pickle'
    # Earlier default, which accidentally ended with a space; only used to find already written files.
    _LEGACY_RELEVANT_INDICES_FILE_NAME = 'relevant_sequence_indices.pickle '

    @staticmethod
    def check_labels(label_config: "LabelConfiguration", location: str):
        """
        Assert that the label configuration is usable for abundance encoding.

        Args:
            label_config: label configuration; exactly one label object is expected.
            location: name prepended to the assertion messages to identify the caller.

        Raises:
            AssertionError: if there is not exactly one label, if the label is not a Label with a
                non-empty positive_class, or if the label does not have exactly two values.
        """
        # NOTE(review): validation relies on `assert`, which is skipped when Python runs with -O;
        # kept as is because callers may depend on AssertionError being raised.
        labels = label_config.get_label_objects()
        assert len(labels) == 1, f"{location}: this encoding works only for single label."

        label = labels[0]

        assert isinstance(label, Label) and label.positive_class is not None and label.positive_class != "", \
            f"{location}: positive_class parameter was not set for label {label}. It has to be set to determine the " \
            f"receptor sequences associated with the positive class. " \
            f"To use this encoder, in the label definition in the specification of the instruction, define " \
            f"the positive class for the label. See documentation for this encoder for more details."

        assert len(label.values) == 2, f"{location}: only binary classification (2 classes) is possible when extracting " \
                                       f"relevant sequences for the label, but got these classes for label {label.name} instead: {label.values}."

    @staticmethod
    def check_is_positive_class(dataset, matrix_repertoire_ids, label_config: "LabelConfiguration"):
        """
        Return a boolean array telling, for each repertoire id, whether it has the positive class.

        Args:
            dataset: object providing get_repertoire(repertoire_identifier=...), whose result exposes
                a `metadata` mapping keyed by label name.
            matrix_repertoire_ids: iterable of repertoire identifiers; the output follows its order.
            label_config: label configuration; only its first label object is used.

        Returns:
            numpy boolean array, True where the repertoire's metadata value for the label equals
            the label's positive_class.
        """
        label = label_config.get_label_objects()[0]

        label_values = np.array([dataset.get_repertoire(repertoire_identifier=repertoire_id).metadata[label.name]
                                 for repertoire_id in matrix_repertoire_ids])

        return label_values == label.positive_class

    @staticmethod
    def get_relevant_sequence_indices(sequence_presence_iterator, is_positive_class, p_value_threshold, relevant_indices_path, params,
                                      cache_params=None):
        """
        Compute (when learning) or load (otherwise) the boolean mask of relevant sequences.

        When `params.learn_model` is true, the contingency table and the Fisher p-values are computed
        through CacheHandler.memo_by_params, both are written as CSV files to `params.result_path`,
        and the mask `p_values < p_value_threshold` is pickled to `relevant_indices_path`.
        Otherwise the mask is unpickled from `relevant_indices_path`.

        Args:
            sequence_presence_iterator: sized iterable of per-sequence vectors (see _get_contingency_table).
            is_positive_class: boolean array as returned by check_is_positive_class.
            p_value_threshold: sequences with a p-value strictly below this value are relevant.
            relevant_indices_path: path object of the pickle file, or None to use the default file
                inside `params.result_path`.
            params: object exposing `result_path` (path object) and `learn_model` (bool).
            cache_params: value made part of the cache keys.

        Returns:
            tuple (relevant_sequence_indices, file_paths) where file_paths always contains
            "relevant_indices_path" and, when learning, also "contingency_table_path" and "p_values_path".
        """
        if relevant_indices_path is None:
            relevant_indices_path = params.result_path / AbundanceEncoderHelper._RELEVANT_INDICES_FILE_NAME

            # The default file name used to end with a space; when only loading, fall back to such
            # a file if it is the one that exists so previously stored results remain readable.
            if not params.learn_model and not relevant_indices_path.exists():
                legacy_path = params.result_path / AbundanceEncoderHelper._LEGACY_RELEVANT_INDICES_FILE_NAME
                if legacy_path.exists():
                    relevant_indices_path = legacy_path

        file_paths = {"relevant_indices_path": relevant_indices_path}

        if params.learn_model:
            # NOTE(review): this key tuple is nested differently from the p-value key below; it is left
            # unchanged because altering it presumably changes the cache key - confirm against CacheHandler.
            contingency_table = CacheHandler.memo_by_params(('cache_params', cache_params, ('type', 'contingency_table')),
                                                            lambda: AbundanceEncoderHelper._get_contingency_table(sequence_presence_iterator,
                                                                                                                  is_positive_class))

            p_values = CacheHandler.memo_by_params((('cache_params', cache_params), ("type", "fisher_p_values")),
                                                   lambda: AbundanceEncoderHelper._find_sequence_p_values_with_fisher(contingency_table))

            relevant_sequence_indices = p_values < p_value_threshold

            file_paths["contingency_table_path"] = AbundanceEncoderHelper._write_contingency_table(contingency_table, params.result_path)
            file_paths["p_values_path"] = AbundanceEncoderHelper._write_p_values(p_values, params.result_path)

            with relevant_indices_path.open("wb") as file:
                pickle.dump(relevant_sequence_indices, file)
        else:
            # SECURITY: pickle.load can execute arbitrary code from a crafted file; only point this
            # at index files written by this helper, never at untrusted input.
            with relevant_indices_path.open("rb") as file:
                relevant_sequence_indices = pickle.load(file)

        return relevant_sequence_indices, file_paths

    @staticmethod
    def _get_contingency_table(sequence_presence_iterator, is_positive_class):
        """
        Build one contingency-table row per sequence vector.

        Columns are, in order: positive_present, negative_present, positive_absent, negative_absent
        (the same order as the header written by _write_contingency_table).

        Args:
            sequence_presence_iterator: iterable supporting len(); each item is a numpy vector aligned
                with `is_positive_class`, where a zero entry means the sequence is absent.
            is_positive_class: boolean numpy array marking the positive-class entries.

        Returns:
            integer numpy array of shape (len(sequence_presence_iterator), 4).
        """
        contingency_table = np.zeros(shape=(len(sequence_presence_iterator), 4), dtype=int)
        is_negative_class = np.logical_not(is_positive_class)  # loop invariant

        for i, sequence_vector in enumerate(sequence_presence_iterator):
            is_absent = sequence_vector == 0

            # the "present" columns sum the vector's values, the "absent" columns count entries
            contingency_table[i, 0] = np.sum(sequence_vector[np.logical_and(sequence_vector, is_positive_class)])
            contingency_table[i, 1] = np.sum(sequence_vector[np.logical_and(sequence_vector, is_negative_class)])
            contingency_table[i, 2] = np.sum(np.logical_and(is_positive_class, is_absent))
            contingency_table[i, 3] = np.sum(np.logical_and(is_negative_class, is_absent))

        return contingency_table

    @staticmethod
    def _find_sequence_p_values_with_fisher(contingency_table):
        """Apply _fisher_test to every row of the contingency table and return the resulting array."""
        return np.apply_along_axis(AbundanceEncoderHelper._fisher_test, 1, contingency_table)

    @staticmethod
    def _fisher_test(row):
        """
        Return the right-tail p-value of fisher.pvalue for one contingency-table row.

        Rows where the two "present" counts add up to at most 1 are not tested and get
        INVALID_P_VALUE instead.
        """
        if row[0] + row[1] > 1:
            return fisher.pvalue(row[0], row[1], row[2], row[3]).right_tail
        else:
            return AbundanceEncoderHelper.INVALID_P_VALUE

    @staticmethod
    def _write_contingency_table(contingency_table, result_path):
        """Write the contingency table as 'contingency_table.csv' under result_path and return that path."""
        contingency_table_path = result_path / 'contingency_table.csv'
        np.savetxt(contingency_table_path, contingency_table, fmt="%s", delimiter=",",
                   header="positive_present,negative_present,positive_absent,negative_absent", comments='')
        return contingency_table_path

    @staticmethod
    def _write_p_values(p_values, result_path):
        """Write the p-values as 'p_values.csv' under result_path and return that path."""
        p_values_path = result_path / 'p_values.csv'
        np.savetxt(p_values_path, p_values, header="p_values", comments='')
        return p_values_path

    @staticmethod
    def build_abundance_matrix(sequence_presence_matrix, matrix_repertoire_ids, dataset_repertoire_ids, sequence_p_values_indices):
        """
        Build the abundance matrix with one row per dataset repertoire.

        Each row holds [relevant_sequence_abundance, total_sequence_abundance]: the sum of the
        repertoire's presence values over the relevant sequences, and over all sequences.

        Args:
            sequence_presence_matrix: presence matrix whose transpose is indexed by repertoire position
                (presumably shaped (sequences, repertoires) - confirm against caller).
            matrix_repertoire_ids: repertoire ids in the order used by the presence matrix; any
                array-like (list or numpy array) is accepted.
            dataset_repertoire_ids: repertoire ids in the order required for the output rows.
            sequence_p_values_indices: boolean mask of relevant sequences.

        Returns:
            numpy float array of shape (len(dataset_repertoire_ids), 2).
        """
        abundance_matrix = np.zeros((len(dataset_repertoire_ids), 2))

        # Comparing a plain list with `==` yields a single bool, which would silently select no
        # repertoire at all; converting once makes the comparison element-wise.
        matrix_repertoire_ids = np.asarray(matrix_repertoire_ids)

        for idx_in_dataset, dataset_repertoire_id in enumerate(dataset_repertoire_ids):
            relevant_row = np.where(matrix_repertoire_ids == dataset_repertoire_id)
            repertoire_vector = sequence_presence_matrix.T[relevant_row]

            relevant_sequence_abundance = np.sum(repertoire_vector[np.logical_and(sequence_p_values_indices, repertoire_vector)])
            total_sequence_abundance = np.sum(repertoire_vector)

            abundance_matrix[idx_in_dataset] = [relevant_sequence_abundance, total_sequence_abundance]

        return abundance_matrix