From fde22e29e8b5bb2bf867100d63af4ec57b477ee6 Mon Sep 17 00:00:00 2001 From: xjules Date: Thu, 11 Apr 2024 13:25:34 +0200 Subject: [PATCH] Make sure to not cancel the job when job is done - Test that failed realization will not be cancelled --- src/ert/scheduler/scheduler.py | 26 ++++++++------- tests/unit_tests/scheduler/test_scheduler.py | 34 ++++++++++++++++++++ 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/src/ert/scheduler/scheduler.py b/src/ert/scheduler/scheduler.py index 0017bb3c4f5..4ef44121dbf 100644 --- a/src/ert/scheduler/scheduler.py +++ b/src/ert/scheduler/scheduler.py @@ -125,13 +125,15 @@ async def cancel_all_jobs(self) -> None: async def _cancel_job_tasks(self) -> None: for task in self._job_tasks.values(): - task.cancel() - with suppress(asyncio.TimeoutError): - await asyncio.wait( - self._job_tasks.values(), - timeout=1.0, - return_when=asyncio.ALL_COMPLETED, - ) + if not task.done(): + task.cancel() + _, pending = await asyncio.wait( + self._job_tasks.values(), + timeout=1.0, + return_when=asyncio.ALL_COMPLETED, + ) + for task in pending: + logger.error(f"Task {task.get_name()} was not killed properly!") async def _update_avg_job_runtime(self) -> None: while True: @@ -288,11 +290,11 @@ async def execute( await self.driver.finish() for scheduling_task in scheduling_tasks: scheduling_task.cancel() - with suppress(asyncio.CancelledError): - await asyncio.wait( - scheduling_tasks, - return_when=asyncio.ALL_COMPLETED, - ) + # We discard exceptions when cancelling the scheduling tasks + await asyncio.gather( + *scheduling_tasks, + return_exceptions=True, + ) if self._cancelled: logger.debug("scheduler cancelled, stopping jobs...") diff --git a/tests/unit_tests/scheduler/test_scheduler.py b/tests/unit_tests/scheduler/test_scheduler.py index ab1d30abea8..0687e9a2f10 100644 --- a/tests/unit_tests/scheduler/test_scheduler.py +++ b/tests/unit_tests/scheduler/test_scheduler.py @@ -351,6 +351,40 @@ async def init(iens, *args, **kwargs): assert not sch.is_active() +@pytest.mark.timeout(6) +@pytest.mark.parametrize("should_fail", [True, False]) +async def test_that_failed_realization_will_not_be_cancelled( + should_fail, realization, mock_driver +): + started = asyncio.Event() + kill_called = False + + async def wait(iens): + started.set() + if should_fail: + # the job failed with exit code 1 + return 1 + await asyncio.sleep(100) + return 0 + + async def kill(iens): + nonlocal kill_called + kill_called = True + + driver = mock_driver(wait=wait, kill=kill) + sch = scheduler.Scheduler(driver, [realization]) + + scheduler_task = asyncio.create_task(sch.execute()) + + await started.wait() + await sch.cancel_all_jobs() + + await scheduler_task + assert scheduler_task.result() == EVTYPE_ENSEMBLE_CANCELLED + + assert kill_called == (not should_fail) + + @pytest.mark.timeout(6) async def test_that_long_running_jobs_were_stopped(storage, tmp_path, mock_driver): killed_iens = []