From 8e89425e0f8ded08e7f55c24b6d6af8548deebd1 Mon Sep 17 00:00:00 2001 From: Lukas Forer Date: Wed, 30 Aug 2023 12:30:31 +0200 Subject: [PATCH] Fix issues in channel serialization (#112) --- .../nf/test/lang/process/WorkflowMock.nf | 69 ++++++++++-------- .../nf/test/lang/workflow/WorkflowMock.nf | 72 +++++++++++-------- .../askimed/nf/test/lang/WorkflowTest.java | 17 +++++ .../workflow/hanging/meaningless_workflow.nf | 10 +++ .../hanging/meaningless_workflow.nf.test | 28 ++++++++ test-data/workflow/unnamed/trial.unnamed.nf | 30 ++++++++ .../workflow/unnamed/trial.unnamed.nf.test | 26 +++++++ 7 files changed, 191 insertions(+), 61 deletions(-) create mode 100644 test-data/workflow/hanging/meaningless_workflow.nf create mode 100644 test-data/workflow/hanging/meaningless_workflow.nf.test create mode 100644 test-data/workflow/unnamed/trial.unnamed.nf create mode 100644 test-data/workflow/unnamed/trial.unnamed.nf.test diff --git a/src/main/resources/com/askimed/nf/test/lang/process/WorkflowMock.nf b/src/main/resources/com/askimed/nf/test/lang/process/WorkflowMock.nf index ee875334..c056cd2e 100644 --- a/src/main/resources/com/askimed/nf/test/lang/process/WorkflowMock.nf +++ b/src/main/resources/com/askimed/nf/test/lang/process/WorkflowMock.nf @@ -3,8 +3,7 @@ import groovy.json.JsonGenerator.Converter nextflow.enable.dsl=2 - -// comes from testflight to find json files +// comes from nf-test to store json files params.nf_test_output = "" // process mapping @@ -15,7 +14,6 @@ ${mapping} // include test process include { ${process} } from '${script}' - // define custom rules for JSON that will be generated. def jsonOutput = new JsonGenerator.Options() @@ -26,42 +24,53 @@ def jsonOutput = workflow { - ${process}(*input) + //run process + ${process}(*input) - if (${process}.output){ - // consumes all output channels and stores items in a json - def channel = Channel.empty() - for (def name in ${process}.out.getNames()) { - channel << tuple(name, ${process}.out.getProperty(name)) - } - - def array = ${process}.out as Object[] - for (def i = 0; i < array.length ; i++) { - channel << tuple(i, array[i]) - } + if (${process}.output){ + + // consumes all named output channels and stores items in a json file + for (def name in ${process}.out.getNames()) { + serializeChannel(name, ${process}.out.getProperty(name), jsonOutput) + } + + // consumes all unnamed output channels and stores items in a json file + def array = ${process}.out as Object[] + for (def i = 0; i < array.length ; i++) { + serializeChannel(i, array[i], jsonOutput) + } - channel.subscribe { outputTupel -> - def sortedList = outputTupel[1].toList() - sortedList.subscribe { list -> - def map = new HashMap() - def outputName = outputTupel[0] - map[outputName] = list - new File("\${params.nf_test_output}/output_\${outputName}.json").text = jsonOutput.toJson(map) - } } - } } +def serializeChannel(name, channel, jsonOutput) { + def _name = name + println "Process channel \${_name}..." + def list = [ ] + channel.subscribe( + onNext: { + list.add(it) + }, + onComplete: { + def map = new HashMap() + map[_name] = list + def filename = "\${params.nf_test_output}/output_\${_name}.json" + new File(filename).text = jsonOutput.toJson(map) + println "Wrote channel \${_name} to \${filename}" + } + ) +} + workflow.onComplete { - def result = [ - success: workflow.success, - exitStatus: workflow.exitStatus, - errorMessage: workflow.errorMessage, - errorReport: workflow.errorReport - ] + def result = [ + success: workflow.success, + exitStatus: workflow.exitStatus, + errorMessage: workflow.errorMessage, + errorReport: workflow.errorReport + ] new File("\${params.nf_test_output}/workflow.json").text = jsonOutput.toJson(result) } \ No newline at end of file diff --git a/src/main/resources/com/askimed/nf/test/lang/workflow/WorkflowMock.nf b/src/main/resources/com/askimed/nf/test/lang/workflow/WorkflowMock.nf index 2ce3b99a..c49919ea 100644 --- a/src/main/resources/com/askimed/nf/test/lang/workflow/WorkflowMock.nf +++ b/src/main/resources/com/askimed/nf/test/lang/workflow/WorkflowMock.nf @@ -3,8 +3,7 @@ import groovy.json.JsonGenerator.Converter nextflow.enable.dsl=2 - -// comes from testflight to find json files +// comes from nf-test to store json files params.nf_test_output = "" // process mapping @@ -25,42 +24,53 @@ def jsonOutput = workflow { - ${workflow}(*input) - - if (${workflow}.output){ - // consumes all output channels and stores items in a json - def channel = Channel.empty() - for (def name in ${workflow}.out.getNames()) { - channel << tuple(name, ${workflow}.out.getProperty(name)) - } + //run workflow + ${workflow}(*input) + + if (${workflow}.output){ + // consumes all named output channels and stores items in a json file + for (def name in ${workflow}.out.getNames()) { + serializeChannel(name, ${workflow}.out.getProperty(name), jsonOutput) + } - def array = ${workflow}.out as Object[] - for (def i = 0; i < array.length ; i++) { - channel << tuple(i, array[i]) - } - - channel.subscribe { outputTupel -> - def sortedList = outputTupel[1].toList() - sortedList.subscribe { list -> - def map = new HashMap() - def outputName = outputTupel[0] - map[outputName] = list - new File("\${params.nf_test_output}/output_\${outputName}.json").text = jsonOutput.toJson(map) - } - } - } + // consumes all unnamed output channels and stores items in a json file + def array = ${workflow}.out as Object[] + for (def i = 0; i < array.length ; i++) { + serializeChannel(i, array[i], jsonOutput) + } + + } +} + + +def serializeChannel(name, channel, jsonOutput) { + def _name = name + println "Process channel \${_name}..." + def list = [ ] + channel.subscribe( + onNext: { + list.add(it) + }, + onComplete: { + def map = new HashMap() + map[_name] = list + def filename = "\${params.nf_test_output}/output_\${_name}.json" + new File(filename).text = jsonOutput.toJson(map) + println "Wrote channel \${_name} to \${filename}" + } + ) } workflow.onComplete { - def result = [ - success: workflow.success, - exitStatus: workflow.exitStatus, - errorMessage: workflow.errorMessage, - errorReport: workflow.errorReport - ] + def result = [ + success: workflow.success, + exitStatus: workflow.exitStatus, + errorMessage: workflow.errorMessage, + errorReport: workflow.errorReport + ] new File("\${params.nf_test_output}/workflow.json").text = jsonOutput.toJson(result) } \ No newline at end of file diff --git a/src/test/java/com/askimed/nf/test/lang/WorkflowTest.java b/src/test/java/com/askimed/nf/test/lang/WorkflowTest.java index fb5c7f9b..d8086427 100644 --- a/src/test/java/com/askimed/nf/test/lang/WorkflowTest.java +++ b/src/test/java/com/askimed/nf/test/lang/WorkflowTest.java @@ -43,7 +43,15 @@ public void testWorkflowWithRelativePath() throws Exception { assertEquals(0, exitCode); } + + @Test + public void testWorkflowUnamedOutputs() throws Exception { + App app = new App(); + int exitCode = app.run(new String[] { "test", "test-data/workflow/unnamed/trial.unnamed.nf.test" }); + assertEquals(0, exitCode); + } + @Test public void testWorkflowWithNoOutputs() throws Exception { @@ -88,5 +96,14 @@ public void testParamsIssue34Setup() throws Exception { assertEquals(0, exitCode); } + + @Test + public void testHangingWorkflowIssue57() throws Exception { + + App app = new App(); + int exitCode = app.run(new String[] { "test", "test-data/workflow/hanging/meaningless_workflow.nf.test","--debug"}); + assertEquals(0, exitCode); + + } } diff --git a/test-data/workflow/hanging/meaningless_workflow.nf b/test-data/workflow/hanging/meaningless_workflow.nf new file mode 100644 index 00000000..431cfec8 --- /dev/null +++ b/test-data/workflow/hanging/meaningless_workflow.nf @@ -0,0 +1,10 @@ +workflow PipeWf { + take: + inputCh + + main: + inputCh.set { outputCh } + + emit: + outputCh +} \ No newline at end of file diff --git a/test-data/workflow/hanging/meaningless_workflow.nf.test b/test-data/workflow/hanging/meaningless_workflow.nf.test new file mode 100644 index 00000000..fe129644 --- /dev/null +++ b/test-data/workflow/hanging/meaningless_workflow.nf.test @@ -0,0 +1,28 @@ +nextflow_workflow { + + name "Test workflow" + script "test-data/workflow/hanging/meaningless_workflow.nf" + workflow "PipeWf" + + test("PipeWf will hang") { + + when { + workflow { + """ + input[0] = Channel.from([ + [ + ["patientID": "patientA"], + 'test_file_1.txt' + ] + ]) + """ + } + } + + then { + assert workflow.success + } + + } + +} \ No newline at end of file diff --git a/test-data/workflow/unnamed/trial.unnamed.nf b/test-data/workflow/unnamed/trial.unnamed.nf new file mode 100644 index 00000000..22eaf8c9 --- /dev/null +++ b/test-data/workflow/unnamed/trial.unnamed.nf @@ -0,0 +1,30 @@ +#!/usr/bin/env nextflow +nextflow.enable.dsl=2 + +process sayHello { + input: + val cheers + + output: + stdout emit: verbiage + path "*.txt", emit: output_files + + script: + """ + echo -n $cheers + echo -n $cheers > ${cheers}.txt + """ +} + +workflow trial { + take: things + main: + sayHello(things) + emit: + sayHello.out.verbiage + sayHello.out.output_files +} + +workflow { + Channel.from('a','b') | trial +} \ No newline at end of file diff --git a/test-data/workflow/unnamed/trial.unnamed.nf.test b/test-data/workflow/unnamed/trial.unnamed.nf.test new file mode 100644 index 00000000..8ffeead8 --- /dev/null +++ b/test-data/workflow/unnamed/trial.unnamed.nf.test @@ -0,0 +1,26 @@ +nextflow_workflow { + + name "Test workflow" + script "test-data/workflow/unnamed/trial.unnamed.nf" + workflow "trial" + + test("Should run without failures") { + when { + params { + outdir = "tests/results" + } + workflow { + """ + input[0] = Channel.of('a','b') + """ + } + } + + then { + //check if test case succeeded + assert workflow.success + assert workflow.out[0].size() == 2 + assert workflow.out[1].size() == 2 + } + } +} \ No newline at end of file