diff --git a/src/ert/scheduler/scheduler.py b/src/ert/scheduler/scheduler.py index deadf123935..03917e0296a 100644 --- a/src/ert/scheduler/scheduler.py +++ b/src/ert/scheduler/scheduler.py @@ -119,7 +119,7 @@ async def execute( cancel_when_execute_is_done(self.driver.poll()) start = asyncio.Event() - sem = asyncio.BoundedSemaphore(self._max_running) + sem = asyncio.BoundedSemaphore(self._max_running or len(self._jobs)) for iens, job in self._jobs.items(): self._tasks[iens] = asyncio.create_task( job(start, sem, self._max_submit) diff --git a/tests/unit_tests/scheduler/test_scheduler.py b/tests/unit_tests/scheduler/test_scheduler.py index c32588db69f..c3113914714 100644 --- a/tests/unit_tests/scheduler/test_scheduler.py +++ b/tests/unit_tests/scheduler/test_scheduler.py @@ -204,6 +204,39 @@ async def wait(): assert timeouteventfound +@pytest.mark.parametrize("max_running", [0, 1, 2, 10]) +async def test_max_running(max_running, mock_driver, storage, tmp_path): + currently_running = 0 + max_running_observed = 0 + + async def wait(): + nonlocal currently_running + currently_running += 1 + await asyncio.sleep(0.01) + currently_running -= 1 + + ensemble_size = max_running * 3 if max_running > 0 else 10 + + ensemble = storage.create_experiment().create_ensemble( + name="foo", ensemble_size=ensemble_size + ) + realizations = [ + create_stub_realization(ensemble, tmp_path, iens) + for iens in range(ensemble_size) + ] + + sch = scheduler.Scheduler( + mock_driver(wait=wait), realizations, max_running=max_running + ) + task = asyncio.create_task(sch.execute()) + effective_max_running = max_running if max_running else ensemble_size + while not task.done(): + max_running_observed = max(currently_running, max_running_observed) + assert currently_running <= effective_max_running + await asyncio.sleep(0) + assert max_running_observed == effective_max_running + + @pytest.mark.timeout(6) async def test_max_runtime_while_killing(realization, mock_driver): wait_started = asyncio.Event()