Skip to content

Commit

Permalink
Make sure to wait to ensemble start before initiating monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Dec 9, 2024
1 parent 6d77f86 commit c8537dc
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ dependencies = [
"python-dateutil",
"python-multipart", # extra dependency for fastapi
"pyyaml",
"pyzmq",
"qtpy",
"requests",
"resfo",
Expand All @@ -70,7 +71,6 @@ dependencies = [
"uvicorn >= 0.17.0",
"xarray",
"xtgeo >= 3.3.0",
"zmq",
]

[project.scripts]
Expand Down
9 changes: 6 additions & 3 deletions src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async def _publisher(self) -> None:
event = await self._events_to_send.get()
for identity in self._clients_connected:
await self._router_socket.send_multipart(
[identity, b"", event_to_json(event).encode()]
[identity, b"", event_to_json(event).encode("utf-8")]
)
self._events_to_send.task_done()

Expand Down Expand Up @@ -213,7 +213,7 @@ async def handle_client(self, dealer: bytes, frame: bytes) -> None:
ensemble=self.ensemble.id_,
)
await self._router_socket.send_multipart(
[dealer, b"", event_to_json(event).encode()]
[dealer, b"", event_to_json(event).encode("utf-8")]
)
elif raw_msg == "DISCONNECT":
self._clients_connected.discard(dealer)
Expand Down Expand Up @@ -296,7 +296,10 @@ async def _server(self) -> None:
# Attempt to bind the ROUTER socket
self._router_socket.bind(f"tcp://*:{self._config.router_port}")
self._server_started.set()
print("ROUTER listens on", f"tcp://*:{self._config.router_port}")
print(
"ROUTER listens on",
f"tcp://*:{self._config.router_port} {self._config.server_public_key=} {self._config.server_secret_key=}",
)
except zmq.error.ZMQError as e:
logger.error(f"ZMQ error encountered {e} during evaluator initialization")
print(f"ZMQ error encountered {e} during evaluator initialization")
Expand Down
1 change: 1 addition & 0 deletions src/ert/run_models/base_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ async def run_ensemble_evaluator_async(
evaluator_task = asyncio.create_task(
evaluator.run_and_get_successful_realizations()
)
await evaluator._server_started.wait()
if not (await self.run_monitor(ee_config, ensemble.iteration)):
return []

Expand Down

0 comments on commit c8537dc

Please sign in to comment.