Skip to content

Commit

Permalink
Refactor counting cpu-seconds for processtree
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Feb 4, 2025
1 parent a522de5 commit 5c2b28d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 12 deletions.
34 changes: 22 additions & 12 deletions src/_ert/forward_model_runner/forward_model_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import time
import uuid
from collections.abc import Generator, Sequence
from dataclasses import dataclass, field
from datetime import datetime as dt
from pathlib import Path
from subprocess import Popen, run
Expand Down Expand Up @@ -188,7 +189,7 @@ def _run(self) -> Generator[Start | Exited | Running]:

max_memory_usage = 0
fm_step_pids = {int(process.pid)}
cpu_seconds_pr_pid: dict[str, float] = {}
cpu_seconds_processtree: ProcesstreeTimer = ProcesstreeTimer()
while True:
try:
exit_code = process.wait(timeout=self.MEMORY_POLL_PERIOD)
Expand All @@ -207,16 +208,7 @@ def _run(self) -> Generator[Start | Exited | Running]:
(memory_rss, cpu_seconds_snapshot, oom_score, pids) = _get_processtree_data(
process
)
for pid, seconds in cpu_seconds_snapshot.items():
if cpu_seconds_pr_pid.get(str(pid), 0.0) <= seconds:
cpu_seconds_pr_pid[str(pid)] = seconds
else:
# cpu_seconds must be monotonely increasing. Since
# decreasing cpu_seconds was detected, it must be due to pid reuse
cpu_seconds_pr_pid[str(pid) + str(uuid.uuid4())] = (
cpu_seconds_pr_pid[str(pid)]
)
cpu_seconds_pr_pid[str(pid)] = seconds
cpu_seconds_processtree.update(cpu_seconds_snapshot)
fm_step_pids |= pids
max_memory_usage = max(memory_rss, max_memory_usage)
yield Running(
Expand All @@ -226,7 +218,7 @@ def _run(self) -> Generator[Start | Exited | Running]:
max_rss=max_memory_usage,
fm_step_id=self.index,
fm_step_name=self.job_data.get("name"),
cpu_seconds=sum(cpu_seconds_pr_pid.values()),
cpu_seconds=cpu_seconds_processtree.total_cpu_seconds(),
oom_score=oom_score,
),
)
Expand Down Expand Up @@ -427,6 +419,24 @@ def ensure_file_handles_closed(file_handles: Sequence[io.TextIOWrapper | None])
file_handle.close()


@dataclass
class ProcesstreeTimer:
    """Accumulate cpu-seconds for a process tree from repeated snapshots.

    Every call to :meth:`update` stores the newest reading for each pid in
    the snapshot. If a new reading is lower than the one already stored for
    that pid, the stored reading is preserved under a fresh unique key
    before it is overwritten, so it keeps contributing to the total.
    """

    # Newest reading per pid, plus preserved older readings stored under
    # "<pid>-<uuid4>" keys.
    _cpu_seconds_pr_pid: dict[str, float] = field(default_factory=dict, init=False)

    def update(self, cpu_seconds_snapshot: dict[str, float]) -> None:
        """Merge one snapshot of per-pid cpu-seconds into the timer.

        Args:
            cpu_seconds_snapshot: Mapping from pid to the cpu-seconds
                reported for that pid in this snapshot.
        """
        tracked = self._cpu_seconds_pr_pid
        for pid, seconds in cpu_seconds_snapshot.items():
            went_backwards = tracked.get(pid, 0.0) > seconds
            if went_backwards:
                # The cpu-seconds of one process can only grow, so a lower
                # reading is taken to mean the pid was reused by a new
                # process. Keep the old reading under a unique key.
                carried_over = tracked[pid]
                tracked["-".join((pid, str(uuid.uuid4())))] = carried_over
            tracked[pid] = seconds

    def total_cpu_seconds(self) -> float:
        """Return the sum of all stored readings, current and preserved."""
        all_readings = self._cpu_seconds_pr_pid.values()
        return sum(all_readings)


def _get_processtree_data(
process: Process,
) -> tuple[int, dict[str, float], int | None, set[int]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from _ert.forward_model_runner.forward_model_step import (
ForwardModelStep,
ProcesstreeTimer,
_get_processtree_data,
)
from _ert.forward_model_runner.reporting.message import Exited, Running, Start
Expand Down Expand Up @@ -319,3 +320,23 @@ def test_makedirs(monkeypatch, tmp_path):
pass
assert (tmp_path / "a/file").is_file()
assert (tmp_path / "b/c/file").is_file()


@pytest.mark.parametrize(
    "snapshots, expected_total_seconds",
    [
        ([{}], 0.0),
        ([{"1": 1.1}], 1.1),
        ([{"1": 1.1}, {"1": 2.1}], 2.1),
        ([{"1": 1.1}, {"1": 1.1}], 1.1),
        ([{"1": 1.1}, {"2": 3.1}], 1.1 + 3.1),
        ([{"1": 1.1}, {"1": 0.2}], 1.1 + 0.2),  # pid reuse
        ([{"1": 1.1}, {"1": 0.2}, {"1": 0.1}], 1.1 + 0.2 + 0.1),  # repeated pid reuse
    ],
)
def test_processtree_timer(
    snapshots: list[dict[str, float]], expected_total_seconds: float
):
    """Feed the snapshots to a ProcesstreeTimer in order and check the total.

    The expected totals are sums of floats, and the timer adds its stored
    readings in its own internal order, which need not match the order of
    the literal expression in the parameter list. The comparison therefore
    uses a tolerance instead of exact float equality.
    """
    timer = ProcesstreeTimer()
    for snapshot in snapshots:
        timer.update(snapshot)
    assert timer.total_cpu_seconds() == pytest.approx(expected_total_seconds)

0 comments on commit 5c2b28d

Please sign in to comment.