Skip to content

Commit

Permalink
Test ensemble_evaluator with new scheduler
Browse files Browse the repository at this point in the history
This highlights a behavioural change in the new LocalDriver, it will
not send the same events as the legacy local driver, see
test_async_queue_execution.py::test_happy_path
  • Loading branch information
berland committed Dec 22, 2023
1 parent 4a3723b commit cd3d4d2
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 34 deletions.
8 changes: 4 additions & 4 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
class State(str, Enum):
WAITING = "WAITING"
SUBMITTING = "SUBMITTING"
STARTING = "STARTING"
PENDING = "PENDING"
RUNNING = "RUNNING"
ABORTING = "ABORTING"
COMPLETED = "COMPLETED"
Expand All @@ -34,7 +34,7 @@ class State(str, Enum):
STATE_TO_LEGACY = {
State.WAITING: "WAITING",
State.SUBMITTING: "SUBMITTED",
State.STARTING: "PENDING",
State.PENDING: "PENDING",
State.RUNNING: "RUNNING",
State.ABORTING: "DO_KILL",
State.COMPLETED: "SUCCESS",
Expand Down Expand Up @@ -73,7 +73,7 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
self.real.iens, self.real.job_script, cwd=self.real.run_arg.runpath
)

await self._send(State.STARTING)
await self._send(State.PENDING)
await self.started.wait()

await self._send(State.RUNNING)
Expand Down Expand Up @@ -126,7 +126,7 @@ async def _send(self, state: State) -> None:
event = CloudEvent(
{
"type": _queue_state_event_type(status),
"source": f"/etc/ensemble/{self._scheduler._ens_id}/real/{self.iens}",
"source": f"/ert/ensemble/{self._scheduler._ens_id}/real/{self.iens}",
"datacontenttype": "application/json",
},
{
Expand Down
40 changes: 30 additions & 10 deletions tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from ert.async_utils import get_event_loop
from ert.ensemble_evaluator._wait_for_evaluator import wait_for_evaluator
from ert.job_queue import Driver, JobQueue
from ert.scheduler import Scheduler, create_driver
from ert.shared.feature_toggling import FeatureToggling


async def mock_ws(host, port, done):
Expand All @@ -33,6 +35,7 @@ async def _handler(websocket, path):

@pytest.mark.asyncio
@pytest.mark.timeout(60)
@pytest.mark.scheduler
async def test_happy_path(
tmpdir,
unused_tcp_port,
Expand All @@ -41,6 +44,7 @@ async def test_happy_path(
queue_config,
caplog,
monkeypatch,
try_queue_and_scheduler,
):
asyncio.set_event_loop(event_loop)
host = "localhost"
Expand All @@ -51,9 +55,16 @@ async def test_happy_path(
await wait_for_evaluator(base_url=url, timeout=5)

ensemble = make_ensemble_builder(monkeypatch, tmpdir, 1, 1).build()
queue = JobQueue(queue_config, ensemble.reals, ee_uri=url, ens_id="ee_0")

if FeatureToggling.is_enabled("scheduler"):
queue = Scheduler(
create_driver(queue_config), ensemble.reals, ee_uri=url, ens_id="ee_0"
)
else:
queue = JobQueue(queue_config, ensemble.reals, ee_uri=url, ens_id="ee_0")

await queue.execute()

done.set_result(None)

await mock_ws_task
Expand All @@ -62,12 +73,21 @@ async def test_happy_path(

assert mock_ws_task.done()

event_0 = from_json(mock_ws_task.result()[0])
assert event_0["source"] == "/ert/ensemble/ee_0/real/0"
assert event_0["type"] == "com.equinor.ert.realization.waiting"
assert event_0.data == {"queue_event_type": "WAITING"}

end_event_index = len(mock_ws_task.result()) - 1
end_event = from_json(mock_ws_task.result()[end_event_index])
assert end_event["type"] == "com.equinor.ert.realization.success"
assert end_event.data == {"queue_event_type": "SUCCESS"}
if FeatureToggling.is_enabled("scheduler"):
first_expected_queue_event_type = "SUBMITTED"
else:
first_expected_queue_event_type = "WAITING"

for received_event, expected_type, expected_queue_event_type in zip(
[mock_ws_task.result()[0], mock_ws_task.result()[-1]],
["waiting", "success"],
[first_expected_queue_event_type, "SUCCESS"],
):
assert from_json(received_event)["source"] == "/ert/ensemble/ee_0/real/0"
assert (
from_json(received_event)["type"]
== f"com.equinor.ert.realization.{expected_type}"
)
assert from_json(received_event).data == {
"queue_event_type": expected_queue_event_type
}
3 changes: 2 additions & 1 deletion tests/unit_tests/ensemble_evaluator/test_ensemble_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@


@pytest.mark.parametrize("active_real", [True, False])
def test_build_ensemble(active_real):
@pytest.mark.scheduler
def test_build_ensemble(active_real, monkeypatch, try_queue_and_scheduler):
ensemble = (
EnsembleBuilder()
.set_legacy_dependencies(QueueConfig(queue_system=QueueSystem.LOCAL), False, 0)
Expand Down
31 changes: 12 additions & 19 deletions tests/unit_tests/ensemble_evaluator/test_ensemble_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,10 @@


@pytest.mark.timeout(60)
def test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch):
_test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch)
monkeypatch.setattr(FeatureToggling._conf["scheduler"], "is_enabled", True)
shutil.rmtree(tmpdir)
tmpdir.mkdir()
_test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch)


def _test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch):
@pytest.mark.scheduler
def test_run_legacy_ensemble(
tmpdir, make_ensemble_builder, monkeypatch, try_queue_and_scheduler
):
num_reals = 2
custom_port_range = range(1024, 65535)
with tmpdir.as_cwd():
Expand Down Expand Up @@ -55,15 +50,10 @@ def _test_run_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch):


@pytest.mark.timeout(60)
def test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch):
_test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch)
monkeypatch.setattr(FeatureToggling._conf["scheduler"], "is_enabled", True)
shutil.rmtree(tmpdir)
tmpdir.mkdir()
_test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch)


def _test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypatch):
@pytest.mark.scheduler
def test_run_and_cancel_legacy_ensemble(
tmpdir, make_ensemble_builder, monkeypatch, try_queue_and_scheduler
):
num_reals = 2
custom_port_range = range(1024, 65535)
with tmpdir.as_cwd():
Expand Down Expand Up @@ -99,7 +89,10 @@ def _test_run_and_cancel_legacy_ensemble(tmpdir, make_ensemble_builder, monkeypa


@pytest.mark.timeout(10)
def test_run_legacy_ensemble_exception(tmpdir, make_ensemble_builder, monkeypatch):
@pytest.mark.scheduler
def test_run_legacy_ensemble_exception(
tmpdir, make_ensemble_builder, monkeypatch, try_queue_and_scheduler
):
num_reals = 2
custom_port_range = range(1024, 65535)
with tmpdir.as_cwd():
Expand Down

0 comments on commit cd3d4d2

Please sign in to comment.