From 57ffe2a4305dac818766e9896d8e85852646f03e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?H=C3=A5vard=20Berland?=
Date: Thu, 28 Dec 2023 12:31:06 +0100
Subject: [PATCH] Support submit_sleep in Scheduler

Notes:

* The test setup may be prone to flakiness
* Picks SUBMIT_SLEEP from the queue system configuration, but leans
  towards a future global setting for SUBMIT_SLEEP.
---
 src/ert/config/queue_config.py               | 13 ++-
 .../ensemble_evaluator/_builder/_legacy.py   |  1 +
 src/ert/scheduler/job.py                     |  2 +
 src/ert/scheduler/scheduler.py               | 23 +++
 tests/unit_tests/scheduler/test_scheduler.py | 90 +++++++++++++++++++
 5 files changed, 128 insertions(+), 1 deletion(-)

diff --git a/src/ert/config/queue_config.py b/src/ert/config/queue_config.py
index 9000fa304ba..0019ccfe25f 100644
--- a/src/ert/config/queue_config.py
+++ b/src/ert/config/queue_config.py
@@ -32,6 +32,7 @@ class QueueConfig:
     job_script: str = shutil.which("job_dispatch.py") or "job_dispatch.py"
     max_submit: int = 2
+    submit_sleep: float = 0.0
     queue_system: QueueSystem = QueueSystem.LOCAL
     queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = field(
         default_factory=dict
     )
@@ -48,6 +49,7 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
         )
         job_script = job_script or "job_dispatch.py"
         max_submit: int = config_dict.get("MAX_SUBMIT", 2)
+        submit_sleep: float = config_dict.get("SUBMIT_SLEEP", 0.0)
         queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = defaultdict(list)
         for queue_system, option_name, *values in config_dict.get("QUEUE_OPTION", []):
             if option_name not in VALID_QUEUE_OPTIONS[queue_system]:
@@ -67,6 +69,12 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
                     " usually provided by the site-configuration file, beware that"
                     " you are effectively replacing the default value provided."
                 )
+            if (
+                values
+                and option_name == "SUBMIT_SLEEP"
+                and selected_queue_system == queue_system
+            ):
+                submit_sleep = float(values[0])
 
         for queue_system_val in queue_options:
             if queue_options[queue_system_val]:
@@ -85,12 +93,15 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
                 queue_options[selected_queue_system],
             )
 
-        return QueueConfig(job_script, max_submit, selected_queue_system, queue_options)
+        return QueueConfig(
+            job_script, max_submit, submit_sleep, selected_queue_system, queue_options
+        )
 
     def create_local_copy(self) -> QueueConfig:
         return QueueConfig(
             self.job_script,
             self.max_submit,
+            self.submit_sleep,
             QueueSystem.LOCAL,
             self.queue_options,
         )
diff --git a/src/ert/ensemble_evaluator/_builder/_legacy.py b/src/ert/ensemble_evaluator/_builder/_legacy.py
index 39046ad2be3..c0735d32acd 100644
--- a/src/ert/ensemble_evaluator/_builder/_legacy.py
+++ b/src/ert/ensemble_evaluator/_builder/_legacy.py
@@ -202,6 +202,7 @@ async def _evaluate_inner(  # pylint: disable=too-many-branches
                 self.active_reals,
                 max_submit=self._queue_config.max_submit,
                 max_running=self._queue_config.max_running,
+                submit_sleep=self._queue_config.submit_sleep,
                 ens_id=self.id_,
                 ee_uri=self._config.dispatch_uri,
                 ee_cert=self._config.cert,
diff --git a/src/ert/scheduler/job.py b/src/ert/scheduler/job.py
index 062a0702e5b..1a978bfba1d 100644
--- a/src/ert/scheduler/job.py
+++ b/src/ert/scheduler/job.py
@@ -92,6 +92,8 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
         timeout_task: Optional[asyncio.Task[None]] = None
 
         try:
+            if self._scheduler.submit_sleep_state:
+                await self._scheduler.submit_sleep_state.sleep_until_we_can_submit()
             await self._send(State.SUBMITTING)
             await self.driver.submit(
                 self.real.iens, self.real.job_script, cwd=self.real.run_arg.runpath
diff --git a/src/ert/scheduler/scheduler.py b/src/ert/scheduler/scheduler.py
index e13d42b2b0a..c12468f4b8c 100644
--- a/src/ert/scheduler/scheduler.py
+++ b/src/ert/scheduler/scheduler.py
@@ -5,6 +5,7 @@
 import logging
 import os
 import ssl
+import time
 from collections import defaultdict
 from dataclasses import asdict
 from pathlib import Path
@@ -39,6 +40,23 @@ class _JobsJson:
     experiment_id: Optional[str]
 
 
+class SubmitSleeper:
+    _submit_sleep: float
+    _last_started: float
+
+    def __init__(self, submit_sleep: float):
+        self._submit_sleep = submit_sleep
+        self._last_started = (
+            time.time() - submit_sleep
+        )  # Allow the first to start immediately
+
+    async def sleep_until_we_can_submit(self) -> None:
+        now = time.time()
+        next_start_time = max(self._last_started + self._submit_sleep, now)
+        self._last_started = next_start_time
+        await asyncio.sleep(max(0, next_start_time - now))
+
+
 class Scheduler:
     def __init__(
         self,
@@ -47,6 +65,7 @@ def __init__(
         *,
         max_submit: int = 1,
         max_running: int = 1,
+        submit_sleep: float = 0.0,
         ens_id: Optional[str] = None,
         ee_uri: Optional[str] = None,
         ee_cert: Optional[str] = None,
@@ -57,6 +76,10 @@ def __init__(
         self.driver = driver
         self._tasks: MutableMapping[int, asyncio.Task[None]] = {}
 
+        self.submit_sleep_state: Optional[SubmitSleeper] = None
+        if submit_sleep > 0:
+            self.submit_sleep_state = SubmitSleeper(submit_sleep)
+
         self._jobs: MutableMapping[int, Job] = {
             real.iens: Job(self, real) for real in (realizations or [])
         }
diff --git a/tests/unit_tests/scheduler/test_scheduler.py b/tests/unit_tests/scheduler/test_scheduler.py
index d0ca81584f3..8516f2e2e96 100644
--- a/tests/unit_tests/scheduler/test_scheduler.py
+++ b/tests/unit_tests/scheduler/test_scheduler.py
@@ -1,6 +1,8 @@
 import asyncio
 import json
+import random
 import shutil
+import time
 from pathlib import Path
 from typing import List
@@ -342,3 +344,91 @@ async def wait(iens):
 
     assert await sch.execute(min_required_realizations=5) == EVTYPE_ENSEMBLE_STOPPED
     assert killed_iens == [6, 7, 8, 9]
+
+
+@pytest.mark.parametrize(
+    "submit_sleep, iens_stride, realization_runtime",
+    [(0, 1, 0.1), (0.1, 1, 0.1), (0.1, 1, 0), (0.1, 2, 0)],
+)
+async def test_submit_sleep(
+    submit_sleep,
+    iens_stride,  # Gives sparse ensembles when > 1
+    realization_runtime,
+    storage,
+    tmp_path,
+    mock_driver,
+):
+    run_start_times: List[float] = []
+
+    async def wait():
+        nonlocal run_start_times
+        run_start_times.append(time.time())
+        await asyncio.sleep(realization_runtime)
+
+    ensemble_size = 10
+
+    ensemble = storage.create_experiment().create_ensemble(
+        name="foo", ensemble_size=ensemble_size * iens_stride
+    )
+    realizations = [
+        create_stub_realization(ensemble, tmp_path, iens * iens_stride)
+        for iens in range(ensemble_size)
+    ]
+
+    sch = scheduler.Scheduler(
+        mock_driver(wait=wait), realizations, submit_sleep=submit_sleep, max_running=0
+    )
+    await sch.execute()
+
+    deltas = [
+        next_start - start
+        for start, next_start in zip(run_start_times[:-1], run_start_times[1:])
+    ]
+    assert min(deltas) >= submit_sleep * 0.8
+    assert max(deltas) <= submit_sleep + 0.1
+
+
+@pytest.mark.parametrize(
+    "submit_sleep, realization_max_runtime, max_running",
+    [
+        (0.01, 0.01, 1),
+        (0.01, 0.01, 10),
+        (0.01, 0.1, 5),
+    ],
+)
+async def test_submit_sleep_with_max_running(
+    submit_sleep, realization_max_runtime, max_running, storage, tmp_path, mock_driver
+):
+    run_start_times: List[float] = []
+
+    async def wait():
+        nonlocal run_start_times
+        run_start_times.append(time.time())
+        # If the realization runtimes are constant, we will never get into
+        # the situation where we can start many realizations at the same moment
+        runtime = realization_max_runtime * random.random()
+        await asyncio.sleep(runtime)
+
+    ensemble_size = 10
+
+    ensemble = storage.create_experiment().create_ensemble(
+        name="foo", ensemble_size=ensemble_size
+    )
+    realizations = [
+        create_stub_realization(ensemble, tmp_path, iens)
+        for iens in range(ensemble_size)
+    ]
+
+    sch = scheduler.Scheduler(
+        mock_driver(wait=wait),
+        realizations,
+        submit_sleep=submit_sleep,
+        max_running=max_running,
+    )
+    await sch.execute()
+
+    deltas = [
+        next_start - start
+        for start, next_start in zip(run_start_times[:-1], run_start_times[1:])
+    ]
+    assert min(deltas) >= submit_sleep * 0.8
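
For reviewers, a minimal sketch of the precedence that QueueConfig.from_dict implements for the new keyword: a global SUBMIT_SLEEP value (defaulting to 0.0) is the base, and a QUEUE_OPTION <system> SUBMIT_SLEEP entry overrides it only when it targets the selected queue system. The resolve_submit_sleep helper and the plain tuples below are illustrative stand-ins for ert's ConfigDict parsing, and validation against VALID_QUEUE_OPTIONS is omitted.

from typing import Sequence, Tuple


def resolve_submit_sleep(
    global_submit_sleep: float,
    queue_options: Sequence[Tuple[str, str, str]],  # (queue_system, option, value)
    selected_queue_system: str,
) -> float:
    # Base value: the global SUBMIT_SLEEP keyword (0.0 if not set)
    submit_sleep = global_submit_sleep
    # Override: a QUEUE_OPTION <system> SUBMIT_SLEEP <value> entry wins, but
    # only when it targets the queue system that is actually selected
    for queue_system, option_name, value in queue_options:
        if option_name == "SUBMIT_SLEEP" and queue_system == selected_queue_system:
            submit_sleep = float(value)
    return submit_sleep


# A QUEUE_OPTION LOCAL SUBMIT_SLEEP 0.25 overrides a global SUBMIT_SLEEP of 1.0
print(resolve_submit_sleep(1.0, [("LOCAL", "SUBMIT_SLEEP", "0.25")], "LOCAL"))  # 0.25
print(resolve_submit_sleep(1.0, [("LSF", "SUBMIT_SLEEP", "0.25")], "LOCAL"))  # 1.0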
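
The throttling behaviour of SubmitSleeper can also be exercised in isolation. Below is a self-contained sketch using the same logic as the class added to scheduler.py; fake_submit is a hypothetical stand-in for Job._submit_and_run_once. The key point is that each caller reserves the next free start slot synchronously (no await between reading and updating _last_started), so concurrent submissions end up spaced by at least submit_sleep seconds instead of all waking at once.

import asyncio
import time


class SubmitSleeper:
    # Same logic as the class added to scheduler.py: serialize submission
    # start times so consecutive submissions are >= submit_sleep apart
    def __init__(self, submit_sleep: float):
        self._submit_sleep = submit_sleep
        # Allow the first submission to start immediately
        self._last_started = time.time() - submit_sleep

    async def sleep_until_we_can_submit(self) -> None:
        now = time.time()
        # Reserve the next free slot before awaiting, so each concurrent
        # caller gets its own slot instead of all waking up at once
        next_start_time = max(self._last_started + self._submit_sleep, now)
        self._last_started = next_start_time
        await asyncio.sleep(max(0, next_start_time - now))


async def fake_submit(iens: int, sleeper: SubmitSleeper) -> None:
    # Hypothetical stand-in for Job._submit_and_run_once()
    await sleeper.sleep_until_we_can_submit()
    print(f"realization {iens} submitted at {time.time():.2f}")


async def main() -> None:
    sleeper = SubmitSleeper(submit_sleep=0.5)
    # Ten submissions created at once; their start times are staggered by ~0.5s
    await asyncio.gather(*(fake_submit(iens, sleeper) for iens in range(10)))


asyncio.run(main())

When max_running also limits concurrency, only the lower bound on the spacing still holds, which is why test_submit_sleep_with_max_running asserts on min(deltas) but not max(deltas).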