import copy
import logging
import numbers
from pathlib import Path
from typing import Tuple, Dict, List, Union
import numpy as np
import pandas as pd
import sklearn
from immuneML.caching.CacheHandler import CacheHandler
from immuneML.data_model.SequenceParams import RegionType
from immuneML.data_model.datasets.Dataset import Dataset
from immuneML.encodings.DatasetEncoder import DatasetEncoder
from immuneML.encodings.EncoderParams import EncoderParams
from immuneML.environment.LabelConfiguration import LabelConfiguration
from immuneML.environment.SequenceType import SequenceType
from immuneML.ml_methods.clustering.ClusteringMethod import ClusteringMethod
from immuneML.ml_methods.dim_reduction.DimRedMethod import DimRedMethod
from immuneML.ml_methods.helper_methods.FurthestNeighborClassifier import FurthestNeighborClassifier
from immuneML.ml_metrics import ClusteringMetric
from immuneML.ml_metrics.ClusteringMetric import is_external, is_internal
from immuneML.util.Logger import print_log
from immuneML.util.PathBuilder import PathBuilder
from immuneML.workflows.instructions.clustering.ClusteringState import ClusteringItemResult
from immuneML.workflows.instructions.clustering.clustering_run_model import ClusteringItem, DataFrameWrapper, \
ClusteringSetting
from immuneML.workflows.steps.DataEncoder import DataEncoder
from immuneML.workflows.steps.DataEncoderParams import DataEncoderParams
from scipy.sparse import issparse
from sklearn.neighbors import NearestCentroid, KNeighborsClassifier
[docs]
def run_all_settings(dataset: Dataset, clustering_settings: List[ClusteringSetting], path: Path,
predictions_df: pd.DataFrame, metrics: List[str], label_config: LabelConfiguration,
number_of_processes: int, sequence_type: SequenceType, region_type: RegionType,
report_handler=None, run_id: int = None, state=None) -> Tuple[Dict, pd.DataFrame]:
"""
Run all clustering settings on a dataset and collect results.
Args:
dataset: The dataset to cluster
clustering_settings: List of clustering settings to evaluate
path: Output path for results
predictions_df: DataFrame to store predictions
metrics: List of metric names to compute
label_config: Label configuration for external metrics
number_of_processes: Number of processes for parallelization
sequence_type: Sequence type for encoding
region_type: Region type for encoding
report_handler: Optional report handler for running item reports
run_id: Optional run identifier
state: Optional clustering state for report handler
Returns:
Tuple of (clustering_items dict, updated predictions_df)
"""
clustering_items = {}
for cl_setting in clustering_settings:
cl_item_res, predictions_df = run_setting(
dataset=dataset,
cl_setting=cl_setting,
path=path,
predictions_df=predictions_df,
metrics=metrics,
label_config=label_config,
number_of_processes=number_of_processes,
sequence_type=sequence_type,
region_type=region_type,
report_handler=report_handler,
run_id=run_id,
state=state
)
clustering_items[cl_setting.get_key()] = cl_item_res
return clustering_items, predictions_df
[docs]
def run_setting(dataset: Dataset, cl_setting: ClusteringSetting, path: Path,
predictions_df: pd.DataFrame, metrics: List[str], label_config: LabelConfiguration,
number_of_processes: int, sequence_type: SequenceType, region_type: RegionType,
report_handler=None, run_id: int = None, state=None, evaluate: bool = True) \
-> Tuple[ClusteringItemResult, pd.DataFrame]:
"""
Run a single clustering setting on a dataset.
Args:
dataset: The dataset to cluster
cl_setting: The clustering setting to use
path: Output path for results
predictions_df: DataFrame to store predictions
metrics: List of metric names to compute
label_config: Label configuration for external metrics
number_of_processes: Number of processes for parallelization
sequence_type: Sequence type for encoding
region_type: Region type for encoding
report_handler: Optional report handler for running item reports
run_id: Optional run identifier
state: Optional clustering state for report handler
evaluate: Whether to compute internal/external evaluation metrics
Returns:
Tuple of (ClusteringItemResult, updated predictions_df)
"""
print_log(f"Running clustering setting {cl_setting.get_key()}")
cl_setting.path = path / f"{cl_setting.get_key()}"
# Encode data
encoder = copy.deepcopy(cl_setting.encoder)
enc_dataset, fitted_encoder, dim_red_method = encode_dataset(dataset, cl_setting, number_of_processes, label_config,
True, sequence_type, region_type, encoder,
copy.deepcopy(cl_setting.dim_reduction_method) if cl_setting.dim_reduction_method else None)
# Run clustering
method = copy.deepcopy(cl_setting.clustering_method)
predictions = fit_and_predict(enc_dataset, method)
if predictions_df is not None:
predictions_df[f'predictions_{cl_setting.get_key()}'] = predictions
print_log(f"{cl_setting.get_key()}: clustering method fitted and predictions made.")
cl_item = ClusteringItem(
cl_setting=cl_setting,
dataset=enc_dataset,
predictions=predictions,
encoder=fitted_encoder,
dim_red_method=dim_red_method,
method=method
)
if evaluate:
cl_item = evaluate_clustering(predictions_df, cl_setting, get_features(enc_dataset, cl_setting), metrics, label_config, cl_item)
# Run reports if handler provided
report_results = []
if report_handler is not None and run_id is not None and state is not None:
report_results = report_handler.run_item_reports(cl_item, run_id, cl_setting.path, state)
print_log(f"Clustering setting {cl_setting.get_key()} finished.")
return ClusteringItemResult(cl_item, report_results), predictions_df
[docs]
def fit_and_predict(dataset: Dataset, method: ClusteringMethod) -> np.ndarray:
"""Fit clustering method and get predictions."""
if hasattr(method, 'fit_predict'):
return method.fit_predict(dataset)
else:
method.fit(dataset)
return method.predict(dataset)
[docs]
def get_features(dataset: Dataset, cl_setting: ClusteringSetting):
"""Get features from encoded dataset."""
return dataset.encoded_data.examples if cl_setting.dim_reduction_method is None \
else dataset.encoded_data.dimensionality_reduced_data
[docs]
def evaluate_clustering(predictions_df: pd.DataFrame, cl_setting: ClusteringSetting,
features, metrics: List[str], label_config: LabelConfiguration,
cl_item: ClusteringItem, predictions_col_name: str = None) -> ClusteringItem:
"""
Evaluate clustering results using internal and external metrics.
Args:
predictions_col_name: name of the predictions column in predictions_df
predictions_df: DataFrame containing predictions and labels
cl_setting: The clustering setting used
features: Feature matrix for internal metrics
metrics: List of metric names to compute
label_config: Label configuration for external metrics
cl_item: Clustering item to evaluate and update with performance csv files
Returns:
Updated ClusteringItem with performance CSV file paths
"""
cl_item.external_performance = DataFrameWrapper(path=eval_external_metrics(predictions_df, cl_setting, metrics, label_config, predictions_col_name))
cl_item.internal_performance = DataFrameWrapper(path=eval_internal_metrics(predictions_df, cl_setting, features, metrics, predictions_col_name))
return cl_item
[docs]
def eval_internal_metrics(predictions_df: pd.DataFrame, cl_setting: ClusteringSetting,
features, metrics: List[str], predictions_col_name: str = None) -> Path:
if predictions_col_name is None:
predictions_col_name = f'predictions_{cl_setting.get_key()}'
internal_performances = {}
dense_features = features.toarray() if issparse(features) else features
for metric in [m for m in metrics if is_internal(m)]:
try:
internal_performances[metric] = [ClusteringMetric.compute(metric, dense_features,
predictions_df[predictions_col_name].values,
cl_setting.clustering_params.get('metric', None))]
except ValueError as e:
logging.warning(f"Error calculating metric {metric}: {e}")
internal_performances[metric] = [np.nan]
PathBuilder.build(cl_setting.path)
pd.DataFrame(internal_performances).to_csv(str(cl_setting.path / 'internal_performances.csv'), index=False)
return cl_setting.path / 'internal_performances.csv'
[docs]
def eval_external_metrics(predictions_df: pd.DataFrame, cl_setting: ClusteringSetting,
metrics: List[str], label_config: LabelConfiguration, predictions_col_name: str = None) \
-> Union[Path, None]:
if label_config is not None and label_config.get_label_count() > 0:
external_performances = {label: {} for label in label_config.get_labels_by_name()}
if predictions_col_name is None:
predictions_col_name = f"predictions_{cl_setting.get_key()}"
for metric in [m for m in metrics if is_external(m)]:
metric_fn = getattr(sklearn.metrics, metric)
for label in label_config.get_labels_by_name():
try:
external_performances[label][metric] = metric_fn(
predictions_df[label].values,
predictions_df[predictions_col_name].values
)
except ValueError as e:
logging.warning(f"Error calculating metric {metric}: {e}")
external_performances[label][metric] = np.nan
PathBuilder.build(cl_setting.path)
(pd.DataFrame(external_performances).reset_index().rename(columns={'index': 'metric'})
.to_csv(str(cl_setting.path / 'external_performances.csv'), index=False))
return cl_setting.path / 'external_performances.csv'
else:
return None
[docs]
def train_cluster_classifier(cl_item: ClusteringItem):
classifier = get_complementary_classifier(cl_item.cl_setting)
# Get features and cluster assignments from discovery data
features = get_features(cl_item.dataset, cl_item.cl_setting)
if len(list(set(cl_item.predictions))) == 1:
return cl_item.predictions[0]
else:
classifier.fit(features, cl_item.predictions)
return classifier
[docs]
def apply_cluster_classifier(dataset: Dataset, cl_setting: ClusteringSetting, classifier, encoder: DatasetEncoder,
dim_red_method: DimRedMethod, predictions_path: Path, number_of_processes: int,
sequence_type: SequenceType, region_type: RegionType) -> ClusteringItem:
enc_dataset = encode_dataset(dataset, cl_setting, number_of_processes, LabelConfiguration(),
learn_model=False, sequence_type=sequence_type,
region_type=region_type, encoder=encoder, dim_red_method=dim_red_method)[0]
features = get_features(enc_dataset, cl_setting)
if isinstance(classifier, numbers.Number):
predictions = [classifier] * dataset.get_example_count()
logging.warning(f"Only one cluster found in discovery data. Assigning all validation data to "
f"cluster {classifier}.")
else:
predictions = classifier.predict(features)
cl_item = ClusteringItem(
cl_setting=cl_setting,
dataset=enc_dataset,
predictions=predictions,
encoder=encoder,
dim_red_method=dim_red_method,
method=None, # not used here because of the transfer
classifier=classifier
)
pd.DataFrame({'predictions': predictions, 'example_id': dataset.get_example_ids()}).to_csv(
predictions_path, index=False)
return cl_item
[docs]
def encode_dataset(dataset: Dataset, cl_setting: ClusteringSetting, number_of_processes: int,
label_config: LabelConfiguration, learn_model: bool, sequence_type: SequenceType,
region_type: RegionType, encoder: DatasetEncoder = None, dim_red_method: DimRedMethod = None):
"""
Encode a dataset using the specified clustering setting's encoder.
Results are cached based on parameters.
Args:
dataset: The dataset to encode
cl_setting: The clustering setting containing encoder configuration
number_of_processes: Number of processes for parallelization
label_config: Label configuration
learn_model: Whether to learn the encoder model or use existing
sequence_type: Sequence type for encoding
region_type: Region type for encoding
encoder: Optional pre-configured encoder
dim_red_method: Optional pre-configured dimensionality reduction method
Returns:
Encoded dataset, encoder, and dimensionality reduction method
"""
def dict_to_sorted_tuple(d: dict) -> tuple:
return tuple(sorted(d.items(), key=lambda tup: tup[0])) if d is not None else None
return CacheHandler.memo_by_params(
(dataset.identifier, dict_to_sorted_tuple(cl_setting.encoder_params), label_config.get_labels_by_name(),
sequence_type.name, region_type.name, cl_setting.dim_red_name, learn_model,
dict_to_sorted_tuple(cl_setting.dim_red_params)),
lambda: encode_dataset_internal(dataset, cl_setting, number_of_processes, label_config,
learn_model, sequence_type, region_type, encoder, dim_red_method))
[docs]
def encode_dataset_internal(dataset: Dataset, cl_setting: ClusteringSetting, number_of_processes: int,
label_config: LabelConfiguration, learn_model: bool, sequence_type: SequenceType,
region_type: RegionType, encoder: DatasetEncoder = None, dim_red_method: DimRedMethod = None) \
-> Tuple[Dataset, DatasetEncoder, DimRedMethod]:
"""
Internal function to encode a dataset (called by encode_dataset with caching).
Args:
dataset: The dataset to encode
cl_setting: The clustering setting containing encoder configuration
number_of_processes: Number of processes for parallelization
label_config: Label configuration
learn_model: Whether to learn the encoder model or use existing
sequence_type: Sequence type for encoding
region_type: Region type for encoding
encoder: Optional pre-configured encoder
dim_red_method: Optional pre-configured dimensionality reduction method
Returns:
Encoded dataset with optional dimensionality reduction
"""
enc_params = EncoderParams(model=cl_setting.encoder_params, result_path=cl_setting.path,
pool_size=number_of_processes, label_config=label_config,
learn_model=learn_model, encode_labels=False,
sequence_type=sequence_type,
region_type=region_type)
internal_encoder = copy.deepcopy(cl_setting.encoder) if encoder is None else encoder
enc_dataset = DataEncoder.run(DataEncoderParams(dataset=dataset, encoder=internal_encoder, encoder_params=enc_params))
if cl_setting.dim_reduction_method:
if learn_model:
internal_dim_red = copy.deepcopy(cl_setting.dim_reduction_method)
enc_dataset.encoded_data.dimensionality_reduced_data = internal_dim_red.fit_transform(dataset=enc_dataset)
enc_dataset.encoded_data.dim_names = internal_dim_red.get_dimension_names()
elif dim_red_method is not None:
internal_dim_red = dim_red_method
enc_dataset.encoded_data.dimensionality_reduced_data = internal_dim_red.transform(dataset=enc_dataset)
enc_dataset.encoded_data.dim_names = internal_dim_red.get_dimension_names()
else:
raise ValueError("Dimensionality reduction method must be provided for transformation when "
"learn_model is False.")
else:
internal_dim_red = None
return enc_dataset, internal_encoder, internal_dim_red
[docs]
def get_complementary_classifier(cl_setting: ClusteringSetting):
"""
Returns a complementary classifier based on the clustering method.
Args:
cl_setting: ClusteringSetting object containing the clustering method configuration
Returns:
An instance of the appropriate classifier; kNN if no matches are found
"""
clustering_method = cl_setting.clustering_method
method_name = clustering_method.__class__.__name__
if method_name == 'KMeans':
return NearestCentroid()
elif method_name == 'AgglomerativeClustering':
if hasattr(clustering_method.model, 'linkage'):
if clustering_method.model.linkage == 'ward':
return NearestCentroid()
elif clustering_method.model.linkage == 'complete':
if cl_setting.encoder.__class__.__name__ == 'TCRdistEncoder':
return FurthestNeighborClassifier(metric='precomputed')
else:
return FurthestNeighborClassifier(metric='euclidean')
return KNeighborsClassifier()