Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore OSErrors on subprocess call of bjobs and bhist #9696

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,10 @@ async def poll(self) -> None:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as e:
except OSError as e:
logger.error(str(e))
return
await asyncio.sleep(self._poll_period)
continue

stdout, stderr = await process.communicate()
if process.returncode:
Expand Down Expand Up @@ -583,12 +584,17 @@ async def _poll_once_by_bhist(
if time.time() - self._bhist_cache_timestamp < self._bhist_required_cache_age:
return {}

process = await asyncio.create_subprocess_exec(
self._bhist_cmd,
*[str(job_id) for job_id in missing_job_ids],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
process = await asyncio.create_subprocess_exec(
self._bhist_cmd,
*[str(job_id) for job_id in missing_job_ids],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except OSError as e:
logger.error(str(e))
return {}

stdout, stderr = await process.communicate()
if process.returncode:
logger.error(
Expand Down
8 changes: 5 additions & 3 deletions src/ert/scheduler/openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,10 @@ async def poll(self) -> None:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as e:
except OSError as e:
logger.error(str(e))
return
await asyncio.sleep(self._poll_period)
continue
stdout, stderr = await process.communicate()
if process.returncode not in {0, QSTAT_UNKNOWN_JOB_ID}:
# Any unknown job ids will yield QSTAT_UNKNOWN_JOB_ID, but
Expand Down Expand Up @@ -331,7 +332,8 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
iens, old_state = self._jobs[job_id]
if isinstance(new_state, IgnoredJobstates):
logger.debug(
f"Job ID '{job_id}' for {iens=} is of unknown job state '{new_state.job_state}'"
f"Job ID '{job_id}' for {iens=} is of "
f"unknown job state '{new_state.job_state}'"
)
return

Expand Down
5 changes: 3 additions & 2 deletions src/ert/scheduler/slurm_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,10 @@ async def poll(self) -> None:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as e:
except OSError as e:
logger.error(str(e))
return
await asyncio.sleep(self._poll_period)
continue
stdout, stderr = await process.communicate()
if process.returncode:
logger.warning(
Expand Down
11 changes: 7 additions & 4 deletions tests/ert/unit_tests/scheduler/test_generic_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,19 +230,22 @@ async def test_execute_with_retry_exits_on_filenotfounderror(driver: Driver, cap


@pytest.mark.integration_test
async def test_poll_exits_on_filenotfounderror(driver: Driver, caplog):
async def test_poll_ignores_filenotfounderror(driver: Driver, caplog):
if isinstance(driver, LocalDriver):
pytest.skip("LocalDriver does not poll")
driver._poll_period = 0.01
caplog.set_level(logging.DEBUG)
invalid_cmd = ["/usr/bin/foo", "bar"]
driver._bjobs_cmd = invalid_cmd
driver._qstat_cmd = invalid_cmd
driver._squeue = invalid_cmd
driver._jobs = {"foo": "bar"}
driver._non_finished_job_ids = ["foo"]
await driver.poll()
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(driver.poll(), timeout=0.1)

# We log a retry message every time we retry
assert "retry" not in str(caplog.text)
assert "retry" not in str(caplog.text), (
"_execute_with_retry() should not be invoked in poll()"
)
assert "No such file or directory" in str(caplog.text)
assert "/usr/bin/foo" in str(caplog.text)
21 changes: 19 additions & 2 deletions tests/ert/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,8 @@ async def test_that_bsub_will_retry_and_fail(
driver._max_bsub_attempts = 2
driver._sleep_time_between_cmd_retries = 0.2
match_str = (
f'failed after 2 attempts with exit code {exit_code}.*error: "{error_msg if error_msg else "<empty>"}"'
f"failed after 2 attempts with exit code {exit_code}.*"
f'error: "{error_msg if error_msg else "<empty>"}"'
if exit_code != 199
else 'failed with exit code 199.*error: "Not recognized"'
)
Expand Down Expand Up @@ -1269,6 +1270,21 @@ def mock_poll_once_by_bhist(*args, **kwargs):
assert job_id in driver._bhist_cache


@pytest.mark.integration_test
async def test_no_exception_when_no_access_to_bjobs_executable(
not_found_bjobs, caplog, job_name
):
"""The intent of this test is to ensure the driver will not
go down if the filesystem is temporarily flaky."""
driver = LsfDriver()
driver._poll_period = 0.01
Path("bin/bjobs").chmod(0x0) # Modify the bjobs from the fixture
await driver.submit(0, "sh", "-c", "echo", name=job_name)
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(driver.poll(), timeout=0.1)
assert "Permission denied" in caplog.text


@pytest.mark.integration_test
async def test_that_kill_before_submit_is_finished_works(tmp_path, monkeypatch, caplog):
"""This test asserts that it is possible to issue a kill command
Expand Down Expand Up @@ -1326,7 +1342,8 @@ async def test_that_kill_before_submit_is_finished_works(tmp_path, monkeypatch,
)
)
await asyncio.sleep(0.01) # Allow submit task to start executing
await driver.kill(0) # This will wait until the submit is done and then kill
# This will wait until the submit is done and then kill
await driver.kill(0)

async def finished(iens: int, returncode: int):
SIGTERM = 15
Expand Down
6 changes: 3 additions & 3 deletions tests/ert/unit_tests/test_run_path_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ def test_jobs_json_is_backed_up(make_run_path):
assert os.path.exists("simulations/realization-0/iter-0/jobs.json")
make_run_path(ert_config)
iter0_output_files = os.listdir("simulations/realization-0/iter-0/")
assert (
len([f for f in iter0_output_files if f.startswith("jobs.json")]) > 1
), "No backup created for jobs.json"
assert len([f for f in iter0_output_files if f.startswith("jobs.json")]) > 1, (
"No backup created for jobs.json"
)


@pytest.mark.usefixtures("use_tmpdir")
Expand Down
Loading