Source code for immuneML.reports.data_reports.SignificantFeatures

import warnings
from pathlib import Path
from typing import List

import pickle
import pandas as pd
import plotly.express as px
import numpy as np

from immuneML.data_model.dataset.RepertoireDataset import RepertoireDataset
from immuneML.dsl.instruction_parsers.LabelHelper import LabelHelper
from immuneML.encodings.abundance_encoding.AbundanceEncoderHelper import AbundanceEncoderHelper
from immuneML.encodings.abundance_encoding.CompAIRRSequenceAbundanceEncoder import CompAIRRSequenceAbundanceEncoder
from immuneML.encodings.abundance_encoding.KmerAbundanceEncoder import KmerAbundanceEncoder
from immuneML.reports.ReportOutput import ReportOutput
from immuneML.reports.ReportResult import ReportResult
from immuneML.reports.data_reports.DataReport import DataReport
from immuneML.util.ParameterValidator import ParameterValidator
from immuneML.util.PathBuilder import PathBuilder
from immuneML.util.SignificantFeaturesHelper import SignificantFeaturesHelper


[docs] class SignificantFeatures(DataReport): """ Plots a boxplot of the number of significant features (label-associated k-mers or sequences) per Repertoire according to Fisher's exact test, across different classes for the given label. Internally uses the :py:obj:`~immuneML.encodings.abundance_encoding.KmerAbundanceEncoder.KmerAbundanceEncoder` for calculating significant k-mers, and :py:obj:`~immuneML.encodings.abundance_encoding.SequenceAbundanceEncoder.SequenceAbundanceEncoder` or :py:obj:`~immuneML.encodings.abundance_encoding.CompAIRRSequenceAbundanceEncoder.CompAIRRSequenceAbundanceEncoder` to calculate significant full sequences (depending on whether the argument compairr_path was set). Arguments: p_values (list): The p value thresholds to be used by Fisher's exact test. Each p-value specified here will become one panel in the output figure. k_values (list): Length of the k-mers (number of amino acids) created by the :py:obj:`~immuneML.encodings.abundance_encoding.KmerAbundanceEncoder.KmerAbundanceEncoder`. When using a full sequence encoding (:py:obj:`~immuneML.encodings.abundance_encoding.SequenceAbundanceEncoder.SequenceAbundanceEncoder` or :py:obj:`~immuneML.encodings.abundance_encoding.CompAIRRSequenceAbundanceEncoder.CompAIRRSequenceAbundanceEncoder`), specify 'full_sequence' here. Each value specified under k_values will represent one boxplot in the output figure. label (dict): A label configuration. One label should be specified, and the positive_class for this label should be defined. See the YAML specification below for an example. compairr_path (str): If 'full_sequence' is listed under k_values, the path to the CompAIRR executable may be provided. If the compairr_path is specified, the :py:obj:`~immuneML.encodings.abundance_encoding.CompAIRRSequenceAbundanceEncoder.CompAIRRSequenceAbundanceEncoder` will be used to compute the significant sequences. If the path is not specified and 'full_sequence' is listed under k-values, :py:obj:`~immuneML.encodings.abundance_encoding.SequenceAbundanceEncoder.SequenceAbundanceEncoder` will be used. log_scale (bool): Whether to plot the y axis in log10 scale (log_scale = True) or continuous scale (log_scale = False). By default, log_scale is False. YAML specification: .. indent with spaces .. code-block:: yaml my_significant_features_report: SignificantFeatures: p_values: - 0.1 - 0.01 - 0.001 - 0.0001 k_values: - 3 - 4 - 5 - full_sequence compairr_path: path/to/compairr # can be specified if 'full_sequence' is listed under k_values label: # Define a label, and the positive class for that given label CMV: positive_class: + log_scale: False """
[docs] @classmethod def build_object(cls, **kwargs): location = SignificantFeatures.__name__ kwargs = SignificantFeaturesHelper.parse_parameters(kwargs, location) ParameterValidator.assert_keys_present(kwargs.keys(), ["log_scale"], location, location) ParameterValidator.assert_type_and_value(kwargs["log_scale"], bool, "SignificantFeatures", "log_scale") return SignificantFeatures(**kwargs)
def __init__(self, dataset: RepertoireDataset = None, p_values: List[float] = None, k_values: List[int] = None, label: dict = None, compairr_path: Path = None, log_scale: bool = False, result_path: Path = None, name: str = None, number_of_processes: int = 1): super().__init__(dataset=dataset, result_path=result_path, number_of_processes=number_of_processes, name=name) self.p_values = p_values self.k_values = k_values self.label = label self.label_config = None self.compairr_path = compairr_path self.log_scale = log_scale
[docs] def check_prerequisites(self): if isinstance(self.dataset, RepertoireDataset): return True else: warnings.warn(f"{SignificantFeatures.__name__}: report can be generated only from RepertoireDataset. Skipping this report...") return False
def _generate(self) -> ReportResult: self.label_config = LabelHelper.create_label_config([self.label], self.dataset, SignificantFeatures.__name__, f"{SignificantFeatures.__name__}/label") plotting_data = self._compute_plotting_data() table_result = self._write_results_table(plotting_data) report_output_fig = self._safe_plot(plotting_data=plotting_data) output_figures = None if report_output_fig is None else [report_output_fig] return ReportResult(name=self.name, info="The number of significant features (label-associated k-mers or sequences) per Repertoire according to Fisher's exact test, across different classes for the given label.", output_figures=output_figures, output_tables=[table_result]) def _compute_plotting_data(self): result = {"encoding": [], "p-value": [], "class": [], "significant_features": []} positive_class, negative_class = self._get_positive_negative_classes() for k in self.k_values: encoder_name = SignificantFeaturesHelper._get_encoder_name(k) for p_value in self.p_values: pos_class_feature_counts, neg_class_feature_counts = self._compute_significant_feature_counts(k, p_value) n_examples = len(pos_class_feature_counts) + len(neg_class_feature_counts) result["encoding"].extend([encoder_name] * n_examples) result["p-value"].extend([p_value] * n_examples) result["class"].extend( [positive_class] * len(pos_class_feature_counts) + [negative_class] * len(neg_class_feature_counts)) result["significant_features"].extend(list(pos_class_feature_counts) + list(neg_class_feature_counts)) return pd.DataFrame(result) def _get_positive_negative_classes(self): label = self.label_config.get_label_objects()[0] positive_class = label.positive_class negative_class = label.get_binary_negative_class() return positive_class, negative_class def _get_encoder_result_path(self, k, p_value): result_path = self.result_path / f"{SignificantFeaturesHelper._get_encoder_name(k)}_{p_value}" PathBuilder.build(result_path) return result_path def _write_results_table(self, data) -> ReportOutput: table_path = self.result_path / f"significant_features_report.csv" data.to_csv(table_path, index=False) return ReportOutput(table_path, "Significant features across different Repertoire classes") def _plot(self, plotting_data): figure = px.box(plotting_data, x="encoding", y="significant_features", color="class", facet_row=None, facet_col="p-value", log_y=self.log_scale, labels={ "significant_features": "Number of significant features per AIRR according to Fisher's exact test", "encoding": "Encoding", "class": "Repertoire class" }, template='plotly_white', color_discrete_sequence=px.colors.diverging.Tealrose) file_path = self.result_path / f"significant_features_figure.html" figure.write_html(str(file_path)) return ReportOutput(file_path, name="Significant features across different Repertoire classes") def _compute_significant_feature_counts(self, k, p_value): encoder_result_path = self._get_encoder_result_path(k, p_value) encoder_params = SignificantFeaturesHelper._build_encoder_params(self.label_config, encoder_result_path) if type(k) == int: pos_class_feature_counts, neg_class_feature_counts = self._compute_significant_kmer_counts(k, p_value, encoder_params) elif self.compairr_path is None: pos_class_feature_counts, neg_class_feature_counts = self._compute_significant_sequence_counts(p_value, encoder_params) else: pos_class_feature_counts, neg_class_feature_counts = self._compute_significant_compairr_sequence_counts(p_value, encoder_params) return pos_class_feature_counts, neg_class_feature_counts def _compute_significant_kmer_counts(self, k, p_value, encoder_params): encoder = SignificantFeaturesHelper._build_kmer_encoder(self.dataset, k, p_value, encoder_params) with encoder.relevant_indices_path.open("rb") as file: relevant_indices = pickle.load(file) relevant_feature_presence = np.sum(encoder.kmer_presence_matrix[relevant_indices], axis=0) return self._get_positive_negative_class(relevant_feature_presence, encoder.matrix_repertoire_ids) def _compute_significant_sequence_counts(self, p_value, encoder_params): encoder = SignificantFeaturesHelper._build_sequence_encoder(self.dataset, p_value, encoder_params) with encoder.relevant_indices_path.open("rb") as file: relevant_indices = pickle.load(file) relevant_feature_presence = self._sum_sequence_vectors_iterable(relevant_indices, encoder.comparison_data.get_item_vectors()) return self._get_positive_negative_class(relevant_feature_presence, self.dataset.get_repertoire_ids()) def _compute_significant_compairr_sequence_counts(self, p_value, encoder_params): encoder = SignificantFeaturesHelper._build_compairr_sequence_encoder(self.dataset, p_value, encoder_params, self.compairr_path) with encoder.relevant_indices_path.open("rb") as file: relevant_indices = pickle.load(file) relevant_feature_presence = self._sum_sequence_vectors_iterable(relevant_indices, encoder.compairr_sequence_presence) return self._get_positive_negative_class(relevant_feature_presence, self.dataset.get_repertoire_ids()) def _sum_sequence_vectors_iterable(self, relevant_indices, sequence_vector_iterable): relevant_feature_presence = np.zeros(shape=(6,)) for i, sequence_vector in enumerate(sequence_vector_iterable): if relevant_indices[i]: relevant_feature_presence += sequence_vector return relevant_feature_presence def _get_relevant_feature_presence(self, encoder, relevant_indices): if isinstance(encoder, KmerAbundanceEncoder): relevant_feature_presence = np.sum(encoder.kmer_presence_matrix[relevant_indices], axis=0) elif isinstance(encoder, CompAIRRSequenceAbundanceEncoder): relevant_feature_presence = np.sum(encoder.sequence_presence_matrix[relevant_indices], axis=0) else: relevant_feature_presence = np.zeros(shape=(6,)) for i, sequence_vector in enumerate(encoder.comparison_data.get_item_vectors()): if relevant_indices[i]: relevant_feature_presence += sequence_vector return relevant_feature_presence def _get_positive_negative_class(self, relevant_feature_presence, repertoire_ids): is_positive_class = AbundanceEncoderHelper.check_is_positive_class(self.dataset, repertoire_ids, self.label_config) pos_class_feature_counts = relevant_feature_presence[is_positive_class] neg_class_feature_counts = relevant_feature_presence[np.logical_not(is_positive_class)] return pos_class_feature_counts, neg_class_feature_counts