import logging
import os
import subprocess
from collections import defaultdict
from pathlib import Path
import pandas as pd
import plotly.graph_objects as go
from immuneML.data_model.SequenceParams import RegionType
from immuneML.data_model.datasets.Dataset import Dataset
from immuneML.data_model.datasets.ElementDataset import SequenceDataset
from immuneML.data_model.datasets.RepertoireDataset import RepertoireDataset
from immuneML.reports.PlotlyUtil import PlotlyUtil
from immuneML.reports.ReportOutput import ReportOutput
from immuneML.reports.ReportResult import ReportResult
from immuneML.reports.data_reports.DataReport import DataReport
from immuneML.util.ParameterValidator import ParameterValidator
from immuneML.util.PathBuilder import PathBuilder
[docs]
class NodeDegreeDistribution(DataReport):
"""
A report that uses CompAIRR to compute the node degree distribution of a sequence dataset. Results are visualized
as a histogram and stored in a TSV file.
The report assumes that CompAIRR (https://github.com/uio-bmi/compairr) has been installed.
**Specification arguments**:
- compairr_path (str): The path to the CompAIRR executable.
- region_type (str): The region type to analyze. Can be either "IMGT_CDR3" or "IMGT_JUNCTION".
- indels (bool): Whether to include indels in the analysis. Default is False.
- ignore_genes (bool): Whether to ignore gene names in the analysis. Default is False.
- hamming_distance (int): The Hamming distance to use for the analysis. Default is 1.
- per_repertoire (bool): Whether to compute the node degree distribution for each repertoire separately. Only
applicable when using a RepertoireDataset. Default is False.
- per_label (bool): Whether to compute the node degree distribution for each label separately. Only applicable
when using a RepertoireDataset. Default is False.
- threads (int): The number of threads to use for the analysis. Default is 4.
YAML specification:
.. indent with spaces
.. code-block:: yaml
NodeDegreeDistribution:
compairr_path: /path/to/compairr
region_type: IMGT_JUNCTION
indels: False
ignore_genes: False
hamming_distance: 1
per_repertoire: False
per_label: False
threads: 4
"""
[docs]
@classmethod
def build_object(cls, **kwargs):
ParameterValidator.assert_region_type(kwargs)
return NodeDegreeDistribution(**{**kwargs, 'region_type': RegionType[kwargs['region_type'].upper()]})
def __init__(self, dataset: Dataset = None, result_path: Path = None,
name: str = None, compairr_path: str = None, region_type: RegionType = RegionType.IMGT_JUNCTION,
indels: bool = False, ignore_genes: bool = False, hamming_distance: int = 1,
per_repertoire: bool = False, per_label=False, threads: int = 4):
super().__init__(dataset=dataset, result_path=result_path, name=name)
self.compairr_path = compairr_path
self.region_type = region_type
self.indels = indels
self.ignore_genes = ignore_genes
self.hamming_distance = hamming_distance
self.per_repertoire = per_repertoire
self.per_label = per_label
self.threads = threads
[docs]
def check_prerequisites(self) -> bool:
if not self.compairr_path:
logging.warning("CompAIRR path not provided. CompAIRR must be installed and available in the system PATH.")
return False
if not isinstance(self.dataset, (SequenceDataset, RepertoireDataset)):
logging.warning(f"{NodeDegreeDistribution.__name__} report can only be generated for SequenceDataset or "
f"RepertoireDataset.")
return False
if self.region_type not in {RegionType.IMGT_CDR3, RegionType.IMGT_JUNCTION}:
logging.warning(f"Invalid region type: {self.region_type.value.upper()}. "
f"{NodeDegreeDistribution.__name__} report can only be generated for "
f"{RegionType.IMGT_CDR3.value.upper()} or {RegionType.IMGT_JUNCTION.value.upper()}.")
return False
return True
def _generate(self) -> ReportResult:
PathBuilder.build(self.result_path)
output_figures, output_tables = [], []
if isinstance(self.dataset, SequenceDataset):
self._generate_for_sequence_dataset(output_figures, output_tables)
elif isinstance(self.dataset, RepertoireDataset):
self._generate_for_repertoire_dataset(output_figures, output_tables)
return ReportResult(
name=self.name,
info=f"Node degree distribution with hamming distance {self.hamming_distance}",
output_figures=output_figures,
output_tables=output_tables
)
def _generate_for_sequence_dataset(self, output_figures, output_tables):
node_degree_dist = _compute_node_degree_dist(self.result_path, self.dataset.filename, self.region_type,
self.ignore_genes, self.compairr_path, self.hamming_distance,
self.indels, self.threads)
output_figures.append(_plot_node_degree_dist(self.result_path, node_degree_dist, "node_degree_distribution"))
output_tables.append(_store_node_degree_dist(self.result_path, node_degree_dist, "node_degree_distribution"))
def _generate_for_repertoire_dataset(self, output_figures, output_tables):
node_degree_dist_list, node_degree_dist_labels = self._compute_repertoire_node_degrees()
if self.per_repertoire:
self._generate_output_per_repertoire(output_figures, output_tables, node_degree_dist_list)
if self.per_label:
self._generate_output_per_label(output_figures, output_tables, node_degree_dist_labels)
self._generate_average_output(output_figures, output_tables, node_degree_dist_list)
def _compute_repertoire_node_degrees(self):
node_degree_dist_list = []
node_degree_dist_labels = defaultdict(lambda: defaultdict(list))
for repertoire in self.dataset.repertoires:
node_degree_dist = _compute_node_degree_dist(
self.result_path, repertoire.data_filename,
self.region_type, self.ignore_genes, self.compairr_path,
self.hamming_distance, self.indels, self.threads
)
node_degree_dist_list.append(node_degree_dist)
for label in self.dataset.labels:
node_degree_dist_labels[label][repertoire.metadata[label]].append(node_degree_dist)
return node_degree_dist_list, node_degree_dist_labels
def _generate_output_per_repertoire(self, output_figures, output_tables, node_degree_dist_list):
for dist, repertoire in zip(node_degree_dist_list, self.dataset.repertoires):
name = f"node_degree_distribution_{repertoire.metadata['subject_id']}"
output_figures.append(_plot_node_degree_dist(self.result_path, dist, name))
output_tables.append(_store_node_degree_dist(self.result_path, dist, name))
def _generate_output_per_label(self, output_figures, output_tables, node_degree_dist_labels):
for label, value_map in node_degree_dist_labels.items():
for value, dists in value_map.items():
name = f"node_degree_distribution_{label}_{value}"
output_figures.append(_plot_avg_node_degree_dist(self.result_path, dists, name))
output_tables.append(_store_avg_node_degree_dist(self.result_path, dists, name))
def _generate_average_output(self, output_figures, output_tables, node_degree_dist_list):
name = "node_degree_distribution_average"
output_figures.append(_plot_avg_node_degree_dist(self.result_path, node_degree_dist_list, name))
output_tables.append(_store_avg_node_degree_dist(self.result_path, node_degree_dist_list, name))
def _compute_node_degree_dist(result_path, dataset_filename, region_type, ignore_genes, compairr_path, hamming_distance,
indels, threads):
compairr_existence_output = _run_compairr(result_path, dataset_filename, region_type, ignore_genes, compairr_path,
hamming_distance, indels, threads)
if compairr_existence_output is None:
raise RuntimeError("CompAIRR execution failed to generate output.")
compairr_existence_output["degree"] -= 1
node_degree_dist = compairr_existence_output["degree"].value_counts()
return node_degree_dist
def _run_compairr(result_path, dataset_filename, region_type, ignore_genes, compairr_path, hamming_distance, indels,
threads):
output_file = result_path / f"compairr_existence.tsv"
dedupliacted_dataset_filename = _deduplicate_sequences(dataset_filename, region_type, ignore_genes)
cmd_args = [str(compairr_path), "--existence", str(dedupliacted_dataset_filename),
str(dedupliacted_dataset_filename), "--output", str(output_file), "--differences",
str(hamming_distance), "--ignore-counts"]
if indels:
cmd_args.append("--indels")
if ignore_genes:
cmd_args.append("--ignore-genes")
if region_type == RegionType.IMGT_CDR3:
cmd_args.append("--cdr3")
cmd_args.extend(["-t", str(threads)])
subprocess.run(cmd_args, capture_output=True, text=True)
compairr_existence_output = None
if output_file.is_file() and os.path.getsize(output_file) > 0:
compairr_existence_output = pd.read_csv(output_file, sep='\t', names=['sequence_id', 'degree'],
header=0)
os.remove(str(output_file))
os.remove(dedupliacted_dataset_filename)
return compairr_existence_output
def _deduplicate_sequences(dataset_filename, region_type, ignore_genes):
deduplicated_dataset_filename = dataset_filename.with_name(
f"{dataset_filename.stem}_deduplicated{dataset_filename.suffix}"
)
dataset = pd.read_csv(dataset_filename, sep="\t", header=0)
subset = _get_deduplication_subset(region_type, ignore_genes)
deduplicated_dataset = dataset.drop_duplicates(subset=subset)
deduplicated_dataset.to_csv(deduplicated_dataset_filename, sep="\t", index=False)
return deduplicated_dataset_filename
def _get_deduplication_subset(region_type, ignore_genes):
if region_type == RegionType.IMGT_CDR3:
subset = ["cdr3_aa"]
elif region_type == RegionType.IMGT_JUNCTION:
subset = ["junction_aa"]
else:
raise ValueError(f"Unsupported region type: {region_type}")
if not ignore_genes:
subset.extend(["v_call", "j_call"])
return subset
def _plot_node_degree_dist(result_path, node_degree_dist: pd.DataFrame, name: str) -> ReportOutput:
fig = go.Figure()
fig.add_trace(go.Bar(x=node_degree_dist.index, y=node_degree_dist.values))
fig.update_layout(xaxis_title="Degree",
yaxis_title="Count")
output_path = result_path / f"{name}.html"
output_path = PlotlyUtil.write_image_to_file(fig, output_path)
return ReportOutput(path=output_path, name=name)
def _plot_avg_node_degree_dist(result_path, node_degree_dists: list[pd.Series], name: str) -> ReportOutput:
df = pd.DataFrame(node_degree_dists).fillna(0).astype(int)
avg_dist = df.mean(axis=0)
std_dist = df.std(axis=0)
fig = go.Figure()
fig.add_trace(go.Bar(
x=avg_dist.index,
y=avg_dist.values,
error_y=dict(type='data', array=std_dist.values, visible=True)
))
fig.update_layout(xaxis_title="Degree", yaxis_title="Average Count ± Std Dev")
output_path = result_path / f"{name}.html"
output_path = PlotlyUtil.write_image_to_file(fig, output_path)
return ReportOutput(path=output_path, name=f"{name}")
def _store_node_degree_dist(result_path, node_degree_dist: pd.Series, name: str) -> ReportOutput:
output_path = result_path / f"{name}.tsv"
node_degree_dist.to_csv(output_path, sep="\t")
return ReportOutput(path=output_path, name=name)
def _store_avg_node_degree_dist(result_path, node_degree_dists: list[pd.Series], name: str) -> ReportOutput:
df = pd.DataFrame(node_degree_dists).fillna(0).astype(int)
avg_dist = df.mean(axis=0)
std_dist = df.std(axis=0)
combined = pd.DataFrame({
"degree": avg_dist.index,
"average_count": avg_dist.values,
"std_count": std_dist.values
})
output_path = result_path / f"{name}.tsv"
combined.to_csv(output_path, sep="\t", index=False)
return ReportOutput(path=output_path, name=f"{name}")