import argparse
import glob
import itertools as it
import logging
import sys
import warnings
from pathlib import Path
import yaml
from immuneML.util.ReadsType import ReadsType
from immuneML.encodings.kmer_frequency.sequence_encoding.SequenceEncodingType import SequenceEncodingType
from immuneML.ml_methods.MLMethod import MLMethod
from immuneML.reports.ml_reports.CoefficientPlottingSetting import CoefficientPlottingSetting
from immuneML.util.PathBuilder import PathBuilder
from immuneML.util.ReflectionHandler import ReflectionHandler
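# Builds an immuneML TrainMLModel YAML specification from Galaxy tool arguments:
# it discovers the .iml_dataset file in the working directory, assembles encoding
# and ML method definitions, and writes the combined specification to a YAML file.
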
def get_sequence_enc_type(sequence_type, position_type, gap_type):
    """Maps the sequence/position/gap arguments to the name of the corresponding SequenceEncodingType."""
    if sequence_type == "complete":
        encoding_type = SequenceEncodingType.IDENTITY
    elif position_type == "positional":
        if gap_type == "gapped":
            encoding_type = SequenceEncodingType.IMGT_GAPPED_KMER
        else:
            encoding_type = SequenceEncodingType.IMGT_CONTINUOUS_KMER
    else:
        if gap_type == "gapped":
            encoding_type = SequenceEncodingType.GAPPED_KMER
        else:
            encoding_type = SequenceEncodingType.CONTINUOUS_KMER
    return encoding_type.name
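# For example (reading off the branches above): "complete" always maps to
# "IDENTITY"; ("subsequence", "positional", "gapped") maps to "IMGT_GAPPED_KMER";
# ("subsequence", "invariant", "ungapped") maps to "CONTINUOUS_KMER".
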
def build_encodings_specs(args):
    """Builds one KmerFrequency encoding specification per entry in the encoding-related argument lists."""
    encodings = dict()
    for i in range(len(args.sequence_type)):
        enc_name = f"encoding_{i + 1}"
        enc_spec = dict()
        enc_spec["sequence_encoding"] = get_sequence_enc_type(args.sequence_type[i],
                                                              None if args.position_type is None else args.position_type[i],
                                                              None if args.gap_type is None else args.gap_type[i])
        enc_spec["reads"] = args.reads[i]
        if args.sequence_type[i] == "subsequence":
            # guard against args.gap_type being None (the argument is optional)
            if args.gap_type is not None and args.gap_type[i] == "gapped":
                enc_spec["k_left"] = args.k_left[i]
                enc_spec["k_right"] = args.k_right[i]
                enc_spec["min_gap"] = args.min_gap[i]
                enc_spec["max_gap"] = args.max_gap[i]
            else:
                enc_spec["k"] = args.k[i]
        encodings[enc_name] = {"KmerFrequency": enc_spec}
    return encodings
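# A sketch of the result for a single ungapped 3-mer encoding (argument values
# illustrative): build_encodings_specs would return
#   {"encoding_1": {"KmerFrequency": {"sequence_encoding": "CONTINUOUS_KMER",
#                                     "reads": "unique", "k": 3}}}
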
def get_ml_method_spec(ml_method_class, model_selection_n_folds=5):
    """Returns the spec for a single ML method: known classes get a small
    hyperparameter grid with cross-validation for model selection, any other
    class falls back to immuneML's default parameters."""
    if ml_method_class in ("LogisticRegression", "SimpleLogisticRegression"):
        ml_spec = {
            "logistic_regression": {
                "LogisticRegression": {
                    "penalty": ["l1"],
                    "C": [0.01, 0.1, 1, 10, 100],
                    "class_weight": ["balanced"],
                    "show_warnings": False
                },
                "model_selection_cv": True,
                "model_selection_n_folds": model_selection_n_folds
            }
        }
    elif ml_method_class == "RandomForestClassifier":
        ml_spec = {
            "random_forest": {
                "RandomForestClassifier": {
                    "n_estimators": [10, 50, 100],
                    "class_weight": ["balanced"],
                    "show_warnings": False
                },
                "model_selection_cv": True,
                "model_selection_n_folds": model_selection_n_folds
            }
        }
    elif ml_method_class == "SVM":
        ml_spec = {
            "support_vector_machine": {
                "SVM": {
                    "penalty": ["l1"],
                    "dual": False,
                    "C": [0.01, 0.1, 1, 10, 100],
                    "class_weight": ["balanced"],
                    "show_warnings": False
                },
                "model_selection_cv": True,
                "model_selection_n_folds": model_selection_n_folds
            }
        }
    elif ml_method_class == "KNN":
        ml_spec = {
            "k_nearest_neighbors": {
                "KNN": {
                    "n_neighbors": [3, 5, 7],
                    "show_warnings": False
                },
                "model_selection_cv": True,
                "model_selection_n_folds": model_selection_n_folds
            }
        }
    else:
        ml_spec = {ml_method_class: ml_method_class}
    return ml_spec
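# For instance, get_ml_method_spec("KNN") returns the "k_nearest_neighbors" grid
# above, while an unlisted class such as a hypothetical "MyMethod" falls through
# to the shorthand {"MyMethod": "MyMethod"} with immuneML's default parameters.
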
def build_ml_methods_specs(args):
    ml_methods_spec = dict()
    for method in args.ml_methods:
        ml_methods_spec.update(get_ml_method_spec(method))
    return ml_methods_spec
def build_settings_specs(enc_names, ml_names):
    return [{"encoding": enc_name, "ml_method": ml_name} for enc_name, ml_name in it.product(enc_names, ml_names)]
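# E.g. two encodings and two ML methods yield the full cross-product of four
# settings: [{"encoding": "encoding_1", "ml_method": "logistic_regression"},
#            {"encoding": "encoding_1", "ml_method": "random_forest"}, ...]
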
def discover_dataset_params():
    """Locates the single .iml_dataset file in the current working directory."""
    dataset = glob.glob("*.iml_dataset")
    assert len(dataset) > 0, "no .iml_dataset file was present in the current working directory"
    assert len(dataset) < 2, "multiple .iml_dataset files were present in the current working directory"
    dataset_path = dataset[0]
    return {"path": dataset_path}
def build_labels(labels_str):
    labels = labels_str.split(",")
    return [label.strip().strip("'\"") for label in labels]
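# E.g. build_labels("disease, 'age_group'") == ["disease", "age_group"]:
# surrounding whitespace and quote characters are stripped from each label.
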
def build_specs(args):
    specs = {
        "definitions": {
            "datasets": {
                "dataset": {
                    "format": "ImmuneML",
                    "params": None
                }
            },
            "encodings": dict(),
            "ml_methods": dict(),
            "reports": {
                "coefficients": {
                    "Coefficients": {
                        "coefs_to_plot": [CoefficientPlottingSetting.N_LARGEST.name],
                        "n_largest": [25]
                    }
                },
                "benchmark": "MLSettingsPerformance"
            }
        },
        "instructions": {
            "inst1": {
                "type": "TrainMLModel",
                "settings": [],
                "assessment": {
                    "split_strategy": "random",
                    "split_count": None,
                    "training_percentage": None,
                    "reports": {
                        "models": ["coefficients"]
                    }
                },
                "selection": {
                    "split_strategy": "random",
                    "split_count": 1,
                    "training_percentage": 0.7
                },
                "labels": [],
                "dataset": "dataset",
                "strategy": "GridSearch",
                "metrics": [],
                "number_of_processes": 10,
                "reports": ["benchmark"],
                "optimization_metric": "accuracy",
                "refit_optimal_model": True
            }
        }
    }
    enc_specs = build_encodings_specs(args)
    ml_specs = build_ml_methods_specs(args)
    settings_specs = build_settings_specs(enc_specs.keys(), ml_specs.keys())
    dataset_params = discover_dataset_params()
    labels = build_labels(args.labels)
    specs["definitions"]["datasets"]["dataset"]["params"] = dataset_params
    specs["definitions"]["encodings"] = enc_specs
    specs["definitions"]["ml_methods"] = ml_specs
    specs["instructions"]["inst1"]["settings"] = settings_specs
    specs["instructions"]["inst1"]["assessment"]["split_count"] = args.split_count
    specs["instructions"]["inst1"]["assessment"]["training_percentage"] = args.training_percentage / 100
    specs["instructions"]["inst1"]["labels"] = labels
    return specs
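# The assembled specification (abridged; exact values depend on the arguments)
# has the shape:
#   definitions:
#     datasets: {dataset: {format: ImmuneML, params: {path: <name>.iml_dataset}}}
#     encodings: {encoding_1: {KmerFrequency: {...}}}
#     ml_methods: {logistic_regression: {...}}
#     reports: {coefficients: {...}, benchmark: MLSettingsPerformance}
#   instructions:
#     inst1: {type: TrainMLModel, settings: [...], labels: [...], ...}
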
def check_arguments(args):
    assert 100 >= args.training_percentage >= 10, "training_percentage must be between 10 and 100"
    assert args.split_count >= 1, "The minimal split_count is 1."
    encoding_err = "When multiple encodings are used, all encoding-related fields must have the same length; add 'NA' values where necessary"
    assert len(args.sequence_type) == len(args.reads), encoding_err
    assert args.position_type is None or len(args.sequence_type) == len(args.position_type), encoding_err
    assert args.gap_type is None or len(args.sequence_type) == len(args.gap_type), encoding_err
    assert args.k is None or len(args.sequence_type) == len(args.k), encoding_err
    assert args.k_left is None or len(args.sequence_type) == len(args.k_left), encoding_err
    assert args.k_right is None or len(args.sequence_type) == len(args.k_right), encoding_err
    assert args.min_gap is None or len(args.sequence_type) == len(args.min_gap), encoding_err
    assert args.max_gap is None or len(args.sequence_type) == len(args.max_gap), encoding_err
def parse_commandline_arguments(args):
    ReflectionHandler.get_classes_by_partial_name("", "ml_methods/")
    ml_method_names = [cl.__name__ for cl in ReflectionHandler.all_nonabstract_subclasses(MLMethod)] + ["SimpleLogisticRegression"]
    parser = argparse.ArgumentParser(description="Tool for building an immuneML Galaxy YAML specification from arguments")
    parser.add_argument("-o", "--output_path", required=True, help="Output location for the generated yaml file (directory).")
    parser.add_argument("-f", "--file_name", default="specs.yaml",
                        help="Output file name for the yaml file. Default name is 'specs.yaml' if not specified.")
    parser.add_argument("-l", "--labels", required=True,
                        help="Which metadata labels should be predicted for the dataset (separated by comma).")
    parser.add_argument("-m", "--ml_methods", nargs="+", choices=ml_method_names, required=True,
                        help="Which machine learning methods should be applied.")
    parser.add_argument("-t", "--training_percentage", type=float, required=True,
                        help="The percentage of data used for training.")
    parser.add_argument("-c", "--split_count", type=int, required=True,
                        help="The number of times to repeat the training process with a different random split of the data.")
    parser.add_argument("-s", "--sequence_type", choices=["complete", "subsequence"], default=["subsequence"], nargs="+",
                        help="Whether complete CDR3 sequences are used, or k-mer subsequences.")
    parser.add_argument("-p", "--position_type", choices=["invariant", "positional"], nargs="+",
                        help="Whether IMGT-positional information is used for k-mers, or the k-mer positions are position-invariant.")
    parser.add_argument("-g", "--gap_type", choices=["gapped", "ungapped"], nargs="+", help="Whether the k-mers contain gaps.")
    parser.add_argument("-k", "--k", type=int, nargs="+", help="K-mer size.")
    parser.add_argument("-kl", "--k_left", type=int, nargs="+", help="Length before the gap when gapped k-mers are used.")
    parser.add_argument("-kr", "--k_right", type=int, nargs="+", help="Length after the gap when gapped k-mers are used.")
    parser.add_argument("-gi", "--min_gap", type=int, nargs="+", help="Minimal gap length when gapped k-mers are used.")
    parser.add_argument("-ga", "--max_gap", type=int, nargs="+", help="Maximal gap length when gapped k-mers are used.")
    parser.add_argument("-r", "--reads", choices=[ReadsType.UNIQUE.value, ReadsType.ALL.value], nargs="+", default=[ReadsType.UNIQUE.value],
                        help="Whether k-mer counts should be scaled by unique clonotypes or by all observed receptor sequences.")
    return parser.parse_args(args)
def main(args):
    logging.basicConfig(filename="build_yaml_from_args_log.txt", level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
    warnings.showwarning = lambda message, category, filename, lineno, file=None, line=None: logging.warning(message)
    parsed_args = parse_commandline_arguments(args)
    check_arguments(parsed_args)
    specs = build_specs(parsed_args)
    PathBuilder.build(parsed_args.output_path)
    output_location = Path(parsed_args.output_path) / parsed_args.file_name
    with output_location.open("w") as file:
        yaml.dump(specs, file)
    return output_location
if __name__ == "__main__":
    main(sys.argv[1:])
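# Example invocation (script path, label name, and values hypothetical):
#   python build_yaml_from_arguments.py -o ./out -l disease -m LogisticRegression \
#       -t 70 -c 5 -s subsequence -p invariant -g ungapped -k 3
# which writes ./out/specs.yaml, assuming a single .iml_dataset file is present
# in the current working directory.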