feat: make previous clustering optional

boasvdp committed Jun 28, 2024
1 parent f2767d0 commit 3e201af
Showing 6 changed files with 171 additions and 95 deletions.
Snakefile (5 additions, 5 deletions)

@@ -12,18 +12,18 @@ for param in ["threads", "mem_gb"]:
 OUT = config["output_dir"]
 
 # find collection using collfinder
-# iget collection and save as "previous_clustering" in working dir
-PREVIOUS_CLUSTERING = "previous_clustering"
+# iget collection and save to a path passed to cli
+PREVIOUS_CLUSTERING = config["previous_clustering"]
 
 # Configure pipeline outputs
 expected_outputs = []
 
-expected_outputs.append(OUT + "/clusters.tsv")
+expected_outputs.append(OUT + "/clusters.csv")
 expected_outputs.append(OUT + "/distances.tsv")
 
-if config["clustering_type"] == "juno-variant-typing":
+if config["clustering_type"] == "alignment":
     expected_outputs.append(OUT + "/aln.fa.gz")
-elif config["clustering_type"] == "juno-cgmlst":
+elif config["clustering_type"] == "mlst":
     expected_outputs.append(OUT + "/cgmlst_alleles.tsv.gz")
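The previous clustering is no longer a hard-coded working-directory path; it is read from the pipeline config, which the Python wrapper fills from the CLI. Because the wrapper stringifies config values (see str(self.previous_clustering) in juno_clustering.py below), an omitted path arrives in the Snakefile as the literal string "None", which is the sentinel the rule files test against. A minimal sketch of that round trip (the argparse default is an assumption; the stringify-and-compare behaviour is taken from the diff):

    # Sketch: how an unset previous clustering becomes the "None" sentinel.
    previous_clustering = None  # assumed argparse default when the flag is omitted

    config = {"previous_clustering": str(previous_clustering)}  # str(None) == "None"

    if config["previous_clustering"] == "None":
        print("no previous clustering: cluster from scratch")
    else:
        print(f"reuse clusters from {config['previous_clustering']}")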
config/presets.yaml (5 additions, 3 deletions)

@@ -1,6 +1,8 @@
 mycobacterium_tuberculosis:
-  threshold: 12
+  cluster_threshold: 12
   max_distance: 200
+  clustering_type: "alignment"
 salmonella:
-  threshold: 7
-  max_distance: 200
+  cluster_threshold: 7
+  max_distance: 200
+  clustering_type: "mlst"
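Renaming threshold to cluster_threshold disambiguates it from max_distance, and the clustering type now travels with the preset instead of being passed separately. Once set_presets() (in juno_clustering.py below) loads this file, the structure it iterates over is equivalent to the following Python literal (a sketch; the YAML-parsing call itself is outside this diff):

    # Python equivalent of the updated presets file, as consumed by set_presets().
    presets_dict = {
        "mycobacterium_tuberculosis": {
            "cluster_threshold": 12,
            "max_distance": 200,
            "clustering_type": "alignment",
        },
        "salmonella": {
            "cluster_threshold": 7,
            "max_distance": 200,
            "clustering_type": "mlst",
        },
    }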
juno_clustering.py (32 additions, 18 deletions)

@@ -13,7 +13,7 @@
 import argparse
 from dataclasses import dataclass, field
 from juno_library import Pipeline
-from typing import Optional
+from typing import Optional, Union, List, ClassVar
 from version import __package_name__, __version__, __description__
 
@@ -26,7 +26,7 @@ def main() -> None:
 class JunoClustering(Pipeline):
     pipeline_name: str = __package_name__
     pipeline_version: str = __version__
-    input_type: str = "fastq"
+    input_type: ClassVar[Union[str|List[str]]] = ["fasta"]
 
     def _add_args_to_parser(self) -> None:
         super()._add_args_to_parser()
@@ -42,7 +42,7 @@ def _add_args_to_parser(self) -> None:
         )
         self.add_argument(
             "--clustering-preset",
-            type=Path,
+            type=str,
             required=True,
             metavar="STR",
             help="Type of clustering that should be performed.",
@@ -54,24 +54,23 @@
             metavar="STR",
             help="Path to a custom presets file.",
         )
+        self.add_argument(
+            "--merged-cluster-separator",
+            type=str,
+            required=False,
+            metavar="STR",
+            help="Separator for merged cluster names.",
+            default="|",
+        )
 
     def _parse_args(self) -> argparse.Namespace:
         args = super()._parse_args()
 
-        # Check if max distance is not smaller than threshold
-        if args.max_distance < args.threshold:
-            raise ValueError(
-                "Maximum distance to calculate should be larger than threshold."
-            )
-        elif args.max_distance < 50:
-            logging.warning(
-                """Maximum distance to calculate is set to a low value, which might remove a lot of information.\n
-                Note this parameter is not the clustering threshold."""
-            )
-
         # Optional arguments are loaded into self here
         self.previous_clustering: str = args.previous_clustering
         self.clustering_preset: str = args.clustering_preset
         self.presets_path: Optional[Path] = args.presets_path
+        self.merged_cluster_separator: str = args.merged_cluster_separator
 
         return args
@@ -81,10 +80,11 @@ def setup(self) -> None:
         self.set_presets()
 
         if self.snakemake_args["use_singularity"]:
+            list_sing_args = [self.snakemake_args["singularity_args"]]
+            if self.previous_clustering:
+                list_sing_args.append(f"--bind {self.previous_clustering}:{self.previous_clustering}")
             self.snakemake_args["singularity_args"] = " ".join(
-                [
-                    self.snakemake_args["singularity_args"]
-                ]  # paths that singularity should be able to read from can be bound by adding to the above list
+                list_sing_args
             )
 
         with open(
@@ -98,8 +98,10 @@
                 "output_dir": str(self.output_dir),
                 "exclusion_file": str(self.exclusion_file),
                 "previous_clustering": str(self.previous_clustering),
-                "threshold": str(self.threshold),  # from presets
+                "merged_cluster_separator": str(self.merged_cluster_separator),
+                "cluster_threshold": str(self.cluster_threshold),  # from presets
                 "max_distance": str(self.max_distance),  # from presets
+                "clustering_type": str(self.clustering_type),  # from presets
             }

@@ -113,6 +115,18 @@ def set_presets(self) -> None:
         if self.clustering_preset in presets_dict.keys():
             for key, value in presets_dict[self.clustering_preset].items():
                 setattr(self, key, value)
+
+        # Check if max distance is not smaller than threshold
+        if self.max_distance < self.cluster_threshold:
+            raise ValueError(
+                "Maximum distance to calculate should be larger than threshold."
+            )
+        elif self.max_distance < 50:
+            logging.warning(
+                """Maximum distance to calculate is set to a low value, which might remove a lot of information.\n
+                Note this parameter is not the clustering threshold."""
+            )
 
 
 if __name__ == "__main__":
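Note that the max_distance sanity check moved from _parse_args() into set_presets(): after the rename, cluster_threshold only exists as an attribute once the preset loop has assigned it, so validating earlier would hit a missing attribute. A self-contained sketch of the load-then-validate pattern (the class scaffolding and values here are assumed; the attribute flow mirrors the diff):

    import logging

    class PresetLoader:
        # Sketch of the setattr-based preset loading used above.
        def set_presets(self, preset: str, presets_dict: dict) -> None:
            # Preset keys become instance attributes, e.g. self.cluster_threshold.
            for key, value in presets_dict[preset].items():
                setattr(self, key, value)

            # Validation must run here: before the loop, these attributes don't exist.
            if self.max_distance < self.cluster_threshold:
                raise ValueError(
                    "Maximum distance to calculate should be larger than threshold."
                )
            elif self.max_distance < 50:
                logging.warning("Low max_distance; this is not the clustering threshold.")

    loader = PresetLoader()
    loader.set_presets("salmonella", {"salmonella": {"cluster_threshold": 7, "max_distance": 200}})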
workflow/rules/clustering.smk (60 additions, 24 deletions)

@@ -1,32 +1,68 @@
-rule clustering:
-    input:
-        distances=OUT + "/distances.tsv",
-        previous_clustering=PREVIOUS_CLUSTERING + "/clusters.tsv",
-    output:
-        OUT + "/clusters.tsv",
-    log:
-        OUT + "/log/clustering.log",
-    message:
-        "Clustering {input.distances} with threshold {params.threshold}"
-    resources:
-        mem_gb=config["mem_gb"]["clustering"],
-    conda:
-        "../envs/clustering.yaml"
-    container:
-        "docker://ghcr.io/boasvdp/network_analysis:0.2"
-    params:
-        threshold=config["cluster_threshold"],
-        merged_cluster_separator=config["merged_cluster_separator"],
-    threads: config["threads"]["clustering"]
-    shell:
-        """
+# PREVIOUS_CLUSTERING is read into config as a str
+if PREVIOUS_CLUSTERING == "None":
+
+    rule clustering_from_scratch:
+        input:
+            distances=OUT + "/distances.tsv",
+        output:
+            OUT + "/clusters.csv",
+        log:
+            OUT + "/log/clustering.log",
+        message:
+            "Clustering {input.distances} with threshold {params.threshold}"
+        resources:
+            mem_gb=config["mem_gb"]["clustering"],
+        conda:
+            "../envs/clustering.yaml"
+        container:
+            "docker://ghcr.io/boasvdp/network_analysis:0.2"
+        params:
+            threshold=config["cluster_threshold"],
+            merged_cluster_separator=config["merged_cluster_separator"],
+        threads: config["threads"]["clustering"]
+        shell:
+            """
+            python workflow/scripts/cluster.py \
+            --threshold {params.threshold} \
+            --distances {input.distances} \
+            --output {output} \
+            --log {log} \
+            --verbose \
+            --merged-cluster-separator {params.merged_cluster_separator:q} \
+            --output {output}
+            """
 
+else:
+
+    rule clustering_from_previous:
+        input:
+            distances=OUT + "/distances.tsv",
+            previous_clustering=PREVIOUS_CLUSTERING + "/clusters.csv",
+        output:
+            OUT + "/clusters.csv",
+        log:
+            OUT + "/log/clustering.log",
+        message:
+            "Clustering {input.distances} with threshold {params.threshold}"
+        resources:
+            mem_gb=config["mem_gb"]["clustering"],
+        conda:
+            "../envs/clustering.yaml"
+        container:
+            "docker://ghcr.io/boasvdp/network_analysis:0.2"
+        params:
+            threshold=config["cluster_threshold"],
+            merged_cluster_separator=config["merged_cluster_separator"],
+        threads: config["threads"]["clustering"]
+        shell:
+            """
         python workflow/scripts/cluster.py \
         --threshold {params.threshold} \
         --distances {input.distances} \
         --previous-clustering {input.previous_clustering} \
-        --output {output} \
         --log {log} \
         --verbose \
-        --merged-cluster-separator {params.merged_cluster_separator}
+        --merged-cluster-separator {params.merged_cluster_separator:q} \
+        --output {output}
         """
workflow/rules/combine_snp_profiles.smk (67 additions, 43 deletions)

@@ -1,56 +1,80 @@
 # TODO: Add per sample output for juno-cgmlst which can be listed by juno-library
 
+# PREVIOUS_CLUSTERING is read into config as a str
+if PREVIOUS_CLUSTERING == "None":
+
-rule decompress_snp_profiles:
-    input:
-        PREVIOUS_CLUSTERING + "/aln.fa.gz",
-    output:
-        temp(OUT + "/old_aln.fa"),
-    log:
-        OUT + "/log/decompress_snp_profiles.log",
-    message:
-        "Uncompressing {input}."
-    resources:
-        mem_gb=config["mem_gb"]["compression"],
-    conda:
-        "../envs/scripts.yaml"
-    container:
-        "docker://ghcr.io/boasvdp/juno_clustering_scripts:0.1"
-    params:
-        script="workflow/scripts/script.py",
-    threads: config["threads"]["compression"]
-    shell:
-        """
+    rule combine_snp_profiles_from_scratch:
+        input:
+            assemblies=[SAMPLES[sample]["assembly"] for sample in SAMPLES],
+        output:
+            temp(OUT + "/aln.fa"),
+        log:
+            OUT + "/log/combine_snp_profiles.log",
+        message:
+            "Combining SNP profiles from scratch."
+        resources:
+            mem_gb=config["mem_gb"]["compression"],
+        conda:
+            "../envs/scripts.yaml"
+        container:
+            "docker://ghcr.io/boasvdp/juno_clustering_scripts:0.1"
+        threads: config["threads"]["compression"]
+        shell:
+            """
+            cat {input.assemblies} > {output}
+            """
 
+else:
 
+    rule decompress_snp_profiles:
+        input:
+            PREVIOUS_CLUSTERING + "/aln.fa.gz",
+        output:
+            temp(OUT + "/old_aln.fa"),
+        log:
+            OUT + "/log/decompress_snp_profiles.log",
+        message:
+            "Uncompressing {input}."
+        resources:
+            mem_gb=config["mem_gb"]["compression"],
+        conda:
+            "../envs/scripts.yaml"
+        container:
+            "docker://ghcr.io/boasvdp/juno_clustering_scripts:0.1"
+        params:
+            script="workflow/scripts/script.py",
+        threads: config["threads"]["compression"]
+        shell:
+            """
         pigz \
         --decompress \
         --stdout \
         --processes {threads} \
-        {input} > {output}
-        """
-
+        {input} 1> {output} 2> {log}
+        """
 
-rule add_snp_profiles:
-    input:
-        previous_aln=OUT + "/old_aln.fa",
-        assemblies=[SAMPLES[sample]["assembly"] for sample in SAMPLES],
-    output:
-        temp(OUT + "/aln.fa"),
-    log:
-        OUT + "/log/add_snp_profiles.log",
-    message:
-        "Adding SNP profiles to {input.previous_aln}."
-    resources:
-        mem_gb=config["mem_gb"]["compression"],
-    conda:
-        "../envs/scripts.yaml"
-    container:
-        "docker://ghcr.io/boasvdp/juno_clustering_scripts:0.1"
-    threads: config["threads"]["compression"]
-    shell:
-        """
+    rule add_snp_profiles:
+        input:
+            previous_aln=OUT + "/old_aln.fa",
+            assemblies=[SAMPLES[sample]["assembly"] for sample in SAMPLES],
+        output:
+            temp(OUT + "/aln.fa"),
+        log:
+            OUT + "/log/add_snp_profiles.log",
+        message:
+            "Adding SNP profiles to {input.previous_aln}."
+        resources:
+            mem_gb=config["mem_gb"]["compression"],
+        conda:
+            "../envs/scripts.yaml"
+        container:
+            "docker://ghcr.io/boasvdp/juno_clustering_scripts:0.1"
+        threads: config["threads"]["compression"]
+        shell:
+            """
+        # TODO: find better way of combining samples, e.g. make sure no duplicate names
         cat {input.previous_aln} {input.assemblies} > {output}
-        """
+            """
 
 
 rule compress_snp_profiles:
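The in-line TODO above flags that a bare cat can silently produce duplicate record names when a sample is already present in the previous alignment. One possible pre-flight guard, sketched with the standard library only (a hypothetical helper, not part of this commit):

    # Hypothetical check for the duplicate-name TODO in add_snp_profiles.
    import sys

    def fasta_ids(path):
        # Yield record IDs (first word after '>') from a FASTA file.
        with open(path) as fh:
            for line in fh:
                if line.startswith(">"):
                    yield line[1:].split()[0]

    def assert_no_duplicates(*fasta_paths):
        seen = set()
        for path in fasta_paths:
            for record_id in fasta_ids(path):
                if record_id in seen:
                    sys.exit(f"Duplicate record name {record_id!r} in {path}")
                seen.add(record_id)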
workflow/rules/distance_calculation.smk (2 additions, 2 deletions)

@@ -25,7 +25,7 @@
 distle \
 --input-format fasta \
 --output-mode {params.output_mode} \
 --maxdist {params.max_distance} \
-{input} {output} 2&>1 > {log}
+{input} {output} 2>&1 > {log}
 """

@@ -53,5 +53,5 @@
 distle \
 --input-format cgmlst \
 --output-mode {params.output_mode} \
 --maxdist {params.max_distance} \
-{input} {output} 2&>1 > {log}
+{input} {output} 2>&1 > {log}
 """
