Skip to content

Commit

Permalink
Add test for long_running_jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Jan 2, 2024
1 parent 397f3ea commit 513c3e3
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 7 deletions.
4 changes: 4 additions & 0 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(self, scheduler: Scheduler, real: Realization) -> None:
self._callback_status_msg: str = ""
self._requested_max_submit: Optional[int] = None
self._start_time: Optional[float] = None
self._end_time: Optional[float] = None

@property
def iens(self) -> int:
Expand All @@ -81,6 +82,8 @@ def driver(self) -> Driver:
@property
def running_duration(self) -> float:
    """Return how long this job has been running, in seconds.

    Returns:
        ``_end_time - _start_time`` when both timestamps are recorded,
        the time elapsed since ``_start_time`` when only the start is
        recorded, and ``-1`` when no start time has been recorded.
    """
    # Compare against None explicitly: both attributes are Optional[float],
    # and a truthiness check would treat a legitimate timestamp of 0.0
    # (e.g. from a mocked clock) as "not set".
    if self._start_time is None:
        return -1
    if self._end_time is not None:
        return self._end_time - self._start_time
    return time.time() - self._start_time

Expand Down Expand Up @@ -115,6 +118,7 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
)

if callback_status == LoadStatus.LOAD_SUCCESSFUL:
self._end_time = time.time()
await self._send(State.COMPLETED)
else:
assert callback_status in (
Expand Down
3 changes: 1 addition & 2 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ async def _stop_long_running_jobs(self, minimum_required_realizations: int) -> N
average_runtime = sum(
self._jobs[job_id].running_duration for job_id in completed_jobs
) / len(completed_jobs)

for job_id, task in self._tasks.items():
if (
self._jobs[job_id].running_duration
Expand All @@ -101,7 +100,7 @@ async def _stop_long_running_jobs(self, minimum_required_realizations: int) -> N
):
task.cancel()
await task
await asyncio.sleep(2)
await asyncio.sleep(0.1)

def set_realization(self, realization: Realization) -> None:
self._jobs[realization.iens] = Job(self, realization)
Expand Down
17 changes: 12 additions & 5 deletions tests/unit_tests/scheduler/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,26 @@ def __init__(self, init=None, wait=None, kill=None):
self._mock_wait = wait
self._mock_kill = kill

async def _init(self, *args, **kwargs):
async def _init(self, iens, *args, **kwargs):
if self._mock_init is not None:
await self._mock_init(*args, **kwargs)
return iens

async def _wait(self, *args):
async def _wait(self, iens):
if self._mock_wait is not None:
result = await self._mock_wait()
if self._mock_wait.__code__.co_argcount > 0:
result = await self._mock_wait(iens)
else:
result = await self._mock_wait()
return True if result is None else bool(result)
return True

async def _kill(self, *args):
async def _kill(self, iens, *args):
if self._mock_kill is not None:
await self._mock_kill()
if self._mock_kill.__code__.co_argcount > 0:
await self._mock_kill(iens)
else:
await self._mock_kill()


@pytest.fixture
Expand Down
33 changes: 33 additions & 0 deletions tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,36 @@ async def init(iens, *args, **kwargs):
assert sch.is_active()
await execute_task
assert not sch.is_active()


@pytest.mark.timeout(6)
async def test_that_long_running_jobs_were_stopped(storage, tmp_path, mock_driver):
    """Check that the scheduler kills the realizations that run too long.

    Ten realizations are executed through a mocked driver: those with
    iens 0-5 finish after 0.5 seconds, the rest sleep for 10 seconds.
    With ``min_required_realizations=5`` the test expects ``execute`` to
    return ``EVTYPE_ENSEMBLE_STOPPED`` and the driver's kill hook to have
    been called for realizations 6-9, in that order.
    """
    killed_iens = []

    async def kill(iens):
        # Record each realization the driver was asked to kill. The list is
        # only mutated, never rebound, so no ``nonlocal`` is required.
        killed_iens.append(iens)

    async def wait(iens):
        # all jobs with iens > 5 will sleep for 10 seconds and should be killed
        if iens < 6:
            await asyncio.sleep(0.5)
        else:
            await asyncio.sleep(10)
        return True

    ensemble_size = 10
    ensemble = storage.create_experiment().create_ensemble(
        name="foo", ensemble_size=ensemble_size
    )
    realizations = [
        create_stub_realization(ensemble, tmp_path, iens)
        for iens in range(ensemble_size)
    ]

    # Build the scheduler once, with the mocked driver. (A previous, unused
    # construction without the mocked driver was immediately overwritten.)
    sch = scheduler.Scheduler(mock_driver(wait=wait, kill=kill), realizations)

    assert await sch.execute(min_required_realizations=5) == EVTYPE_ENSEMBLE_STOPPED
    assert killed_iens == [6, 7, 8, 9]

0 comments on commit 513c3e3

Please sign in to comment.