from pathlib import Path
import numpy as np
import os
from immuneML.data_model.SequenceParams import RegionType
from immuneML.encodings.EncoderParams import EncoderParams
from immuneML.encodings.abundance_encoding.CompAIRRSequenceAbundanceEncoder import CompAIRRSequenceAbundanceEncoder
from immuneML.encodings.abundance_encoding.KmerAbundanceEncoder import KmerAbundanceEncoder
from immuneML.encodings.abundance_encoding.SequenceAbundanceEncoder import SequenceAbundanceEncoder
from immuneML.encodings.kmer_frequency.sequence_encoding.SequenceEncodingType import SequenceEncodingType
from immuneML.environment.EnvironmentSettings import EnvironmentSettings
from immuneML.environment.SequenceType import SequenceType
from immuneML.util.ParameterValidator import ParameterValidator
# ("[docs]" source-link marker left over from the HTML documentation export)
class SignificantFeaturesHelper:
    """Static helpers for validating parameters and building abundance encoders.

    The class holds no state. It groups functions that validate a shared set of
    parameters (p-values, k-values, label, region type, sequence type), read
    sequences from a text file, construct and run the k-mer / sequence / CompAIRR
    abundance encoders, and sum feature presence over a selection of features.
    """

    @staticmethod
    def parse_parameters(kwargs, location):
        """Validate the shared parameters and convert some of them in place.

        Args:
            kwargs: dict that must contain ``p_values``, ``k_values``, ``label``,
                ``region_type`` and ``sequence_type``; ``compairr_path`` is optional.
            location: name of the calling component, used as prefix in error messages.

        Returns:
            The same ``kwargs`` object, with ``region_type`` replaced by a ``RegionType``
            member, ``sequence_type`` by a ``SequenceType`` member and, if given and not
            None, ``compairr_path`` by a ``Path``.

        Raises:
            AssertionError: if ``label`` is not a dict with exactly one entry containing
                ``positive_class``, or if ``p_values`` / ``k_values`` contain duplicates.
                The ``ParameterValidator`` checks may raise as well (presumably also
                AssertionError - confirm against ParameterValidator).
        """
        ParameterValidator.assert_keys_present(kwargs.keys(), ["p_values", "k_values", "label", 'region_type', 'sequence_type'],
                                               location, location)
        ParameterValidator.assert_type_and_value(kwargs["p_values"], list, location, "p_values")
        ParameterValidator.assert_type_and_value(kwargs["k_values"], list, location, "k_values")

        # The label has to be a single-entry dict: {label_name: {"positive_class": ...}}.
        assert isinstance(kwargs["label"], dict), f"{location}: {kwargs['label']} is not a valid value for parameter label. " \
                                                  f"It has to be of type dict, but is now of type {type(kwargs['label']).__name__}." \
                                                  f"Did you remember to set the positive_class?"
        assert len(kwargs["label"]) == 1, f"{location}: only one label is allowed to be set, found {len(kwargs['label'])}: {list(kwargs['label'])}"
        label_name = next(iter(kwargs["label"]))
        assert "positive_class" in kwargs["label"][label_name], f"{location}: positive_class must be set for label {label_name}"

        assert len(kwargs["p_values"]) == len(set(kwargs["p_values"])), \
            f"{location}: p_values should only contain unique values, found {kwargs['p_values']}"
        assert len(kwargs["k_values"]) == len(set(kwargs["k_values"])), \
            f"{location}: k_values should only contain unique values, found {kwargs['k_values']}"

        # Pass the caller's location (previously the literal string "location" was passed,
        # so error messages did not say where the invalid p-value came from).
        ParameterValidator.assert_all_type_and_value(kwargs["p_values"], float, location, "p_values", min_inclusive=0)

        ParameterValidator.assert_region_type(kwargs, SignificantFeaturesHelper.__name__)
        ParameterValidator.assert_sequence_type(kwargs, SignificantFeaturesHelper.__name__)
        kwargs['region_type'] = RegionType[kwargs['region_type'].upper()]
        kwargs['sequence_type'] = SequenceType[kwargs['sequence_type'].upper()]

        # Every k is either the literal "full_sequence" or an integer >= 1.
        for value in kwargs["k_values"]:
            if value != "full_sequence":
                ParameterValidator.assert_type_and_value(value, int, location, "k_values", 1)

        if kwargs.get("compairr_path") is not None:
            ParameterValidator.assert_type_and_value(kwargs["compairr_path"], str, location, "compairr_path")
            kwargs["compairr_path"] = Path(kwargs["compairr_path"])

        return kwargs

    @staticmethod
    def parse_sequences_path(kwargs, field_name, location):
        """Check that ``kwargs[field_name]`` names an existing file and convert it to a ``Path``.

        Args:
            kwargs: parameter dict that must contain ``field_name`` with a string value.
            field_name: key in ``kwargs`` holding the path to the sequences file.
            location: name of the calling component, used as prefix in error messages.

        Returns:
            The same ``kwargs`` object with ``kwargs[field_name]`` replaced by a ``Path``.

        Raises:
            AssertionError: if the path does not point to an existing file.
        """
        ParameterValidator.assert_keys_present(kwargs.keys(), [field_name], location, location)
        ParameterValidator.assert_type_and_value(kwargs[field_name], str, location, field_name)

        # The message is built from the actual field (the former kwargs['field_name'] lookup
        # raised KeyError instead of reporting the missing file).
        assert os.path.isfile(kwargs[field_name]), f"{location}: {field_name} does not exist: {kwargs[field_name]}"

        kwargs[field_name] = Path(kwargs[field_name])
        return kwargs

    @staticmethod
    def load_sequences(ground_truth_sequences_path: Path):
        """Read a text file and return its lines with surrounding whitespace stripped.

        Args:
            ground_truth_sequences_path: path of the file to read, one sequence per line.

        Returns:
            list of str, one entry per line of the file (blank lines become empty strings).
        """
        with open(ground_truth_sequences_path) as file:
            return [line.strip() for line in file]

    @staticmethod
    def _get_encoder_name(k):
        """Return ``"<k>-mer"`` for an integer ``k``; any other value is returned unchanged."""
        # Exact type check on purpose: only plain ints are treated as k-mer lengths.
        return f"{k}-mer" if type(k) is int else k

    @staticmethod
    def _build_encoder_params(label_config, encoder_result_path, region_type: "RegionType", sequence_type: "SequenceType"):
        """Create ``EncoderParams`` for fitting an encoder (``learn_model=True``) without encoding labels.

        Args:
            label_config: label configuration forwarded to ``EncoderParams``.
            encoder_result_path: result path forwarded to ``EncoderParams``.
            region_type: region type forwarded to ``EncoderParams``.
            sequence_type: sequence type forwarded to ``EncoderParams``.

        Returns:
            EncoderParams with ``pool_size=1``, ``learn_model=True`` and ``encode_labels=False``.
        """
        return EncoderParams(result_path=encoder_result_path,
                             label_config=label_config,
                             pool_size=1,
                             learn_model=True,
                             encode_labels=False,
                             region_type=region_type,
                             sequence_type=sequence_type)

    @staticmethod
    def _build_kmer_encoder(dataset, k, p_value, encoder_params):
        """Create a continuous k-mer ``KmerAbundanceEncoder``, run it on ``dataset`` and return the encoder.

        The result of ``encode`` is discarded; only the encoder object is returned.
        """
        encoder = KmerAbundanceEncoder(p_value_threshold=p_value,
                                       sequence_encoding=SequenceEncodingType.CONTINUOUS_KMER,
                                       k=k, k_left=0, k_right=0, min_gap=0, max_gap=0)
        encoder.encode(dataset, encoder_params)
        return encoder

    @staticmethod
    def _build_sequence_encoder(dataset, p_value, encoder_params: "EncoderParams"):
        """Create a ``SequenceAbundanceEncoder``, run it on ``dataset`` and return the encoder.

        Sequences are compared on the field given by ``encoder_params.get_sequence_field_name()``.
        The result of ``encode`` is discarded; only the encoder object is returned.
        """
        encoder = SequenceAbundanceEncoder(comparison_attributes=[encoder_params.get_sequence_field_name()],
                                           p_value_threshold=p_value, sequence_batch_size=100000, repertoire_batch_size=16)
        encoder.encode(dataset, encoder_params)
        return encoder

    @staticmethod
    def _build_compairr_sequence_encoder(dataset, p_value, encoder_params, compairr_path):
        """Create a ``CompAIRRSequenceAbundanceEncoder``, run it on ``dataset`` and return the encoder.

        The encoder is configured with ``ignore_genes=True``, ``threads=8`` and
        ``keep_temporary_files=True``. The result of ``encode`` is discarded.
        """
        encoder = CompAIRRSequenceAbundanceEncoder(p_value_threshold=p_value, compairr_path=compairr_path,
                                                   sequence_batch_size=100000, ignore_genes=True, threads=8,
                                                   keep_temporary_files=True)
        encoder.encode(dataset, encoder_params)
        return encoder

    @staticmethod
    def _get_relevant_feature_presence(encoder, relevant_indices):
        """Sum the presence vectors of the features selected by ``relevant_indices``.

        Args:
            encoder: an encoder that has already been run. For ``KmerAbundanceEncoder`` the
                ``kmer_presence_matrix`` is used, for ``CompAIRRSequenceAbundanceEncoder`` the
                ``sequence_presence_matrix``; any other encoder must provide
                ``comparison_data.get_item_vectors()``.
            relevant_indices: selector of the relevant features; used to index the first axis
                of the presence matrix, or looked up per item index in the fallback branch.

        Returns:
            numpy array with the selected presence vectors summed along the first axis.
        """
        if isinstance(encoder, KmerAbundanceEncoder):
            return np.sum(encoder.kmer_presence_matrix[relevant_indices], axis=0)

        if isinstance(encoder, CompAIRRSequenceAbundanceEncoder):
            return np.sum(encoder.sequence_presence_matrix[relevant_indices], axis=0)

        # Fallback: accumulate item vectors one at a time. The accumulator is sized from the
        # first selected vector instead of a hard-coded length of 6, so any vector length works.
        # NOTE(review): assumes every item vector has the same shape - confirm against ComparisonData.
        relevant_feature_presence = None
        for i, sequence_vector in enumerate(encoder.comparison_data.get_item_vectors()):
            if relevant_indices[i]:
                if relevant_feature_presence is None:
                    relevant_feature_presence = np.zeros(shape=np.shape(sequence_vector))
                relevant_feature_presence += sequence_vector

        if relevant_feature_presence is None:
            # Nothing was selected: keep the historical return value (six zeros).
            relevant_feature_presence = np.zeros(shape=(6,))

        return relevant_feature_presence