Skip to content

Commit

Permalink
Merge pull request #69 from matthew-t-smith/dev/cli
Browse files Browse the repository at this point in the history
Reads from stdin and writes to stdout; batch execution of workflows
  • Loading branch information
reelmatt authored May 1, 2020
2 parents 2e5d26e + 206998f commit c794f7a
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 13 deletions.
56 changes: 46 additions & 10 deletions CLI/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import sys

import click
import os
import uuid

from pyworkflow import Workflow
from pyworkflow import NodeException


class Config(object):
Expand All @@ -10,16 +15,47 @@ def __init__(self):
# Decorator that hands a Config object to a command as its first argument.
# NOTE(review): per click's documentation, ensure=True makes click create the
# Config on first use when the context does not hold one yet -- confirm.
pass_config = click.make_pass_decorator(Config, ensure=True)

@click.group()
@click.option('--file-directory', type=click.Path())
@pass_config
def cli(config, file_directory):
    # Record the workflow location on the shared Config object, falling back
    # to the current directory when --file-directory was not given.
    config.file_directory = '.' if file_directory is None else file_directory
def cli():
    # Root command group: it carries no logic of its own, subcommands are
    # attached to it through @cli.command().
    return None


@cli.command()
@pass_config
def execute(config):
    """Execute the workflow found at the configured file location."""
    # config.file_directory is read from the shared Config object.
    click.echo('Loading workflow file from %s' % config.file_directory)
    Workflow.execute_workflow(config.file_directory)
@click.argument('filename', type=click.Path(exists=True), nargs=-1)
def execute(filename):
    """Execute one or more workflow files, in the order given.

    Piped standard input is saved to a file and offered to every workflow;
    when standard output is piped, progress messages are suppressed.
    """
    # nargs=-1 yields an empty tuple when no workflow file was named.
    if not filename:
        click.echo('Please specify a workflow to run', err=True)
        return

    write_to_stdout = not click.get_text_stream('stdout').isatty()

    # Capture piped stdin exactly once, before the loop: the stream can only
    # be consumed a single time, so reading it per workflow would leave every
    # workflow after the first with an empty input file.
    captured_stdin = None
    stdin_stream = click.get_text_stream('stdin')
    if not stdin_stream.isatty():
        captured_stdin = str(uuid.uuid4())

        # TODO small issue here, might be better to upload this file to the
        # workflow directory instead of cwd
        with open(os.path.join(os.getcwd(), captured_stdin), 'w') as f:
            f.write(stdin_stream.read())

    # Execute each one of the workflows named in the arguments.
    for workflow_file in filename:
        # Build a fresh list per workflow so that changes made to it while
        # one workflow runs cannot affect the next one.
        stdin_files = [] if captured_stdin is None else [captured_stdin]

        try:
            if not write_to_stdout:
                click.echo('Loading workflow file from %s' % workflow_file)
            Workflow.execute_workflow(workflow_file, stdin_files, write_to_stdout)
        except NodeException as ne:
            # Report on stderr so that redirected stdout output stays clean.
            click.echo("Issues during node execution", err=True)
            click.echo(ne, err=True)
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ data from `localhost:8000` **where the Django app must be running**.
## CLI
1. Run pipenv shell.
2. Create a workflow using UI and save it.
3. Run it as: pyworkflow --file-directory (path-to-json-workflow-file) execute
3. Run it as: pyworkflow execute workflow-file

Also accepts reading input from stdin (e.g. `< file.csv`) and writing output to stdout (e.g. `> output.csv`).



---
## Tests
Expand Down
10 changes: 10 additions & 0 deletions pyworkflow/pyworkflow/nodes/io/read_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,13 @@ def execute(self, predecessor_data, flow_vars):
return df.to_json()
except Exception as e:
raise NodeException('read csv', str(e))

def execute_for_read(self, predecessor_data, flow_vars, file_to_read):
    """Read the CSV at ``file_to_read`` and return its content as JSON.

    The separator and header row come from this node's "sep" and "header"
    options; only the file location is supplied by the caller.

    Args:
        predecessor_data: Not used by this method.
        flow_vars: Not used by this method.
        file_to_read: Path (or buffer) handed straight to ``pd.read_csv``.

    Returns:
        The DataFrame serialized with ``DataFrame.to_json()``.

    Raises:
        NodeException: If reading an option, parsing the CSV or serializing
            it fails; the original error is attached as the cause.
    """
    try:
        frame = pd.read_csv(
            file_to_read,
            sep=self.options["sep"].get_value(),
            header=self.options["header"].get_value(),
        )
        return frame.to_json()
    except Exception as e:
        # Chain explicitly so the underlying traceback is kept as the cause.
        raise NodeException('read csv', str(e)) from e
68 changes: 66 additions & 2 deletions pyworkflow/pyworkflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from collections import OrderedDict
from modulefinder import ModuleFinder

from pyworkflow.nodes import ReadCsvNode, WriteCsvNode

from .node import Node, NodeException
from .node_factory import node_factory

Expand Down Expand Up @@ -547,8 +549,60 @@ def to_session_dict(self):
except nx.NetworkXError as e:
raise WorkflowException('to_session_dict', str(e))

def execute_read_csv(self, node_id, csv_location):
    """Execute a read-CSV node against a file chosen by the caller.

    Current use case: CLI, where the CSV comes from standard input.

    Args:
        node_id: Id of the node to execute.
        csv_location: Location of the CSV, passed to the node's
            ``execute_for_read``.

    Returns:
        The executed node, with ``data`` set to the value returned by
        ``Workflow.store_node_data``.

    Raises:
        WorkflowException: If the workflow has no such node, or if storing
            the node output returned None.
    """
    # TODO: some duplicated code here from execute common method. Need to refactor.
    node_to_execute = self.get_node(node_id)
    if node_to_execute is None:
        raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id)

    # There is no predecessor data or flow variables to hand over here, so
    # None is passed for both. (list.append() returns None, so building
    # throw-away lists and passing the append result passed None anyway.)
    output = node_to_execute.execute_for_read(None, None, csv_location)

    # Save new execution data to disk
    node_to_execute.data = Workflow.store_node_data(self, node_id, output)
    if node_to_execute.data is None:
        raise WorkflowException('execute', 'There was a problem saving node output.')

    return node_to_execute

def execute_write_csv(self, node_id):
    """Execute a write-CSV node and print its output to standard output.

    Args:
        node_id: Id of the node to execute.

    Returns:
        The executed node, with ``data`` set to the value returned by
        ``Workflow.store_node_data``.

    Raises:
        WorkflowException: If the workflow has no such node, or if storing
            the node output returned None.
        NodeException: Presumably raised by the node's validation or
            execution calls; it is not handled here and reaches the caller
            unchanged.
    """
    node_to_execute = self.get_node(node_id)
    if node_to_execute is None:
        raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id)

    # Load predecessor data and FlowNode values
    preceding_data = self.load_input_data(node_to_execute.node_id)
    flow_nodes = self.load_flow_nodes(node_to_execute.option_replace)

    # Validate input data, and replace flow variables
    node_to_execute.validate_input_data(len(preceding_data))
    execution_options = node_to_execute.get_execution_options(flow_nodes)

    # Pass in data to current Node to use in execution
    output = node_to_execute.execute(preceding_data, execution_options)

    # Print the output so that it ends up on stdout
    print(output)

    # Save new execution data to disk
    node_to_execute.data = Workflow.store_node_data(self, node_id, output)
    if node_to_execute.data is None:
        raise WorkflowException('execute', 'There was a problem saving node output.')

    return node_to_execute

@staticmethod
def execute_workflow(workflow_location):
def execute_workflow(workflow_location, stdin_files, write_to_stdout):
"""Execute entire workflow at a certain location.
Current use case: CLI.
"""
Expand All @@ -565,7 +619,17 @@ def execute_workflow(workflow_location):
#execute each node in the order returned by execution order method
#TODO exception handling: stop and provide details on which node failed to execute
for node in execution_order:
workflow_instance.execute(node)
if type(workflow_instance.get_node(node)) is ReadCsvNode and len(stdin_files) > 0:
csv_location = stdin_files[0]
executed_node = workflow_instance.execute_read_csv(node, csv_location)
# delete file at index 0
del stdin_files[0]
elif type(workflow_instance.get_node(node)) is WriteCsvNode and write_to_stdout:
executed_node = workflow_instance.execute_write_csv(node)
else:
executed_node = workflow_instance.execute(node)

workflow_instance.update_or_add_node(executed_node)


class WorkflowUtils:
Expand Down

0 comments on commit c794f7a

Please sign in to comment.