Skip to content

Commit

Permalink
fixes review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Diego Struk committed Apr 30, 2020
1 parent 48879db commit 206998f
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 7 deletions.
11 changes: 9 additions & 2 deletions CLI/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ def cli():
@click.argument('filename', type=click.Path(exists=True), nargs=-1)
def execute(filename):

write_to_stdout = not click.get_text_stream('stdout').isatty()

#execute each one of the workflows in the ar
for workflow_file in filename:

Expand All @@ -43,12 +45,17 @@ def execute(filename):

stdin_files.append(file_name)



if workflow_file is None:
click.echo('Please specify a workflow to run')
return
try:
click.echo('Loading workflow file form %s' % workflow_file)
Workflow.execute_workflow(workflow_file, stdin_files)
if not write_to_stdout:
click.echo('Loading workflow file from %s' % workflow_file)
Workflow.execute_workflow(workflow_file, stdin_files, write_to_stdout)


except NodeException as ne:
click.echo("Issues during node execution")
click.echo(ne)
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ data from `localhost:8000` **where the Django app must be running**.
## CLI
1. Run pipenv shell.
2. Create a workflow using UI and save it.
3. Run it as: pyworkflow execute (paths-to-json-workflow-files-separated-by-space)
3. Run it as: pyworkflow execute workflow-file

Also accepts reading input from std (i.e < file.csv) and writing to sdt out (i.e > output.csv)



Expand Down
44 changes: 40 additions & 4 deletions pyworkflow/pyworkflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from collections import OrderedDict
from modulefinder import ModuleFinder

from pyworkflow.nodes import ReadCsvNode
from pyworkflow.nodes import ReadCsvNode, WriteCsvNode

from .node import Node, NodeException
from .node_factory import node_factory

Expand Down Expand Up @@ -569,8 +570,39 @@ def execute_read_csv(self, node_id, csv_location):

return node_to_execute

def execute_write_csv(self, node_id):
node_to_execute = self.get_node(node_id)

if node_to_execute is None:
raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id)

# Load predecessor data and FlowNode values
preceding_data = self.load_input_data(node_to_execute.node_id)
flow_nodes = self.load_flow_nodes(node_to_execute.option_replace)

try:
# Validate input data, and replace flow variables
node_to_execute.validate_input_data(len(preceding_data))
execution_options = node_to_execute.get_execution_options(flow_nodes)

# Pass in data to current Node to use in execution
output = node_to_execute.execute(preceding_data, execution_options)

#printing the output in order to include in std_out
print(output)

# Save new execution data to disk
node_to_execute.data = Workflow.store_node_data(self, node_id, output)
except NodeException as e:
raise e

if node_to_execute.data is None:
raise WorkflowException('execute', 'There was a problem saving node output.')

return node_to_execute

@staticmethod
def execute_workflow(workflow_location, stdin_files):
def execute_workflow(workflow_location, stdin_files, write_to_stdout):
"""Execute entire workflow at a certain location.
Current use case: CLI.
"""
Expand All @@ -589,11 +621,15 @@ def execute_workflow(workflow_location, stdin_files):
for node in execution_order:
if type(workflow_instance.get_node(node)) is ReadCsvNode and len(stdin_files) > 0:
csv_location = stdin_files[0]
workflow_instance.execute_read_csv(node, csv_location)
executed_node = workflow_instance.execute_read_csv(node, csv_location)
# delete file at index 0
del stdin_files[0]
elif type(workflow_instance.get_node(node)) is WriteCsvNode and write_to_stdout:
executed_node = workflow_instance.execute_write_csv(node)
else:
workflow_instance.execute(node)
executed_node = workflow_instance.execute(node)

workflow_instance.update_or_add_node(executed_node)


class WorkflowUtils:
Expand Down

0 comments on commit 206998f

Please sign in to comment.