Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename job_dispatch -> fm_dispatch #9534

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/ert/developer_documentation/forward_model.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ The input to a forward model consists of:
replaced in the runpath.

After this, the forward model is submitted to the queue system. In particular,
this entails that the script :code:`job_dispatch.py` is executed with the runpath of
this entails that the script :code:`fm_dispatch.py` is executed with the runpath of
the forward model as an argument. It will locate the :code:`jobs.json` file and
execute the forward model as prescribed. During execution the status of the
forward model is dumped to the :code:`status.json` file. It contains information
Expand Down
4 changes: 2 additions & 2 deletions docs/ert/developer_documentation/roadmap.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ parties also can consume the messages.
Forward models
~~~~~~~~~~~~~~

Currently this is done by the jobs in the forward model logging to files. These
should probably be propagated by the :code:`job_dispatch` to the rest of the system
Currently this is done by the forward model steps in the forward model logging to files. These
should probably be propagated by the :code:`fm_dispatch` to the rest of the system
via a message passing system.

Workflows
Expand Down
2 changes: 1 addition & 1 deletion docs/ert/reference/configuration/site_wide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ properties of the cluster. This could be an example site config file: ::

WORKFLOW_JOB_DIRECTORY workflows/jobs/internal-gui/config

JOB_SCRIPT ../../bin/job_dispatch.py
JOB_SCRIPT ../../bin/fm_dispatch.py
INSTALL_JOB_DIRECTORY forward_models/res
INSTALL_JOB_DIRECTORY forward_models/shell
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ dependencies = [

[project.scripts]
ert = "ert.__main__:main"
"job_dispatch.py" = "_ert.forward_model_runner.job_dispatch:main"
"fm_dispatch.py" = "_ert.forward_model_runner.fm_dispatch:main"
everest = "everest.bin.main:start_everest"
everserver = "everest.detached.jobs.everserver:main"
recovery_factor = "everest.jobs.scripts.recovery_factor:main"
Expand Down
4 changes: 1 addition & 3 deletions src/_ert/forward_model_runner/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,7 @@ def main(args):
try:
reporter.report(job_status)
except OSError as oserror:
print(
f"job_dispatch failed due to {oserror}. Stopping and cleaning up."
)
print(f"fm_dispatch failed due to {oserror}. Stopping and cleaning up.")
_stop_reporters_and_sigkill(reporters)

if isinstance(job_status, Finish) and not job_status.success():
Expand Down
4 changes: 2 additions & 2 deletions src/_ert/forward_model_runner/forward_model_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,11 @@ def handle_process_timeout_and_create_exited_msg(
return None

# If the spawned process is not in the same process group as
# the callee (job_dispatch), we will kill the process group
# the callee (fm_dispatch), we will kill the process group
# explicitly.
#
# Propagating the unsuccessful Exited message will kill the
# callee group. See job_dispatch.py.
# callee group. See fm_dispatch.py.
process_group_id = os.getpgid(proc.pid)
this_group_id = os.getpgid(os.getpid())
if process_group_id != this_group_id:
Expand Down
4 changes: 2 additions & 2 deletions src/ert/config/queue_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ def _group_queue_options_by_queue_system(

@dataclass
class QueueConfig:
job_script: str = shutil.which("job_dispatch.py") or "job_dispatch.py"
job_script: str = shutil.which("fm_dispatch.py") or "fm_dispatch.py"
realization_memory: int = 0
max_submit: int = 1
queue_system: QueueSystem = QueueSystem.LOCAL
Expand All @@ -291,7 +291,7 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
config_dict.get("QUEUE_SYSTEM", QueueSystem.LOCAL)
)
job_script: str = config_dict.get(
"JOB_SCRIPT", shutil.which("job_dispatch.py") or "job_dispatch.py"
"JOB_SCRIPT", shutil.which("fm_dispatch.py") or "fm_dispatch.py"
)
realization_memory: int = _parse_realization_memory_str(
config_dict.get(ConfigKeys.REALIZATION_MEMORY, "0b")
Expand Down
2 changes: 1 addition & 1 deletion src/ert/plugins/hook_implementations/site_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
@ert.plugin(name="ert") # type: ignore
def site_config_lines():
return [
"JOB_SCRIPT job_dispatch.py",
"JOB_SCRIPT fm_dispatch.py",
"QUEUE_SYSTEM LOCAL",
"QUEUE_OPTION LOCAL MAX_RUNNING 1",
]
2 changes: 1 addition & 1 deletion src/ert/resources/site-config
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
WORKFLOW_JOB_DIRECTORY workflows/jobs/shell
WORKFLOW_JOB_DIRECTORY workflows/jobs/internal-gui/config

JOB_SCRIPT job_dispatch.py
JOB_SCRIPT fm_dispatch.py

QUEUE_SYSTEM LOCAL
QUEUE_OPTION LOCAL MAX_RUNNING 1
2 changes: 1 addition & 1 deletion tests/ert/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def _qt_add_search_paths(qapp):
@pytest.fixture()
def set_site_config(monkeypatch, tmp_path):
test_site_config = tmp_path / "test_site_config.ert"
test_site_config.write_text("JOB_SCRIPT job_dispatch.py\nQUEUE_SYSTEM LOCAL\n")
test_site_config.write_text("JOB_SCRIPT fm_dispatch.py\nQUEUE_SYSTEM LOCAL\n")
monkeypatch.setenv("ERT_SITE_CONFIG", str(test_site_config))


Expand Down
2 changes: 1 addition & 1 deletion tests/ert/ui_tests/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ def test_that_setenv_sets_environment_variables_in_jobs(setenv_config):

path = os.environ["PATH"]

# and then job_dispatch should expand the variables on the compute side
# and then fm_dispatch should expand the variables on the compute side
with open("simulations/realization-0/iter-0/ECHO.stdout.0", encoding="utf-8") as f:
lines = f.readlines()
assert len(lines) == 4
Expand Down
2 changes: 1 addition & 1 deletion tests/ert/unit_tests/config/test_ert_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def test_that_job_script_can_be_set_in_site_config(monkeypatch):
os.chmod(my_script, st.st_mode | stat.S_IEXEC)
test_site_config.write_text(
dedent(f"""
JOB_SCRIPT job_dispatch.py
JOB_SCRIPT fm_dispatch.py
JOB_SCRIPT {my_script}
QUEUE_SYSTEM LOCAL
"""),
Expand Down
2 changes: 1 addition & 1 deletion tests/ert/unit_tests/config/test_queue_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def test_that_overwriting_QUEUE_OPTIONS_warns(
f"QUEUE_SYSTEM {queue_system}\n"
f"QUEUE_OPTION {queue_system} {queue_system_option} test_1\n"
f"QUEUE_OPTION {queue_system} MAX_RUNNING 10\n",
site_config_contents="JOB_SCRIPT job_dispatch.py\n"
site_config_contents="JOB_SCRIPT fm_dispatch.py\n"
f"QUEUE_SYSTEM {queue_system}\n"
f"QUEUE_OPTION {queue_system} {queue_system_option} test_0\n"
f"QUEUE_OPTION {queue_system} MAX_RUNNING 10\n",
Expand Down
2 changes: 1 addition & 1 deletion tests/ert/unit_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
def ensure_bin_in_path():
"""
Running pytest directly without enabling a virtualenv is perfectly valid.
However, our tests assume that `job_dispatch.py` is in PATH which it may not be.
However, our tests assume that `fm_dispatch.py` is in PATH which it may not be.
This fixture prepends the path to the current Python for tests to pass when not
in a virtualenv.
"""
Expand Down
4 changes: 2 additions & 2 deletions tests/ert/unit_tests/ensemble_evaluator/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def snapshot():
@pytest.fixture(name="queue_config")
def queue_config_fixture():
return QueueConfig(
job_script="job_dispatch.py",
job_script="fm_dispatch.py",
max_submit=1,
queue_system=QueueSystem.LOCAL,
queue_options=LocalQueueOptions(max_running=50),
Expand Down Expand Up @@ -118,7 +118,7 @@ async def load_successful(**_):
active=True,
iens=iens,
fm_steps=forward_model_list,
job_script="job_dispatch.py",
job_script="fm_dispatch.py",
max_runtime=10,
num_cpu=1,
run_arg=RunArg(
Expand Down
26 changes: 13 additions & 13 deletions tests/ert/unit_tests/forward_model_runner/test_job_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,20 @@ def test_terminate_steps():
)
os.chmod("setsid", 0o755)

job_dispatch_script = importlib.util.find_spec(
"_ert.forward_model_runner.job_dispatch"
fm_dispatch_script = importlib.util.find_spec(
"_ert.forward_model_runner.fm_dispatch"
).origin
# (we wait for the process below)
job_dispatch_process = Popen(
fm_dispatch_process = Popen(
[
os.getcwd() + "/setsid",
sys.executable,
job_dispatch_script,
fm_dispatch_script,
os.getcwd(),
]
)

p = psutil.Process(job_dispatch_process.pid)
p = psutil.Process(fm_dispatch_process.pid)

# Three levels of processes should spawn 8 children in total
wait_until(lambda: len(p.children(recursive=True)) == 8)
Expand Down Expand Up @@ -148,7 +148,7 @@ def test_memory_profile_is_logged_as_csv():
subprocess.run(
[
sys.executable,
importlib.util.find_spec("_ert.forward_model_runner.job_dispatch").origin,
importlib.util.find_spec("_ert.forward_model_runner.fm_dispatch").origin,
os.getcwd(),
],
check=False,
Expand All @@ -162,7 +162,7 @@ def test_memory_profile_is_logged_as_csv():


@pytest.mark.usefixtures("use_tmpdir")
def test_job_dispatch_run_subset_specified_as_parameter():
def test_fm_dispatch_run_subset_specified_as_parameter():
with open("dummy_executable", "w", encoding="utf-8") as f:
f.write(
"#!/usr/bin/env python\n"
Expand Down Expand Up @@ -256,22 +256,22 @@ def test_job_dispatch_run_subset_specified_as_parameter():
)
os.chmod("setsid", 0o755)

job_dispatch_script = importlib.util.find_spec(
"_ert.forward_model_runner.job_dispatch"
fm_dispatch_script = importlib.util.find_spec(
"_ert.forward_model_runner.fm_dispatch"
).origin
# (we wait for the process below)
job_dispatch_process = Popen(
fm_dispatch_process = Popen(
[
os.getcwd() + "/setsid",
sys.executable,
job_dispatch_script,
fm_dispatch_script,
os.getcwd(),
"step_B",
"step_C",
]
)

job_dispatch_process.wait()
fm_dispatch_process.wait()

assert not os.path.isfile("step_A.out")
assert os.path.isfile("step_B.out")
Expand Down Expand Up @@ -344,7 +344,7 @@ def test_setup_reporters(is_interactive_run, ens_id):


@pytest.mark.usefixtures("use_tmpdir")
def test_job_dispatch_kills_itself_after_unsuccessful_job(unused_tcp_port):
def test_fm_dispatch_kills_itself_after_unsuccessful_job(unused_tcp_port):
host = "localhost"
port = unused_tcp_port
jobs_json = json.dumps({"ens_id": "_id_", "dispatch_url": f"ws://localhost:{port}"})
Expand Down
2 changes: 1 addition & 1 deletion tests/ert/unit_tests/plugins/dummy_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def some_func():

@plugin(name="dummy")
def site_config_lines():
return ["JOB_SCRIPT job_dispatch_dummy.py", "QUEUE_OPTION LOCAL MAX_RUNNING 2"]
return ["JOB_SCRIPT fm_dispatch_dummy.py", "QUEUE_OPTION LOCAL MAX_RUNNING 2"]


@plugin(name="dummy")
Expand Down
6 changes: 3 additions & 3 deletions tests/ert/unit_tests/plugins/test_plugin_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def test_no_plugins():

assert pm._site_config_lines() == [
"-- Content below originated from ert (site_config_lines)",
"JOB_SCRIPT job_dispatch.py",
"JOB_SCRIPT fm_dispatch.py",
"QUEUE_SYSTEM LOCAL",
"QUEUE_OPTION LOCAL MAX_RUNNING 1",
]
Expand All @@ -52,11 +52,11 @@ def test_with_plugins():

assert pm._site_config_lines() == [
"-- Content below originated from ert (site_config_lines)",
"JOB_SCRIPT job_dispatch.py",
"JOB_SCRIPT fm_dispatch.py",
"QUEUE_SYSTEM LOCAL",
"QUEUE_OPTION LOCAL MAX_RUNNING 1",
"-- Content below originated from dummy (site_config_lines)",
"JOB_SCRIPT job_dispatch_dummy.py",
"JOB_SCRIPT fm_dispatch_dummy.py",
"QUEUE_OPTION LOCAL MAX_RUNNING 2",
]

Expand Down
2 changes: 1 addition & 1 deletion tests/ert/unit_tests/scheduler/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def realization():
max_runtime=None,
run_arg=run_arg,
num_cpu=1,
job_script=str(shutil.which("job_dispatch.py")),
job_script=str(shutil.which("fm_dispatch.py")),
realization_memory=0,
)
return realization
Expand Down
2 changes: 1 addition & 1 deletion tests/ert/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def create_stub_realization(ensemble, base_path: Path, iens) -> Realization:
max_runtime=None,
run_arg=run_arg,
num_cpu=1,
job_script=str(shutil.which("job_dispatch.py")),
job_script=str(shutil.which("fm_dispatch.py")),
realization_memory=0,
)
return realization
Expand Down
Loading