Source code for immuneML.dsl.instruction_parsers.ValidateClusteringParser

import inspect
import logging
import shutil
import tempfile
from pathlib import Path
from typing import List

import sklearn.metrics as sklearn_metrics

from immuneML.IO.ml_method.ClusteringImporter import ClusteringImporter
from immuneML.reports.clustering_method_reports.ClusteringMethodReport import ClusteringMethodReport
from immuneML.reports.data_reports.DataReport import DataReport
from immuneML.reports.encoding_reports.EncodingReport import EncodingReport
from immuneML.data_model.SequenceParams import RegionType
from immuneML.dsl.DefaultParamsLoader import DefaultParamsLoader
from immuneML.dsl.instruction_parsers.LabelHelper import LabelHelper
from immuneML.dsl.symbol_table.SymbolTable import SymbolTable
from immuneML.environment.LabelConfiguration import LabelConfiguration
from immuneML.environment.SequenceType import SequenceType
from immuneML.util.ParameterValidator import ParameterValidator
from immuneML.workflows.instructions.clustering.ValidateClusteringInstruction import ValidateClusteringInstruction


class ValidateClusteringParser:
    """Build a ValidateClusteringInstruction from its dict specification.

    The parser validates the keys of the instruction, loads a previously
    exported clustering item, resolves the dataset and reports from the
    symbol table, and checks metrics, validation types and labels.
    """

    def parse(self, key: str, instruction: dict, symbol_table: SymbolTable,
              path: Path = None) -> ValidateClusteringInstruction:
        """Validate ``instruction`` and turn it into a ValidateClusteringInstruction.

        Args:
            key: name of the instruction; passed to the validators as the
                location shown in error messages.
            instruction: specification of the instruction. The keys
                'clustering_config_path', 'dataset', 'metrics' and
                'validation_type' are required; 'type', 'labels',
                'sequence_type', 'region_type', 'number_of_processes' and
                'reports' are also accepted.
            symbol_table: table from which the dataset and the reports are
                looked up by name.
            path: not used by this parser.

        Returns:
            The configured ValidateClusteringInstruction.
        """
        location = ValidateClusteringParser.__name__
        valid_keys = ['type', 'clustering_config_path', 'dataset', 'metrics', 'validation_type', 'labels',
                      'sequence_type', 'region_type', 'number_of_processes', 'reports']
        required_keys = ['clustering_config_path', 'dataset', 'metrics', 'validation_type']

        ParameterValidator.assert_keys(instruction.keys(), valid_keys, location, key)
        ParameterValidator.assert_keys_present(instruction.keys(), required_keys, location, key)
        ParameterValidator.assert_region_type(instruction, location)
        ParameterValidator.assert_sequence_type(instruction, location)

        # Load the clustering item from the exported zip file or directory
        clustering_item = self._load_clustering_item(instruction['clustering_config_path'])

        # Get the validation dataset
        dataset = symbol_table.get(instruction['dataset'])

        metrics = self._parse_metrics(key, instruction)
        validation_type = self._parse_validation_type(key, instruction)

        # Labels are optional; an empty label configuration is used without them
        label_config = self._parse_labels(key, instruction, dataset)

        # Optional parameter, defaults to a single process
        number_of_processes = instruction.get('number_of_processes', 1)

        reports = self._parse_reports(instruction, symbol_table)

        # NOTE(review): 'sequence_type' and 'region_type' are not in the list of
        # required keys but are indexed directly below; presumably they are
        # guaranteed by the assert_* checks above or by defaults merged before
        # this parser runs - confirm.
        return ValidateClusteringInstruction(
            clustering_item=clustering_item,
            dataset=dataset,
            metrics=metrics,
            validation_type=validation_type,
            label_config=label_config,
            sequence_type=SequenceType[instruction['sequence_type'].upper()],
            region_type=RegionType[instruction['region_type'].upper()],
            number_of_processes=number_of_processes,
            reports=reports
        )

    def _load_clustering_item(self, config_path: str):
        """Load a ClusteringItem from an exported zip file or directory.

        Args:
            config_path: path to a '.zip' archive or to a directory that
                ClusteringImporter can read.

        Returns:
            The clustering item returned by ClusteringImporter; the
            accompanying config is discarded.
        """
        # Local import so that this method does not rely on a file-level import
        import atexit

        config_path = Path(config_path)

        if config_path.suffix == '.zip':
            temp_dir = tempfile.mkdtemp()
            # mkdtemp() never deletes the directory on its own. The extracted
            # files have to outlive this call so that the loaded objects stay
            # valid, so removal is deferred to interpreter exit. Registering
            # before unpacking also covers the case where loading fails.
            atexit.register(shutil.rmtree, temp_dir, ignore_errors=True)
            shutil.unpack_archive(config_path, temp_dir)
            cl_item, _ = ClusteringImporter.import_clustering_item(Path(temp_dir))
        else:
            # Anything that is not a zip file is assumed to be a directory
            cl_item, _ = ClusteringImporter.import_clustering_item(config_path)

        return cl_item

    def _parse_metrics(self, key: str, instruction: dict) -> List[str]:
        """Parse and validate clustering metrics.

        The value under 'metrics' has to be a list of strings, and every
        string has to be the name of an attribute of ``sklearn.metrics``.

        Raises:
            AssertionError: if a metric name is not found in ``sklearn.metrics``.
        """
        metrics = instruction['metrics']
        ParameterValidator.assert_type_and_value(metrics, list, 'ValidateClusteringParser', f'{key}:metrics')
        ParameterValidator.assert_all_type_and_value(metrics, str, 'ValidateClusteringParser', f'{key}:metrics')

        for metric in metrics:
            # Raised explicitly instead of using an assert statement so that
            # the check is not stripped when Python runs with -O.
            if not hasattr(sklearn_metrics, metric):
                raise AssertionError(
                    f"ValidateClusteringParser: metric {metric} is not a valid metric. "
                    f"See the list of scikit-learn's metrics for clustering."
                )

        return metrics

    def _parse_validation_type(self, key: str, instruction: dict) -> List[str]:
        """Parse and validate validation types.

        The value under 'validation_type' has to be a list whose items are
        'method_based' or 'result_based'. The list is returned unchanged.
        """
        validation_types = instruction['validation_type']
        ParameterValidator.assert_type_and_value(validation_types, list, 'ValidateClusteringParser',
                                                 f'{key}:validation_type')

        valid_types = ['method_based', 'result_based']
        for vtype in validation_types:
            ParameterValidator.assert_in_valid_list(vtype, valid_types, 'ValidateClusteringParser',
                                                    'validation_type')

        return validation_types

    def _parse_labels(self, key: str, instruction: dict, dataset) -> LabelConfiguration:
        """Parse labels for external evaluation.

        Returns the configuration built by LabelHelper when 'labels' is given
        and not None, and an empty LabelConfiguration otherwise.
        """
        labels = instruction.get('labels')
        if labels is not None:
            return LabelHelper.create_label_config(labels, dataset, key, 'labels')
        return LabelConfiguration()

    def _parse_reports(self, instruction: dict, symbol_table: SymbolTable) -> List:
        """Parse reports from the symbol table.

        Every name listed under 'reports' is looked up in ``symbol_table``.
        Clustering method, data and encoding reports are collected; any other
        object is skipped and a warning is logged.
        """
        reports = []
        report_names = instruction.get('reports')

        if report_names is not None:
            supported_report_types = (ClusteringMethodReport, DataReport, EncodingReport)
            for report_name in report_names:
                report = symbol_table.get(report_name)
                if isinstance(report, supported_report_types):
                    reports.append(report)
                else:
                    logging.warning(f"Report {report_name} (type: {type(report)}) could not be added "
                                    f"to ValidateClusteringInstruction.")

        return reports