Skip to content

Commit

Permalink
Support submit_sleep in Scheduler
Browse files Browse the repository at this point in the history
Notes:
* The test setup possibly allows flakiness
* Picks SUBMIT_SLEEP from queue system configuration,
  but leans towards a future global setting for SUBMIT_SLEEP.
  • Loading branch information
berland committed Jan 16, 2024
1 parent 7ef6fd6 commit 57ffe2a
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 1 deletion.
13 changes: 12 additions & 1 deletion src/ert/config/queue_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
class QueueConfig:
job_script: str = shutil.which("job_dispatch.py") or "job_dispatch.py"
max_submit: int = 2
submit_sleep: float = 0.0
queue_system: QueueSystem = QueueSystem.LOCAL
queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = field(
default_factory=dict
Expand All @@ -48,6 +49,7 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
)
job_script = job_script or "job_dispatch.py"
max_submit: int = config_dict.get("MAX_SUBMIT", 2)
submit_sleep: float = config_dict.get("SUBMIT_SLEEP", 0.0)
queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = defaultdict(list)
for queue_system, option_name, *values in config_dict.get("QUEUE_OPTION", []):
if option_name not in VALID_QUEUE_OPTIONS[queue_system]:
Expand All @@ -67,6 +69,12 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
" usually provided by the site-configuration file, beware that"
" you are effectively replacing the default value provided."
)
if (
values
and option_name == "SUBMIT_SLEEP"
and selected_queue_system == queue_system
):
submit_sleep = float(values[0])

for queue_system_val in queue_options:
if queue_options[queue_system_val]:
Expand All @@ -85,12 +93,15 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
queue_options[selected_queue_system],
)

return QueueConfig(job_script, max_submit, selected_queue_system, queue_options)
return QueueConfig(
job_script, max_submit, submit_sleep, selected_queue_system, queue_options
)

def create_local_copy(self) -> QueueConfig:
    """Return a new QueueConfig that targets the LOCAL queue system.

    The job script, max_submit and submit_sleep values are carried over
    unchanged, and the new instance refers to the same queue_options
    object as this one (it is passed along, not copied).
    """
    return QueueConfig(
        job_script=self.job_script,
        max_submit=self.max_submit,
        submit_sleep=self.submit_sleep,
        queue_system=QueueSystem.LOCAL,
        queue_options=self.queue_options,
    )
Expand Down
1 change: 1 addition & 0 deletions src/ert/ensemble_evaluator/_builder/_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ async def _evaluate_inner( # pylint: disable=too-many-branches
self.active_reals,
max_submit=self._queue_config.max_submit,
max_running=self._queue_config.max_running,
submit_sleep=self._queue_config.submit_sleep,
ens_id=self.id_,
ee_uri=self._config.dispatch_uri,
ee_cert=self._config.cert,
Expand Down
2 changes: 2 additions & 0 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
timeout_task: Optional[asyncio.Task[None]] = None

try:
if self._scheduler.submit_sleep_state:
await self._scheduler.submit_sleep_state.sleep_until_we_can_submit()
await self._send(State.SUBMITTING)
await self.driver.submit(
self.real.iens, self.real.job_script, cwd=self.real.run_arg.runpath
Expand Down
23 changes: 23 additions & 0 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
import ssl
import time
from collections import defaultdict
from dataclasses import asdict
from pathlib import Path
Expand Down Expand Up @@ -39,6 +40,23 @@ class _JobsJson:
experiment_id: Optional[str]


class SubmitSleeper:
    """Space out submissions so they start at least ``submit_sleep`` apart.

    Each caller of :meth:`sleep_until_we_can_submit` is handed the next free
    start slot and sleeps until that slot is reached. Slots are tracked on
    the monotonic clock, so adjustments to the system wall clock can neither
    stall submissions nor collapse the spacing between them.
    """

    _submit_sleep: float
    _last_started: float

    def __init__(self, submit_sleep: float):
        """Create a sleeper enforcing ``submit_sleep`` seconds between starts."""
        self._submit_sleep = submit_sleep
        # Back-date the previous start so the first caller starts immediately.
        self._last_started = time.monotonic() - submit_sleep

    async def sleep_until_we_can_submit(self) -> None:
        """Sleep until the next free start slot, then return."""
        now = time.monotonic()
        next_start_time = max(self._last_started + self._submit_sleep, now)
        # Reserve the slot before awaiting, so that callers arriving while
        # this one sleeps are queued behind it rather than sharing its slot.
        self._last_started = next_start_time
        await asyncio.sleep(max(0.0, next_start_time - now))


class Scheduler:
def __init__(
self,
Expand All @@ -47,6 +65,7 @@ def __init__(
*,
max_submit: int = 1,
max_running: int = 1,
submit_sleep: float = 0.0,
ens_id: Optional[str] = None,
ee_uri: Optional[str] = None,
ee_cert: Optional[str] = None,
Expand All @@ -57,6 +76,10 @@ def __init__(
self.driver = driver
self._tasks: MutableMapping[int, asyncio.Task[None]] = {}

self.submit_sleep_state: Optional[SubmitSleeper] = None
if submit_sleep > 0:
self.submit_sleep_state = SubmitSleeper(submit_sleep)

self._jobs: MutableMapping[int, Job] = {
real.iens: Job(self, real) for real in (realizations or [])
}
Expand Down
90 changes: 90 additions & 0 deletions tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import json
import random
import shutil
import time
from pathlib import Path
from typing import List

Expand Down Expand Up @@ -342,3 +344,91 @@ async def wait(iens):

assert await sch.execute(min_required_realizations=5) == EVTYPE_ENSEMBLE_STOPPED
assert killed_iens == [6, 7, 8, 9]


@pytest.mark.parametrize(
    "submit_sleep, iens_stride, realization_runtime",
    [(0, 1, 0.1), (0.1, 1, 0.1), (0.1, 1, 0), (0.1, 2, 0)],
)
async def test_submit_sleep(
    submit_sleep,
    iens_stride,  # A stride above 1 leaves gaps between realization indices
    realization_runtime,
    storage,
    tmp_path,
    mock_driver,
):
    """Check the spacing between consecutive realization start times.

    Every gap must be at least 80% of submit_sleep and at most
    submit_sleep plus 0.1 seconds.
    """
    ensemble_size = 10
    run_start_times: List[float] = []

    async def wait():
        # Record when the driver starts waiting on this realization.
        run_start_times.append(time.time())
        await asyncio.sleep(realization_runtime)

    experiment = storage.create_experiment()
    ensemble = experiment.create_ensemble(
        name="foo", ensemble_size=ensemble_size * iens_stride
    )
    realizations = []
    for index in range(ensemble_size):
        realizations.append(
            create_stub_realization(ensemble, tmp_path, index * iens_stride)
        )

    driver = mock_driver(wait=wait)
    sch = scheduler.Scheduler(
        driver, realizations, submit_sleep=submit_sleep, max_running=0
    )
    await sch.execute()

    deltas = [
        later - earlier
        for earlier, later in zip(run_start_times, run_start_times[1:])
    ]
    shortest, longest = min(deltas), max(deltas)
    assert shortest >= submit_sleep * 0.8
    assert longest <= submit_sleep + 0.1


@pytest.mark.parametrize(
    "submit_sleep, realization_max_runtime, max_running",
    [
        (0.01, 0.01, 1),
        (0.01, 0.01, 10),
        (0.01, 0.1, 5),
    ],
)
async def test_submit_sleep_with_max_running(
    submit_sleep, realization_max_runtime, max_running, storage, tmp_path, mock_driver
):
    """Check the minimum start spacing when max_running is also set.

    Every gap between consecutive realization start times must be at
    least 80% of submit_sleep.
    """
    ensemble_size = 10
    run_start_times: List[float] = []

    async def wait():
        run_start_times.append(time.time())
        # With constant runtimes we would never reach the situation where
        # many realizations become eligible to start at the same moment,
        # so each realization gets a random runtime instead.
        await asyncio.sleep(realization_max_runtime * random.random())

    experiment = storage.create_experiment()
    ensemble = experiment.create_ensemble(name="foo", ensemble_size=ensemble_size)
    realizations = []
    for index in range(ensemble_size):
        realizations.append(create_stub_realization(ensemble, tmp_path, index))

    driver = mock_driver(wait=wait)
    sch = scheduler.Scheduler(
        driver,
        realizations,
        submit_sleep=submit_sleep,
        max_running=max_running,
    )
    await sch.execute()

    deltas = [
        later - earlier
        for earlier, later in zip(run_start_times, run_start_times[1:])
    ]
    shortest = min(deltas)
    assert shortest >= submit_sleep * 0.8

0 comments on commit 57ffe2a

Please sign in to comment.