import logging
from multiprocessing import Pool
from pathlib import Path
from typing import Tuple, List
import math
import dill
from immuneML.environment.SequenceType import SequenceType
import numpy as np
from sklearn.preprocessing import StandardScaler
from immuneML.caching.CacheHandler import CacheHandler
from immuneML.caching.CacheObjectType import CacheObjectType
from immuneML.data_model.datasets.RepertoireDataset import RepertoireDataset
from immuneML.data_model.EncodedData import EncodedData
from immuneML.encodings.DatasetEncoder import DatasetEncoder
from immuneML.encodings.EncoderParams import EncoderParams
from immuneML.encodings.atchley_kmer_encoding.RelativeAbundanceType import RelativeAbundanceType
from immuneML.encodings.atchley_kmer_encoding.Util import Util
from immuneML.encodings.preprocessing.FeatureScaler import FeatureScaler
from immuneML.util.ParameterValidator import ParameterValidator
from scripts.specification_util import update_docs_per_mapping
[docs]
class AtchleyKmerEncoder(DatasetEncoder):
"""
Represents a repertoire through Atchley factors and relative abundance of k-mers. Should be used in combination
with the :ref:`AtchleyKmerMILClassifier`.
For more details, see the original publication: Ostmeyer J,
Christley S, Toby IT, Cowell LG. Biophysicochemical motifs in T cell receptor sequences distinguish repertoires from tumor-infiltrating
lymphocytes and adjacent healthy tissue. Cancer Res. Published online January 1, 2019:canres.2292.2018. `doi:10.1158/0008-5472.CAN-18-2292
<https://cancerres.aacrjournals.org/content/79/7/1671>`_ .
Note that sequences in the repertoire with length shorter than skip_first_n_aa + skip_last_n_aa + k will not be encoded.
**Dataset type:**
- RepertoireDatasets
**Specification arguments:**
- k (int): k-mer length
- skip_first_n_aa (int): number of amino acids to remove from the beginning of the receptor sequence
- skip_last_n_aa (int): number of amino acids to remove from the end of the receptor sequence
- abundance: how to compute abundance term for k-mers
- normalize_all_features (bool): when normalizing features to have 0 mean and unit variance, this parameter indicates if the abundance feature should be included in the normalization
**YAML specification:**
.. indent with spaces
.. code-block:: yaml
definitions:
encodings:
my_encoder:
AtchleyKmer:
k: 4
skip_first_n_aa: 3
skip_last_n_aa: 3
abundance: RELATIVE_ABUNDANCE
normalize_all_features: False
"""
[docs]
@staticmethod
def build_object(dataset, **params):
if isinstance(dataset, RepertoireDataset):
location = "AtchleyKmerEncoder"
ParameterValidator.assert_type_and_value(params["k"], int, location, "k", 1)
ParameterValidator.assert_type_and_value(params["skip_first_n_aa"], int, location, "skip_first_n_aa", 0)
ParameterValidator.assert_type_and_value(params["skip_last_n_aa"], int, location, "skip_last_n_aa", 0)
ParameterValidator.assert_in_valid_list(params["abundance"].upper(), [ab.name for ab in RelativeAbundanceType], location, "abundance")
ParameterValidator.assert_type_and_value(params["normalize_all_features"], bool, location, "normalize_all_features")
return AtchleyKmerEncoder(**params)
else:
raise ValueError(f"AtchleyKmerEncoder can only be applied to repertoire dataset, got {type(dataset).__name__} instead.")
def __init__(self, k: int, skip_first_n_aa: int, skip_last_n_aa: int, abundance: str, normalize_all_features: bool, name: str = None):
super().__init__(name=name)
self.k = k
self.skip_first_n_aa = skip_first_n_aa
self.skip_last_n_aa = skip_last_n_aa
self.abundance = RelativeAbundanceType[abundance.upper()]
self.normalize_all_features = normalize_all_features
self.scaler = None
self.kmer_keys = None
[docs]
def encode(self, dataset, params: EncoderParams):
examples, keys, labels = self._encode_examples(dataset, params)
examples = self._vectorize_examples(examples, params, keys)
# normalize to zero mean and unit variance only features coming from Atchley factors
tmp_examples = examples[:, :, :-1] if not self.normalize_all_features else examples
flattened_vectorized_examples = tmp_examples.reshape(examples.shape[0] * examples.shape[1], -1)
scaled_examples = self._scale_examples(flattened_vectorized_examples, params)
if self.normalize_all_features:
examples = np.array(scaled_examples).reshape((examples.shape[0], len(self.kmer_keys), -1))
else:
examples[:, :, :-1] = np.array(scaled_examples).reshape((examples.shape[0], len(self.kmer_keys), -1))
# swap axes to get examples x atchley_factors x kmers dimensions
examples = np.swapaxes(examples, 1, 2)
feature_names = [f"atchley_factor_{j}_aa_{i}" for i in range(1, self.k + 1) for j in range(1, Util.ATCHLEY_FACTOR_COUNT + 1)] + ["abundance"]
encoded_data = EncodedData(examples=examples, example_ids=dataset.get_example_ids(),
feature_names=feature_names, labels=labels,
encoding=AtchleyKmerEncoder.__name__, info={"kmer_keys": self.kmer_keys,
'sequence_type': SequenceType.AMINO_ACID,
'region_type': params.region_type})
encoded_dataset = dataset.clone()
encoded_dataset.encoded_data = encoded_data
return encoded_dataset
def _scale_examples(self, flattened_vectorized_examples, params: EncoderParams):
if params.learn_model:
self.scaler = StandardScaler(with_mean=True, with_std=True)
scaled_examples = FeatureScaler.standard_scale_fit(self.scaler, flattened_vectorized_examples)
else:
scaled_examples = FeatureScaler.standard_scale(self.scaler, flattened_vectorized_examples)
if hasattr(scaled_examples, "todense"):
scaled_examples = scaled_examples.todense()
return scaled_examples
def _encode_examples(self, dataset: RepertoireDataset, params: EncoderParams) -> Tuple[list, set, dict]:
keys = set()
example_count = dataset.get_example_count()
arguments = [(repertoire.identifier, dill.dumps(repertoire), index, example_count, params)
for index, repertoire in enumerate(dataset.repertoires)]
with Pool(params.pool_size) as pool:
chunksize = math.floor(dataset.get_example_count() / params.pool_size) + 1
examples = pool.starmap(self._process_repertoire_cached, arguments, chunksize=chunksize)
examples = [dill.loads(example) for example in examples]
for example in examples:
keys.update(list(example.keys()))
labels = dataset.get_metadata(params.label_config.get_labels_by_name()) if params.encode_labels else None
return examples, keys, labels
def _process_repertoire_cached(self, repertoire_id: str, repertoire, index, example_count, params):
return CacheHandler.memo_by_params((('repertoire', repertoire_id), ('encoder', AtchleyKmerEncoder.__name__),
(self.abundance, self.skip_last_n_aa, self.skip_first_n_aa, self.k)),
lambda: self._process_repertoire(repertoire, index, example_count, params),
CacheObjectType.ENCODING_STEP)
def _process_repertoire(self, repertoire, index, example_count, params: EncoderParams):
if isinstance(repertoire, bytes):
repertoire = dill.loads(repertoire)
logging.info(f"AtchleyKmerEncoder: encoding repertoire {index + 1}/{example_count}.")
sequences, counts = self._trunc_sequences(repertoire, params)
abundances = Util.compute_abundance(sequences, counts, self.k, self.abundance)
kmers = [el.to_string() for el in abundances.keys()]
atchley_factors_df = Util.get_atchely_factors(kmers, self.k)
atchley_factors_df["abundances"] = np.log(list(abundances.values()))
encoded = atchley_factors_df.to_dict('index')
encoded = {key: list(encoded[key].values()) for key in encoded}
encoded = dill.dumps(encoded)
return encoded
def _vectorize_examples(self, examples, params: EncoderParams, keys: set) -> np.ndarray:
if params.learn_model:
self.kmer_keys = sorted(list(keys))
vectorized_examples = [
np.array([np.array(example[key]) if key in example else np.zeros(self.k * Util.ATCHLEY_FACTOR_COUNT + 1) for key in self.kmer_keys])
for example in examples]
return np.array(vectorized_examples, dtype=np.float32)
def _trunc_sequences(self, repertoire, params: EncoderParams):
data = repertoire.data
sequences = getattr(data, params.region_type.value + "_aa")
indices = sequences.lengths >= self.skip_first_n_aa + self.skip_last_n_aa + self.k
sequences = sequences[indices]
counts = data.duplicate_count[indices]
if self.skip_first_n_aa > 0 and self.skip_last_n_aa > 0:
sequences = sequences[:, self.skip_first_n_aa: -self.skip_last_n_aa]
elif self.skip_first_n_aa > 0:
sequences = sequences[:, self.skip_first_n_aa:]
elif self.skip_last_n_aa > 0:
sequences = sequences[:, :-self.skip_last_n_aa]
return sequences, counts
[docs]
def get_additional_files(self) -> List[str]:
return []
[docs]
@staticmethod
def load_encoder(encoder_file: Path):
encoder = DatasetEncoder.load_encoder(encoder_file)
return encoder
[docs]
@staticmethod
def get_documentation():
doc = str(AtchleyKmerEncoder.__doc__)
valid_values = str([item.name for item in RelativeAbundanceType])[1:-1].replace("'", "`")
mapping = {
"how to compute abundance term for k-mers":
f"how to compute abundance term for k-mers; valid values are {valid_values}."
}
doc = update_docs_per_mapping(doc, mapping)
return doc