Skip to content

Commit

Permalink
Make sure to not cancel the job when job is done
Browse files Browse the repository at this point in the history
- Test that failed realization will not be cancelled
  • Loading branch information
xjules authored and berland committed Apr 25, 2024
1 parent 4f9da11 commit fde22e2
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
26 changes: 14 additions & 12 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,15 @@ async def cancel_all_jobs(self) -> None:

async def _cancel_job_tasks(self) -> None:
# NOTE(review): this span is a rendered diff hunk whose +/- markers and
# indentation were stripped, so the removed and the added lines of the method
# body appear back to back. Do not read it as a single runnable body.
for task in self._job_tasks.values():
# Presumably the removed (pre-commit) side: every task is cancelled
# unconditionally, and the bounded wait is wrapped in
# suppress(asyncio.TimeoutError) with its result ignored.
task.cancel()
with suppress(asyncio.TimeoutError):
await asyncio.wait(
self._job_tasks.values(),
timeout=1.0,
return_when=asyncio.ALL_COMPLETED,
)
# Presumably the added (post-commit) side: only tasks that are not yet done
# are cancelled, so a job that already finished is left untouched.
if not task.done():
task.cancel()
# Wait at most one second for all job tasks; the second element of the
# returned pair holds the tasks that were still pending at that point.
_, pending = await asyncio.wait(
self._job_tasks.values(),
timeout=1.0,
return_when=asyncio.ALL_COMPLETED,
)
# Report every task that was still pending after the wait.
for task in pending:
logger.error(f"Task {task.get_name()} was not killed properly!")

async def _update_avg_job_runtime(self) -> None:
while True:
Expand Down Expand Up @@ -288,11 +290,11 @@ async def execute(
await self.driver.finish()
for scheduling_task in scheduling_tasks:
scheduling_task.cancel()
with suppress(asyncio.CancelledError):
await asyncio.wait(
scheduling_tasks,
return_when=asyncio.ALL_COMPLETED,
)
# We discard exceptions when cancelling the scheduling tasks
await asyncio.gather(
*scheduling_tasks,
return_exceptions=True,
)

if self._cancelled:
logger.debug("scheduler cancelled, stopping jobs...")
Expand Down
34 changes: 34 additions & 0 deletions tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,40 @@ async def init(iens, *args, **kwargs):
assert not sch.is_active()


@pytest.mark.timeout(6)
@pytest.mark.parametrize("should_fail", [True, False])
async def test_that_failed_realization_will_not_be_cancelled(
    should_fail, realization, mock_driver
):
    """Cancel the scheduler once the job has started and check the kill call.

    When ``should_fail`` is true the mocked ``wait`` returns exit code 1 right
    away, and the driver's ``kill`` is expected not to be invoked. Otherwise
    ``wait`` keeps sleeping, and cancelling is expected to invoke ``kill``.
    """
    job_started = asyncio.Event()
    # One entry per kill() invocation; only emptiness is asserted below.
    kill_calls = []

    async def wait(iens):
        job_started.set()
        if not should_fail:
            # Still running when the cancellation arrives.
            await asyncio.sleep(100)
            return 0
        # the job failed with exit code 1
        return 1

    async def kill(iens):
        kill_calls.append(iens)

    sch = scheduler.Scheduler(mock_driver(wait=wait, kill=kill), [realization])
    execution = asyncio.create_task(sch.execute())

    # Only cancel after the driver's wait() has been entered.
    await job_started.wait()
    await sch.cancel_all_jobs()

    outcome = await execution
    assert outcome == EVTYPE_ENSEMBLE_CANCELLED

    was_killed = bool(kill_calls)
    assert was_killed == (not should_fail)


@pytest.mark.timeout(6)
async def test_that_long_running_jobs_were_stopped(storage, tmp_path, mock_driver):
killed_iens = []
Expand Down

0 comments on commit fde22e2

Please sign in to comment.