Source code for immuneML.dsl.instruction_parsers.ClusteringParser

import copy
import inspect
from pathlib import Path
from typing import List

import sklearn.metrics as sklearn_metrics

from immuneML.data_model.SequenceParams import RegionType
from immuneML.dsl.DefaultParamsLoader import DefaultParamsLoader
from immuneML.dsl.instruction_parsers.LabelHelper import LabelHelper
from immuneML.dsl.symbol_table.SymbolTable import SymbolTable
from immuneML.dsl.symbol_table.SymbolType import SymbolType
from immuneML.environment.LabelConfiguration import LabelConfiguration
from immuneML.environment.SequenceType import SequenceType
from immuneML.hyperparameter_optimization.config.LeaveOneOutConfig import LeaveOneOutConfig
from immuneML.hyperparameter_optimization.config.ManualSplitConfig import ManualSplitConfig
from immuneML.hyperparameter_optimization.config.SplitConfig import SplitConfig
from immuneML.hyperparameter_optimization.config.SplitType import SplitType
from immuneML.ml_methods.clustering.ClusteringMethod import ClusteringMethod
from immuneML.ml_methods.dim_reduction.DimRedMethod import DimRedMethod
from immuneML.reports.Report import Report
from immuneML.util.ParameterValidator import ParameterValidator
from immuneML.workflows.instructions.clustering.ClusteringInstruction import ClusteringInstruction
from immuneML.workflows.instructions.clustering.clustering_run_model import ClusteringSetting


[docs] class ClusteringParser:
[docs] def parse(self, key: str, instruction: dict, symbol_table: SymbolTable, path: Path = None) -> ClusteringInstruction: valid_keys = [k for k in inspect.signature(ClusteringInstruction.__init__).parameters.keys() if k not in ['result_path', 'name', 'self', 'label_config']] + ['type', 'labels'] ParameterValidator.assert_keys(instruction.keys(), valid_keys, ClusteringParser.__name__, key) ParameterValidator.assert_region_type(instruction, ClusteringParser.__name__) ParameterValidator.assert_sequence_type(instruction, ClusteringParser.__name__) dataset = symbol_table.get(instruction['dataset']) clustering_settings = parse_clustering_settings(key, instruction, symbol_table) metrics = parse_metrics(key, instruction, symbol_table) label_config = parse_labels(key, instruction, dataset) reports = parse_reports(key, instruction, symbol_table) split_config = parse_split_config(key, instruction, symbol_table) return ClusteringInstruction(dataset=dataset, metrics=metrics, clustering_settings=clustering_settings, name=key, label_config=label_config, reports=reports, sequence_type=SequenceType[instruction['sequence_type'].upper()], region_type=RegionType[instruction['region_type'].upper()], split_config=split_config)
[docs] def parse_split_config(key, instruction, symbol_table) -> SplitConfig: try: split_key = 'split_config' default_params = DefaultParamsLoader.load("instructions/", SplitConfig.__name__) instruction[split_key] = {**default_params, **instruction[split_key]} split_strategy = SplitType[instruction[split_key]["split_strategy"].upper()] training_percentage = float( instruction[split_key]["training_percentage"]) if split_strategy == SplitType.RANDOM else -1 if split_strategy == SplitType.RANDOM: assert 0. <= training_percentage <= 1., \ f'{ClusteringParser.__name__}: training_percentage has to between 0 and 1 if split_strategy is RANDOM.' elif split_strategy == SplitType.MANUAL: ParameterValidator.assert_keys(keys=instruction[split_key]["manual_config"].keys(), valid_keys=["discovery_data", "validation_data"], location=ClusteringParser.__name__, parameter_name="manual_config", exclusive=True) ParameterValidator.assert_valid_tabular_file(instruction[split_key]["manual_config"]["discovery_data"], location=ClusteringParser.__name__, parameter_name="discovery_data") ParameterValidator.assert_valid_tabular_file(instruction[split_key]["manual_config"]["validation_data"], location=ClusteringParser.__name__, parameter_name="validation_data") return SplitConfig(split_strategy=split_strategy, split_count=1, training_percentage=training_percentage, manual_config=ManualSplitConfig(train_metadata_path=instruction[split_key]['discovery_data'], test_metadata_path=instruction[split_key]['validation_data']) if "manual_config" in instruction[split_key] else None, leave_one_out_config=LeaveOneOutConfig(**instruction[split_key]["leave_one_out_config"]) if "leave_one_out_config" in instruction[split_key] else None) except KeyError as key_error: raise KeyError( f"{ClusteringParser.__name__}: parameter {key_error.args[0]} was not defined under {split_key}.") from key_error
[docs] def parse_labels(key, instruction, dataset) -> LabelConfiguration: if 'labels' in instruction and instruction['labels'] is not None: label_config = LabelHelper.create_label_config(instruction['labels'], dataset, key, 'labels') else: label_config = LabelConfiguration() return label_config
[docs] def parse_reports(key, instruction, symbol_table) -> List[Report]: if 'reports' not in instruction or instruction['reports'] is None: return [] else: ParameterValidator.assert_type_and_value(instruction['reports'], list, 'ClusteringParser', 'reports') valid_reports = symbol_table.get_keys_by_type(SymbolType.REPORT) ParameterValidator.assert_all_type_and_value(instruction['reports'], str, 'ClusteringParser', 'reports') ParameterValidator.assert_all_in_valid_list(instruction['reports'], valid_reports, 'ClusteringParser', 'reports') reports = [symbol_table.get(report_id) for report_id in instruction['reports']] return reports
[docs] def parse_metrics(key: str, instruction: dict, symbol_table: SymbolTable) -> List[str]: ParameterValidator.assert_type_and_value(instruction['metrics'], list, 'ClusteringParser', f'{key}:metrics') ParameterValidator.assert_all_type_and_value(instruction['metrics'], str, 'ClusteringParser', f'{key}:metrics') for metric in instruction['metrics']: assert hasattr(sklearn_metrics, metric), (f"Clustering parser: metric {metric} is not a valid metric. See the " f"list of scikit-learn's metrics for clustering.") return instruction['metrics']
[docs] def parse_clustering_settings(key: str, instruction: dict, symbol_table: SymbolTable) -> List[ClusteringSetting]: ParameterValidator.assert_type_and_value(instruction['clustering_settings'], list, 'ClusteringParser', 'key:clustering_settings') valid_encodings = symbol_table.get_keys_by_type(SymbolType.ENCODING) valid_dim_red = [method.symbol for method in symbol_table.get_by_type(SymbolType.ML_METHOD) if isinstance(method.item, DimRedMethod)] valid_clusterings = [method.symbol for method in symbol_table.get_by_type(SymbolType.ML_METHOD) if isinstance(method.item, ClusteringMethod)] settings_objs = [] for setting in instruction['clustering_settings']: setting_obj = make_setting_obj(setting, valid_encodings, valid_clusterings, valid_dim_red, symbol_table, instruction) settings_objs.append(setting_obj) return settings_objs
[docs] def make_setting_obj(setting, valid_encodings, valid_clusterings, valid_dim_red, symbol_table, instruction): ParameterValidator.assert_keys_present(setting.keys(), ['encoding', 'method'], 'ClusteringParser', 'clustering_settings') ParameterValidator.assert_keys(setting.keys(), ['encoding', 'dim_reduction', 'method'], 'ClusteringParser', 'clustering_settings', exclusive=False) ParameterValidator.assert_in_valid_list(setting['encoding'], valid_encodings, 'ClusteringParser', 'encoding') ParameterValidator.assert_in_valid_list(setting['method'], valid_clusterings, 'ClusteringParser', 'method') if 'dim_reduction' in setting and setting['dim_reduction'] is not None: ParameterValidator.assert_in_valid_list(setting['dim_reduction'], valid_dim_red, 'ClusteringParser', 'dim_reduction') dim_reduction = copy.deepcopy(symbol_table.get(setting['dim_reduction'])) dim_red_params = symbol_table.get_config(setting['dim_reduction']) dim_red_name = setting['dim_reduction'] else: dim_reduction, dim_red_params, dim_red_name = None, None, None encoder = make_encoder_obj(symbol_table, setting['encoding'], instruction['dataset']) method = copy.deepcopy(symbol_table.get(setting['method'])) return ClusteringSetting(encoder=encoder, encoder_params=symbol_table.get_config(setting['encoding']), encoder_name=setting['encoding'], clustering_method=method, clustering_params=symbol_table.get_config(setting['method']), clustering_method_name=setting['method'], dim_reduction_method=dim_reduction, dim_red_params=dim_red_params, dim_red_name=dim_red_name)
[docs] def make_encoder_obj(symbol_table, encoding_key, dataset_key): return symbol_table.get(encoding_key).build_object(symbol_table.get(dataset_key), **symbol_table.get_config(encoding_key)["encoder_params"]) \ .set_context({"dataset": symbol_table.get(dataset_key)})