import copy
from multiprocessing.pool import Pool
from pathlib import Path
import numpy as np
from immuneML.data_model.dataset.RepertoireDataset import RepertoireDataset
from immuneML.data_model.repertoire.Repertoire import Repertoire
from immuneML.preprocessing.filters.Filter import Filter
class CountPerSequenceFilter(Filter):
    """
    Removes all sequences from a Repertoire when they have a count below low_count_limit, or sequences without a count
    value if remove_without_count is True.

    This filter can be applied to Repertoires and RepertoireDatasets.

    Arguments:

        low_count_limit (int): The inclusive minimal count value required to retain a given sequence.

        remove_without_count (bool): Whether sequences without a reported count value should be removed.

        remove_empty_repertoires (bool): Whether repertoires that are left without any sequences should be removed.
        Only has an effect when remove_without_count is also set to True.

        batch_size (int): The number of repertoires that can be loaded at the same time (only affects the speed of
        applying this filter on a RepertoireDataset).

    YAML specification:

    .. indent with spaces
    .. code-block:: yaml

        preprocessing_sequences:
            my_preprocessing:
                - my_filter:
                    CountPerSequenceFilter:
                        remove_without_count: True
                        remove_empty_repertoires: True
                        low_count_limit: 3
                        batch_size: 4

    """
    def __init__(self, low_count_limit: int, remove_without_count: bool, remove_empty_repertoires: bool, batch_size: int):
        self.low_count_limit = low_count_limit
        self.remove_without_count = remove_without_count
        self.remove_empty_repertoires = remove_empty_repertoires
        self.batch_size = batch_size
    @staticmethod
    def process(dataset: RepertoireDataset, params: dict) -> RepertoireDataset:
        CountPerSequenceFilter.check_dataset_type(dataset, [RepertoireDataset], "CountPerSequenceFilter")
        processed_dataset = copy.deepcopy(dataset)

        # filter the repertoires in parallel, loading at most batch_size repertoires at once
        with Pool(params["batch_size"]) as pool:
            repertoires = pool.starmap(CountPerSequenceFilter.process_repertoire,
                                       [(repertoire, params) for repertoire in dataset.repertoires])

        if params["remove_empty_repertoires"]:
            repertoires = Filter.remove_empty_repertoires(repertoires)

        processed_dataset.repertoires = repertoires
        CountPerSequenceFilter.check_dataset_not_empty(processed_dataset, "CountPerSequenceFilter")
        return processed_dataset
    @staticmethod
    def process_repertoire(repertoire: Repertoire, params: dict) -> Repertoire:
        counts = repertoire.get_counts()
        counts = counts if counts is not None else np.full(repertoire.get_element_count(), None)

        not_none_indices = counts != None  # noqa: E711 - elementwise comparison over the (object-dtype) counts array
        counts[not_none_indices] = counts[not_none_indices].astype(int)  # note: np.int was removed in NumPy 1.24
        indices_to_keep = np.full(repertoire.get_element_count(), False)

        if params["remove_without_count"] and params["low_count_limit"] is not None:
            # keep only sequences that have a count and whose count reaches the limit
            np.greater_equal(counts, params["low_count_limit"], out=indices_to_keep, where=not_none_indices)
        elif params["remove_without_count"]:
            # keep all sequences that have a count, regardless of its value
            indices_to_keep = not_none_indices
        elif params["low_count_limit"] is not None:
            # keep sequences without a count, plus sequences whose count reaches the limit
            indices_to_keep[np.logical_not(not_none_indices)] = True
            np.greater_equal(counts, params["low_count_limit"], out=indices_to_keep, where=not_none_indices)

        processed_repertoire = Repertoire.build_like(repertoire, indices_to_keep, params["result_path"],
                                                     filename_base=f"{repertoire.data_filename.stem}_filtered")
        return processed_repertoire
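    # Illustrative trace of the branching above (hypothetical values, not part of immuneML):
    # with counts = [5, None, 1] and low_count_limit = 3, not_none_indices is [True, False, True].
    # remove_without_count=True then yields indices_to_keep = [True, False, False], while
    # remove_without_count=False also keeps the count-less sequence: [True, True, False].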
    def process_dataset(self, dataset: RepertoireDataset, result_path: Path) -> RepertoireDataset:
        params = {"result_path": result_path, "low_count_limit": self.low_count_limit, "remove_without_count": self.remove_without_count,
                  "remove_empty_repertoires": self.remove_empty_repertoires, "batch_size": self.batch_size}
        return CountPerSequenceFilter.process(dataset, params)
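
# A minimal, self-contained sketch of the masking mechanics used in process_repertoire
# (illustrative only, not part of immuneML): `where=` restricts the comparison to entries
# that hold a real count, so count-less entries retain the preset value in `out`.
if __name__ == "__main__":
    counts = np.array([5, None, 1], dtype=object)
    has_count = counts != None  # noqa: E711 - elementwise comparison
    counts[has_count] = counts[has_count].astype(int)
    keep = np.full(counts.shape[0], False)
    np.greater_equal(counts, 3, out=keep, where=has_count)
    print(keep.tolist())  # [True, False, False] - only the sequence with count >= 3 is kept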