Skip to content

Commit

Permalink
Add test for scheduler publishings its events to a websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Apr 5, 2024
1 parent b21fab1 commit 994b8c2
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ async def _monitor_and_handle_tasks(
if task in scheduling_tasks:
await self._cancel_job_tasks()
raise task_exception
while not self._events.empty():
await asyncio.sleep(0.3) # Workaround, not to be merged
if not self.is_active():
for task in self._job_tasks.values():
if task.cancelled():
Expand Down
41 changes: 41 additions & 0 deletions tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import List

import pytest
import websockets
from cloudevents.http import from_json

from ert.config import QueueConfig
Expand Down Expand Up @@ -476,6 +477,46 @@ async def mock_failure(message, *args, **kwargs):
raise RuntimeError(message)


async def _mock_ws(set_when_done: asyncio.Event, handler, port: int):
async with websockets.server.serve(handler, "127.0.0.1", port):
while not set_when_done.is_set():
await asyncio.sleep(0)


async def test_scheduler_publishes_to_websocket(
mock_driver, realization, unused_tcp_port
):
set_when_done = asyncio.Event()

events_received: List[str] = []

async def mock_ws_event_handler(websocket):
nonlocal events_received
async for message in websocket:
events_received.append(message)

websocket_server_task = asyncio.create_task(
_mock_ws(set_when_done, mock_ws_event_handler, unused_tcp_port)
)

driver = mock_driver()
sch = scheduler.Scheduler(
driver, [realization], ee_uri=f"ws://127.0.0.1:{unused_tcp_port}"
)
await sch.execute()

assert [
json.loads(event)["data"]["queue_event_type"] for event in events_received
] == ["SUBMITTED", "PENDING", "RUNNING", "SUCCESS"]

assert (
sch._events.empty()
), "Schedulers internal queue of events to be sent must be empty before it can finish"

set_when_done.set()
await websocket_server_task


@pytest.mark.timeout(5)
async def test_that_driver_poll_exceptions_are_propagated(mock_driver, realization):
driver = mock_driver()
Expand Down

0 comments on commit 994b8c2

Please sign in to comment.