Skip to content

Commit

Permalink
zmq: await server started first then create all evaluator tasks
Browse files Browse the repository at this point in the history
Additionally, _server_started can be removed from the tasks. The commits also
adds a test for zmq start up failure when port already in use
  • Loading branch information
xjules committed Jan 28, 2025
1 parent 8061987 commit 21bfc9a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
16 changes: 5 additions & 11 deletions src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ def __init__(self, ensemble: Ensemble, config: EvaluatorServerConfig):
self._dispatchers_empty.set()

async def _do_heartbeat_clients(self) -> None:
await self._server_started
while True:
if self._clients_connected:
await self._events_to_send.put(HeartbeatEvent.event)
Expand All @@ -93,7 +92,6 @@ async def _do_heartbeat_clients(self) -> None:
await asyncio.sleep(0.1)

async def _publisher(self) -> None:
await self._server_started
while True:
event = await self._events_to_send.get()
for identity in self._clients_connected:
Expand Down Expand Up @@ -270,7 +268,6 @@ async def handle_dispatch(self, dealer: bytes, frame: bytes) -> None:
await self._events.put(event)

async def listen_for_messages(self) -> None:
await self._server_started
while True:
try:
dealer, _, frame = await self._router_socket.recv_multipart()
Expand Down Expand Up @@ -368,26 +365,23 @@ async def _signal_cancel(self) -> None:
async def _start_running(self) -> None:
if not self._config:
raise ValueError("no config for evaluator")
self._ee_tasks = [
asyncio.create_task(self._server(), name="server_task"),
self._ee_tasks = [asyncio.create_task(self._server(), name="server_task")]
await self._server_started
self._ee_tasks += [
asyncio.create_task(self._do_heartbeat_clients(), name="heartbeat_task"),
asyncio.create_task(
self._batch_events_into_buffer(), name="dispatcher_task"
),
asyncio.create_task(self._process_event_buffer(), name="processing_task"),
asyncio.create_task(self._publisher(), name="publisher_task"),
asyncio.create_task(self.listen_for_messages(), name="listener_task"),
]

await self._server_started
self._ee_tasks.append(
asyncio.create_task(
self._ensemble.evaluate(
self._config, self._events, self._manifest_queue
),
name="ensemble_task",
)
)
),
]

CLOSE_SERVER_TIMEOUT = 60

Expand Down
14 changes: 14 additions & 0 deletions tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from unittest.mock import patch

import pytest
import zmq.asyncio
from hypothesis import given
from hypothesis import strategies as st
from pydantic import ValidationError
Expand Down Expand Up @@ -97,6 +98,19 @@ async def test_evaluator_handles_dispatchers_connected(
assert evaluator._dispatchers_empty.is_set()


async def test_evaluator_raises_on_start_with_address_in_use(make_ee_config):
ee_config = make_ee_config(use_ipc_protocol=False)
ctx = zmq.asyncio.Context()
socket = ctx.socket(zmq.ROUTER)
try:
socket.bind(f"tcp://*:{ee_config.router_port}")
evaluator = EnsembleEvaluator(TestEnsemble(0, 2, 2, id_="0"), ee_config)
with pytest.raises(zmq.error.ZMQError, match="Address already in use"):
await evaluator.run_and_get_successful_realizations()
finally:
ctx.destroy()


async def test_no_config_raises_valueerror_when_running():
evaluator = EnsembleEvaluator(TestEnsemble(0, 2, 2, id_="0"), None)
with pytest.raises(ValueError, match="no config for evaluator"):
Expand Down

0 comments on commit 21bfc9a

Please sign in to comment.