Skip to content

Commit

Permalink
Bug found and fixed, debugging statements not removed
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Jan 5, 2024
1 parent 7ccfe25 commit 8c58a4e
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 19 deletions.
24 changes: 14 additions & 10 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,25 @@ class _JobsJson:
class SubmitSleeper:
_submit_sleep: float
_last_started: float
_starttime: float

def __init__(self, submit_sleep: float):
self._submit_sleep = submit_sleep
self._last_started = time.time() - submit_sleep
self._last_started = (
time.time() - submit_sleep
) # Allow the first to start immediately
self._starttime = time.time()

async def sleep_until_we_can_submit(self):
now = time.time()
if now - self._last_started > self._submit_sleep:
print("no need to wait, hooray")
self._last_started = now
return
next_allowed_start = now + (now - self._last_started - self._submit_sleep)
self._last_started += self._submit_sleep # too much..
print(f"Sleeping, {self._last_started=}")
await asyncio.sleep(self._submit_sleep)
# next_start_time = self._last_started + self._submit_sleep
next_start_time = max(self._last_started + self._submit_sleep, now)
delay = next_start_time - now
self._last_started = next_start_time
print(
f"at {now - self._starttime}, sleeping {max(0, delay)} until {round(self._last_started - self._starttime, 3)}"
)
await asyncio.sleep(max(0, delay))


class Scheduler:
Expand Down Expand Up @@ -193,7 +197,7 @@ async def execute(
async def _set_start_event(iens: int, sleep_time: float) -> None:
start_events[iens].set()

for idx, iens in enumerate(self._jobs.keys()):
for _, iens in enumerate(self._jobs.keys()):
asyncio.create_task(_set_start_event(iens, 0))

for task in self._tasks.values():
Expand Down
34 changes: 25 additions & 9 deletions tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,20 +393,34 @@ async def wait():

@pytest.mark.parametrize(
"submit_sleep, realization_max_runtime, max_running",
[(1, 10, 5)],
# [(0.1, 0.1, 10), (0.05, 1, 5)],
[
(0.01, 0.01, 1),
(0.01, 0.1, 5),
# (0.1, 1, 2),
# (0.05, 1, 5),
],
)
async def test_submit_sleep_with_max_running(
submit_sleep, realization_max_runtime, max_running, storage, tmp_path, mock_driver
):
run_start_times: List[datetime.datetime] = []
print()
import time

run_start_times: List[float] = []
iens = 0
globalnow = time.time()

async def wait():
nonlocal run_start_times
run_start_times.append(datetime.datetime.now())
nonlocal run_start_times, iens, globalnow
origiens = iens
print(f"starting realization {iens} at {time.time() - globalnow}")
run_start_times.append(time.time())
# If the realization runtimes are constant, we will never get into
# the situation where we can start many realizations at the same moment
await asyncio.sleep(realization_max_runtime * random.random())
iens += 1
runtime = realization_max_runtime * random.random()
await asyncio.sleep(runtime)
print(f"a realization {origiens} finished in {round(runtime,3)} seconds")

ensemble_size = 10

Expand All @@ -427,8 +441,10 @@ async def wait():
await sch.execute()

deltas = [
next_start - start
round(next_start - start, 3)
for start, next_start in zip(run_start_times[:-1], run_start_times[1:])
]
print([d.total_seconds() for d in deltas])
assert min(deltas).total_seconds() >= submit_sleep * 0.9
print(deltas)
# print([d.total_seconds() for d in deltas])
# 20% deviation is allowed to let tests pass with short runtimes.
assert min(deltas) >= submit_sleep * 0.8

0 comments on commit 8c58a4e

Please sign in to comment.