Reads from stdin and writes to stdout, batch execution of workflows #69
Conversation
Lookin good! I left a comment about finding a way not to duplicate all the execute logic (but no actual solution haha). I also imagine this is going to have hella conflicts with Matt's significant node refactoring in #70.
Agree with Samir, this is looking good so far! I would try updating this branch from master to incorporate changes from #57. One suggestion: switch the file directory from an option to a positional argument. One question: I see the simplicity of writing stdin to a temp file to pass in to execution, but is there a downside to passing the raw input directly?
@reelmatt thanks for the suggestion of passing the file directory as an argument; I added it to this PR. I don't see any downsides to passing the raw input directly. I mainly just thought it would be useful to have the file uploaded to the directory for future use, but I can look into it and push it in the next PR.
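For reference, a minimal sketch of the temp-file approach under discussion: buffering stdin to a file so the existing file-based Read CSV path can consume it unchanged (names like `stdin_files` are assumptions, not the PR's actual code):

```python
import sys
import tempfile

# Minimal sketch (not the PR's actual code): buffer stdin to a temp file so
# the existing file-based Read CSV path can consume it unchanged.
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp:
    tmp.write(sys.stdin.read())

stdin_files = [tmp.name]  # assumed name for the PR's buffered-file list
```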
One thing to change before merging is the comment below about saving the executed node back to the workflow. There are a few different ways we can go about it. Reading from stdin is still working, though just limited to one file, correct?
I also might be missing writing to stdout, but if I do `pyworkflow execute workflow_file > output-file`, the contents of `output-file` are just the print statements logged to the terminal (e.g. "Loading workflow file..."). The behavior I was thinking of was printing the data of a Write CSV node. Based on how you have the Read CSV node working, you could probably do a simple `len(stdin_files) > 0` check and, if true, do `print(df.to_json())` before/instead of writing the file. That would still include the other printed statements, but at least it gets the data into `output-file`, which I think is more ideal. Thoughts?
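A minimal sketch of that suggestion, with assumed names (`stdin_files`, `df`, and `write_path` stand in for this PR's actual variables):

```python
import pandas as pd

# Sketch only: names here are assumptions from the discussion above,
# not the actual pyworkflow Write CSV implementation.
def write_output(df: pd.DataFrame, stdin_files: list, write_path: str) -> None:
    if len(stdin_files) > 0:
        # stdin/batch mode: dump the dataframe as JSON on stdout so that
        # `> output-file` captures the data, not just the log messages
        print(df.to_json())
    else:
        # normal mode: write the CSV to disk as before
        df.to_csv(write_path, index=False)
```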
pyworkflow/pyworkflow/workflow.py
Outdated
```python
    # delete file at index 0
    del stdin_files[0]
else:
    workflow_instance.execute(node)
```
I ran into this bug the other day where the graph didn't update after execution. The `execute` call returns the executed node for the front-end to then call `workflow.update_or_add_node()` to actually store the `node.data` attribute. Without this, I got complaints that predecessor data was missing when it's actually written to disk.

Changing both if/else branches to include `executed_node = workflow_instance.<execute_method_here>...` and then calling `workflow_instance.update_or_add_node(executed_node)` outside the if/else but within the for loop solves the issue. Come to think of it, we should probably update the `execute` method to have this behavior and update the execute endpoint as well (to avoid double-saving).
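A sketch of that fix under assumed names (`execution_order()` and `execute_read_csv()` are stand-ins, not confirmed pyworkflow API):

```python
def execute_all(workflow_instance, stdin_files):
    # Sketch only: method names are assumptions based on the discussion.
    for node in workflow_instance.execution_order():
        if len(stdin_files) > 0:
            executed_node = workflow_instance.execute_read_csv(node, stdin_files[0])
            del stdin_files[0]  # consume one buffered stdin file per read node
        else:
            executed_node = workflow_instance.execute(node)
        # save node.data back so successors can find their predecessor data
        workflow_instance.update_or_add_node(executed_node)
```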
I see, I've modified the code to do exactly what you suggested and it seems to be working fine.
I've added a modification that includes the dataframe output in stdout. I didn't need to duplicate the exact functionality of ReadCsv; I was able to print the result of `output = node_to_execute.execute(preceding_data, execution_options)`. I'm still not happy with the repetition of execute code in workflow, so I will be refactoring this in the next PR.
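Roughly, as a sketch (only the `node_to_execute.execute(...)` call is quoted from the PR; `reading_from_stdin` is a stand-in flag):

```python
output = node_to_execute.execute(preceding_data, execution_options)
if reading_from_stdin:
    # emit the resulting dataframe on stdout so `> output-file` captures it
    print(output)
```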
Looking better now!
```diff
-3. Run it as: pyworkflow --file-directory (path-to-json-workflow-file) execute
+3. Run it as: pyworkflow execute workflow-file
+
+Also accepts reading input from std (i.e < file.csv) and writing to sdt out (i.e > output.csv)
```
nitpick: Should be `stdin` and `stdout`.
pyworkflow/pyworkflow/node.py
Outdated
```diff
@@ -109,6 +109,16 @@ def execute(self, predecessor_data, flow_vars):
         except Exception as e:
             raise NodeException('read csv', str(e))
 
+    def execute_for_read(self, predecessor_data, flow_vars, file_to_read):
```
Is it possible to alter the node configuration (probably through `node.option_values`) to point to the stdin-copied file rather than create a new method? I think this would resolve the need for duplicated execute calls in the workflow object, as you mentioned in the comments there.
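A sketch of that alternative; the option key name `'file'` and `stdin_tempfile_path` are assumptions, not the actual ReadCsvNode keys:

```python
# Sketch only: retarget the node's configured file at the stdin temp copy,
# then run the normal execute path instead of a separate execute_for_read().
node_to_execute.option_values['file'] = stdin_tempfile_path  # key name assumed
executed_node = workflow_instance.execute(node_to_execute)
workflow_instance.update_or_add_node(executed_node)
```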
```
pyworkflow execute < input-file
pyworkflow execute > output-file
```
TODO:
For reading from stdin I created a method in the workflow and an execute-specific one in the ReadCsvNode class. I would like to refactor this to reuse the already existing methods. While some basic validations are in place, there is still some work to be done.