import logging
from pathlib import Path
from typing import List
import numpy as np
from sklearn.preprocessing import StandardScaler
from immuneML.caching.CacheHandler import CacheHandler
from immuneML.data_model.AIRRSequenceSet import AIRRSequenceSet, AminoAcidXEncoding
from immuneML.data_model.EncodedData import EncodedData
from immuneML.data_model.SequenceParams import RegionType
from immuneML.data_model.bnp_util import get_sequence_field_name
from immuneML.data_model.datasets.Dataset import Dataset
from immuneML.data_model.datasets.ElementDataset import SequenceDataset, ReceptorDataset
from immuneML.data_model.datasets.RepertoireDataset import RepertoireDataset
from immuneML.encodings.DatasetEncoder import DatasetEncoder
from immuneML.encodings.EncoderParams import EncoderParams
from immuneML.encodings.preprocessing.FeatureScaler import FeatureScaler
from immuneML.environment.SequenceType import SequenceType
from immuneML.util.EncoderHelper import EncoderHelper
from immuneML.util.ParameterValidator import ParameterValidator
# Directory with the bundled physicochemical factor TSV files
# (resolves to immuneML/config/physicochemical_factors relative to this module).
_FACTORS_DIR = Path(__file__).parent.parent.parent / "config" / "physicochemical_factors"
# Factor sets accepted for the 'factors' parameter; each name corresponds to
# a "<name>.tsv" file inside _FACTORS_DIR.
SUPPORTED_FACTORS = ['atchley', 'amino_acid_property', 'kidera']
class AminoAcidPropertyEncoder(DatasetEncoder):
    """
    Encodes a dataset by replacing each amino acid in a sequence with its biophysicochemical
    factor vector and averaging those vectors across all positions in the sequence.

    Three factor sets are supported, each stored as a TSV file under
    ``immuneML/config/physicochemical_factors/``:

    - ``atchley`` — 5 factors per amino acid (Atchley et al., 2005).
    - ``kidera`` — 10 factors per amino acid (Kidera et al., 1985).
    - ``amino_acid_property`` — 14 mixed physicochemical descriptors per amino acid compiled
      from several published sources and originally aggregated in VDJtools (Shugay et al., 2015).

    Characters outside the standard 20-amino-acid alphabet (gaps, X, etc.) are silently
    skipped; a sequence with no known amino acids is encoded as an all-zero vector.

    For SequenceDatasets the output shape is ``[n_sequences, n_factors]``. For ReceptorDatasets
    each chain is encoded independently and the resulting vectors are concatenated (chains ordered
    alphabetically by locus name), giving shape ``[n_receptors, 2 × n_factors]``. For
    RepertoireDatasets each repertoire is encoded as the mean of its per-sequence vectors,
    giving shape ``[n_repertoires, n_factors]``.

    **Dataset type:**

    - SequenceDatasets
    - ReceptorDatasets
    - RepertoireDatasets

    **Specification arguments:**

    - factors (str): Which set of biophysicochemical factors to use. Valid values: ``atchley``
      (5 factors), ``kidera`` (10 factors), or ``amino_acid_property`` (14 factors).

    - region_type (str): Which part of the receptor sequence to encode (e.g. ``imgt_cdr3``).

    - scale_to_zero_mean (bool): Whether to scale each feature to zero mean across examples
      after encoding. Defaults to ``false``.

    - scale_to_unit_variance (bool): Whether to scale each feature to unit variance across
      examples after encoding. Defaults to ``false``.

    **References:**

    Factor values downloaded from `vadimnazarov/kidera-atchley
    <https://github.com/vadimnazarov/kidera-atchley>`_.

    - W.R. Atchley, J. Zhao, A.D. Fernandes, & T. Drüke, Solving the protein sequence metric problem, Proc.
      Natl. Acad. Sci. U.S.A. 102 (18) 6395-6400, https://doi.org/10.1073/pnas.0408677102 (2005).
    - Kidera, A., Konishi, Y., Oka, M. et al. Statistical analysis of the physical properties of the 20 naturally
      occurring amino acids. J Protein Chem 4, 23–55 (1985). https://doi.org/10.1007/BF01025492
    - Shugay M et al. VDJtools: Unifying Post-analysis of T Cell Receptor Repertoires. PLoS Comp Biol 2015;
      11(11):e1004503-e1004503.

    **YAML specification:**

    .. indent with spaces
    .. code-block:: yaml

        definitions:
            encodings:
                my_atchley_encoder:
                    AminoAcidProperty:
                        factors: atchley
                        region_type: imgt_cdr3
                        scale_to_zero_mean: true
                        scale_to_unit_variance: true
                my_kidera_encoder:
                    AminoAcidProperty:
                        factors: kidera
                        region_type: imgt_cdr3
                my_aa_property_encoder:
                    AminoAcidProperty:
                        factors: amino_acid_property
                        region_type: imgt_cdr3
    """

    # NOTE(review): the docstring previously claimed these scaling flags default to
    # ``true`` while the constructor defaults to False. The docstring now matches the
    # code; confirm against the YAML default-parameters file whether a different
    # default is injected at specification-parsing time.
    def __init__(self, factors: str, region_type: RegionType, scale_to_zero_mean: bool = False,
                 scale_to_unit_variance: bool = False, name: str = None):
        super().__init__(name=name)
        self.factors = factors
        self.region_type = region_type
        self.scale_to_zero_mean = scale_to_zero_mean
        self.scale_to_unit_variance = scale_to_unit_variance
        # factor_table: amino acid letter -> list of float factor values
        # factor_names: column names from the TSV header, in file order
        self.factor_table, self.factor_names = self._load_factor_file(factors)
        # fitted lazily in _scale_examples when learn_model is True
        self.scaler = None
        self._lookup, self._valid_mask = self._build_lookup_table()

    @staticmethod
    def _load_factor_file(factors: str) -> tuple:
        """Load a TSV factor file and return (factor_table dict, factor_names list).

        Raises:
            ValueError: if a data row does not contain exactly one value per factor column,
                which would otherwise surface later as an opaque array-shape error.
        """
        filepath = _FACTORS_DIR / f"{factors}.tsv"
        factor_table = {}
        with open(filepath) as fh:
            header = next(fh).rstrip('\n').split('\t')
            factor_names = header[1:]
            for line in fh:
                parts = line.rstrip('\n').split('\t')
                if len(parts) < 2:
                    # skip blank / malformed trailing lines
                    continue
                aa = parts[0]
                values = [float(v) for v in parts[1:]]
                if len(values) != len(factor_names):
                    raise ValueError(
                        f"{AminoAcidPropertyEncoder.__name__}: row for '{aa}' in {filepath} has "
                        f"{len(values)} values but the header declares {len(factor_names)} factors.")
                factor_table[aa] = values
        return factor_table, factor_names

    @staticmethod
    def build_object(dataset: Dataset, **params):
        """Validate the specification parameters and construct the encoder instance."""
        location = AminoAcidPropertyEncoder.__name__
        ParameterValidator.assert_in_valid_list(params.get('factors'), SUPPORTED_FACTORS,
                                                location, 'factors')
        ParameterValidator.assert_region_type(params, location)
        ParameterValidator.assert_type_and_value(params['scale_to_zero_mean'], bool, location,
                                                 'scale_to_zero_mean')
        ParameterValidator.assert_type_and_value(params['scale_to_unit_variance'], bool, location,
                                                 'scale_to_unit_variance')
        return AminoAcidPropertyEncoder(
            factors=params['factors'],
            region_type=RegionType[params['region_type'].upper()],
            scale_to_zero_mean=params['scale_to_zero_mean'],
            scale_to_unit_variance=params['scale_to_unit_variance'],
            name=params.get('name'),
        )

    def encode(self, dataset: Dataset, params: EncoderParams) -> Dataset:
        """Encode the dataset, memoizing the result via the cache handler.

        Raises:
            RuntimeError: for dataset types other than Sequence-, Receptor- or
                RepertoireDataset.
        """
        cache_params = self._get_caching_params(dataset, params)
        if isinstance(dataset, SequenceDataset):
            return CacheHandler.memo_by_params(cache_params,
                                               lambda: self._encode_sequence_dataset(dataset, params))
        elif isinstance(dataset, ReceptorDataset):
            return CacheHandler.memo_by_params(cache_params,
                                               lambda: self._encode_receptor_dataset(dataset, params))
        elif isinstance(dataset, RepertoireDataset):
            return CacheHandler.memo_by_params(cache_params,
                                               lambda: self._encode_repertoire_dataset(dataset, params))
        else:
            raise RuntimeError(f"{self.__class__.__name__}: {self.name}: unsupported dataset type "
                               f"'{type(dataset).__name__}'.")

    # ------------------------------------------------------------------
    # Core encoding: one averaged factor vector per sequence
    # ------------------------------------------------------------------

    def _build_lookup_table(self) -> tuple:
        """Build factor lookup and validity mask indexed by AminoAcidXEncoding integer codes.

        AminoAcidXEncoding uses alphabet 'ACDEFGHIKLMNPQRSTVWXY*' (codes 0–21).
        The lookup table has one row per code; unknown/non-standard characters (X, *)
        get an all-zero row and are excluded from the per-sequence average.
        """
        alphabet = bytes(AminoAcidXEncoding._alphabet).decode('ascii')
        n_factors = len(next(iter(self.factor_table.values())))
        lookup = np.zeros((len(alphabet), n_factors), dtype=float)
        valid = np.zeros(len(alphabet), dtype=bool)
        for i, aa in enumerate(alphabet):
            if aa in self.factor_table:
                lookup[i] = self.factor_table[aa]
                valid[i] = True
        return lookup, valid

    def _encode_sequence_set(self, sequence_set: AIRRSequenceSet, seq_field: str) -> np.ndarray:
        """Return a ``[n_sequences, n_factors]`` array of per-sequence factor averages.

        Uses the bionumpy EncodedRaggedArray directly to avoid Python-level string
        operations: integer codes are retrieved via ``.ravel().raw()`` and lengths via
        ``.lengths``, then a single ``np.add.reduceat`` aggregates all sequences at once.
        """
        seq_array = getattr(sequence_set, seq_field)  # EncodedRaggedArray
        n_seqs = len(seq_array)
        n_factors = self._lookup.shape[1]
        if n_seqs == 0:
            return np.zeros((0, n_factors), dtype=float)
        lengths = np.asarray(seq_array.lengths)  # [n_seqs]
        flat_codes = seq_array.ravel().raw()     # [total_chars] int codes 0–21
        total_chars = flat_codes.shape[0]

        zero_len = lengths == 0
        for i in np.where(zero_len)[0]:
            logging.warning(f"{self.__class__.__name__}: sequence at index {i} is empty; "
                            f"encoding as zeros.")
        if total_chars == 0:
            # every sequence is empty: np.add.reduceat cannot run on an empty array
            return np.zeros((n_seqs, n_factors), dtype=float)

        flat_factors = self._lookup[flat_codes]      # [total_chars, n_factors]
        flat_valid = self._valid_mask[flat_codes]    # [total_chars] bool
        starts = np.concatenate([[0], np.cumsum(lengths[:-1])])
        # np.add.reduceat requires every index to be < total_chars; a trailing
        # zero-length sequence would put its start AT total_chars and raise.
        # Clamp — clamped rows are zero-length and are overwritten with zeros below.
        starts = np.minimum(starts, total_chars - 1)
        factor_sums = np.add.reduceat(flat_factors, starts)                 # [n_seqs, n_factors]
        valid_counts = np.add.reduceat(flat_valid.astype(float), starts)    # [n_seqs]
        # np.add.reduceat returns a[i] rather than 0 for zero-length slices; fix those.
        if zero_len.any():
            factor_sums[zero_len] = 0.0
            valid_counts[zero_len] = 0.0
        no_valid_aa = (valid_counts == 0) & ~zero_len
        if no_valid_aa.any():
            for i in np.where(no_valid_aa)[0]:
                logging.warning(f"{self.__class__.__name__}: sequence at index {i} contains no "
                                f"known amino acids; encoding as zeros.")
        # replace zero counts with 1 so the division is safe; affected rows are all-zero sums
        safe_counts = np.where(valid_counts == 0, 1.0, valid_counts)
        return factor_sums / safe_counts[:, np.newaxis]

    # ------------------------------------------------------------------
    # Dataset-type-specific encoding methods
    # ------------------------------------------------------------------

    def _encode_sequence_dataset(self, dataset: SequenceDataset, params: EncoderParams) -> SequenceDataset:
        """Encode each sequence as one averaged factor vector ([n_sequences, n_factors])."""
        seq_field = get_sequence_field_name(self.region_type, SequenceType.AMINO_ACID)
        examples = self._encode_sequence_set(dataset.data, seq_field)
        examples = self._scale_examples(examples, params)
        labels = ({label.name: getattr(dataset.data, label.name).tolist()
                   for label in params.label_config.get_label_objects()}
                  if params.encode_labels else None)
        encoded_dataset = dataset.clone()
        encoded_dataset.encoded_data = EncodedData(
            examples=examples,
            labels=labels,
            example_ids=dataset.data.sequence_id.tolist(),
            feature_names=self._get_feature_names(),
            encoding=AminoAcidPropertyEncoder.__name__,
        )
        return encoded_dataset

    def _encode_receptor_dataset(self, dataset: ReceptorDataset, params: EncoderParams) -> ReceptorDataset:
        """Encode each receptor as the concatenation of its two chains' factor vectors."""
        seq_field = get_sequence_field_name(self.region_type, SequenceType.AMINO_ACID)
        data = dataset.data
        receptor_ids, loci, mask1, mask2 = EncoderHelper.get_receptor_chain_masks(dataset)
        per_seq_embeddings = self._encode_sequence_set(data, seq_field)  # [n_chains, n_factors]
        # chain order follows `loci` (alphabetical by locus name per the class contract)
        examples = np.hstack([per_seq_embeddings[mask1], per_seq_embeddings[mask2]])  # [n_receptors, 2*n_factors]
        examples = self._scale_examples(examples, params)
        if params.encode_labels:
            df = data.topandas()
            # labels are per-receptor, so one chain's rows (mask1) suffice
            labels = {name: df[name].values[mask1].tolist()
                      for name in params.label_config.get_labels_by_name()}
        else:
            labels = None
        encoded_dataset = dataset.clone()
        encoded_dataset.encoded_data = EncodedData(
            examples=examples,
            labels=labels,
            example_ids=receptor_ids,
            feature_names=self._get_receptor_feature_names(loci),
            encoding=AminoAcidPropertyEncoder.__name__,
        )
        return encoded_dataset

    def _encode_repertoire(self, repertoire, seq_field: str) -> np.ndarray:
        """Return one factor vector for a repertoire: mean over its per-sequence vectors."""
        per_seq = self._encode_sequence_set(repertoire.data, seq_field)
        if per_seq.shape[0] == 0:
            # .mean(axis=0) on an empty array would produce NaNs with a runtime warning
            logging.warning(f"{self.__class__.__name__}: repertoire {repertoire.identifier} "
                            f"contains no sequences; encoding as zeros.")
            return np.zeros(self._lookup.shape[1], dtype=float)
        return per_seq.mean(axis=0)

    def _encode_repertoire_dataset(self, dataset: RepertoireDataset, params: EncoderParams) -> RepertoireDataset:
        """Encode each repertoire as one averaged factor vector ([n_repertoires, n_factors])."""
        seq_field = get_sequence_field_name(self.region_type, SequenceType.AMINO_ACID)
        examples = []
        for repertoire in dataset.repertoires:
            # per-repertoire memoization: keyed on repertoire identity + encoder config
            rep_vec = CacheHandler.memo_by_params(
                (repertoire.identifier, self.__class__.__name__, self.factors, self.region_type.name),
                lambda rep=repertoire: self._encode_repertoire(rep, seq_field),
            )
            examples.append(rep_vec)
        examples = self._scale_examples(np.array(examples), params)
        labels = (dataset.get_metadata(params.label_config.get_labels_by_name())
                  if params.encode_labels else None)
        encoded_dataset = dataset.clone()
        encoded_dataset.encoded_data = EncodedData(
            examples=examples,
            labels=labels,
            example_ids=dataset.get_example_ids(),
            feature_names=self._get_feature_names(),
            encoding=AminoAcidPropertyEncoder.__name__,
        )
        return encoded_dataset

    # ------------------------------------------------------------------
    # Scaling
    # ------------------------------------------------------------------

    def _scale_examples(self, examples: np.ndarray, params: EncoderParams) -> np.ndarray:
        """Optionally standard-scale the encoded examples.

        Fits a new scaler when ``params.learn_model`` is True; otherwise applies the
        previously fitted one.

        Raises:
            RuntimeError: if scaling is requested in apply mode but no scaler was fitted.
        """
        if not self.scale_to_zero_mean and not self.scale_to_unit_variance:
            return examples
        if params.learn_model:
            self.scaler = StandardScaler(with_mean=self.scale_to_zero_mean,
                                         with_std=self.scale_to_unit_variance)
            return FeatureScaler.standard_scale_fit(self.scaler, examples,
                                                    with_mean=self.scale_to_zero_mean)
        else:
            if self.scaler is None:
                raise RuntimeError(f"{self.__class__.__name__}: {self.name}: scaling was requested "
                                   f"but no scaler has been fitted (learn_model was never True).")
            return FeatureScaler.standard_scale(self.scaler, examples,
                                                with_mean=self.scale_to_zero_mean)

    # ------------------------------------------------------------------
    # Feature names
    # ------------------------------------------------------------------

    def _get_feature_names(self) -> List[str]:
        """Feature names for single-chain encodings, e.g. 'atchley_f1'."""
        return [f'{self.factors}_{name}' for name in self.factor_names]

    def _get_receptor_feature_names(self, loci: List[str]) -> List[str]:
        """Feature names for receptor encodings, prefixed per locus, e.g. 'TRA_atchley_f1'."""
        return [f'{locus}_{self.factors}_{name}'
                for locus in loci
                for name in self.factor_names]

    # ------------------------------------------------------------------
    # Caching
    # ------------------------------------------------------------------

    def _get_caching_params(self, dataset: Dataset, params: EncoderParams, step: str = None) -> tuple:
        """Hashable tuple identifying this encoding run for the cache handler."""
        return (dataset.identifier,
                tuple(params.label_config.get_labels_by_name()),
                self.factors,
                self.region_type.name,
                self.scale_to_zero_mean,
                self.scale_to_unit_variance,
                params.learn_model,
                step)