From cd3d4d2a990be3339556ab08a0561ef7e5a74afe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Thu, 14 Dec 2023 09:01:49 +0100 Subject: [PATCH] Test ensemble_evaluator with new scheduler This highlights a behavioural change in the new LocalDriver, it will not send the same events as the legacy local driver, see test_async_queue_execution.py::test_happy_path --- src/ert/scheduler/job.py | 8 ++-- .../test_async_queue_execution.py | 40 ++++++++++++++----- .../test_ensemble_builder.py | 3 +- .../test_ensemble_legacy.py | 31 ++++++-------- 4 files changed, 48 insertions(+), 34 deletions(-) diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py index 7aa17ea1b25..11a1f36224a 100644 --- a/src/ert/scheduler/job.py +++ b/src/ert/scheduler/job.py @@ -23,7 +23,7 @@ class State(str, Enum): WAITING = "WAITING" SUBMITTING = "SUBMITTING" - STARTING = "STARTING" + PENDING = "PENDING" RUNNING = "RUNNING" ABORTING = "ABORTING" COMPLETED = "COMPLETED" @@ -34,7 +34,7 @@ class State(str, Enum): STATE_TO_LEGACY = { State.WAITING: "WAITING", State.SUBMITTING: "SUBMITTED", - State.STARTING: "PENDING", + State.PENDING: "PENDING", State.RUNNING: "RUNNING", State.ABORTING: "DO_KILL", State.COMPLETED: "SUCCESS", @@ -73,7 +73,7 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None: self.real.iens, self.real.job_script, cwd=self.real.run_arg.runpath ) - await self._send(State.STARTING) + await self._send(State.PENDING) await self.started.wait() await self._send(State.RUNNING) @@ -126,7 +126,7 @@ async def _send(self, state: State) -> None: event = CloudEvent( { "type": _queue_state_event_type(status), - "source": f"/etc/ensemble/{self._scheduler._ens_id}/real/{self.iens}", + "source": f"/ert/ensemble/{self._scheduler._ens_id}/real/{self.iens}", "datacontenttype": "application/json", }, { diff --git a/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py b/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py index dc256832cb8..1098dc728c8 100644 --- a/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py +++ b/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py @@ -9,6 +9,8 @@ from ert.async_utils import get_event_loop from ert.ensemble_evaluator._wait_for_evaluator import wait_for_evaluator from ert.job_queue import Driver, JobQueue +from ert.scheduler import Scheduler, create_driver +from ert.shared.feature_toggling import FeatureToggling async def mock_ws(host, port, done): @@ -33,6 +35,7 @@ async def _handler(websocket, path): @pytest.mark.asyncio @pytest.mark.timeout(60) +@pytest.mark.scheduler async def test_happy_path( tmpdir, unused_tcp_port, @@ -41,6 +44,7 @@ async def test_happy_path( queue_config, caplog, monkeypatch, + try_queue_and_scheduler, ): asyncio.set_event_loop(event_loop) host = "localhost" @@ -51,9 +55,16 @@ async def test_happy_path( await wait_for_evaluator(base_url=url, timeout=5) ensemble = make_ensemble_builder(monkeypatch, tmpdir, 1, 1).build() - queue = JobQueue(queue_config, ensemble.reals, ee_uri=url, ens_id="ee_0") + + if FeatureToggling.is_enabled("scheduler"): + queue = Scheduler( + create_driver(queue_config), ensemble.reals, ee_uri=url, ens_id="ee_0" + ) + else: + queue = JobQueue(queue_config, ensemble.reals, ee_uri=url, ens_id="ee_0") await queue.execute() + done.set_result(None) await mock_ws_task @@ -62,12 +73,21 @@ async def test_happy_path( assert mock_ws_task.done() - event_0 = from_json(mock_ws_task.result()[0]) - assert event_0["source"] == "/ert/ensemble/ee_0/real/0" - assert event_0["type"] == "com.equinor.ert.realization.waiting" - assert event_0.data == {"queue_event_type": "WAITING"} - - end_event_index = len(mock_ws_task.result()) - 1 - end_event = from_json(mock_ws_task.result()[end_event_index]) - assert end_event["type"] == "com.equinor.ert.realization.success" - assert end_event.data == {"queue_event_type": "SUCCESS"} + if FeatureToggling.is_enabled("scheduler"): + first_expected_queue_event_type = "SUBMITTED" + else: + first_expected_queue_event_type = "WAITING" + + for received_event, expected_type, expected_queue_event_type in zip( + [mock_ws_task.result()[0], mock_ws_task.result()[-1]], + ["waiting", "success"], + [first_expected_queue_event_type, "SUCCESS"], + ): + assert from_json(received_event)["source"] == "/ert/ensemble/ee_0/real/0" + assert ( + from_json(received_event)["type"] + == f"com.equinor.ert.realization.{expected_type}" + ) + assert from_json(received_event).data == { + "queue_event_type": expected_queue_event_type + } diff --git a/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py b/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py index 4af6b69d6a0..ccd1515aab7 100644 --- a/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py +++ b/tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py @@ -10,7 +10,8 @@ @pytest.mark.parametrize("active_real", [True, False]) -def test_build_ensemble(active_real): +@pytest.mark.scheduler +def test_build_ensemble(active_real, monkeypatch, try_queue_and_scheduler): ensemble = ( EnsembleBuilder() .set_legacy_dependencies(QueueConfig(queue_system=QueueSystem.LOCAL), False, 0) diff --git a/tests/unit_tests/ensemble_evaluator/test_ensemble_legacy.py b/tests/unit_tests/ensemble_evaluator/test_ensemble_legacy.py index ddc814d2cd5..e5d7b0358a3 100644 --- a/tests/unit_tests/ensemble_evaluator/test_ensemble_legacy.py +++ b/tests/unit_tests/ensemble_evaluator/test_ensemble_legacy.py @@ -15,15 +15,10 @@ @pytest.mark.timeout(60) -def test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch): - _test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch) - monkeypatch.setattr(FeatureToggling._conf["scheduler"], "is_enabled", True) - shutil.rmtree(tmpdir) - tmpdir.mkdir() - _test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch) - - -def _test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch): +@pytest.mark.scheduler +def test_run_legacy_ensemble( + tmpdir, make_ensemble_builder, monkeypatch, try_queue_and_scheduler +): num_reals = 2 custom_port_range = range(1024, 65535) with tmpdir.as_cwd(): @@ -55,15 +50,10 @@ def _test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch): @pytest.mark.timeout(60) -def test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch): - _test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch) - monkeypatch.setattr(FeatureToggling._conf["scheduler"], "is_enabled", True) - shutil.rmtree(tmpdir) - tmpdir.mkdir() - _test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch) - - -def _test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch): +@pytest.mark.scheduler +def test_run_and_cancel_legacy_ensemble( + tmpdir, make_ensemble_builder, monkeypatch, try_queue_and_scheduler +): num_reals = 2 custom_port_range = range(1024, 65535) with tmpdir.as_cwd(): @@ -99,7 +89,10 @@ def _test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypa @pytest.mark.timeout(10) -def test_run_legacy_ensemble_exception(tmpdir, make_ensemble_builder, monkeypatch): +@pytest.mark.scheduler +def test_run_legacy_ensemble_exception( + tmpdir, make_ensemble_builder, monkeypatch, try_queue_and_scheduler +): num_reals = 2 custom_port_range = range(1024, 65535) with tmpdir.as_cwd():