Source code for immuneML.workflows.steps.data_splitter.LeaveOneOutSplitter

import numpy as np

from immuneML.data_model.dataset.ReceptorDataset import ReceptorDataset
from immuneML.data_model.dataset.SequenceDataset import SequenceDataset
from immuneML.data_model.receptor.receptor_sequence.ReceptorSequence import ReceptorSequence
from immuneML.workflows.steps.data_splitter.DataSplitterParams import DataSplitterParams
from immuneML.workflows.steps.data_splitter.Util import Util


class LeaveOneOutSplitter:
    """Leave-one-out splitting of receptor and sequence datasets by a parameter value.

    For every unique value of the parameter named in
    ``input_params.split_config.leave_one_out_config.parameter`` one train/test pair is
    created: the elements carrying that value form the test dataset and all other elements
    form the train dataset.
    """

    @staticmethod
    def split_dataset(input_params: DataSplitterParams):
        """Split ``input_params.dataset`` into leave-one-out train and test datasets.

        Args:
            input_params: splitter parameters; ``dataset`` and
                ``split_config.leave_one_out_config`` (``parameter``, ``min_count``) are read.

        Returns:
            A tuple ``(train_datasets, test_datasets)`` of two lists with one entry per unique
            parameter value.

        Raises:
            NotImplementedError: if the dataset is neither a ReceptorDataset nor a SequenceDataset.
        """
        if isinstance(input_params.dataset, (ReceptorDataset, SequenceDataset)):
            return LeaveOneOutSplitter._split_receptor_dataset(input_params)

        raise NotImplementedError("LeaveOneOutSplitter: leave-one-out stratification is currently implemented only for receptor and "
                                  f"sequence datasets, got {type(input_params.dataset).__name__} instead.")

    @staticmethod
    def _split_receptor_dataset(input_params: DataSplitterParams):
        """Build the train/test datasets for a receptor or sequence dataset.

        Note that ``input_params`` is modified in place: its split count (and the split count
        of its ``split_config``) is set to the number of unique parameter values.
        """
        dataset = input_params.dataset
        leave_one_out_config = input_params.split_config.leave_one_out_config
        param, min_count = leave_one_out_config.parameter, leave_one_out_config.min_count

        unique_values = LeaveOneOutSplitter._get_unique_param_values(dataset, param, min_count)
        input_params = LeaveOneOutSplitter._update_split_count(input_params, unique_values)

        train_indices, test_indices = LeaveOneOutSplitter._get_train_test_indices(dataset, unique_values, param)
        train_datasets, test_datasets = LeaveOneOutSplitter._make_datasets_from_indices(unique_values, dataset, train_indices,
                                                                                        test_indices, input_params)

        return train_datasets, test_datasets

    @staticmethod
    def _update_split_count(input_params: DataSplitterParams, unique_values):
        """Set the split count on ``input_params`` and its ``split_config`` to the number of unique values.

        The same (mutated) ``input_params`` object is returned.
        """
        input_params.split_config.split_count = unique_values.shape[0]
        input_params.split_count = input_params.split_config.split_count
        return input_params

    @staticmethod
    def _make_datasets_from_indices(unique_values, dataset, train_indices, test_indices, input_params):
        """Create one train and one test dataset per unique value via ``Util.make_dataset``.

        ``train_indices`` and ``test_indices`` map each unique value to a list of element
        indices; the position of the value in ``unique_values`` is passed on as the split index.
        """
        train_datasets, test_datasets = [], []

        for index, value in enumerate(unique_values):
            train_datasets.append(Util.make_dataset(dataset, train_indices[value], input_params, index, type(dataset).TRAIN))
            test_datasets.append(Util.make_dataset(dataset, test_indices[value], input_params, index, type(dataset).TEST))

        return train_datasets, test_datasets

    @staticmethod
    def _get_unique_param_values(dataset, param, min_count):
        """Return the unique values of ``param`` across all elements of the dataset.

        Args:
            dataset: a ReceptorDataset (value read from ``metadata[param]``) or a
                SequenceDataset (value read from ``metadata.custom_params[param]``).
            param: name of the parameter to split on.
            min_count: every unique value has to occur strictly more than ``min_count`` times.

        Returns:
            The array of unique values as produced by ``np.unique``.

        Raises:
            RuntimeError: if the dataset is of an unsupported type.
            AssertionError: if any unique value occurs ``min_count`` times or fewer.
        """
        if isinstance(dataset, ReceptorDataset):
            parameter_values = [receptor.metadata[param] for receptor in dataset.get_data()]
        elif isinstance(dataset, SequenceDataset):
            parameter_values = [seq.metadata.custom_params[param] for seq in dataset.get_data()]
        else:
            raise RuntimeError(f"{LeaveOneOutSplitter.__name__}: dataset of type {type(dataset)} cannot be used with this splitter.")

        unique_values, count = np.unique(parameter_values, return_counts=True)

        # Raised explicitly instead of using an `assert` statement so that the validation is
        # not stripped when Python runs with -O; the exception type stays the same for callers.
        if not all(el > min_count for el in count):
            raise AssertionError(f"DataSplitter: there are not enough examples with different values of the parameter {param} "
                                 f"to split the dataset.")

        return unique_values

    @staticmethod
    def _get_train_test_indices(dataset, unique_values, param):
        """Assign every element index to the train or the test set of each split.

        Returns:
            A tuple ``(train_indices, test_indices)`` of dicts keyed by unique value. For a
            given value, the test list holds the indices of elements whose parameter equals
            that value and the train list holds the indices of all other elements.
        """
        train_indices = {value: [] for value in unique_values}
        test_indices = {value: [] for value in unique_values}

        for index, element in enumerate(dataset.get_data()):
            # The key depends only on the element, so it is looked up once per element
            # rather than once per (element, value) pair.
            key = LeaveOneOutSplitter._get_key_from_element_obj(element, param)
            for value in unique_values:
                if key == value:
                    test_indices[value].append(index)
                else:
                    train_indices[value].append(index)

        return train_indices, test_indices

    @staticmethod
    def _get_key_from_element_obj(obj, param):
        """Return the value of ``param`` for a single dataset element.

        ReceptorSequence objects keep the value in ``metadata.custom_params``; any other
        element is expected to expose it directly through ``metadata[param]``.
        """
        if isinstance(obj, ReceptorSequence):
            return obj.metadata.custom_params[param]
        return obj.metadata[param]