Skip to content

Commit

Permalink
Support submit_sleep in Scheduler
Browse files Browse the repository at this point in the history
Notes:
* The test setup possibly allows flakyness
* Picks SUBMIT_SLEEP from queue system configuration,
  but leans towards a future global setting for SUBMIT_SLEEP.
  • Loading branch information
berland committed Dec 28, 2023
1 parent 8980a3f commit 54e980e
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 4 deletions.
9 changes: 8 additions & 1 deletion src/ert/config/queue_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
class QueueConfig:
job_script: str = shutil.which("job_dispatch.py") or "job_dispatch.py"
max_submit: int = 2
submit_sleep: float = 0
queue_system: QueueSystem = QueueSystem.LOCAL
queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = field(
default_factory=dict
Expand All @@ -48,6 +49,7 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
)
job_script = job_script or "job_dispatch.py"
max_submit: int = config_dict.get("MAX_SUBMIT", 2)
submit_sleep: float = config_dict.get("SUBMIT_SLEEP", 0.0)
queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = defaultdict(list)
for queue_system, option_name, *values in config_dict.get("QUEUE_OPTION", []):
if option_name not in VALID_QUEUE_OPTIONS[queue_system]:
Expand All @@ -67,6 +69,8 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
" usually provided by the site-configuration file, beware that"
" you are effectively replacing the default value provided."
)
if option_name == "SUBMIT_SLEEP" and selected_queue_system == queue_system:
submit_sleep = float(values[0])

for queue_system_val in queue_options:
if queue_options[queue_system_val]:
Expand All @@ -85,12 +89,15 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
queue_options[selected_queue_system],
)

return QueueConfig(job_script, max_submit, selected_queue_system, queue_options)
return QueueConfig(
job_script, max_submit, submit_sleep, selected_queue_system, queue_options
)

def create_local_copy(self) -> QueueConfig:
return QueueConfig(
self.job_script,
self.max_submit,
self.submit_sleep,
QueueSystem.LOCAL,
self.queue_options,
)
Expand Down
1 change: 1 addition & 0 deletions src/ert/ensemble_evaluator/_builder/_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ async def _evaluate_inner( # pylint: disable=too-many-branches
self.active_reals,
max_submit=self._queue_config.max_submit,
max_running=self._queue_config.max_running,
submit_sleep=self._queue_config.submit_sleep,
ens_id=self.id_,
ee_uri=self._config.dispatch_uri,
ee_cert=self._config.cert,
Expand Down
15 changes: 12 additions & 3 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(
*,
max_submit: int = 1,
max_running: int = 1,
submit_sleep: float = 0,
ens_id: Optional[str] = None,
ee_uri: Optional[str] = None,
ee_cert: Optional[str] = None,
Expand All @@ -68,6 +69,7 @@ def __init__(
self._cancelled = False
self._max_submit = max_submit
self._max_running = max_running
self._submit_sleep = submit_sleep

self._ee_uri = ee_uri
self._ens_id = ens_id
Expand Down Expand Up @@ -118,14 +120,21 @@ async def execute(
cancel_when_execute_is_done(self._process_event_queue())
cancel_when_execute_is_done(self.driver.poll())

start = asyncio.Event()
start_events: MutableMapping[int, asyncio.Event] = {}
sem = asyncio.BoundedSemaphore(self._max_running or len(self._jobs))
for iens, job in self._jobs.items():
start_events[iens] = asyncio.Event()
self._tasks[iens] = asyncio.create_task(
job(start, sem, self._max_submit)
job(start_events[iens], sem, self._max_submit)
)

start.set()
async def _set_start_event(iens: int, sleep_time: float) -> None:
await asyncio.sleep(sleep_time)
start_events[iens].set()

for idx, iens in enumerate(self._jobs.keys()):
asyncio.create_task(_set_start_event(iens, idx * self._submit_sleep))

for task in self._tasks.values():
await task

Expand Down
44 changes: 44 additions & 0 deletions tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,47 @@ async def kill():
# The result from execute is that we were cancelled, not stopped
# as if the timeout happened before kill_all_jobs()
assert scheduler_task.result() == EVTYPE_ENSEMBLE_CANCELLED


@pytest.mark.parametrize(
"submit_sleep, iens_stride, realization_runtime, expected_max_running",
[(0, 1, 0.1, 10), (0.1, 1, 0.1, 2), (0.1, 2, 0.1, 2), (0.1, 3, 0.4, 5)],
)
async def test_submit_sleep(
submit_sleep,
iens_stride,
realization_runtime,
expected_max_running,
storage,
tmp_path,
mock_driver,
):
currently_running = 0
max_running_observed = 0

async def wait():
nonlocal currently_running, realization_runtime
currently_running += 1
await asyncio.sleep(realization_runtime)
currently_running -= 1

ensemble_size = 10

ensemble = storage.create_experiment().create_ensemble(
name="foo", ensemble_size=ensemble_size * iens_stride
)
realizations = [
create_stub_realization(ensemble, tmp_path, iens * iens_stride)
for iens in range(ensemble_size)
]

sch = scheduler.Scheduler(
mock_driver(wait=wait), realizations, submit_sleep=submit_sleep, max_running=0
)
task = asyncio.create_task(sch.execute())

while not task.done():
max_running_observed = max(currently_running, max_running_observed)
assert currently_running <= expected_max_running
await asyncio.sleep(0)
assert max_running_observed == expected_max_running

0 comments on commit 54e980e

Please sign in to comment.