Skip to content

Commit

Permalink
Merge pull request #90 from matthew-t-smith/refactor/cli
Browse files Browse the repository at this point in the history
Refactors CLI to reduce need for special execute functions
  • Loading branch information
reelmatt authored May 9, 2020
2 parents a8a4a1d + d4d3a1e commit 05a0909
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 130 deletions.
125 changes: 92 additions & 33 deletions back-end/CLI/cli.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,123 @@
import sys

import click
import os
import uuid
import json

from pyworkflow import Workflow
from pyworkflow import Workflow, WorkflowException
from pyworkflow import NodeException
from pyworkflow.nodes import ReadCsvNode, WriteCsvNode


class Config:
    """Shared CLI state handed to commands through click's pass decorator."""

    def __init__(self):
        # Verbose output is opt-in; commands switch it on when requested.
        self.verbose = False


# Decorator that passes a Config object into a command; with ensure=True click
# creates the Config on first use if the current context does not hold one yet.
pass_config = click.make_pass_decorator(Config, ensure=True)


@click.group()
def cli():
    # Root command group: it performs no work itself and only serves as the
    # attachment point for sub-commands registered with @cli.command().
    # (No docstring on purpose -- click would display it as the group's help.)
    pass


@cli.command()
@click.argument('filenames', type=click.Path(exists=True), nargs=-1)
@click.option('--verbose', is_flag=True, help='Enables verbose mode.')
def execute(filenames, verbose):
    """Execute Workflow file(s)."""
    # NOTE: the docstring above is kept to one line because click shows it
    # verbatim as the command's --help text.
    #
    # Args:
    #     filenames - tuple of workflow file paths (may be empty, nargs=-1)
    #     verbose - True, for outputting debug information; False otherwise

    # With nargs=-1 click delivers a (possibly empty) tuple of paths and never
    # a None entry, so "nothing to run" must be detected on the tuple itself.
    if not filenames:
        click.echo('Please specify a workflow to run', err=True)
        return

    # Check whether to log to terminal, or redirect output
    log = click.get_text_stream('stdout').isatty()

    # Execute each workflow in the args
    for workflow_file in filenames:
        if log:
            click.echo('Loading workflow file from %s' % workflow_file)

        # Phase 1: load. A file that cannot be read or parsed is reported and
        # skipped so the remaining workflows still get a chance to run.
        try:
            workflow = open_workflow(workflow_file)
        except OSError as e:
            click.echo(f"Issues loading workflow file: {e}", err=True)
            continue
        except ValueError as e:
            # Covers malformed JSON (json.JSONDecodeError) and undecodable
            # bytes (UnicodeDecodeError); both are ValueError subclasses.
            click.echo(f"Issues loading workflow file: {e}", err=True)
            continue
        except KeyError as e:
            # The expected top-level entry is absent from the JSON document.
            click.echo(f"Issues loading workflow file: missing key {e}", err=True)
            continue
        except WorkflowException as e:
            click.echo(f"Issues during workflow execution\n{e}", err=True)
            continue

        # Phase 2: execute, reporting failures with the same messages as before.
        try:
            execute_workflow(workflow, log, verbose)
        except OSError as e:
            click.echo(f"Issues loading workflow file: {e}", err=True)
        except WorkflowException as e:
            click.echo(f"Issues during workflow execution\n{e}", err=True)


def execute_workflow(workflow, log, verbose):
    """Execute a workflow file, node-by-node.

    Retrieves the execution order from the Workflow and iterates through nodes.
    If any I/O nodes are present AND stdin/stdout redirection is provided in the
    command-line, overwrite the stored options and then replace before saving.

    A NodeException raised by a node is reported on stderr and execution
    continues with the next node in the order.

    Args:
        workflow - Workflow object loaded from file
        log - True, for outputting to terminal; False for stdout redirection
        verbose - True, for outputting debug information; False otherwise
    """
    execution_order = workflow.execution_order()

    # Execute each node in the order returned by the Workflow
    for node in execution_order:
        node_to_execute = None
        original_file_option = None

        try:
            node_to_execute = workflow.get_node(node)
            original_file_option = pre_execute(workflow, node_to_execute, log)

            if verbose:
                print('Executing node of type ' + str(type(node_to_execute)))

            # perform execution
            executed_node = workflow.execute(node)

            # If file was replaced with stdin/stdout, restore original option
            if original_file_option is not None:
                executed_node.option_values["file"] = original_file_option

            # Update Node in Workflow with changes (saved data file)
            workflow.update_or_add_node(executed_node)
        except NodeException as e:
            click.echo(f"Issues during node execution\n{e}", err=True)

            # pre_execute() has already stored the stdin/stdout stream in the
            # Workflow; put the original file option back so a failed node
            # does not leave the stream object behind in the saved options.
            if original_file_option is not None:
                node_to_execute.option_values["file"] = original_file_option
                workflow.update_or_add_node(node_to_execute)

    if verbose:
        click.echo('Completed workflow execution!')
def pre_execute(workflow, node_to_execute, log):
    """Swap a CSV node's file option for stdin/stdout when redirected.

    A ReadCsvNode reads from stdin when stdin is not a tty; a WriteCsvNode
    writes to stdout when stdout is not a tty (i.e. `log` is False). Any other
    node, or an I/O node without redirection, is left untouched.

    Args:
        workflow - Workflow object loaded from file
        node_to_execute - The Node to execute
        log - True, for outputting to terminal; False for stdout redirection

    Returns:
        The node's original "file" option value when it was replaced,
        otherwise None.
    """
    input_stream = click.get_text_stream('stdin')
    node_type = type(node_to_execute)

    if node_type is ReadCsvNode and not input_stream.isatty():
        redirect_target = input_stream
    elif node_type is WriteCsvNode and not log:
        redirect_target = click.get_text_stream('stdout')
    else:
        redirect_target = None

    # Guard clause: nothing to redirect for this node.
    if redirect_target is None:
        return None

    # Remember what was configured so the caller can put it back afterwards.
    saved_file_option = node_to_execute.option_values["file"]

    # Point the node at the stream and persist the change in the Workflow.
    node_to_execute.option_values["file"] = redirect_target
    workflow.update_or_add_node(node_to_execute)

    return saved_file_option


def open_workflow(workflow_file):
    """Load a Workflow from a JSON file.

    Args:
        workflow_file - path of the JSON file to read

    Returns:
        The result of Workflow.from_json() applied to the file's
        'pyworkflow' entry.

    Raises:
        OSError - the file cannot be opened or read
        ValueError - the content is not valid UTF-8 / valid JSON
            (json.JSONDecodeError is a ValueError subclass)
        KeyError - the JSON object has no 'pyworkflow' entry
    """
    # JSON text is UTF-8; do not depend on the platform's locale default,
    # which differs between operating systems.
    with open(workflow_file, encoding='utf-8') as f:
        json_content = json.load(f)

    return Workflow.from_json(json_content['pyworkflow'])
8 changes: 6 additions & 2 deletions back-end/pyworkflow/pyworkflow/node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .parameters import *

import io

class Node:
"""Node object
Expand Down Expand Up @@ -58,7 +58,11 @@ def get_execution_options(self, workflow, flow_nodes):
else:
replacement_value = option.get_value()

if key == 'file':
if key == 'file' and type(replacement_value) == io.TextIOWrapper:
# For files specified via stdin/stdout, store directly
option.set_value(replacement_value)
elif key == 'file':
# Otherwise, point to filepath stored in Workflow directory
option.set_value(workflow.path(replacement_value))

execution_options[key] = option
Expand Down
10 changes: 0 additions & 10 deletions back-end/pyworkflow/pyworkflow/nodes/io/read_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,3 @@ def execute(self, predecessor_data, flow_vars):
return df.to_json()
except Exception as e:
raise NodeException('read csv', str(e))

def execute_for_read(self, predecessor_data, flow_vars, file_to_read):
    """Read a CSV from an explicitly supplied location and return it as JSON.

    The separator and header settings come from this node's "sep" and
    "header" options; `predecessor_data` and `flow_vars` are accepted but
    not used by this method.

    Args:
        predecessor_data - unused
        flow_vars - unused
        file_to_read - path or file-like object handed to pandas.read_csv

    Returns:
        The DataFrame serialized with DataFrame.to_json().

    Raises:
        NodeException - wrapping any error raised while reading/serializing
    """
    try:
        frame = pd.read_csv(
            file_to_read,
            sep=self.options["sep"].get_value(),
            header=self.options["header"].get_value(),
        )
        return frame.to_json()
    except Exception as err:
        raise NodeException('read csv', str(err))
85 changes: 0 additions & 85 deletions back-end/pyworkflow/pyworkflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,91 +556,6 @@ def to_session_dict(self):
except nx.NetworkXError as e:
raise WorkflowException('to_session_dict', str(e))

def execute_read_csv(self, node_id, csv_location):
    """Execute read_csv from a file specified to standard input.

    Current use case: CLI.

    Args:
        node_id - ID of the node to execute
        csv_location - location of the CSV passed to the node's
            execute_for_read()

    Returns:
        The executed node, with its `data` attribute set to the value
        returned by Workflow.store_node_data().

    Raises:
        WorkflowException - the node is not in the workflow, or storing the
            node output returned None
    """
    # TODO: some duplicated code here from execute common method. Need to refactor.
    node_to_execute = self.get_node(node_id)
    if node_to_execute is None:
        raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id)

    # Build the placeholder inputs up front: list.append() returns None, so
    # passing `some_list.append(None)` as an argument never passes the list.
    preceding_data = [None]
    flow_vars = [None]

    # Pass in data to current Node to use in execution
    output = node_to_execute.execute_for_read(preceding_data, flow_vars, csv_location)

    # Save new execution data to disk
    node_to_execute.data = Workflow.store_node_data(self, node_id, output)

    if node_to_execute.data is None:
        raise WorkflowException('execute', 'There was a problem saving node output.')

    return node_to_execute

def execute_write_csv(self, node_id):
    """Execute a node and echo its output to standard output.

    Loads the node's predecessor data and flow nodes, validates the input,
    executes the node, prints the result and stores it via
    Workflow.store_node_data().

    Args:
        node_id - ID of the node to execute

    Returns:
        The executed node, with its `data` attribute set to the stored output.

    Raises:
        WorkflowException - the node is not in the workflow, or storing the
            node output returned None
        NodeException - propagated unchanged from validation/execution
    """
    target = self.get_node(node_id)
    if target is None:
        raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id)

    # Gather everything the node needs before running it.
    inputs = self.load_input_data(target.node_id)
    flow_nodes = self.load_flow_nodes(target.option_replace)

    try:
        # Check the number of inputs, then resolve flow-variable replacements.
        target.validate_input_data(len(inputs))
        options = target.get_execution_options(self, flow_nodes)

        result = target.execute(inputs, options)

        # Emit the result so it becomes part of the redirected stdout.
        print(result)

        # Persist the execution output.
        target.data = Workflow.store_node_data(self, node_id, result)
    except NodeException as err:
        raise err

    if target.data is None:
        raise WorkflowException('execute', 'There was a problem saving node output.')

    return target

@staticmethod
def execute_workflow(workflow_location, stdin_files, write_to_stdout, verbose_mode):
    """Execute entire workflow at a certain location.

    Current use case: CLI.

    Args:
        workflow_location - path of the workflow JSON file to load
        stdin_files - sequence of CSV locations captured from stdin; they are
            consumed in order, one per ReadCsvNode, and the sequence itself
            is left unmodified
        write_to_stdout - True to run WriteCsvNodes via execute_write_csv()
        verbose_mode - True to print the type of each node before it runs
    """
    # Load the file at workflow_location
    with open(workflow_location) as f:
        json_content = json.load(f)

    # Convert it to a workflow
    workflow_instance = Workflow.from_json(json_content['pyworkflow'])

    # Get the execution order
    execution_order = workflow_instance.execution_order()

    # Track the next unused stdin file by index instead of deleting entries,
    # so the list owned by the caller is not mutated as a side effect.
    next_stdin = 0

    # Execute each node in the order returned by execution order method
    # TODO exception handling: stop and provide details on which node failed to execute
    for node in execution_order:
        # Look the node up once per iteration and reuse its type below.
        node_type = type(workflow_instance.get_node(node))

        if verbose_mode:
            print('Executing node of type ' + str(node_type))

        if node_type is ReadCsvNode and next_stdin < len(stdin_files):
            csv_location = stdin_files[next_stdin]
            executed_node = workflow_instance.execute_read_csv(node, csv_location)
            # Mark the file as consumed only after a successful read
            next_stdin += 1
        elif node_type is WriteCsvNode and write_to_stdout:
            executed_node = workflow_instance.execute_write_csv(node)
        else:
            executed_node = workflow_instance.execute(node)

        workflow_instance.update_or_add_node(executed_node)


class WorkflowUtils:
@staticmethod
Expand Down

0 comments on commit 05a0909

Please sign in to comment.