Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reads to stdin and writes to stdout, batch execution of workflows #69

Merged
merged 16 commits into from
May 1, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions CLI/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import sys

import click
import os
import uuid

from pyworkflow import Workflow
from pyworkflow import NodeException


class Config(object):
Expand All @@ -13,13 +18,40 @@ def __init__(self):
@click.option('--file-directory', type=click.Path())
@pass_config
def cli(config, file_directory):
if file_directory is None:
file_directory = '.'
config.file_directory = file_directory

stdin_files = []

if not click.get_text_stream('stdin').isatty():

stdin_text = click.get_text_stream('stdin')

# TODO should be done for each separate file coming from stdin, currently working for one file, but easy to build up.

#write standard in to a new file in local filesystem
file_name = str(uuid.uuid4())

# TODO small issue here, might be better to upload this file to the workflow directory instead of cwd
new_file_path = os.path.join(os.getcwd(), file_name)

#read from std in and upload a new file in project directory
with open(new_file_path, 'w') as f:
f.write(stdin_text.read())

stdin_files.append(file_name)

config.stdin_files = stdin_files


@cli.command()
@pass_config
def execute(config):
click.echo('Loading workflow file form %s' % config.file_directory)
Workflow.execute_workflow(config.file_directory)
if config.file_directory is None:
click.echo('Please specify a workflow to run')
return
try:
click.echo('Loading workflow file form %s' % config.file_directory)
Workflow.execute_workflow(config.file_directory, config.stdin_files)
except NodeException as ne:
click.echo("Issues during node execution")
click.echo(ne)
10 changes: 10 additions & 0 deletions pyworkflow/pyworkflow/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ def execute(self, predecessor_data, flow_vars):
except Exception as e:
raise NodeException('read csv', str(e))

def execute_for_read(self, predecessor_data, flow_vars, file_to_read):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to alter the node configuration (probably through node.option_values) to point to the stdin-copied file rather than create a new method? I think this would resolve the need for duplicated execute calls in the workflow object as you mentioned in the comments there.

try:
fname = file_to_read
sep = self.options["sep"].get_value()
hdr = self.options["header"].get_value()
df = pd.read_csv(fname, sep=sep, header=hdr)
return df.to_json()
except Exception as e:
raise NodeException('read csv', str(e))

def __str__(self):
return "ReadCsvNode"

Expand Down
33 changes: 30 additions & 3 deletions pyworkflow/pyworkflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import networkx as nx
import json

from .node import Node
from .node import Node, ReadCsvNode
from .node_factory import node_factory


Expand Down Expand Up @@ -382,8 +382,29 @@ def to_session_dict(self):
except nx.NetworkXError as e:
raise WorkflowException('to_session_dict', str(e))

def execute_read_csv(self, node_id, csv_location):
# TODO: some duplicated code here from execute common method. Need to refactor.
"""Execute read_csv from a file specified to standard input.
Current use case: CLI.
"""
preceding_data = list()
flow_vars = list()
node_to_execute = self.get_node(node_id)
if node_to_execute is None:
raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id)
# Pass in data to current Node to use in execution
output = node_to_execute.execute_for_read(preceding_data.append(None), flow_vars.append(None), csv_location)

# Save new execution data to disk
node_to_execute.data = Workflow.store_node_data(self, node_id, output)

if node_to_execute.data is None:
raise WorkflowException('execute', 'There was a problem saving node output.')

return node_to_execute

@staticmethod
def execute_workflow(workflow_location):
def execute_workflow(workflow_location, stdin_files):
"""Execute entire workflow at a certain location.
Current use case: CLI.
"""
Expand All @@ -400,7 +421,13 @@ def execute_workflow(workflow_location):
#execute each node in the order returned by execution order method
#TODO exception handling: stop and provide details on which node failed to execute
for node in execution_order:
workflow_instance.execute(node)
if type(workflow_instance.get_node(node)) is ReadCsvNode and len(stdin_files) > 0:
csv_location = stdin_files[0]
workflow_instance.execute_read_csv(node, csv_location)
# delete file at index 0
del stdin_files[0]
else:
workflow_instance.execute(node)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran into this bug the other day where the graph didn't update after execution. The execute call returns the executed node for the front-end to then call workflow.update_or_add_node() to actually store the node.data attribute. Without this, I got complaints that predecessor data was missing when it's actually written to disk.

Changing both if/else to include executed_node = workflow_instance.<execute_method_here>... and then workflow_instance.update_or_add_node(executed_node) outside the if/else but within the for loop solves the issue. Come to think of it, we should probably update the execute method to have this behavior and update the execute endpoint as well (to avoid double-saving).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I've modified the code to do exactly what you suggested and seems to be working fine.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a modification that includes the dataframe output in sdtout. Didn't need to duplicate the exact functionality as ReadCsv I was able to print output = node_to_execute.execute(preceding_data, execution_options). I'm still not happy with the repeating of execute code in workflow, so will be refactoring this in the next PR.



class WorkflowUtils:
Expand Down