From b00b766ebd042b33c972cd6c579990c6187f7e27 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 17 Apr 2020 08:36:54 -0400 Subject: [PATCH 1/9] style: Update Node parameters to new Parameter class Made all Parameters one line per argument for readability/version control. Increases number of lines for 'small' parameters, but should make any future changes clearer. --- pyworkflow/pyworkflow/node.py | 194 +++++++++++++++++----------------- 1 file changed, 96 insertions(+), 98 deletions(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index e30d5eb..e34e9ff 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -50,10 +50,15 @@ class StringNode(FlowNode): color = 'purple' OPTIONS = { - "default_value": StringParameter("Default Value", - docstring="Value this node will pass as a flow variable"), - "var_name": StringParameter("Variable Name", default="my_var", - docstring="Name of the variable to use in another Node") + "default_value": StringParameter( + "Default Value", + docstring="Value this node will pass as a flow variable" + ), + "var_name": StringParameter( + "Variable Name", + default="my_var", + docstring="Name of the variable to use in another Node" + ) } @@ -88,12 +93,22 @@ class ReadCsvNode(IONode): num_out = 1 OPTIONS = { - "file": FileParameter("File", docstring="CSV File"), - "sep": StringParameter("Delimiter", default=",", docstring="Column delimiter"), + "file": FileParameter( + "File", + docstring="CSV File" + ), + "sep": StringParameter( + "Delimiter", + default=",", + docstring="Column delimiter" + ), # user-specified headers are probably integers, but haven't figured out # arguments with multiple possible types - "header": StringParameter("Header Row", default="infer", - docstring="Row number containing column names (0-indexed)"), + "header": StringParameter( + "Header Row", + default="infer", + docstring="Row number containing column names (0-indexed)" + ), } def execute(self, 
predecessor_data, flow_vars): @@ -128,9 +143,20 @@ class WriteCsvNode(IONode): download_result = True OPTIONS = { - "file": StringParameter("Filename", docstring="CSV file to write"), - "sep": StringParameter("Delimiter", default=",", docstring="Column delimiter"), - "index": BooleanParameter("Write Index", default=True, docstring="Write index as column?"), + "file": StringParameter( + "Filename", + docstring="CSV file to write" + ), + "sep": StringParameter( + "Delimiter", + default=",", + docstring="Column delimiter" + ), + "index": BooleanParameter( + "Write Index", + default=True, + docstring="Write index as column?" + ), } def execute(self, predecessor_data, flow_vars): @@ -174,65 +200,48 @@ class PivotNode(ManipulationNode): num_in = 1 num_out = 3 - DEFAULT_OPTIONS = { - 'index': None, - 'values': None, - 'columns': None, - 'aggfunc': 'mean', - 'fill_value': None, - 'margins': False, - 'dropna': True, - 'margins_name': 'All', - 'observed': False - } - - OPTION_TYPES = { - 'index': { - "type": "column, grouper, array or list", - "name": "Index", - "desc": "Column to aggregate" - }, - 'values': { - "type": "column, grouper, array or list", - "name": "Values", - "desc": "Column name to use to populate new frame's values" - }, - 'columns': { - "type": "column, grouper, array or list", - "name": "Column Name Row", - "desc": "Column(s) to use for populating new frame values.'" - }, - 'aggfunc': { - "type": "function, list of functions, dict, default numpy.mean", - "name": "Aggregation function", - "desc": "Function used for aggregation" - }, - 'fill_value': { - "type": "scalar", - "name": "Fill value", - "desc": "Value to replace missing values with" - }, - 'margins': { - "type": "boolean", - "name": "Margins name", - "desc": "Add all rows/columns" - }, - - 'dropna': { - "type": "boolean", - "name": "Drop NaN columns", - "desc": "Ignore columns with all NaN entries" - }, - 'margins_name': { - "type": "string", - "name": "Margins name", - "desc": "Name of the 
row/column that will contain the totals when margins is True" - }, - 'observed': { - "type": "string", - "name": "Column Name Row", - "desc": "Row number with column names (0-indexed) or 'infer'" - } + OPTIONS = { + 'index': StringParameter( + 'Index', + docstring='Column to aggregate (column, grouper, array or list)' + ), + 'values': StringParameter( + 'Values', + docstring='Column name to use to populate new frame\'s values (column, grouper, array or list)' + ), + 'columns': StringParameter( + 'Column Name Row', + docstring='Column(s) to use for populating new frame values. (column, grouper, array or list)' + ), + 'aggfunc': StringParameter( + 'Aggregation function', + default='mean', + docstring='Function used for aggregation (function, list of functions, dict, default numpy.mean)' + ), + 'fill_value': StringParameter( + 'Fill value', + docstring='Value to replace missing values with (scalar)' + ), + 'margins': BooleanParameter( + 'Margins name', + default=False, + docstring='Add all rows/columns' + ), + 'dropna': BooleanParameter( + 'Drop NaN columns', + default=True, + docstring='Ignore columns with all NaN entries' + ), + 'margins_name': StringParameter( + 'Margins name', + default='All', + docstring='Name of the row/column that will contain the totals when margins is True' + ), + 'observed': BooleanParameter( + 'Column Name Row', + default=False, + docstring='Row number with column names (0-indexed) or "infer"' + ) } def execute(self, predecessor_data, flow_vars): @@ -273,34 +282,23 @@ class FilterNode(ManipulationNode): num_in = 1 num_out = 1 - DEFAULT_OPTIONS = { - 'items': None, - 'like': None, - 'regex': None, - 'axis': None - } - - OPTION_TYPES = { - 'items': { - "type": "list", - "name": "Items", - "desc": "Keep labels from axis which are in items" - }, - 'like': { - "type": "string", - "name": "Like", - "desc": "Keep labels from axis for which like in label == True." 
- }, - 'regex': { - "type": "string", - "name": "Regex", - "desc": "Keep labels from axis for which re.search(regex, label) == True." - }, - 'axis': { - "type": "int or string", - "name": "Axis", - "desc": "The axis to filter on." - } + OPTIONS = { + 'items': StringParameter( + 'Items', + docstring='Keep labels from axis which are in items' + ), + 'like': StringParameter( + 'Like', + docstring='Keep labels from axis for which like in label == True.' + ), + 'regex': StringParameter( + 'Regex', + docstring='Keep labels from axis for which re.search(regex, label) == True.' + ), + 'axis': StringParameter( + 'Axis', + docstring='The axis to filter on.' + ) } def __init__(self, node_info, options=dict()): From 8e6fd2eb6b2d7fdac4d08e116caf0e9e5f91347d Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 17 Apr 2020 11:39:40 -0400 Subject: [PATCH 2/9] fix: Exception-handling for 'new'; include 'id' key in POST --- front-end/src/API.js | 4 +++- vp/workflow/views.py | 12 ++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/front-end/src/API.js b/front-end/src/API.js index f7aea84..cf0346d 100644 --- a/front-end/src/API.js +++ b/front-end/src/API.js @@ -104,7 +104,9 @@ export async function getNodes() { export async function initWorkflow(model) { const options = { method: "POST", - body: JSON.stringify(model.options.id) + body: JSON.stringify({ + "id": model.options.id + }) }; return fetchWrapper("/workflow/new", options); diff --git a/vp/workflow/views.py b/vp/workflow/views.py index a76c28c..f468697 100644 --- a/vp/workflow/views.py +++ b/vp/workflow/views.py @@ -25,14 +25,14 @@ def new_workflow(request): """ try: workflow_id = json.loads(request.body) - except json.JSONDecodeError as e: - return JsonResponse({'No React model ID provided': str(e)}, status=500) - # Create new Workflow - request.pyworkflow = Workflow(name=workflow_id, root_dir=settings.MEDIA_ROOT) - request.session.update(request.pyworkflow.to_session_dict()) + # Create new Workflow + 
request.pyworkflow = Workflow(name=workflow_id['id'], root_dir=settings.MEDIA_ROOT) + request.session.update(request.pyworkflow.to_session_dict()) - return JsonResponse(Workflow.to_graph_json(request.pyworkflow.graph)) + return JsonResponse(Workflow.to_graph_json(request.pyworkflow.graph)) + except (json.JSONDecodeError, KeyError) as e: + return JsonResponse({'No React model ID provided': str(e)}, status=500) @swagger_auto_schema(method='post', From 092e68914bba80b23fac6ea777fbf45baab1ac66 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 17 Apr 2020 11:49:55 -0400 Subject: [PATCH 3/9] refactor: Node validation checks parameters `validate` is defined in the parent Node class. It checks all options are valid, raising `ParameterValidationError` if not. Calls to `node.validate()` are removed from `pyworkflow` when updating/adding node or edge between Nodes. A call to `validate` occurs when the POST route is hit for updating a Node to check any changes to the configuration are valid. --- pyworkflow/pyworkflow/node.py | 17 ++++++++++------- pyworkflow/pyworkflow/workflow.py | 13 +------------ vp/node/views.py | 7 ++++++- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index e34e9ff..e8673f7 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -27,7 +27,16 @@ def execute(self, predecessor_data, flow_vars): pass def validate(self): - return True + """Validate Node configuration + + Checks all Node options and validates all Parameter classes using + their validation method. 
+ + Raises: + ParameterValidationError: invalid Parameter value + """ + for option in self.options.values(): + option.validate() def __str__(self): return "Test" @@ -75,9 +84,6 @@ class IONode(Node): def execute(self, predecessor_data, flow_vars): pass - def validate(self): - return True - class ReadCsvNode(IONode): """ReadCsvNode @@ -191,9 +197,6 @@ class ManipulationNode(Node): def execute(self, predecessor_data, flow_vars): pass - def validate(self): - return True - class PivotNode(ManipulationNode): name = "Pivoting" diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 5cd40d5..b7550fd 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -66,14 +66,7 @@ def update_or_add_node(self, node: Node): Args: node - The Node object to update or add to the graph - - TODO: - * validate() always returns True; this should perform actual validation """ - # Do not add/update if invalid - if node.validate() is False: - raise WorkflowException('update_or_add_node', 'Node is invalid') - # Select the correct graph to modify graph = self.flow_vars if node.is_global else self.graph @@ -112,9 +105,6 @@ def add_edge(self, node_from: Node, node_to: Node): Returns: Tuple representing the new Edge (from, to) - - TODO: - * validate() always returns True; this should perform actual validation """ # Prevent duplicate edges between the same two nodes # TODO: This may be incorrect usage for a `node_to` that has multi-in @@ -124,8 +114,7 @@ def add_edge(self, node_from: Node, node_to: Node): if self._graph.has_edge(from_id, to_id): raise WorkflowException('add_node', 'Edge between nodes already exists.') - if node_from.validate() and node_to.validate(): - self._graph.add_edge(from_id, to_id) + self._graph.add_edge(from_id, to_id) return (from_id, to_id) diff --git a/vp/node/views.py b/vp/node/views.py index 4a27150..3e7e26d 100644 --- a/vp/node/views.py +++ b/vp/node/views.py @@ -2,7 +2,7 @@ from django.http import 
JsonResponse from django.views.decorators.csrf import csrf_exempt -from pyworkflow import Workflow, WorkflowException, Node, NodeException, node_factory +from pyworkflow import Workflow, WorkflowException, Node, NodeException, node_factory, ParameterValidationError from rest_framework.decorators import api_view from drf_yasg.utils import swagger_auto_schema @@ -190,6 +190,9 @@ def handle_node(request, node_id): 'updated_node': str(updated_node.is_global), }, status=500) + # Validation raises exception if failed + updated_node.validate() + request.pyworkflow.update_or_add_node(updated_node) response = JsonResponse(updated_node.__dict__, safe=False) elif request.method == 'DELETE': @@ -203,6 +206,8 @@ def handle_node(request, node_id): }, status=405) except (NodeException, WorkflowException) as e: return JsonResponse({e.action: e.reason}, status=500) + except ParameterValidationError as e: + return JsonResponse({'message': str(e)}, status=500) return response From 7d5716699383613a3f6631e9db208942d403f71b Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 17 Apr 2020 12:19:42 -0400 Subject: [PATCH 4/9] chore: Unify different Node classes Removes `__init__` from FilterNode, changes `execute` to raise `NotImplementedError()` if not present. --- pyworkflow/pyworkflow/node.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index e8673f7..65d2c3e 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -24,7 +24,7 @@ def __init__(self, node_info): self.option_values.update(node_info["options"]) def execute(self, predecessor_data, flow_vars): - pass + raise NotImplementedError() def validate(self): """Validate Node configuration @@ -43,10 +43,16 @@ def __str__(self): class FlowNode(Node): - """FlowNodes object + """FlowNodes object. + + FlowNodes do not execute. 
They specify a variable name and value to pass + to other Nodes as a way to dynamically change other parameter values. """ display_name = "Flow Control" + def execute(self, predecessor_data, flow_vars): + return + class StringNode(FlowNode): """StringNode object @@ -82,7 +88,7 @@ class IONode(Node): display_name = "I/O" def execute(self, predecessor_data, flow_vars): - pass + raise NotImplementedError() class ReadCsvNode(IONode): @@ -130,9 +136,6 @@ def execute(self, predecessor_data, flow_vars): except Exception as e: raise NodeException('read csv', str(e)) - def __str__(self): - return "ReadCsvNode" - class WriteCsvNode(IONode): """WriteCsvNode @@ -195,7 +198,7 @@ class ManipulationNode(Node): color = 'goldenrod' def execute(self, predecessor_data, flow_vars): - pass + raise NotImplementedError() class PivotNode(ManipulationNode): @@ -304,9 +307,6 @@ class FilterNode(ManipulationNode): ) } - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): try: NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) From 52c22ea8585ef21158bc6f5422640d474605213f Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 17 Apr 2020 13:02:23 -0400 Subject: [PATCH 5/9] refactor: Move `validate_input_data` to Node class All Nodes we have implemented required the defined number of input nodes, so logic can be simplified to whether two values are equal. --- pyworkflow/pyworkflow/node.py | 47 ++++++++++++++--------------------- 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index 65d2c3e..a5b5cee 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -38,6 +38,24 @@ def validate(self): for option in self.options.values(): option.validate() + def validate_input_data(self, num_input_data): + """Validate Node input data. 
+ + Checks that input data, if any, matches with required number of input + ports. + + Args: + num_input_data: Number of input data passed in + + Raises: + NodeException on mis-matched input ports/data + """ + if num_input_data != self.num_in: + raise NodeException( + 'execute', + f'{self.node_key} requires {self.num_in} inputs. {num_input_data} were provided' + ) + def __str__(self): return "Test" @@ -125,8 +143,6 @@ class ReadCsvNode(IONode): def execute(self, predecessor_data, flow_vars): try: - # Read CSV needs exactly 0 input DataFrame - NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) NodeUtils.replace_flow_vars(self.options, flow_vars) fname = self.options["file"].get_value() sep = self.options["sep"].get_value() @@ -170,9 +186,6 @@ class WriteCsvNode(IONode): def execute(self, predecessor_data, flow_vars): try: - # Write CSV needs exactly 1 input DataFrame - NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) - # Convert JSON data to DataFrame df = pd.DataFrame.from_dict(predecessor_data[0]) @@ -252,7 +265,6 @@ class PivotNode(ManipulationNode): def execute(self, predecessor_data, flow_vars): try: - NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) input_df = pd.DataFrame.from_dict(predecessor_data[0]) output_df = pd.DataFrame.pivot_table(input_df, **self.options) return output_df.to_json() @@ -270,10 +282,7 @@ class JoinNode(ManipulationNode): } def execute(self, predecessor_data, flow_vars): - # Join cannot accept more than 2 input DataFrames - # TODO: Add more error-checking if 1, or no, DataFrames passed through try: - NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) first_df = pd.DataFrame.from_dict(predecessor_data[0]) second_df = pd.DataFrame.from_dict(predecessor_data[1]) combined_df = pd.merge(first_df, second_df, @@ -309,7 +318,6 @@ class FilterNode(ManipulationNode): def execute(self, 
predecessor_data, flow_vars): try: - NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) input_df = pd.DataFrame.from_dict(predecessor_data[0]) output_df = pd.DataFrame.filter(input_df, **self.options) return output_df.to_json() @@ -328,25 +336,6 @@ def __str__(self): class NodeUtils: - FIXED_INPUT_NODES = ['WriteCsvNode', 'FilterNode', 'JoinNode'] # nodes which can only have a fixed number of predecessors - MAX_INPUT_NODES = ['ReadCsvNode'] # nodes for which num_in represents a maximum number of predecessors - - @staticmethod - def validate_predecessor_data(predecessor_data_len, num_in, node_key): - validation_failed = False - exception_txt = "" - if node_key in NodeUtils.FIXED_INPUT_NODES and predecessor_data_len != num_in: - validation_failed = True - exception_txt = '%s needs %d inputs. %d were provided' - elif (node_key in NodeUtils.MAX_INPUT_NODES and predecessor_data_len > num_in): - validation_failed = True - exception_txt = '%s can take up to %d inputs. 
%d were provided' - - if validation_failed: - raise NodeException( - 'execute', - exception_txt % (node_key, num_in, predecessor_data_len) - ) @staticmethod def replace_flow_vars(node_options, flow_vars): From 8bffc797be963cde8df3974c7cd81067fbe84ca3 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 17 Apr 2020 13:04:03 -0400 Subject: [PATCH 6/9] feat: Global/local flow variable replacement, updated execution --- pyworkflow/pyworkflow/node.py | 75 ++++++++++++++++-------- pyworkflow/pyworkflow/workflow.py | 95 +++++++++++++++++++++++-------- 2 files changed, 121 insertions(+), 49 deletions(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index a5b5cee..5314c90 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -23,9 +23,41 @@ def __init__(self, node_info): if node_info.get("options"): self.option_values.update(node_info["options"]) + self.option_replace = dict() + if node_info.get("option_replace"): + self.option_replace.update(node_info["option_replace"]) + def execute(self, predecessor_data, flow_vars): raise NotImplementedError() + def replace_flow_vars(self, flow_vars): + """Replace Node options with flow variables. + + If the user has specified any flow variables to replace Node options, + perform the replacement and return a dict with all options to use for + execution. If no flow variables are included, this method will return + a copy of all Node options unchanged. + + Args: + flow_vars: dict of variables to replace options + + Returns: + dict containing options to use for execution + """ + execution_options = dict() + + # TODO: Can we iterate through flow_vars instead? + # If none are included, we can just return `self.options`. 
+ for key, option in self.options.items(): + + if key in flow_vars: + replacement = flow_vars[key].get_replacement_value() + option.set_value(replacement) + + execution_options[key] = option + + return execution_options + def validate(self): """Validate Node configuration @@ -71,6 +103,9 @@ class FlowNode(Node): def execute(self, predecessor_data, flow_vars): return + def get_replacement_value(self): + return self.options['default_value'].get_value() + class StringNode(FlowNode): """StringNode object @@ -143,11 +178,11 @@ class ReadCsvNode(IONode): def execute(self, predecessor_data, flow_vars): try: - NodeUtils.replace_flow_vars(self.options, flow_vars) - fname = self.options["file"].get_value() - sep = self.options["sep"].get_value() - hdr = self.options["header"].get_value() - df = pd.read_csv(fname, sep=sep, header=hdr) + df = pd.read_csv( + flow_vars["file"].get_value(), + sep=flow_vars["sep"].get_value(), + header=flow_vars["header"].get_value() + ) return df.to_json() except Exception as e: raise NodeException('read csv', str(e)) @@ -190,10 +225,11 @@ def execute(self, predecessor_data, flow_vars): df = pd.DataFrame.from_dict(predecessor_data[0]) # Write to CSV and save - fname = self.options["file"].get_value() - sep = self.options["sep"].get_value() - index = self.options["index"].get_value() - df.to_csv(fname, sep=sep, index=index) + df.to_csv( + flow_vars["file"].get_value(), + sep=flow_vars["sep"].get_value(), + index=flow_vars["index"].get_value() + ) return df.to_json() except Exception as e: raise NodeException('write csv', str(e)) @@ -285,8 +321,11 @@ def execute(self, predecessor_data, flow_vars): try: first_df = pd.DataFrame.from_dict(predecessor_data[0]) second_df = pd.DataFrame.from_dict(predecessor_data[1]) - combined_df = pd.merge(first_df, second_df, - on=self.options["on"].get_value()) + combined_df = pd.merge( + first_df, + second_df, + on=flow_vars["on"].get_value() + ) return combined_df.to_json() except Exception as e: raise 
NodeException('join', str(e)) @@ -332,17 +371,3 @@ def __init__(self, action: str, reason: str): def __str__(self): return self.action + ': ' + self.reason - - -class NodeUtils: - - - @staticmethod - def replace_flow_vars(node_options, flow_vars): - # TODO: this will no longer work with the Node.options descriptor, - # which uses Node.option_values to populate the Parameter - # class values upon access - for var in flow_vars: - node_options[var['var_name']] = var['default_value'] - - return diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index b7550fd..4bf2644 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -2,7 +2,7 @@ import networkx as nx import json -from .node import Node +from .node import Node, NodeException from .node_factory import node_factory @@ -55,7 +55,7 @@ def get_flow_var(self, node_id): Return: FlowNode object, if one exists. Otherwise, None. """ - if self._flow_vars.has_node(node_id) is not True: + if self.flow_vars.has_node(node_id) is not True: return None node_info = self.flow_vars.nodes[node_id] @@ -180,37 +180,84 @@ def execute(self, node_id): if node_to_execute is None: raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id) - # Read in any data from predecessor/flow nodes - # TODO: This should work for local FlowNodes, but global flow_vars still - # need a way to be assigned/replace Node options - preceding_data = list() - flow_vars = list() - for predecessor in self.get_node_predecessors(node_id): - try: - node_to_retrieve = self.get_node(predecessor) + # Load predecessor data and FlowNode values + preceding_data = self.load_input_data(node_to_execute.node_id) + flow_nodes = self.load_flow_var_values(node_to_execute.option_replace) - if node_to_retrieve is None: - raise WorkflowException('retrieve node data', 'The workflow does not contain node %s' % node_id) + try: + # Validate input data, and replace flow variables + 
node_to_execute.validate_input_data(len(preceding_data)) + execution_options = node_to_execute.replace_flow_vars(flow_nodes) + + # Pass in data to current Node to use in execution + output = node_to_execute.execute(preceding_data, execution_options) + + # Save new execution data to disk + node_to_execute.data = Workflow.store_node_data(self, node_id, output) + except NodeException as e: + raise e + + if node_to_execute.data is None: + raise WorkflowException('execute', 'There was a problem saving node output.') + + return node_to_execute + + def load_flow_var_values(self, options): + """Construct dict of FlowNodes indexed by option name. + + Args: + options: Flow variables user selected for a given Node. + + Returns: + dict of FlowNode objects, indexed by the option name. + """ + flow_vars = dict() + + for key, option in options.items(): + try: + flow_node_id = option["node_id"] - if node_to_retrieve.node_type == 'FlowNode': - flow_vars.append(node_to_retrieve.options) + if option["is_global"]: + flow_node = self.get_flow_var(flow_node_id) else: - preceding_data.append(self.retrieve_node_data(node_to_retrieve)) + flow_node = self.get_node(flow_node_id) + if flow_node is None or flow_node.node_type != 'FlowNode': + raise WorkflowException('load flow vars', 'The workflow does not contain FlowNode %s' % flow_node_id) + + flow_vars[key] = flow_node except WorkflowException: - # TODO: Should this append None, skip reading, or raise exception to view? - preceding_data.append(None) + # TODO: Should this add a blank value, skip reading, or raise exception to view? 
+ continue - # Pass in data to current Node to use in execution - output = node_to_execute.execute(preceding_data, flow_vars) + return flow_vars - # Save new execution data to disk - node_to_execute.data = Workflow.store_node_data(self, node_id, output) + def load_input_data(self, node_id): + """Construct list of predecessor DataFrames - if node_to_execute.data is None: - raise WorkflowException('execute', 'There was a problem saving node output.') + Args: + node_id: The Node with predecessors - return node_to_execute + Returns: + list of dict-like DataFrames, used for Node execution + """ + input_data = list() + + for predecessor_id in self.get_node_predecessors(node_id): + try: + node_to_retrieve = self.get_node(predecessor_id) + + if node_to_retrieve is None: + raise WorkflowException('retrieve node data', 'The workflow does not contain node %s' % predecessor_id) + + if node_to_retrieve.node_type != 'FlowNode': + input_data.append(self.retrieve_node_data(node_to_retrieve)) + + except WorkflowException: + # TODO: Should this append None, skip reading, or raise exception to view? + continue + + return input_data def execution_order(self): try: From 011516148d20806093d1522aa1331b9c4c66c525 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 17 Apr 2020 13:19:15 -0400 Subject: [PATCH 7/9] doc: Described new flow methods, clearer variable names --- pyworkflow/pyworkflow/node.py | 9 ++++----- pyworkflow/pyworkflow/workflow.py | 33 +++++++++++++++++++++++-------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index 5314c90..eaf7acd 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -30,7 +30,7 @@ def __init__(self, node_info): def execute(self, predecessor_data, flow_vars): raise NotImplementedError() - def replace_flow_vars(self, flow_vars): + def get_execution_options(self, flow_nodes): """Replace Node options with flow variables. 
If the user has specified any flow variables to replace Node options, @@ -39,7 +39,7 @@ def replace_flow_vars(self, flow_vars): a copy of all Node options unchanged. Args: - flow_vars: dict of variables to replace options + flow_nodes: dict of FlowNodes used to replace options Returns: dict containing options to use for execution @@ -50,9 +50,8 @@ def replace_flow_vars(self, flow_vars): # If none are included, we can just return `self.options`. for key, option in self.options.items(): - if key in flow_vars: - replacement = flow_vars[key].get_replacement_value() - option.set_value(replacement) + if key in flow_nodes: + option.set_value(flow_nodes[key].get_replacement_value()) execution_options[key] = option diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 4bf2644..380362a 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -182,12 +182,12 @@ def execute(self, node_id): # Load predecessor data and FlowNode values preceding_data = self.load_input_data(node_to_execute.node_id) - flow_nodes = self.load_flow_var_values(node_to_execute.option_replace) + flow_nodes = self.load_flow_nodes(node_to_execute.option_replace) try: # Validate input data, and replace flow variables node_to_execute.validate_input_data(len(preceding_data)) - execution_options = node_to_execute.replace_flow_vars(flow_nodes) + execution_options = node_to_execute.get_execution_options(flow_nodes) # Pass in data to current Node to use in execution output = node_to_execute.execute(preceding_data, execution_options) @@ -202,18 +202,32 @@ def execute(self, node_id): return node_to_execute - def load_flow_var_values(self, options): + def load_flow_nodes(self, option_replace): """Construct dict of FlowNodes indexed by option name. + During Node configuration, the user has the option to select FlowNodes + to replace a given parameter value. 
A FlowNode selection is structured + like the following JSON: + + "option_replace": { + "sep": { + "node_id": id, + "is_global": true + } + } + + This method retrieves the specified FlowNode where the replacement value + can then be retrieved. + Args: - options: Flow variables user selected for a given Node. + option_replace: Flow variables user selected for a given Node. Returns: dict of FlowNode objects, indexed by the option name. """ - flow_vars = dict() + flow_nodes = dict() - for key, option in options.items(): + for key, option in option_replace.items(): try: flow_node_id = option["node_id"] @@ -225,16 +239,19 @@ def load_flow_var_values(self, options): if flow_node is None or flow_node.node_type != 'FlowNode': raise WorkflowException('load flow vars', 'The workflow does not contain FlowNode %s' % flow_node_id) - flow_vars[key] = flow_node + flow_nodes[key] = flow_node except WorkflowException: # TODO: Should this add a blank value, skip reading, or raise exception to view? continue - return flow_vars + return flow_nodes def load_input_data(self, node_id): """Construct list of predecessor DataFrames + Retrieves the data file for all of a Node's predecessors. Ignores + exceptions for missing Nodes/data as this is checked prior to execution. + Args: node_id: The Node with predecessors From 3926e68ac82052d64a4a5842848d662b54af5ff1 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 17 Apr 2020 13:59:36 -0400 Subject: [PATCH 8/9] feat: Make Nodes JSON-serializable --- pyworkflow/pyworkflow/node.py | 12 ++++++++++++ vp/node/views.py | 3 +-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index eaf7acd..e447899 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -87,6 +87,18 @@ def validate_input_data(self, num_input_data): f'{self.node_key} requires {self.num_in} inputs. 
{num_input_data} were provided' ) + def to_json(self): + return { + "name": self.name, + "node_id": self.node_id, + "node_type": self.node_type, + "node_key": self.node_key, + "data": self.data, + "is_global": self.is_global, + "option_values": self.option_values, + "option_replace": self.option_replace, + } + def __str__(self): return "Test" diff --git a/vp/node/views.py b/vp/node/views.py index 3e7e26d..e33e58c 100644 --- a/vp/node/views.py +++ b/vp/node/views.py @@ -167,7 +167,6 @@ def handle_node(request, node_id): }, status=404) # Process request - # Node class not JSON serializable; pass __dict__ to response for display try: if request.method == 'GET': response = JsonResponse(retrieved_node.__dict__, safe=False) @@ -194,7 +193,7 @@ def handle_node(request, node_id): updated_node.validate() request.pyworkflow.update_or_add_node(updated_node) - response = JsonResponse(updated_node.__dict__, safe=False) + response = JsonResponse(updated_node.to_json(), safe=False) elif request.method == 'DELETE': request.pyworkflow.remove_node(retrieved_node) response = JsonResponse({ From 3f9f9b1dc2633ca82ea49083db34a1ac1d3ef8d9 Mon Sep 17 00:00:00 2001 From: Matt Thomas Date: Fri, 17 Apr 2020 13:59:54 -0400 Subject: [PATCH 9/9] feat: Retrieve list of all FlowNode options for a given Node config --- pyworkflow/pyworkflow/workflow.py | 25 +++++++++++++++++++++++++ vp/node/views.py | 5 ++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 380362a..3165a94 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -61,6 +61,31 @@ def get_flow_var(self, node_id): node_info = self.flow_vars.nodes[node_id] return node_factory(node_info) + def get_all_flow_var_options(self, node_id): + """Retrieve all FlowNode options for a specified Node. + + A Node can use all global FlowNodes, and any connected local FlowNodes + for variable substitution. 
+ + Args: + node_id: The Node to GET + + Returns: + list of all FlowNode objects, converted to JSON + """ + # Add global FlowNodes + graph_data = Workflow.to_graph_json(self.flow_vars) + flow_variables = graph_data['nodes'] + + # Append local FlowNodes + for predecessor_id in self.get_node_predecessors(node_id): + node = self.get_node(predecessor_id) + + if node.node_type == 'FlowNode': + flow_variables.append(node.to_json()) + + return flow_variables + def update_or_add_node(self, node: Node): """ Update or add a Node object to the graph. diff --git a/vp/node/views.py b/vp/node/views.py index e33e58c..740c85a 100644 --- a/vp/node/views.py +++ b/vp/node/views.py @@ -169,7 +169,10 @@ def handle_node(request, node_id): # Process request try: if request.method == 'GET': - response = JsonResponse(retrieved_node.__dict__, safe=False) + response = JsonResponse({ + "retrieved_node": retrieved_node.to_json(), + "flow_variables": request.pyworkflow.get_all_flow_var_options(node_id), + }, safe=False) elif request.method == 'POST': updated_node = create_node(request)