Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support submit sleep in scheduler #6858

Merged
merged 1 commit into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/ert/config/queue_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
class QueueConfig:
job_script: str = shutil.which("job_dispatch.py") or "job_dispatch.py"
max_submit: int = 2
submit_sleep: float = 0.0
queue_system: QueueSystem = QueueSystem.LOCAL
queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = field(
default_factory=dict
Expand All @@ -48,6 +49,7 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
)
job_script = job_script or "job_dispatch.py"
max_submit: int = config_dict.get("MAX_SUBMIT", 2)
submit_sleep: float = config_dict.get("SUBMIT_SLEEP", 0.0)
jonathan-eq marked this conversation as resolved.
Show resolved Hide resolved
queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = defaultdict(list)
for queue_system, option_name, *values in config_dict.get("QUEUE_OPTION", []):
if option_name not in VALID_QUEUE_OPTIONS[queue_system]:
Expand All @@ -67,6 +69,12 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
" usually provided by the site-configuration file, beware that"
" you are effectively replacing the default value provided."
)
if (
values
xjules marked this conversation as resolved.
Show resolved Hide resolved
and option_name == "SUBMIT_SLEEP"
and selected_queue_system == queue_system
):
submit_sleep = float(values[0])

for queue_system_val in queue_options:
if queue_options[queue_system_val]:
Expand All @@ -85,12 +93,15 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
queue_options[selected_queue_system],
)

return QueueConfig(job_script, max_submit, selected_queue_system, queue_options)
return QueueConfig(
job_script, max_submit, submit_sleep, selected_queue_system, queue_options
)

    def create_local_copy(self) -> QueueConfig:
        """Return a copy of this config that targets the LOCAL queue system.

        All other settings (job script, max submit, submit sleep, queue
        options) are carried over unchanged; only the queue system is
        forced to LOCAL.  Note the arguments are positional, so they must
        stay in the dataclass field order.
        """
        return QueueConfig(
            self.job_script,
            self.max_submit,
            self.submit_sleep,
            QueueSystem.LOCAL,
            self.queue_options,
        )
Expand Down
1 change: 1 addition & 0 deletions src/ert/ensemble_evaluator/_builder/_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ async def _evaluate_inner( # pylint: disable=too-many-branches
self.active_reals,
max_submit=self._queue_config.max_submit,
max_running=self._queue_config.max_running,
submit_sleep=self._queue_config.submit_sleep,
ens_id=self.id_,
ee_uri=self._config.dispatch_uri,
ee_cert=self._config.cert,
Expand Down
2 changes: 2 additions & 0 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
timeout_task: Optional[asyncio.Task[None]] = None

try:
if self._scheduler.submit_sleep_state:
await self._scheduler.submit_sleep_state.sleep_until_we_can_submit()
await self._send(State.SUBMITTING)
await self.driver.submit(
self.real.iens, self.real.job_script, cwd=self.real.run_arg.runpath
Expand Down
23 changes: 23 additions & 0 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
import ssl
import time
from collections import defaultdict
from dataclasses import asdict
from pathlib import Path
Expand Down Expand Up @@ -39,6 +40,23 @@ class _JobsJson:
experiment_id: Optional[str]


class SubmitSleeper:
    """Rate-limit job submissions to at most one per ``submit_sleep`` seconds.

    Each call to :meth:`sleep_until_we_can_submit` reserves the next available
    submission slot before sleeping, so concurrent callers are serialized and
    spaced ``submit_sleep`` seconds apart.
    """

    _submit_sleep: float
    _last_started: float

    def __init__(self, submit_sleep: float):
        self._submit_sleep = submit_sleep
        # Use a monotonic clock for interval pacing: time.time() is wall-clock
        # and can jump (NTP adjustments, DST), which would distort the spacing.
        # Backdate the last start so the first submission starts immediately.
        self._last_started = time.monotonic() - submit_sleep

    async def sleep_until_we_can_submit(self) -> None:
        """Sleep until ``submit_sleep`` seconds after the previously reserved
        submission slot, and reserve the next slot."""
        now = time.monotonic()
        next_start_time = max(self._last_started + self._submit_sleep, now)
        # Reserve the slot *before* sleeping so other concurrent callers
        # queue up behind this one instead of grabbing the same slot.
        self._last_started = next_start_time
        await asyncio.sleep(max(0, next_start_time - now))


class Scheduler:
def __init__(
self,
Expand All @@ -47,6 +65,7 @@ def __init__(
*,
max_submit: int = 1,
max_running: int = 1,
submit_sleep: float = 0.0,
ens_id: Optional[str] = None,
ee_uri: Optional[str] = None,
ee_cert: Optional[str] = None,
Expand All @@ -57,6 +76,10 @@ def __init__(
self.driver = driver
self._tasks: MutableMapping[int, asyncio.Task[None]] = {}

self.submit_sleep_state: Optional[SubmitSleeper] = None
if submit_sleep > 0:
self.submit_sleep_state = SubmitSleeper(submit_sleep)

self._jobs: MutableMapping[int, Job] = {
real.iens: Job(self, real) for real in (realizations or [])
}
Expand Down
90 changes: 90 additions & 0 deletions tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import json
import random
import shutil
import time
from pathlib import Path
from typing import List

Expand Down Expand Up @@ -342,3 +344,91 @@ async def wait(iens):

assert await sch.execute(min_required_realizations=5) == EVTYPE_ENSEMBLE_STOPPED
assert killed_iens == [6, 7, 8, 9]


@pytest.mark.parametrize(
    "submit_sleep, iens_stride, realization_runtime",
    [(0, 1, 0.1), (0.1, 1, 0.1), (0.1, 1, 0), (0.1, 2, 0)],
)
async def test_submit_sleep(
    submit_sleep,
    iens_stride,  # Gives sparse ensembles when > 1
    realization_runtime,
    storage,
    tmp_path,
    mock_driver,
):
    """Consecutive realization start times must be spaced submit_sleep apart."""
    start_times: List[float] = []

    async def wait():
        start_times.append(time.time())
        await asyncio.sleep(realization_runtime)

    ensemble_size = 10
    ensemble = storage.create_experiment().create_ensemble(
        name="foo", ensemble_size=ensemble_size * iens_stride
    )
    realizations = [
        create_stub_realization(ensemble, tmp_path, iens * iens_stride)
        for iens in range(ensemble_size)
    ]

    sch = scheduler.Scheduler(
        mock_driver(wait=wait), realizations, submit_sleep=submit_sleep, max_running=0
    )
    await sch.execute()

    # Gaps between consecutive submissions (20% tolerance for timer jitter):
    deltas = [
        second - first for first, second in zip(start_times, start_times[1:])
    ]
    assert min(deltas) >= submit_sleep * 0.8
    assert max(deltas) <= submit_sleep + 0.1


@pytest.mark.parametrize(
    "submit_sleep, realization_max_runtime, max_running",
    [
        (0.01, 0.01, 1),
        (0.01, 0.01, 10),
        (0.01, 0.1, 5),
    ],
)
async def test_submit_sleep_with_max_running(
    submit_sleep, realization_max_runtime, max_running, storage, tmp_path, mock_driver
):
    """Submit-sleep spacing must hold even when the max_running limit frees
    several submission slots at the same moment."""
    run_start_times: List[float] = []

    async def wait():
        nonlocal run_start_times
        run_start_times.append(time.time())
        # If the realization runtimes are constant, we will never get into
        # the situation where we can start many realizations at the same moment
        runtime = realization_max_runtime * random.random()
        await asyncio.sleep(runtime)

    ensemble_size = 10

    ensemble = storage.create_experiment().create_ensemble(
        name="foo", ensemble_size=ensemble_size
    )
    realizations = [
        create_stub_realization(ensemble, tmp_path, iens)
        for iens in range(ensemble_size)
    ]

    sch = scheduler.Scheduler(
        mock_driver(wait=wait),
        realizations,
        submit_sleep=submit_sleep,
        max_running=max_running,
    )
    await sch.execute()

    # Gaps between consecutive realization starts; each must respect the
    # configured submit_sleep (20% tolerance for timer jitter).
    deltas = [
        next_start - start
        for start, next_start in zip(run_start_times[:-1], run_start_times[1:])
    ]
    assert min(deltas) >= submit_sleep * 0.8
Loading