-
Notifications
You must be signed in to change notification settings - Fork 110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace websockets with zmq #9173
Conversation
feac786
to
5acae39
Compare
e553dbc
to
016b43d
Compare
ca1e52c
to
0884d51
Compare
e3d6fa0
to
8b9a9ab
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #9173 +/- ##
==========================================
- Coverage 91.86% 91.71% -0.15%
==========================================
Files 433 430 -3
Lines 26780 26684 -96
==========================================
- Hits 24602 24474 -128
- Misses 2178 2210 +32
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
All tests passing 😻 |
|
||
def __enter__(self) -> Self: | ||
self.loop.run_until_complete(self.__aenter__()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can remove all of the synced wrappers of the async methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe. I think the synced version if now only used in tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have made some exploration and the issue that the reporter tests are still fully synced and thus we need to preserve both (async and sync) versions alive. Actually it's only the zmq socket that needs to be synced. Yes, this can go away!
async def __aexit__( | ||
self, exc_type: Any, exc_value: Any, exc_traceback: Any |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this in the method signature?
I see that we decorate the equivalent method in EnsembleEvaluator with contextlib.asynccontextmanager
. Maybe we should do the same here if it is not used.
async def _term_receiver_task(self) -> None: | ||
if self._receiver_task and not self._receiver_task.done(): | ||
self._receiver_task.cancel() | ||
await asyncio.gather(self._receiver_task, return_exceptions=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have to gather this cancelled task, or can we await it (and suppress asyncio.cancellationerror if it is raised)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we capture asyncio.CancelledError
we can just do await. :+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the CancelledError is not catched there, so I would keep it.
return self | ||
|
||
def term(self) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should rearrange the order of the methods, it is a bit difficult to read
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can do 👍
while True: | ||
try: | ||
if self._done.is_set() and start_time is None: | ||
start_time = time.time() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is start_time? Is that the start of closing the event_publisher?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's the start_time
of closing event_publisher
. Maybe I can rename it start_closing_time
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that sounds good
@@ -82,14 +65,20 @@ def __init__(self, ensemble: Ensemble, config: EvaluatorServerConfig): | |||
self._max_batch_size: int = 500 | |||
self._batching_interval: float = 2.0 | |||
self._complete_batch: asyncio.Event = asyncio.Event() | |||
self._server_started: asyncio.Event = asyncio.Event() | |||
self._clients_connected: set[bytes] = set() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think self._connected_clients
would be a better name. clients_connected
sounds more like a boolean
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, what is client? Is it the Client we use for sending/receiving, or is it referring to the monitor for UI?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can rename it to monitors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm. I was trying to preserve the naming conventions we had before (which was clients), but maybe having it explicit as Monitor
would make more sense. Then I would change handle_client
-> handle_monitor
as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we rename it now that we are already giving it an overhaul, or is that a separate PR? If not, let's change it to connected_clients
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would do it in another PR to keep the semantics as it was before.
self._router_socket.close() | ||
zmq_context.destroy() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self._router_socket.close() | |
zmq_context.destroy() | |
zmq_context.destroy() |
The destroy method should close all associated sockets automatically
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll leave it for the time being as to be explicit.
if sys.platform != "linux": | ||
kwargs["use_ipc_protocol"] = True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about the tests running on the GH Actions ubuntu runner?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is meant only for MacOS GH Action runners.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will we not use ipc on the ubuntu runners? Maybe that can speed up some tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but we should test tcp sockets too. Hm, let's see.
CodSpeed Performance ReportMerging #9173 will not alter performanceComparing Summary
|
@xjules Seems codspeed is very impressed by your changes 🤣 |
b965bf4
to
dec2341
Compare
async def process_message(self, msg: str) -> None: | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we throw on a @abstractmethod
?
It is defined in monitor.py which inherits from Client
def signal(self, value): | ||
self.value = value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def signal(self, value): | |
self.value = value | |
def signal(self, should_stop: bool): | |
self.should_stop = should_stop |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the signal(value) has more meaning than just "stop". Will put a comment there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a comment:
"""Mock ZMQ server for testing
signal = 0: normal operation
signal = 1: don't send ACK and don't receive messages
signal = 2: don't send ACK, but receive messages
"""
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very good!
while True: | ||
try: | ||
if self._done.is_set() and start_time is None: | ||
start_time = time.time() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that sounds good
if sys.platform != "linux": | ||
kwargs["use_ipc_protocol"] = True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will we not use ipc on the ubuntu runners? Maybe that can speed up some tests
1318cf5
to
0f1bb30
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very good, let's start the new year without websockets 🥇
- dealers always wait for acknowledgment from the evaluator - removing websockets and wait_for_evaluator - Settup encryption with curve - each dealer (client, dispatcher) will get a unique name - Monitor is an advanced version Client - _server_started.wait() is to signal that zmq router socket is bound - Use TCP protocol only when using LSF, SLURM or TORQUE queues -- Use ipc_protocol when using LOCAL driver - Remove certificate - Remove synced _send from Client - Remove cert generator - Remove ClientConnectionClosedOK - Add test for new connection while closing down evaluator - Add test for handle dispatcher and dispatcher messages in evaluator - Add tests for ipc and tcp ee config - Add test for clear connect and disconnect of Monitor - Set a a correct protocol for everestserver
Issue
POC: testing feasibility of zmq.
Refactor tests
Approach
Implementing router-dealer pattern with custom acknowledgments with zmq
Server / router (ensemble evaluator) gets messages and keeps track of connected clients wherein in additional it sends
ack
message to dealaers to confirm that the message has been received .Dealer(s) = clients and dispatchers are sending messages to the router via:
dealers always wait for acknowledgment from the evaluator
removing websockets, no more wait_for_evaluator
Settup encryption with curve
each dealer (client, dispatcher) will get a unique name
Make sure to check cancellation error when sending event from client
Monitor is an advanced version Client
Use TCP protocol only when using LSF, SLURM or TORQUE queues
PR title captures the intent of the changes, and is fitting for release notes.
Added appropriate release note label
Commit history is consistent and clean, in line with the contribution guidelines.
Make sure unit tests pass locally after every commit (
git rebase -i main --exec 'pytest tests/ert/unit_tests -n logical -m "not integration_test"'
)When applicable