From fa312b40f98d6c2ddb5884a0666e9165a1809eef Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?H=C3=A5vard=20Berland?=
Date: Wed, 30 Oct 2024 15:45:21 +0100
Subject: [PATCH] Log runtime for individual forward model steps

There are more short forward model time steps than we want the logging
system to handle, thus steps taking less than 2 minutes are skipped.
---
 src/ert/ensemble_evaluator/_ensemble.py | 65 ++++++++++++++++++++++++-
 src/ert/ensemble_evaluator/snapshot.py  |  4 +++
 2 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/src/ert/ensemble_evaluator/_ensemble.py b/src/ert/ensemble_evaluator/_ensemble.py
index 32c855f2afd..14138b1ab42 100644
--- a/src/ert/ensemble_evaluator/_ensemble.py
+++ b/src/ert/ensemble_evaluator/_ensemble.py
@@ -17,7 +17,15 @@
     Union,
 )
 
-from _ert.events import Event, Id, event_from_dict, event_to_json
+from _ert.events import (
+    Event,
+    FMEvent,
+    ForwardModelStepFailure,
+    ForwardModelStepSuccess,
+    Id,
+    event_from_dict,
+    event_to_json,
+)
 from _ert.forward_model_runner.client import Client
 from ert.config import ForwardModelStep, QueueConfig
 from ert.run_arg import RunArg
@@ -142,9 +150,64 @@ def _create_snapshot(self) -> EnsembleSnapshot:
     def get_successful_realizations(self) -> List[int]:
         return self.snapshot.get_successful_realizations()
 
+    def _log_completed_fm_step(
+        self,
+        event: FMEvent,
+        step_snapshot: Optional[FMStepSnapshot],
+        *,
+        min_walltime_seconds: float = 120,
+    ) -> None:
+        """Log resource usage for a completed forward model step.
+
+        Most steps are short, and there are more of them than we want the
+        logging system to handle, so only steps whose walltime exceeds
+        min_walltime_seconds are logged.
+
+        Args:
+            event: The success or failure event that completed the step.
+            step_snapshot: The snapshot of the step looked up by the caller,
+                or None if no snapshot of the step was found.
+            min_walltime_seconds: Steps with a walltime at or below this
+                number of seconds are not logged.
+        """
+        if step_snapshot is None:
+            logger.warning(f"Should log {event}, but there was no step_snapshot")
+            return
+
+        start_time = step_snapshot.get("start_time")
+        if start_time is None or event.time is None:
+            # We get here if the Running event is in the same event batch as
+            # the completion event. That means that runtime is close to zero,
+            # which is below any sensible threshold, so nothing is logged.
+            return
+
+        walltime = (event.time - start_time).total_seconds()
+        if walltime <= min_walltime_seconds:
+            return
+
+        step_name = step_snapshot.get("name", "")
+        cpu_seconds = step_snapshot.get("cpu_seconds")
+        current_memory_usage = step_snapshot.get("current_memory_usage")
+        logger.warning(
+            f"{event.event_type} {step_name} "
+            f"{walltime=} "
+            f"{cpu_seconds=} "
+            f"{current_memory_usage=} "
+            f"step_index={event.fm_step} "
+            f"real={event.real} "
+            f"ensemble={event.ensemble}"
+        )
+
     def update_snapshot(self, events: Sequence[Event]) -> EnsembleSnapshot:
         snapshot_mutate_event = EnsembleSnapshot()
         for event in events:
+            if isinstance(event, (ForwardModelStepSuccess, ForwardModelStepFailure)):
+                step = (
+                    self.snapshot.reals[event.real]
+                    .get("fm_steps", {})
+                    .get(event.fm_step)
+                )
+                self._log_completed_fm_step(event, step)
             snapshot_mutate_event = snapshot_mutate_event.update_from_event(
                 event, source_snapshot=self.snapshot
             )
diff --git a/src/ert/ensemble_evaluator/snapshot.py b/src/ert/ensemble_evaluator/snapshot.py
index 35e0157944f..97227c100e6 100644
--- a/src/ert/ensemble_evaluator/snapshot.py
+++ b/src/ert/ensemble_evaluator/snapshot.py
@@ -1,3 +1,4 @@
+import logging
 import sys
 import typing
 from collections import defaultdict
@@ -46,6 +47,8 @@
 from ert.ensemble_evaluator import identifiers as ids
 from ert.ensemble_evaluator import state
 
+logger = logging.getLogger(__name__)
+
 if sys.version_info < (3, 11):
     from backports.datetime_fromisoformat import MonkeyPatch  # type: ignore
 
@@ -327,6 +330,7 @@ def update_from_event(
                 start_time = convert_iso8601_to_datetime(timestamp)
             elif e_type in {ForwardModelStepSuccess, ForwardModelStepFailure}:
                 end_time = convert_iso8601_to_datetime(timestamp)
+
                 if type(event) is ForwardModelStepFailure:
                     error = event.error_msg if event.error_msg else ""
                 else: