Source code for immuneML.encodings.atchley_kmer_encoding.AtchleyKmerEncoder

import logging
import math
from multiprocessing import Pool
from pathlib import Path
from typing import Tuple, List

import numpy as np
import yaml

from immuneML.caching.CacheHandler import CacheHandler
from immuneML.caching.CacheObjectType import CacheObjectType
from immuneML.data_model.dataset.RepertoireDataset import RepertoireDataset
from immuneML.data_model.encoded_data.EncodedData import EncodedData
from immuneML.encodings.DatasetEncoder import DatasetEncoder
from immuneML.encodings.EncoderParams import EncoderParams
from immuneML.encodings.atchley_kmer_encoding.RelativeAbundanceType import RelativeAbundanceType
from immuneML.encodings.atchley_kmer_encoding.Util import Util
from immuneML.encodings.preprocessing.FeatureScaler import FeatureScaler
from immuneML.util.ParameterValidator import ParameterValidator
from immuneML.util.PathBuilder import PathBuilder
from scripts.specification_util import update_docs_per_mapping


[docs]class AtchleyKmerEncoder(DatasetEncoder): """ Represents a repertoire through Atchley factors and relative abundance of k-mers. Should be used in combination with the :ref:`AtchleyKmerMILClassifier`. For more details, see the original publication: Ostmeyer J, Christley S, Toby IT, Cowell LG. Biophysicochemical motifs in T cell receptor sequences distinguish repertoires from tumor-infiltrating lymphocytes and adjacent healthy tissue. Cancer Res. Published online January 1, 2019:canres.2292.2018. `doi:10.1158/0008-5472.CAN-18-2292 <https://cancerres.aacrjournals.org/content/79/7/1671>`_ . Note that sequences in the repertoire with length shorter than skip_first_n_aa + skip_last_n_aa + k will not be encoded. Arguments: k (int): k-mer length skip_first_n_aa (int): number of amino acids to remove from the beginning of the receptor sequence skip_last_n_aa (int): number of amino acids to remove from the end of the receptor sequence abundance: how to compute abundance term for k-mers normalize_all_features (bool): when normalizing features to have 0 mean and unit variance, this parameter indicates if the abundance feature should be included in the normalization YAML specification: .. indent with spaces .. code-block:: yaml my_encoder: AtchleyKmer: k: 4 skip_first_n_aa: 3 skip_last_n_aa: 3 abundance: RELATIVE_ABUNDANCE normalize_all_features: False """
[docs] @staticmethod def build_object(dataset, **params): if isinstance(dataset, RepertoireDataset): return AtchleyKmerEncoder(**params) else: raise ValueError(f"AtchleyKmerEncoder can only be applied to repertoire dataset, got {type(dataset).__name__} instead.")
def __init__(self, k: int, skip_first_n_aa: int, skip_last_n_aa: int, abundance: str, normalize_all_features: bool, name: str = None): location = "AtchleyKmerEncoder" ParameterValidator.assert_type_and_value(k, int, location, "k", 1) ParameterValidator.assert_type_and_value(skip_first_n_aa, int, location, "skip_first_n_aa", 0) ParameterValidator.assert_type_and_value(skip_last_n_aa, int, location, "skip_last_n_aa", 0) ParameterValidator.assert_in_valid_list(abundance.upper(), [ab.name for ab in RelativeAbundanceType], location, "abundance") ParameterValidator.assert_type_and_value(normalize_all_features, bool, location, "normalize_all_features") self.k = k self.skip_first_n_aa = skip_first_n_aa self.skip_last_n_aa = skip_last_n_aa self.abundance = RelativeAbundanceType[abundance.upper()] self.normalize_all_features = normalize_all_features self.name = name self.scaler_path = None self.vectorizer_path = None
[docs] def encode(self, dataset, params: EncoderParams): examples, keys, labels = self._encode_examples(dataset, params) examples, kmer_keys = self._vectorize_examples(examples, params, keys) # normalize to zero mean and unit variance only features coming from Atchley factors tmp_examples = examples[:, :, :-1] if not self.normalize_all_features else examples flattened_vectorized_examples = tmp_examples.reshape(examples.shape[0] * examples.shape[1], -1) if self.scaler_path is None: self.scaler_path = params.result_path / "atchley_factor_scaler.pickle" scaled_examples = FeatureScaler.standard_scale(self.scaler_path, flattened_vectorized_examples) if hasattr(scaled_examples, "todense"): scaled_examples = scaled_examples.todense() if self.normalize_all_features: examples = np.array(scaled_examples).reshape(examples.shape[0], len(kmer_keys), -1) else: examples[:, :, :-1] = np.array(scaled_examples).reshape(examples.shape[0], len(kmer_keys), -1) # swap axes to get examples x atchley_factors x kmers dimensions examples = np.swapaxes(examples, 1, 2) feature_names = [f"atchley_factor_{j}_aa_{i}" for i in range(1, self.k+1) for j in range(1, Util.ATCHLEY_FACTOR_COUNT+1)] + ["abundance"] encoded_data = EncodedData(examples=examples, example_ids=dataset.get_example_ids(), feature_names=feature_names, labels=labels, encoding=AtchleyKmerEncoder.__name__, info={"kmer_keys": kmer_keys}) encoded_dataset = dataset.clone() encoded_dataset.encoded_data = encoded_data return encoded_dataset
def _encode_examples(self, dataset: RepertoireDataset, params: EncoderParams) -> Tuple[list, set, dict]: keys = set() example_count = dataset.get_example_count() arguments = [(repertoire, index, example_count) for index, repertoire in enumerate(dataset.repertoires)] with Pool(params.pool_size) as pool: chunksize = math.floor(dataset.get_example_count() / params.pool_size) + 1 examples = pool.starmap(self._process_repertoire_cached, arguments, chunksize=chunksize) for example in examples: keys.update(list(example.keys())) labels = dataset.get_metadata(params.label_config.get_labels_by_name()) if params.encode_labels else None return examples, keys, labels def _process_repertoire_cached(self, repertoire, index, example_count): return CacheHandler.memo_by_params((('repertoire', repertoire.identifier), ('encoder', AtchleyKmerEncoder.__name__), (self.abundance, self.skip_last_n_aa, self.skip_first_n_aa, self.k)), lambda: self._process_repertoire(repertoire, index, example_count), CacheObjectType.ENCODING_STEP) def _process_repertoire(self, repertoire, index, example_count): if self.skip_first_n_aa > 0 and self.skip_last_n_aa > 0: remove_aa_func = lambda seqs: [seq[self.skip_first_n_aa:-self.skip_last_n_aa] for seq in seqs] elif self.skip_last_n_aa > 0: remove_aa_func = lambda seqs: [seq[:-self.skip_last_n_aa] for seq in seqs] else: remove_aa_func = lambda seqs: [seq[self.skip_first_n_aa:] for seq in seqs] logging.info(f"AtchleyKmerEncoder: encoding repertoire {index + 1}/{example_count}.") sequences, counts = self._trunc_sequences(repertoire, remove_aa_func) abundances = Util.compute_abundance(sequences, counts, self.k, self.abundance) kmers = list(abundances.keys()) atchley_factors_df = Util.get_atchely_factors(kmers, self.k) atchley_factors_df["abundances"] = np.log(list(abundances.values())) encoded = atchley_factors_df.to_dict('index') encoded = {key: list(encoded[key].values()) for key in encoded} return encoded def _vectorize_examples(self, examples, params: EncoderParams, keys: set) -> Tuple[np.ndarray, list]: if self.vectorizer_path is None: self.vectorizer_path = params.result_path / "vectorizer_keys.yaml" if params.learn_model is True: kmer_keys = sorted(list(keys)) PathBuilder.build_from_objects(params.result_path) with self.vectorizer_path.open("w") as file: yaml.dump(kmer_keys, file) else: with self.vectorizer_path.open("r") as file: kmer_keys = yaml.safe_load(file) vectorized_examples = [ np.array([np.array(example[key]) if key in example else np.zeros(self.k * Util.ATCHLEY_FACTOR_COUNT + 1) for key in kmer_keys]) for example in examples] return np.array(vectorized_examples, dtype=np.float32), kmer_keys def _trunc_sequences(self, repertoire, remove_aa_func): sequences = repertoire.get_sequence_aas() counts = repertoire.get_counts() indices = [i for i in range(sequences.shape[0]) if len(sequences[i]) >= self.skip_first_n_aa + self.skip_last_n_aa + self.k] sequences = sequences[indices] counts = counts[indices] if self.skip_first_n_aa > 0 or self.skip_last_n_aa > 0: sequences = np.apply_along_axis(remove_aa_func, 0, sequences) return sequences, counts
[docs] def get_additional_files(self) -> List[str]: return [self.scaler_path, self.vectorizer_path]
[docs] @staticmethod def export_encoder(path: Path, encoder) -> Path: encoder_file = DatasetEncoder.store_encoder(encoder, path / "encoder.pickle") return encoder_file
[docs] @staticmethod def load_encoder(encoder_file: Path): encoder = DatasetEncoder.load_encoder(encoder_file) for attribute in ["scaler_path", "vectorizer_path"]: encoder = DatasetEncoder.load_attribute(encoder, encoder_file, attribute) return encoder
[docs] @staticmethod def get_documentation(): doc = str(AtchleyKmerEncoder.__doc__) valid_values = str([item.name for item in RelativeAbundanceType])[1:-1].replace("'", "`") mapping = { "how to compute abundance term for k-mers": f"how to compute abundance term for k-mers; valid values are {valid_values}." } doc = update_docs_per_mapping(doc, mapping) return doc