From 54e980eed32dc77bed99039065f0a3461f612f99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Berland?= Date: Thu, 28 Dec 2023 12:31:06 +0100 Subject: [PATCH] Support submit_sleep in Scheduler Notes: * The test setup possibly allows flakiness * Picks SUBMIT_SLEEP from queue system configuration, but leans towards a future global setting for SUBMIT_SLEEP. --- src/ert/config/queue_config.py | 9 +++- .../ensemble_evaluator/_builder/_legacy.py | 1 + src/ert/scheduler/scheduler.py | 15 +++++-- tests/unit_tests/scheduler/test_scheduler.py | 44 +++++++++++++++++++ 4 files changed, 65 insertions(+), 4 deletions(-) diff --git a/src/ert/config/queue_config.py b/src/ert/config/queue_config.py index 9000fa304ba..bdd6fe49bcd 100644 --- a/src/ert/config/queue_config.py +++ b/src/ert/config/queue_config.py @@ -32,6 +32,7 @@ class QueueConfig: job_script: str = shutil.which("job_dispatch.py") or "job_dispatch.py" max_submit: int = 2 + submit_sleep: float = 0 queue_system: QueueSystem = QueueSystem.LOCAL queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = field( default_factory=dict @@ -48,6 +49,7 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig: ) job_script = job_script or "job_dispatch.py" max_submit: int = config_dict.get("MAX_SUBMIT", 2) + submit_sleep: float = config_dict.get("SUBMIT_SLEEP", 0.0) queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = defaultdict(list) for queue_system, option_name, *values in config_dict.get("QUEUE_OPTION", []): if option_name not in VALID_QUEUE_OPTIONS[queue_system]: @@ -67,6 +69,8 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig: " usually provided by the site-configuration file, beware that" " you are effectively replacing the default value provided." 
) + if option_name == "SUBMIT_SLEEP" and selected_queue_system == queue_system: + submit_sleep = float(values[0]) for queue_system_val in queue_options: if queue_options[queue_system_val]: @@ -85,12 +89,15 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig: queue_options[selected_queue_system], ) - return QueueConfig(job_script, max_submit, selected_queue_system, queue_options) + return QueueConfig( + job_script, max_submit, submit_sleep, selected_queue_system, queue_options + ) def create_local_copy(self) -> QueueConfig: return QueueConfig( self.job_script, self.max_submit, + self.submit_sleep, QueueSystem.LOCAL, self.queue_options, ) diff --git a/src/ert/ensemble_evaluator/_builder/_legacy.py b/src/ert/ensemble_evaluator/_builder/_legacy.py index 5a70bedacdc..288837684f4 100644 --- a/src/ert/ensemble_evaluator/_builder/_legacy.py +++ b/src/ert/ensemble_evaluator/_builder/_legacy.py @@ -202,6 +202,7 @@ async def _evaluate_inner( # pylint: disable=too-many-branches self.active_reals, max_submit=self._queue_config.max_submit, max_running=self._queue_config.max_running, + submit_sleep=self._queue_config.submit_sleep, ens_id=self.id_, ee_uri=self._config.dispatch_uri, ee_cert=self._config.cert, diff --git a/src/ert/scheduler/scheduler.py b/src/ert/scheduler/scheduler.py index 03917e0296a..92e21da4ef3 100644 --- a/src/ert/scheduler/scheduler.py +++ b/src/ert/scheduler/scheduler.py @@ -50,6 +50,7 @@ def __init__( *, max_submit: int = 1, max_running: int = 1, + submit_sleep: float = 0, ens_id: Optional[str] = None, ee_uri: Optional[str] = None, ee_cert: Optional[str] = None, @@ -68,6 +69,7 @@ def __init__( self._cancelled = False self._max_submit = max_submit self._max_running = max_running + self._submit_sleep = submit_sleep self._ee_uri = ee_uri self._ens_id = ens_id @@ -118,14 +120,21 @@ async def execute( cancel_when_execute_is_done(self._process_event_queue()) cancel_when_execute_is_done(self.driver.poll()) - start = asyncio.Event() + start_events: 
MutableMapping[int, asyncio.Event] = {} sem = asyncio.BoundedSemaphore(self._max_running or len(self._jobs)) for iens, job in self._jobs.items(): + start_events[iens] = asyncio.Event() self._tasks[iens] = asyncio.create_task( - job(start, sem, self._max_submit) + job(start_events[iens], sem, self._max_submit) ) - start.set() + async def _set_start_event(iens: int, sleep_time: float) -> None: + await asyncio.sleep(sleep_time) + start_events[iens].set() + + for idx, iens in enumerate(self._jobs.keys()): + asyncio.create_task(_set_start_event(iens, idx * self._submit_sleep)) + for task in self._tasks.values(): await task diff --git a/tests/unit_tests/scheduler/test_scheduler.py b/tests/unit_tests/scheduler/test_scheduler.py index c3113914714..774ad1cdd37 100644 --- a/tests/unit_tests/scheduler/test_scheduler.py +++ b/tests/unit_tests/scheduler/test_scheduler.py @@ -282,3 +282,47 @@ async def kill(): # The result from execute is that we were cancelled, not stopped # as if the timeout happened before kill_all_jobs() assert scheduler_task.result() == EVTYPE_ENSEMBLE_CANCELLED + + +@pytest.mark.parametrize( + "submit_sleep, iens_stride, realization_runtime, expected_max_running", + [(0, 1, 0.1, 10), (0.1, 1, 0.1, 2), (0.1, 2, 0.1, 2), (0.1, 3, 0.4, 5)], +) +async def test_submit_sleep( + submit_sleep, + iens_stride, + realization_runtime, + expected_max_running, + storage, + tmp_path, + mock_driver, +): + currently_running = 0 + max_running_observed = 0 + + async def wait(): + nonlocal currently_running, realization_runtime + currently_running += 1 + await asyncio.sleep(realization_runtime) + currently_running -= 1 + + ensemble_size = 10 + + ensemble = storage.create_experiment().create_ensemble( + name="foo", ensemble_size=ensemble_size * iens_stride + ) + realizations = [ + create_stub_realization(ensemble, tmp_path, iens * iens_stride) + for iens in range(ensemble_size) + ] + + sch = scheduler.Scheduler( + mock_driver(wait=wait), realizations, 
submit_sleep=submit_sleep, max_running=0 + ) + task = asyncio.create_task(sch.execute()) + + while not task.done(): + max_running_observed = max(currently_running, max_running_observed) + assert currently_running <= expected_max_running + await asyncio.sleep(0) + assert max_running_observed == expected_max_running