
Guided Example ‐ General pipeline

A Guided Example

This example is a little more complex; read the nf-core example first.

Copy the module

Start out by creating a module file with the contents of the NEXTFLOW_RUN process below.

Warning

The input channels are different from the nf-core example.

modules/local/nextflow/run/main.nf:

process NEXTFLOW_RUN {
    tag "$pipeline_name"

    input:
    val pipeline_name     // String
    val nextflow_opts     // String
    val nextflow_files    // Map [ params-file: params.yml , c: configs/multiqc.config ]
    val pipeline_files    // Map [ input: samplesheet.csv ]

    when:
    task.ext.when == null || task.ext.when

    exec:
    // Create a directory for the child pipeline, named after it, inside the parent's work directory
    def cache_dir = java.nio.file.Paths.get(workflow.workDir.resolve(pipeline_name).toUri())
    java.nio.file.Files.createDirectories(cache_dir)
    // Assemble the `nextflow run` command: Nextflow options, file options (-key value),
    // and pipeline file parameters (--key value)
    def nxf_cmd = [
        'nextflow run',
            pipeline_name,
            nextflow_opts,
            nextflow_files ? nextflow_files.collect{ key, value -> "-$key $value" }.join(' ') : '',
            pipeline_files ? pipeline_files.collect{ key, value -> "--$key $value" }.join(' ') : '',
            "--outdir $task.workDir/results",
    ]
    // Record the command in the task work directory for debugging
    file("$task.workDir/nf-cmd.sh").text = nxf_cmd.join(" ")
    // Launch the child pipeline from the cache directory and fail the task if it exits non-zero
    def builder = new ProcessBuilder(nxf_cmd.join(" ").tokenize(" "))
    builder.directory(cache_dir.toFile())
    process = builder.start()
    assert process.waitFor() == 0: process.text
    // Copy the child pipeline's log into the task work directory for inspection
    file("${cache_dir.toString()}/.nextflow.log").copyTo("$task.workDir/.nextflow.log")

    output:
    path "results"  , emit: output
    val process.text, emit: log
}

This module builds the following command-line instruction (for nf-core-style workflows):

nextflow run $pipeline_name $nextflow_opts [-key $nextflow_file] [--key $pipeline_file] --outdir $task.workDir/results

and then runs it. All of the child workflow's outputs are placed in the folder results inside the task working directory.
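
For example, with the hypothetical inputs 'nf-core/demo', '-resume -profile docker', [ 'params-file': 'params.yml' ] and [ input: 'samplesheet.csv' ], the rendered command would look roughly like:

nextflow run nf-core/demo -resume -profile docker -params-file params.yml --input samplesheet.csv --outdir <task work dir>/results

where <task work dir> is the working directory Nextflow assigns to the task.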

Include the module in your workflow

main.nf:

include { NEXTFLOW_RUN as NFCORE_DEMO } from "./modules/local/nextflow/run/main"

workflow {
    NFCORE_DEMO (
        'nf-core/demo',            // Select nf-core pipeline
        params.nfcore_demo_opts,   // workflow opts supplied as params for flexibility
        Channel.value([            // Nextflow files
            'params-file': params.nfcore_demo_params_file,
            'c': params.nfcore_demo_add_config,
        ]).map { nxf_files -> nxf_files.collectEntries { key, path -> [ (key): file( path , checkIfExists: true ) ] } },
        Channel.value([            // Pipeline files
             'input' : params.nfcore_demo_samplesheet,
        ]).map { pln_files -> pln_files.collectEntries { key, path -> [ (key): file( path , checkIfExists: true ) ] } }
    )
}

Here we've selected the nf-core workflow nf-core/demo to include.

  1. Include the NEXTFLOW_RUN module using the include keyword, and specify where it's located using from.
  2. The module is aliased (given another name) using as for readability and extensibility (inclusion of other workflows later on).
  3. The module is then added to the workflow, using the name NFCORE_DEMO.
  4. The first channel input is nf-core/demo, the name of the pipeline we would like to run. This is implicitly converted to Channel.value('nf-core/demo').
  5. The second channel input is a string supplied by params.nfcore_demo_opts, which provides extra Nextflow options such as -resume, -ansi-log false, -profile docker,test, etc. You can include multiple Nextflow options here, e.g. "-resume -profile docker,test".
  6. The third channel input is a map of files that Nextflow should use. The file paths are initially String objects which need to be converted to Path objects. The module then converts these key-value pairs to -key value in the command, i.e. file arguments for nextflow run. For example, the map [ 'params-file' : 'inputs/params.yml' ] is converted to -params-file inputs/params.yml (see the snippet after this list).
  7. The fourth channel input is also a map of files, but these are files the pipeline itself should use (i.e., they are supplied using --param). For example, [ input: 'samplesheet.csv' ] becomes --input samplesheet.csv in the module.
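
As a minimal, plain-Groovy illustration of that conversion (the file paths here are only placeholders):

def nextflow_files = [ 'params-file': 'inputs/params.yml' ]
assert nextflow_files.collect { key, value -> "-$key $value" }.join(' ') == '-params-file inputs/params.yml'

def pipeline_files = [ 'input': 'samplesheet.csv' ]
assert pipeline_files.collect { key, value -> "--$key $value" }.join(' ') == '--input samplesheet.csv'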

Lastly, create a nextflow.config and add process.errorStrategy = 'finish'. Without this, if a pipeline errors, any concurrently running workflows are killed immediately, leaving behind a Nextflow lock file that prevents the workflow from resuming.
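
A minimal nextflow.config along these lines:

// Let concurrently running child pipelines finish instead of being killed when one errors
process.errorStrategy = 'finish'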

Running the first module

You can test the first module by doing:

nextflow run main.nf -params-file params.yml

where params.yml looks like:

nfcore_demo_opts: '-resume -profile docker'
nfcore_demo_params_file: '/path/to/nfcore/demo/params.yml' # Generate with `nf-core launch`
nfcore_demo_samplesheet: '/path/to/samplesheet/input.csv' # The samplesheet
nfcore_demo_add_config: '/path/to/nf-core/demo/custom.config' # Set configuration, e.g. resources, for nf-core/demo

Connecting to another module

To connect another module, create a Channel with a map of the appropriate entries built from the previous module's output, then merge that map with any overriding inputs.

NFCORE_DEMO.out.output
    .map { dir -> [ 'input' : file(dir.resolve('path/to/samplesheet'), checkIfExists: true) ] } // Construct a Map of pipeline files from the previous pipeline's output directory
    .map { pipeline_file_map -> 
         pipeline_file_map + [ // Add in any overrides
             input: params.nfnext_samplesheet,
         ].findAll { kvpair -> kvpair.value } // Use Groovy truth to keep entries with an overriding setting
         .collectEntries { key, path -> [ (key): file( path , checkIfExists: true ) ] } // Convert those `String`s to `Path`s
         // The + operator on maps merges two maps. If the same key exists in both maps, the right hand side will be the result.
    }
    .set { nfnext_pipeline_files }
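
This channel can then be wired into a second inclusion of the module, in the same way as the first. The sketch below assumes a hypothetical downstream pipeline name and hypothetical params (params.nfnext_opts, params.nfnext_params_file); adapt these to your own pipeline:

include { NEXTFLOW_RUN as NFNEXT } from "./modules/local/nextflow/run/main"

workflow {
    // ... NFCORE_DEMO ( ... ) and the nfnext_pipeline_files channel from above ...
    NFNEXT (
        'my-org/next-pipeline',   // hypothetical pipeline to run after nf-core/demo
        params.nfnext_opts,       // hypothetical: extra Nextflow options, as for NFCORE_DEMO
        Channel.value([           // hypothetical Nextflow files for the downstream pipeline
            'params-file': params.nfnext_params_file,
        ]).map { nxf_files -> nxf_files.collectEntries { key, path -> [ (key): file( path , checkIfExists: true ) ] } },
        nfnext_pipeline_files     // pipeline files built from NFCORE_DEMO's output above
    )
}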