From ec7764fe83ba47c635121fc4d57739163796f2e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Thu, 9 Jan 2025 08:52:14 +0100 Subject: [PATCH] Ignore OSErrors on subprocess call in poll() and bhist Pretend these kinds of issues are flaky. It is important not to crash on potentially intermittent failures in code that is rerun every 2 seconds. --- src/ert/scheduler/lsf_driver.py | 22 ++++++++++++------- src/ert/scheduler/openpbs_driver.py | 8 ++++--- src/ert/scheduler/slurm_driver.py | 5 +++-- .../scheduler/test_generic_driver.py | 11 ++++++---- .../unit_tests/scheduler/test_lsf_driver.py | 22 +++++++++++++++++-- .../ert/unit_tests/test_run_path_creation.py | 6 ++--- 6 files changed, 52 insertions(+), 22 deletions(-) diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index 469ec46c81c..c19464aad92 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -444,9 +444,10 @@ async def poll(self) -> None: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - except FileNotFoundError as e: + except OSError as e: logger.error(str(e)) - return + await asyncio.sleep(self._poll_period) + continue stdout, stderr = await process.communicate() if process.returncode: @@ -583,12 +584,17 @@ async def _poll_once_by_bhist( if time.time() - self._bhist_cache_timestamp < self._bhist_required_cache_age: return {} - process = await asyncio.create_subprocess_exec( - self._bhist_cmd, - *[str(job_id) for job_id in missing_job_ids], - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) + try: + process = await asyncio.create_subprocess_exec( + self._bhist_cmd, + *[str(job_id) for job_id in missing_job_ids], + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + except OSError as e: + logger.error(str(e)) + return {} + stdout, stderr = await process.communicate() if process.returncode: logger.error( diff --git a/src/ert/scheduler/openpbs_driver.py b/src/ert/scheduler/openpbs_driver.py index c8a6287bf79..86b6b954c3d 100644 --- a/src/ert/scheduler/openpbs_driver.py +++ b/src/ert/scheduler/openpbs_driver.py @@ -271,9 +271,10 @@ async def poll(self) -> None: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - except FileNotFoundError as e: + except OSError as e: logger.error(str(e)) - return + await asyncio.sleep(self._poll_period) + continue stdout, stderr = await process.communicate() if process.returncode not in {0, QSTAT_UNKNOWN_JOB_ID}: # Any unknown job ids will yield QSTAT_UNKNOWN_JOB_ID, but @@ -331,7 +332,8 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None: iens, old_state = self._jobs[job_id] if isinstance(new_state, IgnoredJobstates): logger.debug( - f"Job ID '{job_id}' for {iens=} is of unknown job state '{new_state.job_state}'" + f"Job ID '{job_id}' for {iens=} is of " + f"unknown job state '{new_state.job_state}'" ) return diff --git a/src/ert/scheduler/slurm_driver.py b/src/ert/scheduler/slurm_driver.py index 3d5a4435543..d0cdde3cdc2 100644 --- a/src/ert/scheduler/slurm_driver.py +++ b/src/ert/scheduler/slurm_driver.py @@ -251,9 +251,10 @@ async def poll(self) -> None: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - except FileNotFoundError as e: + except OSError as e: logger.error(str(e)) - return + await asyncio.sleep(self._poll_period) + continue stdout, stderr = await process.communicate() if process.returncode: logger.warning( diff --git a/tests/ert/unit_tests/scheduler/test_generic_driver.py b/tests/ert/unit_tests/scheduler/test_generic_driver.py index 5e797e2827f..201a67da1c6 100644 --- a/tests/ert/unit_tests/scheduler/test_generic_driver.py +++ b/tests/ert/unit_tests/scheduler/test_generic_driver.py @@ -230,9 +230,10 @@ async def test_execute_with_retry_exits_on_filenotfounderror(driver: Driver, cap @pytest.mark.integration_test -async def test_poll_exits_on_filenotfounderror(driver: Driver, caplog): +async def test_poll_ignores_filenotfounderror(driver: Driver, caplog): if isinstance(driver, LocalDriver): pytest.skip("LocalDriver does not poll") + driver._poll_period = 0.01 caplog.set_level(logging.DEBUG) invalid_cmd = ["/usr/bin/foo", "bar"] driver._bjobs_cmd = invalid_cmd @@ -240,9 +241,11 @@ async def test_poll_exits_on_filenotfounderror(driver: Driver, caplog): driver._squeue = invalid_cmd driver._jobs = {"foo": "bar"} driver._non_finished_job_ids = ["foo"] - await driver.poll() + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(driver.poll(), timeout=0.1) - # We log a retry message every time we retry - assert "retry" not in str(caplog.text) + assert "retry" not in str(caplog.text), ( + "_execute_with_retry() should not be invoked in poll()" + ) assert "No such file or directory" in str(caplog.text) assert "/usr/bin/foo" in str(caplog.text) diff --git a/tests/ert/unit_tests/scheduler/test_lsf_driver.py b/tests/ert/unit_tests/scheduler/test_lsf_driver.py index 61b8b4c0364..6e469e91f3e 100644 --- a/tests/ert/unit_tests/scheduler/test_lsf_driver.py +++ b/tests/ert/unit_tests/scheduler/test_lsf_driver.py @@ -584,7 +584,9 @@ async def test_that_bsub_will_retry_and_fail( driver._max_bsub_attempts = 2 driver._sleep_time_between_cmd_retries = 0.2 match_str = ( - f'failed after 2 attempts with exit code {exit_code}.*error: "{error_msg if error_msg else ""}"' + f'failed after 2 attempts with exit code {exit_code}.*error: "{ + error_msg if error_msg else "" + }"' if exit_code != 199 else 'failed with exit code 199.*error: "Not recognized"' ) @@ -1269,6 +1271,21 @@ def mock_poll_once_by_bhist(*args, **kwargs): assert job_id in driver._bhist_cache +@pytest.mark.integration_test +async def test_no_exception_when_no_access_to_bjobs_executable( + not_found_bjobs, caplog, job_name +): + """The intent of this test is to ensure the driver will not + go down if the filesystem is temporarily flaky.""" + driver = LsfDriver() + driver._poll_period = 0.01 + Path("bin/bjobs").chmod(0x0) # Modify the bjobs from the fixture + await driver.submit(0, "sh", "-c", "echo", name=job_name) + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(driver.poll(), timeout=0.1) + assert "Permission denied" in caplog.text + + @pytest.mark.integration_test async def test_that_kill_before_submit_is_finished_works(tmp_path, monkeypatch, caplog): """This test asserts that it is possible to issue a kill command @@ -1326,7 +1343,8 @@ async def test_that_kill_before_submit_is_finished_works(tmp_path, monkeypatch, ) ) await asyncio.sleep(0.01) # Allow submit task to start executing - await driver.kill(0) # This will wait until the submit is done and then kill + # This will wait until the submit is done and then kill + await driver.kill(0) async def finished(iens: int, returncode: int): SIGTERM = 15 diff --git a/tests/ert/unit_tests/test_run_path_creation.py b/tests/ert/unit_tests/test_run_path_creation.py index 9e70ed4734e..356bd8c077a 100644 --- a/tests/ert/unit_tests/test_run_path_creation.py +++ b/tests/ert/unit_tests/test_run_path_creation.py @@ -94,9 +94,9 @@ def test_jobs_json_is_backed_up(make_run_path): assert os.path.exists("simulations/realization-0/iter-0/jobs.json") make_run_path(ert_config) iter0_output_files = os.listdir("simulations/realization-0/iter-0/") - assert ( - len([f for f in iter0_output_files if f.startswith("jobs.json")]) > 1 - ), "No backup created for jobs.json" + assert len([f for f in iter0_output_files if f.startswith("jobs.json")]) > 1, ( + "No backup created for jobs.json" + ) @pytest.mark.usefixtures("use_tmpdir")