Skip to content

Commit

Permalink
Add test for succesful connection, but lost connection afterwards
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Dec 10, 2024
1 parent 70412cd commit 43a018a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 37 deletions.
12 changes: 8 additions & 4 deletions src/_ert/forward_model_runner/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,14 @@ async def __aenter__(self) -> Self:
async def __aexit__(
self, exc_type: Any, exc_value: Any, exc_traceback: Any
) -> None:
await self._send("DISCONNECT")
self.socket.disconnect(self.url)
await self._term_receiver_task()
self.term()
try:
await self._send("DISCONNECT")
except ClientConnectionError:
logger.error("No ack for dealer disconnection. Connection is down!")
finally:
self.socket.disconnect(self.url)
await self._term_receiver_task()
self.term()

async def _term_receiver_task(self) -> None:
if self._receiver_task and not self._receiver_task.done():
Expand Down
33 changes: 22 additions & 11 deletions tests/ert/unit_tests/ensemble_evaluator/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,31 @@ async def mock_event_handler(router_socket):
assert connected is False


# TODO: refactor
# async def test_unexpected_close(unused_tcp_port):
# ee_con_info = EvaluatorConnectionInfo(f"tcp://127.0.0.1:{unused_tcp_port}")
async def test_unexpected_close_after_connection_successful(
monkeypatch, unused_tcp_port
):
ee_con_info = EvaluatorConnectionInfo(f"tcp://127.0.0.1:{unused_tcp_port}")

# async def mock_event_handler(router_socket):
# router_socket.close()
monkeypatch.setattr(Monitor, "DEFAULT_MAX_RETRIES", 0)
monkeypatch.setattr(Monitor, "DEFAULT_ACK_TIMEOUT", 1)

# websocket_server_task = asyncio.create_task(
# async_zmq_server(unused_tcp_port, mock_event_handler)
# )
# async with Monitor(ee_con_info) as monitor:
# await monitor.signal_done()
async def mock_event_handler(router_socket):
dealer, _, frame = await router_socket.recv_multipart()
await router_socket.send_multipart([dealer, b"", b"ACK"])
dealer = dealer.decode("utf-8")
assert dealer.startswith("client-")
frame = frame.decode("utf-8")
assert frame == "CONNECT"
router_socket.close()

# await websocket_server_task
websocket_server_task = asyncio.create_task(
async_zmq_server(unused_tcp_port, mock_event_handler)
)
async with Monitor(ee_con_info) as monitor:
with pytest.raises(ClientConnectionError):
await monitor.signal_done()

await websocket_server_task


async def test_that_monitor_track_can_exit_without_terminated_event_from_evaluator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,7 @@ def test_report_with_reconnected_reporter_but_finished_jobs(unused_tcp_port):
# "_ert.forward_model_runner.client.Client.send",
# lambda x, y: mock_send(y),
# ):
# reporter.report(
# Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10))
# )
# reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10)))
# # Make sure the publisher thread exits because it got
# # ClientConnectionClosedOK. If it hangs it could indicate that the
# # exception is not caught/handled correctly
Expand All @@ -294,6 +292,7 @@ def test_report_with_reconnected_reporter_but_finished_jobs(unused_tcp_port):
# # set _stop_timestamp was not set to None since the reporter finished on time
# assert reporter._timeout_timestamp is not None


# # The Running(fmstep1, 300, 10) is popped from the queue, but never sent.
# # The following Running is added to queue along with the sentinel
# assert reporter._event_queue.qsize() == 2
Expand Down
19 changes: 0 additions & 19 deletions tests/ert/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,25 +61,6 @@ def wait_until(func, interval=0.5, timeout=30):
)


async def async_mock_zmq_server(messages, port, server_started):
async def _handler(router_socket):
while True:
dealer, __, frame = await router_socket.recv_multipart()
await router_socket.send_multipart([dealer, b"", b"ACK"])
raw_msg = frame.decode("utf-8")
messages.append(raw_msg)
if raw_msg == "DISCONNECT":
return

zmq_context = zmq.asyncio.Context() # type: ignore
router_socket = zmq_context.socket(zmq.ROUTER)
router_socket.bind(f"tcp://*:{port}")
server_started.set()
await _handler(router_socket)
router_socket.close()
zmq_context.term()


@contextlib.contextmanager
def mock_zmq_thread(port, messages, signal_queue=None):
loop = None
Expand Down

0 comments on commit 43a018a

Please sign in to comment.