Source code for immuneML.workflows.instructions.clustering.ClusteringState
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Union
from immuneML.data_model.SequenceParams import RegionType
from immuneML.data_model.datasets.Dataset import Dataset
from immuneML.environment.LabelConfiguration import LabelConfiguration
from immuneML.environment.SequenceType import SequenceType
from immuneML.hyperparameter_optimization.config.SplitConfig import SplitConfig
from immuneML.reports.ReportResult import ReportResult
from immuneML.workflows.instructions.clustering.clustering_run_model import ClusteringItem, ClusteringSetting
[docs]
@dataclass
class ClusteringConfig:
    """Configuration bundle for a clustering analysis.

    Holds the dataset, the metric names, the split configuration, the
    validation types, and the clustering settings, together with
    sequence-level options that have defaults.
    """
    # Identifier of this configuration.
    name: str
    # Dataset object the configuration refers to.
    dataset: Dataset
    # Metric names, given as strings.
    metrics: List[str]
    # Split configuration; presumably controls the discovery/validation split — verify against caller.
    split_config: SplitConfig
    # Validation type names, given as strings.
    validation_type: List[str]
    # One entry per clustering setting.
    clustering_settings: List[ClusteringSetting]
    # Region of the sequence to use; defaults to IMGT CDR3.
    region_type: RegionType = RegionType.IMGT_CDR3
    # Optional label configuration; None when not provided.
    label_config: Union[LabelConfiguration, None] = None
    # Sequence type to use; defaults to amino acid.
    sequence_type: SequenceType = SequenceType.AMINO_ACID
[docs]
@dataclass
class ClusteringItemResult:
    """Pairs a clustering item with the report results attached to it."""
    # The clustering item this result wraps.
    item: ClusteringItem
    # Report results for the item; each instance gets its own fresh empty list.
    report_results: List[ReportResult] = field(default_factory=list)
[docs]
@dataclass
class ClusteringResultPerRun:
    """Results of a single run, stored per clustering setting key.

    Attributes:
        run_id: integer identifier of the run.
        run_type: string describing the kind of run.
        items: mapping from a clustering setting key to its item result;
            starts out as an empty dict private to each instance.
    """
    run_id: int
    run_type: str
    items: Dict[str, ClusteringItemResult] = field(default_factory=dict)

    def get_cl_item(self, cl_setting: Union[str, ClusteringSetting]):
        """Return the clustering item stored for the given setting.

        Args:
            cl_setting: either the key string itself, or an object whose
                ``get_key()`` method yields the key.

        Returns:
            The ``item`` attribute of the result stored under that key.

        Raises:
            KeyError: if no result is stored under the key.
        """
        if isinstance(cl_setting, str):
            lookup_key = cl_setting
        else:
            # Anything that is not a plain string is asked for its key.
            lookup_key = cl_setting.get_key()

        item_result = self.items[lookup_key]
        return item_result.item
[docs]
@dataclass
class ClusteringResults:
    """Container with one per-run result slot for each kind of analysis.

    Every slot defaults to None until a result is assigned to it.
    """
    # Per-run result stored under the name "discovery".
    discovery: Union[ClusteringResultPerRun, None] = None
    # Per-run result stored under the name "method_based_validation".
    method_based_validation: Union[ClusteringResultPerRun, None] = None
    # Per-run result stored under the name "result_based_validation".
    result_based_validation: Union[ClusteringResultPerRun, None] = None
[docs]
@dataclass
class ClusteringState:
    """Mutable state of a clustering analysis: its configuration plus results.

    Attributes:
        name: identifier of this state.
        config: the clustering configuration in use.
        result_path: output location; None until set.
        clustering_items: one ``ClusteringResults`` per run, where the list
            index is meant to equal the run id.
        predictions_paths: list of name-to-path mappings; None until set.
        discovery_datasets: list of datasets; None until set.
        validation_datasets: list of datasets; None until set.
        clustering_report_results: report results; starts as an empty list.
    """
    name: str
    config: ClusteringConfig
    result_path: Path = None
    clustering_items: List[ClusteringResults] = field(default_factory=list)
    predictions_paths: List[Dict[str, Path]] = None
    discovery_datasets: List[Dataset] = None
    validation_datasets: List[Dataset] = None
    clustering_report_results: List[ReportResult] = field(default_factory=list)

    def add_cl_result_per_run(self, run_id: int, analysis_desc: str, cl_item_result: ClusteringResultPerRun):
        """Store a per-run result under ``clustering_items[run_id]``.

        Args:
            run_id: index of the run in ``clustering_items``.
            analysis_desc: name of the ``ClusteringResults`` attribute that
                receives the result (e.g. ``"discovery"``).
            cl_item_result: the per-run result to store.

        Raises:
            TypeError: if a new entry has to be created for ``run_id`` and
                ``analysis_desc`` is not a ``ClusteringResults`` field.
        """
        # Keep list index == run id even when runs arrive out of order.
        # Without padding, a result for a run id beyond the current length
        # would be appended at the wrong index, and later results for the
        # same run would be split across entries or written to another run.
        while len(self.clustering_items) < run_id:
            self.clustering_items.append(ClusteringResults())

        if len(self.clustering_items) == run_id:
            # First result for this run: create its entry with the slot filled.
            self.clustering_items.append(ClusteringResults(**{analysis_desc: cl_item_result}))
        else:
            # Entry already exists: fill the requested slot.
            setattr(self.clustering_items[run_id], analysis_desc, cl_item_result)