Unify execute API of JobQueue, refactor websocket connection #6558
d805142 to 41868ba
Codecov Report
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #6558      +/-   ##
==========================================
- Coverage   83.51%   83.51%   -0.01%
==========================================
  Files         346      346
  Lines       20764    20778      +14
  Branches      948      948
==========================================
+ Hits        17341    17352      +11
- Misses       3129     3132       +3
  Partials      294      294

☔ View full report in Codecov by Sentry.
0c76269 to ceb0b47
Tested manually with the GUI for poly_example and bigpoly, and also with Everest's egg model.
Up for discussion: Should this be squashed?
Great Job!
) -> None:
    assert self.ens_id is not None  # mypy
    events = deque(
Why do we use deque here instead of a normal queue?
Probably a local, minor optimization to use a double-ended queue.
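For context on the question above, here is a minimal sketch of how `collections.deque` is commonly used as a FIFO buffer. Appends and pops at either end are O(1), and unlike `queue.Queue` it has no locking or blocking behaviour, which is often all a buffer owned by a single task needs. The event strings are made up for illustration and are not taken from the PR; whether the PR uses `appendleft` this way is an assumption.

```python
from collections import deque

# Hypothetical event payloads; the real code builds its own event objects.
events = deque(["realization 0: WAITING", "realization 0: RUNNING"])

# Appending on the right and popping on the left gives FIFO order in O(1).
events.append("realization 0: SUCCESS")
first = events.popleft()

# An item that could not be handled can go back to the front cheaply.
events.appendleft(first)

drained = []
while events:
    drained.append(events.popleft())
```

A plain list would also work here, but `list.pop(0)` shifts every remaining element, so a deque is the more natural fit when items are consumed from the front.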
if self._ee_uri is None:
    # If no ensemble evaluator present, we will publish to the log
    while (
        change := await self._changes_to_publish.get()
I did not know about this (walrus operator :=). Thanks🎉
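A minimal, self-contained sketch of the pattern in the snippet above: the walrus operator binds the awaited item and tests it in the same `while` condition. The diff excerpt is cut off before the loop condition ends, so the `None` sentinel used here is an assumption, as are the function names.

```python
import asyncio
from typing import List, Optional


async def drain(queue: "asyncio.Queue[Optional[str]]") -> List[str]:
    """Collect items until a None sentinel arrives (sentinel is assumed)."""
    seen: List[str] = []
    # Bind the awaited item to `change` and test it in one expression.
    while (change := await queue.get()) is not None:
        seen.append(change)
    return seen


async def main() -> List[str]:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    for item in ("RUNNING", "SUCCESS", None):  # None marks the end
        queue.put_nowait(item)
    return await drain(queue)


result = asyncio.run(main())
```

Without the walrus operator the same loop needs a `while True:` with an explicit `get()` and `break`, which is longer but equivalent.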
    experiment_id: Optional[str] = None,
) -> None:
    for q_index, q_node in enumerate(self.job_list):
        cert_path = f"{q_node.run_path}/{CERT_FILE}"
        if cert is not None:
        if self._ee_cert is not None:
Was it not clearer when conninfo was passed directly into the function as a parameter?
It was more explicit, but that explicitness would, for example, make it possible to mix up the ee connection given to the execute function with the one written to the jobs file. I think it is cleaner to set the connection info in just one place, in the Queue object.
The only thing is that the Queue does not need to know about the connection setup between ee and job_runner; hence my comment about passing it as a function parameter instead 🤷
That is correct; I am removing the "feature" of allowing different connection info.
I would squash the fixup commits at least :)
I think it looks good! If there are troubles with mypy I'd squash it as it sort of addresses the same thing.
The websocket connection from the JobQueue is refactored to use a single asyncio task that fetches the changes to be published, publishes them, and keeps the connection open.

The queue execute function had two variants: one that included websocket communication (for "legacy" ert) and one for Everest (the "simulation context"). Now there is only one, and it is async.

In the simulation context (Everest), the changes that would be published to the websocket channel are now logged to the debug log. Previously that information was not propagated.

The connection to the ensemble evaluator is now configured once, so it is no longer possible for the job_runner to use a different connection than the one the JobQueue uses for publishing its realization state changes.
c327ea1 to 1ddc0c8
The simulation_context now has to call it in an async loop, and it is also doing slightly more work (logging changes) than before.
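As a rough illustration of what calling an async entry point from synchronous code involves, the sketch below drives a coroutine with `asyncio.run`. Both function names are hypothetical, and how simulation_context actually starts its loop is not shown in this conversation.

```python
import asyncio


async def execute() -> str:
    """Hypothetical async entry point standing in for the queue's execute."""
    await asyncio.sleep(0)  # yield to the loop, as real I/O would
    return "done"


def run_simulations() -> str:
    # A synchronous caller has to start an event loop to drive the coroutine.
    return asyncio.run(execute())


status = run_simulations()
```

Note that `asyncio.run` raises a `RuntimeError` if an event loop is already running in the same thread, so a caller that is itself async should `await` the coroutine directly instead.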