From f00c19615f39fa86067734128d16c7bd4264d1e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Thu, 14 Dec 2023 09:01:49 +0100 Subject: [PATCH 1/2] Test ensemble_evaluator with new scheduler This highlights a behavioural change in the new LocalDriver, it will not send the same events as the legacy local driver, see test_async_queue_execution.py::test_happy_path The new scheduler will not catch bare exceptions for now, and thus the test for that situation is only applied for the legacy JobQueue. --- src/ert/scheduler/job.py | 8 ++-- .../test_async_queue_execution.py | 40 ++++++++++++++----- .../test_ensemble_builder.py | 3 +- .../test_ensemble_legacy.py | 32 ++++++--------- 4 files changed, 49 insertions(+), 34 deletions(-) diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py index 7aa17ea1b25..11a1f36224a 100644 --- a/src/ert/scheduler/job.py +++ b/src/ert/scheduler/job.py @@ -23,7 +23,7 @@ class State(str, Enum): WAITING = "WAITING" SUBMITTING = "SUBMITTING" - STARTING = "STARTING" + PENDING = "PENDING" RUNNING = "RUNNING" ABORTING = "ABORTING" COMPLETED = "COMPLETED" @@ -34,7 +34,7 @@ class State(str, Enum): STATE_TO_LEGACY = { State.WAITING: "WAITING", State.SUBMITTING: "SUBMITTED", - State.STARTING: "PENDING", + State.PENDING: "PENDING", State.RUNNING: "RUNNING", State.ABORTING: "DO_KILL", State.COMPLETED: "SUCCESS", @@ -73,7 +73,7 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None: self.real.iens, self.real.job_script, cwd=self.real.run_arg.runpath ) - await self._send(State.STARTING) + await self._send(State.PENDING) await self.started.wait() await self._send(State.RUNNING) @@ -126,7 +126,7 @@ async def _send(self, state: State) -> None: event = CloudEvent( { "type": _queue_state_event_type(status), - "source": f"/etc/ensemble/{self._scheduler._ens_id}/real/{self.iens}", + "source": f"/ert/ensemble/{self._scheduler._ens_id}/real/{self.iens}", "datacontenttype": "application/json", }, { diff --git a/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py b/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py index dc256832cb8..1098dc728c8 100644 --- a/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py +++ b/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py @@ -9,6 +9,8 @@ from ert.async_utils import get_event_loop from ert.ensemble_evaluator._wait_for_evaluator import wait_for_evaluator from ert.job_queue import Driver, JobQueue +from ert.scheduler import Scheduler, create_driver +from ert.shared.feature_toggling import FeatureToggling async def mock_ws(host, port, done): @@ -33,6 +35,7 @@ async def _handler(websocket, path): @pytest.mark.asyncio @pytest.mark.timeout(60) +@pytest.mark.scheduler async def test_happy_path( tmpdir, unused_tcp_port, @@ -41,6 +44,7 @@ async def test_happy_path( queue_config, caplog, monkeypatch, + try_queue_and_scheduler, ): asyncio.set_event_loop(event_loop) host = "localhost" @@ -51,9 +55,16 @@ async def test_happy_path( await wait_for_evaluator(base_url=url, timeout=5) ensemble = make_ensemble_builder(monkeypatch, tmpdir, 1, 1).build() - queue = JobQueue(queue_config, ensemble.reals, ee_uri=url, ens_id="ee_0") + + if FeatureToggling.is_enabled("scheduler"): + queue = Scheduler( + create_driver(queue_config), ensemble.reals, ee_uri=url, ens_id="ee_0" + ) + else: + queue = JobQueue(queue_config, ensemble.reals, ee_uri=url, ens_id="ee_0") await queue.execute() + done.set_result(None) await mock_ws_task @@ -62,12 +73,21 @@ async def test_happy_path( assert mock_ws_task.done() - event_0 = from_json(mock_ws_task.result()[0]) - assert event_0["source"] == "/ert/ensemble/ee_0/real/0" - assert event_0["type"] == "com.equinor.ert.realization.waiting" - assert event_0.data == {"queue_event_type": "WAITING"} - - end_event_index = len(mock_ws_task.result()) - 1 - end_event = from_json(mock_ws_task.result()[end_event_index]) - assert end_event["type"] == "com.equinor.ert.realization.success" - assert end_event.data == {"queue_event_type": "SUCCESS"} + if FeatureToggling.is_enabled("scheduler"): + first_expected_queue_event_type = "SUBMITTED" + else: + first_expected_queue_event_type = "WAITING" + + for received_event, expected_type, expected_queue_event_type in zip( + [mock_ws_task.result()[0], mock_ws_task.result()[-1]], + ["waiting", "success"], + [first_expected_queue_event_type, "SUCCESS"], + ): + assert from_json(received_event)["source"] == "/ert/ensemble/ee_0/real/0" + assert ( + from_json(received_event)["type"] + == f"com.equinor.ert.realization.{expected_type}" + ) + assert from_json(received_event).data == { + "queue_event_type": expected_queue_event_type + } diff --git a/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py b/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py index 4af6b69d6a0..ccd1515aab7 100644 --- a/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py +++ b/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py @@ -10,7 +10,8 @@ @pytest.mark.parametrize("active_real", [True, False]) -def test_build_ensemble(active_real): +@pytest.mark.scheduler +def test_build_ensemble(active_real, monkeypatch, try_queue_and_scheduler): ensemble = ( EnsembleBuilder() .set_legacy_dependencies(QueueConfig(queue_system=QueueSystem.LOCAL), False, 0) diff --git a/tests/unit_tests/ensemble_evaluator/test_ensemble_legacy.py b/tests/unit_tests/ensemble_evaluator/test_ensemble_legacy.py index ddc814d2cd5..f38728644a5 100644 --- a/tests/unit_tests/ensemble_evaluator/test_ensemble_legacy.py +++ b/tests/unit_tests/ensemble_evaluator/test_ensemble_legacy.py @@ -15,15 +15,10 @@ @pytest.mark.timeout(60) -def test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch): - _test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch) - monkeypatch.setattr(FeatureToggling._conf["scheduler"], "is_enabled", True) - shutil.rmtree(tmpdir) - tmpdir.mkdir() - _test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch) - - -def _test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch): +@pytest.mark.scheduler +def test_run_legacy_ensemble( + tmpdir, make_ensemble_builder, monkeypatch, try_queue_and_scheduler +): num_reals = 2 custom_port_range = range(1024, 65535) with tmpdir.as_cwd(): @@ -55,15 +50,10 @@ def _test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch): @pytest.mark.timeout(60) -def test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch): - _test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch) - monkeypatch.setattr(FeatureToggling._conf["scheduler"], "is_enabled", True) - shutil.rmtree(tmpdir) - tmpdir.mkdir() - _test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch) - - -def _test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch): +@pytest.mark.scheduler +def test_run_and_cancel_legacy_ensemble( + tmpdir, make_ensemble_builder, monkeypatch, try_queue_and_scheduler +): num_reals = 2 custom_port_range = range(1024, 65535) with tmpdir.as_cwd(): @@ -99,7 +89,11 @@ def _test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypa @pytest.mark.timeout(10) -def test_run_legacy_ensemble_exception(tmpdir, make_ensemble_builder, monkeypatch): +def test_run_legacy_ensemble_with_bare_exception( + tmpdir, make_ensemble_builder, monkeypatch +): + """This test function is not ported to Scheduler, as it will not + catch general exceptions.""" num_reals = 2 custom_port_range = range(1024, 65535) with tmpdir.as_cwd(): From b22ce4f6728d2d6fb58519accff1073cc541cf37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Mon, 11 Dec 2023 15:58:58 +0100 Subject: [PATCH 2/2] Support max_runtime in Scheduler --- .../ensemble_evaluator/_builder/_legacy.py | 25 ++++--- src/ert/scheduler/job.py | 26 ++++++- tests/unit_tests/scheduler/test_scheduler.py | 74 ++++++++++++++++++- 3 files changed, 113 insertions(+), 12 deletions(-) diff --git a/src/ert/ensemble_evaluator/_builder/_legacy.py b/src/ert/ensemble_evaluator/_builder/_legacy.py index 1859224be62..5a70bedacdc 100644 --- a/src/ert/ensemble_evaluator/_builder/_legacy.py +++ b/src/ert/ensemble_evaluator/_builder/_legacy.py @@ -87,6 +87,9 @@ def setup_timeout_callback( cloudevent_unary_send: Callable[[CloudEvent], Awaitable[None]], event_generator: Callable[[str, Optional[int]], CloudEvent], ) -> Tuple[Callable[[int], None], asyncio.Task[None]]: + """This function is reimplemented inside the Scheduler and should + be removed when Scheduler is the only queue code.""" + def on_timeout(iens: int) -> None: timeout_queue.put_nowait( event_generator(identifiers.EVTYPE_REALIZATION_TIMEOUT, iens) @@ -173,14 +176,16 @@ async def _evaluate_inner( # pylint: disable=too-many-branches is a function (or bound method) that only takes a CloudEvent as a positional argument. """ - # Set up the timeout-mechanism - timeout_queue = asyncio.Queue() # type: ignore - # Based on the experiment id the generator will - # give a function returning cloud event event_creator = self.generate_event_creator(experiment_id=experiment_id) - on_timeout, send_timeout_future = self.setup_timeout_callback( - timeout_queue, cloudevent_unary_send, event_creator - ) + timeout_queue: Optional[asyncio.Queue[Any]] = None + if not FeatureToggling.is_enabled("scheduler"): + # Set up the timeout-mechanism + timeout_queue = asyncio.Queue() + # Based on the experiment id the generator will + # give a function returning cloud event + on_timeout, send_timeout_future = self.setup_timeout_callback( + timeout_queue, cloudevent_unary_send, event_creator + ) if not self.id_: raise ValueError("Ensemble id not set") @@ -235,8 +240,10 @@ async def _evaluate_inner( # pylint: disable=too-many-branches ) result = identifiers.EVTYPE_ENSEMBLE_FAILED - await timeout_queue.put(None) # signal to exit timer - await send_timeout_future + if not isinstance(self._job_queue, Scheduler): + assert timeout_queue is not None + await timeout_queue.put(None) # signal to exit timer + await send_timeout_future # Dispatch final result from evaluator - FAILED, CANCEL or STOPPED await cloudevent_unary_send(event_creator(result, None)) diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py index 11a1f36224a..71831c533b6 100644 --- a/src/ert/scheduler/job.py +++ b/src/ert/scheduler/job.py @@ -2,13 +2,15 @@ import asyncio import logging +import uuid from enum import Enum -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from cloudevents.conversion import to_json from cloudevents.http import CloudEvent from ert.callbacks import forward_model_ok +from ert.ensemble_evaluator.identifiers import EVTYPE_REALIZATION_TIMEOUT from ert.job_queue.queue import _queue_state_event_type from ert.load_status import LoadStatus from ert.scheduler.driver import Driver @@ -67,6 +69,8 @@ def driver(self) -> Driver: async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None: await sem.acquire() + timeout_task: Optional[asyncio.Task[None]] = None + try: await self._send(State.SUBMITTING) await self.driver.submit( @@ -77,6 +81,8 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None: await self.started.wait() await self._send(State.RUNNING) + if self.real.max_runtime is not None and self.real.max_runtime > 0: + timeout_task = asyncio.create_task(self._max_runtime_task()) while not self.returncode.done(): await asyncio.sleep(0.01) returncode = await self.returncode @@ -95,10 +101,11 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None: except asyncio.CancelledError: await self._send(State.ABORTING) await self.driver.kill(self.iens) - await self.aborted.wait() await self._send(State.ABORTED) finally: + if timeout_task and not timeout_task.done(): + timeout_task.cancel() sem.release() async def __call__( @@ -121,6 +128,21 @@ async def __call__( ) logger.error(message) + async def _max_runtime_task(self) -> None: + assert self.real.max_runtime is not None + await asyncio.sleep(self.real.max_runtime) + timeout_event = CloudEvent( + { + "type": EVTYPE_REALIZATION_TIMEOUT, + "source": f"/ert/ensemble/{self._scheduler._ens_id}/real/{self.iens}", + "id": str(uuid.uuid1()), + } + ) + assert self._scheduler._events is not None + await self._scheduler._events.put(to_json(timeout_event)) + + self.returncode.cancel() # Triggers CancelledError + async def _send(self, state: State) -> None: status = STATE_TO_LEGACY[state] event = CloudEvent( diff --git a/tests/unit_tests/scheduler/test_scheduler.py b/tests/unit_tests/scheduler/test_scheduler.py index 33bfc08f252..c32588db69f 100644 --- a/tests/unit_tests/scheduler/test_scheduler.py +++ b/tests/unit_tests/scheduler/test_scheduler.py @@ -4,9 +4,10 @@ from pathlib import Path import pytest +from cloudevents.http import CloudEvent, from_json from ert.ensemble_evaluator._builder._realization import Realization -from ert.job_queue.queue import EVTYPE_ENSEMBLE_STOPPED +from ert.job_queue.queue import EVTYPE_ENSEMBLE_CANCELLED, EVTYPE_ENSEMBLE_STOPPED from ert.run_arg import RunArg from ert.scheduler import scheduler @@ -177,3 +178,74 @@ async def wait(): assert await sch.execute() == EVTYPE_ENSEMBLE_STOPPED assert retries == max_submit + + +@pytest.mark.timeout(10) +async def test_max_runtime(realization, mock_driver): + wait_started = asyncio.Event() + + async def wait(): + wait_started.set() + await asyncio.sleep(100) + + realization.max_runtime = 1 + + sch = scheduler.Scheduler(mock_driver(wait=wait), [realization]) + + result = await asyncio.create_task(sch.execute()) + assert wait_started.is_set() + assert result == EVTYPE_ENSEMBLE_STOPPED + + timeouteventfound = False + while not timeouteventfound and not sch._events.empty(): + event = await sch._events.get() + if from_json(event)["type"] == "com.equinor.ert.realization.timeout": + timeouteventfound = True + assert timeouteventfound + + +@pytest.mark.timeout(6) +async def test_max_runtime_while_killing(realization, mock_driver): + wait_started = asyncio.Event() + now_kill_me = asyncio.Event() + + async def wait(): + # A realization function that lives forever if it was not killed + wait_started.set() + await asyncio.sleep(0.1) + now_kill_me.set() + await asyncio.sleep(1000) + + async def kill(): + # A kill function that is triggered before the timeout, but finishes + # after MAX_RUNTIME + await asyncio.sleep(1) + + realization.max_runtime = 1 + + sch = scheduler.Scheduler(mock_driver(wait=wait, kill=kill), [realization]) + + scheduler_task = asyncio.create_task(sch.execute()) + + await now_kill_me.wait() + sch.kill_all_jobs() + + # Sleep until max_runtime must have kicked in: + await asyncio.sleep(1.1) + + timeouteventfound = False + while not timeouteventfound and not sch._events.empty(): + event = await sch._events.get() + if from_json(event)["type"] == "com.equinor.ert.realization.timeout": + timeouteventfound = True + + # Assert that a timeout_event is actually emitted, because killing took a + # long time, and that we should exit normally (asserting no bad things + # happen just because we have two things killing the realization). + + assert timeouteventfound + await scheduler_task + + # The result from execute is that we were cancelled, not stopped + # as if the timeout happened before kill_all_jobs() + assert scheduler_task.result() == EVTYPE_ENSEMBLE_CANCELLED