diff --git a/src/_ert/forward_model_runner/forward_model_step.py b/src/_ert/forward_model_runner/forward_model_step.py index 0757be89003..430304389e7 100644 --- a/src/_ert/forward_model_runner/forward_model_step.py +++ b/src/_ert/forward_model_runner/forward_model_step.py @@ -8,6 +8,7 @@ import socket import sys import time +import uuid from collections.abc import Generator, Sequence from datetime import datetime as dt from pathlib import Path @@ -186,13 +187,24 @@ def _run(self) -> Generator[Start | Exited | Running | None]: exit_code = None max_memory_usage = 0 - max_cpu_seconds = 0 fm_step_pids = {int(process.pid)} + cpu_seconds_pr_pid: dict[str, float] = {} while exit_code is None: - (memory_rss, cpu_seconds, oom_score, pids) = _get_processtree_data(process) - max_cpu_seconds = max(max_cpu_seconds, cpu_seconds or 0) + (memory_rss, cpu_seconds_snapshot, oom_score, pids) = _get_processtree_data( + process + ) fm_step_pids |= pids max_memory_usage = max(memory_rss, max_memory_usage) + for pid, seconds in cpu_seconds_snapshot.items(): + if cpu_seconds_pr_pid.get(str(pid), 0.0) <= seconds: + cpu_seconds_pr_pid[str(pid)] = seconds + else: + # cpu_seconds must be monotonely increasing. Since + # decreasing cpu_seconds was detected, it must be due to pid reuse + cpu_seconds_pr_pid[str(pid) + str(uuid.uuid4())] = ( + cpu_seconds_pr_pid[str(pid)] + ) + cpu_seconds_pr_pid[str(pid)] = seconds yield Running( self, ProcessTreeStatus( @@ -200,7 +212,7 @@ def _run(self) -> Generator[Start | Exited | Running | None]: max_rss=max_memory_usage, fm_step_id=self.index, fm_step_name=self.job_data.get("name"), - cpu_seconds=max_cpu_seconds, + cpu_seconds=sum(cpu_seconds_pr_pid.values()), oom_score=oom_score, ), ) @@ -416,7 +428,7 @@ def ensure_file_handles_closed(file_handles: Sequence[io.TextIOWrapper | None]) def _get_processtree_data( process: Process, -) -> tuple[int, float, int | None, set[int]]: +) -> tuple[int, dict[str, float], int | None, set[int]]: """Obtain the oom_score (the Linux kernel uses this number to decide which process to kill first in out-of-memory siturations). @@ -435,7 +447,7 @@ def _get_processtree_data( oom_score = None # A value of None means that we have no information. memory_rss = 0 - cpu_seconds = 0.0 + cpu_seconds_pr_pid: dict[str, float] = {} pids = set() with contextlib.suppress(ValueError, FileNotFoundError): oom_score = int( @@ -448,7 +460,7 @@ def _get_processtree_data( process.oneshot(), ): memory_rss = process.memory_info().rss - cpu_seconds = process.cpu_times().user + cpu_seconds_pr_pid[str(process.pid)] = process.cpu_times().user with contextlib.suppress( NoSuchProcess, AccessDenied, ZombieProcess, ProcessLookupError @@ -476,5 +488,5 @@ def _get_processtree_data( child.oneshot(), ): memory_rss += child.memory_info().rss - cpu_seconds += child.cpu_times().user - return (memory_rss, cpu_seconds, oom_score, pids) + cpu_seconds_pr_pid[str(child.pid)] = child.cpu_times().user + return (memory_rss, cpu_seconds_pr_pid, oom_score, pids) diff --git a/tests/ert/unit_tests/forward_model_runner/test_forward_model_step.py b/tests/ert/unit_tests/forward_model_runner/test_forward_model_step.py index 5a6413c93c5..63045ea67e5 100644 --- a/tests/ert/unit_tests/forward_model_runner/test_forward_model_step.py +++ b/tests/ert/unit_tests/forward_model_runner/test_forward_model_step.py @@ -49,8 +49,8 @@ def test_run_with_process_failing(mock_process, mock_popen, mock_check_executabl @pytest.mark.usefixtures("use_tmpdir") def test_cpu_seconds_can_detect_multiprocess(): """Run a fm step that sets of two simultaneous processes that - each run for 2 second. We should be able to detect the total - cpu seconds consumed to be roughly 2 seconds. + each run for 1 and 2 seconds respectively. We should be able to detect + the total cpu seconds consumed to be roughly 3 seconds. The test is flaky in that it tries to gather cpu_seconds data while the subprocesses are running. On a loaded CPU this is not very robust, @@ -64,8 +64,9 @@ def test_cpu_seconds_can_detect_multiprocess(): textwrap.dedent( """\ import time + import sys now = time.time() - while time.time() < now + 2: + while time.time() < now + int(sys.argv[1]): pass""" ) ) @@ -75,8 +76,8 @@ def test_cpu_seconds_can_detect_multiprocess(): textwrap.dedent( """\ #!/bin/sh - python busy.py & - python busy.py""" + python busy.py 1 & + python busy.py 2""" ) ) executable = os.path.realpath(scriptname) @@ -180,7 +181,7 @@ def oneshot(self): def test_cpu_seconds_for_process_with_children(): (_, cpu_seconds, _, _) = _get_processtree_data(MockedProcess(123)) - assert cpu_seconds == 123 / 10.0 + 124 / 10.0 + assert cpu_seconds == {"123": 12.3, "124": 12.4} @pytest.mark.skipif(sys.platform.startswith("darwin"), reason="No oom_score on MacOS")