feat(pipeline): initial implementation for easy/preset pipeline execution mode added #345

Merged
10 commits merged on Feb 3, 2024
24 changes: 24 additions & 0 deletions .github/workflows/workflow_modules.yml
@@ -431,9 +431,33 @@ jobs:
MODULE_DB_TEST_YML_SCRIPT="./scripts/test_plasmids.sh" MODULE_DB_TEST_GENERATED_YML_DIR="${WORK_DIR}/plasmid_yaml_database_tests" \
MODULE_DB_TEST_REMOVE_DB="yes"

  settings:
    timeout-minutes: 2500
    runs-on: [ self-hosted, slurm]
    steps:
      - uses: actions/checkout@v2
      - name: Test whether settings in easy mode can be updated
        run: |
          VERSION=$(sort VERSIONS.txt | tail -n 1)
          OUTPUT=outputEasy
          bash ./scripts/test_settings.sh \
            " --preset --scratch /vol/scratch --input.paired.path test_data/fullPipeline/reads_split.tsv --highmemLarge=28,2000 --s3SignIn false --databases=/vol/scratch/databases/ --output=${OUTPUT} " \
            " " "${WORK_DIR}" ${PROFILE} ${VERSION} "preset" || exit 1
          bash ./scripts/check_parameter.sh ${OUTPUT} || exit 1
      - name: Test whether settings in default mode can be updated
        run: |
          VERSION=$(sort VERSIONS.txt | tail -n 1)
          OUTPUT=outputDefault
          bash ./scripts/test_settings.sh \
            " --scratch /vol/scratch --resources.highmemLarge.memory=2000 --s3SignIn false --databases=/vol/scratch/databases/ --output=${OUTPUT} " \
            "" "${WORK_DIR}" ${PROFILE} ${VERSION} "" || exit 1
          bash ./scripts/check_parameter.sh ${OUTPUT} || exit 1

  codePolicy:
    runs-on: [ self-hosted, slurm]
    needs: [settings]
    steps:
      - uses: actions/checkout@v2
      - name: Checks if every process defines a publishDirMode
11 changes: 10 additions & 1 deletion Makefile
@@ -29,6 +29,15 @@ ifndef PARAMS_FILE
override PARAMS_FILE = ${CURRENT_DIR}/example_params/fullPipeline.yml
endif

ifdef PRESET
override PARAMS_FILE=""
endif

ifneq (${PARAMS_FILE}, "")
override PARAMS_COMMAND = -params-file ${PARAMS_FILE}
endif
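# Usage sketch (flags taken from the CI settings job; WORK_DIR, PROFILE, ENTRY and LOG_DIR are
# assumed to be configured as usual): defining PRESET with any value drops the -params-file
# argument, so the preset defaults in nextflow.config plus the flags passed via OPTIONS apply:
#   make run_small_full_test PRESET=1 OPTIONS=" --preset --scratch /vol/scratch --input.paired.path test_data/fullPipeline/reads_split.tsv --s3SignIn false --databases=/vol/scratch/databases/ --output=output "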


ifndef BRANCH
override BRANCH = "dev"
endif
@@ -126,7 +135,7 @@ set_secrets: nextflow ## Set secrets for sensitive data access
NXF_HOME=$$PWD/.nextflow ./nextflow secrets set ${SECRET_NAME} ${SECRET_VALUE}

run_small_full_test: nextflow ## Prepares input files, e.g. by downloading bins and reads, and executes Nextflow. With the default configuration it runs the full pipeline locally.
NXF_HOME=$$PWD/.nextflow ./nextflow run main.nf ${OPTIONS} -work-dir ${WORK_DIR} -profile ${PROFILE} -resume -entry ${ENTRY} -params-file ${PARAMS_FILE} --logDir ${LOG_DIR} ; exit $$?
NXF_HOME=$$PWD/.nextflow ./nextflow run main.nf ${OPTIONS} -work-dir ${WORK_DIR} -profile ${PROFILE} -resume -entry ${ENTRY} ${PARAMS_COMMAND} --logDir ${LOG_DIR} ; exit $$?


help: ## Lists available Makefile commands
9 changes: 8 additions & 1 deletion main.nf
@@ -428,6 +428,13 @@ workflow _wConfigurePipeline {
}


workflow wSaveSettings {
inputSamples = wInputFile()
wSaveSettingsList(inputSamples | map { it -> it.SAMPLE })
}



def flattenBins(binning){
def chunkList = [];
def SAMPLE_IDX = 0;
@@ -494,7 +501,7 @@ workflow _wProcessOnt {
 * Left and right reads can be HTTPS links, S3 links or file paths.
*/
workflow wFullPipeline {

_wConfigurePipeline()

inputSamples = wInputFile()
249 changes: 242 additions & 7 deletions nextflow.config
@@ -2,6 +2,11 @@ MAX_RETRIES=3
defaultErrorStrategy = { if(task.attempt <= MAX_RETRIES){ sleep(Math.pow(2, task.attempt) * 2000 as long); return 'retry' } else { return 'ignore' } }

import java.util.regex.*;
import groovy.json.*
import org.yaml.snakeyaml.Yaml
import static groovy.json.JsonOutput.*



/*
 * This method checks if a key exists before
@@ -21,6 +26,182 @@ manifest {
name = "Metagenomics-Tk"
}

errorMess = [
"databases": "",
"output": "",
"input": "",
"scratch": ""
]

PRETTY_SEPARATOR = "###################################################################"

System.out.println(PRETTY_SEPARATOR)
System.out.println("############# Metagenomics-TK ##############")
System.out.println(PRETTY_SEPARATOR)

System.out.println("Version: " + manifest.version)
System.out.println("Mode: " + (params.containsKey("preset") ? "preset" : "default"))

if("dry-run" in params.keySet()){
System.out.println("Dry-run is enabled!")
}


template = (params.containsKey("template") && params.containsKey("preset")) ? params.template : "default/fullPipeline_illumina_nanpore.yml"

defaultYmlFile = new File("${baseDir}/" + template)
preLoadYML = new Yaml().load(defaultYmlFile.text)
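// Note: a custom preset YAML can be selected with --template, but only together with --preset;
// e.g. "--preset --template default/myPreset.yml" (hypothetical file name) would be loaded
// instead of the default preset shipped with the toolkit.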




/*
*
* Filter and compile steps (modules) that are provided by the user
*
*/
def getPresetSteps(){
disableModules = ["metabolomics", "annotation", "fragmentRecruitment"]
steps = preLoadYML["steps"]

stepsKeys = steps.keySet()

def stepsMap = [:]

System.out.println(PRETTY_SEPARATOR)
System.out.println("The following modules will be executed:\n")
for(step in stepsKeys){
if(!params.keySet().contains("no-" + step) && ! disableModules.contains(step)){
System.out.println(step)
stepsMap[step] = steps[step]
}
}
if("dry-run" in params.keySet()){
System.exit(0)
} else {
return stepsMap
}
}
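// Example (assuming the default YAML defines a step named "qc"): "--preset --no-qc" removes that
// step from the run, while "--preset --dry-run" only prints the selected modules and exits
// without running the pipeline.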

/*
 * Checks whether the user enabled "preset" mode and, if so, calls an input parameter closure.
 * An example closure could fetch the parameters set in the default YAML.
*/
def getParamsVariable(parameter, Closure handleParameters = null, String finalParameterPreset = null, String finalParameter = null){
def cl = null

// If no closure is provided, fetch the parameter from the pre-loaded YAML that is shipped with the
// toolkit repository.
if(handleParameters==null){
cl = { parameterClosure -> preLoadYML[parameterClosure] }
} else {
cl = handleParameters
}

// If the preset mode is enabled then call the closure, if not then
// use the parameters specified in the provided yaml.
if(params.containsKey("preset")){
if(finalParameterPreset==null){
return cl(parameter);
} else {
return finalParameterPreset;
}
} else {
if(params.containsKey(parameter)){
return params[parameter];
} else {
return finalParameter;
}
}
}
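// Example from the params block below: getParamsVariable("publishDirMode", null, "symlink", "symlink")
// returns "symlink" in preset mode, params.publishDirMode if the user set it in default mode,
// and "symlink" as the final fallback.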

/*
* This method sets the database directory.
*/
def getPresetDatabases(){
if(!params.containsKey("databases")){
msg = "ERROR: No databases parameter provided!"
errorMess["databases"] = msg
} else {
System.out.println("Databases directory: " + params.databases)
return params.databases
}

}

/*
* This method sets the scratch directory.
*/
def getPresetScratch(){
if(!params.containsKey("scratch")){
msg = "ERROR: No scratch parameter provided!"
errorMess["scratch"] = msg
} else {
System.out.println("Scratch directory: " + params.scratch)
return params.scratch
}
}


/*
* This method checks if the user has provided an output parameter.
*/
def getPresetOutput(){
if(!params.containsKey("output")){
msg = "ERROR: No output parameter provided!"
errorMess["output"] = msg
} else {
System.out.println("Output path: " + params.output)
return params.output
}
}


/*
* Retrieves input parameters specified by the user (Paired vs. ONT parameter).
*/
def getPresetInput(){
System.out.println(PRETTY_SEPARATOR)

if(!params?.input.containsKey("ont") && !params?.input.containsKey("paired")){
msg = "ERROR: No valid input parameter provided!"
errorMess["input"] = msg
return []
} else {
System.out.println("Input parameters are the following: \n")
System.out.println(prettyPrint(toJson(params.input)))
return params.input
}
}


/*
* Resources can be defined in preset mode using the --flavorSize cpus,memory parameter.
*
*/
def getPresetResources(){
def resources = preLoadYML["resources"]
System.out.println(PRETTY_SEPARATOR)
System.out.println("The following job flavors are defined:\n")

def CPUS_KEY = 0
def MEMORY_KEY = 1

for(flavorSize in resources.keySet()){
cpus = resources[flavorSize]["cpus"]
ram = resources[flavorSize]["memory"]

if(params.containsKey(flavorSize)){
cpus = params[flavorSize].split(",")[CPUS_KEY] as Integer
ram = params[flavorSize].split(",")[MEMORY_KEY] as Integer
resources[flavorSize]["cpus"] = cpus
resources[flavorSize]["memory"] = ram
}
System.out.println("Flavor:" + flavorSize + ", CPUs:" + cpus + ", Memory:" + ram)
}
return resources
}
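// Example grounded in the CI settings job: "--highmemLarge=28,2000" overrides the highmemLarge
// flavor to 28 CPUs and a memory value of 2000; flavors that are not overridden keep the values
// defined in the preset YAML.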


defaultResources {
highmemLarge {
@@ -52,12 +233,23 @@ defaultResources {

params {

tempdir = getParamsVariable("tempdir")
summary = getParamsVariable("summary")
input = getParamsVariable("input", { _ -> getPresetInput() })
output = getParamsVariable("output", { _ -> getPresetOutput() })
logDir = getParamsVariable( "logDir", null, "log" )
runid = getParamsVariable("runid")
databases = getParamsVariable("databases", { _ -> getPresetDatabases() }, null, "")
publishDirMode = getParamsVariable("publishDirMode", null, "symlink", "symlink")
logLevel = getParamsVariable("logLevel", null, "1", "1")
scratch = getParamsVariable("scratch", { _ -> getPresetScratch() })

steps = getParamsVariable("steps", { _ -> getPresetSteps() } )

polished {
databases = checkParamsKey(params, "databases", { path -> path.endsWith("/") ? path : path + "/" })
databases = checkParamsKey(params, "databases", { path -> (path.endsWith("/") || !path) ? path : path + "/" })
}

logDir = "log"

logFileName = ""

supportedVersions = [
Expand All @@ -66,8 +258,6 @@ params {

skipVersionCheck = false

publishDirMode = "symlink"

pysradb_image = "quay.io/biocontainers/pysradb:1.4.1--pyhdfd78af_0"
minimap2_image= "quay.io/biocontainers/minimap2:2.24--h7132678_1"
metaflye_image = "quay.io/biocontainers/flye:2.9--py36h7281c5b_1"
@@ -115,7 +305,7 @@ params {
magscot_image = "quay.io/metagenomics/toolkit-magscot:1.0.0"
kmc_image = "quay.io/biocontainers/kmc:3.2.1--hf1761c0_2"

resources = defaultResources
resources = params.containsKey("preset") ? getPresetResources() : defaultResources
publishDirMode = "symlink"


@@ -304,7 +494,8 @@ params {
}
}
}
}
}

}


@@ -428,6 +619,50 @@ profiles {
}
}

if(params.containsKey("help") || !errorMess.values().every{ it.isEmpty() }){
System.out.println(PRETTY_SEPARATOR)
System.out.println("Help Page:\n")

System.out.println("Mandatory Parameters:")
def additionalParameters = ["databases": "\t\tPath to a folder where databases are downloaded and extracted. \n" +
"\t\t\tIf you are using slurm then the path should point to a folder which is local to the worker host and not shared by all workers.",
"scratch": "\t\tScratch directory which is used for storing intermediate results.",
"output": "\t\tOutput directory path or S3 url.",
"input.ont.path": "\tPath to a samplesheet containing the two columns: SAMPLE and PATH.\n" +
"\t\t\tSAMPLE contains the id of the dataset and the PATH column contains the path or url that points to the nanopore datasets.",
"input.paired.path": "\tPath that points to a samplesheet with the required columns SAMPLE, READS1 and READS2.\n" +
"\t\t\tREADS1 and READS2 point to paths or urls of the input datasets."]

for(key in additionalParameters.keySet()){
System.out.println("--" + key + ": " + additionalParameters[key])
}
System.out.println("")

System.out.println("Optional Parameters:")

System.out.println("Possible resource settings with the values cpus and ram.")
System.out.println("Example: --tiny 1,4 means 1 cpu and 4 GB RAM.")
resourceKeys = params.resources.keySet()
for(key in resourceKeys){
System.out.println("--" + key)
}

modulesKeys = params.steps.keySet()

System.out.println("")
System.out.println("You can disable modules via the following parameters:")
for(key in modulesKeys){
System.out.println("--no-" + key)
}

if(!params.containsKey("help")){
for(err in errorMess.keySet()){
System.err.println(errorMess[err])
}
}
System.exit(0)
}
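/*
 * A minimal preset invocation sketch (the profile name is a placeholder; the entry point and the
 * input/database paths are taken from the CI settings job):
 *
 *   ./nextflow run main.nf -profile <profile> -entry wFullPipeline \
 *     --preset --s3SignIn false \
 *     --input.paired.path test_data/fullPipeline/reads_split.tsv \
 *     --databases /vol/scratch/databases/ --scratch /vol/scratch --output output
 */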

aws {
accessKey = params.s3SignIn ? secrets.S3_ACCESS : ""
secretKey = params.s3SignIn ? secrets.S3_SECRET : ""
10 changes: 10 additions & 0 deletions scripts/check_parameter.sh
@@ -0,0 +1,10 @@
OUTPUT=$1
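# The CI settings job passes --highmemLarge=28,2000; verify below that the memory override (2000)
# was written to the generated params_* file.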

EXPECTED_MEMORY=2000
MEMORY=$(find $OUTPUT -name 'params_*' | head -n 1 | xargs -I {} yq '.resources | .highmemLarge | .memory' {})

if [ "$MEMORY" -eq "$EXPECTED_MEMORY" ]; then
exit 0
else
exit 1
fi