From ff2fccc84312e1367bf53f3077a0e55015c4f9e1 Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Sat, 18 Apr 2020 17:19:24 -0400 Subject: [PATCH 01/15] Exception catching to display user friendly error message --- CLI/cli.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/CLI/cli.py b/CLI/cli.py index 80ad501..e6cb9e1 100644 --- a/CLI/cli.py +++ b/CLI/cli.py @@ -1,6 +1,7 @@ import click from pyworkflow import Workflow +from pyworkflow import NodeException class Config(object): @@ -21,5 +22,9 @@ def cli(config, file_directory): @cli.command() @pass_config def execute(config): - click.echo('Loading workflow file form %s' % config.file_directory) - Workflow.execute_workflow(config.file_directory) \ No newline at end of file + try: + click.echo('Loading workflow file form %s' % config.file_directory) + Workflow.execute_workflow(config.file_directory) + except NodeException as ne: + click.echo("Issues during node exception") + click.echo(ne) From 144f03765bc86b5d5d9c885d4b65599e3b4c9b63 Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Sat, 18 Apr 2020 17:23:32 -0400 Subject: [PATCH 02/15] Modified wording and preventing the user to run workflow without specifiying a location --- CLI/cli.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CLI/cli.py b/CLI/cli.py index e6cb9e1..25ec24f 100644 --- a/CLI/cli.py +++ b/CLI/cli.py @@ -15,7 +15,8 @@ def __init__(self): @pass_config def cli(config, file_directory): if file_directory is None: - file_directory = '.' + click.echo('Please specify a workflow to run') + return config.file_directory = file_directory @@ -26,5 +27,5 @@ def execute(config): click.echo('Loading workflow file form %s' % config.file_directory) Workflow.execute_workflow(config.file_directory) except NodeException as ne: - click.echo("Issues during node exception") + click.echo("Issues during node execution") click.echo(ne) From d3e624c4247912a3abbf395d171bba1f88b50e85 Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Sat, 18 Apr 2020 17:30:31 -0400 Subject: [PATCH 03/15] Catching empty file directory in execute function --- CLI/cli.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/CLI/cli.py b/CLI/cli.py index 25ec24f..ed4b124 100644 --- a/CLI/cli.py +++ b/CLI/cli.py @@ -14,15 +14,16 @@ def __init__(self): @click.option('--file-directory', type=click.Path()) @pass_config def cli(config, file_directory): - if file_directory is None: - click.echo('Please specify a workflow to run') - return + print('Unable to run file') config.file_directory = file_directory @cli.command() @pass_config def execute(config): + if config.file_directory is None: + click.echo('Please specify a workflow to run') + return try: click.echo('Loading workflow file form %s' % config.file_directory) Workflow.execute_workflow(config.file_directory) From 67d1b15eba643cc170bafa73a1035250234b7891 Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Sat, 18 Apr 2020 17:31:05 -0400 Subject: [PATCH 04/15] Removing print statement --- CLI/cli.py | 1 - 1 file changed, 1 deletion(-) diff --git a/CLI/cli.py b/CLI/cli.py index ed4b124..ea75029 100644 --- a/CLI/cli.py +++ b/CLI/cli.py @@ -14,7 +14,6 @@ def __init__(self): @click.option('--file-directory', type=click.Path()) @pass_config def cli(config, file_directory): - print('Unable to run file') config.file_directory = file_directory From d0cd15ac6367405bad887d7c138cae20ee62bb31 Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Wed, 22 Apr 2020 17:14:05 -0400 Subject: [PATCH 05/15] Reading input from stdin and executing workflow based on that input --- CLI/cli.py | 16 +++++++++++++++- pyworkflow/pyworkflow/node.py | 11 +++++++++++ pyworkflow/pyworkflow/workflow.py | 27 ++++++++++++++++++++++++--- 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/CLI/cli.py b/CLI/cli.py index ea75029..a7a0e49 100644 --- a/CLI/cli.py +++ b/CLI/cli.py @@ -1,4 +1,6 @@ import click +import os +import uuid from pyworkflow import Workflow from pyworkflow import NodeException @@ -15,7 +17,19 @@ def __init__(self): @pass_config def cli(config, file_directory): config.file_directory = file_directory + stdin_text = click.get_text_stream('stdin') + stdin_files = [] + #write standard in to a new file in local filesystem + #TODO should be done for each separate file coming from stdin + + file_name = str(uuid.uuid4()) + new_file_path = os.path.join(os.getcwd(), file_name) + #read from std in and upload a new file in project directory + with open(new_file_path, 'w') as f: + f.write(stdin_text.read()) + stdin_files.append(file_name) + config.stdin_files = stdin_files @cli.command() @pass_config @@ -25,7 +39,7 @@ def execute(config): return try: click.echo('Loading workflow file form %s' % config.file_directory) - Workflow.execute_workflow(config.file_directory) + Workflow.execute_workflow(config.file_directory, config.stdin_files) except NodeException as ne: click.echo("Issues during node execution") click.echo(ne) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index e30d5eb..d8cc2c1 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -109,6 +109,17 @@ def execute(self, predecessor_data, flow_vars): except Exception as e: raise NodeException('read csv', str(e)) + def execute_for_read(self, predecessor_data, flow_vars, file_to_read): + try: + fname = file_to_read + print(fname) + sep = self.options["sep"].get_value() + hdr = self.options["header"].get_value() + df = pd.read_csv(fname, sep=sep, header=hdr) + return df.to_json() + except Exception as e: + raise NodeException('read csv', str(e)) + def __str__(self): return "ReadCsvNode" diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 7930beb..add71b7 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -2,7 +2,7 @@ import networkx as nx import json -from .node import Node +from .node import Node, ReadCsvNode from .node_factory import node_factory @@ -382,8 +382,25 @@ def to_session_dict(self): except nx.NetworkXError as e: raise WorkflowException('to_session_dict', str(e)) + def execute_read_csv(self, node_id, csv_location): + preceding_data = list() + flow_vars = list() + node_to_execute = self.get_node(node_id) + if node_to_execute is None: + raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id) + # Pass in data to current Node to use in execution + output = node_to_execute.execute_for_read(preceding_data.append(None), flow_vars.append(None), csv_location) + + # Save new execution data to disk + node_to_execute.data = Workflow.store_node_data(self, node_id, output) + + if node_to_execute.data is None: + raise WorkflowException('execute', 'There was a problem saving node output.') + + return node_to_execute + @staticmethod - def execute_workflow(workflow_location): + def execute_workflow(workflow_location, stdin_files): """Execute entire workflow at a certain location. Current use case: CLI. """ @@ -400,7 +417,11 @@ def execute_workflow(workflow_location): #execute each node in the order returned by execution order method #TODO exception handling: stop and provide details on which node failed to execute for node in execution_order: - workflow_instance.execute(node) + if type(workflow_instance.get_node(node)) is ReadCsvNode: + csv_location = stdin_files[0] + workflow_instance.execute_read_csv(node, csv_location) + else: + workflow_instance.execute(node) class WorkflowUtils: From a72e01a8ed94662a4be04a2dbef2c17a4fff1030 Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Wed, 22 Apr 2020 17:19:42 -0400 Subject: [PATCH 06/15] Removing print statment --- pyworkflow/pyworkflow/node.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index d8cc2c1..1764814 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -112,7 +112,6 @@ def execute(self, predecessor_data, flow_vars): def execute_for_read(self, predecessor_data, flow_vars, file_to_read): try: fname = file_to_read - print(fname) sep = self.options["sep"].get_value() hdr = self.options["header"].get_value() df = pd.read_csv(fname, sep=sep, header=hdr) From aab4a382273b6ea515c47b58265014fe4f70f64f Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Thu, 23 Apr 2020 15:05:38 -0400 Subject: [PATCH 07/15] Continue with execution if there is no input from stdin --- CLI/cli.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/CLI/cli.py b/CLI/cli.py index a7a0e49..2c85907 100644 --- a/CLI/cli.py +++ b/CLI/cli.py @@ -1,3 +1,5 @@ +import sys + import click import os import uuid @@ -17,20 +19,30 @@ def __init__(self): @pass_config def cli(config, file_directory): config.file_directory = file_directory - stdin_text = click.get_text_stream('stdin') + stdin_files = [] - #write standard in to a new file in local filesystem - #TODO should be done for each separate file coming from stdin + if not click.get_text_stream('stdin').isatty(): + + stdin_text = click.get_text_stream('stdin') + + # TODO should be done for each separate file coming from stdin, currently working for one file, but easy to build up. + + #write standard in to a new file in local filesystem + file_name = str(uuid.uuid4()) + + # TODO small issue here, might be better to upload this file to the workflow directory instead of cwd + new_file_path = os.path.join(os.getcwd(), file_name) + + #read from std in and upload a new file in project directory + with open(new_file_path, 'w') as f: + f.write(stdin_text.read()) + + stdin_files.append(file_name) - file_name = str(uuid.uuid4()) - new_file_path = os.path.join(os.getcwd(), file_name) - #read from std in and upload a new file in project directory - with open(new_file_path, 'w') as f: - f.write(stdin_text.read()) - stdin_files.append(file_name) config.stdin_files = stdin_files + @cli.command() @pass_config def execute(config): From 69d8b8c8b3da29a23e880804c82ff236166cae50 Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Thu, 23 Apr 2020 15:24:26 -0400 Subject: [PATCH 08/15] Allowing to execute workflow from CLI with or without input from stdin --- pyworkflow/pyworkflow/workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index add71b7..d8cd58b 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -417,7 +417,7 @@ def execute_workflow(workflow_location, stdin_files): #execute each node in the order returned by execution order method #TODO exception handling: stop and provide details on which node failed to execute for node in execution_order: - if type(workflow_instance.get_node(node)) is ReadCsvNode: + if type(workflow_instance.get_node(node)) is ReadCsvNode and len(stdin_files) > 0: csv_location = stdin_files[0] workflow_instance.execute_read_csv(node, csv_location) else: From 2d3dfe91d30078dce73363c4ed04c4a0c5072beb Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Thu, 23 Apr 2020 16:15:59 -0400 Subject: [PATCH 09/15] Handles multiple file input from std_in --- pyworkflow/pyworkflow/workflow.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index d8cd58b..1b85e9b 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -383,6 +383,10 @@ def to_session_dict(self): raise WorkflowException('to_session_dict', str(e)) def execute_read_csv(self, node_id, csv_location): + # TODO: some duplicated code here from execute common method. Need to refactor. + """Execute read_csv from a file specified to standard input. + Current use case: CLI. + """ preceding_data = list() flow_vars = list() node_to_execute = self.get_node(node_id) @@ -420,6 +424,8 @@ def execute_workflow(workflow_location, stdin_files): if type(workflow_instance.get_node(node)) is ReadCsvNode and len(stdin_files) > 0: csv_location = stdin_files[0] workflow_instance.execute_read_csv(node, csv_location) + # delete file at index 0 + del stdin_files[0] else: workflow_instance.execute(node) From 98cdef846d65b8347c48587070bd9c9c535d8e15 Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Tue, 28 Apr 2020 11:33:48 -0400 Subject: [PATCH 10/15] Adapts reading from stdin to node refactoring --- pyworkflow/pyworkflow/node.py | 10 ---------- pyworkflow/pyworkflow/nodes/io/read_csv.py | 10 ++++++++++ pyworkflow/pyworkflow/workflow.py | 5 ++++- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index 4e87862..c59c521 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -97,16 +97,6 @@ def to_json(self): "option_replace": self.option_replace, } - def execute_for_read(self, predecessor_data, flow_vars, file_to_read): - try: - fname = file_to_read - sep = self.options["sep"].get_value() - hdr = self.options["header"].get_value() - df = pd.read_csv(fname, sep=sep, header=hdr) - return df.to_json() - except Exception as e: - raise NodeException('read csv', str(e)) - def __str__(self): return "Test" diff --git a/pyworkflow/pyworkflow/nodes/io/read_csv.py b/pyworkflow/pyworkflow/nodes/io/read_csv.py index f829d61..f909809 100644 --- a/pyworkflow/pyworkflow/nodes/io/read_csv.py +++ b/pyworkflow/pyworkflow/nodes/io/read_csv.py @@ -46,3 +46,13 @@ def execute(self, predecessor_data, flow_vars): return df.to_json() except Exception as e: raise NodeException('read csv', str(e)) + + def execute_for_read(self, predecessor_data, flow_vars, file_to_read): + try: + fname = file_to_read + sep = self.options["sep"].get_value() + hdr = self.options["header"].get_value() + df = pd.read_csv(fname, sep=sep, header=hdr) + return df.to_json() + except Exception as e: + raise NodeException('read csv', str(e)) \ No newline at end of file diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index ed030b7..c7e5769 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -8,7 +8,8 @@ from collections import OrderedDict from modulefinder import ModuleFinder -from .node import Node, NodeException, ReadCsvNode +from pyworkflow.nodes import ReadCsvNode +from .node import Node, NodeException from .node_factory import node_factory @@ -292,6 +293,8 @@ def execute(self, node_id): """ node_to_execute = self.get_node(node_id) + print(node_id) + if node_to_execute is None: raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id) From 8559dc36806c3e1dcf2e77d130a26d710ceaa875 Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Tue, 28 Apr 2020 13:17:19 -0400 Subject: [PATCH 11/15] Removed print statement --- pyworkflow/pyworkflow/workflow.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index c7e5769..a35b602 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -293,8 +293,6 @@ def execute(self, node_id): """ node_to_execute = self.get_node(node_id) - print(node_id) - if node_to_execute is None: raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id) From 7fe89c64bd61647e1efbd0cb7fbb18425184c46f Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Tue, 28 Apr 2020 14:36:01 -0400 Subject: [PATCH 12/15] Modify command for execution to pyworkflow execute workflow-file-location --- CLI/cli.py | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/CLI/cli.py b/CLI/cli.py index 2c85907..399e813 100644 --- a/CLI/cli.py +++ b/CLI/cli.py @@ -15,43 +15,39 @@ def __init__(self): pass_config = click.make_pass_decorator(Config, ensure=True) @click.group() -@click.option('--file-directory', type=click.Path()) -@pass_config -def cli(config, file_directory): - config.file_directory = file_directory +def cli(): + pass + + +@cli.command() +@click.argument('filename', type=click.Path(exists=True)) +def execute(filename): stdin_files = [] if not click.get_text_stream('stdin').isatty(): - stdin_text = click.get_text_stream('stdin') # TODO should be done for each separate file coming from stdin, currently working for one file, but easy to build up. - #write standard in to a new file in local filesystem + # write standard in to a new file in local filesystem file_name = str(uuid.uuid4()) # TODO small issue here, might be better to upload this file to the workflow directory instead of cwd new_file_path = os.path.join(os.getcwd(), file_name) - #read from std in and upload a new file in project directory + # read from std in and upload a new file in project directory with open(new_file_path, 'w') as f: f.write(stdin_text.read()) stdin_files.append(file_name) - config.stdin_files = stdin_files - - -@cli.command() -@pass_config -def execute(config): - if config.file_directory is None: + if filename is None: click.echo('Please specify a workflow to run') return try: - click.echo('Loading workflow file form %s' % config.file_directory) - Workflow.execute_workflow(config.file_directory, config.stdin_files) + click.echo('Loading workflow file form %s' % filename) + Workflow.execute_workflow(filename, stdin_files) except NodeException as ne: click.echo("Issues during node execution") click.echo(ne) From f798249218aa23f486245d56da30f2663a7912de Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Tue, 28 Apr 2020 15:54:14 -0400 Subject: [PATCH 13/15] Batch execution of workflow --- CLI/cli.py | 45 +++++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/CLI/cli.py b/CLI/cli.py index 399e813..54d93c5 100644 --- a/CLI/cli.py +++ b/CLI/cli.py @@ -20,34 +20,35 @@ def cli(): @cli.command() -@click.argument('filename', type=click.Path(exists=True)) +@click.argument('filename', type=click.Path(exists=True), nargs=-1) def execute(filename): - stdin_files = [] + #execute each one of the workflows in the ar + for workflow_file in filename: - if not click.get_text_stream('stdin').isatty(): - stdin_text = click.get_text_stream('stdin') + stdin_files = [] - # TODO should be done for each separate file coming from stdin, currently working for one file, but easy to build up. + if not click.get_text_stream('stdin').isatty(): + stdin_text = click.get_text_stream('stdin') - # write standard in to a new file in local filesystem - file_name = str(uuid.uuid4()) + # write standard in to a new file in local filesystem + file_name = str(uuid.uuid4()) - # TODO small issue here, might be better to upload this file to the workflow directory instead of cwd - new_file_path = os.path.join(os.getcwd(), file_name) + # TODO small issue here, might be better to upload this file to the workflow directory instead of cwd + new_file_path = os.path.join(os.getcwd(), file_name) - # read from std in and upload a new file in project directory - with open(new_file_path, 'w') as f: - f.write(stdin_text.read()) + # read from std in and upload a new file in project directory + with open(new_file_path, 'w') as f: + f.write(stdin_text.read()) - stdin_files.append(file_name) + stdin_files.append(file_name) - if filename is None: - click.echo('Please specify a workflow to run') - return - try: - click.echo('Loading workflow file form %s' % filename) - Workflow.execute_workflow(filename, stdin_files) - except NodeException as ne: - click.echo("Issues during node execution") - click.echo(ne) + if workflow_file is None: + click.echo('Please specify a workflow to run') + return + try: + click.echo('Loading workflow file form %s' % workflow_file) + Workflow.execute_workflow(workflow_file, stdin_files) + except NodeException as ne: + click.echo("Issues during node execution") + click.echo(ne) From 48879dbc36f4af34deb244e816311a3c9682e757 Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Tue, 28 Apr 2020 16:01:34 -0400 Subject: [PATCH 14/15] Updates readme --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c29de6c..dd9855e 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,9 @@ data from `localhost:8000` **where the Django app must be running**. ## CLI 1. Run pipenv shell. 2. Create a workflow using UI and save it. -3. Run it as: pyworkflow --file-directory (path-to-json-workflow-file) execute +3. Run it as: pyworkflow execute (paths-to-json-workflow-files-separated-by-space) + + --- ## Tests From 206998f220444cbdc61a1dd96419272677cb70aa Mon Sep 17 00:00:00 2001 From: Diego Struk Date: Thu, 30 Apr 2020 16:45:28 -0400 Subject: [PATCH 15/15] fixes review comments --- CLI/cli.py | 11 ++++++-- README.md | 4 ++- pyworkflow/pyworkflow/workflow.py | 44 ++++++++++++++++++++++++++++--- 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/CLI/cli.py b/CLI/cli.py index 54d93c5..2e6bd98 100644 --- a/CLI/cli.py +++ b/CLI/cli.py @@ -23,6 +23,8 @@ def cli(): @click.argument('filename', type=click.Path(exists=True), nargs=-1) def execute(filename): + write_to_stdout = not click.get_text_stream('stdout').isatty() + #execute each one of the workflows in the ar for workflow_file in filename: @@ -43,12 +45,17 @@ def execute(filename): stdin_files.append(file_name) + + if workflow_file is None: click.echo('Please specify a workflow to run') return try: - click.echo('Loading workflow file form %s' % workflow_file) - Workflow.execute_workflow(workflow_file, stdin_files) + if not write_to_stdout: + click.echo('Loading workflow file from %s' % workflow_file) + Workflow.execute_workflow(workflow_file, stdin_files, write_to_stdout) + + except NodeException as ne: click.echo("Issues during node execution") click.echo(ne) diff --git a/README.md b/README.md index dd9855e..6e9bab3 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,9 @@ data from `localhost:8000` **where the Django app must be running**. ## CLI 1. Run pipenv shell. 2. Create a workflow using UI and save it. -3. Run it as: pyworkflow execute (paths-to-json-workflow-files-separated-by-space) +3. Run it as: pyworkflow execute workflow-file + +Also accepts reading input from std (i.e < file.csv) and writing to sdt out (i.e > output.csv) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index a35b602..acdabcb 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -8,7 +8,8 @@ from collections import OrderedDict from modulefinder import ModuleFinder -from pyworkflow.nodes import ReadCsvNode +from pyworkflow.nodes import ReadCsvNode, WriteCsvNode + from .node import Node, NodeException from .node_factory import node_factory @@ -569,8 +570,39 @@ def execute_read_csv(self, node_id, csv_location): return node_to_execute + def execute_write_csv(self, node_id): + node_to_execute = self.get_node(node_id) + + if node_to_execute is None: + raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id) + + # Load predecessor data and FlowNode values + preceding_data = self.load_input_data(node_to_execute.node_id) + flow_nodes = self.load_flow_nodes(node_to_execute.option_replace) + + try: + # Validate input data, and replace flow variables + node_to_execute.validate_input_data(len(preceding_data)) + execution_options = node_to_execute.get_execution_options(flow_nodes) + + # Pass in data to current Node to use in execution + output = node_to_execute.execute(preceding_data, execution_options) + + #printing the output in order to include in std_out + print(output) + + # Save new execution data to disk + node_to_execute.data = Workflow.store_node_data(self, node_id, output) + except NodeException as e: + raise e + + if node_to_execute.data is None: + raise WorkflowException('execute', 'There was a problem saving node output.') + + return node_to_execute + @staticmethod - def execute_workflow(workflow_location, stdin_files): + def execute_workflow(workflow_location, stdin_files, write_to_stdout): """Execute entire workflow at a certain location. Current use case: CLI. """ @@ -589,11 +621,15 @@ def execute_workflow(workflow_location, stdin_files): for node in execution_order: if type(workflow_instance.get_node(node)) is ReadCsvNode and len(stdin_files) > 0: csv_location = stdin_files[0] - workflow_instance.execute_read_csv(node, csv_location) + executed_node = workflow_instance.execute_read_csv(node, csv_location) # delete file at index 0 del stdin_files[0] + elif type(workflow_instance.get_node(node)) is WriteCsvNode and write_to_stdout: + executed_node = workflow_instance.execute_write_csv(node) else: - workflow_instance.execute(node) + executed_node = workflow_instance.execute(node) + + workflow_instance.update_or_add_node(executed_node) class WorkflowUtils: