Skip to content

Commit

Permalink
feat(pipeline): initial implementation for easy/preset pipeline execu…
Browse files Browse the repository at this point in the history
…tion mode added (#345)

* feat(pipeline): initial implementation for easy/preset pipeline execution mode added

* fix(config): allow to disable preset mode

* feat(usage): add initial tests for preset usage

* fix(tests): use correct preset commands

* fix(tests): add wrapped test script for settings

* fix(test): remove log check

* fix(test): remove check parameter

* fix(test): set config yaml only if preset is disabled
  • Loading branch information
pbelmann authored Feb 3, 2024
1 parent 172fef4 commit 85af85e
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 9 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/workflow_modules.yml
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,33 @@ jobs:
MODULE_DB_TEST_YML_SCRIPT="./scripts/test_plasmids.sh" MODULE_DB_TEST_GENERATED_YML_DIR="${WORK_DIR}/plasmid_yaml_database_tests" \
MODULE_DB_TEST_REMOVE_DB="yes"
settings:
timeout-minutes: 2500
runs-on: [ self-hosted, slurm]
steps:
- uses: actions/checkout@v2
- name: Test whether settings in easy mode can be updated
run: |
VERSION=$(sort VERSIONS.txt | tail -n 1)
OUTPUT=outputEasy
bash ./scripts/test_settings.sh \
" --preset --scratch /vol/scratch --input.paired.path test_data/fullPipeline/reads_split.tsv --highmemLarge=28,2000 --s3SignIn false --databases=/vol/scratch/databases/ --output=${OUTPUT} " \
" " "${WORK_DIR}" ${PROFILE} ${VERSION} "preset" || exit 1
bash ./scripts/check_parameter.sh ${OUTPUT} || exit 1
- name: Test whether settings in default mode can be updated
run: |
VERSION=$(sort VERSIONS.txt | tail -n 1)
OUTPUT=outputDefault
bash ./scripts/test_settings.sh \
" --scratch /vol/scratch --resources.highmemLarge.memory=2000 --s3SignIn false --databases=/vol/scratch/databases/ --output=${OUTPUT} " \
"" "${WORK_DIR}" ${PROFILE} ${VERSION} "" || exit 1
bash ./scripts/check_parameter.sh ${OUTPUT} || exit 1
codePolicy:
runs-on: [ self-hosted, slurm]
needs: [settings]
steps:
- uses: actions/checkout@v2
- name: Checks if every process defines a publishDirMode
Expand Down
11 changes: 10 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ ifndef PARAMS_FILE
override PARAMS_FILE = ${CURRENT_DIR}/example_params/fullPipeline.yml
endif

ifdef PRESET
override PARAMS_FILE=""
endif

ifneq (${PARAMS_FILE}, "")
override PARAMS_COMMAND = -params-file ${PARAMS_FILE}
endif


ifndef BRANCH
override BRANCH = "dev"
endif
Expand Down Expand Up @@ -126,7 +135,7 @@ set_secrets: nextflow ## Set secrets for sensitive data access
NXF_HOME=$$PWD/.nextflow ./nextflow secrets set ${SECRET_NAME} ${SECRET_VALUE}

run_small_full_test: nextflow ## Prepares input files like downloading bins and reads and executes Nextflow. The default configuration it runs the full pipeline locally.
NXF_HOME=$$PWD/.nextflow ./nextflow run main.nf ${OPTIONS} -work-dir ${WORK_DIR} -profile ${PROFILE} -resume -entry ${ENTRY} -params-file ${PARAMS_FILE} --logDir ${LOG_DIR} ; exit $$?
NXF_HOME=$$PWD/.nextflow ./nextflow run main.nf ${OPTIONS} -work-dir ${WORK_DIR} -profile ${PROFILE} -resume -entry ${ENTRY} ${PARAMS_COMMAND} --logDir ${LOG_DIR} ; exit $$?


help: ## Lists available Makefile commands
Expand Down
9 changes: 8 additions & 1 deletion main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,13 @@ workflow _wConfigurePipeline {
}


workflow wSaveSettings {
inputSamples = wInputFile()
wSaveSettingsList(inputSamples | map { it -> it.SAMPLE })
}



def flattenBins(binning){
def chunkList = [];
def SAMPLE_IDX = 0;
Expand Down Expand Up @@ -494,7 +501,7 @@ workflow _wProcessOnt {
* Left and right read could be https, s3 links or file path.
*/
workflow wFullPipeline {

_wConfigurePipeline()

inputSamples = wInputFile()
Expand Down
249 changes: 242 additions & 7 deletions nextflow.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ MAX_RETRIES=3
defaultErrorStrategy = { if(task.attempt <= MAX_RETRIES){ sleep(Math.pow(2, task.attempt) * 2000 as long); return 'retry' } else { return 'ignore' } }

import java.util.regex.*;
import groovy.json.*
import org.yaml.snakeyaml.Yaml
import static groovy.json.JsonOutput.*



/*
* This method checks if a key exist before
Expand All @@ -21,6 +26,182 @@ manifest {
name = "Metagenomics-Tk"
}

errorMess = [
"databases": "",
"output": "",
"input": "",
"scratch": ""
]

PRETTY_SEPARATOR = "###################################################################"

System.out.println(PRETTY_SEPARATOR)
System.out.println("############# Metagenomics-TK ##############")
System.out.println(PRETTY_SEPARATOR)

System.out.println("Version: " + manifest.version)
System.out.println("Mode: " + (params.containsKey("preset") ? "preset" : "default"))

if("dry-run" in params.keySet()){
System.out.println("Dry-run is enabled!")
}


template = (params.containsKey("template") && params.containsKey("preset")) ? params.template : "default/fullPipeline_illumina_nanpore.yml"

defaultYmlFile = new File("${baseDir}/" + template)
preLoadYML = new Yaml().load(defaultYmlFile.text)




/*
*
* Filter and compile steps (modules) that are provided by the user
*
*/
def getPresetSteps(){
disableModules = ["metabolomics", "annotation", "fragmentRecruitment"]
steps = preLoadYML["steps"]

stepsKeys = steps.keySet()

def stepsMap = [:]

System.out.println(PRETTY_SEPARATOR)
System.out.println("The following modules will be executed:\n")
for(step in stepsKeys){
if(!params.keySet().contains("no-" + step) && ! disableModules.contains(step)){
System.out.println(step)
stepsMap[step] = steps[step]
}
}
if("dry-run" in params.keySet()){
System.exit(0)
} else {
return stepsMap
}
}

/*
* Checks if the "preset" mode is provided by the user and calls an input parameter closure. Example closure could
* fetch parameters set in the default yaml.
*/
def getParamsVariable(parameter, Closure handleParameters = null, String finalParameterPreset = null, String finalParameter = null){
def cl = null

// If no closure is provided then fetch the parameter from the pre loaded yml which is saved in the
// toolkit repository.
if(handleParameters==null){
cl = { parameterClosure -> preLoadYML[parameterClosure] }
} else {
cl = handleParameters
}

// If the preset mode is enabled then call the closure, if not then
// use the parameters specified in the provided yaml.
if(params.containsKey("preset")){
if(finalParameterPreset==null){
return cl(parameter);
} else {
return finalParameterPreset;
}
} else {
if(params.containsKey(parameter)){
return params[parameter];
} else {
return finalParameter;
}
}
}

/*
* This method sets the database directory.
*/
def getPresetDatabases(){
if(!params.containsKey("databases")){
msg = "ERROR: No databases parameter provided!"
errorMess["databases"] = msg
} else {
System.out.println("Databases directory: " + params.databases)
return params.databases
}

}

/*
* This method sets the scratch directory.
*/
def getPresetScratch(){
if(!params.containsKey("scratch")){
msg = "ERROR: No scratch parameter provided!"
errorMess["scratch"] = msg
} else {
System.out.println("Scratch directory: " + params.scratch)
return params.scratch
}
}


/*
* This method checks if the user has provided an output parameter.
*/
def getPresetOutput(){
if(!params.containsKey("output")){
msg = "ERROR: No output parameter provided!"
errorMess["output"] = msg
} else {
System.out.println("Output path: " + params.output)
return params.output
}
}


/*
* Retrieves input parameters specified by the user (Paired vs. ONT parameter).
*/
def getPresetInput(){
System.out.println(PRETTY_SEPARATOR)

if(!params?.input.containsKey("ont") && !params?.input.containsKey("paired")){
msg = "ERROR: No valid input parameter provided!"
errorMess["input"] = msg
return []
} else {
System.out.println("Input parameters are the following: \n")
System.out.println(prettyPrint(toJson(params.input)))
return params.input
}
}


/*
* Resources can be defined in preset mode using the --flavorSize cpus,memory parameter.
*
*/
def getPresetResources(){
def resources = preLoadYML["resources"]
System.out.println(PRETTY_SEPARATOR)
System.out.println("The following job flavors are defined:\n")

def CPUS_KEY = 0
def MEMORY_KEY = 1

for(flavorSize in resources.keySet()){
cpus = resources[flavorSize]["cpus"]
ram = resources[flavorSize]["memory"]

if(params.containsKey(flavorSize)){
cpus = params[flavorSize].split(",")[CPUS_KEY] as Integer
ram = params[flavorSize].split(",")[MEMORY_KEY] as Integer
resources[flavorSize]["cpus"] = cpus
resources[flavorSize]["memory"] = ram
}
System.out.println("Flavor:" + flavorSize + ", CPUs:" + cpus + ", Memory:" + ram)
}
return resources
}


defaultResources {
highmemLarge {
Expand Down Expand Up @@ -52,12 +233,23 @@ defaultResources {

params {

tempdir = getParamsVariable("tempdir")
summary = getParamsVariable("summary")
input = getParamsVariable("input", { _ -> getPresetInput() })
output = getParamsVariable("output", { _ -> getPresetOutput() })
logDir = getParamsVariable( "logDir", null, "log" )
runid = getParamsVariable("runid")
databases = getParamsVariable("databases", { _ -> getPresetDatabases() }, null, "")
publishDirMode = getParamsVariable("publishDirMode", null, "symlink", "symlink")
logLevel = getParamsVariable("logLevel", null, "1", "1")
scratch = getParamsVariable("scratch", { _ -> getPresetScratch() })

steps = getParamsVariable("steps", { _ -> getPresetSteps() } )

polished {
databases = checkParamsKey(params, "databases", { path -> path.endsWith("/") ? path : path + "/" })
databases = checkParamsKey(params, "databases", { path -> (path.endsWith("/") || !path) ? path : path + "/" })
}

logDir = "log"

logFileName = ""

supportedVersions = [
Expand All @@ -66,8 +258,6 @@ params {

skipVersionCheck = false

publishDirMode = "symlink"

pysradb_image = "quay.io/biocontainers/pysradb:1.4.1--pyhdfd78af_0"
minimap2_image= "quay.io/biocontainers/minimap2:2.24--h7132678_1"
metaflye_image = "quay.io/biocontainers/flye:2.9--py36h7281c5b_1"
Expand Down Expand Up @@ -115,7 +305,7 @@ params {
magscot_image = "quay.io/metagenomics/toolkit-magscot:1.0.0"
kmc_image = "quay.io/biocontainers/kmc:3.2.1--hf1761c0_2"

resources = defaultResources
resources = params.containsKey("preset") ? getPresetResources() : defaultResources
publishDirMode = "symlink"


Expand Down Expand Up @@ -304,7 +494,8 @@ params {
}
}
}
}
}

}


Expand Down Expand Up @@ -428,6 +619,50 @@ profiles {
}
}

if(params.containsKey("help") || !errorMess.values().every{ it.isEmpty() }){
System.out.println(PRETTY_SEPARATOR)
System.out.println("Help Page:\n")

System.out.println("Mandatory Parameters:")
def additionalParameters = ["databases": "\t\tPath to a folder where databases are downloaded and extracted. \n" +
"\t\t\tIf you are using slurm then the path should point to a folder which is local to the worker host and not shared by all workers.",
"scratch": "\t\tScratch directory which is used for storing intermediate results.",
"output": "\t\tOutput directory path or S3 url.",
"input.ont.path": "\tPath to a samplesheet containing the two columns: SAMPLE and PATH.\n" +
"\t\t\tSAMPLE contains the id of the dataset and the PATH column contains the path or url that points to the nanopore datasets.",
"input.paired.path": "\tPath that points to a samplesheet with the required columns SAMPLE, READS1 and READS2.\n" +
"\t\t\tREADS1 and READS2 point to paths or urls of the input datasets."]

for(key in additionalParameters.keySet()){
System.out.println("--" + key + ": " + additionalParameters[key])
}
System.out.println("")

System.out.println("Optional Parameters:")

System.out.println("Possible resource settings with the values cpus and ram.")
System.out.println("Example: --tiny 1,4 means 1 cpu and 4 GB RAM.")
resourceKeys = params.resources.keySet()
for(key in resourceKeys){
System.out.println("--" + key)
}

modulesKeys = params.steps.keySet()

System.out.println("")
System.out.println("You can disable modules via the following parameters:")
for(key in modulesKeys){
System.out.println("--no-" + key)
}

if(!params.containsKey("help")){
for(err in errorMess.keySet()){
System.err.println(errorMess[err])
}
}
System.exit(0)
}

aws {
accessKey = params.s3SignIn ? secrets.S3_ACCESS : ""
secretKey = params.s3SignIn ? secrets.S3_SECRET : ""
Expand Down
10 changes: 10 additions & 0 deletions scripts/check_parameter.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
OUTPUT=$1

EXPECTED_MEMORY=2000
MEMORY=$(find $OUTPUT -name 'params_*' | head -n 1 | xargs -I {} yq '.resources | .highmemLarge | .memory' {})

if [ "$MEMORY" -eq "$EXPECTED_MEMORY" ]; then
exit 0
else
exit 1
fi
Loading

0 comments on commit 85af85e

Please sign in to comment.