diff --git a/src/ert/scheduler/driver.py b/src/ert/scheduler/driver.py index 8d90ee0faf8..7c15c51fe9b 100644 --- a/src/ert/scheduler/driver.py +++ b/src/ert/scheduler/driver.py @@ -47,9 +47,5 @@ async def poll(self) -> None: """Poll for new job events""" @abstractmethod - async def finish(self, iens: int) -> None: - """wait until the job / realization execution is complete. - - Args: - iens: Realization number. - """ + async def finish(self) -> None: + """make sure that all the jobs / realizations are complete.""" diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py index d6b0aeef36e..b1a2c2f33d5 100644 --- a/src/ert/scheduler/job.py +++ b/src/ert/scheduler/job.py @@ -80,8 +80,6 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None: while not self.returncode.done(): await asyncio.sleep(0.01) returncode = await self.returncode - # we need to make sure that the task has finished too - await self.driver.finish(self.real.iens) if ( returncode == 0 diff --git a/src/ert/scheduler/local_driver.py b/src/ert/scheduler/local_driver.py index 371270d7a1f..e88ae5453be 100644 --- a/src/ert/scheduler/local_driver.py +++ b/src/ert/scheduler/local_driver.py @@ -13,6 +13,7 @@ def __init__(self) -> None: self._tasks: MutableMapping[int, asyncio.Task[None]] = {} async def submit(self, iens: int, executable: str, /, *args: str, cwd: str) -> None: + await self.kill(iens) self._tasks[iens] = asyncio.create_task( self._wait_until_finish(iens, executable, *args, cwd=cwd) ) @@ -20,11 +21,13 @@ async def submit(self, iens: int, executable: str, /, *args: str, cwd: str) -> N async def kill(self, iens: int) -> None: try: self._tasks[iens].cancel() + await self._tasks[iens] + del self._tasks[iens] except KeyError: return - async def finish(self, iens: int) -> None: - await self._tasks[iens] + async def finish(self) -> None: + await asyncio.gather(*self._tasks.values()) async def _wait_until_finish( self, iens: int, executable: str, /, *args: str, cwd: str diff --git a/src/ert/scheduler/scheduler.py b/src/ert/scheduler/scheduler.py index 0b08f6dff0f..d0a2bbc5388 100644 --- a/src/ert/scheduler/scheduler.py +++ b/src/ert/scheduler/scheduler.py @@ -138,6 +138,8 @@ async def execute( for task in self._tasks.values(): await task + await self.driver.finish() + if self._cancelled: return EVTYPE_ENSEMBLE_CANCELLED