diff --git a/CLI/cli.py b/CLI/cli.py
index 80ad501..2e6bd98 100644
--- a/CLI/cli.py
+++ b/CLI/cli.py
@@ -1,6 +1,11 @@
+import sys
+
 import click
+import os
+import uuid

 from pyworkflow import Workflow
+from pyworkflow import NodeException


 class Config(object):
@@ -10,16 +15,47 @@ def __init__(self):
 pass_config = click.make_pass_decorator(Config, ensure=True)


 @click.group()
-@click.option('--file-directory', type=click.Path())
-@pass_config
-def cli(config, file_directory):
-    if file_directory is None:
-        file_directory = '.'
-    config.file_directory = file_directory
+def cli():
+    pass


 @cli.command()
-@pass_config
-def execute(config):
-    click.echo('Loading workflow file form %s' % config.file_directory)
-    Workflow.execute_workflow(config.file_directory)
\ No newline at end of file
+@click.argument('filename', type=click.Path(exists=True), nargs=-1)
+def execute(filename):
+
+    write_to_stdout = not click.get_text_stream('stdout').isatty()
+
+    if not filename:
+        click.echo('Please specify a workflow to run')
+        return
+
+    # execute each of the workflow files passed as arguments
+    for workflow_file in filename:
+
+        stdin_files = []
+
+        if not click.get_text_stream('stdin').isatty():
+            stdin_text = click.get_text_stream('stdin')
+
+            # write standard input to a new file in the local filesystem
+            file_name = str(uuid.uuid4())
+
+            # TODO: it might be better to write this file to the workflow directory instead of the cwd
+            new_file_path = os.path.join(os.getcwd(), file_name)
+
+            # read from stdin and write its contents to a new file in the project directory
+            with open(new_file_path, 'w') as f:
+                f.write(stdin_text.read())
+
+            stdin_files.append(file_name)
+
+        try:
+            if not write_to_stdout:
+                click.echo('Loading workflow file from %s' % workflow_file)
+
+            Workflow.execute_workflow(workflow_file, stdin_files, write_to_stdout)
+
+        except NodeException as ne:
+            click.echo("Issues during node execution")
+            click.echo(ne)
diff --git a/README.md b/README.md
index e6b18b7..90d589e 100644
--- a/README.md
+++ b/README.md
@@ -65,7 +65,11 @@ data from `localhost:8000` **where the Django app must be running**.
 ## CLI
 1. Run pipenv shell.
 2. Create a workflow using UI and save it.
-3. Run it as: pyworkflow --file-directory (path-to-json-workflow-file) execute
+3. Run it as: pyworkflow execute workflow-file
+
+Also accepts reading input from stdin (e.g. `< file.csv`) and writing to stdout (e.g. `> output.csv`).
+
+

 ---
 ## Tests
diff --git a/pyworkflow/pyworkflow/nodes/io/read_csv.py b/pyworkflow/pyworkflow/nodes/io/read_csv.py
index f829d61..f909809 100644
--- a/pyworkflow/pyworkflow/nodes/io/read_csv.py
+++ b/pyworkflow/pyworkflow/nodes/io/read_csv.py
@@ -46,3 +46,13 @@ def execute(self, predecessor_data, flow_vars):
             return df.to_json()
         except Exception as e:
             raise NodeException('read csv', str(e))
+
+    def execute_for_read(self, predecessor_data, flow_vars, file_to_read):
+        try:
+            fname = file_to_read
+            sep = self.options["sep"].get_value()
+            hdr = self.options["header"].get_value()
+            df = pd.read_csv(fname, sep=sep, header=hdr)
+            return df.to_json()
+        except Exception as e:
+            raise NodeException('read csv', str(e))
\ No newline at end of file
diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py
index 43db599..acdabcb 100644
--- a/pyworkflow/pyworkflow/workflow.py
+++ b/pyworkflow/pyworkflow/workflow.py
@@ -8,6 +8,8 @@
 from collections import OrderedDict
 from modulefinder import ModuleFinder

+from pyworkflow.nodes import ReadCsvNode, WriteCsvNode
+
 from .node import Node, NodeException
 from .node_factory import node_factory

@@ -547,8 +549,60 @@ def to_session_dict(self):
         except nx.NetworkXError as e:
             raise WorkflowException('to_session_dict', str(e))

+    def execute_read_csv(self, node_id, csv_location):
+        """Execute a read_csv node using a file captured from standard input.
+        Current use case: CLI.
+        """
+        # TODO: some code here duplicates the common execute method; needs refactoring.
+        preceding_data = list()
+        flow_vars = list()
+        node_to_execute = self.get_node(node_id)
+        if node_to_execute is None:
+            raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id)
+        # Pass in data to current Node to use in execution
+        output = node_to_execute.execute_for_read(preceding_data, flow_vars, csv_location)
+
+        # Save new execution data to disk
+        node_to_execute.data = Workflow.store_node_data(self, node_id, output)
+
+        if node_to_execute.data is None:
+            raise WorkflowException('execute', 'There was a problem saving node output.')
+
+        return node_to_execute
+
+    def execute_write_csv(self, node_id):
+        node_to_execute = self.get_node(node_id)
+
+        if node_to_execute is None:
+            raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id)
+
+        # Load predecessor data and FlowNode values
+        preceding_data = self.load_input_data(node_to_execute.node_id)
+        flow_nodes = self.load_flow_nodes(node_to_execute.option_replace)
+
+        try:
+            # Validate input data, and replace flow variables
+            node_to_execute.validate_input_data(len(preceding_data))
+            execution_options = node_to_execute.get_execution_options(flow_nodes)
+
+            # Pass in data to current Node to use in execution
+            output = node_to_execute.execute(preceding_data, execution_options)
+
+            # print the output so it is written to stdout
+            print(output)
+
+            # Save new execution data to disk
+            node_to_execute.data = Workflow.store_node_data(self, node_id, output)
+        except NodeException as e:
+            raise e
+
+        if node_to_execute.data is None:
+            raise WorkflowException('execute', 'There was a problem saving node output.')
+
+        return node_to_execute
+
     @staticmethod
-    def execute_workflow(workflow_location):
+    def execute_workflow(workflow_location, stdin_files, write_to_stdout):
         """Execute entire workflow at a certain location.
         Current use case: CLI.
         """
@@ -565,7 +619,17 @@ def execute_workflow(workflow_location):
         #execute each node in the order returned by execution order method
         #TODO exception handling: stop and provide details on which node failed to execute
         for node in execution_order:
-            workflow_instance.execute(node)
+            if type(workflow_instance.get_node(node)) is ReadCsvNode and len(stdin_files) > 0:
+                csv_location = stdin_files[0]
+                executed_node = workflow_instance.execute_read_csv(node, csv_location)
+                # delete file at index 0
+                del stdin_files[0]
+            elif type(workflow_instance.get_node(node)) is WriteCsvNode and write_to_stdout:
+                executed_node = workflow_instance.execute_write_csv(node)
+            else:
+                executed_node = workflow_instance.execute(node)
+
+            workflow_instance.update_or_add_node(executed_node)


 class WorkflowUtils:
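
For reference, a minimal sketch of how the reworked entry points in this diff might be invoked; `workflow.json`, `input.csv`, and `output.csv` are placeholder names, not files from this change:

```python
from pyworkflow import Workflow

# Shell usage per the README change (hypothetical file names):
#   pyworkflow execute workflow.json
#   pyworkflow execute workflow.json < input.csv > output.csv

# Equivalent programmatic call against the new execute_workflow signature:
# stdin_files lists CSV files that stand in for the workflow's ReadCsvNode input,
# and write_to_stdout routes the WriteCsvNode output to standard output.
Workflow.execute_workflow('workflow.json', ['input.csv'], write_to_stdout=True)
```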