diff --git a/front-end/src/API.js b/front-end/src/API.js index 392b52a..9cfc46f 100644 --- a/front-end/src/API.js +++ b/front-end/src/API.js @@ -107,7 +107,9 @@ export async function getNodes() { export async function initWorkflow(model) { const options = { method: "POST", - body: JSON.stringify(model.options.id) + body: JSON.stringify({ + "id": model.options.id + }) }; return fetchWrapper("/workflow/new", options); diff --git a/pyworkflow/pyworkflow/node.py b/pyworkflow/pyworkflow/node.py index e30d5eb..e447899 100644 --- a/pyworkflow/pyworkflow/node.py +++ b/pyworkflow/pyworkflow/node.py @@ -23,21 +23,100 @@ def __init__(self, node_info): if node_info.get("options"): self.option_values.update(node_info["options"]) + self.option_replace = dict() + if node_info.get("option_replace"): + self.option_replace.update(node_info["option_replace"]) + def execute(self, predecessor_data, flow_vars): - pass + raise NotImplementedError() + + def get_execution_options(self, flow_nodes): + """Replace Node options with flow variables. + + If the user has specified any flow variables to replace Node options, + perform the replacement and return a dict with all options to use for + execution. If no flow variables are included, this method will return + a copy of all Node options unchanged. + + Args: + flow_nodes: dict of FlowNodes used to replace options + + Returns: + dict containing options to use for execution + """ + execution_options = dict() + + # TODO: Can we iterate through flow_vars instead? + # If none are included, we can just return `self.options`. + for key, option in self.options.items(): + + if key in flow_nodes: + option.set_value(flow_nodes[key].get_replacement_value()) + + execution_options[key] = option + + return execution_options def validate(self): - return True + """Validate Node configuration + + Checks all Node options and validates all Parameter classes using + their validation method. + + Raises: + ParameterValidationError: invalid Parameter value + """ + for option in self.options.values(): + option.validate() + + def validate_input_data(self, num_input_data): + """Validate Node input data. + + Checks that input data, if any, matches with required number of input + ports. + + Args: + num_input_data: Number of input data passed in + + Raises: + NodeException on mis-matched input ports/data + """ + if num_input_data != self.num_in: + raise NodeException( + 'execute', + f'{self.node_key} requires {self.num_in} inputs. {num_input_data} were provided' + ) + + def to_json(self): + return { + "name": self.name, + "node_id": self.node_id, + "node_type": self.node_type, + "node_key": self.node_key, + "data": self.data, + "is_global": self.is_global, + "option_values": self.option_values, + "option_replace": self.option_replace, + } def __str__(self): return "Test" class FlowNode(Node): - """FlowNode object + """FlowNodes object. + + FlowNodes do not execute. They specify a variable name and value to pass + to other Nodes as a way to dynamically change other parameter values. """ display_name = "Flow Control" + def execute(self, predecessor_data, flow_vars): + return + + def get_replacement_value(self): + return self.options['default_value'].get_value() + class StringNode(FlowNode): """StringNode object @@ -50,10 +129,15 @@ class StringNode(FlowNode): color = 'purple' OPTIONS = { - "default_value": StringParameter("Default Value", - docstring="Value this node will pass as a flow variable"), - "var_name": StringParameter("Variable Name", default="my_var", - docstring="Name of the variable to use in another Node") + "default_value": StringParameter( + "Default Value", + docstring="Value this node will pass as a flow variable" + ), + "var_name": StringParameter( + "Variable Name", + default="my_var", + docstring="Name of the variable to use in another Node" + ) } @@ -68,10 +152,7 @@ class IONode(Node): display_name = "I/O" def execute(self, predecessor_data, flow_vars): - pass - - def validate(self): - return True + raise NotImplementedError() class ReadCsvNode(IONode): @@ -88,30 +169,35 @@ class ReadCsvNode(IONode): num_out = 1 OPTIONS = { - "file": FileParameter("File", docstring="CSV File"), - "sep": StringParameter("Delimiter", default=",", docstring="Column delimiter"), + "file": FileParameter( + "File", + docstring="CSV File" + ), + "sep": StringParameter( + "Delimiter", + default=",", + docstring="Column delimiter" + ), # user-specified headers are probably integers, but haven't figured out # arguments with multiple possible types - "header": StringParameter("Header Row", default="infer", - docstring="Row number containing column names (0-indexed)"), + "header": StringParameter( + "Header Row", + default="infer", + docstring="Row number containing column names (0-indexed)" + ), } def execute(self, predecessor_data, flow_vars): try: - # Read CSV needs exactly 0 input DataFrame - NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) - NodeUtils.replace_flow_vars(self.options, flow_vars) - fname = self.options["file"].get_value() - sep = self.options["sep"].get_value() - hdr = self.options["header"].get_value() - df = pd.read_csv(fname, sep=sep, header=hdr) + df = pd.read_csv( + flow_vars["file"].get_value(), + sep=flow_vars["sep"].get_value(), + header=flow_vars["header"].get_value() + ) return df.to_json() except Exception as e: raise NodeException('read csv', str(e)) - def __str__(self): - return "ReadCsvNode" - class WriteCsvNode(IONode): """WriteCsvNode @@ -128,24 +214,33 @@ class WriteCsvNode(IONode): download_result = True OPTIONS = { - "file": StringParameter("Filename", docstring="CSV file to write"), - "sep": StringParameter("Delimiter", default=",", docstring="Column delimiter"), - "index": BooleanParameter("Write Index", default=True, docstring="Write index as column?"), + "file": StringParameter( + "Filename", + docstring="CSV file to write" + ), + "sep": StringParameter( + "Delimiter", + default=",", + docstring="Column delimiter" + ), + "index": BooleanParameter( + "Write Index", + default=True, + docstring="Write index as column?" + ), } def execute(self, predecessor_data, flow_vars): try: - # Write CSV needs exactly 1 input DataFrame - NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) - # Convert JSON data to DataFrame df = pd.DataFrame.from_dict(predecessor_data[0]) # Write to CSV and save - fname = self.options["file"].get_value() - sep = self.options["sep"].get_value() - index = self.options["index"].get_value() - df.to_csv(fname, sep=sep, index=index) + df.to_csv( + flow_vars["file"].get_value(), + sep=flow_vars["sep"].get_value(), + index=flow_vars["index"].get_value() + ) return df.to_json() except Exception as e: raise NodeException('write csv', str(e)) @@ -163,10 +258,7 @@ class ManipulationNode(Node): color = 'goldenrod' def execute(self, predecessor_data, flow_vars): - pass - - def validate(self): - return True + raise NotImplementedError() class PivotNode(ManipulationNode): @@ -174,70 +266,52 @@ class PivotNode(ManipulationNode): num_in = 1 num_out = 3 - DEFAULT_OPTIONS = { - 'index': None, - 'values': None, - 'columns': None, - 'aggfunc': 'mean', - 'fill_value': None, - 'margins': False, - 'dropna': True, - 'margins_name': 'All', - 'observed': False - } - - OPTION_TYPES = { - 'index': { - "type": "column, grouper, array or list", - "name": "Index", - "desc": "Column to aggregate" - }, - 'values': { - "type": "column, grouper, array or list", - "name": "Values", - "desc": "Column name to use to populate new frame's values" - }, - 'columns': { - "type": "column, grouper, array or list", - "name": "Column Name Row", - "desc": "Column(s) to use for populating new frame values.'" - }, - 'aggfunc': { - "type": "function, list of functions, dict, default numpy.mean", - "name": "Aggregation function", - "desc": "Function used for aggregation" - }, - 'fill_value': { - "type": "scalar", - "name": "Fill value", - "desc": "Value to replace missing values with" - }, - 'margins': { - "type": "boolean", - "name": "Margins name", - "desc": "Add all rows/columns" - }, - - 'dropna': { - "type": "boolean", - "name": "Drop NaN columns", - "desc": "Ignore columns with all NaN entries" - }, - 'margins_name': { - "type": "string", - "name": "Margins name", - "desc": "Name of the row/column that will contain the totals when margins is True" - }, - 'observed': { - "type": "string", - "name": "Column Name Row", - "desc": "Row number with column names (0-indexed) or 'infer'" - } + OPTIONS = { + 'index': StringParameter( + 'Index', + docstring='Column to aggregate (column, grouper, array or list)' + ), + 'values': StringParameter( + 'Values', + docstring='Column name to use to populate new frame\'s values (column, grouper, array or list)' + ), + 'columns': StringParameter( + 'Column Name Row', + docstring='Column(s) to use for populating new frame values. (column, grouper, array or list)' + ), + 'aggfunc': StringParameter( + 'Aggregation function', + default='mean', + docstring='Function used for aggregation (function, list of functions, dict, default numpy.mean)' + ), + 'fill_value': StringParameter( + 'Fill value', + docstring='Value to replace missing values with (scalar)' + ), + 'margins': BooleanParameter( + 'Margins name', + default=False, + docstring='Add all rows/columns' + ), + 'dropna': BooleanParameter( + 'Drop NaN columns', + default=True, + docstring='Ignore columns with all NaN entries' + ), + 'margins_name': StringParameter( + 'Margins name', + default='All', + docstring='Name of the row/column that will contain the totals when margins is True' + ), + 'observed': BooleanParameter( + 'Column Name Row', + default=False, + docstring='Row number with column names (0-indexed) or "infer"' + ) } def execute(self, predecessor_data, flow_vars): try: - NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) input_df = pd.DataFrame.from_dict(predecessor_data[0]) output_df = pd.DataFrame.pivot_table(input_df, **self.options) return output_df.to_json() @@ -255,14 +329,14 @@ class JoinNode(ManipulationNode): } def execute(self, predecessor_data, flow_vars): - # Join cannot accept more than 2 input DataFrames - # TODO: Add more error-checking if 1, or no, DataFrames passed through try: - NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) first_df = pd.DataFrame.from_dict(predecessor_data[0]) second_df = pd.DataFrame.from_dict(predecessor_data[1]) - combined_df = pd.merge(first_df, second_df, - on=self.options["on"].get_value()) + combined_df = pd.merge( + first_df, + second_df, + on=flow_vars["on"].get_value() + ) return combined_df.to_json() except Exception as e: raise NodeException('join', str(e)) @@ -273,42 +347,27 @@ class FilterNode(ManipulationNode): num_in = 1 num_out = 1 - DEFAULT_OPTIONS = { - 'items': None, - 'like': None, - 'regex': None, - 'axis': None - } - - OPTION_TYPES = { - 'items': { - "type": "list", - "name": "Items", - "desc": "Keep labels from axis which are in items" - }, - 'like': { - "type": "string", - "name": "Like", - "desc": "Keep labels from axis for which like in label == True." - }, - 'regex': { - "type": "string", - "name": "Regex", - "desc": "Keep labels from axis for which re.search(regex, label) == True." - }, - 'axis': { - "type": "int or string", - "name": "Axis", - "desc": "The axis to filter on." - } + OPTIONS = { + 'items': StringParameter( + 'Items', + docstring='Keep labels from axis which are in items' + ), + 'like': StringParameter( + 'Like', + docstring='Keep labels from axis for which like in label == True.' + ), + 'regex': StringParameter( + 'Regex', + docstring='Keep labels from axis for which re.search(regex, label) == True.' + ), + 'axis': StringParameter( + 'Axis', + docstring='The axis to filter on.' + ) } - def __init__(self, node_info, options=dict()): - super().__init__(node_info, {**self.DEFAULT_OPTIONS, **options}) - def execute(self, predecessor_data, flow_vars): try: - NodeUtils.validate_predecessor_data(len(predecessor_data), self.num_in, self.node_key) input_df = pd.DataFrame.from_dict(predecessor_data[0]) output_df = pd.DataFrame.filter(input_df, **self.options) return output_df.to_json() @@ -323,36 +382,3 @@ def __init__(self, action: str, reason: str): def __str__(self): return self.action + ': ' + self.reason - - -class NodeUtils: - - FIXED_INPUT_NODES = ['WriteCsvNode', 'FilterNode', 'JoinNode'] # nodes which can only have a fixed number of predecessors - MAX_INPUT_NODES = ['ReadCsvNode'] # nodes for which num_in represents a maximum number of predecessors - - @staticmethod - def validate_predecessor_data(predecessor_data_len, num_in, node_key): - validation_failed = False - exception_txt = "" - if node_key in NodeUtils.FIXED_INPUT_NODES and predecessor_data_len != num_in: - validation_failed = True - exception_txt = '%s needs %d inputs. %d were provided' - elif (node_key in NodeUtils.MAX_INPUT_NODES and predecessor_data_len > num_in): - validation_failed = True - exception_txt = '%s can take up to %d inputs. %d were provided' - - if validation_failed: - raise NodeException( - 'execute', - exception_txt % (node_key, num_in, predecessor_data_len) - ) - - @staticmethod - def replace_flow_vars(node_options, flow_vars): - # TODO: this will no longer work with the Node.options descriptor, - # which uses Node.option_values to populate the Parameter - # class values upon access - for var in flow_vars: - node_options[var['var_name']] = var['default_value'] - - return diff --git a/pyworkflow/pyworkflow/workflow.py b/pyworkflow/pyworkflow/workflow.py index 7930beb..6ffeb44 100644 --- a/pyworkflow/pyworkflow/workflow.py +++ b/pyworkflow/pyworkflow/workflow.py @@ -2,7 +2,7 @@ import networkx as nx import json -from .node import Node +from .node import Node, NodeException from .node_factory import node_factory @@ -55,25 +55,43 @@ def get_flow_var(self, node_id): Return: FlowNode object, if one exists. Otherwise, None. """ - if self._flow_vars.has_node(node_id) is not True: + if self.flow_vars.has_node(node_id) is not True: return None node_info = self.flow_vars.nodes[node_id] return node_factory(node_info) + def get_all_flow_var_options(self, node_id): + """Retrieve all FlowNode options for a specified Node. + + A Node can use all global FlowNodes, and any connected local FlowNodes + for variable substitution. + + Args: + node_id: The Node to GET + + Returns: + list of all FlowNode objects, converted to JSON + """ + # Add global FlowNodes + graph_data = Workflow.to_graph_json(self.flow_vars) + flow_variables = graph_data['nodes'] + + # Append local FlowNodes + for predecessor_id in self.get_node_predecessors(node_id): + node = self.get_node(predecessor_id) + + if node.node_type == 'FlowNode': + flow_variables.append(node.to_json()) + + return flow_variables + def update_or_add_node(self, node: Node): """ Update or add a Node object to the graph. Args: node - The Node object to update or add to the graph - - TODO: - * validate() always returns True; this should perform actual validation """ - # Do not add/update if invalid - if node.validate() is False: - raise WorkflowException('update_or_add_node', 'Node is invalid') - # Select the correct graph to modify graph = self.flow_vars if node.is_global else self.graph @@ -112,9 +130,6 @@ def add_edge(self, node_from: Node, node_to: Node): Returns: Tuple representing the new Edge (from, to) - - TODO: - * validate() always returns True; this should perform actual validation """ # Prevent duplicate edges between the same two nodes # TODO: This may be incorrect usage for a `node_to` that has multi-in @@ -124,8 +139,7 @@ def add_edge(self, node_from: Node, node_to: Node): if self._graph.has_edge(from_id, to_id): raise WorkflowException('add_node', 'Edge between nodes already exists.') - if node_from.validate() and node_to.validate(): - self._graph.add_edge(from_id, to_id) + self._graph.add_edge(from_id, to_id) return (from_id, to_id) @@ -192,37 +206,101 @@ def execute(self, node_id): if node_to_execute is None: raise WorkflowException('execute', 'The workflow does not contain node %s' % node_id) - # Read in any data from predecessor/flow nodes - # TODO: This should work for local FlowNodes, but global flow_vars still - # need a way to be assigned/replace Node options - preceding_data = list() - flow_vars = list() - for predecessor in self.get_node_predecessors(node_id): - try: - node_to_retrieve = self.get_node(predecessor) + # Load predecessor data and FlowNode values + preceding_data = self.load_input_data(node_to_execute.node_id) + flow_nodes = self.load_flow_nodes(node_to_execute.option_replace) - if node_to_retrieve is None: - raise WorkflowException('retrieve node data', 'The workflow does not contain node %s' % node_id) + try: + # Validate input data, and replace flow variables + node_to_execute.validate_input_data(len(preceding_data)) + execution_options = node_to_execute.get_execution_options(flow_nodes) + + # Pass in data to current Node to use in execution + output = node_to_execute.execute(preceding_data, execution_options) + + # Save new execution data to disk + node_to_execute.data = Workflow.store_node_data(self, node_id, output) + except NodeException as e: + raise e + + if node_to_execute.data is None: + raise WorkflowException('execute', 'There was a problem saving node output.') + + return node_to_execute + + def load_flow_nodes(self, option_replace): + """Construct dict of FlowNodes indexed by option name. + + During Node configuration, the user has the option to select FlowNodes + to replace a given parameter value. A FlowNode selection is structured + like the following JSON: + + "option_replace": { + "sep": { + "node_id": id, + "is_global": true + } + } - if node_to_retrieve.node_type == 'FlowNode': - flow_vars.append(node_to_retrieve.options) + This method retrieves the specified FlowNode where the replacement value + can then be retrieved. + + Args: + option_replace: Flow variables user selected for a given Node. + + Returns: + dict of FlowNode objects, indexed by the option name. + """ + flow_nodes = dict() + + for key, option in option_replace.items(): + try: + flow_node_id = option["node_id"] + + if option["is_global"]: + flow_node = self.get_flow_var(flow_node_id) else: - preceding_data.append(self.retrieve_node_data(node_to_retrieve)) + flow_node = self.get_node(flow_node_id) + + if flow_node is None or flow_node.node_type != 'FlowNode': + raise WorkflowException('load flow vars', 'The workflow does not contain FlowNode %s' % flow_node_id) + flow_nodes[key] = flow_node except WorkflowException: - # TODO: Should this append None, skip reading, or raise exception to view? - preceding_data.append(None) + # TODO: Should this add a blank value, skip reading, or raise exception to view? + continue - # Pass in data to current Node to use in execution - output = node_to_execute.execute(preceding_data, flow_vars) + return flow_nodes - # Save new execution data to disk - node_to_execute.data = Workflow.store_node_data(self, node_id, output) + def load_input_data(self, node_id): + """Construct list of predecessor DataFrames - if node_to_execute.data is None: - raise WorkflowException('execute', 'There was a problem saving node output.') + Retrieves the data file for all of a Node's predecessors. Ignores + exceptions for missing Nodes/data as this is checked prior to execution. - return node_to_execute + Args: + node_id: The Node with predecessors + + Returns: + list of dict-like DataFrames, used for Node execution + """ + input_data = list() + + for predecessor_id in self.get_node_predecessors(node_id): + try: + node_to_retrieve = self.get_node(predecessor_id) + + if node_to_retrieve is None: + raise WorkflowException('retrieve node data', 'The workflow does not contain node %s' % predecessor_id) + + if node_to_retrieve.node_type != 'FlowNode': + input_data.append(self.retrieve_node_data(node_to_retrieve)) + + except WorkflowException: + # TODO: Should this append None, skip reading, or raise exception to view? + continue + + return input_data def execution_order(self): try: diff --git a/vp/node/views.py b/vp/node/views.py index 4a27150..740c85a 100644 --- a/vp/node/views.py +++ b/vp/node/views.py @@ -2,7 +2,7 @@ from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt -from pyworkflow import Workflow, WorkflowException, Node, NodeException, node_factory +from pyworkflow import Workflow, WorkflowException, Node, NodeException, node_factory, ParameterValidationError from rest_framework.decorators import api_view from drf_yasg.utils import swagger_auto_schema @@ -167,10 +167,12 @@ def handle_node(request, node_id): }, status=404) # Process request - # Node class not JSON serializable; pass __dict__ to response for display try: if request.method == 'GET': - response = JsonResponse(retrieved_node.__dict__, safe=False) + response = JsonResponse({ + "retrieved_node": retrieved_node.to_json(), + "flow_variables": request.pyworkflow.get_all_flow_var_options(node_id), + }, safe=False) elif request.method == 'POST': updated_node = create_node(request) @@ -190,8 +192,11 @@ def handle_node(request, node_id): 'updated_node': str(updated_node.is_global), }, status=500) + # Validation raises exception if failed + updated_node.validate() + request.pyworkflow.update_or_add_node(updated_node) - response = JsonResponse(updated_node.__dict__, safe=False) + response = JsonResponse(updated_node.to_json(), safe=False) elif request.method == 'DELETE': request.pyworkflow.remove_node(retrieved_node) response = JsonResponse({ @@ -203,6 +208,8 @@ def handle_node(request, node_id): }, status=405) except (NodeException, WorkflowException) as e: return JsonResponse({e.action: e.reason}, status=500) + except ParameterValidationError as e: + return JsonResponse({'message': str(e)}, status=500) return response diff --git a/vp/workflow/views.py b/vp/workflow/views.py index a76c28c..f468697 100644 --- a/vp/workflow/views.py +++ b/vp/workflow/views.py @@ -25,14 +25,14 @@ def new_workflow(request): """ try: workflow_id = json.loads(request.body) - except json.JSONDecodeError as e: - return JsonResponse({'No React model ID provided': str(e)}, status=500) - # Create new Workflow - request.pyworkflow = Workflow(name=workflow_id, root_dir=settings.MEDIA_ROOT) - request.session.update(request.pyworkflow.to_session_dict()) + # Create new Workflow + request.pyworkflow = Workflow(name=workflow_id['id'], root_dir=settings.MEDIA_ROOT) + request.session.update(request.pyworkflow.to_session_dict()) - return JsonResponse(Workflow.to_graph_json(request.pyworkflow.graph)) + return JsonResponse(Workflow.to_graph_json(request.pyworkflow.graph)) + except (json.JSONDecodeError, KeyError) as e: + return JsonResponse({'No React model ID provided': str(e)}, status=500) @swagger_auto_schema(method='post',