import ast
import logging
import shutil
import weakref
from pathlib import Path
from typing import List
from uuid import uuid4
import numpy as np
import yaml
from immuneML.data_model.DatasetItem import DatasetItem
from immuneML.data_model.cell.Cell import Cell
from immuneML.data_model.cell.CellList import CellList
from immuneML.data_model.receptor.Receptor import Receptor
from immuneML.data_model.receptor.ReceptorBuilder import ReceptorBuilder
from immuneML.data_model.receptor.RegionType import RegionType
from immuneML.data_model.receptor.receptor_sequence.Chain import Chain
from immuneML.data_model.receptor.receptor_sequence.ReceptorSequence import ReceptorSequence
from immuneML.data_model.receptor.receptor_sequence.SequenceAnnotation import SequenceAnnotation
from immuneML.data_model.receptor.receptor_sequence.SequenceMetadata import SequenceMetadata
from immuneML.environment.EnvironmentSettings import EnvironmentSettings
from immuneML.simulation.implants.ImplantAnnotation import ImplantAnnotation
from immuneML.util.NumpyHelper import NumpyHelper
from immuneML.util.PathBuilder import PathBuilder
class Repertoire(DatasetItem):
"""
Repertoire object consisting of sequence objects. Each sequence attribute is stored as a list across all sequences and can be
loaded separately. Internally, this class relies on numpy to store and load the data.
"""
FIELDS = tuple(
"sequence_aas,sequences,v_genes,j_genes,v_subgroups,j_subgroups,v_alleles,j_alleles,chains,counts,region_types,frame_types,"
"sequence_identifiers,cell_ids".split(","))
@staticmethod
def process_custom_lists(custom_lists):
if custom_lists:
field_list = list(custom_lists.keys())
values = [[NumpyHelper.get_numpy_representation(el) for el in custom_lists[field]] for field in custom_lists.keys()]
dtype = [(field, np.array(values[index]).dtype) for index, field in enumerate(custom_lists.keys())]
else:
field_list, values, dtype = [], [], []
return field_list, values, dtype
@staticmethod
def check_count(sequence_aas: list = None, sequences: list = None, custom_lists: dict = None) -> int:
sequence_count = len(sequence_aas) if sequence_aas is not None else len(sequences) if sequences is not None else 0
if sequences is not None and sequence_aas is not None:
assert len(sequences) == len(sequence_aas), \
f"Repertoire: there is a mismatch between number of nucleotide sequences ({len(sequences)}) and the number of amino acid " \
f"sequences ({len(sequence_aas)})."
assert all(len(custom_lists[key]) == sequence_count for key in custom_lists) if custom_lists else True, \
f"Repertoire: there is a mismatch between the number of sequences and the number of attributes listed in " \
f"{str(list(custom_lists.keys()))[1:-1]}"
return sequence_count
@classmethod
def build(cls, sequence_aas: list = None, sequences: list = None, v_genes: list = None, j_genes: list = None,
v_subgroups: list = None, j_subgroups: list = None, v_alleles: list = None, j_alleles: list = None,
chains: list = None, counts: list = None, region_types: list = None, frame_types: list = None,
custom_lists: dict = None, sequence_identifiers: list = None, path: Path = None, metadata: dict = None,
signals: dict = None, cell_ids: List[str] = None, filename_base: str = None):
sequence_count = Repertoire.check_count(sequence_aas, sequences, custom_lists)
if sequence_identifiers is None or len(sequence_identifiers) == 0 or any(identifier is None for identifier in sequence_identifiers):
sequence_identifiers = np.arange(sequence_count).astype(str)
identifier = uuid4().hex
filename_base = filename_base if filename_base is not None else identifier
data_filename = path / f"{filename_base}.npy"
field_list, values, dtype = Repertoire.process_custom_lists(custom_lists)
if signals:
signals_filtered = {signal: signals[signal] for signal in signals if signal not in Repertoire.FIELDS}
field_list_signals, values_signals, dtype_signals = Repertoire.process_custom_lists(signals_filtered)
for index, field_name in enumerate(field_list_signals):
if field_name not in field_list:
field_list.append(field_name)
values.append(values_signals[index])
dtype.append(dtype_signals[index])
for field in Repertoire.FIELDS:
if eval(field) is not None and not all(el is None for el in eval(field)):
field_list.append(field)
values.append([NumpyHelper.get_numpy_representation(val) if val is not None else np.nan for val in eval(field)])
dtype.append((field, np.array(values[-1]).dtype))
repertoire_matrix = np.array(list(map(tuple, zip(*values))), order='F', dtype=dtype)
np.save(str(data_filename), repertoire_matrix, allow_pickle=False)
metadata_filename = path / f"{filename_base}_metadata.yaml"
metadata = {} if metadata is None else metadata
metadata["field_list"] = field_list
with metadata_filename.open("w") as file:
yaml.dump(metadata, file)
repertoire = Repertoire(data_filename, metadata_filename, identifier)
return repertoire
@classmethod
def build_like(cls, repertoire, indices_to_keep: list, result_path: Path, filename_base: str = None):
PathBuilder.build(result_path)
data = repertoire.load_data()
data = data[indices_to_keep]
identifier = uuid4().hex
filename_base = filename_base if filename_base is not None else identifier
data_filename = result_path / f"{filename_base}.npy"
np.save(str(data_filename), data)
metadata_filename = result_path / f"{filename_base}_metadata.yaml"
shutil.copyfile(repertoire.metadata_filename, metadata_filename)
new_repertoire = Repertoire(data_filename, metadata_filename, identifier)
return new_repertoire
@classmethod
def build_from_sequence_objects(cls, sequence_objects: list, path: Path, metadata: dict, filename_base: str = None):
assert all(isinstance(sequence, ReceptorSequence) for sequence in sequence_objects), \
"Repertoire: all sequences have to be instances of ReceptorSequence class."
sequence_aas, sequences, v_genes, j_genes, v_subgroups, j_subgroups, v_alleles, j_alleles, chains, counts, region_types, frame_types, sequence_identifiers, cell_ids = [], [], [], [], [], [], [], [], [], [], [], [], [], []
custom_lists = {key: [] for key in sequence_objects[0].metadata.custom_params} if sequence_objects[0].metadata else {}
signals = {}
for index, sequence in enumerate(sequence_objects):
sequence_identifiers.append(sequence.identifier)
sequence_aas.append(sequence.amino_acid_sequence)
sequences.append(sequence.nucleotide_sequence)
if sequence.metadata:
v_genes.append(sequence.metadata.v_gene)
j_genes.append(sequence.metadata.j_gene)
v_subgroups.append(sequence.metadata.v_subgroup)
j_subgroups.append(sequence.metadata.j_subgroup)
v_alleles.append(sequence.metadata.v_allele)
j_alleles.append(sequence.metadata.j_allele)
chains.append(sequence.metadata.chain)
counts.append(sequence.metadata.count)
region_types.append(sequence.metadata.region_type)
frame_types.append(sequence.metadata.frame_type)
cell_ids.append(sequence.metadata.cell_id)
for param in custom_lists.keys():
custom_lists[param].append(sequence.metadata.custom_params[param] if param in sequence.metadata.custom_params else None)
if sequence.annotation and sequence.annotation.implants and len(sequence.annotation.implants) > 0:
for implant in sequence.annotation.implants:
if implant.signal_id in signals:
signals[implant.signal_id].append(str(vars(implant)))
else:
signals[implant.signal_id] = [None for _ in range(index)] + [str(vars(implant))]
return cls.build(sequence_aas=sequence_aas, sequences=sequences, v_genes=v_genes, j_genes=j_genes, v_subgroups=v_subgroups,
j_subgroups=j_subgroups, v_alleles=v_alleles, j_alleles=j_alleles, chains=chains, counts=counts, region_types=region_types,
frame_types=frame_types, custom_lists=custom_lists, sequence_identifiers=sequence_identifiers, path=path, metadata=metadata,
signals=signals, cell_ids=cell_ids, filename_base=filename_base)
def __init__(self, data_filename: Path, metadata_filename: Path, identifier: str):
data_filename = Path(data_filename)
metadata_filename = Path(metadata_filename) if metadata_filename is not None else None
assert data_filename.suffix == ".npy", \
f"Repertoire: the file representing the repertoire has to be in numpy binary format. Got {data_filename.suffix} instead."
self.data_filename = data_filename
if metadata_filename:
with metadata_filename.open("r") as file:
self.metadata = yaml.safe_load(file)
self.fields = self.metadata["field_list"]
self.metadata_filename = metadata_filename
self.identifier = identifier
self.data = None
self.element_count = None
def get_sequence_aas(self):
return self.get_attribute("sequence_aas")
def get_sequence_identifiers(self):
return self.get_attribute("sequence_identifiers")
def get_v_genes(self):
return self.get_attribute("v_genes")
def get_j_genes(self):
return self.get_attribute("j_genes")
def get_counts(self):
counts = self.get_attribute("counts")
if counts is not None:
counts = np.array([int(count) if not NumpyHelper.is_nan_or_empty(count) else None for count in counts])
return counts
def get_chains(self):
chains = self.get_attribute("chains")
if chains is not None:
chains = np.array([Chain.get_chain(chain_str) if chain_str is not None else None for chain_str in chains])
return chains
def load_data(self):
if self.data is None or (isinstance(self.data, weakref.ref) and self.data() is None):
data = np.load(self.data_filename, allow_pickle=False)
self.data = weakref.ref(data) if EnvironmentSettings.low_memory else data
data = self.data() if EnvironmentSettings.low_memory else self.data
self.element_count = data.shape[0]
return data
def get_attribute(self, attribute):
data = self.load_data()
if attribute in data.dtype.names:
tmp = data[attribute]
return tmp
else:
return None
def get_attributes(self, attributes: list):
data = self.load_data()
result = {}
for attribute in attributes:
if attribute in data.dtype.names:
result[attribute] = data[attribute]
else:
logging.warning(f"{Repertoire.__name__}: attribute {attribute} is not present in the repertoire {self.identifier}, skipping...")
return result
def get_region_type(self):
region_types = self.get_attribute("region_types")
if region_types is not None:
region_types = set(region_types)
assert len(region_types) == 1, f"Repertoire {self.identifier}: expected one region_type, found: {region_types}"
return RegionType(region_types.pop())
else:
logging.warning(f'Repertoire {self.identifier}: region_types are not set for sequences.')
return None
def free_memory(self):
self.data = None
def __getstate__(self):
state = self.__dict__.copy()
del state['data']
return state
def __setstate__(self, state):
self.__dict__.update(state)
self.data = None
def get_element_count(self):
if self.element_count is None:
self.load_data()
return self.element_count
def _make_sequence_object(self, row, load_implants: bool = False):
fields = row.dtype.names
implants = []
if load_implants:
keys = [key for key in row.dtype.names if key not in Repertoire.FIELDS]
for key in keys:
value_dict = row[key]
if value_dict:
try:
implants.append(ImplantAnnotation(**ast.literal_eval(value_dict)))
except (SyntaxError, ValueError, TypeError):
implants.append(ImplantAnnotation(signal_id=key))
seq = ReceptorSequence(amino_acid_sequence=row["sequence_aas"] if "sequence_aas" in fields else None,
nucleotide_sequence=row["sequences"] if "sequences" in fields else None,
identifier=row["sequence_identifiers"] if "sequence_identifiers" in fields else None,
metadata=SequenceMetadata(v_gene=row["v_genes"] if "v_genes" in fields else None,
j_gene=row["j_genes"] if "j_genes" in fields else None,
v_subgroup=row["v_subgroups"] if "v_subgroups" in fields else None,
j_subgroup=row["j_subgroups"] if "j_subgroups" in fields else None,
v_allele=row["v_alleles"] if "v_alleles" in fields else None,
j_allele=row["j_alleles"] if "j_alleles" in fields else None,
chain=row["chains"] if "chains" in fields else None,
count=row["counts"] if "counts" in fields and not NumpyHelper.is_nan_or_empty(row['counts']) else None,
region_type=row["region_types"] if "region_types" in fields else None,
frame_type=row["frame_types"] if "frame_types" in fields else "IN",
cell_id=row["cell_ids"] if "cell_ids" in fields else None,
custom_params={key: row[key] if key in fields else None
for key in set(self.fields) - set(Repertoire.FIELDS)}),
annotation=SequenceAnnotation(implants=implants))
return seq
def _prepare_cell_lists(self):
data = self.load_data()
assert "cell_ids" in data.dtype.names and data["cell_ids"] is not None, \
f"Repertoire: cannot return receptor objects in repertoire {self.identifier} since cell_ids are not specified. " \
f"Existing fields are: {str(data.dtype.names)[1:-1]}"
same_cell_lists = NumpyHelper.group_structured_array_by(data, "cell_ids")
return same_cell_lists
def _make_receptors(self, cell_content):
sequences = []
for item in cell_content:
sequences.append(self._make_sequence_object(item))
return ReceptorBuilder.build_objects(sequences)
def get_sequence_objects(self, load_implants: bool = True) -> List[ReceptorSequence]:
"""
Lazily loads sequences from disk to reduce RAM consumption
Args:
load_implants: whether implants should be parsed to objects and converted to ImplantAnnotations; if True, might slow down the loading
Returns:
a list of ReceptorSequence objects
"""
seqs = []
data = self.load_data()
for i in range(len(self.get_sequence_identifiers())):
seq = self._make_sequence_object(data[i], load_implants)
seqs.append(seq)
return seqs
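# Example (a minimal sketch; skipping implant parsing avoids the ast.literal_eval
# step per sequence and is faster when implant annotations are not needed):
#
#   sequences = repertoire.get_sequence_objects(load_implants=False)
#   amino_acids = [seq.amino_acid_sequence for seq in sequences]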
@property
def sequences(self):
return self.get_sequence_objects(True)
@property
def receptors(self) -> List[Receptor]:
"""
A property that creates a list of Receptor objects based on the cell_ids field in the following manner:
- all sequences that have the same cell_id are grouped together
- they are divided into groups based on the chain
- all valid combinations of chains are created and used to make a receptor object - this means that if a cell has
two beta (b1 and b2) and one alpha chain (a1), two receptor objects will be created: receptor1 (b1, a1), receptor2 (b2, a1)
To avoid having multiple receptors from the same cell, use one of the preprocessing classes which can merge or eliminate multiple
sequences. See the documentation of the preprocessing module for more information.
Returns:
List[Receptor]: a list of Receptor objects
"""
receptors = []
same_cell_lists = self._prepare_cell_lists()
for cell_content in same_cell_lists:
receptors.extend(self._make_receptors(cell_content))
return receptors
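# Example (a minimal sketch, assuming the repertoire was built with cell_ids;
# otherwise _prepare_cell_lists raises an AssertionError):
#
#   paired = repertoire.receptors
#   print(f"{len(paired)} receptors reconstructed from {repertoire.get_element_count()} sequences")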
@property
def cells(self) -> CellList:
"""
A property that creates a list of Cell objects based on the cell_ids field in the following manner:
- all sequences that have the same cell_id are grouped together
- they are divided into groups based on the chain
- all valid combinations of chains are created and used to make a receptor object - this means that if a cell has
two beta (b1 and b2) and one alpha chain (a1), two receptor objects will be created: receptor1 (b1, a1), receptor2 (b2, a1)
- an object of the Cell class is created from all receptors with the same cell_id created as described in the previous steps
To avoid having multiple receptors from the same cell, use one of the preprocessing classes which can merge or eliminate multiple
sequences. See the documentation of the preprocessing module for more information.
Returns:
CellList: a list of Cell objects
"""
cells = CellList()
cell_lists = self._prepare_cell_lists()
for cell_content in cell_lists:
receptors = self._make_receptors(cell_content)
cells.append(Cell(receptors))
return cells
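# Example (a minimal sketch, again assuming cell_ids were set when building the
# repertoire):
#
#   for cell in repertoire.cells:
#       pass  # each Cell groups all Receptor objects reconstructed for one cell_id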