Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a joint-wrapper which parallelizes SAMTOOLS MPILEUP/VARSCAN MPILEUP2SNP #163

Open
moldach opened this issue Aug 25, 2020 · 7 comments
Open
Labels
enhancement New feature or request

Comments

@moldach
Copy link

moldach commented Aug 25, 2020

There are currently two separate wrappers for SAMTOOLS MPILEUP and VARSCAN MPILEUP2SNP

These tools are used sequentially and unfortunately single-threaded.

I'm in the process of converting shell commands to wrappers so I have not had a chance to benchmark these wrappers specifically; however, I assume it is the same as piping the output of samtools mpileup into varscan, e.g.:

samtools mpileup -f $ref ${array_bam[$j]{"bam"}} |\\
    java -jar /home/${user}/projects/def-mtarailo/common/tools/VarScan.v2.3.9.jar pileup2snp \\
    --variants \\
    --min-coverage $mincov \\
    --min-avg-qual $minqual \\
    --min-var-freq $minfreq > ${path}/calling/varscan/${array_bam[$j]{"id"}}_snp_varscan.vcf ;

When I initially benchmarked the above code on C. elegans it took 101 minutes.

It would be ideal to create a joint-wrapper, combining the two tools, taking advantage of samtools mpileup's --region parameter and GNU Parallel.

The shell command I'm currently using is:

rule variant_calling:
    input:
        ref = os.path.join(dirs_dict["REF_DIR"],config["REF_GENOME"]),
        bam = lambda wildcards: getDeduppedBams(wildcards.sample),
        bam_index = lambda wildcards: getDeduppedBamsIndex(wildcards.sample)
    output:
        os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_sorted_dedupped_snp_varscan.vcf")
    log: os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"{sample}.log")
    resources:
        mem = 10000,
        time = 90
    threads: 7
    params:
        sample = "{sample}"
    message: """--- Varscan pileup2snp."""
    shell: """
        module load samtools;
        module load java/13.0.1;
        echo -n "I II III IV V X MtDNA" |\
        xargs -d " " -n 1 -P 7 -I {{}} /bin/bash -c \
        "samtools mpileup -r {{}} \
        -f ~/projects/def-mtarailo/common/indexes/WS265_wormbase/{{}}.fa \
        {input.bam} |\
        java -Xmx5G -jar ~/projects/def-mtarailo/common/tools/VarScan.v2.3.9.jar pileup2snp \
        --variants \
        --min-coverage 5 \
        --min-avg-qual 30 \
        --min-var-freq 0.9 > {params.sample}_{{}}.vcf"
        awk 'FNR==1 && NR!=1 {{ while (/^<header>/) getline; }} 1 {{print}} ' *.vcf > {output}

        rm {params.sample}_I.vcf {params.sample}_II.vcf {params.sample}_III.vcf {params.sample}_IV.vcf {params.sample}_V.vcf {params.sample}_X.vcf {params.sample}_MtDNA.vcf
        """

This parallelized the variant calling process by applying these operations to each chromosome (on a separate core) reducing computation time to 17 minutes - A 81% decrease in processing time for the lengthiest step in the C. elegans pipeline.

@moldach moldach added the enhancement New feature or request label Aug 25, 2020
@jafors
Copy link
Contributor

jafors commented Aug 26, 2020

Hi @moldach ,

seems like parallelizing over chromosomes is really an enhancement here. Did you think about using the chromosomes as a wildcard? Then, snakemake would run every chromosome in a different job, without the need of xargs and the likes.

Additionally, you could also pipe the output from samtools mpileup to varscan pileup2snp between rules, so you can actually use a separate rule for each of those two steps.

@moldach
Copy link
Author

moldach commented Sep 15, 2020

Hi @jafors thanks for getting back to me.

I've tried your suggestion of pipe the output from samtools mpileup to varscan pileup2snp between rules and adding another rule to combine the output of all these

rule mpilup_I:
    input:
	bam=lambda wildcards: getDeduppedBams(wildcards.sample),
        reference_genome=os.path.join(dirs_dict["REF_DIR"],config["REF_GENOME"])
    output:
	os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_I.mpileup.gz")
    log:
        os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"{sample}_I_samtools_mpileup.log")
    params:
	extra="-r I",  # optional
    resources:
	mem = 1000,
        time = 30
    wrapper:
        "0.65.0/bio/samtools/mpileup"

rule mpileup_to_vcf_I:
    input:
	os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_I.mpileup.gz")
    output:
	os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_I.vcf")
    message:
	"Calling SNP with Varscan2"
    threads:
	2 # Keep threading value to one for unzipped mpileup input
          # Set it to two for zipped mipileup files
    log:
        os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"varscan_{sample}_I.log")
    resources:
	mem = 1000,
        time = 30
    wrapper:
	"0.65.0/bio/varscan/mpileup2snp"

  ...

and then combine the output of these rules at the end

rule vcf_merge:
    input:
        I = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_I.vcf"),
        II = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_II.vcf"),
        III = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_III.vcf"),
        IV = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_IV.vcf"),
	V = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_V.vcf"),
        X = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_X.vcf"),
        MtDNA = os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_MtDNA.vcf")
    output:
        os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_sorted_dedupped_snp_varscan.vcf")
    log: os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"{sample}_vcf-merge.log")
    resources:
        mem = 1000,
	time = 10
    threads: 1
    message: """--- Merge VarScan by Chromosome."""
    shell: """
	awk 'FNR==1 && NR!=1 {{ while (/^<header>/) getline; }} 1 {{print}} ' {input.I} {input.II} {input.III} {input.IV} {input.V} {input.X} {input.MtDNA} > {output}
        """

We don't have to list the temporary files in rule all - just the final merged .vcf:

rule all:
    input:
        expand('{CALLING_DIR}/{CALLING_TOOL}/{sample}_sorted_dedupped_snp_varscan.{ext}', CALLING_DIR=dirs_dict["CALLING_DIR"], CALLING_TOOL=config["CALLING_TOOL"], sample=sample_names, ext=['vcf']),

For C. elegans it produces a DAG, like:

(snakemake) [moldach@arc wrappers]$ snakemake --use-conda --profile slurm --jobs 13 
Building DAG of jobs...
Using shell: /usr/bin/bash
Provided cluster nodes: 13
Job counts:
	count	jobs
	1	all
	1	mpileup_to_vcf_I
	1	mpileup_to_vcf_II
	1	mpileup_to_vcf_III
	1	mpileup_to_vcf_IV
	1	mpileup_to_vcf_MtDNA
	1	mpileup_to_vcf_V
	1	mpileup_to_vcf_X
	1	mpilup_I
	1	mpilup_II
	1	mpilup_III
	1	mpilup_IV
	1	mpilup_MtDNA
	1	mpilup_V
	1	mpilup_X
	1	vcf_merge
	16

However, forever the Human genome this would require many more rules.

Is there a way to do this more succinctly?

@jafors
Copy link
Contributor

jafors commented Sep 15, 2020

There is indeed a way to improve on this when you replace the specific contigs with a wildcard.
You can expand in the merging rule:

input:
   variants=expand(os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"], "{{sample}}_{contig}.vcf"), contig=["I", "II"]) # and so on
output:
    os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_sorted_dedupped_snp_varscan.vcf")

Then, you need the other mpileup and mpileup_to_vcf only once, replacing the "I" with {contig}. In the params of rule mpileup, you can do:

params:
    extra=lambda wc: "-r {}".format(wc.contig)

@moldach
Copy link
Author

moldach commented Sep 16, 2020

Thanks for the suggestion.

I've tried the following but get an error:

rule mpilup:
    input:
        bam=lambda wildcards: getDeduppedBams(wildcards.sample),
        reference_genome=os.path.join(dirs_dict["REF_DIR"],config["REF_GENOME"])
    output:
        expand(os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_{contig}.mpileup.gz"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    log:
        expand(os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"{sample}_{contig}_samtools_mpileup.log"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    params:
        extra=lambda wc: "-r {}".format(wc.contig)
    resources:
        mem = 1000,
        time = 30
    wrapper:
        "0.65.0/bio/samtools/mpileup"

rule mpileup_to_vcf:
    input:
        expand(os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_{contig}.mpileup.gz"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    output:
        expand(os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_{contig}.vcf"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    message:
        "Calling SNP with Varscan2"
    threads:
        2 # Keep threading value to one for unzipped mpileup input
          # Set it to two for zipped mipileup files
    log:
        expand(os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"varscan_{sample}_{contig}.log"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    resources:
        mem = 1000,
        time = 30
    wrapper:
        "0.65.0/bio/varscan/mpileup2snp"

Error

(snakemake) [moldach@arc wrappers]$ snakemake -n -r
Workflow defines that rule get_vep_cache is eligible for caching between workflows (use the --cache argument to enable this).
Building DAG of jobs...
InputFunctionException in line 341 of /home/moldach/wrappers/Snakefile:
Error:
  AttributeError: 'Wildcards' object has no attribute 'sample'
Wildcards:

Traceback:
  File "/home/moldach/wrappers/Snakefile", line 343, in <lambda>

Any idea what could be wrong?

@moldach
Copy link
Author

moldach commented Sep 16, 2020

vim +341 Snakefile takes me to:

rule mpilup:
    input:
        bam=lambda wildcards: getDeduppedBams(wildcards.sample),
        reference_genome=os.path.join(dirs_dict["REF_DIR"],config["REF_GENOME"])
    output:
        expand(os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_{contig}.mpileup.gz"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    log:
        expand(os.path.join(dirs_dict["LOG_DIR"],config["CALLING_TOOL"],"{sample}_{contig}_samtools_mpileup.log"), sample=sample_names, contig=["I","II","III","IV","V","X","MtDNA"])
    params:
        extra=lambda wc: "-r {}".format(wc.contig)
    resources:
        mem = 1000,
        time = 30
    wrapper:
        "0.65.0/bio/samtools/mpileup"

@jafors
Copy link
Contributor

jafors commented Sep 16, 2020

There is no need to expand in the two mpileup-related rules. Those rules will run once for all contigs over all samples. Just write

rule mpileup_to_vcf:
    input:
        os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_{contig}.mpileup.gz")
    output:
        os.path.join(dirs_dict["CALLING_DIR"],config["CALLING_TOOL"],"{sample}_{contig}.vcf")

and the contigs will be determined by the expand() in rule vcf_merge similar to the samples which are determined by the expand() in rule all.
In rule mpileup, you do the same and remove the expand from the output and the log fields.

@moldach
Copy link
Author

moldach commented Sep 16, 2020

Hi @jafors thank you very much for you help. This works wonderfully.

However, I've noticed that the output of these two rules creates downstream issues with another rule #167

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants