Source code for immuneML.encodings.baseline_encoding.GeneFrequencyEncoder

from collections import Counter
from typing import List

import pandas as pd
from sklearn.preprocessing import StandardScaler

from immuneML.analysis.data_manipulation.NormalizationType import NormalizationType
from immuneML.data_model.EncodedData import EncodedData
from immuneML.data_model.datasets.Dataset import Dataset
from immuneML.data_model.datasets.ElementDataset import ReceptorDataset, SequenceDataset
from immuneML.data_model.datasets.RepertoireDataset import RepertoireDataset
from immuneML.encodings.DatasetEncoder import DatasetEncoder
from immuneML.encodings.EncoderParams import EncoderParams
from immuneML.encodings.preprocessing.FeatureScaler import FeatureScaler
from immuneML.util.NumpyHelper import NumpyHelper
from immuneML.util.ParameterValidator import ParameterValidator


class GeneFrequencyEncoder(DatasetEncoder):
    """
    GeneFrequencyEncoder represents a repertoire by the frequency of V and/or J genes used.

    **Dataset type:**

    - RepertoireDatasets

    - ReceptorDatasets

    - SequenceDatasets

    **Specification arguments:**

    - genes (list): List of genes to use for the encoding. Possible values are 'V', and 'J'. At least one gene
      must be specified.

    - normalization_type (str): Type of normalization to apply to the gene frequencies. Possible values are
      'none', 'binary', 'relative_frequency', 'max', 'l2'. Defaults to 'relative_frequency'. For SequenceDatasets
      and ReceptorDatasets, the gene frequencies are binary (1 if the gene is present in any chain, 0 otherwise)
      regardless of the normalization_type specified.

    - encoding_type (str): How to encode gene presence for SequenceDatasets and ReceptorDatasets. Ignored for
      RepertoireDatasets. Possible values are 'dummy' and 'one_hot'. Defaults to 'dummy'.

      - 'dummy': Creates k−1 binary columns per gene segment and locus (V or J), dropping the most frequent gene
        as the reference. Since each receptor or sequence carries exactly one gene per segment, including all k
        binary columns would create perfect multicollinearity with the intercept (the columns sum to 1). Dropping
        the most frequent gene resolves this and is the standard approach when fitting a model with an intercept.
        The reference gene for each segment is stored in the encoded data info and reported in the results. Use
        this when fitting logistic regression with an intercept, or when combining with other encodings.

      - 'one_hot': Creates k binary columns, one per observed gene. Avoids multicollinearity only when the
        downstream model is fitted without an intercept (fit_intercept=False in sklearn). In that case, each
        coefficient directly represents a gene's absolute log-odds contribution rather than an effect relative to
        an arbitrary baseline.

    - scale_to_zero_mean (bool): Whether to scale the features to zero mean. Defaults to True.

    - scale_to_unit_variance (bool): Whether to scale the features to unit variance. Defaults to True.

    **YAML specification:**

    .. code-block:: yaml

        encodings:
            gene_frequency_encoding:
                GeneFrequency:
                    genes: [V, J]
                    normalization_type: relative_frequency
                    encoding_type: dummy
                    scale_to_unit_variance: true
                    scale_to_zero_mean: true

    """

    VALID_ENCODING_TYPES = ['dummy', 'one_hot']

    def __init__(self, genes: List[str], normalization_type: NormalizationType, scale_to_zero_mean: bool,
                 scale_to_unit_variance: bool, encoding_type: str = 'dummy', name: str = None):
        super().__init__(name=name)
        self.genes = genes
        self.normalization_type = normalization_type
        self.scale_to_zero_mean = scale_to_zero_mean
        self.scale_to_unit_variance = scale_to_unit_variance
        self.encoding_type = encoding_type
        # state learned while encoding with params.learn_model set
        self.scaler = None
        self.feature_names = None  # {segment: [column names]} once fitted
        self.reference_genes = {}  # {'<segment>_<locus>': dropped gene}, filled only for 'dummy' encoding

    @property
    def all_feature_names(self) -> List[str]:
        """Flat list of feature names in the order of self.genes; empty before the encoder has been fitted."""
        if self.feature_names is None:
            return []
        return [name for gene in self.genes for name in self.feature_names[gene]]

    @staticmethod
    def build_object(dataset: Dataset, **params):
        """
        Validate the specification parameters and build the encoder.

        Raises:
            ValueError: if `genes` is empty or `encoding_type` is not one of VALID_ENCODING_TYPES.
            KeyError: if `normalization_type` does not name a member of NormalizationType.
        """
        valid_keys = ['genes', 'normalization_type', 'scale_to_zero_mean', 'scale_to_unit_variance',
                      'encoding_type', 'name']
        ParameterValidator.assert_keys(params.keys(), valid_keys, "GeneFrequencyEncoder", "parameters",
                                       exclusive=False)
        ParameterValidator.assert_all_in_valid_list(params['genes'], ['V', 'J'], "GeneFrequencyEncoder", "genes")

        # an empty list passes the membership check above but would leave nothing to encode
        if not params['genes']:
            raise ValueError("GeneFrequencyEncoder: at least one gene ('V' or 'J') must be specified "
                             "under genes, got an empty list.")

        ParameterValidator.assert_type_and_value(params['scale_to_zero_mean'], bool, "GeneFrequencyEncoder",
                                                 "scale_to_zero_mean")
        ParameterValidator.assert_type_and_value(params['scale_to_unit_variance'], bool, "GeneFrequencyEncoder",
                                                 "scale_to_unit_variance")

        encoding_type = params.get('encoding_type', 'dummy')
        if encoding_type not in GeneFrequencyEncoder.VALID_ENCODING_TYPES:
            raise ValueError(f"GeneFrequencyEncoder: encoding_type must be one of "
                             f"{GeneFrequencyEncoder.VALID_ENCODING_TYPES}, got '{encoding_type}'.")

        normalization_type = NormalizationType[params['normalization_type'].upper()]

        return GeneFrequencyEncoder(genes=params['genes'], normalization_type=normalization_type,
                                    scale_to_zero_mean=params['scale_to_zero_mean'],
                                    scale_to_unit_variance=params['scale_to_unit_variance'],
                                    encoding_type=encoding_type,
                                    name=params.get('name', 'gene_frequency'))

    def encode(self, dataset, params: EncoderParams) -> Dataset:
        """Dispatch to the encoding routine matching the dataset type; raises RuntimeError for other types."""
        if isinstance(dataset, RepertoireDataset):
            return self._encode_repertoire_dataset(dataset, params)
        elif isinstance(dataset, ReceptorDataset):
            return self._encode_receptor_dataset(dataset, params)
        elif isinstance(dataset, SequenceDataset):
            return self._encode_sequence_dataset(dataset, params)
        else:
            raise RuntimeError(f"{self.__class__.__name__}: {self.name}: invalid dataset type: {type(dataset)}.")

    @staticmethod
    def _strip_allele(gene_call):
        """Return the gene call without its allele suffix (text after '*'); non-string values pass through.

        Missing calls may arrive as None or NaN. NaN is truthy, so a plain truthiness test would still call
        .split() on a float; checking the type avoids that.
        """
        return gene_call.split('*')[0] if isinstance(gene_call, str) else gene_call

    def _drop_reference_gene(self, dummies: pd.DataFrame, segment: str, locus: str) -> pd.DataFrame:
        """Drop the most frequent gene as the reference for the given segment+locus (e.g. V_TRA, J_TRB)."""
        if dummies.shape[1] == 0:
            # no gene was observed for this locus (e.g. all calls missing): nothing to drop or record
            return dummies
        reference = dummies.sum().idxmax()
        self.reference_genes[f'{segment}_{locus}'] = reference
        return dummies.drop(columns=[reference])

    def _dummies_per_locus(self, df: pd.DataFrame, col: str, dtype) -> dict:
        """Return a dict of {locus -> dummies DataFrame} using the locus column in df."""
        locus_dfs = {}
        for locus, locus_df in df.groupby('locus'):
            gene_values = locus_df[col].apply(self._strip_allele)
            locus_dummies = pd.get_dummies(gene_values, dtype=dtype)
            locus_dummies.index = locus_df.index
            locus_dfs[locus] = locus_dummies
        return locus_dfs

    def _fit_or_align_features(self, gene_dfs: dict, learn_model: bool) -> dict:
        """Record the column names per segment when learning; otherwise align columns to the recorded ones.

        When aligning, genes not seen during training are dropped, training genes absent here become 0, and
        the reference gene (dummy encoding) is absent by design.
        """
        if learn_model:
            self.feature_names = {segment: gene_df.columns.tolist() for segment, gene_df in gene_dfs.items()}
            return gene_dfs
        return {segment: gene_dfs[segment].reindex(columns=self.feature_names[segment], fill_value=0)
                for segment in self.genes}

    @staticmethod
    def _get_labels(dataset, params: EncoderParams):
        """Return the label values from the dataset metadata, or None when labels should not be encoded."""
        if not params.encode_labels:
            return None
        return dataset.get_metadata(params.label_config.get_labels_by_name(), return_df=False)

    def _build_encoded_dataset(self, dataset, examples, labels):
        """Wrap the examples in EncodedData and attach it to a clone of the dataset (no scaling applied)."""
        feature_names = self.all_feature_names
        encoded_data = EncodedData(examples=examples, labels=labels, example_ids=dataset.get_example_ids(),
                                   feature_names=feature_names,
                                   encoding=GeneFrequencyEncoder.__name__,
                                   # copy, so a later re-fit of this encoder cannot alter info already returned
                                   info={'genes': self.genes, 'encoding_type': self.encoding_type,
                                         'reference_genes': dict(self.reference_genes)},
                                   feature_annotations=pd.DataFrame({"feature": feature_names}))
        encoded_dataset = dataset.clone()
        encoded_dataset.encoded_data = encoded_data
        return encoded_dataset

    def _encode_receptor_dataset(self, dataset: ReceptorDataset, params: EncoderParams) -> ReceptorDataset:
        """Encode each receptor (cell_id) as binary gene-presence features per segment and locus.

        The features are returned as they are: neither normalization nor standard scaling is applied here.
        """
        df = dataset.data.topandas()
        if params.learn_model:
            # forget reference genes from any earlier fit so that stale loci are not reported
            self.reference_genes = {}
        drop_reference = params.learn_model and self.encoding_type == 'dummy'

        gene_dfs = {}
        for segment in self.genes:
            col = f'{segment.lower()}_call'
            locus_dfs = {}
            for locus, locus_df in df.groupby('locus'):
                gene_values = locus_df[col].apply(self._strip_allele)
                locus_dummies = pd.get_dummies(gene_values, dtype=float)
                locus_dummies['cell_id'] = locus_df['cell_id'].values
                locus_dummies = locus_dummies.groupby('cell_id').max()  # index = cell_id
                if drop_reference:
                    locus_dummies = self._drop_reference_gene(locus_dummies, segment, locus)
                locus_dfs[locus] = locus_dummies

            # join loci side-by-side on cell_id index; reorder to match get_example_ids() order, then drop index
            combined = pd.concat(locus_dfs.values(), axis=1).fillna(0)
            combined = combined.loc[dataset.get_example_ids()].reset_index(drop=True)
            gene_dfs[segment] = combined

        gene_dfs = self._fit_or_align_features(gene_dfs, params.learn_model)
        examples = NumpyHelper.concat_arrays_rowwise([gene_dfs[segment].values for segment in self.genes])
        return self._build_encoded_dataset(dataset, examples, self._get_labels(dataset, params))

    def _encode_sequence_dataset(self, dataset: SequenceDataset, params: EncoderParams) -> SequenceDataset:
        """Encode each sequence as binary gene-presence features per segment and locus.

        The features are returned as they are: neither normalization nor standard scaling is applied here.
        """
        df = dataset.data.topandas()
        if params.learn_model:
            # forget reference genes from any earlier fit so that stale loci are not reported
            self.reference_genes = {}
        drop_reference = params.learn_model and self.encoding_type == 'dummy'

        gene_dfs = {}
        for segment in self.genes:
            col = f'{segment.lower()}_call'
            locus_dfs = self._dummies_per_locus(df, col, dtype=int)
            if drop_reference:
                locus_dfs = {locus: self._drop_reference_gene(locus_dummies, segment, locus)
                             for locus, locus_dummies in locus_dfs.items()}

            # stack loci row-wise (each sequence belongs to one locus), fill other-locus columns with 0;
            # sorting by the original row index restores the input order before the index is dropped
            combined = pd.concat(locus_dfs.values(), axis=0).fillna(0).sort_index().reset_index(drop=True)
            gene_dfs[segment] = combined

        gene_dfs = self._fit_or_align_features(gene_dfs, params.learn_model)
        examples = NumpyHelper.concat_arrays_rowwise([gene_dfs[segment].values for segment in self.genes])
        return self._build_encoded_dataset(dataset, examples, self._get_labels(dataset, params))

    def _encode_repertoire_dataset(self, dataset: RepertoireDataset, params: EncoderParams) -> RepertoireDataset:
        """Encode each repertoire by its per-segment gene counts, normalized and then standard-scaled."""
        counters = {segment: [] for segment in self.genes}
        for rep in dataset.repertoires:
            for segment in self.genes:
                genes = [gene_call.split("*")[0]
                         for gene_call in getattr(rep.data, f'{segment.lower()}_call').tolist()]
                counters[segment].append(Counter(genes))

        # one row per repertoire, one column per observed gene; genes missing from a repertoire count as 0
        gene_dfs = {segment: pd.DataFrame(counters[segment]).fillna(0) for segment in self.genes}
        gene_dfs = self._fit_or_align_features(gene_dfs, params.learn_model)

        normalized = {segment: FeatureScaler.normalize(gene_df.values, self.normalization_type)
                      for segment, gene_df in gene_dfs.items()}
        examples = NumpyHelper.concat_arrays_rowwise([normalized[segment] for segment in self.genes])

        return self._make_encoded_dataset(dataset, examples, self._get_labels(dataset, params), params)

    def _make_encoded_dataset(self, dataset, examples, labels, params: EncoderParams):
        """Standard-scale the examples and wrap them in an encoded clone of the dataset."""
        examples = self._scale_examples(examples, params)
        return self._build_encoded_dataset(dataset, examples, labels)

    def _scale_examples(self, examples, params):
        """Fit a new StandardScaler when learning, otherwise reuse the previously fitted one."""
        if params.learn_model:
            self.scaler = StandardScaler(with_mean=self.scale_to_zero_mean, with_std=self.scale_to_unit_variance)
            examples = FeatureScaler.standard_scale_fit(self.scaler, examples, with_mean=self.scale_to_zero_mean)
        else:
            examples = FeatureScaler.standard_scale(self.scaler, examples, with_mean=self.scale_to_zero_mean)
        return examples