Skip to content

Commit

Permalink
Log runtime for individual forward model steps
Browse files Browse the repository at this point in the history
There are more short forward model time steps than we want the logging
system to handle, thus steps taking less than 2 minutes are skipped.
  • Loading branch information
berland committed Nov 1, 2024
1 parent 5a220c7 commit fa312b4
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
45 changes: 44 additions & 1 deletion src/ert/ensemble_evaluator/_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@
Union,
)

from _ert.events import Event, Id, event_from_dict, event_to_json
from _ert.events import (
Event,
FMEvent,
ForwardModelStepFailure,
ForwardModelStepSuccess,
Id,
event_from_dict,
event_to_json,
)
from _ert.forward_model_runner.client import Client
from ert.config import ForwardModelStep, QueueConfig
from ert.run_arg import RunArg
Expand Down Expand Up @@ -142,9 +150,44 @@ def _create_snapshot(self) -> EnsembleSnapshot:
def get_successful_realizations(self) -> List[int]:
return self.snapshot.get_successful_realizations()

def _log_completed_fm_step(
self, event: FMEvent, step_snapshot: Optional[FMStepSnapshot]
) -> None:
if step_snapshot is None:
logger.warning(f"Should log {event}, but there was no step_snapshot")
return
step_name = step_snapshot.get("name", "")
start_time = step_snapshot.get("start_time")
cpu_seconds = step_snapshot.get("cpu_seconds")
current_memory_usage = step_snapshot.get("current_memory_usage")
if start_time is not None and event.time is not None:
walltime = (event.time - start_time).total_seconds()
else:
# We get here if the Running event is in the same event batch as
# the Success event. That means that runtime is close to zero.
walltime = 0

if walltime > 120:
logger.warning(
f"{event.event_type} {step_name} "
f"{walltime=} "
f"{cpu_seconds=} "
f"{current_memory_usage=} "
f"step_index={event.fm_step} "
f"real={event.real} "
f"ensemble={event.ensemble}"
)

def update_snapshot(self, events: Sequence[Event]) -> EnsembleSnapshot:
snapshot_mutate_event = EnsembleSnapshot()
for event in events:
if isinstance(event, (ForwardModelStepSuccess, ForwardModelStepFailure)):
step = (
self.snapshot.reals[event.real]
.get("fm_steps", {})
.get(event.fm_step)
)
self._log_completed_fm_step(event, step)
snapshot_mutate_event = snapshot_mutate_event.update_from_event(
event, source_snapshot=self.snapshot
)
Expand Down
4 changes: 4 additions & 0 deletions src/ert/ensemble_evaluator/snapshot.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import sys
import typing
from collections import defaultdict
Expand Down Expand Up @@ -46,6 +47,8 @@
from ert.ensemble_evaluator import identifiers as ids
from ert.ensemble_evaluator import state

logger = logging.getLogger(__name__)

if sys.version_info < (3, 11):
from backports.datetime_fromisoformat import MonkeyPatch # type: ignore

Expand Down Expand Up @@ -327,6 +330,7 @@ def update_from_event(
start_time = convert_iso8601_to_datetime(timestamp)
elif e_type in {ForwardModelStepSuccess, ForwardModelStepFailure}:
end_time = convert_iso8601_to_datetime(timestamp)

if type(event) is ForwardModelStepFailure:
error = event.error_msg if event.error_msg else ""
else:
Expand Down

0 comments on commit fa312b4

Please sign in to comment.