diff --git a/src/ert/ensemble_evaluator/evaluator.py b/src/ert/ensemble_evaluator/evaluator.py index 873cebe0f2f..b9ea3bfaa78 100644 --- a/src/ert/ensemble_evaluator/evaluator.py +++ b/src/ert/ensemble_evaluator/evaluator.py @@ -84,7 +84,6 @@ def __init__(self, ensemble: Ensemble, config: EvaluatorServerConfig): self._dispatchers_empty.set() async def _do_heartbeat_clients(self) -> None: - await self._server_started while True: if self._clients_connected: await self._events_to_send.put(HeartbeatEvent.event) @@ -93,7 +92,6 @@ async def _do_heartbeat_clients(self) -> None: await asyncio.sleep(0.1) async def _publisher(self) -> None: - await self._server_started while True: event = await self._events_to_send.get() for identity in self._clients_connected: @@ -270,7 +268,6 @@ async def handle_dispatch(self, dealer: bytes, frame: bytes) -> None: await self._events.put(event) async def listen_for_messages(self) -> None: - await self._server_started while True: try: dealer, _, frame = await self._router_socket.recv_multipart() @@ -368,8 +365,9 @@ async def _signal_cancel(self) -> None: async def _start_running(self) -> None: if not self._config: raise ValueError("no config for evaluator") - self._ee_tasks = [ - asyncio.create_task(self._server(), name="server_task"), + self._ee_tasks = [asyncio.create_task(self._server(), name="server_task")] + await self._server_started + self._ee_tasks += [ asyncio.create_task(self._do_heartbeat_clients(), name="heartbeat_task"), asyncio.create_task( self._batch_events_into_buffer(), name="dispatcher_task" @@ -377,17 +375,13 @@ async def _start_running(self) -> None: asyncio.create_task(self._process_event_buffer(), name="processing_task"), asyncio.create_task(self._publisher(), name="publisher_task"), asyncio.create_task(self.listen_for_messages(), name="listener_task"), - ] - - await self._server_started - self._ee_tasks.append( asyncio.create_task( self._ensemble.evaluate( self._config, self._events, self._manifest_queue ), name="ensemble_task", - ) - ) + ), + ] CLOSE_SERVER_TIMEOUT = 60 diff --git a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py index d22bbdf12ad..17edb28fd69 100644 --- a/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py +++ b/tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py @@ -5,6 +5,7 @@ from unittest.mock import patch import pytest +import zmq.asyncio from hypothesis import given from hypothesis import strategies as st from pydantic import ValidationError @@ -97,6 +98,19 @@ async def test_evaluator_handles_dispatchers_connected( assert evaluator._dispatchers_empty.is_set() +async def test_evaluator_raises_on_start_with_address_in_use(make_ee_config): + ee_config = make_ee_config(use_ipc_protocol=False) + ctx = zmq.asyncio.Context() + socket = ctx.socket(zmq.ROUTER) + try: + socket.bind(f"tcp://*:{ee_config.router_port}") + evaluator = EnsembleEvaluator(TestEnsemble(0, 2, 2, id_="0"), ee_config) + with pytest.raises(zmq.error.ZMQError, match="Address already in use"): + await evaluator.run_and_get_successful_realizations() + finally: + ctx.destroy() + + async def test_no_config_raises_valueerror_when_running(): evaluator = EnsembleEvaluator(TestEnsemble(0, 2, 2, id_="0"), None) with pytest.raises(ValueError, match="no config for evaluator"):