Source code for immuneML.encodings.reference_encoding.MatchedReceptorsRepertoireEncoder

import numpy as np
import pandas as pd

from immuneML.analysis.SequenceMatcher import SequenceMatcher
from immuneML.data_model.dataset.RepertoireDataset import RepertoireDataset
from immuneML.data_model.encoded_data.EncodedData import EncodedData
from immuneML.data_model.repertoire.Repertoire import Repertoire
from immuneML.encodings.EncoderParams import EncoderParams
from immuneML.util.ReadsType import ReadsType
from immuneML.encodings.reference_encoding.MatchedReceptorsEncoder import MatchedReceptorsEncoder


[docs]class MatchedReceptorsRepertoireEncoder(MatchedReceptorsEncoder): def _encode_new_dataset(self, dataset, params: EncoderParams): encoded_dataset = RepertoireDataset(repertoires=dataset.repertoires, labels=dataset.labels, metadata_file=dataset.metadata_file) feature_annotations = None if self.sum_matches else self._get_feature_info() if self.sum_matches: chains = self.reference_receptors[0].get_chains() feature_names = [f"sum_of_{self.reads.value}_reads_{chains[0]}", f"sum_of_{self.reads.value}_reads_{chains[1]}"] else: feature_names = [f"{row['receptor_id']}.{row['chain']}" for index, row in feature_annotations.iterrows()] encoded_repertoires, labels, example_ids = self._encode_repertoires(dataset, params) encoded_repertoires = self._normalize(dataset, encoded_repertoires) if self.normalize else encoded_repertoires encoded_dataset.add_encoded_data(EncodedData( # examples contains a np.ndarray with counts examples=encoded_repertoires, # example_ids contains a list of repertoire identifiers example_ids=example_ids, # feature_names contains a list of reference receptor identifiers feature_names=feature_names, # feature_annotations contains a PD dataframe with sequence and VDJ gene usage per reference receptor feature_annotations=feature_annotations, labels=labels, encoding=MatchedReceptorsEncoder.__name__ )) return encoded_dataset def _normalize(self, dataset, encoded_repertoires): if self.reads == ReadsType.UNIQUE: repertoire_totals = np.asarray([[repertoire.get_element_count() for repertoire in dataset.get_data()]]).T else: repertoire_totals = np.asarray([[sum(repertoire.get_counts()) for repertoire in dataset.get_data()]]).T return encoded_repertoires / repertoire_totals def _get_feature_info(self): """ returns a pandas dataframe containing: - receptor id - receptor chain - amino acid sequence - v gene - j gene """ features = [[] for i in range(0, self.feature_count)] for i, receptor in enumerate(self.reference_receptors): id = receptor.identifier chain_names = receptor.get_chains() first_chain = receptor.get_chain(chain_names[0]) second_chain = receptor.get_chain(chain_names[1]) clonotype_id = receptor.metadata["clonotype_id"] if "clonotype_id" in receptor.metadata else None if first_chain.metadata.custom_params is not None: first_dual_chain_id = first_chain.metadata.custom_params["dual_chain_id"] if "dual_chain_id" in first_chain.metadata.custom_params else None if second_chain.metadata.custom_params is not None: second_dual_chain_id = second_chain.metadata.custom_params["dual_chain_id"] if "dual_chain_id" in second_chain.metadata.custom_params else None features[i * 2] = [id, clonotype_id, chain_names[0], first_dual_chain_id, first_chain.amino_acid_sequence, first_chain.metadata.v_gene, first_chain.metadata.j_gene] features[i * 2 + 1] = [id, clonotype_id, chain_names[1], second_dual_chain_id, second_chain.amino_acid_sequence, second_chain.metadata.v_gene, second_chain.metadata.j_gene] features = pd.DataFrame(features, columns=["receptor_id", "clonotype_id", "chain", "dual_chain_id", "sequence", "v_gene", "j_gene"]) features.dropna(axis="columns", how="all", inplace=True) return features def _encode_repertoires(self, dataset: RepertoireDataset, params: EncoderParams): # Rows = repertoires, Columns = reference chains (two per sequence receptor) encoded_repertories = np.zeros((dataset.get_example_count(), self.feature_count), dtype=int) labels = {label: [] for label in params.label_config.get_labels_by_name()} if params.encode_labels else None for i, repertoire in enumerate(dataset.get_data()): encoded_repertories[i] = self._match_repertoire_to_receptors(repertoire) if labels is not None: for label_name in params.label_config.get_labels_by_name(): labels[label_name].append(repertoire.metadata[label_name]) return encoded_repertories, labels, dataset.get_repertoire_ids() def _match_repertoire_to_receptors(self, repertoire: Repertoire): matcher = SequenceMatcher() matches = np.zeros(self.feature_count, dtype=int) rep_seqs = repertoire.sequences for i, ref_receptor in enumerate(self.reference_receptors): chain_names = ref_receptor.get_chains() first_chain = ref_receptor.get_chain(chain_names[0]) second_chain = ref_receptor.get_chain(chain_names[1]) for rep_seq in rep_seqs: matches_idx = 0 if self.sum_matches else i * 2 match_count = 1 if self.reads == ReadsType.UNIQUE else rep_seq.metadata.count # Match with first chain: add to even columns in matches. # Match with second chain: add to odd columns if matcher.matches_sequence(first_chain, rep_seq, max_distance=self.max_edit_distances[chain_names[0]]): matches[matches_idx] += match_count if matcher.matches_sequence(second_chain, rep_seq, max_distance=self.max_edit_distances[chain_names[1]]): matches[matches_idx + 1] += match_count return matches