From e7ec249be8d71d580045fcf16b9370040bfc56e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Wed, 25 Oct 2023 11:05:13 +0200 Subject: [PATCH] Remove step states from snapshots. The step state is moved upwards to the realization. --- src/_ert_job_runner/runner.py | 2 - .../ensemble_evaluator/_builder/_ensemble.py | 2 - .../ensemble_evaluator/_builder/_legacy.py | 2 +- src/ert/ensemble_evaluator/identifiers.py | 32 ++-- src/ert/ensemble_evaluator/snapshot.py | 157 ++++-------------- src/ert/job_queue/queue.py | 43 +++-- tests/performance_tests/test_snapshot.py | 6 +- .../unit_tests/ensemble_evaluator/conftest.py | 1 - .../ensemble_evaluator_utils.py | 14 +- .../test_async_queue_execution.py | 6 +- .../ensemble_evaluator/test_dispatch.py | 11 +- .../test_ensemble_evaluator.py | 36 ++-- .../test_evaluator_tracker.py | 64 ++++--- .../ensemble_evaluator/test_snapshot.py | 13 +- tests/unit_tests/gui/conftest.py | 11 +- tests/unit_tests/gui/model/test_real_list.py | 2 +- .../gui/simulation/test_run_dialog.py | 17 +- 17 files changed, 159 insertions(+), 260 deletions(-) diff --git a/src/_ert_job_runner/runner.py b/src/_ert_job_runner/runner.py index 88f607b6e8b..5259fbac173 100644 --- a/src/_ert_job_runner/runner.py +++ b/src/_ert_job_runner/runner.py @@ -10,7 +10,6 @@ def __init__(self, jobs_data): self.experiment_id = jobs_data.get("experiment_id") self.ens_id = jobs_data.get("ens_id") self.real_id = jobs_data.get("real_id") - self.step_id = jobs_data.get("step_id") self.ert_pid = jobs_data.get("ert_pid") self.global_environment = jobs_data.get("global_environment") job_data_list = jobs_data["jobList"] @@ -38,7 +37,6 @@ def run(self, names_of_jobs_to_run): self.ert_pid, self.ens_id, self.real_id, - self.step_id, self.experiment_id, ) diff --git a/src/ert/ensemble_evaluator/_builder/_ensemble.py b/src/ert/ensemble_evaluator/_builder/_ensemble.py index 752bc000a6c..3bac61b70bd 100644 --- a/src/ert/ensemble_evaluator/_builder/_ensemble.py +++ b/src/ert/ensemble_evaluator/_builder/_ensemble.py @@ -22,7 +22,6 @@ RealizationSnapshot, Snapshot, SnapshotDict, - Step, ) from ert.serialization import evaluator_marshaller @@ -161,7 +160,6 @@ def _create_snapshot(self) -> Snapshot: active=True, status=state.REALIZATION_STATE_WAITING, ) - reals[str(real.iens)].step = Step(status=state.STEP_STATE_UNKNOWN) for job in real.jobs: reals[str(real.iens)].jobs[str(job.id_)] = Job( status=state.JOB_STATE_START, diff --git a/src/ert/ensemble_evaluator/_builder/_legacy.py b/src/ert/ensemble_evaluator/_builder/_legacy.py index ae6d7e58f2c..3f174ef66a6 100644 --- a/src/ert/ensemble_evaluator/_builder/_legacy.py +++ b/src/ert/ensemble_evaluator/_builder/_legacy.py @@ -82,7 +82,7 @@ def setup_timeout_callback( ) -> Tuple[Callable[[int], None], asyncio.Task[None]]: def on_timeout(iens: int) -> None: timeout_queue.put_nowait( - event_generator(identifiers.EVTYPE_FM_STEP_TIMEOUT, iens) + event_generator(identifiers.EVTYPE_REALIZATION_TIMEOUT, iens) ) async def send_timeout_message() -> None: diff --git a/src/ert/ensemble_evaluator/identifiers.py b/src/ert/ensemble_evaluator/identifiers.py index 43b6c85968c..97bba0ad3cd 100644 --- a/src/ert/ensemble_evaluator/identifiers.py +++ b/src/ert/ensemble_evaluator/identifiers.py @@ -17,13 +17,13 @@ STDOUT = "stdout" STEPS = "steps" -EVTYPE_FM_STEP_FAILURE = "com.equinor.ert.forward_model_step.failure" -EVTYPE_FM_STEP_PENDING = "com.equinor.ert.forward_model_step.pending" -EVTYPE_FM_STEP_RUNNING = "com.equinor.ert.forward_model_step.running" -EVTYPE_FM_STEP_SUCCESS = "com.equinor.ert.forward_model_step.success" -EVTYPE_FM_STEP_UNKNOWN = "com.equinor.ert.forward_model_step.unknown" -EVTYPE_FM_STEP_WAITING = "com.equinor.ert.forward_model_step.waiting" -EVTYPE_FM_STEP_TIMEOUT = "com.equinor.ert.forward_model_step.timeout" +EVTYPE_REALIZATION_FAILURE = "com.equinor.ert.realization.failure" +EVTYPE_REALIZATION_PENDING = "com.equinor.ert.realization.pending" +EVTYPE_REALIZATION_RUNNING = "com.equinor.ert.realization.running" +EVTYPE_REALIZATION_SUCCESS = "com.equinor.ert.realization.success" +EVTYPE_REALIZATION_UNKNOWN = "com.equinor.ert.realization.unknown" +EVTYPE_REALIZATION_WAITING = "com.equinor.ert.realization.waiting" +EVTYPE_REALIZATION_TIMEOUT = "com.equinor.ert.realization.timeout" EVTYPE_FM_JOB_START = "com.equinor.ert.forward_model_job.start" EVTYPE_FM_JOB_RUNNING = "com.equinor.ert.forward_model_job.running" @@ -31,14 +31,14 @@ EVTYPE_FM_JOB_FAILURE = "com.equinor.ert.forward_model_job.failure" -EVGROUP_FM_STEP = { - EVTYPE_FM_STEP_FAILURE, - EVTYPE_FM_STEP_PENDING, - EVTYPE_FM_STEP_RUNNING, - EVTYPE_FM_STEP_SUCCESS, - EVTYPE_FM_STEP_UNKNOWN, - EVTYPE_FM_STEP_WAITING, - EVTYPE_FM_STEP_TIMEOUT, +EVGROUP_REALIZATION = { + EVTYPE_REALIZATION_FAILURE, + EVTYPE_REALIZATION_PENDING, + EVTYPE_REALIZATION_RUNNING, + EVTYPE_REALIZATION_SUCCESS, + EVTYPE_REALIZATION_UNKNOWN, + EVTYPE_REALIZATION_WAITING, + EVTYPE_REALIZATION_TIMEOUT, } EVGROUP_FM_JOB = { @@ -48,7 +48,7 @@ EVTYPE_FM_JOB_FAILURE, } -EVGROUP_FM_ALL = EVGROUP_FM_STEP | EVGROUP_FM_JOB +EVGROUP_FM_ALL = EVGROUP_REALIZATION | EVGROUP_FM_JOB EVTYPE_EE_SNAPSHOT = "com.equinor.ert.ee.snapshot" EVTYPE_EE_SNAPSHOT_UPDATE = "com.equinor.ert.ee.snapshot_update" diff --git a/src/ert/ensemble_evaluator/snapshot.py b/src/ert/ensemble_evaluator/snapshot.py index 59692d75975..3ac7478e7a8 100644 --- a/src/ert/ensemble_evaluator/snapshot.py +++ b/src/ert/ensemble_evaluator/snapshot.py @@ -37,13 +37,13 @@ class UnsupportedOperationException(ValueError): _FM_TYPE_EVENT_TO_STATUS = { - ids.EVTYPE_FM_STEP_WAITING: state.STEP_STATE_WAITING, - ids.EVTYPE_FM_STEP_PENDING: state.STEP_STATE_PENDING, - ids.EVTYPE_FM_STEP_RUNNING: state.STEP_STATE_RUNNING, - ids.EVTYPE_FM_STEP_FAILURE: state.STEP_STATE_FAILURE, - ids.EVTYPE_FM_STEP_SUCCESS: state.STEP_STATE_SUCCESS, - ids.EVTYPE_FM_STEP_UNKNOWN: state.STEP_STATE_UNKNOWN, - ids.EVTYPE_FM_STEP_TIMEOUT: state.STEP_STATE_FAILURE, + ids.EVTYPE_REALIZATION_WAITING: state.REALIZATION_STATE_WAITING, + ids.EVTYPE_REALIZATION_PENDING: state.REALIZATION_STATE_PENDING, + ids.EVTYPE_REALIZATION_RUNNING: state.REALIZATION_STATE_RUNNING, + ids.EVTYPE_REALIZATION_FAILURE: state.REALIZATION_STATE_FAILED, + ids.EVTYPE_REALIZATION_SUCCESS: state.REALIZATION_STATE_FINISHED, + ids.EVTYPE_REALIZATION_UNKNOWN: state.REALIZATION_STATE_UNKNOWN, + ids.EVTYPE_REALIZATION_TIMEOUT: state.REALIZATION_STATE_FAILED, ids.EVTYPE_FM_JOB_START: state.JOB_STATE_START, ids.EVTYPE_FM_JOB_RUNNING: state.JOB_STATE_RUNNING, ids.EVTYPE_FM_JOB_SUCCESS: state.JOB_STATE_FINISHED, @@ -57,14 +57,6 @@ class UnsupportedOperationException(ValueError): ids.EVTYPE_ENSEMBLE_FAILED: state.ENSEMBLE_STATE_FAILED, } -_STEP_STATE_TO_REALIZATION_STATE = { - state.STEP_STATE_WAITING: state.REALIZATION_STATE_WAITING, - state.STEP_STATE_PENDING: state.REALIZATION_STATE_PENDING, - state.STEP_STATE_RUNNING: state.REALIZATION_STATE_RUNNING, - state.STEP_STATE_UNKNOWN: state.REALIZATION_STATE_UNKNOWN, - state.STEP_STATE_FAILURE: state.REALIZATION_STATE_FAILED, -} - def convert_iso8601_to_datetime( timestamp: Union[datetime.datetime, str] @@ -88,13 +80,6 @@ def __init__(self, snapshot: Optional["Snapshot"] = None) -> None: realization number, pointing to a dict with keys active (bool), start_time (datetime), end_time (datetime) and status (str).""" - self._step_states: Dict[ - str, Dict[str, Union[str, datetime.datetime]] - ] = defaultdict(dict) - """A shallow dictionary of step states. The key is a string - realization id, pointing to a dict with the same members as the Step - class.""" - self._job_states: Dict[ Tuple[str, str], Dict[str, Union[str, datetime.datetime]] ] = defaultdict(dict) @@ -118,18 +103,18 @@ def update_metadata(self, metadata: Dict[str, Any]) -> None: snapshot's metadata""" self._metadata.update(_filter_nones(metadata)) - def update_step(self, real_id: str, step: "Step") -> "PartialSnapshot": - step_update = _filter_nones( - { - "status": step.status, - "start_time": step.start_time, - "end_time": step.end_time, - } + def update_realization( + self, + real_id: str, + status: str, + start_time: Optional[datetime.datetime] = None, + end_time: Optional[datetime.datetime] = None, + ) -> "PartialSnapshot": + self._realization_states[real_id].update( + _filter_nones( + {"status": status, "start_time": start_time, "end_time": end_time} + ) ) - self._step_states[real_id].update(step_update) - if self._snapshot: - self._snapshot._my_partial._step_states[real_id].update(step_update) - self._propagate_step_update_to_realization(real_id) return self def update_job( @@ -145,35 +130,6 @@ def update_job( self._snapshot._my_partial._job_states[(real_id, job_id)].update(job_update) return self - def _propagate_step_update_to_realization(self, real_id: str) -> "PartialSnapshot": - step = self._step_states[real_id] - step_status = step.get("status") - assert isinstance(step_status, str) - assert self._snapshot is not None - - real_state = self._realization_states[real_id] - if real_state.get("status") == state.REALIZATION_STATE_FAILED: - return self - if ( - step_status in _STEP_STATE_TO_REALIZATION_STATE - ): # All but the finished state - self._realization_states[real_id].update( - {"status": _STEP_STATE_TO_REALIZATION_STATE[step_status]} - ) - elif ( - step_status == state.REALIZATION_STATE_FINISHED - and self._snapshot.step_finished(real_id) - ): - real_state["status"] = state.REALIZATION_STATE_FINISHED - elif ( - step_status == state.STEP_STATE_SUCCESS - and not self._snapshot.step_finished(real_id) - ): - pass - else: - raise ValueError(f"unknown step status {step_status} for real: {real_id}") - return self - def get_all_jobs( self, ) -> Mapping[Tuple[str, str], "Job"]: @@ -226,20 +182,12 @@ def to_dict(self) -> Dict[str, Any]: if self._realization_states: _dict["reals"] = self._realization_states - for real_id, step_state in self._step_states.items(): - if "reals" not in _dict: - _dict["reals"] = {real_id: {}} - if "step" not in _dict["reals"][real_id]: - _dict["reals"][real_id]["step"] = step_state - for job_tuple, job_values_dict in self._job_states.items(): real_id = job_tuple[0] if "reals" not in _dict: _dict["reals"] = {} if real_id not in _dict["reals"]: _dict["reals"][real_id] = {} - if "step" not in _dict["reals"][real_id]: - _dict["reals"][real_id]["step"] = {} if "jobs" not in _dict["reals"][real_id]: _dict["reals"][real_id]["jobs"] = {} @@ -257,8 +205,6 @@ def _merge(self, other: "PartialSnapshot") -> "PartialSnapshot": self._ensemble_state = other._ensemble_state for real_id, other_real_data in other._realization_states.items(): self._realization_states[real_id].update(other_real_data) - for real_id, other_step_data in other._step_states.items(): - self._step_states[real_id].update(other_step_data) for job_id, other_job_data in other._job_states.items(): self._job_states[job_id].update(other_job_data) return self @@ -266,7 +212,6 @@ def _merge(self, other: "PartialSnapshot") -> "PartialSnapshot": def from_cloudevent(self, event: CloudEvent) -> "PartialSnapshot": e_type = event["type"] e_source = event["source"] - status = _FM_TYPE_EVENT_TO_STATUS.get(e_type) timestamp = event["time"] if self._snapshot is None: @@ -274,31 +219,24 @@ def from_cloudevent(self, event: CloudEvent) -> "PartialSnapshot": f"updating {self.__class__} without a snapshot is not supported" ) - if e_type in ids.EVGROUP_FM_STEP: + if e_type in ids.EVGROUP_REALIZATION: + status = _FM_TYPE_EVENT_TO_STATUS[e_type] start_time = None end_time = None - if e_type == ids.EVTYPE_FM_STEP_RUNNING: + if e_type == ids.EVTYPE_REALIZATION_RUNNING: start_time = convert_iso8601_to_datetime(timestamp) elif e_type in { - ids.EVTYPE_FM_STEP_SUCCESS, - ids.EVTYPE_FM_STEP_FAILURE, - ids.EVTYPE_FM_STEP_TIMEOUT, + ids.EVTYPE_REALIZATION_SUCCESS, + ids.EVTYPE_REALIZATION_FAILURE, + ids.EVTYPE_REALIZATION_TIMEOUT, }: end_time = convert_iso8601_to_datetime(timestamp) - self.update_step( - _get_real_id(e_source), - Step( - **_filter_nones( - { - "status": status, - "start_time": start_time, - "end_time": end_time, - } - ) - ), + + self.update_realization( + _get_real_id(e_source), status, start_time, end_time ) - if e_type == ids.EVTYPE_FM_STEP_TIMEOUT: + if e_type == ids.EVTYPE_REALIZATION_TIMEOUT: for job_id, job in self._snapshot.get_jobs_for_real( _get_real_id(e_source) ).items(): @@ -317,6 +255,7 @@ def from_cloudevent(self, event: CloudEvent) -> "PartialSnapshot": ) elif e_type in ids.EVGROUP_FM_JOB: + status = _FM_TYPE_EVENT_TO_STATUS[e_type] start_time = None end_time = None if e_type == ids.EVTYPE_FM_JOB_START: @@ -400,9 +339,6 @@ def get_job_status_for_all_reals( def reals(self) -> Mapping[str, "RealizationSnapshot"]: return self._my_partial.reals - def step(self, real_id: str) -> Dict[str, Union[str, datetime.datetime]]: - return self._my_partial._step_states[real_id] - def get_jobs_for_real(self, real_id: str) -> Dict[str, "Job"]: return { job_idx[1]: Job(**job_data) @@ -413,17 +349,9 @@ def get_jobs_for_real(self, real_id: str) -> Dict[str, "Job"]: def get_real(self, real_id: str) -> "RealizationSnapshot": return RealizationSnapshot(**self._my_partial._realization_states[real_id]) - def get_step(self, real_id: str) -> "Step": - return Step(**self._my_partial._step_states[real_id]) - def get_job(self, real_id: str, job_id: str) -> "Job": return Job(**self._my_partial._job_states[(real_id, job_id)]) - def step_finished(self, real_id: str) -> bool: - return ( - self._my_partial._step_states[real_id]["status"] == state.STEP_STATE_SUCCESS - ) - def get_successful_realizations(self) -> int: return len( [ @@ -459,18 +387,11 @@ class Job(BaseModel): stderr: Optional[str] -class Step(BaseModel): - status: Optional[str] - start_time: Optional[datetime.datetime] - end_time: Optional[datetime.datetime] - - class RealizationSnapshot(BaseModel): status: Optional[str] active: Optional[bool] start_time: Optional[datetime.datetime] end_time: Optional[datetime.datetime] - step: Optional[Step] jobs: Dict[str, Job] = {} @@ -481,7 +402,6 @@ class SnapshotDict(BaseModel): class SnapshotBuilder(BaseModel): - step: Optional[Step] = None jobs: Dict[str, Job] = {} metadata: Dict[str, Any] = {} @@ -496,7 +416,6 @@ def build( for r_id in real_ids: top.reals[r_id] = RealizationSnapshot( active=True, - step=self.step, jobs=self.jobs, start_time=start_time, end_time=end_time, @@ -504,15 +423,6 @@ def build( ) return Snapshot(top.dict()) - def add_step( - self, - status: Optional[str], - start_time: Optional[datetime.datetime] = None, - end_time: Optional[datetime.datetime] = None, - ) -> "SnapshotBuilder": - self.step = Step(status=status, start_time=start_time, end_time=end_time) - return self - def add_job( self, job_id: str, @@ -555,15 +465,6 @@ def _from_nested_dict(data: Mapping[str, Any]) -> PartialSnapshot: "end_time": realization_data.get("end_time"), } ) - step_data = data["reals"][real_id].get("step", {}) - if step_data is not None: - partial._step_states[real_id] = _filter_nones( - { - "status": step_data.get("status"), - "start_time": step_data.get("start_time"), - "end_time": step_data.get("end_time"), - } - ) for job_id, job in realization_data.get("jobs", {}).items(): job_idx = (real_id, job_id) partial._job_states[job_idx] = job diff --git a/src/ert/job_queue/queue.py b/src/ert/job_queue/queue.py index c3b78e0aff8..4d2a56a780e 100644 --- a/src/ert/job_queue/queue.py +++ b/src/ert/job_queue/queue.py @@ -50,30 +50,29 @@ LONG_RUNNING_FACTOR = 1.25 - -_FM_STEP_FAILURE = "com.equinor.ert.forward_model_step.failure" -_FM_STEP_PENDING = "com.equinor.ert.forward_model_step.pending" -_FM_STEP_RUNNING = "com.equinor.ert.forward_model_step.running" -_FM_STEP_SUCCESS = "com.equinor.ert.forward_model_step.success" -_FM_STEP_UNKNOWN = "com.equinor.ert.forward_model_step.unknown" -_FM_STEP_WAITING = "com.equinor.ert.forward_model_step.waiting" - +EVTYPE_REALIZATION_FAILURE = "com.equinor.ert.realization.failure" +EVTYPE_REALIZATION_PENDING = "com.equinor.ert.realization.pending" +EVTYPE_REALIZATION_RUNNING = "com.equinor.ert.realization.running" +EVTYPE_REALIZATION_SUCCESS = "com.equinor.ert.realization.success" +EVTYPE_REALIZATION_UNKNOWN = "com.equinor.ert.realization.unknown" +EVTYPE_REALIZATION_WAITING = "com.equinor.ert.realization.waiting" +EVTYPE_REALIZATION_TIMEOUT = "com.equinor.ert.realization.timeout" _queue_state_to_event_type_map = { - "NOT_ACTIVE": _FM_STEP_WAITING, - "WAITING": _FM_STEP_WAITING, - "SUBMITTED": _FM_STEP_WAITING, - "PENDING": _FM_STEP_PENDING, - "RUNNING": _FM_STEP_RUNNING, - "DONE": _FM_STEP_RUNNING, - "EXIT": _FM_STEP_RUNNING, - "IS_KILLED": _FM_STEP_FAILURE, - "DO_KILL": _FM_STEP_FAILURE, - "SUCCESS": _FM_STEP_SUCCESS, - "STATUS_FAILURE": _FM_STEP_UNKNOWN, - "FAILED": _FM_STEP_FAILURE, - "DO_KILL_NODE_FAILURE": _FM_STEP_FAILURE, - "UNKNOWN": _FM_STEP_UNKNOWN, + "NOT_ACTIVE": EVTYPE_REALIZATION_WAITING, + "WAITING": EVTYPE_REALIZATION_WAITING, + "SUBMITTED": EVTYPE_REALIZATION_WAITING, + "PENDING": EVTYPE_REALIZATION_PENDING, + "RUNNING": EVTYPE_REALIZATION_RUNNING, + "DONE": EVTYPE_REALIZATION_RUNNING, + "EXIT": EVTYPE_REALIZATION_RUNNING, + "IS_KILLED": EVTYPE_REALIZATION_FAILURE, + "DO_KILL": EVTYPE_REALIZATION_FAILURE, + "SUCCESS": EVTYPE_REALIZATION_SUCCESS, + "STATUS_FAILURE": EVTYPE_REALIZATION_UNKNOWN, + "FAILED": EVTYPE_REALIZATION_FAILURE, + "DO_KILL_NODE_FAILURE": EVTYPE_REALIZATION_FAILURE, + "UNKNOWN": EVTYPE_REALIZATION_UNKNOWN, } diff --git a/tests/performance_tests/test_snapshot.py b/tests/performance_tests/test_snapshot.py index 381a4351043..9fb45342066 100644 --- a/tests/performance_tests/test_snapshot.py +++ b/tests/performance_tests/test_snapshot.py @@ -12,7 +12,6 @@ RealizationSnapshot, Snapshot, SnapshotDict, - Step, ) from ..unit_tests.gui.conftest import ( @@ -71,7 +70,6 @@ def simulate_forward_model_event_handling( active=True, status=state.REALIZATION_STATE_WAITING, ) - reals[str(real)].step = Step(status=state.STEP_STATE_UNKNOWN) for job_idx in range(forward_models): reals[f"{real}"].jobs[str(job_idx)] = Job( status=state.JOB_STATE_START, @@ -102,7 +100,7 @@ def simulate_forward_model_event_handling( CloudEvent( { "source": f"/ert/ensemble/{ens_id}/real/{real}", - "type": ids.EVTYPE_FM_STEP_WAITING, + "type": ids.EVTYPE_REALIZATION_WAITING, "id": str(uuid.uuid1()), } ) @@ -154,7 +152,7 @@ def simulate_forward_model_event_handling( CloudEvent( { "source": f"/ert/ensemble/{ens_id}/real/{real}", - "type": ids.EVTYPE_FM_STEP_SUCCESS, + "type": ids.EVTYPE_REALIZATION_SUCCESS, "id": str(uuid.uuid1()), } ) diff --git a/tests/unit_tests/ensemble_evaluator/conftest.py b/tests/unit_tests/ensemble_evaluator/conftest.py index 482d03f8fa4..6b3131b68ab 100644 --- a/tests/unit_tests/ensemble_evaluator/conftest.py +++ b/tests/unit_tests/ensemble_evaluator/conftest.py @@ -23,7 +23,6 @@ def snapshot(): return ( SnapshotBuilder() - .add_step(status="Unknown") .add_job( job_id="0", index="0", diff --git a/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py b/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py index 3aece7e330b..5de1a862041 100644 --- a/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py +++ b/tests/unit_tests/ensemble_evaluator/ensemble_evaluator_utils.py @@ -105,7 +105,7 @@ def _evaluate(self, url): job_failed = False send_dispatch_event( dispatch, - identifiers.EVTYPE_FM_STEP_UNKNOWN, + identifiers.EVTYPE_REALIZATION_UNKNOWN, f"/ert/ensemble/{self.id_}/real/{real}", f"event-{event_id}", None, @@ -115,7 +115,7 @@ def _evaluate(self, url): send_dispatch_event( dispatch, identifiers.EVTYPE_FM_JOB_RUNNING, - f"/ert/ensemble/{self.id_}/real/" + f"{real}//job/{job}", + f"/ert/ensemble/{self.id_}/real/{real}/job/{job}", f"event-{event_id}", {"current_memory_usage": 1000}, ) @@ -124,7 +124,7 @@ def _evaluate(self, url): send_dispatch_event( dispatch, identifiers.EVTYPE_FM_JOB_FAILURE, - f"/ert/ensemble/{self.id_}/real/" + f"{real}/job/{job}", + f"/ert/ensemble/{self.id_}/real/{real}/job/{job}", f"event-{event_id}", {}, ) @@ -134,7 +134,7 @@ def _evaluate(self, url): send_dispatch_event( dispatch, identifiers.EVTYPE_FM_JOB_SUCCESS, - f"/ert/ensemble/{self.id_}/real/" + f"{real}/job/{job}", + f"/ert/ensemble/{self.id_}/real/{real}/job/{job}", f"event-{event_id}", {"current_memory_usage": 1000}, ) @@ -142,8 +142,8 @@ def _evaluate(self, url): if job_failed: send_dispatch_event( dispatch, - identifiers.EVTYPE_FM_STEP_FAILURE, - f"/ert/ensemble/{self.id_}/real/" + f"{real}/job/{job}", + identifiers.EVTYPE_REALIZATION_FAILURE, + f"/ert/ensemble/{self.id_}/real/{real}/job/{job}", f"event-{event_id}", {}, ) @@ -152,7 +152,7 @@ def _evaluate(self, url): send_dispatch_event( dispatch, identifiers.EVTYPE_FM_STEP_SUCCESS, - f"/ert/ensemble/{self.id_}/real/" + f"{real}/job/{job}", + f"/ert/ensemble/{self.id_}/real/{real}/job/{job}", f"event-{event_id}", {}, ) diff --git a/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py b/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py index e87a0b4f540..997bff84228 100644 --- a/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py +++ b/tests/unit_tests/ensemble_evaluator/test_async_queue_execution.py @@ -23,7 +23,7 @@ async def _handler(websocket, path): event = await websocket.recv() events.append(event) cloud_event = from_json(event) - if cloud_event["type"] == "com.equinor.ert.forward_model_stage.success": + if cloud_event["type"] == "com.equinor.ert.realization.success": break async with serve(_handler, host, port, process_request=process_request): @@ -70,10 +70,10 @@ async def test_happy_path( event_0 = from_json(mock_ws_task.result()[0]) assert event_0["source"] == "/ert/ensemble/ee_0/real/0" - assert event_0["type"] == "com.equinor.ert.forward_model_step.waiting" + assert event_0["type"] == "com.equinor.ert.realization.waiting" assert event_0.data == {"queue_event_type": "WAITING"} end_event_index = len(mock_ws_task.result()) - 1 end_event = from_json(mock_ws_task.result()[end_event_index]) - assert end_event["type"] == "com.equinor.ert.forward_model_step.success" + assert end_event["type"] == "com.equinor.ert.realization.success" assert end_event.data == {"queue_event_type": "SUCCESS"} diff --git a/tests/unit_tests/ensemble_evaluator/test_dispatch.py b/tests/unit_tests/ensemble_evaluator/test_dispatch.py index d4b621c4c4a..28fbe349898 100644 --- a/tests/unit_tests/ensemble_evaluator/test_dispatch.py +++ b/tests/unit_tests/ensemble_evaluator/test_dispatch.py @@ -53,13 +53,14 @@ async def test_that_dispatcher_uses_right_handle_function_for_one_event(): async def test_that_event_dispatcher_uses_right_handle_functions_for_two_events(): event_handler = DummyEventHandler() - step_event = _create_dummy_event(ids.EVTYPE_FM_STEP_UNKNOWN) + realization_event = _create_dummy_event(ids.EVTYPE_REALIZATION_UNKNOWN) fail_event = _create_dummy_event(ids.EVTYPE_ENSEMBLE_FAILED) - await event_handler.dispatcher.handle_event(step_event) + + await event_handler.dispatcher.handle_event(realization_event) await event_handler.dispatcher.handle_event(fail_event) await event_handler.join() - event_handler.mock_fm.assert_called_with([step_event]) + event_handler.mock_fm.assert_called_with([realization_event]) event_handler.mock_fail.assert_called_with([fail_event]) await event_handler.join() @@ -82,8 +83,8 @@ async def test_that_event_dispatcher_ignores_event_without_registered_handle_fun async def test_event_dispatcher_batching_two_handlers(): event_handler = DummyEventHandler() - event1 = _create_dummy_event(ids.EVTYPE_FM_STEP_UNKNOWN) - event2 = _create_dummy_event(ids.EVTYPE_FM_STEP_UNKNOWN) + event1 = _create_dummy_event(ids.EVTYPE_REALIZATION_UNKNOWN) + event2 = _create_dummy_event(ids.EVTYPE_REALIZATION_UNKNOWN) await event_handler.dispatcher.handle_event(event1) await event_handler.dispatcher.handle_event(event2) diff --git a/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py b/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py index a077c0c6742..3ce4a02c924 100644 --- a/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py +++ b/tests/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py @@ -46,7 +46,7 @@ def test_new_monitor_can_pick_up_where_we_left_off(evaluator): send_dispatch_event( dispatch1, identifiers.EVTYPE_FM_JOB_RUNNING, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/step/0/job/0", + f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/job/0", "event1", {"current_memory_usage": 1000}, ) @@ -55,7 +55,7 @@ def test_new_monitor_can_pick_up_where_we_left_off(evaluator): send_dispatch_event( dispatch2, identifiers.EVTYPE_FM_JOB_RUNNING, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/step/0/job/0", + f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/job/0", "event1", {"current_memory_usage": 1000}, ) @@ -63,7 +63,7 @@ def test_new_monitor_can_pick_up_where_we_left_off(evaluator): send_dispatch_event( dispatch2, identifiers.EVTYPE_FM_JOB_RUNNING, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/step/0/job/1", + f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/job/1", "event1", {"current_memory_usage": 1000}, ) @@ -86,7 +86,7 @@ def test_new_monitor_can_pick_up_where_we_left_off(evaluator): send_dispatch_event( dispatch2, identifiers.EVTYPE_FM_JOB_SUCCESS, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/step/0/job/0", + f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/job/0", "event1", {"current_memory_usage": 1000}, ) @@ -95,7 +95,7 @@ def test_new_monitor_can_pick_up_where_we_left_off(evaluator): send_dispatch_event( dispatch2, identifiers.EVTYPE_FM_JOB_FAILURE, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/step/0/job/1", + f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/job/1", "event_job_1_fail", {identifiers.ERROR_MSG: "error"}, ) @@ -150,7 +150,7 @@ def test_dispatch_endpoint_clients_can_connect_and_monitor_can_shut_down_evaluat send_dispatch_event( dispatch1, identifiers.EVTYPE_FM_JOB_RUNNING, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/step/0/job/0", + f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/job/0", "event1", {"current_memory_usage": 1000}, ) @@ -159,7 +159,7 @@ def test_dispatch_endpoint_clients_can_connect_and_monitor_can_shut_down_evaluat send_dispatch_event( dispatch2, identifiers.EVTYPE_FM_JOB_RUNNING, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/step/0/job/0", + f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/job/0", "event1", {"current_memory_usage": 1000}, ) @@ -168,7 +168,7 @@ def test_dispatch_endpoint_clients_can_connect_and_monitor_can_shut_down_evaluat send_dispatch_event( dispatch2, identifiers.EVTYPE_FM_JOB_SUCCESS, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/step/0/job/0", + f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/job/0", "event1", {"current_memory_usage": 1000}, ) @@ -177,7 +177,7 @@ def test_dispatch_endpoint_clients_can_connect_and_monitor_can_shut_down_evaluat send_dispatch_event( dispatch2, identifiers.EVTYPE_FM_JOB_FAILURE, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/step/0/job/1", + f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/job/1", "event_job_1_fail", {identifiers.ERROR_MSG: "error"}, ) @@ -236,15 +236,15 @@ def test_ensure_multi_level_events_in_order(evaluator): ) send_dispatch_event( dispatch1, - identifiers.EVTYPE_FM_STEP_SUCCESS, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/step/0", + identifiers.EVTYPE_REALIZATION_SUCCESS, + f"/ert/ensemble/{evaluator.ensemble.id_}/real/0", "event1", {}, ) send_dispatch_event( dispatch1, - identifiers.EVTYPE_FM_STEP_SUCCESS, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/step/0", + identifiers.EVTYPE_REALIZATION_SUCCESS, + f"/ert/ensemble/{evaluator.ensemble.id_}/real/1", "event2", {}, ) @@ -295,28 +295,28 @@ def exploding_handler(events): send_dispatch_event( dispatch, identifiers.EVTYPE_FM_JOB_RUNNING, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/step/0/job/0", + f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/job/0", "event1", {"current_memory_usage": 1000}, ) send_dispatch_event( dispatch, identifiers.EVTYPE_FM_JOB_RUNNING, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/step/0/job/0", + f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/job/0", "event2", {}, ) send_dispatch_event( dispatch, "EXPLODING", - f"/ert/ensemble/{evaluator.ensemble.id_}/real/1/step/0", + f"/ert/ensemble/{evaluator.ensemble.id_}/real/1", "event3", {}, ) send_dispatch_event( dispatch, - identifiers.EVTYPE_FM_STEP_SUCCESS, - f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/step/0/job/0", + identifiers.EVTYPE_REALIZATION_SUCCESS, + f"/ert/ensemble/{evaluator.ensemble.id_}/real/0/job/0", "event4", {}, ) diff --git a/tests/unit_tests/ensemble_evaluator/test_evaluator_tracker.py b/tests/unit_tests/ensemble_evaluator/test_evaluator_tracker.py index e57e71aba03..22b47774f10 100644 --- a/tests/unit_tests/ensemble_evaluator/test_evaluator_tracker.py +++ b/tests/unit_tests/ensemble_evaluator/test_evaluator_tracker.py @@ -9,7 +9,7 @@ from ert.ensemble_evaluator import EvaluatorTracker, state from ert.ensemble_evaluator.config import EvaluatorServerConfig from ert.ensemble_evaluator.event import EndEvent, SnapshotUpdateEvent -from ert.ensemble_evaluator.snapshot import PartialSnapshot, SnapshotBuilder, Step +from ert.ensemble_evaluator.snapshot import PartialSnapshot, SnapshotBuilder from ert.run_models import BaseRunModel @@ -17,11 +17,7 @@ def build_snapshot(real_list: Optional[List[str]] = None): if real_list is None: # passing ["0"] is required real_list = ["0"] - return ( - SnapshotBuilder() - .add_step(status=state.STEP_STATE_UNKNOWN) - .build(real_list, state.REALIZATION_STATE_UNKNOWN) - ) + return SnapshotBuilder().build(real_list, state.REALIZATION_STATE_UNKNOWN) def build_partial(real_list: Optional[List[str]] = None): @@ -65,7 +61,9 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) + .update_realization( + "0", status=state.REALIZATION_STATE_FINISHED + ) .to_dict() ), "iter": 2, @@ -91,7 +89,9 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) + .update_realization( + "0", status=state.REALIZATION_STATE_FINISHED + ) .to_dict() ), "iter": 0, @@ -117,7 +117,9 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) + .update_realization( + "0", status=state.REALIZATION_STATE_FINISHED + ) .to_dict() ), "iter": 0, @@ -143,8 +145,12 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) - .update_step("1", Step(status=state.STEP_STATE_SUCCESS)) + .update_realization( + "0", status=state.REALIZATION_STATE_FINISHED + ) + .update_realization( + "1", status=state.REALIZATION_STATE_FINISHED + ) .to_dict() ), "iter": 0, @@ -162,7 +168,9 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) + .update_realization( + "0", status=state.REALIZATION_STATE_FINISHED + ) .to_dict() ), "iter": 1, @@ -193,8 +201,12 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) - .update_step("1", Step(status=state.STEP_STATE_SUCCESS)) + .update_realization( + "0", status=state.REALIZATION_STATE_FINISHED + ) + .update_realization( + "1", status=state.REALIZATION_STATE_FINISHED + ) .to_dict() ), "iter": 0, @@ -212,8 +224,12 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) - .update_step("1", Step(status=state.STEP_STATE_SUCCESS)) + .update_realization( + "0", status=state.REALIZATION_STATE_FINISHED + ) + .update_realization( + "1", status=state.REALIZATION_STATE_FINISHED + ) .to_dict() ), "iter": 1, @@ -244,8 +260,12 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) - .update_step("1", Step(status=state.STEP_STATE_SUCCESS)) + .update_realization( + "0", status=state.REALIZATION_STATE_FINISHED + ) + .update_realization( + "1", status=state.REALIZATION_STATE_FINISHED + ) .to_dict() ), "iter": 1, @@ -263,8 +283,12 @@ def _track(): data={ **( build_partial(["0", "1"]) - .update_step("0", Step(status=state.STEP_STATE_SUCCESS)) - .update_step("1", Step(status=state.STEP_STATE_SUCCESS)) + .update_realization( + "0", status=state.REALIZATION_STATE_FINISHED + ) + .update_realization( + "1", status=state.REALIZATION_STATE_FINISHED + ) .to_dict() ), "iter": 2, diff --git a/tests/unit_tests/ensemble_evaluator/test_snapshot.py b/tests/unit_tests/ensemble_evaluator/test_snapshot.py index 8a17b5ddc6b..5bf8a6b99a3 100644 --- a/tests/unit_tests/ensemble_evaluator/test_snapshot.py +++ b/tests/unit_tests/ensemble_evaluator/test_snapshot.py @@ -104,7 +104,7 @@ def test_source_get_ids(source_string, expected_ids): assert _get_job_id(source_string) == expected_ids["job"] -def test_update_partial_from_multiple_cloudevents(snapshot): +def test_update_jobs_in_partial_from_multiple_cloudevents(snapshot): partial = PartialSnapshot(snapshot) partial.from_cloudevent( CloudEvent( @@ -143,19 +143,14 @@ def test_update_partial_from_multiple_cloudevents(snapshot): assert jobs["1"]["status"] == state.JOB_STATE_FINISHED -def test_multiple_cloud_events_trigger_non_communicated_change(): - """In other words, though we say all steps are finished, we don't - explicitly send an event that changes the realization status. It should - happen by virtue of the steps being completed.""" - snapshot = ( - SnapshotBuilder().add_step(status="Unknown").build(["0"], status="Unknown") - ) +def test_that_realization_success_message_updates_state(snapshot): + snapshot = SnapshotBuilder().build(["0"], status="Unknown") partial = PartialSnapshot(snapshot) partial.from_cloudevent( CloudEvent( { "id": "0", - "type": ids.EVTYPE_FM_STEP_SUCCESS, + "type": ids.EVTYPE_REALIZATION_SUCCESS, "source": "/real/0", } ) diff --git a/tests/unit_tests/gui/conftest.py b/tests/unit_tests/gui/conftest.py index 6bdb0e4ba39..c47b183a054 100644 --- a/tests/unit_tests/gui/conftest.py +++ b/tests/unit_tests/gui/conftest.py @@ -24,13 +24,11 @@ Snapshot, SnapshotBuilder, SnapshotDict, - Step, ) from ert.ensemble_evaluator.state import ( ENSEMBLE_STATE_STARTED, JOB_STATE_START, REALIZATION_STATE_UNKNOWN, - STEP_STATE_UNKNOWN, ) from ert.gui.ertwidgets import ClosableDialog from ert.gui.ertwidgets.caselist import AddRemoveWidget @@ -242,11 +240,6 @@ def full_snapshot() -> Snapshot: real = RealizationSnapshot( status=REALIZATION_STATE_UNKNOWN, active=True, - steps={ - "0": Step( - status="", - ) - }, jobs={ "0": Job( start_time=dt.now(), @@ -298,7 +291,7 @@ def full_snapshot() -> Snapshot: @pytest.fixture() def large_snapshot() -> Snapshot: - builder = SnapshotBuilder().add_step(status=STEP_STATE_UNKNOWN) + builder = SnapshotBuilder() for i in range(0, 150): builder.add_job( job_id=str(i), @@ -318,7 +311,7 @@ def large_snapshot() -> Snapshot: @pytest.fixture() def small_snapshot() -> Snapshot: - builder = SnapshotBuilder().add_step(status=STEP_STATE_UNKNOWN) + builder = SnapshotBuilder() for i in range(0, 2): builder.add_job( job_id=str(i), diff --git a/tests/unit_tests/gui/model/test_real_list.py b/tests/unit_tests/gui/model/test_real_list.py index c03f8b48939..3c3bd042c6f 100644 --- a/tests/unit_tests/gui/model/test_real_list.py +++ b/tests/unit_tests/gui/model/test_real_list.py @@ -18,7 +18,7 @@ def test_using_qt_model_tester(qtmodeltester, full_snapshot): model.setSourceModel(source_model) reporting_mode = qt_api.QtTest.QAbstractItemModelTester.FailureReportingMode.Warning - tester = qt_api.QtTest.QAbstractItemModelTester( # noqa: F841, prevent GC + tester = qt_api.QtTest.QAbstractItemModelTester( # noqa: F841, prevent GAC model, reporting_mode ) diff --git a/tests/unit_tests/gui/simulation/test_run_dialog.py b/tests/unit_tests/gui/simulation/test_run_dialog.py index eb936db9508..f7bcf1d02c1 100644 --- a/tests/unit_tests/gui/simulation/test_run_dialog.py +++ b/tests/unit_tests/gui/simulation/test_run_dialog.py @@ -132,7 +132,6 @@ def test_large_snapshot( FullSnapshotEvent( snapshot=( SnapshotBuilder() - .add_step(status=state.STEP_STATE_UNKNOWN) .add_job( job_id="0", index="0", @@ -171,7 +170,6 @@ def test_large_snapshot( FullSnapshotEvent( snapshot=( SnapshotBuilder() - .add_step(status=state.STEP_STATE_UNKNOWN) .add_job( job_id="0", index="0", @@ -191,9 +189,9 @@ def test_large_snapshot( ), SnapshotUpdateEvent( partial_snapshot=PartialSnapshot( - SnapshotBuilder() - .add_step(status=state.STEP_STATE_SUCCESS) - .build(["0"], status=state.REALIZATION_STATE_FINISHED) + SnapshotBuilder().build( + ["0"], status=state.REALIZATION_STATE_FINISHED + ) ), phase_name="Foo", current_phase=0, @@ -212,7 +210,6 @@ def test_large_snapshot( FullSnapshotEvent( snapshot=( SnapshotBuilder() - .add_step(status=state.STEP_STATE_UNKNOWN) .add_job( job_id="0", index="0", @@ -237,14 +234,13 @@ def test_large_snapshot( SnapshotUpdateEvent( partial_snapshot=PartialSnapshot( SnapshotBuilder() - .add_step(status=state.STEP_STATE_SUCCESS) + # .add_step(status=state.STEP_STATE_SUCCESS) .add_job( job_id="0", index="0", status=state.JOB_STATE_FINISHED, name="job_0", - ) - .build(["1"], status=state.REALIZATION_STATE_RUNNING) + ).build(["1"], status=state.REALIZATION_STATE_RUNNING) ), phase_name="Foo", current_phase=0, @@ -256,7 +252,6 @@ def test_large_snapshot( SnapshotUpdateEvent( partial_snapshot=PartialSnapshot( SnapshotBuilder() - .add_step(status=state.STEP_STATE_FAILURE) .add_job( job_id="1", index="1", @@ -282,7 +277,6 @@ def test_large_snapshot( FullSnapshotEvent( snapshot=( SnapshotBuilder() - .add_step(status=state.STEP_STATE_UNKNOWN) .add_job( job_id="0", index="0", @@ -301,7 +295,6 @@ def test_large_snapshot( FullSnapshotEvent( snapshot=( SnapshotBuilder() - .add_step(status=state.STEP_STATE_UNKNOWN) .add_job( job_id="0", index="0",