Skip to content

Commit

Permalink
very wip
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Jan 4, 2024
1 parent e13e47c commit f20b9fe
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/ert/config/queue_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
class QueueConfig:
job_script: str = shutil.which("job_dispatch.py") or "job_dispatch.py"
max_submit: int = 2
submit_sleep: float = 0
submit_sleep: float = 0.0
queue_system: QueueSystem = QueueSystem.LOCAL
queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = field(
default_factory=dict
Expand Down
3 changes: 2 additions & 1 deletion src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
timeout_task: Optional[asyncio.Task[None]] = None

try:
await self._scheduler.submit_sleep_state.sleep_until_we_can_submit()
await self._send(State.SUBMITTING)

await self.driver.submit(
self.real.iens, self.real.job_script, cwd=self.real.run_arg.runpath
)
Expand Down Expand Up @@ -146,7 +148,6 @@ async def __call__(
) -> None:
self._requested_max_submit = max_submit
await start.wait()

for attempt in range(max_submit):
await self._submit_and_run_once(sem)

Expand Down
27 changes: 24 additions & 3 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
import ssl
import time
from collections import defaultdict
from dataclasses import asdict
from pathlib import Path
Expand Down Expand Up @@ -39,6 +40,26 @@ class _JobsJson:
experiment_id: Optional[str]


class SubmitSleeper:
_submit_sleep: float
_last_started: float

def __init__(self, submit_sleep: float):
self._submit_sleep = submit_sleep
self._last_started = time.time() - submit_sleep

async def sleep_until_we_can_submit(self):
now = time.time()
if now - self._last_started > self._submit_sleep:
print("no need to wait, hooray")
self._last_started = now
return
next_allowed_start = now + (now - self._last_started - self._submit_sleep)
self._last_started += self._submit_sleep # too much..
print(f"Sleeping, {self._last_started=}")
await asyncio.sleep(self._submit_sleep)


class Scheduler:
def __init__(
self,
Expand All @@ -58,6 +79,8 @@ def __init__(
self.driver = driver
self._tasks: MutableMapping[int, asyncio.Task[None]] = {}

self.submit_sleep_state = SubmitSleeper(submit_sleep)

self._jobs: MutableMapping[int, Job] = {
real.iens: Job(self, real) for real in (realizations or [])
}
Expand All @@ -70,7 +93,6 @@ def __init__(
self._cancelled = False
self._max_submit = max_submit
self._max_running = max_running
self._submit_sleep = submit_sleep

self._ee_uri = ee_uri
self._ens_id = ens_id
Expand Down Expand Up @@ -169,11 +191,10 @@ async def execute(
)

async def _set_start_event(iens: int, sleep_time: float) -> None:
await asyncio.sleep(sleep_time)
start_events[iens].set()

for idx, iens in enumerate(self._jobs.keys()):
asyncio.create_task(_set_start_event(iens, idx * self._submit_sleep))
asyncio.create_task(_set_start_event(iens, 0))

for task in self._tasks.values():
await task
Expand Down
14 changes: 9 additions & 5 deletions tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import datetime
import json
import random
import shutil
from pathlib import Path
from typing import List
Expand Down Expand Up @@ -389,18 +390,21 @@ async def wait():


@pytest.mark.parametrize(
"submit_sleep, realization_runtime, max_running",
[(0.1, 0.1, 10), (0.1, 0.5, 5)],
"submit_sleep, realization_max_runtime, max_running",
[(1, 10, 5)],
# [(0.1, 0.1, 10), (0.05, 1, 5)],
)
async def test_submit_sleep_with_max_running(
submit_sleep, realization_runtime, max_running, storage, tmp_path, mock_driver
submit_sleep, realization_max_runtime, max_running, storage, tmp_path, mock_driver
):
run_start_times: List[datetime.datetime] = []

async def wait():
nonlocal run_start_times
run_start_times.append(datetime.datetime.now())
await asyncio.sleep(realization_runtime)
# If the realization runtimes are constant, we will never get into
# the situation where we can start many realizations at the same moment
await asyncio.sleep(realization_max_runtime * random.random())

ensemble_size = 10

Expand All @@ -425,4 +429,4 @@ async def wait():
for start, next_start in zip(run_start_times[:-1], run_start_times[1:])
]
print([d.total_seconds() for d in deltas])
assert min(deltas).total_seconds() >= submit_sleep * 0.99
assert min(deltas).total_seconds() >= submit_sleep * 0.9

0 comments on commit f20b9fe

Please sign in to comment.