Source code for immuneML.encodings.abundance_encoding.CompAIRRBatchIterator
from immuneML.util.CompAIRRHelper import CompAIRRHelper
[docs]
class CompAIRRBatchIterator:
    """
    Iterate over sequence vectors stored across batched CompAIRR output files.

    Each output file is expected to carry its batch number in the file name
    as ``<prefix>_batch<N>``. Batches are visited in ascending batch-number
    order, and every row of every batch is yielded as a numpy array.

    NOTE(review): ``CompAIRRHelper.read_compairr_output_file`` is assumed to
    return a pandas DataFrame with one row per sequence and one column per
    repertoire id -- confirm against CompAIRRHelper.
    """

    def __init__(self, paths, sequence_batch_size: int):
        """
        Args:
            paths: iterable of path objects exposing ``.stem`` (e.g.
                ``pathlib.Path``), one per CompAIRR batch output file.
            sequence_batch_size: number of sequences in every batch except
                possibly the last one.
        """
        self.repertoire_ids = None
        self.sequence_batch_size = sequence_batch_size
        self.batch_paths = self.get_batch_dict(paths)
        self.sequence_count = self.compute_sequence_count()

    def __iter__(self):
        """Return a generator over sequence vectors for the currently set repertoire ids."""
        return self.get_sequence_vectors(self.repertoire_ids)

    def __len__(self):
        """Return the total number of sequences computed at construction time."""
        return self.sequence_count

    def compute_sequence_count(self) -> int:
        """
        Compute the total number of sequences across all batches.

        Every batch except the last is counted as holding exactly
        ``sequence_batch_size`` sequences; only the last batch file (highest
        batch number) is read to obtain its actual length.

        Returns:
            The total sequence count, or 0 when there are no batch files.
        """
        # Without this guard an empty path collection would crash in max();
        # iteration already yields nothing in that case, so the matching
        # length is 0.
        if not self.batch_paths:
            return 0

        last_batch_path = self.batch_paths[max(self.batch_paths)]
        last_batch_size = len(CompAIRRHelper.read_compairr_output_file(last_batch_path))
        full_batches = len(self.batch_paths) - 1
        return full_batches * self.sequence_batch_size + last_batch_size

    def get_batch_dict(self, paths):
        """Map each batch number (parsed from the file name) to its path."""
        return {self.get_batch_from_path(path): path for path in paths}

    def get_batch_from_path(self, path):
        """
        Extract the integer batch number from a path whose stem looks like
        ``<prefix>_batch<N>``.

        Raises:
            IndexError: if the stem does not contain ``"_batch"``.
            ValueError: if the text following ``"_batch"`` is not an integer.
        """
        return int(path.stem.split("_batch")[1])

    def set_repertoire_ids(self, repertoire_ids) -> None:
        """Set the repertoire ids used to select columns when iterating with ``iter()``."""
        self.repertoire_ids = repertoire_ids

    def get_batches(self, repertoire_ids=None):
        """
        Yield the content of each batch file in ascending batch-number order.

        Args:
            repertoire_ids: optional column selector applied to each batch;
                when None, all columns are kept.

        Yields:
            The batch table with every value greater than 1 replaced by 1 and
            rows sorted by index.
        """
        for batch_idx in sorted(self.batch_paths):
            batch = CompAIRRHelper.read_compairr_output_file(self.batch_paths[batch_idx])

            if repertoire_ids is not None:
                batch = batch[repertoire_ids]

            # count clones only: any value above 1 is collapsed to 1
            batch[batch > 1] = 1
            batch.sort_index(inplace=True)

            yield batch

    def get_sequence_vectors(self, repertoire_ids=None):
        """
        Yield one numpy array per row of every batch.

        Args:
            repertoire_ids: optional column selector forwarded to
                ``get_batches``.
        """
        for batch in self.get_batches(repertoire_ids):
            for _, sequence_vector in batch.iterrows():
                yield sequence_vector.to_numpy()