Skip to content

Commit

Permalink
Add support for SUBMIT_SLEEP
Browse files Browse the repository at this point in the history
Written on the assumption that SUBMIT_SLEEP will eventually become a global option;
for now, the value is picked from the QUEUE_OPTION of the selected queue system.
  • Loading branch information
berland committed Nov 21, 2023
1 parent 46f0ea8 commit e9f0149
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 27 deletions.
10 changes: 9 additions & 1 deletion src/ert/config/queue_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
class QueueConfig:
job_script: str = shutil.which("job_dispatch.py") or "job_dispatch.py"
max_submit: int = 2
submit_sleep: float = 0
queue_system: QueueSystem = QueueSystem.LOCAL
queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = field(
default_factory=dict
Expand All @@ -91,6 +92,7 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
)
job_script = job_script or "job_dispatch.py"
max_submit: int = config_dict.get("MAX_SUBMIT", 2)
submit_sleep: float = config_dict.get("SUBMIT_SLEEP", 0.0)
queue_options: Dict[QueueSystem, List[Tuple[str, str]]] = defaultdict(list)
for queue_system, option_name, *values in config_dict.get("QUEUE_OPTION", []):
if option_name not in VALID_QUEUE_OPTIONS[queue_system]:
Expand All @@ -111,6 +113,9 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
" you are effectively replacing the default value provided."
)

if option_name == "SUBMIT_SLEEP" and selected_queue_system == queue_system:
submit_sleep = float(values[0])

for queue_system_val in queue_options:
if queue_options[queue_system_val]:
_validate_queue_driver_settings(
Expand All @@ -128,12 +133,15 @@ def from_dict(cls, config_dict: ConfigDict) -> QueueConfig:
queue_options[selected_queue_system],
)

return QueueConfig(job_script, max_submit, selected_queue_system, queue_options)
return QueueConfig(
job_script, max_submit, submit_sleep, selected_queue_system, queue_options
)

def create_local_copy(self) -> QueueConfig:
    """Build a QueueConfig equal to this one but targeting the LOCAL queue.

    The job script, max_submit, submit_sleep and queue options are carried
    over as they are; only the queue system is replaced by
    ``QueueSystem.LOCAL``.  The ``queue_options`` mapping is passed along
    by reference, not duplicated.
    """
    return QueueConfig(
        job_script=self.job_script,
        max_submit=self.max_submit,
        submit_sleep=self.submit_sleep,
        queue_system=QueueSystem.LOCAL,
        queue_options=self.queue_options,
    )
Expand Down
50 changes: 28 additions & 22 deletions src/ert/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,14 @@ async def stop_jobs_async(self) -> None:

def kill_all_jobs(self) -> None:
for real in self._realizations:
if real.current_state not in (
RealizationState.DO_KILL,
RealizationState.IS_KILLED,
if real.current_state in (
RealizationState.SUBMITTED,
RealizationState.PENDING,
RealizationState.RUNNING,
):
real.dokill() # Initiates async killing
if real.current_state == RealizationState.WAITING:
real.remove()

@property
def queue_size(self) -> int:
Expand All @@ -166,27 +169,31 @@ def max_running(self) -> int:
return len(self._realizations)
return max_running

def available_capacity(self) -> bool:
    """Tell whether the queue has room for another running realization.

    Returns:
        True when the number of realizations in the RUNNING state is
        strictly below the limit reported by ``max_running()``.
    """
    running_count = self.count_realization_state(RealizationState.RUNNING)
    return running_count < self.max_running()

def is_all_reals_state(self, state: RealizationState) -> bool:
    """Check whether every realization is currently in ``state``.

    Returns:
        True if no realization has a ``current_state`` that differs from
        ``state`` (which includes the case of no realizations at all),
        otherwise False.
    """
    for realization in self._realizations:
        if realization.current_state == state:
            continue
        # First mismatch settles the answer; no need to look further.
        return False
    return True

async def launch_jobs(self) -> None:
while self.available_capacity():
try:
realization = next(
(
real
for real in self._realizations
if real.current_state == RealizationState.WAITING
async def _realization_submitter(self) -> None:
while self.is_active():
while (
self.count_realization_state(RealizationState.RUNNING)
+ self.count_realization_state(RealizationState.SUBMITTED)
< self.max_running()
):
try:
realization = next(
(
real
for real in self._realizations
if real.current_state == RealizationState.WAITING
)
)
)
realization.submit()
except StopIteration:
break
realization.submit()
await asyncio.sleep(self._queue_config.submit_sleep)
except StopIteration:
break
await asyncio.sleep(
0.1 # How fast this reacts to the queue being finished.
)

def set_ee_info(
self,
Expand Down Expand Up @@ -286,12 +293,11 @@ async def execute(

self._changes_to_publish = asyncio.Queue()
asyncio.create_task(self._jobqueue_publisher())
asyncio.create_task(self._realization_submitter())

try:
# await self._changes_to_publish.put(self._differ.snapshot()) # Reimplement me!, maybe send waiting states?
while True:
await self.launch_jobs()

await asyncio.sleep(2)

for func in evaluators:
Expand Down
1 change: 1 addition & 0 deletions src/ert/job_queue/realization_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def __init__(
retry = EXIT.to(SUBMITTED)

dokill = DO_KILL.from_(SUBMITTED, PENDING, RUNNING)
remove = WAITING.to(IS_KILLED)

verify_kill = DO_KILL.to(IS_KILLED)

Expand Down
2 changes: 1 addition & 1 deletion test-data/poly_example/poly.ert
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
JOBNAME poly_%d

QUEUE_SYSTEM LOCAL
QUEUE_OPTION LOCAL MAX_RUNNING 5
QUEUE_OPTION LOCAL MAX_RUNNING 5
--QUEUE_SYSTEM LSF

RUNPATH poly_out/realization-<IENS>/iter-<ITER>
Expand Down
27 changes: 24 additions & 3 deletions tests/unit_tests/job_queue/test_job_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,13 @@ async def test_that_all_jobs_can_be_killed(tmpdir, monkeypatch, never_ending_scr
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(never_ending_script)
execute_task = asyncio.create_task(job_queue.execute())
while job_queue.count_running() != job_queue.queue_size:
while (
job_queue.count_realization_state(RealizationState.RUNNING)
!= job_queue.queue_size
):
await asyncio.sleep(0.001)
await job_queue.stop_jobs_async()
while job_queue.count_running() > 0:
while job_queue.count_realization_state(RealizationState.RUNNING) > 0:
await asyncio.sleep(0.001)
await asyncio.gather(execute_task)

Expand All @@ -121,11 +124,29 @@ async def test_all_realizations_are_failing(tmpdir, monkeypatch, failing_script)
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(failing_script, max_submit=1)
execute_task = asyncio.create_task(job_queue.execute())
while job_queue.count_status(RealizationState.FAILED) != job_queue.queue_size:
while (
job_queue.count_realization_state(RealizationState.FAILED)
!= job_queue.queue_size
):
await asyncio.sleep(0.001)
await asyncio.gather(execute_task)


@pytest.mark.asyncio
@pytest.mark.timeout(5)
async def test_submit_sleep(tmpdir, monkeypatch, never_ending_script):
    """Check that the configured submit_sleep spaces out job starts.

    With ``submit_sleep`` set to 0.2 seconds, exactly one realization is
    expected to be RUNNING 0.1 seconds after execution starts, and exactly
    two after a further 0.3 seconds.
    """
    monkeypatch.chdir(tmpdir)
    queue = create_local_queue(never_ending_script)
    queue._queue_config.submit_sleep = 0.2
    runner = asyncio.create_task(queue.execute())

    # First checkpoint: shorter than one submit_sleep interval.
    await asyncio.sleep(0.1)
    assert queue.count_realization_state(RealizationState.RUNNING) == 1

    # Second checkpoint: 0.4 seconds after start in total.
    await asyncio.sleep(0.3)
    assert queue.count_realization_state(RealizationState.RUNNING) == 2

    # The script never ends on its own, so stop the jobs before awaiting.
    await queue.stop_jobs_async()
    await asyncio.gather(runner)


def test_timeout_jobs(tmpdir, monkeypatch, never_ending_script):
monkeypatch.chdir(tmpdir)

Expand Down

0 comments on commit e9f0149

Please sign in to comment.