Commit fdf4fc6

feat(export): initial emgb export functionality
pbelmann committed Dec 10, 2024
1 parent 386e88f commit fdf4fc6
Showing 12 changed files with 669 additions and 48 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/workflow_modules.yml
@@ -96,6 +96,14 @@ jobs:
" --output=output --smetana_image=pbelmann/metabolomics:0.1.0 \
--steps.cooccurrence.beforeProcessScript=/vol/spool/dockerPull.sh " ./example_params/fullPipelineAggregate.yml \
"${WORK_DIR}" ${PROFILE} || exit 1
- name: Test export workflow based on previous test
run: |
bash ./scripts/test_fullPipelineExport.sh \
" --output=output --input=output --smetana_image=pbelmann/metabolomics:0.1.0 \
"" " ./example_params/export.yml \
"${WORK_DIR}" ${PROFILE} || exit 1
- name: Test if SRA S3 Input module works with ONT data
run: |
bash ./scripts/test_fullPipeline.sh " -c test_data/assets/aws.config " \
47 changes: 47 additions & 0 deletions example_params/export.yml
@@ -0,0 +1,47 @@
tempdir: "tmp"
summary: false
s3SignIn: false
input: "output"
output: "output"
logDir: log
runid: 1
databases: "/mnt/databases"
logLevel: 1
scratch: "/vol/scratch"
publishDirMode: "symlink"
steps:
export:
emgb:
titles:
database:
download:
source: "https://openstack.cebitec.uni-bielefeld.de:8080/databases/uniref90.titles.tsv.gz"
md5sum: aaf1dd9021243def8e6c4e438b4b3669
kegg:
database:
download:
source: s3://databases_internal/annotatedgenes2json_db_kegg-mirror-2022-12.tar.zst
md5sum: 262dab8ca564fbc1f27500c22b5bc47b
s5cmd:
params: '--retry-count 30 --no-verify-ssl --endpoint-url https://openstack.cebitec.uni-bielefeld.de:8080'
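      # s5cmd flags: retry transient download failures up to 30 times, skip
      # TLS certificate verification, and use the CEBiTec OpenStack S3
      # endpoint instead of the AWS default.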
resources:
highmemLarge:
cpus: 28
memory: 230
highmemMedium:
cpus: 14
memory: 113
large:
cpus: 28
memory: 58
medium:
cpus: 14
memory: 29
small:
cpus: 7
memory: 14
tiny:
cpus: 1
memory: 1


14 changes: 14 additions & 0 deletions example_params/fullPipeline.yml
@@ -246,6 +246,20 @@ steps:
additionalParams:
run: ""
report: ""
export:
emgb:
titles:
database:
download:
source: "https://openstack.cebitec.uni-bielefeld.de:8080/databases/uniref90.titles.tsv.gz"
md5sum: aaf1dd9021243def8e6c4e438b4b3669
kegg:
database:
download:
source: s3://databases_internal/annotatedgenes2json_db_kegg-mirror-2022-12.tar.zst
md5sum: 262dab8ca564fbc1f27500c22b5bc47b
s5cmd:
params: '--retry-count 30 --no-verify-ssl --endpoint-url https://openstack.cebitec.uni-bielefeld.de:8080'
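      # This export block mirrors example_params/export.yml, so a full
      # pipeline run produces the EMGB JSON files as well.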
resources:
highmemLarge:
cpus: 28
309 changes: 273 additions & 36 deletions main.nf

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion modules/annotation/module.nf
@@ -53,6 +53,7 @@ process pMMseqs2 {

container "${params.mmseqs2_image}"


// Databases will be downloaded to a fixed place so that they can be used by future processes.
// This fixed place has to be outside of the working-directory to be easy to find for every process.
// Therefore this place has to be mounted to the docker container to be accessible during run time.
@@ -610,6 +611,11 @@ workflow wAnnotateList {
emit:
keggAnnotation = _wAnnotation.out.keggAnnotation
proteins = _wAnnotation.out.prokka_faa
ffn = _wAnnotation.out.prokka_ffn
gff = _wAnnotation.out.prokka_gff
faa = _wAnnotation.out.prokka_faa
mmseqs2_taxonomy = _wAnnotation.out.mmseqs2_taxonomy
mmseqs2_blast = _wAnnotation.out.mmseqs2_blast
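// New in this commit: the prokka ffn/gff/faa files and the MMseqs2
// taxonomy/blast results are emitted per sample so that the EMGB export
// workflow can consume them.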
}

/**
@@ -702,7 +708,6 @@ workflow _wSplit {
| pCount | combine(chunkSize) | flatMap { sample -> \
Utils.splitFilesIndex(Integer.parseInt(sample[COUNT_IDX]), sample[CHUNK_SIZE_IDX], [sample[SAMPLE_IDX], sample[FILE_2_IDX], sample[FILE_3_IDX]]) } \
| set { chunks }

emit:
chunks
}
165 changes: 165 additions & 0 deletions modules/export/emgb.nf
@@ -0,0 +1,165 @@
include { pDumpLogs } from '../utils/processes'



def getOutput(SAMPLE, RUNID, TOOL, filename){
return SAMPLE + '/' + RUNID + '/' + params.modules.export.name + '/' +
params.modules.export.version.major + "." +
params.modules.export.version.minor + "." +
params.modules.export.version.patch +
'/' + TOOL + '/' + filename
}
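// Worked example, using the export module definition added to nextflow.config
// in this commit (name = "export", version 0.1.0):
//   getOutput("test1", 1, "emgb", "test1.contigs.json.gz")
//   => "test1/1/export/0.1.0/emgb/test1.contigs.json.gz"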

process pEMGBAnnotatedContigs {

container "${params.emgbAnnotatedContigs_image}"

tag "Sample: $sample"

publishDir params.output, mode: "${params.publishDirMode}", saveAs: { filename -> getOutput("${sample}", params.runid, "emgb", filename) }

when params.steps.containsKey("export") && params.steps.export.containsKey("emgb")

containerOptions " --entrypoint='' "

label 'tiny'

input:
tuple val(sample), path(contigs), path(mapping), path("bins/*")

output:

tuple val("${sample}"), path("${sample}.contigs.json.gz"), emit: json
tuple env(FILE_ID), val("${output}"), val(params.LOG_LEVELS.INFO), file(".command.sh"), \
file(".command.out"), file(".command.err"), file(".command.log"), emit: logs

shell:
template 'emgbAnnotatedContigs.sh'
}


process pEMGBAnnotatedBins {

container "${params.emgbAnnotatedBins_image}"

tag "Sample: $sample"

publishDir params.output, mode: "${params.publishDirMode}", saveAs: { filename -> getOutput("${sample}", params.runid, "emgb", filename) }

when params.steps.containsKey("export") && params.steps.export.containsKey("emgb")

containerOptions " --entrypoint='' "

label 'tiny'

input:
tuple val(sample), path(checkm), path(gtdbtk), path("bins/*")

output:
tuple val("${sample}"), path("${sample}.bins.json.gz"), emit: json
tuple env(FILE_ID), val("${output}"), val(params.LOG_LEVELS.INFO), file(".command.sh"), \
file(".command.out"), file(".command.err"), file(".command.log"), emit: logs

shell:
template 'emgbAnnotatedBins.sh'
}


process pEMGBAnnotatedGenes {

container "${params.emgbAnnotatedGenes_image}"

tag "Sample: $sample"

publishDir params.output, mode: "${params.publishDirMode}", saveAs: { filename -> getOutput("${sample}", params.runid, "emgb", filename) }

secret { "${S3_EMGB_TITLES_ACCESS}"!="" ? ["S3_EMGB_TITLES_ACCESS", "S3_EMGB_TITLES_SECRET"] : [] }

secret { "${S3_EMGB_KEGG_ACCESS}"!="" ? ["S3_EMGB_KEGG_ACCESS", "S3_EMGB_KEGG_SECRET"] : [] }

containerOptions Utils.getDockerMount(params.steps?.export?.emgb?.titles?.database, params) + Utils.getDockerMount(params.steps?.export?.emgb?.kegg?.database, params) + Utils.getDockerNetwork() + " --entrypoint='' "

when params.steps.containsKey("export") && params.steps.export.containsKey("emgb")

memory { Utils.getMemoryResources(params.resources.medium, "${sample}", task.attempt, params.resources) }

cpus { Utils.getCPUsResources(params.resources.medium, "${sample}", task.attempt, params.resources) }

beforeScript Utils.getCreateDatabaseDirCommand("${params.polished.databases}")

input:
tuple val(sample), path("gff/*"), path("ffn/*"), path("faa/*"), path("taxonomy/*"), path("blastResult/*"), path("blastKeggResult/*"), path("bins/*")

output:
tuple val("${sample}"), path("${sample}.genes.json.gz"), emit: json
tuple env(FILE_ID), val("${output}"), val(params.LOG_LEVELS.INFO), file(".command.sh"), \
file(".command.out"), file(".command.err"), file(".command.log"), emit: logs

shell:
TITLES_S5CMD_PARAMS=params?.steps?.export?.emgb?.titles?.database?.download?.s5cmd?.params ?: ""
TITLES_DOWNLOAD_LINK=params?.steps?.export?.emgb?.titles?.database?.download?.source ?: ""
TITLES_MD5SUM=params.steps?.export?.emgb?.titles?.database?.download?.md5sum ?: ""
TITLES_EXTRACTED_DB=params.steps?.export?.emgb?.titles?.database?.extractedDBPath ?: ""
KEGG_S5CMD_PARAMS=params?.steps?.export?.emgb?.kegg?.database?.download?.s5cmd?.params ?: ""
KEGG_DOWNLOAD_LINK=params?.steps?.export?.emgb?.kegg?.database?.download?.source ?: ""
KEGG_MD5SUM=params.steps?.export?.emgb?.kegg?.database?.download?.md5sum ?: ""
KEGG_EXTRACTED_DB=params.steps?.export?.emgb?.kegg?.database?.extractedDBPath ?: ""
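
// Inject the S3 credential secrets only when an s5cmd download is configured
// and the request must be signed (its params lack --no-sign-request);
// otherwise pass empty strings so public buckets are fetched anonymously.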

TITLES_S3_EMGB_ACCESS=params?.steps?.export?.emgb?.titles?.database?.download?.s5cmd && TITLES_S5CMD_PARAMS.indexOf("--no-sign-request") == -1 ? "\$S3_EMGB_TITLES_ACCESS" : ""
TITLES_S3_EMGB_SECRET=params?.steps?.export?.emgb?.titles?.database?.download?.s5cmd && TITLES_S5CMD_PARAMS.indexOf("--no-sign-request") == -1 ? "\$S3_EMGB_TITLES_SECRET" : ""

KEGG_S3_EMGB_ACCESS=params?.steps?.export?.emgb?.kegg?.database?.download?.s5cmd && KEGG_S5CMD_PARAMS.indexOf("--no-sign-request") == -1 ? "\$S3_EMGB_KEGG_ACCESS" : ""
KEGG_S3_EMGB_SECRET=params?.steps?.export?.emgb?.kegg?.database?.download?.s5cmd && KEGG_S5CMD_PARAMS.indexOf("--no-sign-request") == -1 ? "\$S3_EMGB_KEGG_SECRET" : ""
template 'emgbAnnotatedGenes.sh'
}



// CONTIGS: [test1, /vol/spool/metagenomics-tk/work_wFullPipeline/cd/7c943c0808e6d36c72a64834e7a88e/test1_contigs.fa.gz]
// [test2, /vol/spool/metagenomics-tk/work_wFullPipeline/00/71fd0e590f4f03fda4cb87903a3b38/test2_contigs.fa.gz]
// mapping: [test2, /vol/spool/metagenomics-tk/work_wFullPipeline/7c/43a3513d49143a76a29c4404417997/test2.bam]
// bins: [test1, [/vol/spool/metagenomics-tk/work_wFullPipeline/95/085ffbe17a6f2ebfb60709d3f33cf3/test1_bin.1.fa, /vol/spool/metagenomics-tk/work_wFullPipeline/95/085ffbe17a6f2ebfb60709d3f33cf3/test1_bin.2.fa]]
// gtdbtk: [test2, [/vol/spool/metagenomics-tk/work_wFullPipeline/d2/5b5937c7a950d87cd796e2bf80cfcd/chunk_00_test2_gtdbtk.bac120.summary.tsv]]
// checkm: [test2, /vol/spool/metagenomics-tk/work_wFullPipeline/ff/1292ed3399aada6fa1dffc18cfbc12/test2_checkm2_EfY2cPIh.tsv]
// gff,ffn,faa: [test2, test2_bin.2.fa, /vol/spool/metagenomics-tk/work_wFullPipeline/78/1224783c7ae2d9e0a8d4264893e47a/test2_bin.2.gff.gz]
// mmseqsTaxonomy: [ncbi_nr, test1, /vol/spool/metagenomics-tk/work_wFullPipeline/93/56bf5c34d7524930d73a6ef45ae23d/test1_binned.ncbi_nr.taxonomy.tsv]
// mmseqsBlast:
// [bacmet20_predicted, test2, binned, 1, 2922, 1, /vol/spool/metagenomics-tk/work_wFullPipeline/3c/e5221be7fd305dc056356201de7d61/test2_binned.1.2922.bacmet20_predicted.blast.tsv]
// [uniref90, test1, binned, 1, 2923, 1, /vol/spool/metagenomics-tk/work_wFullPipeline/a0/8e7a2a7b928731dbcfd62cbf546d4f/test1_binned.1.2923.uniref90.blast.tsv]
// [bacmet20_predicted, test1, binned, 1, 2923, 1, /vol/spool/metagenomics-tk/work_wFullPipeline/2f/dffc66ee85b2a1baf296323fcc894c/test1_binned.1.2923.bacmet20_predicted.blast.tsv]
// [kegg, test2, binned, 1, 2922, 1, /vol/spool/metagenomics-tk/work_wFullPipeline/5a/74adca85c1b4005aa490440f8b05ba/test2_binned.1.2922.kegg.blast.tsv]
// [uniref90, test2, binned, 1, 2922, 1, /vol/spool/metagenomics-tk/work_wFullPipeline/57/5f4e7f5ae75937613d7c813be6c8fd/test2_binned.1.2922.uniref90.blast.tsv]
// [kegg, test1, binned, 1, 2923, 1, /vol/spool/metagenomics-tk/work_wFullPipeline/6b/41a5ce820f9fb5e897b6a9109e5678/test1_binned.1.2923.kegg.blast.tsv]
workflow wEMGBList {
take:
contigs
mapping
bins
gtdbtk
checkm
gff
ffn
faa
mmseqsTaxonomy
mmseqsBlast
main:
SAMPLE_IDX = 0
contigs | combine(mapping, by: SAMPLE_IDX) | combine(bins, by: SAMPLE_IDX) | pEMGBAnnotatedContigs
checkm | combine(gtdbtk, by: SAMPLE_IDX) | combine(bins, by: SAMPLE_IDX) | pEMGBAnnotatedBins

mmseqsBlast | filter { db, sample, type, start, end, chunkNumber, blastResult -> db == "uniref90" } \
| map { db, sample, type, start, end, chunkNumber, blastResult -> [sample, blastResult] } \
| groupTuple(by: SAMPLE_IDX) | set { selectedDBBlastResults }

mmseqsBlast | filter { db, sample, type, start, end, chunkNumber, blastResult -> db == "kegg" } \
| map { db, sample, type, start, end, chunkNumber, blastResult -> [sample, blastResult] } \
| groupTuple(by: SAMPLE_IDX) | set { selectedKeggBlastResults }

gff | map { sample, bin, gff -> [sample, gff] } | groupTuple(by: SAMPLE_IDX) \
| combine(ffn | map { sample, bin, ffn -> [sample, ffn] } | groupTuple(by: SAMPLE_IDX), by: SAMPLE_IDX) \
| combine(faa | map { sample, bin, faa -> [sample, faa] } | groupTuple(by: SAMPLE_IDX), by: SAMPLE_IDX) \
| combine(mmseqsTaxonomy | map { db, sample, blastResult -> [sample, blastResult] } | groupTuple(by: SAMPLE_IDX), by: SAMPLE_IDX) \
| combine(selectedDBBlastResults, by: SAMPLE_IDX) \
| combine(selectedKeggBlastResults, by: SAMPLE_IDX) \
| combine(bins, by: SAMPLE_IDX) | pEMGBAnnotatedGenes
}
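
Since the main.nf diff is not rendered above, the following is only a sketch of one plausible way wEMGBList connects to the emits added in this commit; the contigsC, mappingC and binsC channel names are placeholders for the assembly, read-mapping and binning outputs, not the committed wiring:

include { wEMGBList } from './modules/export/emgb'

workflow {
    // Upstream workflows are invoked elsewhere in main.nf; their new emits
    // feed the export step. Channel shapes follow the examples documented
    // above, e.g. contigsC = [sample, contigs.fa.gz].
    wEMGBList(
        contigsC,
        mappingC,
        binsC,
        wMagAttributesList.out.gtdbBacteriaFiles,
        wMagAttributesList.out.checkmFiles,
        wAnnotateList.out.gff,
        wAnnotateList.out.ffn,
        wAnnotateList.out.faa,
        wAnnotateList.out.mmseqs2_taxonomy,
        wAnnotateList.out.mmseqs2_blast
    )
}
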
32 changes: 21 additions & 11 deletions modules/magAttributes/module.nf
@@ -59,7 +59,7 @@ process pCheckM {
tuple val(sample), val(ending), path(bins), val(chunkId)

output:
tuple path("${sample}_checkm_*.tsv", type: "file"), val("${sample}"), emit: checkm
tuple val("${sample}"), path("${sample}_checkm_*.tsv", type: "file"), emit: checkm
tuple env(FILE_ID), val("${output}"), val(params.LOG_LEVELS.INFO), file(".command.sh"), \
file(".command.out"), file(".command.err"), file(".command.log"), emit: logs

@@ -98,7 +98,7 @@ process pCheckM2 {
tuple val(sample), val(ending), path(bins), val(chunkId)

output:
tuple path("${sample}_checkm2_*.tsv", type: "file"), val("${sample}"), emit: checkm
tuple val("${sample}"), path("${sample}_checkm2_*.tsv", type: "file"), emit: checkm
tuple env(FILE_ID), val("${output}"), val(params.LOG_LEVELS.INFO), file(".command.sh"), \
file(".command.out"), file(".command.err"), file(".command.log"), emit: logs

@@ -136,12 +136,12 @@ process pGtdbtk {
tuple val(sample), val(ending), path(bins), val(chunkId)

output:
tuple path("chunk_*_${sample}_gtdbtk.bac120.summary.tsv"), val("${sample}"), optional: true, emit: bacteria
tuple path("chunk_*_${sample}_gtdbtk.ar122.summary.tsv"), val("${sample}"), optional: true, emit: archea
tuple path("chunk_*_${sample}_gtdbtk_unclassified.tsv"), val("${sample}"), optional: true, emit: unclassified
tuple path("*.tree"), val("${sample}"), optional: true, emit: tree
tuple path("chunk_*_${sample}_gtdbtk_combined.tsv"), val("${sample}"), optional: true, emit: combined
tuple path("chunk_*_${sample}_missing_bins.tsv"), val("${sample}"), optional: true, emit: missing
tuple val("${sample}"), path("chunk_*_${sample}_gtdbtk.bac120.summary.tsv"), optional: true, emit: bacteria
tuple val("${sample}"), path("chunk_*_${sample}_gtdbtk.ar122.summary.tsv"), optional: true, emit: archea
tuple val("${sample}"), path("chunk_*_${sample}_gtdbtk_unclassified.tsv"), optional: true, emit: unclassified
tuple val("${sample}"), path("*.tree"), val("${sample}"), optional: true, emit: tree
tuple val("${sample}"), path("chunk_*_${sample}_gtdbtk_combined.tsv"), optional: true, emit: combined
tuple val("${sample}"), path("chunk_*_${sample}_missing_bins.tsv"), optional: true, emit: missing
tuple env(FILE_ID), val("${output}"), val(params.LOG_LEVELS.INFO), file(".command.sh"), \
file(".command.out"), file(".command.err"), file(".command.log"), emit: logs

@@ -259,6 +259,9 @@ workflow wMagAttributesList {
checkm = _wMagAttributes.out.checkm
gtdb = _wMagAttributes.out.gtdb
gtdbMissing = _wMagAttributes.out.gtdbMissing
gtdbArcheaFiles = _wMagAttributes.out.gtdbArcheaFiles
gtdbBacteriaFiles = _wMagAttributes.out.gtdbBacteriaFiles
checkmFiles = _wMagAttributes.out.checkmFiles
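    // New in this commit: the raw per-sample GTDB-Tk summary files and
    // CheckM tables are passed through (keyed by sample) for the EMGB export.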
}


@@ -311,19 +314,23 @@ workflow _wMagAttributes {

// Prepare checkm output file
checkmSelected | splitCsv(sep: '\t', header: true) \
| map { checkmDict, sample -> checkmDict } \
| map { sample, checkmDict -> checkmDict } \
| set { checkmList }

// Prepare gtdb output file
gtdb.combined | splitCsv(sep: '\t', header: true) \
| map { gtdb, sample -> gtdb } \
| map { sample, gtdb -> gtdb } \
| set { gtdbCombinedList }

// Prepare missing gtdb output file
gtdb.missing | splitCsv(sep: '\t', header: true) \
| map { gtdb, sample -> gtdb } \
| map { sample, gtdb -> gtdb } \
| set { gtdbMissingList }

gtdb.bacteria | set { gtdbBacteriaFiles }

gtdb.archea | set { gtdbArcheaFiles }

if(params.summary){
// collect checkm files for checkm2 results across multiple datasets
checkmSelected \
@@ -347,4 +354,7 @@
checkm = checkmList
gtdb = gtdbCombinedList
gtdbMissing = gtdbMissingList
gtdbArcheaFiles = gtdbArcheaFiles
gtdbBacteriaFiles = gtdbBacteriaFiles
checkmFiles = checkmSelected
}
11 changes: 11 additions & 0 deletions nextflow.config
@@ -292,6 +292,9 @@ params {

skipVersionCheck = false

emgbAnnotatedGenes_image = "quay.io/emgb/annotatedgenes2json:2.5.2z"
emgbAnnotatedContigs_image = "quay.io/emgb/annotatedcontigs2json:2.2.4z"
emgbAnnotatedBins_image = "quay.io/emgb/annotatedbins2json:2.4.0z"
pysradb_image = "quay.io/biocontainers/pysradb:1.4.1--pyhdfd78af_0"
minimap2_image= "quay.io/biocontainers/minimap2:2.24--h7132678_1"
metaflye_image = "quay.io/biocontainers/flye:2.9--py36h7281c5b_1"
@@ -498,6 +501,14 @@ params {
patch = 0
}
}
export {
name = "export"
version {
major = 0
minor = 1
patch = 0
}
}
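// The module version above is embedded in published output paths, e.g.
// <sample>/<runid>/export/0.1.0/emgb/<file> (see getOutput in
// modules/export/emgb.nf).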
readMapping {
name = "readMapping"
version {