feat(magAttributes): use chunk index instead of random id (#349)
* feat(magAttributes): use chunk index instead of random id

* fix(magAttributes): collect gtdb chunks as output

* fix(magAttributes): split gtdb table to simplify code in other modules

* fix(magAttributes): remove unnecessary scratch flag

* fix(annotation): use empty channel instead of null

* fix(cooccurrence): do not use collect files

* fix(aggregate): split gtdb before processing
pbelmann authored Feb 25, 2024
1 parent 8ec47be commit 8e7a493
Showing 6 changed files with 77 additions and 56 deletions.
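
The central change, in miniature: each collated chunk of bins gets an id derived from its position instead of a random mktemp string, so repeated runs yield identical file names and Nextflow can declare the outputs deterministically. A standalone Groovy sketch of the idea (bin names and buffer size are made up):

// Deterministic chunk ids: the chunk's index replaces a random id.
def bins = ["bin1.fa", "bin2.fa", "bin3.fa"]
def chunks = []
bins.collate(2).eachWithIndex { chunk, index ->
    // index is stable across runs, unlike $(mktemp XXXXXXXX) in the old templates
    chunks.add(["chunk_${index}".toString(), chunk])
}
assert chunks == [["chunk_0", ["bin1.fa", "bin2.fa"]], ["chunk_1", ["bin3.fa"]]]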
9 changes: 5 additions & 4 deletions main.nf
@@ -343,7 +343,8 @@ workflow wAggregatePipeline {
// get gtdbtk summary files
Pattern gtdbPattern = Pattern.compile('.*/magAttributes/' + params.modules.magAttributes.version.major + '..*/.*/.*_gtdbtk_combined.tsv$' )
selectedSRAMagAttributes | filter({ sra, path -> gtdbPattern.matcher(path.toString()).matches()}) \
- | map { sraID, bins -> [bins, sraID] } \
+ | map { sraID, bins -> bins } \
+ | splitCsv(sep: '\t', header: true) \
| set { gtdb }

// Get genome scale metabolic model files
@@ -558,16 +559,16 @@ workflow wFullPipeline {
| set { binsStats }

wAnnotatePlasmidList(Channel.value("plasmid"), Channel.value("meta"), \
- wPlasmidsList.out.newPlasmids, null, wPlasmidsList.out.newPlasmidsCoverage, wPlasmidsList.out.newPlasmids | map { [it[SAMPLE_IDX], 1] })
+ wPlasmidsList.out.newPlasmids, Channel.empty(), wPlasmidsList.out.newPlasmidsCoverage, wPlasmidsList.out.newPlasmids | map { [it[SAMPLE_IDX], 1] })

ont.bins | mix(illumina.bins) | map { sample, bins -> [sample, bins.size()] } | set { binsCounter }
- wAnnotateBinsList(Channel.value("binned"), Channel.value("single"), bins, wMagAttributesList.out.gtdb?:null, contigCoverage, binsCounter)
+ wAnnotateBinsList(Channel.value("binned"), Channel.value("single"), bins, wMagAttributesList.out.gtdb, contigCoverage, binsCounter)

wFragmentRecruitmentList.out.foundGenomesPerSample | map { sample, genomes -> [sample, genomes.size()] } | set { recruitedGenomesCounter }
wAnnotateRecruitedGenomesList(Channel.value("binned"), Channel.value("single"), wFragmentRecruitmentList.out.foundGenomesSeperated, \
wMagAttributesList.out.gtdb, wFragmentRecruitmentList.out.contigCoverage, recruitedGenomesCounter)

- wAnnotateUnbinnedList(Channel.value("unbinned"), Channel.value("meta"), notBinnedContigs, null, \
+ wAnnotateUnbinnedList(Channel.value("unbinned"), Channel.value("meta"), notBinnedContigs, Channel.empty(), \
contigCoverage, notBinnedContigs | map { it -> [it[SAMPLE_IDX], 1] })

BIN_ID_IDX = 1
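
Why Channel.empty() rather than null: a workflow take: parameter is expected to be a channel, and operators invoked on null fail at runtime, while an empty channel simply emits nothing. A minimal Nextflow sketch (the workflow name is hypothetical):

// wDemo consumes its input unconditionally; an empty channel yields zero
// items, so the workflow body needs no null checks.
workflow wDemo {
    take: gtdb
    main:
        gtdb | map { row -> row.BIN_ID } | view
}

workflow {
    wDemo(Channel.empty())   // runs cleanly and prints nothing
    // wDemo(null)           // would fail: operators cannot be piped from null
}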
34 changes: 14 additions & 20 deletions modules/annotation/module.nf
@@ -591,26 +591,20 @@ workflow _wCreateProkkaInput {
DATASET_IDX = 0
BIN_ID_IDX = 1
PATH_IDX = 2
- if(gtdb){
-   //get GTDB domains from the tsv file with Taxonomy for all bins:
-   GTDB_FILE_IDX = 0
-   DOMAIN_IDX = 0
-   gtdb | filter(it -> file(it[GTDB_FILE_IDX]).text?.trim()) \
-     | splitCsv(sep: '\t', header: true) \
-     | map { it -> def command = it[GTDB_FILE_IDX].classification.split(';')[DOMAIN_IDX].minus('d__'); [it[GTDB_FILE_IDX].SAMPLE, it[GTDB_FILE_IDX].BIN_ID, command] } \
-     | set { gtdbDomain }
-   //set domain for each bin
-   DOMAIN_PROKKA_INPUT_IDX = 3
-   fasta | map {it -> [it[DATASET_IDX], it[BIN_ID_IDX], file(it[PATH_IDX]) ]} \
-     | join(gtdbDomain, by:[DATASET_IDX, BIN_ID_IDX], remainder: true) \
-     | filter({ it -> it[PATH_IDX]!=null }) \
-     | map { it -> [ it[DATASET_IDX], it[BIN_ID_IDX], it[PATH_IDX], it[DOMAIN_PROKKA_INPUT_IDX]?:params?.steps?.annotation?.prokka?.defaultKingdom ] } \
-     | set { prokkaInput }
- } else {
-   //if no gtdb annotation available, default to the defaultKingdom in params
-   fasta | map { it -> [it[DATASET_IDX], it[BIN_ID_IDX], it[PATH_IDX], params?.steps?.annotation?.prokka?.defaultKingdom]} | set { prokkaInput }
- }
+ //get GTDB domains from the tsv file with Taxonomy for all bins:
+ DOMAIN_IDX = 0
+ gtdb | map { it -> def command = it.classification.split(';')[DOMAIN_IDX].minus('d__'); [it.SAMPLE, it.BIN_ID, command] } \
+   | set { gtdbDomain }
+
+ //set domain for each bin
+ DOMAIN_PROKKA_INPUT_IDX = 3
+ fasta | map {it -> [it[DATASET_IDX], it[BIN_ID_IDX], file(it[PATH_IDX]) ]} \
+   | join(gtdbDomain, by:[DATASET_IDX, BIN_ID_IDX], remainder: true) \
+   | filter({ it -> it[PATH_IDX]!=null }) \
+   | map { it -> [ it[DATASET_IDX], it[BIN_ID_IDX], it[PATH_IDX], it[DOMAIN_PROKKA_INPUT_IDX]?:params?.steps?.annotation?.prokka?.defaultKingdom ] } \
+   | set { prokkaInput }

emit:
prokkaInput
}
@@ -701,7 +695,7 @@ workflow wAnnotateFile {
input | map { sample, bin, path -> [sample, bin] } | groupTuple(by: DATASET_IDX) \
| map { sample, bins -> [sample, bins.size()] } | set { fastaCounter }

- _wAnnotation(Channel.value("out"), Channel.value("param"), input, null, coverage, fastaCounter)
+ _wAnnotation(Channel.value("out"), Channel.value("param"), input, Channel.empty(), coverage, fastaCounter)
emit:
keggAnnotation = _wAnnotation.out.keggAnnotation
}
9 changes: 6 additions & 3 deletions modules/cooccurrence/module.nf
@@ -251,13 +251,16 @@ workflow wCooccurrenceList {
gtdb
models
main:
- gtdb | collectFile(keepHeader: true) { file, dataset -> ["gtdb", file.text]}\
-   | set {gtdbConcatenated}

MODEL_NAME_IDX = 0
MODEL_PATH_IDX = 1

+ gtdb | map { bin -> bin.BIN_ID + "\t" + bin.DOMAIN + "\t" + bin.SAMPLE + "\t" + bin.user_genome + "\t" + bin.classification } \
+   | collectFile(seed: "BIN_ID\tDOMAIN\tSAMPLE\tuser_genome\tclassification", newLine: true) \
+   | set {gtdbConcatenated}

models | map { model -> [fixModelID(model[MODEL_NAME_IDX]), model[MODEL_PATH_IDX]]} | set { fixedModel }
_wCooccurrence(abundanceMatrix, gtdbConcatenated, fixedModel)

}


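The replacement rebuilds the concatenated GTDB table from the per-row maps that splitCsv now delivers, supplying the header exactly once via collectFile's seed. A minimal sketch of that operator pattern (the row values are invented):

// Rebuild a TSV from row maps; the seed provides the header once.
workflow {
    Channel.of([BIN_ID: "bin1.fa", DOMAIN: "BACTERIA", SAMPLE: "S1", user_genome: "bin1", classification: "d__Bacteria"])
        | map { bin -> bin.BIN_ID + "\t" + bin.DOMAIN + "\t" + bin.SAMPLE + "\t" + bin.user_genome + "\t" + bin.classification }
        | collectFile(seed: "BIN_ID\tDOMAIN\tSAMPLE\tuser_genome\tclassification", newLine: true, name: "gtdb_concatenated.tsv")
        | view { it.text }
}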
39 changes: 21 additions & 18 deletions modules/magAttributes/module.nf
@@ -56,7 +56,7 @@ process pCheckM {
label 'highmemMedium'

input:
- tuple val(sample), val(ending), path(bins)
+ tuple val(sample), val(ending), path(bins), val(chunkId)

output:
tuple path("${sample}_checkm_*.tsv", type: "file"), val("${sample}"), emit: checkm
@@ -95,7 +95,7 @@ process pCheckM2 {
label 'medium'

input:
- tuple val(sample), val(ending), path(bins)
+ tuple val(sample), val(ending), path(bins), val(chunkId)

output:
tuple path("${sample}_checkm2_*.tsv", type: "file"), val("${sample}"), emit: checkm
@@ -133,7 +133,7 @@ process pGtdbtk {
beforeScript Utils.getCreateDatabaseDirCommand("${params.polished.databases}")

input:
- tuple val(sample), val(ending), path(bins)
+ tuple val(sample), val(ending), path(bins), val(chunkId)

output:
tuple path("chunk_*_${sample}_gtdbtk.bac120.summary.tsv"), val("${sample}"), optional: true, emit: bacteria
@@ -264,18 +264,19 @@ workflow wMagAttributesList {
*
* Method takes a list of the form [SAMPLE, [BIN1 path, BIN2 path]] as input
* and produces a flattened list which is grouped by dataset and sample.
- * The output has the form [SAMPLE, file ending (e.g. .fa), [BIN 1 path, BIN 2 path]]
+ * The output has the form [SAMPLE, file ending (e.g. .fa), [BIN 1 path, BIN 2 path], chunk id]
*
*/
def groupBins(binning, buffer){
def chunkList = [];
def SAMPLE_IDX = 0;
def BIN_PATHS_IDX = 1;
- binning[BIN_PATHS_IDX].collate(buffer).each {
-     it.groupBy{
+ binning[BIN_PATHS_IDX].collate(buffer).eachWithIndex {
+     it, indexSample -> it.groupBy{
          bin -> file(bin).name.substring(file(bin).name.lastIndexOf("."))
-     }.each {
-         ending, group -> chunkList.add([binning[SAMPLE_IDX], ending, group]);
+     }.eachWithIndex {
+         ending, group, indexFileEnding -> \
+             chunkList.add([binning[SAMPLE_IDX], ending, group, indexSample.toString() + indexFileEnding.toString()]);
}
}
return chunkList;
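
A worked trace of the new groupBins contract, written as plain Groovy so it runs standalone (Nextflow's file() is swapped for string handling, and the sample values are invented):

// Each chunk id concatenates the chunk index with the index of the
// file-ending group inside that chunk: "00", "01", "10", ...
def groupBinsSketch = { binning, buffer ->
    def chunkList = []
    def SAMPLE_IDX = 0
    def BIN_PATHS_IDX = 1
    binning[BIN_PATHS_IDX].collate(buffer).eachWithIndex { chunk, indexSample ->
        chunk.groupBy { bin -> bin.substring(bin.lastIndexOf(".")) }
             .eachWithIndex { ending, group, indexFileEnding ->
                 chunkList.add([binning[SAMPLE_IDX], ending, group,
                                indexSample.toString() + indexFileEnding.toString()])
             }
    }
    return chunkList
}

assert groupBinsSketch(["SAMPLE_1", ["bin1.fa", "bin2.fa", "bin3.fasta"]], 2) == [
    ["SAMPLE_1", ".fa", ["bin1.fa", "bin2.fa"], "00"],
    ["SAMPLE_1", ".fasta", ["bin3.fasta"], "10"]
]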
@@ -294,9 +295,7 @@ workflow _wMagAttributes {
DATASET_IDX = 0
FILE_ENDING_IDX = 1
BIN_FILES_IDX = 2
- BIN_FILES_OUTPUT_GROUP_IDX = 0
BIN_FILES_OUTPUT_IDX = 0
DATASET_OUTPUT_IDX = 1

// get file ending of bin files (.fa, .fasta, ...) and group by file ending and dataset
bins | flatMap({n -> groupBins(n, params?.steps?.magAttributes?.checkm?.buffer ?: CHECKM_DEFAULT_BUFFER)}) \
@@ -308,11 +307,15 @@

checkm2.checkm | mix(checkm.checkm) | set {checkmSelected}

- // Prepare checkm2 output file
- checkmSelected | groupTuple(by: DATASET_OUTPUT_IDX, remainder: true) | map { it -> it[BIN_FILES_OUTPUT_GROUP_IDX] } | flatten | map { bin -> file(bin) } \
-   | collectFile(keepHeader: true, newLine: false ){ item -> [ "bin_attributes.tsv", item.text ] } \
-   | splitCsv(sep: '\t', header: true) \
-   | set{ checkmSelectedList }
+ // Prepare checkm output file
+ checkmSelected | splitCsv(sep: '\t', header: true) \
+   | map { checkmDict, sample -> checkmDict } \
+   | set { checkmList }
+
+ // Prepare gtdb output file
+ gtdb.combined | splitCsv(sep: '\t', header: true) \
+   | map { gtdb, sample -> gtdb } \
+   | set { gtdbCombinedList }

if(params.summary){
// collect checkm files for checkm2 results across multiple datasets
@@ -331,9 +334,9 @@
}
}

- pGtdbtk.out.logs | mix(pCheckM.out.logs) | pDumpLogs
+ pGtdbtk.out.logs | mix(pCheckM.out.logs) | mix(pCheckM2.out.logs) | pDumpLogs

emit:
- checkm = checkmSelectedList
- gtdb = gtdb.combined
+ checkm = checkmList
+ gtdb = gtdbCombinedList
}
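
Both prepared channels rely on the same move: instead of concatenating per-chunk TSVs with collectFile and re-reading the result, each [file, sample] tuple is pushed through splitCsv(header: true), which replaces the file with one map per row. A self-contained sketch of the pattern (the file name and content are invented):

// splitCsv on a [path, sample] tuple emits one [row-map, sample] tuple per
// data line; mapping the sample away leaves bare row maps, as above.
workflow {
    demo = file("demo_gtdbtk_combined.tsv")
    demo.text = "BIN_ID\tclassification\nbin1.fa\td__Bacteria;p__Bacteroidota\n"

    Channel.of([demo, "SAMPLE_1"])
        | splitCsv(sep: '\t', header: true)
        | map { row, sample -> row }
        | view { row -> "${row.BIN_ID} -> ${row.classification}" }
}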
2 changes: 1 addition & 1 deletion templates/checkm.sh
@@ -1,7 +1,7 @@

# Prepare checkm path, output directory and output file name
mkdir out
- FILE_ID=$(mktemp XXXXXXXX)
+ FILE_ID=!{chunkId}
FILE=!{sample}_checkm_${FILE_ID}.tsv

# Check developer documentation
40 changes: 30 additions & 10 deletions templates/gtdb.sh
@@ -14,11 +14,15 @@ gtdbtk classify_wf --batchfile input.tsv --out_dir output --cpus !{task.cpus} \
# reformat gtdbtk output files
touch output/gtdbtk.bac120.summary.tsv
touch output/gtdbtk.ar53.summary.tsv
- FILE_ID=$(mktemp -u XXXXXXXXXX)
+ FILE_ID=!{chunkId}
FILE_BAC=chunk_${FILE_ID}_!{sample}_gtdbtk.bac120.summary.tsv
+ FILE_BAC_TMP=chunk_${FILE_ID}_!{sample}_gtdbtk.bac120.summary.tmp.tsv
FILE_ARC=chunk_${FILE_ID}_!{sample}_gtdbtk.ar53.summary.tsv
+ FILE_ARC_TMP=chunk_${FILE_ID}_!{sample}_gtdbtk.ar53.summary.tmp.tsv
+ FILE_COMB_TMP=chunk_${FILE_ID}_!{sample}_gtdbtk_combined.tmp.tsv
FILE_COMB=chunk_${FILE_ID}_!{sample}_gtdbtk_combined.tsv
FILE_UNCLASSIFIED=chunk_${FILE_ID}_!{sample}_gtdbtk_unclassified.tsv
+ FILE_UNCLASSIFIED_TMP=chunk_${FILE_ID}_!{sample}_gtdbtk_unclassified.tmp.tsv

# Filter out unclassified
head -n 1 output/gtdbtk.bac120.summary.tsv > output/unclassified.tsv
@@ -31,19 +35,35 @@ grep -v "$(printf '\t')Unclassified$(printf '\t')" output/gtdbtk.bac120.summary.

grep -v "$(printf '\t')Unclassified Archaea$(printf '\t')" output/gtdbtk.ar53.summary.tsv > output/classifiedArchaea.tsv || true

sed "s/^/SAMPLE\t/g" <(head -n 1 output/unclassified.tsv) > ${FILE_UNCLASSIFIED}
sed "s/^/!{sample}\t/g" <(tail -n +2 output/unclassified.tsv) >> ${FILE_UNCLASSIFIED}
sed "s/^/SAMPLE\t/g" <(head -n 1 output/unclassified.tsv) > ${FILE_UNCLASSIFIED_TMP}
sed "s/^/!{sample}\t/g" <(tail -n +2 output/unclassified.tsv) >> ${FILE_UNCLASSIFIED_TMP}

sed "s/^/SAMPLE\t/g" <(head -n 1 output/classifiedBacteria.tsv) > $FILE_BAC
sed "s/^/!{sample}\t/g" <(tail -n +2 output/classifiedBacteria.tsv) >> $FILE_BAC
sed "s/^/SAMPLE\t/g" <(head -n 1 output/classifiedBacteria.tsv) > $FILE_BAC_TMP
sed "s/^/!{sample}\t/g" <(tail -n +2 output/classifiedBacteria.tsv) >> $FILE_BAC_TMP

sed "s/^/SAMPLE\t/g" <(head -n 1 output/classifiedArchaea.tsv) > $FILE_ARC
sed "s/^/!{sample}\t/g" <(tail -n +2 output/classifiedArchaea.tsv) >> $FILE_ARC
sed "s/^/SAMPLE\t/g" <(head -n 1 output/classifiedArchaea.tsv) > $FILE_ARC_TMP
sed "s/^/!{sample}\t/g" <(tail -n +2 output/classifiedArchaea.tsv) >> $FILE_ARC_TMP

GTDB_SUMMARY_TMP=gtdbtk_tmp.tsv
- cat <(head -n 1 ${FILE_BAC}) <(head -n 1 ${FILE_ARC}) | sort | uniq | sed 's/^/DOMAIN\t/g' > $GTDB_SUMMARY_TMP
- cat <(tail -n +2 ${FILE_ARC} | sed 's/^/ARCHAEA\t/g') <(tail -n +2 ${FILE_BAC} | sed 's/^/BACTERIA\t/g') >> $GTDB_SUMMARY_TMP
- paste -d$'\t' <(cut -f 3 $GTDB_SUMMARY_TMP | sed '1,1s/user_genome/BIN_ID/') $GTDB_SUMMARY_TMP > $FILE_COMB
+ cat <(head -n 1 ${FILE_BAC_TMP}) <(head -n 1 ${FILE_ARC_TMP}) | sort | uniq | sed 's/^/DOMAIN\t/g' > $GTDB_SUMMARY_TMP
+ cat <(tail -n +2 ${FILE_ARC_TMP} | sed 's/^/ARCHAEA\t/g') <(tail -n +2 ${FILE_BAC_TMP} | sed 's/^/BACTERIA\t/g') >> $GTDB_SUMMARY_TMP
+ paste -d$'\t' <(cut -f 3 $GTDB_SUMMARY_TMP | sed '1,1s/user_genome/BIN_ID/') $GTDB_SUMMARY_TMP > $FILE_COMB_TMP

+ if [[ $(wc -l <${FILE_COMB_TMP}) -ge 2 ]]; then
+   mv ${FILE_COMB_TMP} ${FILE_COMB}
+ fi
+
+ if [[ $(wc -l <${FILE_BAC_TMP}) -ge 2 ]]; then
+   mv ${FILE_BAC_TMP} ${FILE_BAC}
+ fi
+
+ if [[ $(wc -l <${FILE_ARC_TMP}) -ge 2 ]]; then
+   mv ${FILE_ARC_TMP} ${FILE_ARC}
+ fi
+
+ if [[ $(wc -l <${FILE_UNCLASSIFIED_TMP}) -ge 2 ]]; then
+   mv ${FILE_UNCLASSIFIED_TMP} ${FILE_UNCLASSIFIED}
+ fi

# Copy trees to output
for tree in $(ls -1 output/classify/*.tree); do
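The new wc -l guards complement the optional: true outputs declared on pGtdbtk: a table is renamed to its final, captured name only when it contains a header plus at least one data row, so empty chunks never surface as process outputs. A toy process illustrating the pairing (hypothetical, not the pipeline's code):

// The output is optional, and the script creates it only when the input
// table holds at least one data row below its header.
process pKeepNonEmpty {
    input:
    path(table)

    output:
    path("kept_*.tsv"), optional: true

    shell:
    '''
    if [[ $(wc -l < !{table}) -ge 2 ]]; then
        cp !{table} kept_!{table}
    fi
    '''
}

workflow {
    pKeepNonEmpty(Channel.fromPath("*.tsv"))
}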
