Skip to content

Commit

Permalink
Rename job.py to fm_step.py
Browse files Browse the repository at this point in the history
Plus follow-up in test code.
  • Loading branch information
berland committed Oct 23, 2024
1 parent e55b1f9 commit d2092cf
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def killed_by_oom(pids: Sequence[int]) -> bool:
return False


class Job:
class FMstep:
MEMORY_POLL_PERIOD = 5 # Seconds between memory polls

def __init__(
Expand Down
8 changes: 4 additions & 4 deletions src/_ert/forward_model_runner/reporting/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import psutil

if TYPE_CHECKING:
from _ert.forward_model_runner.job import Job
from _ert.forward_model_runner.job import FMstep

class _ChecksumDictBase(TypedDict):
type: Literal["file"]
Expand Down Expand Up @@ -71,7 +71,7 @@ def __repr__(cls):
class Message(metaclass=_MetaMessage):
def __init__(self, job=None):
self.timestamp = dt.now()
self.job: Optional[Job] = job
self.job: Optional[FMstep] = job
self.error_message: Optional[str] = None

def __repr__(self):
Expand Down Expand Up @@ -116,12 +116,12 @@ def __init__(self):


class Start(Message):
def __init__(self, job: "Job"):
def __init__(self, job: "FMstep"):
super().__init__(job)


class Running(Message):
def __init__(self, job: "Job", memory_status: ProcessTreeStatus):
def __init__(self, job: "FMstep", memory_status: ProcessTreeStatus):
super().__init__(job)
self.memory_status = memory_status

Expand Down
6 changes: 3 additions & 3 deletions src/_ert/forward_model_runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pathlib import Path
from typing import List

from _ert.forward_model_runner.job import Job
from _ert.forward_model_runner.fm_step import FMstep
from _ert.forward_model_runner.reporting.message import Checksum, Finish, Init


Expand All @@ -21,9 +21,9 @@ def __init__(self, jobs_data):
if self.simulation_id is not None:
os.environ["ERT_RUN_ID"] = self.simulation_id

self.jobs: List[Job] = []
self.jobs: List[FMstep] = []
for index, job_data in enumerate(job_data_list):
self.jobs.append(Job(job_data, index))
self.jobs.append(FMstep(job_data, index))

self._set_environment()

Expand Down
78 changes: 39 additions & 39 deletions tests/ert/unit_tests/forward_model_runner/test_event_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
ClientConnectionClosedOK,
ClientConnectionError,
)
from _ert.forward_model_runner.job import Job
from _ert.forward_model_runner.fm_step import FMstep
from _ert.forward_model_runner.reporting import Event
from _ert.forward_model_runner.reporting.message import (
Exited,
Expand All @@ -41,11 +41,11 @@ def test_report_with_successful_start_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = FMstep({"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Start(job1))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Start(fmstep1))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -63,13 +63,13 @@ def test_report_with_failed_start_message_argument(unused_tcp_port):
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)

job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = FMstep({"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))

msg = Start(job1).with_error("massive_failure")
msg = Start(fmstep1).with_error("massive_failure")

reporter.report(msg)
reporter.report(Finish())
Expand All @@ -84,12 +84,12 @@ def test_report_with_successful_exit_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = FMstep({"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(job1, 0))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(fmstep1, 0))
reporter.report(Finish().with_error("failed"))

assert len(lines) == 1
Expand All @@ -101,12 +101,12 @@ def test_report_with_failed_exit_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = FMstep({"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(job1, 1).with_error("massive_failure"))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Exited(fmstep1, 1).with_error("massive_failure"))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -119,12 +119,12 @@ def test_report_with_running_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = FMstep({"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -138,12 +138,12 @@ def test_report_only_job_running_for_successful_run(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = FMstep({"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish())

assert len(lines) == 1
Expand All @@ -153,12 +153,12 @@ def test_report_with_failed_finish_message_argument(unused_tcp_port):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = FMstep({"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0)

lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Finish().with_error("massive_failure"))

assert len(lines) == 1
Expand Down Expand Up @@ -195,16 +195,16 @@ def mock_send(msg):
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
reporter._reporter_timeout = 4
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = FMstep({"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
with patch(
"_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y)
):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=1100, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=1100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=1100, rss=10)))
# set _stop_timestamp
reporter.report(Finish())
if reporter._event_publisher_thread.is_alive():
Expand Down Expand Up @@ -234,16 +234,16 @@ def send_func(msg):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = FMstep({"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
with patch("_ert.forward_model_runner.client.Client.send") as patched_send:
patched_send.side_effect = send_func

reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10)))

_wait_until(
condition=lambda: patched_send.call_count == 3,
Expand Down Expand Up @@ -275,12 +275,12 @@ def mock_send(msg):
host = "localhost"
url = f"ws://{host}:{unused_tcp_port}"
reporter = Event(evaluator_url=url)
job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0)
fmstep1 = FMstep({"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0)
lines = []
with _mock_ws_thread(host, unused_tcp_port, lines):
reporter.report(Init([job1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(job1, ProcessTreeStatus(max_rss=200, rss=10)))
reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10)))

# sleep until both Running events have been received
_wait_until(
Expand All @@ -292,20 +292,20 @@ def mock_send(msg):
with patch(
"_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y)
):
reporter.report(Running(job1, ProcessTreeStatus(max_rss=300, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10)))
# Make sure the publisher thread exits because it got
# ClientConnectionClosedOK. If it hangs it could indicate that the
# exception is not caught/handled correctly
if reporter._event_publisher_thread.is_alive():
reporter._event_publisher_thread.join()

reporter.report(Running(job1, ProcessTreeStatus(max_rss=400, rss=10)))
reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=400, rss=10)))
reporter.report(Finish())

# set _stop_timestamp was not set to None since the reporter finished on time
assert reporter._timeout_timestamp is not None

# The Running(job1, 300, 10) is popped from the queue, but never sent.
# The Running(fmstep1, 300, 10) is popped from the queue, but never sent.
# The following Running is added to queue along with the sentinel
assert reporter._event_queue.qsize() == 2
# None of the messages after ClientConnectionClosedOK was raised, has been sent
Expand Down
32 changes: 16 additions & 16 deletions tests/ert/unit_tests/forward_model_runner/test_file_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest

from _ert.forward_model_runner.job import Job
from _ert.forward_model_runner.fm_step import FMstep
from _ert.forward_model_runner.reporting import File
from _ert.forward_model_runner.reporting.message import (
Exited,
Expand All @@ -24,24 +24,24 @@ def reporter():
@pytest.mark.usefixtures("use_tmpdir")
def test_report_with_init_message_argument(reporter):
r = reporter
job1 = Job({"name": "job1", "stdout": "/stdout", "stderr": "/stderr"}, 0)
fmstep1 = FMstep({"name": "fmstep1", "stdout": "/stdout", "stderr": "/stderr"}, 0)

r.report(Init([job1], 1, 19))
r.report(Init([fmstep1], 1, 19))

with open(STATUS_file, "r", encoding="utf-8") as f:
assert "Current host" in f.readline(), "STATUS file missing expected value"
with open(STATUS_json, "r", encoding="utf-8") as f:
content = "".join(f.readlines())
assert '"name": "job1"' in content, "status.json missing job1"
assert '"name": "fmstep1"' in content, "status.json missing fmstep1"
assert '"status": "Waiting"' in content, "status.json missing Waiting status"


@pytest.mark.usefixtures("use_tmpdir")
def test_report_with_successful_start_message_argument(reporter):
msg = Start(
Job(
FMstep(
{
"name": "job1",
"name": "fmstep1",
"stdout": "/stdout.0",
"stderr": "/stderr.0",
"argList": ["--foo", "1", "--bar", "2"],
Expand All @@ -55,7 +55,7 @@ def test_report_with_successful_start_message_argument(reporter):
reporter.report(msg)

with open(STATUS_file, "r", encoding="utf-8") as f:
assert "job1" in f.readline(), "STATUS file missing job1"
assert "fmstep1" in f.readline(), "STATUS file missing fmstep1"
with open(LOG_file, "r", encoding="utf-8") as f:
assert (
"Calling: /bin/sh --foo 1 --bar 2" in f.readline()
Expand All @@ -69,7 +69,7 @@ def test_report_with_successful_start_message_argument(reporter):

@pytest.mark.usefixtures("use_tmpdir")
def test_report_with_failed_start_message_argument(reporter):
msg = Start(Job({"name": "job1"}, 0)).with_error("massive_failure")
msg = Start(FMstep({"name": "fmstep1"}, 0)).with_error("massive_failure")
reporter.status_dict = reporter._init_job_status_dict(msg.timestamp, 0, [msg.job])

reporter.report(msg)
Expand All @@ -86,12 +86,12 @@ def test_report_with_failed_start_message_argument(reporter):
), "status.json missing error message"
assert (
reporter.status_dict["jobs"][0]["end_time"] is not None
), "end_time not set for job1"
), "end_time not set for fmstep1"


@pytest.mark.usefixtures("use_tmpdir")
def test_report_with_successful_exit_message_argument(reporter):
msg = Exited(Job({"name": "job1"}, 0), 0)
msg = Exited(FMstep({"name": "fmstep1"}, 0), 0)
reporter.status_dict = reporter._init_job_status_dict(msg.timestamp, 0, [msg.job])

reporter.report(msg)
Expand All @@ -103,7 +103,7 @@ def test_report_with_successful_exit_message_argument(reporter):

@pytest.mark.usefixtures("use_tmpdir")
def test_report_with_failed_exit_message_argument(reporter):
msg = Exited(Job({"name": "job1"}, 0), 1).with_error("massive_failure")
msg = Exited(FMstep({"name": "fmstep1"}, 0), 1).with_error("massive_failure")
reporter.status_dict = reporter._init_job_status_dict(msg.timestamp, 0, [msg.job])

reporter.report(msg)
Expand All @@ -112,7 +112,7 @@ def test_report_with_failed_exit_message_argument(reporter):
assert "EXIT: 1/massive_failure" in f.readline()
with open(ERROR_file, "r", encoding="utf-8") as f:
content = "".join(f.readlines())
assert "<job>job1</job>" in content, "ERROR file missing job"
assert "<job>fmstep1</job>" in content, "ERROR file missing job"
assert (
"<reason>massive_failure</reason>" in content
), "ERROR file missing reason"
Expand All @@ -131,7 +131,7 @@ def test_report_with_failed_exit_message_argument(reporter):
@pytest.mark.usefixtures("use_tmpdir")
def test_report_with_running_message_argument(reporter):
msg = Running(
Job({"name": "job1"}, 0),
FMstep({"name": "fmstep1"}, 0),
ProcessTreeStatus(max_rss=100, rss=10, cpu_seconds=1.1),
)
reporter.status_dict = reporter._init_job_status_dict(msg.timestamp, 0, [msg.job])
Expand Down Expand Up @@ -170,7 +170,7 @@ def test_dump_error_file_with_stderr(reporter):
stderr.write("E_MASSIVE_FAILURE\n")

reporter._dump_error_file(
Job({"name": "job1", "stderr": "stderr.out.0"}, 0), "massive_failure"
FMstep({"name": "fmstep1", "stderr": "stderr.out.0"}, 0), "massive_failure"
)

with open(ERROR_file, "r", encoding="utf-8") as f:
Expand Down Expand Up @@ -199,8 +199,8 @@ def test_status_file_is_correct(reporter):
such.
See https://github.com/equinor/libres/issues/764
"""
j_1 = Job({"name": "j_1", "executable": "", "argList": []}, 0)
j_2 = Job({"name": "j_2", "executable": "", "argList": []}, 0)
j_1 = FMstep({"name": "j_1", "executable": "", "argList": []}, 0)
j_2 = FMstep({"name": "j_2", "executable": "", "argList": []}, 0)
init = Init([j_1, j_2], 1, 1)
start_j_1 = Start(j_1)
exited_j_1 = Exited(j_1, 0)
Expand Down
Loading

0 comments on commit d2092cf

Please sign in to comment.