Reads to stdin and writes to stdout, batch execution of workflows #69

Merged · 16 commits · May 1, 2020
49 changes: 39 additions & 10 deletions CLI/cli.py
@@ -1,6 +1,11 @@
import sys

import click
import os
import uuid

from pyworkflow import Workflow
from pyworkflow import NodeException


class Config(object):
@@ -10,16 +15,40 @@ def __init__(self):
pass_config = click.make_pass_decorator(Config, ensure=True)

@click.group()
@click.option('--file-directory', type=click.Path())
@pass_config
def cli(config, file_directory):
if file_directory is None:
file_directory = '.'
config.file_directory = file_directory
def cli():
pass


@cli.command()
@pass_config
def execute(config):
click.echo('Loading workflow file form %s' % config.file_directory)
Workflow.execute_workflow(config.file_directory)
@click.argument('filename', type=click.Path(exists=True), nargs=-1)
def execute(filename):

# execute each of the workflows passed as arguments
for workflow_file in filename:

stdin_files = []

if not click.get_text_stream('stdin').isatty():
stdin_text = click.get_text_stream('stdin')

# write standard in to a new file in local filesystem
file_name = str(uuid.uuid4())

# TODO small issue here, might be better to upload this file to the workflow directory instead of cwd
new_file_path = os.path.join(os.getcwd(), file_name)

# read from std in and upload a new file in project directory
with open(new_file_path, 'w') as f:
f.write(stdin_text.read())

stdin_files.append(file_name)

if workflow_file is None:
click.echo('Please specify a workflow to run')
return
try:
click.echo('Loading workflow file from %s' % workflow_file)
Workflow.execute_workflow(workflow_file, stdin_files)
except NodeException as ne:
click.echo("Issues during node execution")
click.echo(ne)
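The stdin-capture logic in the diff above can be isolated into a small standalone helper. A minimal sketch (the function name `capture_stream_to_file` is hypothetical, and it takes a stream argument for testability, whereas the PR reads `click.get_text_stream('stdin')` directly):

```python
import os
import uuid

def capture_stream_to_file(stream, directory="."):
    """Write piped input to a uniquely named file in `directory` and
    return the filename; return None when the stream is an interactive TTY."""
    if stream.isatty():
        return None  # nothing piped in
    file_name = str(uuid.uuid4())
    with open(os.path.join(directory, file_name), "w") as f:
        f.write(stream.read())
    return file_name
```

The CLI would call this once per invocation, before looping over the workflow files, and collect the returned name into `stdin_files`.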
4 changes: 3 additions & 1 deletion README.md
@@ -61,7 +61,9 @@ data from `localhost:8000` **where the Django app must be running**.
## CLI
1. Run pipenv shell.
2. Create a workflow using UI and save it.
3. Run it as: pyworkflow --file-directory (path-to-json-workflow-file) execute
3. Run it as: pyworkflow execute (paths-to-json-workflow-files-separated-by-space)



---
## Tests
10 changes: 10 additions & 0 deletions pyworkflow/pyworkflow/nodes/io/read_csv.py
@@ -46,3 +46,13 @@ def execute(self, predecessor_data, flow_vars):
return df.to_json()
except Exception as e:
raise NodeException('read csv', str(e))

def execute_for_read(self, predecessor_data, flow_vars, file_to_read):
try:
fname = file_to_read
sep = self.options["sep"].get_value()
hdr = self.options["header"].get_value()
df = pd.read_csv(fname, sep=sep, header=hdr)
return df.to_json()
except Exception as e:
raise NodeException('read csv', str(e))
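The new `execute_for_read` boils down to a plain pandas call with an explicitly supplied path instead of the node's stored filename option. A minimal sketch of the same logic outside the node class (the function name `read_csv_to_json` is hypothetical, and the `sep`/`header` defaults are assumptions — the node reads them from its options):

```python
import pandas as pd

def read_csv_to_json(file_to_read, sep=",", header=0):
    """Load a CSV from an explicit path and return the DataFrame
    serialized as JSON, mirroring the node's execute_for_read."""
    df = pd.read_csv(file_to_read, sep=sep, header=header)
    return df.to_json()
```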
32 changes: 30 additions & 2 deletions pyworkflow/pyworkflow/workflow.py
@@ -8,6 +8,7 @@
from collections import OrderedDict
from modulefinder import ModuleFinder

from pyworkflow.nodes import ReadCsvNode
from .node import Node, NodeException
from .node_factory import node_factory

@@ -547,8 +548,29 @@ def to_session_dict(self):
except nx.NetworkXError as e:
raise WorkflowException('to_session_dict', str(e))

def execute_read_csv(self, node_id, csv_location):
# TODO: some duplicated code here from execute common method. Need to refactor.
"""Execute read_csv from a file specified to standard input.
Current use case: CLI.
"""
preceding_data = list()
flow_vars = list()
node_to_execute = self.get_node(node_id)
if node_to_execute is None:
raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id)
# Pass in data to current Node to use in execution
# (list.append returns None, so append first and pass the lists themselves)
preceding_data.append(None)
flow_vars.append(None)
output = node_to_execute.execute_for_read(preceding_data, flow_vars, csv_location)

# Save new execution data to disk
node_to_execute.data = Workflow.store_node_data(self, node_id, output)

if node_to_execute.data is None:
raise WorkflowException('execute', 'There was a problem saving node output.')

return node_to_execute

@staticmethod
def execute_workflow(workflow_location):
def execute_workflow(workflow_location, stdin_files):
"""Execute entire workflow at a certain location.
Current use case: CLI.
"""
@@ -565,7 +587,13 @@ def execute_workflow(workflow_location):
#execute each node in the order returned by execution order method
#TODO exception handling: stop and provide details on which node failed to execute
for node in execution_order:
workflow_instance.execute(node)
if type(workflow_instance.get_node(node)) is ReadCsvNode and len(stdin_files) > 0:
csv_location = stdin_files[0]
workflow_instance.execute_read_csv(node, csv_location)
# delete file at index 0
del stdin_files[0]
else:
workflow_instance.execute(node)
Member commented:

I ran into this bug the other day where the graph didn't update after execution. The execute call returns the executed node for the front-end to then call workflow.update_or_add_node() to actually store the node.data attribute. Without this, I got complaints that predecessor data was missing when it's actually written to disk.

Changing both if/else to include executed_node = workflow_instance.<execute_method_here>... and then workflow_instance.update_or_add_node(executed_node) outside the if/else but within the for loop solves the issue. Come to think of it, we should probably update the execute method to have this behavior and update the execute endpoint as well (to avoid double-saving).

Collaborator (author) replied:

I see, I've modified the code to do exactly what you suggested and seems to be working fine.

Collaborator (author) added:

I've added a modification that includes the dataframe output in stdout. I didn't need to duplicate the exact functionality of ReadCsv; I was able to print `output = node_to_execute.execute(preceding_data, execution_options)`. I'm still not happy with the repetition of execute code in workflow, so I will refactor this in the next PR.
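The fix agreed on in the thread — capture each executed node and persist it with `update_or_add_node()` inside the loop — can be sketched generically. This is an illustrative sketch, not the PR's final code: `run_in_order` is a hypothetical free function, and the workflow object is duck-typed with the methods named in the discussion.

```python
def run_in_order(workflow_instance, execution_order, stdin_files, read_csv_type):
    """Execute nodes in order, routing piped stdin files to read-csv
    nodes, then persist every executed node so successors can find
    their predecessor data (per the review comment above)."""
    for node_id in execution_order:
        node = workflow_instance.get_node(node_id)
        if isinstance(node, read_csv_type) and stdin_files:
            # consume one piped file per read-csv node
            executed_node = workflow_instance.execute_read_csv(
                node_id, stdin_files.pop(0))
        else:
            executed_node = workflow_instance.execute(node_id)
        # store executed data on the graph, outside the if/else
        workflow_instance.update_or_add_node(executed_node)
```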



class WorkflowUtils: