diff --git a/src/_ert/forward_model_runner/__init__.py b/src/_ert/forward_model_runner/__init__.py
index aa20d9b5046..c4f40af8445 100644
--- a/src/_ert/forward_model_runner/__init__.py
+++ b/src/_ert/forward_model_runner/__init__.py
@@ -1,4 +1,4 @@
-"""_ert.forward_model_runner is called by ert to run jobs in the runpath.
+"""_ert.forward_model_runner is called by ert to run forward model steps in the runpath.
It is split into its own package for performance reasons,
simply importing ert can take several seconds, which is not ideal when
diff --git a/src/_ert/forward_model_runner/job.py b/src/_ert/forward_model_runner/forward_model_step.py
similarity index 99%
rename from src/_ert/forward_model_runner/job.py
rename to src/_ert/forward_model_runner/forward_model_step.py
index bf2ebe253b4..0249bafe5b1 100644
--- a/src/_ert/forward_model_runner/job.py
+++ b/src/_ert/forward_model_runner/forward_model_step.py
@@ -76,7 +76,7 @@ def killed_by_oom(pids: Sequence[int]) -> bool:
return False
-class Job:
+class ForwardModelStep:
MEMORY_POLL_PERIOD = 5 # Seconds between memory polls
def __init__(
diff --git a/src/_ert/forward_model_runner/reporting/message.py b/src/_ert/forward_model_runner/reporting/message.py
index 2811488da29..2dbca378cfc 100644
--- a/src/_ert/forward_model_runner/reporting/message.py
+++ b/src/_ert/forward_model_runner/reporting/message.py
@@ -5,7 +5,7 @@
import psutil
if TYPE_CHECKING:
- from _ert.forward_model_runner.job import Job
+ from _ert.forward_model_runner.forward_model_step import ForwardModelStep
class _ChecksumDictBase(TypedDict):
type: Literal["file"]
@@ -71,7 +71,7 @@ def __repr__(cls):
class Message(metaclass=_MetaMessage):
def __init__(self, job=None):
self.timestamp = dt.now()
- self.job: Optional[Job] = job
+ self.job: Optional[ForwardModelStep] = job
self.error_message: Optional[str] = None
def __repr__(self):
@@ -116,19 +116,19 @@ def __init__(self):
class Start(Message):
- def __init__(self, job: "Job"):
- super().__init__(job)
+ def __init__(self, fm_step: "ForwardModelStep"):
+ super().__init__(fm_step)
class Running(Message):
- def __init__(self, job: "Job", memory_status: ProcessTreeStatus):
- super().__init__(job)
+ def __init__(self, fm_step: "ForwardModelStep", memory_status: ProcessTreeStatus):
+ super().__init__(fm_step)
self.memory_status = memory_status
class Exited(Message):
- def __init__(self, job, exit_code: int):
- super().__init__(job)
+ def __init__(self, fm_step, exit_code: int):
+ super().__init__(fm_step)
self.exit_code = exit_code
diff --git a/src/_ert/forward_model_runner/runner.py b/src/_ert/forward_model_runner/runner.py
index 0892f4c02cf..1b76e393fae 100644
--- a/src/_ert/forward_model_runner/runner.py
+++ b/src/_ert/forward_model_runner/runner.py
@@ -4,7 +4,7 @@
from pathlib import Path
from typing import List
-from _ert.forward_model_runner.job import Job
+from _ert.forward_model_runner.forward_model_step import ForwardModelStep
from _ert.forward_model_runner.reporting.message import Checksum, Finish, Init
@@ -21,9 +21,9 @@ def __init__(self, jobs_data):
if self.simulation_id is not None:
os.environ["ERT_RUN_ID"] = self.simulation_id
- self.jobs: List[Job] = []
+ self.jobs: List[ForwardModelStep] = []
for index, job_data in enumerate(job_data_list):
- self.jobs.append(Job(job_data, index))
+ self.jobs.append(ForwardModelStep(job_data, index))
self._set_environment()
diff --git a/tests/ert/unit_tests/forward_model_runner/test_event_reporter.py b/tests/ert/unit_tests/forward_model_runner/test_event_reporter.py
index b1d15dcf817..d7dad85f0e8 100644
--- a/tests/ert/unit_tests/forward_model_runner/test_event_reporter.py
+++ b/tests/ert/unit_tests/forward_model_runner/test_event_reporter.py
@@ -16,7 +16,7 @@
ClientConnectionClosedOK,
ClientConnectionError,
)
-from _ert.forward_model_runner.job import Job
+from _ert.forward_model_runner.forward_model_step import ForwardModelStep
from _ert.forward_model_runner.reporting import Event
from _ert.forward_model_runner.reporting.message import (
Exited,
@@ -41,11 +41,13 @@ def test_report_with_successful_start_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
- job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
+ fmstep1 = ForwardModelStep(
+ {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
+ )
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
- reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
- reporter.report(Start(job1))
+ reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
+ reporter.report(Start(fmstep1))
reporter.report(Finish())
assert len(lines) == 1
@@ -63,13 +65,15 @@ def test_report_with_failed_start_message_argument(unused_tcp_port):
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
- job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
+ fmstep1 = ForwardModelStep(
+ {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
+ )
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
- reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
+ reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
- msg = Start(job1).with_error("massive_failure")
+ msg = Start(fmstep1).with_error("massive_failure")
reporter.report(msg)
reporter.report(Finish())
@@ -84,12 +88,14 @@ def test_report_with_successful_exit_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
- job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
+ fmstep1 = ForwardModelStep(
+ {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
+ )
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
- reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
- reporter.report(Exited(job1, 0))
+ reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
+ reporter.report(Exited(fmstep1, 0))
reporter.report(Finish().with_error("failed"))
assert len(lines) == 1
@@ -101,12 +107,14 @@ def test_report_with_failed_exit_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
- job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
+ fmstep1 = ForwardModelStep(
+ {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
+ )
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
- reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
- reporter.report(Exited(job1, 1).with_error("massive_failure"))
+ reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
+ reporter.report(Exited(fmstep1, 1).with_error("massive_failure"))
reporter.report(Finish())
assert len(lines) == 1
@@ -119,12 +127,14 @@ def test_report_with_running_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
- job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
+ fmstep1 = ForwardModelStep(
+ {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
+ )
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
- reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
- reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
+ reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
+ reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish())
assert len(lines) == 1
@@ -138,12 +148,14 @@ def test_report_only_job_running_for_successful_run(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
- job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
+ fmstep1 = ForwardModelStep(
+ {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
+ )
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
- reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
- reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
+ reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
+ reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish())
assert len(lines) == 1
@@ -153,12 +165,14 @@ def test_report_with_failed_finish_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
- job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
+ fmstep1 = ForwardModelStep(
+ {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
+ )
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
- reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
- reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
+ reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
+ reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish().with_error("massive_failure"))
assert len(lines) == 1
@@ -195,16 +209,18 @@ def mock_send(msg):
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
reporter._reporter_timeout = 4
- job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
+ fmstep1 = ForwardModelStep(
+ {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
+ )
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
with patch(
"_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y)
):
- reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
- reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
- reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10)))
- reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10)))
+ reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
+ reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
+ reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=1100, rss=10)))
+ reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=1100, rss=10)))
# set _stop_timestamp
reporter.report(Finish())
if reporter._event_publisher_thread.is_alive():
@@ -234,16 +250,18 @@ def send_func(msg):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
- job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
+ fmstep1 = ForwardModelStep(
+ {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
+ )
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
with patch("_ert.forward_model_runner.client.Client.send") as patched_send:
patched_send.side_effect = send_func
- reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
- reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
- reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10)))
- reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10)))
+ reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
+ reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
+ reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10)))
+ reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10)))
_wait_until(
condition=lambda: patched_send.call_count == 3,
@@ -275,12 +293,14 @@ def mock_send(msg):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
- job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
+ fmstep1 = ForwardModelStep(
+ {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0
+ )
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
- reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
- reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
- reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10)))
+ reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
+ reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
+ reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10)))
# sleep until both Running events have been received
_wait_until(
@@ -292,20 +312,20 @@ def mock_send(msg):
with patch(
"_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y)
):
- reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10)))
+ reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10)))
# Make sure the publisher thread exits because it got
# ClientConnectionClosedOK. If it hangs it could indicate that the
# exception is not caught/handled correctly
if reporter._event_publisher_thread.is_alive():
reporter._event_publisher_thread.join()
- reporter.report(Running(job1, ProcessTreeStatus(max_rss=400, rss=10)))
+ reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=400, rss=10)))
reporter.report(Finish())
# set _stop_timestamp was not set to None since the reporter finished on time
assert reporter._timeout_timestamp is not None
- # The Running(job1, 300, 10) is popped from the queue, but never sent.
+ # The Running(fmstep1, 300, 10) is popped from the queue, but never sent.
# The following Running is added to queue along with the sentinel
assert reporter._event_queue.qsize() == 2
# None of the messages after ClientConnectionClosedOK was raised, has been sent
diff --git a/tests/ert/unit_tests/forward_model_runner/test_file_reporter.py b/tests/ert/unit_tests/forward_model_runner/test_file_reporter.py
index 4ac53b95ee2..e9f97d5fc72 100644
--- a/tests/ert/unit_tests/forward_model_runner/test_file_reporter.py
+++ b/tests/ert/unit_tests/forward_model_runner/test_file_reporter.py
@@ -3,7 +3,7 @@
import pytest
-from _ert.forward_model_runner.job import Job
+from _ert.forward_model_runner.forward_model_step import ForwardModelStep
from _ert.forward_model_runner.reporting import File
from _ert.forward_model_runner.reporting.message import (
Exited,
@@ -24,24 +24,26 @@ def reporter():
@pytest.mark.usefixtures("use_tmpdir")
def test_report_with_init_message_argument(reporter):
r = reporter
- job1 = Job({"name": "job1", "stdout": "/stdout", "stderr": "/stderr"}, 0)
+ fmstep1 = ForwardModelStep(
+ {"name": "fmstep1", "stdout": "/stdout", "stderr": "/stderr"}, 0
+ )
- r.report(Init([job1], 1, 19))
+ r.report(Init([fmstep1], 1, 19))
with open(STATUS_file, "r", encoding="utf-8") as f:
assert "Current host" in f.readline(), "STATUS file missing expected value"
with open(STATUS_json, "r", encoding="utf-8") as f:
content = "".join(f.readlines())
- assert '"name": "job1"' in content, "status.json missing job1"
+ assert '"name": "fmstep1"' in content, "status.json missing fmstep1"
assert '"status": "Waiting"' in content, "status.json missing Waiting status"
@pytest.mark.usefixtures("use_tmpdir")
def test_report_with_successful_start_message_argument(reporter):
msg = Start(
- Job(
+ ForwardModelStep(
{
- "name": "job1",
+ "name": "fmstep1",
"stdout": "/stdout.0",
"stderr": "/stderr.0",
"argList": ["--foo", "1", "--bar", "2"],
@@ -55,7 +57,7 @@ def test_report_with_successful_start_message_argument(reporter):
reporter.report(msg)
with open(STATUS_file, "r", encoding="utf-8") as f:
- assert "job1" in f.readline(), "STATUS file missing job1"
+ assert "fmstep1" in f.readline(), "STATUS file missing fmstep1"
with open(LOG_file, "r", encoding="utf-8") as f:
assert (
"Calling: /bin/sh --foo 1 --bar 2" in f.readline()
@@ -69,7 +71,7 @@ def test_report_with_successful_start_message_argument(reporter):
@pytest.mark.usefixtures("use_tmpdir")
def test_report_with_failed_start_message_argument(reporter):
- msg = Start(Job({"name": "job1"}, 0)).with_error("massive_failure")
+ msg = Start(ForwardModelStep({"name": "fmstep1"}, 0)).with_error("massive_failure")
reporter.status_dict = reporter._init_job_status_dict(msg.timestamp, 0, [msg.job])
reporter.report(msg)
@@ -86,12 +88,12 @@ def test_report_with_failed_start_message_argument(reporter):
), "status.json missing error message"
assert (
reporter.status_dict["jobs"][0]["end_time"] is not None
- ), "end_time not set for job1"
+ ), "end_time not set for fmstep1"
@pytest.mark.usefixtures("use_tmpdir")
def test_report_with_successful_exit_message_argument(reporter):
- msg = Exited(Job({"name": "job1"}, 0), 0)
+ msg = Exited(ForwardModelStep({"name": "fmstep1"}, 0), 0)
reporter.status_dict = reporter._init_job_status_dict(msg.timestamp, 0, [msg.job])
reporter.report(msg)
@@ -103,7 +105,9 @@ def test_report_with_successful_exit_message_argument(reporter):
@pytest.mark.usefixtures("use_tmpdir")
def test_report_with_failed_exit_message_argument(reporter):
- msg = Exited(Job({"name": "job1"}, 0), 1).with_error("massive_failure")
+ msg = Exited(ForwardModelStep({"name": "fmstep1"}, 0), 1).with_error(
+ "massive_failure"
+ )
reporter.status_dict = reporter._init_job_status_dict(msg.timestamp, 0, [msg.job])
reporter.report(msg)
@@ -112,7 +116,7 @@ def test_report_with_failed_exit_message_argument(reporter):
assert "EXIT: 1/massive_failure" in f.readline()
with open(ERROR_file, "r", encoding="utf-8") as f:
content = "".join(f.readlines())
- assert "job1" in content, "ERROR file missing job"
+ assert "fmstep1" in content, "ERROR file missing job"
assert (
"massive_failure" in content
), "ERROR file missing reason"
@@ -131,7 +135,7 @@ def test_report_with_failed_exit_message_argument(reporter):
@pytest.mark.usefixtures("use_tmpdir")
def test_report_with_running_message_argument(reporter):
msg = Running(
- Job({"name": "job1"}, 0),
+ ForwardModelStep({"name": "fmstep1"}, 0),
ProcessTreeStatus(max_rss=100, rss=10, cpu_seconds=1.1),
)
reporter.status_dict = reporter._init_job_status_dict(msg.timestamp, 0, [msg.job])
@@ -170,7 +174,8 @@ def test_dump_error_file_with_stderr(reporter):
stderr.write("E_MASSIVE_FAILURE\n")
reporter._dump_error_file(
- Job({"name": "job1", "stderr": "stderr.out.0"}, 0), "massive_failure"
+ ForwardModelStep({"name": "fmstep1", "stderr": "stderr.out.0"}, 0),
+ "massive_failure",
)
with open(ERROR_file, "r", encoding="utf-8") as f:
@@ -199,8 +204,8 @@ def test_status_file_is_correct(reporter):
such.
See https://github.com/equinor/libres/issues/764
"""
- j_1 = Job({"name": "j_1", "executable": "", "argList": []}, 0)
- j_2 = Job({"name": "j_2", "executable": "", "argList": []}, 0)
+ j_1 = ForwardModelStep({"name": "j_1", "executable": "", "argList": []}, 0)
+ j_2 = ForwardModelStep({"name": "j_2", "executable": "", "argList": []}, 0)
init = Init([j_1, j_2], 1, 1)
start_j_1 = Start(j_1)
exited_j_1 = Exited(j_1, 0)
diff --git a/tests/ert/unit_tests/forward_model_runner/test_job.py b/tests/ert/unit_tests/forward_model_runner/test_forward_model_step.py
similarity index 89%
rename from tests/ert/unit_tests/forward_model_runner/test_job.py
rename to tests/ert/unit_tests/forward_model_runner/test_forward_model_step.py
index 5bfd3c1b056..0a41cdb5a8a 100644
--- a/tests/ert/unit_tests/forward_model_runner/test_job.py
+++ b/tests/ert/unit_tests/forward_model_runner/test_forward_model_step.py
@@ -12,23 +12,26 @@
import numpy as np
import pytest
-from _ert.forward_model_runner.job import Job, _get_processtree_data
+from _ert.forward_model_runner.forward_model_step import (
+ ForwardModelStep,
+ _get_processtree_data,
+)
from _ert.forward_model_runner.reporting.message import Exited, Running, Start
-@patch("_ert.forward_model_runner.job.check_executable")
-@patch("_ert.forward_model_runner.job.Popen")
-@patch("_ert.forward_model_runner.job.Process")
+@patch("_ert.forward_model_runner.forward_model_step.check_executable")
+@patch("_ert.forward_model_runner.forward_model_step.Popen")
+@patch("_ert.forward_model_runner.forward_model_step.Process")
@pytest.mark.usefixtures("use_tmpdir")
def test_run_with_process_failing(mock_process, mock_popen, mock_check_executable):
- job = Job({}, 0)
+ fmstep = ForwardModelStep({}, 0)
mock_check_executable.return_value = ""
type(mock_process.return_value.memory_info.return_value).rss = PropertyMock(
return_value=10
)
mock_process.return_value.wait.return_value = 9
- run = job.run()
+ run = fmstep.run()
assert isinstance(next(run), Start), "run did not yield Start message"
assert isinstance(next(run), Running), "run did not yield Running message"
@@ -44,7 +47,7 @@ def test_run_with_process_failing(mock_process, mock_popen, mock_check_executabl
@pytest.mark.integration_test
@pytest.mark.usefixtures("use_tmpdir")
def test_cpu_seconds_can_detect_multiprocess():
- """Run a job that sets of two simultaneous processes that
+ """Run an fm step that sets off two simultaneous processes
each run for 1 second. We should be able to detect the total
cpu seconds consumed to be roughly 2 seconds.
@@ -77,15 +80,15 @@ def test_cpu_seconds_can_detect_multiprocess():
)
executable = os.path.realpath(scriptname)
os.chmod(scriptname, stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)
- job = Job(
+ fmstep = ForwardModelStep(
{
"executable": executable,
},
0,
)
- job.MEMORY_POLL_PERIOD = 0.05
+ fmstep.MEMORY_POLL_PERIOD = 0.05
cpu_seconds = 0.0
- for status in job.run():
+ for status in fmstep.run():
if isinstance(status, Running):
cpu_seconds = max(cpu_seconds, status.memory_status.cpu_seconds)
assert 1.4 < cpu_seconds < 2.2
@@ -118,16 +121,16 @@ def test_memory_usage_counts_grandchildren():
os.chmod(scriptname, stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)
def max_memory_per_subprocess_layer(layers: int) -> int:
- job = Job(
+ fmstep = ForwardModelStep(
{
"executable": executable,
"argList": [str(layers), str(int(1e6))],
},
0,
)
- job.MEMORY_POLL_PERIOD = 0.01
+ fmstep.MEMORY_POLL_PERIOD = 0.01
max_seen = 0
- for status in job.run():
+ for status in fmstep.run():
if isinstance(status, Running):
max_seen = max(max_seen, status.memory_status.max_rss)
return max_seen
@@ -165,7 +168,7 @@ def test_memory_profile_in_running_events():
executable = os.path.realpath(scriptname)
os.chmod(scriptname, stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG)
- fm_step = Job(
+ fm_step = ForwardModelStep(
{
"executable": executable,
"argList": [""],
@@ -258,7 +261,7 @@ def read_text_side_effect(self: pathlib.Path, *args, **kwargs):
@pytest.mark.usefixtures("use_tmpdir")
def test_run_fails_using_exit_bash_builtin():
- job = Job(
+ fmstep = ForwardModelStep(
{
"name": "exit 1",
"executable": "/bin/sh",
@@ -269,7 +272,7 @@ def test_run_fails_using_exit_bash_builtin():
0,
)
- statuses = list(job.run())
+ statuses = list(fmstep.run())
assert len(statuses) == 3, "Wrong statuses count"
assert statuses[2].exit_code == 1, "Exited status wrong exit_code"
@@ -281,7 +284,7 @@ def test_run_fails_using_exit_bash_builtin():
@pytest.mark.usefixtures("use_tmpdir")
def test_run_with_defined_executable_but_missing():
executable = os.path.join(os.getcwd(), "this/is/not/a/file")
- job = Job(
+ fmstep = ForwardModelStep(
{
"name": "TEST_EXECUTABLE_NOT_FOUND",
"executable": executable,
@@ -291,7 +294,7 @@ def test_run_with_defined_executable_but_missing():
0,
)
- start_message = next(job.run())
+ start_message = next(fmstep.run())
assert isinstance(start_message, Start)
assert "this/is/not/a/file is not a file" in start_message.error_message
@@ -304,7 +307,7 @@ def test_run_with_empty_executable():
st = os.stat(empty_executable)
os.chmod(empty_executable, st.st_mode | stat.S_IEXEC)
- job = Job(
+ fmstep = ForwardModelStep(
{
"name": "TEST_EXECUTABLE_NOT_EXECUTABLE",
"executable": empty_executable,
@@ -313,7 +316,7 @@ def test_run_with_empty_executable():
},
0,
)
- run_status = list(job.run())
+ run_status = list(fmstep.run())
assert len(run_status) == 2
start_msg, exit_msg = run_status
assert isinstance(start_msg, Start)
@@ -328,7 +331,7 @@ def test_run_with_defined_executable_no_exec_bit():
with open(non_executable, "a", encoding="utf-8"):
pass
- job = Job(
+ fmstep = ForwardModelStep(
{
"name": "TEST_EXECUTABLE_NOT_EXECUTABLE",
"executable": non_executable,
@@ -337,30 +340,30 @@ def test_run_with_defined_executable_no_exec_bit():
},
0,
)
- start_message = next(job.run())
+ start_message = next(fmstep.run())
assert isinstance(start_message, Start)
assert "foo is not an executable" in start_message.error_message
-def test_init_job_no_std():
- job = Job(
+def test_init_fmstep_no_std():
+ fmstep = ForwardModelStep(
{},
0,
)
- assert job.std_err is None
- assert job.std_out is None
+ assert fmstep.std_err is None
+ assert fmstep.std_out is None
-def test_init_job_with_std():
- job = Job(
+def test_init_fmstep_with_std():
+ fmstep = ForwardModelStep(
{
"stdout": "exit_out",
"stderr": "exit_err",
},
0,
)
- assert job.std_err == "exit_err"
- assert job.std_out == "exit_out"
+ assert fmstep.std_err == "exit_err"
+ assert fmstep.std_out == "exit_out"
def test_makedirs(monkeypatch, tmp_path):
@@ -369,7 +372,7 @@ def test_makedirs(monkeypatch, tmp_path):
they don't exist
"""
monkeypatch.chdir(tmp_path)
- job = Job(
+ fmstep = ForwardModelStep(
{
"executable": "true",
"stdout": "a/file",
@@ -377,7 +380,7 @@ def test_makedirs(monkeypatch, tmp_path):
},
0,
)
- for _ in job.run():
+ for _ in fmstep.run():
pass
assert (tmp_path / "a/file").is_file()
assert (tmp_path / "b/c/file").is_file()
diff --git a/tests/ert/unit_tests/forward_model_runner/test_job_dispatch.py b/tests/ert/unit_tests/forward_model_runner/test_job_dispatch.py
index efbd49a04e3..58075884bb8 100644
--- a/tests/ert/unit_tests/forward_model_runner/test_job_dispatch.py
+++ b/tests/ert/unit_tests/forward_model_runner/test_job_dispatch.py
@@ -19,7 +19,7 @@
import _ert.forward_model_runner.cli
from _ert.forward_model_runner.cli import JOBS_FILE, _setup_reporters, main
-from _ert.forward_model_runner.job import killed_by_oom
+from _ert.forward_model_runner.forward_model_step import killed_by_oom
from _ert.forward_model_runner.reporting import Event, Interactive
from _ert.forward_model_runner.reporting.message import Finish, Init
from _ert.threading import ErtThread