Skip to content

Commit

Permalink
Introduce driver.finish that makes sure all the tasks started have be…
Browse files Browse the repository at this point in the history
…en awaited.
  • Loading branch information
xjules committed Dec 15, 2023
1 parent 3c5e075 commit 366aaf2
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 10 deletions.
8 changes: 2 additions & 6 deletions src/ert/scheduler/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,5 @@ async def poll(self) -> None:
"""Poll for new job events"""

@abstractmethod
async def finish(self, iens: int) -> None:
"""wait until the job / realization execution is complete.
Args:
iens: Realization number.
"""
async def finish(self) -> None:
"""make sure that all the jobs / realizations are complete."""
2 changes: 0 additions & 2 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
while not self.returncode.done():
await asyncio.sleep(0.01)
returncode = await self.returncode
# we need to make sure that the task has finished too
await self.driver.finish(self.real.iens)

if (
returncode == 0
Expand Down
7 changes: 5 additions & 2 deletions src/ert/scheduler/local_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@ def __init__(self) -> None:
self._tasks: MutableMapping[int, asyncio.Task[None]] = {}

async def submit(self, iens: int, executable: str, /, *args: str, cwd: str) -> None:
await self.kill(iens)
self._tasks[iens] = asyncio.create_task(
self._wait_until_finish(iens, executable, *args, cwd=cwd)
)

async def kill(self, iens: int) -> None:
try:
self._tasks[iens].cancel()
await self._tasks[iens]
del self._tasks[iens]
except KeyError:
return

async def finish(self, iens: int) -> None:
await self._tasks[iens]
async def finish(self) -> None:
await asyncio.gather(*self._tasks.values())

async def _wait_until_finish(
self, iens: int, executable: str, /, *args: str, cwd: str
Expand Down
2 changes: 2 additions & 0 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ async def execute(
for task in self._tasks.values():
await task

await self.driver.finish()

if self._cancelled:
return EVTYPE_ENSEMBLE_CANCELLED

Expand Down

0 comments on commit 366aaf2

Please sign in to comment.