Source code for immuneML.preprocessing.ReferenceSequenceAnnotator

import copy
import glob
import math
import shutil
import subprocess
from pathlib import Path
from typing import List

import numpy as np
import pandas as pd

from immuneML.IO.dataset_export.AIRRExporter import AIRRExporter
from immuneML.data_model.dataset.RepertoireDataset import RepertoireDataset
from immuneML.data_model.dataset.SequenceDataset import SequenceDataset
from immuneML.data_model.receptor.RegionType import RegionType
from immuneML.data_model.receptor.receptor_sequence.ReceptorSequence import ReceptorSequence
from immuneML.data_model.repertoire.Repertoire import Repertoire
from immuneML.encodings.reference_encoding.MatchedReferenceUtil import MatchedReferenceUtil
from immuneML.preprocessing.Preprocessor import Preprocessor
from immuneML.util.CompAIRRHelper import CompAIRRHelper
from immuneML.util.CompAIRRParams import CompAIRRParams
from immuneML.util.ParameterValidator import ParameterValidator
from immuneML.util.PathBuilder import PathBuilder


[docs] class ReferenceSequenceAnnotator(Preprocessor): """ Annotates each sequence in each repertoire if it matches any of the reference sequences provided as input parameter. This report uses CompAIRR internally. To match CDR3 sequences (and not JUNCTION), CompAIRR v1.10 or later is needed. Arguments: reference_sequences (dict): A dictionary describing the reference dataset file. Import should be specified the same way as regular dataset import. It is only allowed to import a receptor dataset here (i.e., is_repertoire is False and paired is True by default, and these are not allowed to be changed). max_edit_distance (int): The maximum edit distance between a target sequence (from the repertoire) and the reference sequence. compairr_path (str): optional path to the CompAIRR executable. If not given, it is assumed that CompAIRR has been installed such that it can be called directly on the command line with the command 'compairr', or that it is located at /usr/local/bin/compairr. threads (int): how many threads to be used by CompAIRR for sequence matching ignore_genes (bool): Whether to ignore V and J gene information. If False, the V and J genes between two receptor chains have to match. If True, gene information is ignored. By default, ignore_genes is False. output_column_name (str): in case there are multiple annotations, it is possible here to define the name of the column in the output repertoire files for this specific annotation repertoire_batch_size (int): how many repertoires to process simultaneously; depending on the repertoire size, this parameter might be use to limit the memory usage YAML specification: .. indent with spaces .. code-block:: yaml preprocessing_sequences: my_preprocessing: - step1: ReferenceSequenceAnnotator: reference_sequences: format: VDJDB params: path: path/to/file.csv compairr_path: optional/path/to/compairr ignore_genes: False max_edit_distance: 0 output_column_name: matched threads: 4 repertoire_batch_size: 5 """ def __init__(self, reference_sequences: List[ReceptorSequence], max_edit_distance: int, compairr_path: str, ignore_genes: bool, threads: int, output_column_name: str, repertoire_batch_size: int): super().__init__() self._reference_sequences = reference_sequences self._max_edit_distance = max_edit_distance self._output_column_name = output_column_name self._repertoire_batch_size = repertoire_batch_size self._compairr_params = CompAIRRParams(compairr_path=CompAIRRHelper.determine_compairr_path(compairr_path), keep_compairr_input=True, differences=max_edit_distance, indels=False, ignore_counts=True, ignore_genes=ignore_genes, threads=threads, output_filename="compairr_out.tsv", log_filename="compairr_log.txt", output_pairs=False, do_repertoire_overlap=False, do_sequence_matching=True, pairs_filename="pairs.tsv")
[docs] @classmethod def build_object(cls, **kwargs): ParameterValidator.assert_keys(list(kwargs.keys()), ['reference_sequences', 'max_edit_distance', 'compairr_path', 'ignore_genes', 'output_column_name', 'threads', 'repertoire_batch_size'], ReferenceSequenceAnnotator.__name__, ReferenceSequenceAnnotator.__name__) ref_seqs = MatchedReferenceUtil.prepare_reference(reference_params=kwargs['reference_sequences'], location=ReferenceSequenceAnnotator.__name__, paired=False) return ReferenceSequenceAnnotator(**{**kwargs, 'reference_sequences': ref_seqs})
[docs] def process_dataset(self, dataset: RepertoireDataset, result_path: Path, number_of_processes=1) -> RepertoireDataset: region_type = self._get_region_type_from_dataset(dataset) self._compairr_params.is_cdr3 = region_type == RegionType.IMGT_CDR3 sequence_filepath = self._prepare_sequences_for_compairr(result_path / 'tmp', region_type) repertoires_filepaths = self._prepare_repertoires_for_compairr(dataset, result_path / 'tmp') compairr_output_files = self._annotate_repertoires(sequence_filepath, repertoires_filepaths, result_path, region_type) processed_dataset = self._make_annotated_dataset(dataset, result_path, compairr_output_files) shutil.rmtree(result_path / 'tmp') return processed_dataset
def _get_region_type_from_dataset(self, dataset: RepertoireDataset) -> RegionType: region_types = [repertoire.get_region_type() for repertoire in dataset.repertoires] assert all(region_types[0] == region_type for region_type in region_types), \ "Not all repertoires have the sequences with the same region type." return region_types[0] def _annotate_repertoires(self, sequences_filepath, repertoire_filepaths: List[Path], result_path: Path, region_type: RegionType): tmp_path = PathBuilder.build(result_path / 'tmp') updated_output_files = [] for index, rep_file in enumerate(repertoire_filepaths): batch_tmp_path = PathBuilder.build(tmp_path / str(index)) args = CompAIRRHelper.get_cmd_args(self._compairr_params, [rep_file, sequences_filepath], batch_tmp_path) compairr_result = subprocess.run(args, capture_output=True, text=True) output_file = CompAIRRHelper.verify_compairr_output_path(compairr_result, self._compairr_params, batch_tmp_path) updated_output_file = result_path / f'updated_compairr_output_{index}.tsv' with open(output_file, 'r') as file: output_lines = file.readlines() with updated_output_file.open(mode='w') as file: output_lines[0] = output_lines[0].replace("#", '') file.writelines(output_lines) updated_output_files.append(updated_output_file) return updated_output_files def _make_annotated_dataset(self, dataset: RepertoireDataset, result_path: Path, compairr_output_files: List[Path]) -> RepertoireDataset: repertoires = [] repertoire_path = PathBuilder.build(result_path / 'repertoires') for index, repertoire in enumerate(dataset.repertoires): compairr_out_df = pd.read_csv(compairr_output_files[index], sep='\t', comment="#") sequences = self._add_params_to_sequence_objects(repertoire.sequences, compairr_out_df.iloc[:, 1]) repertoires.append(Repertoire.build_from_sequence_objects(sequences, repertoire_path, repertoire.metadata)) return RepertoireDataset.build_from_objects(**{'repertoires': repertoires, 'path': result_path}) def _add_params_to_sequence_objects(self, sequence_objects: List[ReceptorSequence], matches_reference): sequences = copy.deepcopy(sequence_objects) for seq_index, seq in enumerate(sequences): seq.metadata.custom_params[self._output_column_name] = int(matches_reference[seq_index]) return sequences def _prepare_sequences_for_compairr(self, result_path: Path, region_type: RegionType) -> Path: path = PathBuilder.build(result_path) / 'reference_sequences.tsv' # TODO: remove this when import is refactored, this now ensures that string matching is done on sequences as imported reference_sequences = [] for seq in self._reference_sequences: tmp_seq = copy.deepcopy(seq) tmp_seq.metadata.region_type = region_type reference_sequences.append(tmp_seq) AIRRExporter.export(SequenceDataset.build_from_objects(reference_sequences, len(self._reference_sequences), PathBuilder.build(result_path / 'tmp_seq_dataset')), result_path) result_files = glob.glob(str(result_path / "*.tsv")) assert len(result_files) == 1, f"Error occurred while exporting sequences for matching using CompAIRR. Resulting files: {result_files}" shutil.move(result_files[0], path) return path def _prepare_repertoires_for_compairr(self, dataset: RepertoireDataset, result_path: Path) -> List[Path]: PathBuilder.build(result_path) paths = [] for i, repertoire in enumerate(dataset.repertoires): path = result_path / f'repertoires_{i}.tsv' CompAIRRHelper.write_repertoire_file(repertoires=[repertoire], filename=path, compairr_params=self._compairr_params, export_sequence_id=True) paths.append(path) return paths def _check_column_name(self, dataset): for repertoire in dataset.repertoires: assert repertoire.get_attribute(self._output_column_name) is None, \ (f"{ReferenceSequenceAnnotator.__name__}: attribute {self._output_column_name} already exists in " f"repertoire ({repertoire.identifier}); choose another name.")