Skip to content

Commit

Permalink
Merge branch 'CW-4899' into 'dev'
Browse files Browse the repository at this point in the history
Use new nextclade3 image [CW-4899]

Closes CW-4899

See merge request epi2melabs/workflows/wf-artic!167
  • Loading branch information
bede committed Sep 11, 2024
2 parents 7752e7a + f906bb4 commit 8a668e5
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ repos:
always_run: true
pass_filenames: false
additional_dependencies:
- epi2melabs==0.0.56
- epi2melabs==0.0.57
- id: build_models
name: build_models
entry: datamodel-codegen --strict-nullable --base-class workflow_glue.results_schema_helpers.BaseModel --use-schema-description --disable-timestamp --input results_schema.yml --input-file-type openapi --output bin/workflow_glue/results_schema.py
Expand Down
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [v1.2.1]
### Changed
- Reconcile workflow with wf-template v5.2.5

## [v1.2.0]
### Added
- Logging to file for artic commands (`{barcode}.artic.log.txt`)
Expand Down Expand Up @@ -36,7 +40,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [v0.3.32]
### Fixed
- reporting of sequence summaries crashing with `TypeError`
- reporting of sequence summaries crashing with `TypeError`
- sed error when writing to tmp

## [v0.3.31]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ therefore Nextflow will need to be
installed before attempting to run the workflow.

The workflow can currently be run using either
[Docker](https://www.docker.com/products/docker-desktop
[Docker](https://www.docker.com/products/docker-desktop)
or [Singularity](https://docs.sylabs.io/guides/3.0/user-guide/index.html)
to provide isolation of the required software.
Both methods are automated out-of-the-box provided
Expand Down
8 changes: 7 additions & 1 deletion bin/workflow_glue/check_bam_headers_in_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ def main(args):
for xam_file in target_files:
# get the `@SQ` and `@HD` lines in the header
with pysam.AlignmentFile(xam_file, check_sq=False) as f:
sq_lines = f.header.get("SQ")
# compare only the SN/LN/M5 elements of SQ to avoid labelling XAM with
# same reference but different SQ.UR as mixed_header (see CW-4842)
sq_lines = [{
"SN": sq["SN"],
"LN": sq["LN"],
"M5": sq.get("M5"),
} for sq in f.header.get("SQ", [])]
hd_lines = f.header.get("HD")
# Check if it is sorted.
# When there is more than one BAM, merging/sorting
Expand Down
12 changes: 6 additions & 6 deletions bin/workflow_glue/check_xam_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ def validate_xam_index(xam_file):
Invalid indexes will fail the call with a ValueError:
ValueError: fetch called on bamfile without index
"""
alignments = pysam.AlignmentFile(xam_file, check_sq=False)
try:
alignments.fetch()
has_valid_index = True
except ValueError:
has_valid_index = False
with pysam.AlignmentFile(xam_file, check_sq=False) as alignments:
try:
alignments.fetch()
has_valid_index = True
except ValueError:
has_valid_index = False
return has_valid_index


Expand Down
2 changes: 1 addition & 1 deletion docs/04_install_and_run.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ therefore Nextflow will need to be
installed before attempting to run the workflow.

The workflow can currently be run using either
[Docker](https://www.docker.com/products/docker-desktop
[Docker](https://www.docker.com/products/docker-desktop)
or [Singularity](https://docs.sylabs.io/guides/3.0/user-guide/index.html)
to provide isolation of the required software.
Both methods are automated out-of-the-box provided
Expand Down
1 change: 1 addition & 0 deletions lib/common.nf
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ process getParams {
}

process configure_igv {
publishDir "${params.out_dir}/", mode: 'copy', pattern: 'igv.json', enabled: params.containsKey("igv") && params.igv
label "wf_common"
cpus 1
memory "2 GB"
Expand Down
68 changes: 49 additions & 19 deletions lib/ingress.nf
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,15 @@ def fastq_ingress(Map arguments)
.map { meta, files, stats ->
// new `arity: '1..*'` would be nice here
files = files instanceof List ? files : [files]
new_keys = [
def new_keys = [
"group_key": groupKey(meta["alias"], files.size()),
"n_fastq": files.size()]
grp_index = (0..<files.size()).collect()
def grp_index = (0..<files.size()).collect()
[meta + new_keys, files, grp_index, stats]
}
.transpose(by: [1, 2]) // spread multiple fastq files into separate emissions
.map { meta, files, grp_i, stats ->
new_keys = [
def new_keys = [
"group_index": "${meta["alias"]}_${grp_i}"]
[meta + new_keys, files, stats]
}
Expand Down Expand Up @@ -279,17 +279,19 @@ def xam_ingress(Map arguments)
// sorted, the index will be used.
meta, paths ->
boolean is_array = paths instanceof ArrayList
String xai_fn
String src_xam
String src_xai
// Using `.uri` or `.Uri()` leads to S3 paths to be prefixed with `s3:///`
// instead of `s3://`, causing the workflow to not find the index file.
// `.toUriString()` returns the correct path.
if (!is_array){
src_xam = paths.toUriString()
def xai = file(paths.toUriString() + ".bai")
if (xai.exists()){
xai_fn = xai.toUriString()
src_xai = xai.toUriString()
}
}
[meta + [xai_fn: xai_fn], paths]
[meta + [src_xam: src_xam, src_xai: src_xai], paths]
}
| checkBamHeaders
| map { meta, paths, is_unaligned_env, mixed_headers_env, is_sorted_env ->
Expand Down Expand Up @@ -331,9 +333,9 @@ def xam_ingress(Map arguments)
// - between 1 and `N_OPEN_FILES_LIMIT` aligned files
no_files: n_files == 0
indexed: \
n_files == 1 && (meta["is_unaligned"] || meta["is_sorted"]) && meta["xai_fn"]
to_index:
n_files == 1 && (meta["is_unaligned"] || meta["is_sorted"]) && !meta["xai_fn"]
n_files == 1 && (meta["is_unaligned"] || meta["is_sorted"]) && meta["src_xai"]
to_index: \
n_files == 1 && (meta["is_unaligned"] || meta["is_sorted"]) && !meta["src_xai"]
to_catsort: \
(n_files == 1) || (n_files > N_OPEN_FILES_LIMIT) || meta["is_unaligned"]
to_merge: true
Expand All @@ -358,20 +360,20 @@ def xam_ingress(Map arguments)
.map { meta, files, stats ->
// new `arity: '1..*'` would be nice here
files = files instanceof List ? files : [files]
new_keys = [
def new_keys = [
"group_key": groupKey(meta["alias"], files.size()),
"n_fastq": files.size()]
grp_index = (0..<files.size()).collect()
def grp_index = (0..<files.size()).collect()
[meta + new_keys, files, grp_index, stats]
}
.transpose(by: [1, 2]) // spread multiple fastq files into separate emissions
.map { meta, files, grp_i, stats ->
new_keys = [
def new_keys = [
"group_index": "${meta["alias"]}_${grp_i}"]
[meta + new_keys, files, stats]
}
.map { meta, path, stats ->
[meta.findAll { it.key !in ['xai_fn', 'is_sorted'] }, path, stats]
[meta.findAll { it.key !in ['is_sorted', 'src_xam', 'src_xai'] }, path, stats]
}

// add number of reads, run IDs, and basecall models to meta
Expand All @@ -388,18 +390,26 @@ def xam_ingress(Map arguments)
| sortBam
| groupTuple
| mergeBams
| map{
meta, bam, bai ->
[meta + [src_xam: null, src_xai: null], bam, bai]
}

// now handle samples with too many files for `samtools merge`
ch_catsorted = ch_result.to_catsort
| catSortBams
| map{
meta, bam, bai ->
[meta + [src_xam: null, src_xai: null], bam, bai]
}

// Validate the index of the input BAM.
// If the input BAM index is invalid, regenerate it.
// First separate the BAM from the null input channels.
ch_to_validate = ch_result.indexed
| map{
meta, paths ->
bai = paths && meta.xai_fn ? file(meta.xai_fn) : null
def bai = paths && meta.src_xai ? file(meta.src_xai) : null
[meta, paths, bai]
}
| branch {
Expand Down Expand Up @@ -429,6 +439,10 @@ def xam_ingress(Map arguments)
ch_indexed = ch_result.to_index
| mix( ch_validated.invalid_idx )
| samtools_index
| map{
meta, bam, bai ->
[meta + [src_xai: null], bam, bai]
}

// Add extra null for the missing index to input.missing
// as well as the missing metadata.
Expand All @@ -439,7 +453,7 @@ def xam_ingress(Map arguments)
)
| map{
meta, paths ->
[meta + [xai_fn: null, is_sorted: false], paths, null]
[meta + [src_xam: null, src_xai: null, is_sorted: false], paths, null]
}

// Combine all possible inputs
Expand Down Expand Up @@ -480,7 +494,7 @@ def xam_ingress(Map arguments)
}

// Remove metadata that are unnecessary downstream:
// meta.xai_fn: not needed, as it will be part of the channel as a file
// meta.src_xai: not needed, as it will be part of the channel as a file
// meta.is_sorted: if data are aligned, they will also be sorted/indexed
//
// The output meta can contain the following flags:
Expand All @@ -498,7 +512,7 @@ def xam_ingress(Map arguments)
ch_result
| map{
meta, bam, bai, stats ->
[meta.findAll { it.key !in ['xai_fn', 'is_sorted'] }, [bam, bai], stats]
[meta.findAll { it.key !in ['is_sorted'] }, [bam, bai], stats]
},
"xam"
)
Expand All @@ -508,6 +522,19 @@ def xam_ingress(Map arguments)
| map{
it.flatten()
}
// Final check to ensure that src_xam/src_xai is not an s3
// path. If so, drop it. We check src_xam also for src_xai,
// as the latter is irrelevant if the former is in s3.
| map{
meta, bam, bai, stats ->
def xam = meta.src_xam
def xai = meta.src_xai
if (meta.src_xam){
xam = meta.src_xam.startsWith('s3://') ? null : meta.src_xam
xai = meta.src_xam.startsWith('s3://') ? null : meta.src_xai
}
[ meta + [src_xam: xam, src_xai: xai], bam, bai, stats ]
}

return ch_result
}
Expand Down Expand Up @@ -621,6 +648,8 @@ process validateIndex {
}


// Sort FOFN for samtools merge to ensure samtools sort breaks ties deterministically.
// Uses -c to ensure matching RG.IDs across multiple inputs are not unnecessarily modified to avoid collisions.
process mergeBams {
label "ingress"
label "wf_common"
Expand All @@ -632,11 +661,12 @@ process mergeBams {
def merge_threads = Math.max(1, task.cpus - 1)
"""
samtools merge -@ ${merge_threads} \
-b <(find input_bams -name 'reads*.bam') --write-index -o reads.bam##idx##reads.bam.bai
-c -b <(find input_bams -name 'reads*.bam' | sort) --write-index -o reads.bam##idx##reads.bam.bai
"""
}


// Sort FOFN for samtools cat to ensure samtools sort breaks ties deterministically.
process catSortBams {
label "ingress"
label "wf_common"
Expand All @@ -647,7 +677,7 @@ process catSortBams {
script:
def sort_threads = Math.max(1, task.cpus - 2)
"""
samtools cat -b <(find input_bams -name 'reads*.bam') \
samtools cat -b <(find input_bams -name 'reads*.bam' | sort) \
| samtools sort - -@ ${sort_threads} --write-index -o reads.bam##idx##reads.bam.bai
"""
}
Expand Down
19 changes: 12 additions & 7 deletions nextflow.config
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ params {
"--scheme_name 'SARS-CoV-2'",
"--scheme_version 'Midnight-ONT/V3'"
]
common_sha = 'sha8b5843d549bb210558cbb676fe537a153ce771d6'
common_sha = 'shae58638742cf84dbeeec683ba24bcdee67f64b986'
container_sha = 'sha15e9dfa0469ddd0641dfe1a5f07bedb475a8a03d'
pangolin_sha = 'shae304dd3bc308a519f26908eb9d5ffa7686131d17'
nextclade3_sha = 'sha6913f7b3db54356e74fd984933278a66b0c1f286'
}
}

Expand All @@ -69,7 +70,7 @@ manifest {
description = 'Run the ARTIC SARS-CoV-2 methodology on multiplexed MinION, GridION, and PromethION data.'
mainScript = 'main.nf'
nextflowVersion = '>=23.04.2'
version = 'v1.2.0'
version = 'v1.2.1'
}

epi2melabs {
Expand All @@ -84,20 +85,20 @@ env {
}

process {
withLabel:wf_common {
withLabel:wf_common {
container = "ontresearch/wf-common:${params.wf.common_sha}"
memory = '1G'
}
withLabel:artic {
withLabel:artic {
container = "ontresearch/wf-artic:${params.wf.container_sha}"
memory = '2G'
memory = '2G'
}
withLabel:pangolin {
container = "ontresearch/pangolin:${params.wf.pangolin_sha}"
memory = '2G'
}
withLabel:nextclade {
container = "nextstrain/nextclade:3.2.1"
withLabel:nextclade {
container = "ontresearch/nextclade3:${params.wf.nextclade3_sha}"
memory = '1G'
}
shell = ['/bin/bash', '-euo', 'pipefail']
Expand Down Expand Up @@ -146,6 +147,10 @@ profiles {
container = "${params.aws_image_prefix}-pangolin:${params.wf.pangolin_sha}-root"
memory = '2G'
}
withLabel:nextclade {
container = "${params.aws_image_prefix}-nextclade3:${params.wf.nextclade3_sha}"
memory = '1G'
}
shell = ['/bin/bash', '-euo', 'pipefail']
}
}
Expand Down

0 comments on commit 8a668e5

Please sign in to comment.