Add tests for max_runtime and max_submit
berland committed Nov 21, 2023
1 parent e9f0149 commit 8e94c28
Showing 4 changed files with 51 additions and 113 deletions.
1 change: 0 additions & 1 deletion src/ert/config/queue_config.py
@@ -8,7 +8,6 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List, Mapping, Tuple, no_type_check


from .parsing import (
ConfigDict,
ConfigValidationError,
13 changes: 7 additions & 6 deletions src/ert/job_queue/queue.py
@@ -129,15 +129,16 @@ def stopped(self) -> bool:

async def stop_jobs_async(self) -> None:
self.kill_all_jobs()
# Wait until all kill commands are acknowledged by the driver
while any(
(
real
for real in self._realizations
if real.current_state
not in (
RealizationState.IS_KILLED,
RealizationState.DO_KILL_NODE_FAILURE,
RealizationState.FAILED,
RealizationState.IS_KILLED,
RealizationState.SUCCESS,
)
)
):
@@ -159,7 +160,9 @@ def queue_size(self) -> int:
return len(self._realizations)

def _add_realization(self, realization: QueueableRealization) -> None:
self._realizations.append(RealizationState(self, realization, retries=1))
self._realizations.append(
RealizationState(self, realization, retries=self._queue_config.max_submit - 1)
)

def max_running(self) -> int:
max_running = 0
@@ -298,7 +301,7 @@ async def execute(
try:
# await self._changes_to_publish.put(self._differ.snapshot()) # Reimplement me!, maybe send waiting states?
while True:
await asyncio.sleep(2)
await asyncio.sleep(1)

for func in evaluators:
func()
@@ -348,8 +351,6 @@ async def execute(

return EVTYPE_ENSEMBLE_STOPPED

def _add_realization(self, realization: QueueableRealization) -> None:
self._realizations.append(RealizationState(self, realization, retries=1))

def add_realization_from_run_arg(
self,
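A note on the retry arithmetic above: MAX_SUBMIT is the total number of submission attempts, so the state machine is seeded with max_submit - 1 retries. A minimal sketch of that accounting (illustrative names, not ert's API):

```python
from typing import Callable


def submit_with_retries(run_once: Callable[[], bool], max_submit: int) -> bool:
    """Attempt a job at most max_submit times in total."""
    retries_left = max_submit - 1  # the first submit is not a retry
    while True:
        if run_once():
            return True
        if retries_left == 0:
            return False
        retries_left -= 1


attempts = 0


def always_fail() -> bool:
    global attempts
    attempts += 1
    return False


assert submit_with_retries(always_fail, max_submit=3) is False
assert attempts == 3  # exactly max_submit attempts, no more
```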
4 changes: 2 additions & 2 deletions src/ert/job_queue/realization_state.py
@@ -9,7 +9,6 @@
import logging
import pathlib
from dataclasses import dataclass
from enum import Enum, auto
from typing import TYPE_CHECKING, Callable, Optional

from statemachine import State, StateMachine

Check failure on line 14 in src/ert/job_queue/realization_state.py (GitHub Actions / type-checking (3.11)): Skipping analyzing "statemachine": module is installed, but missing library stubs or py.typed marker
@@ -104,6 +103,8 @@ def __init__(
donotgohere = UNKNOWN.to(STATUS_FAILURE)

def on_enter_state(self, target, event):

Check failure on line 105 in src/ert/job_queue/realization_state.py (GitHub Actions / type-checking (3.11)): Function is missing a type annotation
if self.jobqueue._changes_to_publish is None:
return
if target in (
# RealizationState.WAITING, # This happens too soon (initially)
RealizationState.PENDING,
@@ -113,7 +114,6 @@ def on_enter_state(self, target, event):
RealizationState.IS_KILLED,
):
change = {self.realization.run_arg.iens: target.id}
assert self.jobqueue._changes_to_publish is not None
asyncio.create_task(self.jobqueue._changes_to_publish.put(change))

def on_enter_SUBMITTED(self):

Check failure on line 119 in src/ert/job_queue/realization_state.py (GitHub Actions / type-checking (3.11)): Function is missing a return type annotation
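The on_enter_state change above replaces a late assert with an early return when no change queue is attached. A minimal sketch of the guard-clause pattern, assuming _changes_to_publish is a plain Optional[asyncio.Queue] (its actual type is not shown in this diff):

```python
import asyncio
from typing import Optional


class Publisher:
    def __init__(self, changes: Optional[asyncio.Queue] = None) -> None:
        self._changes_to_publish = changes

    def publish(self, change: dict) -> None:
        # Guard clause: skip publishing when no queue is attached,
        # rather than asserting immediately before the put().
        # Must be called from within a running event loop, as
        # on_enter_state is during queue execution.
        if self._changes_to_publish is None:
            return
        asyncio.create_task(self._changes_to_publish.put(change))
```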
146 changes: 42 additions & 104 deletions tests/unit_tests/job_queue/test_job_queue.py
@@ -15,22 +15,6 @@
from ert.run_arg import RunArg
from ert.storage import EnsembleAccessor


def wait_for(
func: Callable, target: Any = True, interval: float = 0.1, timeout: float = 30
):
"""Sleeps (with timeout) until the provided function returns the provided target"""
t = 0.0
while func() != target:
time.sleep(interval)
t += interval
if t >= timeout:
raise AssertionError(
"Timeout reached in wait_for "
f"(function {func.__name__}, timeout {timeout}) "
)
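The deleted wait_for helper polled with blocking time.sleep, which would stall the event loop in the new async tests; they rely on await asyncio.sleep(...) plus pytest.mark.timeout instead. For reference, an async equivalent of the removed helper could look like this (a sketch, not part of the commit):

```python
import asyncio
from typing import Any, Callable


async def async_wait_for(
    func: Callable[[], Any],
    target: Any = True,
    interval: float = 0.1,
    timeout: float = 30,
) -> None:
    """Polls (with timeout) until func() returns target, without blocking the loop."""
    elapsed = 0.0
    while func() != target:
        await asyncio.sleep(interval)
        elapsed += interval
        if elapsed >= timeout:
            raise AssertionError(
                f"Timeout reached in async_wait_for "
                f"(function {func.__name__}, timeout {timeout})"
            )
```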


DUMMY_CONFIG: Dict[str, Any] = {
"job_script": "job_script.py",
"num_cpu": 1,
@@ -89,7 +73,6 @@ def create_local_queue(
callback_timeout=callback_timeout,
)
job_queue._add_realization(qreal)

return job_queue


@@ -132,6 +115,33 @@ async def test_all_realizations_are_failing(tmpdir, monkeypatch, failing_script)
await asyncio.gather(execute_task)


@pytest.mark.asyncio
@pytest.mark.timeout(5)
@pytest.mark.parametrize("max_submit_num", [1, 3])
async def test_max_submit(tmpdir, monkeypatch, failing_script, max_submit_num):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(
failing_script, num_realizations=1, max_submit=max_submit_num
)
execute_task = asyncio.create_task(job_queue.execute())
await asyncio.sleep(0.5)
assert Path("dummy_path_0/one_byte_pr_invocation").stat().st_size == max_submit_num
await job_queue.stop_jobs_async()
await asyncio.gather(execute_task)
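
test_max_submit counts submissions by file size: each invocation of the failing job appends one byte to one_byte_pr_invocation before exiting with a nonzero status, so the file ends up exactly max_submit_num bytes long. The failing_script fixture is defined elsewhere; a plausible shape for it is:

```python
import stat

import pytest


@pytest.fixture
def failing_script(tmp_path):
    """A job script that logs one byte per invocation, then fails."""
    script = tmp_path / "failing_script.sh"
    script.write_text(
        "#!/bin/sh\n"
        "printf x >> one_byte_pr_invocation\n"  # one byte per submission
        "exit 1\n"
    )
    script.chmod(script.stat().st_mode | stat.S_IEXEC)
    return str(script)
```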

@pytest.mark.asyncio
@pytest.mark.parametrize("max_submit_num", [1, 3])
async def test_that_kill_queue_disregards_max_submit(
    tmpdir, max_submit_num, monkeypatch, simple_script
):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(simple_script, max_submit=max_submit_num)
await job_queue.stop_jobs_async()
execute_task = asyncio.create_task(job_queue.execute())
await asyncio.gather(execute_task)
print(tmpdir)
for iens in range(job_queue.queue_size):
assert not Path(f"dummy_path_{iens}/STATUS").exists()
    assert (
        job_queue.count_realization_state(RealizationState.IS_KILLED)
        == job_queue.queue_size
    )

@pytest.mark.asyncio
@pytest.mark.timeout(5)
async def test_submit_sleep(tmpdir, monkeypatch, never_ending_script):
@@ -147,41 +157,18 @@ async def test_submit_sleep(tmpdir, monkeypatch, never_ending_script):
await asyncio.gather(execute_task)


def test_timeout_jobs(tmpdir, monkeypatch, never_ending_script):
@pytest.mark.asyncio
@pytest.mark.timeout(5)
async def test_max_runtime(tmpdir, monkeypatch, never_ending_script):
monkeypatch.chdir(tmpdir)

mock_callback = MagicMock()

job_queue = create_local_queue(
never_ending_script,
max_submit=1,
max_runtime=5,
callback_timeout=mock_callback,
job_queue = create_local_queue(never_ending_script, max_runtime=1)
execute_task = asyncio.create_task(job_queue.execute())
await asyncio.sleep(3) # Queue operates slowly..
assert (
job_queue.count_realization_state(RealizationState.IS_KILLED)
== job_queue.queue_size
)

assert job_queue.queue_size == 10
assert job_queue.is_active()

pool_sema = BoundedSemaphore(value=10)
start_all(job_queue, pool_sema)

# Make sure NEVER_ENDING_SCRIPT jobs have started:
wait_for(job_queue.is_active)

# Wait for the timeout to kill them:
wait_for(job_queue.is_active, target=False)

job_queue._differ.transition(job_queue.job_list)

for q_index, job in enumerate(job_queue.job_list):
assert job.queue_status == JobStatus.IS_KILLED
iens = job_queue._differ.qindex_to_iens(q_index)
assert job_queue.snapshot()[iens] == str(JobStatus.IS_KILLED)

assert len(mock_callback.mock_calls) == 20

for job in job_queue.job_list:
job.wait_for()
await asyncio.gather(execute_task)
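
test_max_runtime gives a never-ending job a one-second budget and, after a grace period, expects every realization to end up IS_KILLED. The enforcement mechanism itself is not part of this diff; a generic way to impose such a cap on a coroutine is asyncio.wait_for:

```python
import asyncio


async def never_ending_job() -> None:
    while True:
        await asyncio.sleep(0.1)


async def run_capped(max_runtime: float) -> str:
    # Cancel the job if it exceeds its max_runtime budget.
    try:
        await asyncio.wait_for(never_ending_job(), timeout=max_runtime)
        return "SUCCESS"
    except asyncio.TimeoutError:
        return "IS_KILLED"


assert asyncio.run(run_capped(max_runtime=0.3)) == "IS_KILLED"
```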


def test_add_dispatch_info(tmpdir, monkeypatch, simple_script):
@@ -244,42 +231,18 @@ def test_add_dispatch_info_cert_none(tmpdir, monkeypatch, simple_script):
assert not (runpath / cert_file).exists()


class MockedJob:
def __init__(self, status):
self.queue_status = status
self._start_time = 0
self._current_time = 0
self._end_time = None

@property
def runtime(self):
return self._end_time - self._start_time

def stop(self):
self.queue_status = JobStatus.FAILED

def convertToCReference(self, _):
pass


@pytest.mark.parametrize("max_submit_num", [1, 2, 3])
def test_kill_queue(tmpdir, max_submit_num, monkeypatch, simple_script):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(simple_script, max_submit=max_submit_num)
job_queue.kill_all_jobs()
asyncio.run(job_queue.execute())

assert not Path("STATUS").exists()
for job in job_queue.job_list:
assert job.queue_status == JobStatus.FAILED


@pytest.mark.skip(reason="Needs reimplementation")
def test_stop_long_running():
"""
This test should verify that only the jobs that have a runtime
25% longer than the average completed are stopped when
stop_long_running_jobs is called.
"""
MockedJob = None # silencing ruff
JobStatus = None # silencing ruff
job_list = [MockedJob(JobStatus.WAITING) for _ in range(10)]

for i in range(5):
Expand Down Expand Up @@ -320,34 +283,9 @@ def test_stop_long_running():
assert queue.snapshot()[i] == str(JobStatus.RUNNING)


@pytest.mark.parametrize("max_submit_num", [1, 2, 3])
def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch, failing_script):
"""Check that the JobQueue will submit exactly the maximum number of
resubmissions in the case of scripts that fail."""
monkeypatch.chdir(tmpdir)
num_realizations = 2
job_queue = create_local_queue(
failing_script,
max_submit=max_submit_num,
num_realizations=num_realizations,
)

asyncio.run(job_queue.execute())

assert (
Path("one_byte_pr_invocation").stat().st_size
== max_submit_num * num_realizations
)

assert job_queue.is_active() is False

for job in job_queue.job_list:
# one for every realization
assert job.queue_status == JobStatus.FAILED
assert job.submit_attempt == job_queue.max_submit


@pytest.mark.usefixtures("use_tmpdir", "mock_fm_ok")
@pytest.mark.skip(reason="Needs reimplementation")
def test_num_cpu_submitted_correctly_lsf(tmpdir, simple_script):
"""Assert that num_cpu from the ERT configuration is passed on to the bsub
command used to submit jobs to LSF"""
@@ -362,7 +300,7 @@ def test_num_cpu_submitted_correctly_lsf(tmpdir, simple_script):
num_cpus = 4
os.mkdir(DUMMY_CONFIG["run_path"].format(job_id))

job = JobQueueNode(
job = QueueableRealization(
job_script=simple_script,
num_cpu=4,
run_arg=RunArg(
