import copy
from pathlib import Path
from typing import List
import pandas as pd
import yaml
from immuneML.IO.dataset_import.ImmuneMLImport import ImmuneMLImport
from immuneML.data_model.dataset.Dataset import Dataset
from immuneML.data_model.dataset.ReceptorDataset import ReceptorDataset
from immuneML.data_model.dataset.RepertoireDataset import RepertoireDataset
from immuneML.data_model.receptor.Receptor import Receptor
from immuneML.data_model.repertoire.Repertoire import Repertoire
from immuneML.simulation.SimulationState import SimulationState
from immuneML.util.FilenameHandler import FilenameHandler
from immuneML.util.PathBuilder import PathBuilder
from immuneML.workflows.steps.Step import Step
class SignalImplanter(Step):
    """
    Workflow step that implants the signals of a simulation into the dataset of a simulation state.

    Elements of the dataset (repertoires or receptors) are assigned to the implantings of the simulation in the
    order in which they are iterated: each implanting receives ``int(dataset_implanting_rate * example_count)``
    consecutive elements. Elements remaining after the last implanting get no signal implanted.
    """

    DATASET_NAME = "simulated_dataset"

    @staticmethod
    def run(simulation_state: SimulationState = None):
        """
        Returns the dataset with implanted signals.

        If a file with the expected dataset name already exists under the result path, the dataset is imported
        from it; otherwise signals are implanted in the dataset of the simulation state.
        """
        path = simulation_state.result_path / FilenameHandler.get_dataset_name(SignalImplanter.__name__)

        if path.is_file():
            dataset = ImmuneMLImport.import_dataset({"path": path}, SignalImplanter.DATASET_NAME)
        else:
            dataset = SignalImplanter._implant_signals_in_dataset(simulation_state)

        return dataset

    @staticmethod
    def _implant_signals_in_dataset(simulation_state: SimulationState = None) -> Dataset:
        """Builds the result path and dispatches on the dataset type: repertoire datasets vs. everything else."""
        PathBuilder.build(simulation_state.result_path)

        if isinstance(simulation_state.dataset, RepertoireDataset):
            dataset = SignalImplanter._implant_signals_in_repertoires(simulation_state)
        else:
            dataset = SignalImplanter._implant_signals_in_receptors(simulation_state)

        return dataset

    @staticmethod
    def _make_labels(simulation_state: SimulationState) -> dict:
        """Merges the labels of the original dataset (if any) with one ``[True, False]`` label per signal id."""
        original_labels = simulation_state.dataset.labels if simulation_state.dataset.labels is not None else {}
        return {**original_labels, **{signal.id: [True, False] for signal in simulation_state.signals}}

    @staticmethod
    def _implant_signals_in_receptors(simulation_state: SimulationState) -> Dataset:
        """Processes every receptor of the dataset and builds a new receptor dataset under the result path."""
        processed_receptors = SignalImplanter._implant_signals(simulation_state, SignalImplanter._process_receptor, None)
        processed_dataset = ReceptorDataset.build_from_objects(receptors=processed_receptors, file_size=simulation_state.dataset.file_size,
                                                               name=simulation_state.dataset.name, path=simulation_state.result_path)
        processed_dataset.labels = SignalImplanter._make_labels(simulation_state)

        return processed_dataset

    @staticmethod
    def _implant_signals_in_repertoires(simulation_state: SimulationState = None) -> Dataset:
        """Processes every repertoire of the dataset and builds a new repertoire dataset with a fresh metadata file."""
        repertoires_path = PathBuilder.build(simulation_state.result_path / "repertoires")
        processed_repertoires = SignalImplanter._implant_signals(simulation_state, SignalImplanter._process_repertoire, repertoires_path)
        processed_dataset = RepertoireDataset(repertoires=processed_repertoires, labels=SignalImplanter._make_labels(simulation_state),
                                              name=simulation_state.dataset.name,
                                              metadata_file=Path(SignalImplanter._create_metadata_file(processed_repertoires, simulation_state)))
        return processed_dataset

    @staticmethod
    def _implant_signals(simulation_state: SimulationState, process_element_func, output_path: Path):
        """
        Applies process_element_func to every element of the dataset, passing the implanting the element belongs to.

        process_element_func is called as ``func(index, element, implanting, simulation_state, output_path)``, where
        implanting is None for elements located after the range of the last implanting (or when there are no
        implantings at all).

        Returns:
            list of the processed elements, in dataset order
        """
        implantings = simulation_state.simulation.implantings
        processed_elements = []
        simulation_limits = SignalImplanter._prepare_simulation_limits(implantings,
                                                                       simulation_state.dataset.get_example_count())
        current_implanting_index = 0

        for index, element in enumerate(simulation_state.dataset.get_data()):

            # `while` rather than `if`: an implanting whose share rounds down to zero elements has the same limit as
            # its predecessor, so several implantings may have to be skipped before reaching the one owning `index`
            while current_implanting_index < len(implantings) \
                    and index >= simulation_limits[implantings[current_implanting_index].name]:
                current_implanting_index += 1

            current_implanting = implantings[current_implanting_index] if current_implanting_index < len(implantings) else None

            processed_element = process_element_func(index, element, current_implanting, simulation_state, output_path)
            processed_elements.append(processed_element)

        return processed_elements

    @staticmethod
    def _process_receptor(index, receptor, implanting, simulation_state, output_path: Path = None) -> Receptor:
        """
        Implants all signals of the implanting in the receptor, or clones the receptor if there is no implanting.

        Afterwards, every signal of the simulation state that is not yet a key of the receptor metadata is set to
        False there. index and output_path are unused; they are part of the common element-processing signature.
        """
        if implanting is not None:
            new_receptor = receptor
            for signal in implanting.signals:
                new_receptor = signal.implant_in_receptor(new_receptor, implanting.is_noise)
        else:
            new_receptor = receptor.clone()

        for signal in simulation_state.signals:
            if signal.id not in new_receptor.metadata:
                new_receptor.metadata[signal.id] = False

        return new_receptor

    @staticmethod
    def _process_repertoire(index, repertoire, current_implanting, simulation_state, output_path: Path = None) -> Repertoire:
        """
        Implants signals in the repertoire, or rebuilds it with all signals set to False if there is no implanting.

        output_path is unused; repertoires are written under ``result_path / "repertoires"`` of the simulation state.
        """
        if current_implanting is not None:
            new_repertoire = SignalImplanter._implant_in_repertoire(index, repertoire, current_implanting, simulation_state)
        else:
            new_metadata = {**repertoire.metadata, **{f"{signal.id}": False for signal in simulation_state.signals}}
            new_repertoire = Repertoire.build_from_sequence_objects(repertoire.sequences, simulation_state.result_path / "repertoires",
                                                                    metadata=new_metadata)

        return new_repertoire

    @staticmethod
    def _create_metadata_file(processed_repertoires: List[Repertoire], simulation_state) -> Path:
        """
        Writes ``metadata.csv`` under the result path with one row per repertoire and returns its path.

        Each row consists of the repertoire metadata (without 'field_list'), its identifier and the name of its
        data file.
        """
        path = simulation_state.result_path / "metadata.csv"

        new_df = pd.DataFrame([{**repertoire.metadata, 'identifier': repertoire.identifier} for repertoire in processed_repertoires])
        # errors='ignore': 'field_list' is absent when no repertoire metadata defines it (e.g. for an empty list)
        new_df.drop(columns='field_list', errors='ignore', inplace=True)
        new_df["filename"] = [repertoire.data_filename.name for repertoire in processed_repertoires]
        new_df.to_csv(path, index=False)

        return path

    @staticmethod
    def _implant_in_repertoire(index, repertoire, implanting, simulation_state) -> Repertoire:
        """
        Implants all signals of the implanting in a deep copy of the repertoire and updates its metadata.

        Signals of the implanting are labeled True in the metadata (False if the implanting is noise), all other
        signals of the simulation state are labeled False. The metadata is then dumped as YAML to the metadata
        file of the new repertoire. index is unused.
        """
        new_repertoire = copy.deepcopy(repertoire)
        for signal in implanting.signals:
            new_repertoire = signal.implant_to_repertoire(repertoire=new_repertoire,
                                                          repertoire_implanting_rate=implanting.repertoire_implanting_rate,
                                                          path=simulation_state.result_path / "repertoires/")

        for signal in implanting.signals:
            new_repertoire.metadata[f"{signal.id}"] = not implanting.is_noise

        for signal in simulation_state.signals:
            if signal not in implanting.signals:
                new_repertoire.metadata[f"{signal.id}"] = False

        with Path(new_repertoire.metadata_filename).open('w') as file:
            yaml.safe_dump(new_repertoire.metadata, file)

        return new_repertoire

    @staticmethod
    def _prepare_simulation_limits(simulation: list, element_count: int) -> dict:
        """
        For each implanting returns the exclusive upper bound of the dataset indices that belong to it.

        An implanting gets ``int(dataset_implanting_rate * element_count)`` elements; its limit is the cumulative
        sum of these counts up to and including itself, so the element at index ``limit`` is the first one that
        does not belong to the implanting anymore.

        Returns:
            dict mapping implanting name to its limit, in the order of the implantings
        """
        counts = {item.name: int(item.dataset_implanting_rate * element_count) for item in simulation}

        limits = {}
        running_total = 0
        for item_name, count in counts.items():
            running_total += count
            limits[item_name] = running_total

        return limits