import os
import shutil
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import pandas as pd
from bionumpy.bnpdataclass import BNPDataClass
from olga import load_model
from olga.generation_probability import GenerationProbabilityVJ, GenerationProbabilityVDJ
from olga.sequence_generation import SequenceGenerationVJ, SequenceGenerationVDJ
from immuneML.IO.dataset_import.DatasetImportParams import DatasetImportParams
from immuneML.IO.dataset_import.OLGAImport import OLGAImport
from immuneML.data_model.bnp_util import read_yaml, write_yaml
from immuneML.data_model.SequenceParams import RegionType
from immuneML.data_model.SequenceParams import Chain
from immuneML.dsl.DefaultParamsLoader import DefaultParamsLoader
from immuneML.environment.SequenceType import SequenceType
from immuneML.ml_methods.generative_models.GenerativeModel import GenerativeModel
from immuneML.ml_methods.generative_models.InternalOlgaModel import InternalOlgaModel
from immuneML.simulation.util.igor_helper import make_skewed_model_files
from immuneML.util.ImportHelper import ImportHelper
from immuneML.util.ParameterValidator import ParameterValidator
from immuneML.util.PathBuilder import PathBuilder
[docs]
@dataclass
class OLGA(GenerativeModel):
"""
This is a wrapper for the OLGA package as described by Sethna et al. 2019 (OLGA package on PyPI or GitHub:
https://github.com/statbiophys/OLGA ).
This model should be used only for LIgO simulation and is not yet supported for use with TrainGenModel instruction.
Reference:
Zachary Sethna, Yuval Elhanati, Curtis G Callan, Jr, Aleksandra M Walczak, Thierry Mora, OLGA: fast computation of
generation probabilities of B- and T-cell receptor amino acid sequences and motifs, Bioinformatics, Volume 35,
Issue 17, 1 September 2019, Pages 2974–2981, https://doi.org/10.1093/bioinformatics/btz035
Note:
- OLGA generates sequences that correspond to IMGT junction and are used for matching as such. See the
https://github.com/statbiophys/OLGA for more details.
- Gene names are as provided in OLGA (either in default models or in the user-specified model files). For
simulation, one should use gene names in the same format.
.. note::
While this is a generative model, in the current version of immuneML it cannot be used in combination with TrainGenModel or
ApplyGenModel instruction. If you want to use OLGA for sequence simulation, see :ref:`Dataset simulation with LIgO`.
`
**Specification arguments:**
- model_path (str): if not default model, this parameter should point to a folder where the four OLGA/IGOR format
files are stored (could also be inferred from some experimental data)
- default_model_name (str): if not using custom models, one of the OLGA default models could be specified here;
the value should be the same as it would be passed to command line in OLGA: e.g., humanTRB, human IGH
**YAML specification:**
.. indent with spaces
.. code-block:: yaml
definitions:
ml_methods:
generative_model:
type: OLGA
model_path: None
default_model_name: humanTRB
"""
model_path: Path = None
default_model_name: str = None
locus: Chain = None
region_type: RegionType = RegionType.IMGT_JUNCTION
_olga_model: InternalOlgaModel = None
DEFAULT_MODEL_FOLDER_MAP = {
"humanTRA": "human_T_alpha", "humanTRB": "human_T_beta",
"humanIGH": "human_B_heavy", "humanIGK": "human_B_kappa", "humanIGL": "human_B_lambda",
"mouseTRB": "mouse_T_beta", "mouseTRA": "mouse_T_alpha"
}
MODEL_FILENAMES = {'marginals': 'model_marginals.txt', 'params': 'model_params.txt',
'v_gene_anchor': 'V_gene_CDR3_anchors.csv',
'j_gene_anchor': 'J_gene_CDR3_anchors.csv'}
OUTPUT_COLUMNS = ["sequence", 'sequence_aa', 'v_call', 'j_call', 'region_type', "frame_type", "p_gen",
"from_default_model", 'duplicate_count', 'locus']
[docs]
@classmethod
def build_object(cls, **kwargs):
location = OLGA.__name__
ParameterValidator.assert_keys(list(kwargs.keys()), ['model_path', 'default_model_name', 'locus'], location,
'OLGA generative model', exclusive=False)
if 'model_path' in kwargs and kwargs['model_path']:
assert 'locus' in kwargs, f"{OLGA.__name__}: locus not defined."
assert Path(kwargs['model_path']).is_dir(), \
f"{OLGA.__name__}: the model path is not a directory. It has to be a directory and contain files with the exact names as " \
f"described in the OLGA package documentation: https://github.com/statbiophys/OLGA."
for filename in OLGA.MODEL_FILENAMES.values():
assert (Path(kwargs['model_path']) / filename).is_file(), \
f"{OLGA.__name__}: file {filename} is missing in the specified directory: {kwargs['model_path']}"
assert kwargs['default_model_name'] is None, \
f"{OLGA.__name__}: default_model_name must be None when model_path is set, but now it is {kwargs['default_model_name']}."
locus = Chain.get_chain(kwargs['locus'])
else:
ParameterValidator.assert_in_valid_list(kwargs['default_model_name'],
list(OLGA.DEFAULT_MODEL_FOLDER_MAP.keys()), location,
'default_model_name')
locus = Chain.get_chain(kwargs['default_model_name'][-3:])
kwargs['model_path'] = Path(
load_model.__file__).parent / f"default_models/{OLGA.DEFAULT_MODEL_FOLDER_MAP[kwargs['default_model_name']]}"
return OLGA(**{**kwargs, **{'locus': locus}})
[docs]
@classmethod
def load_model(cls, path: Path):
assert path.exists(), f"{cls.__name__}: {path} does not exist."
model_overview_file = path / 'model_overview.yaml'
for file in [model_overview_file]:
assert file.exists(), f"{cls.__name__}: {file} is not a file."
model_overview = read_yaml(model_overview_file)
olga = OLGA(**{k: v for k, v in model_overview.items() if k != 'type'})
olga._olga_model = olga.load_internal_model(path)
return olga
@property
def is_vdj(self):
return self.locus in [Chain.BETA, Chain.HEAVY]
[docs]
def fit(self, data, path: Path = None):
raise NotImplementedError("Fitting an OLGA model is currently not supported by immuneML.")
[docs]
def load_internal_model(self, model_path: Path = None) -> InternalOlgaModel:
model_path = self.model_path if model_path is None else model_path
olga_gen_model = load_model.GenerativeModelVDJ() if self.is_vdj else load_model.GenerativeModelVJ()
olga_gen_model.load_and_process_igor_model(str(model_path / OLGA.MODEL_FILENAMES['marginals']))
genomic_data = load_model.GenomicDataVDJ() if self.is_vdj else load_model.GenomicDataVJ()
genomic_data.load_igor_genomic_data(params_file_name=str(model_path / OLGA.MODEL_FILENAMES['params']),
V_anchor_pos_file=str(model_path / OLGA.MODEL_FILENAMES['v_gene_anchor']),
J_anchor_pos_file=str(model_path / OLGA.MODEL_FILENAMES['j_gene_anchor']))
v_gene_mapping = [gene[0] for gene in genomic_data.genV]
j_gene_mapping = [gene[0] for gene in genomic_data.genJ]
sequence_gen_model = SequenceGenerationVDJ(olga_gen_model, genomic_data) if self.is_vdj \
else SequenceGenerationVJ(olga_gen_model, genomic_data)
return InternalOlgaModel(sequence_gen_model=sequence_gen_model, v_gene_mapping=v_gene_mapping,
j_gene_mapping=j_gene_mapping,
genomic_data=genomic_data, olga_gen_model=olga_gen_model)
[docs]
def generate_sequences(self, count: int, seed: int, path: Path, sequence_type: SequenceType,
compute_p_gen: bool) -> Path:
if not self._olga_model:
self._olga_model = self.load_internal_model()
self._generate_productive_sequences(count, path, seed, self._olga_model, compute_p_gen, sequence_type)
return path
def _generate_productive_sequences(self, count: int, path: Path, seed: int, olga_model: InternalOlgaModel,
compute_p_gen: bool,
sequence_type: SequenceType, **kwargs):
sequences = pd.DataFrame(index=np.arange(count), columns=OLGA.OUTPUT_COLUMNS)
for i in range(count):
seq_row = olga_model.sequence_gen_model.gen_rnd_prod_CDR3()
p_gen = self.compute_p_gen(
{'sequence': seq_row[0], 'sequence_aa': seq_row[1], 'v_call': olga_model.v_gene_mapping[seq_row[2]],
'j_call': olga_model.j_gene_mapping[seq_row[3]]}, sequence_type) if compute_p_gen else -1.
sequences.loc[i] = (
seq_row[0], seq_row[1], olga_model.v_gene_mapping[seq_row[2]], olga_model.j_gene_mapping[seq_row[3]],
RegionType.IMGT_JUNCTION.name, 1, p_gen, int(olga_model == self._olga_model),
-1, self.locus.value)
sequences.to_csv(path, index=False, sep='\t')
return path
[docs]
def compute_p_gen(self, sequence: dict, sequence_type: SequenceType, sequence_field: str = None) -> float:
cls = GenerationProbabilityVDJ if self.is_vdj else GenerationProbabilityVJ
p_gen_model = cls(generative_model=self._olga_model.olga_gen_model, genomic_data=self._olga_model.genomic_data)
if sequence_type == SequenceType.NUCLEOTIDE:
seq_field_name = 'sequence' if sequence_field is None else sequence_field
return p_gen_model.compute_nt_CDR3_pgen(sequence[seq_field_name], sequence['v_call'], sequence['j_call'])
else:
seq_field_name = 'sequence_aa' if sequence_field is None else sequence_field
return p_gen_model.compute_aa_CDR3_pgen(sequence[seq_field_name], sequence['v_call'], sequence['j_call'])
[docs]
def compute_p_gens(self, sequences: BNPDataClass, sequence_type: SequenceType, sequence_field: str = None) -> list:
cls = GenerationProbabilityVDJ if self.is_vdj else GenerationProbabilityVJ
p_gen_model = cls(generative_model=self._olga_model.olga_gen_model, genomic_data=self._olga_model.genomic_data)
p_gen_func = p_gen_model.compute_nt_CDR3_pgen if sequence_type == SequenceType.NUCLEOTIDE else p_gen_model.compute_aa_CDR3_pgen
if sequence_field is None:
seq_field = 'sequence' if sequence_type == SequenceType.NUCLEOTIDE else 'sequence_aa'
else:
seq_field = sequence_field
return [p_gen_func(getattr(seq, seq_field).to_string(), seq.v_call.to_string(), seq.j_call.to_string(), False)
for seq in sequences]
[docs]
def can_compute_p_gens(self) -> bool:
return True
[docs]
def can_generate_from_skewed_gene_models(self) -> bool:
return True
[docs]
def generate_from_skewed_gene_models(self, v_genes: list, j_genes: list, seed: int, path: Path,
sequence_type: SequenceType, batch_size: int,
compute_p_gen: bool):
if not self._olga_model:
self._olga_model = self.load_internal_model()
if len(v_genes) > 0 or len(j_genes) > 0:
skewed_model_path = PathBuilder.build(path.parent / "skewed_model/")
skewed_seqs_path = PathBuilder.build(path.parent) / f"tmp_skewed_model_seqs_{seed}.tsv"
make_skewed_model_files(v_genes, j_genes, self.model_path, skewed_model_path)
skewed_model = self.load_internal_model(skewed_model_path)
self._generate_productive_sequences(count=batch_size, path=skewed_seqs_path, seed=seed,
olga_model=skewed_model,
compute_p_gen=compute_p_gen, sequence_type=sequence_type)
if skewed_seqs_path.is_file():
pd.read_csv(skewed_seqs_path, sep='\t').to_csv(path, mode='a' if path.is_file() else 'w', sep='\t',
index=False,
header=not path.is_file())
os.remove(skewed_seqs_path)
def _import_olga_sequences(self, sequence_type: SequenceType, path: Path):
import_empty_nt_sequences = False if sequence_type == SequenceType.NUCLEOTIDE else True
default_params = DefaultParamsLoader.load('datasets', 'olga')
params = DatasetImportParams.build_object(
**{**default_params, **{'path': path, "import_empty_nt_sequences": import_empty_nt_sequences}})
sequences = ImportHelper.import_items(OLGAImport, path, params)
return sequences
[docs]
def is_same(self, model) -> bool:
return type(model) == type(self) and self.locus == model.locus and self.model_path == model.model_path and \
self.default_model_name == model.default_model_name
[docs]
def save_model(self, path: Path) -> Path:
PathBuilder.build(path / 'model')
write_yaml(path / 'model_overview.yaml',
{'type': 'OLGA', 'default_model_name': self.default_model_name, 'locus': self.locus.name,
'region_type': self.region_type.name})
shutil.copytree(str(self.model_path), str(path / 'model'), dirs_exist_ok=True)
return Path(shutil.make_archive(str(path / 'trained_model'), "zip", str(path / 'model'))).absolute()