From 366aaf2261fd3def329555f7d05b8ff91c2ea570 Mon Sep 17 00:00:00 2001 From: Julius Parulek Date: Fri, 15 Dec 2023 11:53:24 +0100 Subject: [PATCH] Introduce driver.finish that makes sure all the tasks started have been awaited. --- src/ert/scheduler/driver.py | 8 ++------ src/ert/scheduler/job.py | 2 -- src/ert/scheduler/local_driver.py | 7 +++++-- src/ert/scheduler/scheduler.py | 2 ++ 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/ert/scheduler/driver.py b/src/ert/scheduler/driver.py index 8d90ee0faf8..7c15c51fe9b 100644 --- a/src/ert/scheduler/driver.py +++ b/src/ert/scheduler/driver.py @@ -47,9 +47,5 @@ async def poll(self) -> None: """Poll for new job events""" @abstractmethod - async def finish(self, iens: int) -> None: - """wait until the job / realization execution is complete. - - Args: - iens: Realization number. - """ + async def finish(self) -> None: + """make sure that all the jobs / realizations are complete.""" diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py index d6b0aeef36e..b1a2c2f33d5 100644 --- a/src/ert/scheduler/job.py +++ b/src/ert/scheduler/job.py @@ -80,8 +80,6 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None: while not self.returncode.done(): await asyncio.sleep(0.01) returncode = await self.returncode - # we need to make sure that the task has finished too - await self.driver.finish(self.real.iens) if ( returncode == 0 diff --git a/src/ert/scheduler/local_driver.py b/src/ert/scheduler/local_driver.py index 371270d7a1f..e88ae5453be 100644 --- a/src/ert/scheduler/local_driver.py +++ b/src/ert/scheduler/local_driver.py @@ -13,6 +13,7 @@ def __init__(self) -> None: self._tasks: MutableMapping[int, asyncio.Task[None]] = {} async def submit(self, iens: int, executable: str, /, *args: str, cwd: str) -> None: + await self.kill(iens) self._tasks[iens] = asyncio.create_task( self._wait_until_finish(iens, executable, *args, cwd=cwd) ) @@ -20,11 +21,13 @@ async def submit(self, iens: int, executable: str, /, *args: str, cwd: str) -> N async def kill(self, iens: int) -> None: try: self._tasks[iens].cancel() + await self._tasks[iens] + del self._tasks[iens] except KeyError: return - async def finish(self, iens: int) -> None: - await self._tasks[iens] + async def finish(self) -> None: + await asyncio.gather(*self._tasks.values()) async def _wait_until_finish( self, iens: int, executable: str, /, *args: str, cwd: str diff --git a/src/ert/scheduler/scheduler.py b/src/ert/scheduler/scheduler.py index 0b08f6dff0f..d0a2bbc5388 100644 --- a/src/ert/scheduler/scheduler.py +++ b/src/ert/scheduler/scheduler.py @@ -138,6 +138,8 @@ async def execute( for task in self._tasks.values(): await task + await self.driver.finish() + if self._cancelled: return EVTYPE_ENSEMBLE_CANCELLED