Skip to content

Commit

Permalink
Support max_running in Scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Dec 28, 2023
1 parent 211888d commit 8980a3f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async def execute(
cancel_when_execute_is_done(self.driver.poll())

start = asyncio.Event()
sem = asyncio.BoundedSemaphore(self._max_running)
sem = asyncio.BoundedSemaphore(self._max_running or len(self._jobs))
for iens, job in self._jobs.items():
self._tasks[iens] = asyncio.create_task(
job(start, sem, self._max_submit)
Expand Down
33 changes: 33 additions & 0 deletions tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,39 @@ async def wait():
assert timeouteventfound


@pytest.mark.parametrize("max_running", [0, 1, 2, 10])
async def test_max_running(max_running, mock_driver, storage, tmp_path):
currently_running = 0
max_running_observed = 0

async def wait():
nonlocal currently_running
currently_running += 1
await asyncio.sleep(0.01)
currently_running -= 1

ensemble_size = max_running * 3 if max_running > 0 else 10

ensemble = storage.create_experiment().create_ensemble(
name="foo", ensemble_size=ensemble_size
)
realizations = [
create_stub_realization(ensemble, tmp_path, iens)
for iens in range(ensemble_size)
]

sch = scheduler.Scheduler(
mock_driver(wait=wait), realizations, max_running=max_running
)
task = asyncio.create_task(sch.execute())
effective_max_running = max_running if max_running else ensemble_size
while not task.done():
max_running_observed = max(currently_running, max_running_observed)
assert currently_running <= effective_max_running
await asyncio.sleep(0)
assert max_running_observed == effective_max_running


@pytest.mark.timeout(6)
async def test_max_runtime_while_killing(realization, mock_driver):
wait_started = asyncio.Event()
Expand Down

0 comments on commit 8980a3f

Please sign in to comment.