Source code for immuneML.preprocessing.ReferenceSequenceAnnotator

import copy
import glob
import shutil
import subprocess
from pathlib import Path
from typing import List

import pandas as pd

from immuneML.IO.dataset_export.AIRRExporter import AIRRExporter
from immuneML.data_model.SequenceParams import RegionType
from immuneML.data_model.SequenceSet import ReceptorSequence, Repertoire
from immuneML.data_model.datasets.ElementDataset import SequenceDataset
from immuneML.data_model.datasets.RepertoireDataset import RepertoireDataset
from immuneML.encodings.reference_encoding.MatchedReferenceUtil import MatchedReferenceUtil
from immuneML.preprocessing.Preprocessor import Preprocessor
from immuneML.util.CompAIRRHelper import CompAIRRHelper
from immuneML.util.CompAIRRParams import CompAIRRParams
from immuneML.util.ParameterValidator import ParameterValidator
from immuneML.util.PathBuilder import PathBuilder


[docs] class ReferenceSequenceAnnotator(Preprocessor): """ Annotates each sequence in each repertoire if it matches any of the reference sequences provided as input parameter. This report uses CompAIRR internally. To match CDR3 sequences (and not JUNCTION), CompAIRR v1.10 or later is needed. **Specification arguments:** - reference_sequences (dict): A dictionary describing the reference dataset file. Import should be specified the same way as regular dataset import. It is only allowed to import a receptor dataset here (i.e., is_repertoire is False and paired is True by default, and these are not allowed to be changed). - max_edit_distance (int): The maximum edit distance between a target sequence (from the repertoire) and the reference sequence. - compairr_path (str): optional path to the CompAIRR executable. If not given, it is assumed that CompAIRR has been installed such that it can be called directly on the command line with the command 'compairr', or that it is located at /usr/local/bin/compairr. - threads (int): how many threads to be used by CompAIRR for sequence matching - ignore_genes (bool): Whether to ignore V and J gene information. If False, the V and J genes between two receptor chains have to match. If True, gene information is ignored. By default, ignore_genes is False. - output_column_name (str): in case there are multiple annotations, it is possible here to define the name of the column in the output repertoire files for this specific annotation - repertoire_batch_size (int): how many repertoires to process simultaneously; depending on the repertoire size, this parameter might be use to limit the memory usage - region_type (str): which region type to check, default is IMGT_CDR3 **YAML specification:** .. indent with spaces .. code-block:: yaml preprocessing_sequences: my_preprocessing: - step1: ReferenceSequenceAnnotator: reference_sequences: format: VDJDB params: path: path/to/file.csv compairr_path: optional/path/to/compairr ignore_genes: False max_edit_distance: 0 output_column_name: matched threads: 4 repertoire_batch_size: 5 region_type: IMGT_CDR3 """ def __init__(self, reference_sequences: List[ReceptorSequence], max_edit_distance: int, compairr_path: str, ignore_genes: bool, threads: int, output_column_name: str, repertoire_batch_size: int, region_type: RegionType): super().__init__() self._reference_sequences = reference_sequences self._max_edit_distance = max_edit_distance self._output_column_name = output_column_name self._repertoire_batch_size = repertoire_batch_size self._region_type = region_type self._compairr_params = CompAIRRParams(compairr_path=CompAIRRHelper.determine_compairr_path(compairr_path), keep_compairr_input=True, differences=max_edit_distance, indels=False, ignore_counts=True, ignore_genes=ignore_genes, threads=threads, output_filename="compairr_out.tsv", log_filename="compairr_log.txt", output_pairs=False, do_repertoire_overlap=False, do_sequence_matching=True, pairs_filename="pairs.tsv")
[docs] @classmethod def build_object(cls, **kwargs): ParameterValidator.assert_keys(list(kwargs.keys()), ['reference_sequences', 'max_edit_distance', 'compairr_path', 'ignore_genes', 'output_column_name', 'threads', 'repertoire_batch_size', 'region_type'], ReferenceSequenceAnnotator.__name__, ReferenceSequenceAnnotator.__name__) ParameterValidator.assert_in_valid_list(kwargs['region_type'].upper(), [el.name for el in RegionType], ReferenceSequenceAnnotator.__name__, 'region_type') ref_seqs = MatchedReferenceUtil.prepare_reference(reference_params=kwargs['reference_sequences'], location=ReferenceSequenceAnnotator.__name__, paired=False) return ReferenceSequenceAnnotator(**{**kwargs, 'reference_sequences': ref_seqs, 'region_type': RegionType[kwargs['region_type'].upper()]})
[docs] def process_dataset(self, dataset: RepertoireDataset, result_path: Path, number_of_processes=1) -> RepertoireDataset: self._compairr_params.is_cdr3 = self._region_type == RegionType.IMGT_CDR3 sequence_filepath = self._prepare_sequences_for_compairr(result_path / 'tmp') repertoires_filepaths = self._prepare_repertoires_for_compairr(dataset, result_path / 'tmp') compairr_output_files = self._annotate_repertoires(sequence_filepath, repertoires_filepaths, result_path) processed_dataset = self._make_annotated_dataset(dataset, result_path, compairr_output_files) shutil.rmtree(result_path / 'tmp') return processed_dataset
def _annotate_repertoires(self, sequences_filepath, repertoire_filepaths: List[Path], result_path: Path): tmp_path = PathBuilder.build(result_path / 'tmp') updated_output_files = [] for index, rep_file in enumerate(repertoire_filepaths): batch_tmp_path = PathBuilder.build(tmp_path / str(index)) args = CompAIRRHelper.get_cmd_args(self._compairr_params, [rep_file, sequences_filepath], batch_tmp_path) compairr_result = subprocess.run(args, capture_output=True, text=True) output_file = CompAIRRHelper.verify_compairr_output_path(compairr_result, self._compairr_params, batch_tmp_path) updated_output_file = result_path / f'updated_compairr_output_{index}.tsv' with open(output_file, 'r') as file: output_lines = file.readlines() with updated_output_file.open(mode='w') as file: output_lines[0] = output_lines[0].replace("#", '') file.writelines(output_lines) updated_output_files.append(updated_output_file) return updated_output_files def _make_annotated_dataset(self, dataset: RepertoireDataset, result_path: Path, compairr_output_files: List[Path]) -> RepertoireDataset: repertoires = [] repertoire_path = PathBuilder.build(result_path / 'repertoires') for index, repertoire in enumerate(dataset.repertoires): compairr_out_df = pd.read_csv(compairr_output_files[index], sep='\t', comment="#") data = repertoire.data.add_fields({self._output_column_name: compairr_out_df.iloc[:, 1].values.astype(int)}, {self._output_column_name: int}) type_dict = type(data).get_field_type_dict(all_fields=False) repertoires.append(Repertoire.build_from_dc_object(repertoire_path, metadata=repertoire.metadata, filename_base=repertoire.data_filename.name, data=data, type_dict=type_dict)) return RepertoireDataset.build_from_objects(**{'repertoires': repertoires, 'path': result_path}) def _prepare_sequences_for_compairr(self, result_path: Path) -> Path: path = PathBuilder.build(result_path) / 'reference_sequences.tsv' # TODO: remove this when import is refactored, this now ensures that string matching is done on sequences as imported reference_sequences = [] for seq in self._reference_sequences: tmp_seq = copy.deepcopy(seq) tmp_seq.duplicate_count = seq.duplicate_count if not self._compairr_params.ignore_counts else 1 reference_sequences.append(tmp_seq) SequenceDataset.build_from_objects(reference_sequences, PathBuilder.build(result_path), region_type=self._region_type) result_files = glob.glob(str(result_path / "*.tsv")) assert len(result_files) == 1, \ f"Error occurred while exporting sequences for matching using CompAIRR. Resulting files: {result_files}" shutil.move(result_files[0], path) return path def _prepare_repertoires_for_compairr(self, dataset: RepertoireDataset, result_path: Path) -> List[Path]: PathBuilder.build(result_path) paths = [] for i, repertoire in enumerate(dataset.repertoires): path = result_path / f'repertoires_{i}.tsv' CompAIRRHelper.write_repertoire_file(repertoires=[repertoire], filename=path, compairr_params=self._compairr_params, export_sequence_id=True) paths.append(path) return paths def _check_column_name(self, dataset): for repertoire in dataset.repertoires: assert repertoire.get_attribute(self._output_column_name) is None, \ (f"{ReferenceSequenceAnnotator.__name__}: attribute {self._output_column_name} already exists in " f"repertoire ({repertoire.identifier}); choose another name.")