diff --git a/back-end/CLI/cli.py b/back-end/CLI/cli.py index 48f663e..592c14f 100644 --- a/back-end/CLI/cli.py +++ b/back-end/CLI/cli.py @@ -1,64 +1,123 @@ -import sys - import click -import os -import uuid +import json -from pyworkflow import Workflow +from pyworkflow import Workflow, WorkflowException from pyworkflow import NodeException +from pyworkflow.nodes import ReadCsvNode, WriteCsvNode class Config(object): def __init__(self): self.verbose = False + pass_config = click.make_pass_decorator(Config, ensure=True) + @click.group() def cli(): pass @cli.command() -@click.argument('filename', type=click.Path(exists=True), nargs=-1) +@click.argument('filenames', type=click.Path(exists=True), nargs=-1) @click.option('--verbose', is_flag=True, help='Enables verbose mode.') -def execute(filename, verbose): +def execute(filenames, verbose): + """Execute Workflow file(s).""" + # Check whether to log to terminal, or redirect output + log = click.get_text_stream('stdout').isatty() - write_to_stdout = not click.get_text_stream('stdout').isatty() + # Execute each workflow in the args + for workflow_file in filenames: - #execute each one of the workflows in the ar - for workflow_file in filename: + if workflow_file is None: + click.echo('Please specify a workflow to run', err=True) + return - stdin_files = [] + if log: + click.echo('Loading workflow file from %s' % workflow_file) - if not click.get_text_stream('stdin').isatty(): - stdin_text = click.get_text_stream('stdin') + try: + workflow = open_workflow(workflow_file) + execute_workflow(workflow, log, verbose) + except OSError as e: + click.echo(f"Issues loading workflow file: {e}", err=True) + except WorkflowException as e: + click.echo(f"Issues during workflow execution\n{e}", err=True) + + +def execute_workflow(workflow, log, verbose): + """Execute a workflow file, node-by-node. + + Retrieves the execution order from the Workflow and iterates through nodes. + If any I/O nodes are present AND stdin/stdout redirection is provided in the + command-line, overwrite the stored options and then replace before saving. + + Args: + workflow - Workflow object loaded from file + log - True, for outputting to terminal; False for stdout redirection + verbose - True, for outputting debug information; False otherwise + """ + execution_order = workflow.execution_order() + + # Execute each node in the order returned by the Workflow + for node in execution_order: + try: + node_to_execute = workflow.get_node(node) + original_file_option = pre_execute(workflow, node_to_execute, log) - # write standard in to a new file in local filesystem - file_name = str(uuid.uuid4()) + if verbose: + print('Executing node of type ' + str(type(node_to_execute))) - # TODO small issue here, might be better to upload this file to the workflow directory instead of cwd - new_file_path = os.path.join(os.getcwd(), file_name) + # perform execution + executed_node = workflow.execute(node) - # read from std in and upload a new file in project directory - with open(new_file_path, 'w') as f: - f.write(stdin_text.read()) + # If file was replaced with stdin/stdout, restore original option + if original_file_option is not None: + executed_node.option_values["file"] = original_file_option - stdin_files.append(file_name) + # Update Node in Workflow with changes (saved data file) + workflow.update_or_add_node(executed_node) + except NodeException as e: + click.echo(f"Issues during node execution\n{e}", err=True) - if workflow_file is None: - click.echo('Please specify a workflow to run') - return - try: - if not write_to_stdout: - click.echo('Loading workflow file from %s' % workflow_file) + if verbose: + click.echo('Completed workflow execution!') - Workflow.execute_workflow(workflow_file, stdin_files, write_to_stdout, verbose) - if verbose: - click.echo('Completed workflow execution!') +def pre_execute(workflow, node_to_execute, log): + """Pre-execution steps, to overwrite file options with stdin/stdout. + + If stdin is not a tty, and the Node is ReadCsv, replace file with buffer. + If stdout is not a tty, and the Node is WriteCsv, replace file with buffer. + + Args: + workflow - Workflow object loaded from file + node_to_execute - The Node to execute + log - True, for outputting to terminal; False for stdout redirection + """ + stdin = click.get_text_stream('stdin') + + if type(node_to_execute) is ReadCsvNode and not stdin.isatty(): + new_file_location = stdin + elif type(node_to_execute) is WriteCsvNode and not log: + new_file_location = click.get_text_stream('stdout') + else: + # No file redirection needed + return None + + # save original file info + original_file_option = node_to_execute.option_values["file"] + + # replace with value from stdin and save + node_to_execute.option_values["file"] = new_file_location + workflow.update_or_add_node(node_to_execute) + + return original_file_option + +def open_workflow(workflow_file): + with open(workflow_file) as f: + json_content = json.load(f) - except NodeException as ne: - click.echo("Issues during node execution") - click.echo(ne) + return Workflow.from_json(json_content['pyworkflow']) diff --git a/back-end/pyworkflow/pyworkflow/node.py b/back-end/pyworkflow/pyworkflow/node.py index 898fe1c..ee69f16 100644 --- a/back-end/pyworkflow/pyworkflow/node.py +++ b/back-end/pyworkflow/pyworkflow/node.py @@ -1,5 +1,5 @@ from .parameters import * - +import io class Node: """Node object @@ -58,7 +58,11 @@ def get_execution_options(self, workflow, flow_nodes): else: replacement_value = option.get_value() - if key == 'file': + if key == 'file' and type(replacement_value) == io.TextIOWrapper: + # For files specified via stdin/stdout, store directly + option.set_value(replacement_value) + elif key == 'file': + # Otherwise, point to filepath stored in Workflow directory option.set_value(workflow.path(replacement_value)) execution_options[key] = option diff --git a/back-end/pyworkflow/pyworkflow/nodes/io/read_csv.py b/back-end/pyworkflow/pyworkflow/nodes/io/read_csv.py index f909809..f829d61 100644 --- a/back-end/pyworkflow/pyworkflow/nodes/io/read_csv.py +++ b/back-end/pyworkflow/pyworkflow/nodes/io/read_csv.py @@ -46,13 +46,3 @@ def execute(self, predecessor_data, flow_vars): return df.to_json() except Exception as e: raise NodeException('read csv', str(e)) - - def execute_for_read(self, predecessor_data, flow_vars, file_to_read): - try: - fname = file_to_read - sep = self.options["sep"].get_value() - hdr = self.options["header"].get_value() - df = pd.read_csv(fname, sep=sep, header=hdr) - return df.to_json() - except Exception as e: - raise NodeException('read csv', str(e)) \ No newline at end of file diff --git a/back-end/pyworkflow/pyworkflow/workflow.py b/back-end/pyworkflow/pyworkflow/workflow.py index aa561e3..91100e9 100644 --- a/back-end/pyworkflow/pyworkflow/workflow.py +++ b/back-end/pyworkflow/pyworkflow/workflow.py @@ -556,91 +556,6 @@ def to_session_dict(self): except nx.NetworkXError as e: raise WorkflowException('to_session_dict', str(e)) - def execute_read_csv(self, node_id, csv_location): - # TODO: some duplicated code here from execute common method. Need to refactor. - """Execute read_csv from a file specified to standard input. - Current use case: CLI. - """ - preceding_data = list() - flow_vars = list() - node_to_execute = self.get_node(node_id) - if node_to_execute is None: - raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id) - # Pass in data to current Node to use in execution - output = node_to_execute.execute_for_read(preceding_data.append(None), flow_vars.append(None), csv_location) - - # Save new execution data to disk - node_to_execute.data = Workflow.store_node_data(self, node_id, output) - - if node_to_execute.data is None: - raise WorkflowException('execute', 'There was a problem saving node output.') - - return node_to_execute - - def execute_write_csv(self, node_id): - node_to_execute = self.get_node(node_id) - - if node_to_execute is None: - raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id) - - # Load predecessor data and FlowNode values - preceding_data = self.load_input_data(node_to_execute.node_id) - flow_nodes = self.load_flow_nodes(node_to_execute.option_replace) - - try: - # Validate input data, and replace flow variables - node_to_execute.validate_input_data(len(preceding_data)) - execution_options = node_to_execute.get_execution_options(self, flow_nodes) - # Pass in data to current Node to use in execution - output = node_to_execute.execute(preceding_data, execution_options) - - #printing the output in order to include in std_out - print(output) - - # Save new execution data to disk - node_to_execute.data = Workflow.store_node_data(self, node_id, output) - except NodeException as e: - raise e - - if node_to_execute.data is None: - raise WorkflowException('execute', 'There was a problem saving node output.') - - return node_to_execute - - @staticmethod - def execute_workflow(workflow_location, stdin_files, write_to_stdout, verbose_mode): - """Execute entire workflow at a certain location. - Current use case: CLI. - """ - #load the file at workflow_location - with open(workflow_location) as f: - json_content = json.load(f) - - #convert it to a workflow - workflow_instance = Workflow.from_json(json_content['pyworkflow']) - - #get the execution order - execution_order = workflow_instance.execution_order() - - #execute each node in the order returned by execution order method - #TODO exception handling: stop and provide details on which node failed to execute - for node in execution_order: - - if verbose_mode: - print('Executing node of type ' + str(type(workflow_instance.get_node(node)))) - - if type(workflow_instance.get_node(node)) is ReadCsvNode and len(stdin_files) > 0: - csv_location = stdin_files[0] - executed_node = workflow_instance.execute_read_csv(node, csv_location) - # delete file at index 0 - del stdin_files[0] - elif type(workflow_instance.get_node(node)) is WriteCsvNode and write_to_stdout: - executed_node = workflow_instance.execute_write_csv(node) - else: - executed_node = workflow_instance.execute(node) - - workflow_instance.update_or_add_node(executed_node) - class WorkflowUtils: @staticmethod