Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test max runtime #6619

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/ert/config/queue_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List, Mapping, Tuple, no_type_check


from .parsing import (
ConfigDict,
ConfigValidationError,
Expand Down
13 changes: 7 additions & 6 deletions src/ert/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,16 @@ def stopped(self) -> bool:

async def stop_jobs_async(self) -> None:
self.kill_all_jobs()
# Wait until all kill commands are acknowlegded by the driver
while any(
(
real
for real in self._realizations
if real.current_state
not in (
RealizationState.IS_KILLED,
RealizationState.DO_KILL_NODE_FAILURE,
RealizationState.FAILED,
RealizationState.IS_KILLED,
RealizationState.SUCCESS,
)
)
):
Expand All @@ -159,7 +160,9 @@ def queue_size(self) -> int:
return len(self._realizations)

def _add_realization(self, realization: QueueableRealization) -> None:
self._realizations.append(RealizationState(self, realization, retries=1))
self._realizations.append(
RealizationState(self, realization, retries=self._queue_config.max_submit - 1)
)

def max_running(self) -> int:
max_running = 0
Expand Down Expand Up @@ -298,7 +301,7 @@ async def execute(
try:
# await self._changes_to_publish.put(self._differ.snapshot()) # Reimplement me!, maybe send waiting states?
while True:
await asyncio.sleep(2)
await asyncio.sleep(1)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lower number makes tests go faster


for func in evaluators:
func()
Expand Down Expand Up @@ -348,8 +351,6 @@ async def execute(

return EVTYPE_ENSEMBLE_STOPPED

def _add_realization(self, realization: QueueableRealization) -> None:
self._realizations.append(RealizationState(self, realization, retries=1))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(this was duplicated)

def add_realization_from_run_arg(
self,
Expand Down
4 changes: 2 additions & 2 deletions src/ert/job_queue/realization_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
import logging
import pathlib
from dataclasses import dataclass
from enum import Enum, auto
from typing import TYPE_CHECKING, Callable, Optional

from statemachine import State, StateMachine

Check failure on line 14 in src/ert/job_queue/realization_state.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Skipping analyzing "statemachine": module is installed, but missing library stubs or py.typed marker

from ert.constant_filenames import ERROR_file, STATUS_file

Expand All @@ -34,15 +33,15 @@
max_runtime: Optional[int] = None
callback_timeout: Optional[Callable[[int], None]] = None

def __hash__(self):

Check failure on line 36 in src/ert/job_queue/realization_state.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Function is missing a return type annotation
# Elevate iens up to two levels? Check if it can be removed from run_arg
return self.run_arg.iens

def __repr__(self):

Check failure on line 40 in src/ert/job_queue/realization_state.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Function is missing a type annotation
return str(self.run_arg.iens)


class RealizationState(StateMachine):

Check failure on line 44 in src/ert/job_queue/realization_state.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Class cannot subclass "StateMachine" (has type "Any")
NOT_ACTIVE = State("NOT ACTIVE")
WAITING = State("WAITING", initial=True)
SUBMITTED = State("SUBMITTED")
Expand Down Expand Up @@ -103,7 +102,9 @@

donotgohere = UNKNOWN.to(STATUS_FAILURE)

def on_enter_state(self, target, event):

Check failure on line 105 in src/ert/job_queue/realization_state.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Function is missing a type annotation
if self.jobqueue._changes_to_publish is None:
return
if target in (
# RealizationState.WAITING, # This happens too soon (initially)
RealizationState.PENDING,
Expand All @@ -113,24 +114,23 @@
RealizationState.IS_KILLED,
):
change = {self.realization.run_arg.iens: target.id}
assert self.jobqueue._changes_to_publish is not None
asyncio.create_task(self.jobqueue._changes_to_publish.put(change))

def on_enter_SUBMITTED(self):

Check failure on line 119 in src/ert/job_queue/realization_state.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Function is missing a return type annotation
asyncio.create_task(self.jobqueue.driver.submit(self))

def on_enter_RUNNING(self):

Check failure on line 122 in src/ert/job_queue/realization_state.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Function is missing a return type annotation
self.start_time = datetime.datetime.now()

def on_enter_EXIT(self):

Check failure on line 125 in src/ert/job_queue/realization_state.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Function is missing a return type annotation
if self.retries_left > 0:
self.retry()
self.retries_left -= 1
else:
self.invalidate()

def on_enter_DONE(self):

Check failure on line 132 in src/ert/job_queue/realization_state.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Function is missing a return type annotation
asyncio.create_task(self.jobqueue.run_done_callback(self))

def on_enter_DO_KILL(self):

Check failure on line 135 in src/ert/job_queue/realization_state.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Function is missing a return type annotation
asyncio.create_task(self.jobqueue.driver.kill(self))
146 changes: 42 additions & 104 deletions tests/unit_tests/job_queue/test_job_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,6 @@
from ert.run_arg import RunArg
from ert.storage import EnsembleAccessor


def wait_for(
func: Callable, target: Any = True, interval: float = 0.1, timeout: float = 30
):
"""Sleeps (with timeout) until the provided function returns the provided target"""
t = 0.0
while func() != target:
time.sleep(interval)
t += interval
if t >= timeout:
raise AssertionError(
"Timeout reached in wait_for "
f"(function {func.__name__}, timeout {timeout}) "
)


DUMMY_CONFIG: Dict[str, Any] = {
"job_script": "job_script.py",
"num_cpu": 1,
Expand Down Expand Up @@ -89,7 +73,6 @@ def create_local_queue(
callback_timeout=callback_timeout,
)
job_queue._add_realization(qreal)

return job_queue


Expand Down Expand Up @@ -132,6 +115,33 @@ async def test_all_realizations_are_failing(tmpdir, monkeypatch, failing_script)
await asyncio.gather(execute_task)


@pytest.mark.asyncio
@pytest.mark.timeout(5)
@pytest.mark.parametrize("max_submit_num", [1, 3])
async def test_max_submit(tmpdir, monkeypatch, failing_script, max_submit_num):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(
failing_script, num_realizations=1, max_submit=max_submit_num
)
execute_task = asyncio.create_task(job_queue.execute())
await asyncio.sleep(0.5)
assert Path("dummy_path_0/one_byte_pr_invocation").stat().st_size == max_submit_num
await job_queue.stop_jobs_async()
await asyncio.gather(execute_task)

@pytest.mark.asyncio
@pytest.mark.parametrize("max_submit_num", [1, 3])
async def test_that_kill_queue_disregards_max_submit(tmpdir, max_submit_num, monkeypatch, simple_script):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(simple_script, max_submit=max_submit_num)
await job_queue.stop_jobs_async()
execute_task = asyncio.create_task(job_queue.execute())
await asyncio.gather(execute_task)
print(tmpdir)
for iens in range(job_queue.queue_size):
assert not Path(f"dummy_path_{iens}/STATUS").exists()
assert job_queue.count_realization_state(RealizationState.IS_KILLED) == job_queue.queue_size

@pytest.mark.asyncio
@pytest.mark.timeout(5)
async def test_submit_sleep(tmpdir, monkeypatch, never_ending_script):
Expand All @@ -147,41 +157,18 @@ async def test_submit_sleep(tmpdir, monkeypatch, never_ending_script):
await asyncio.gather(execute_task)


def test_timeout_jobs(tmpdir, monkeypatch, never_ending_script):
@pytest.mark.asyncio
@pytest.mark.timeout(5)
async def test_max_runtime(tmpdir, monkeypatch, never_ending_script):
monkeypatch.chdir(tmpdir)

mock_callback = MagicMock()

job_queue = create_local_queue(
never_ending_script,
max_submit=1,
max_runtime=5,
callback_timeout=mock_callback,
job_queue = create_local_queue(never_ending_script, max_runtime=1)
execute_task = asyncio.create_task(job_queue.execute())
await asyncio.sleep(3) # Queue operates slowly..
assert (
job_queue.count_realization_state(RealizationState.IS_KILLED)
== job_queue.queue_size
)

assert job_queue.queue_size == 10
assert job_queue.is_active()

pool_sema = BoundedSemaphore(value=10)
start_all(job_queue, pool_sema)

# Make sure NEVER_ENDING_SCRIPT jobs have started:
wait_for(job_queue.is_active)

# Wait for the timeout to kill them:
wait_for(job_queue.is_active, target=False)

job_queue._differ.transition(job_queue.job_list)

for q_index, job in enumerate(job_queue.job_list):
assert job.queue_status == JobStatus.IS_KILLED
iens = job_queue._differ.qindex_to_iens(q_index)
assert job_queue.snapshot()[iens] == str(JobStatus.IS_KILLED)

assert len(mock_callback.mock_calls) == 20

for job in job_queue.job_list:
job.wait_for()
await asyncio.gather(execute_task)


def test_add_dispatch_info(tmpdir, monkeypatch, simple_script):
Expand Down Expand Up @@ -244,42 +231,18 @@ def test_add_dispatch_info_cert_none(tmpdir, monkeypatch, simple_script):
assert not (runpath / cert_file).exists()


class MockedJob:
def __init__(self, status):
self.queue_status = status
self._start_time = 0
self._current_time = 0
self._end_time = None

@property
def runtime(self):
return self._end_time - self._start_time

def stop(self):
self.queue_status = JobStatus.FAILED

def convertToCReference(self, _):
pass


@pytest.mark.parametrize("max_submit_num", [1, 2, 3])
def test_kill_queue(tmpdir, max_submit_num, monkeypatch, simple_script):
monkeypatch.chdir(tmpdir)
job_queue = create_local_queue(simple_script, max_submit=max_submit_num)
job_queue.kill_all_jobs()
asyncio.run(job_queue.execute())

assert not Path("STATUS").exists()
for job in job_queue.job_list:
assert job.queue_status == JobStatus.FAILED


@pytest.mark.skip(reason="Needs reimplementation")
def test_stop_long_running():
"""
This test should verify that only the jobs that have a runtime
25% longer than the average completed are stopped when
stop_long_running_jobs is called.
"""
MockedJob = None # silencing ruff
JobStatus = None # silencing ruff
job_list = [MockedJob(JobStatus.WAITING) for _ in range(10)]

for i in range(5):
Expand Down Expand Up @@ -320,34 +283,9 @@ def test_stop_long_running():
assert queue.snapshot()[i] == str(JobStatus.RUNNING)


@pytest.mark.parametrize("max_submit_num", [1, 2, 3])
def test_max_submit_reached(tmpdir, max_submit_num, monkeypatch, failing_script):
"""Check that the JobQueue will submit exactly the maximum number of
resubmissions in the case of scripts that fail."""
monkeypatch.chdir(tmpdir)
num_realizations = 2
job_queue = create_local_queue(
failing_script,
max_submit=max_submit_num,
num_realizations=num_realizations,
)

asyncio.run(job_queue.execute())

assert (
Path("one_byte_pr_invocation").stat().st_size
== max_submit_num * num_realizations
)

assert job_queue.is_active() is False

for job in job_queue.job_list:
# one for every realization
assert job.queue_status == JobStatus.FAILED
assert job.submit_attempt == job_queue.max_submit


@pytest.mark.usefixtures("use_tmpdir", "mock_fm_ok")
@pytest.mark.skip(reason="Needs reimplementation")
def test_num_cpu_submitted_correctly_lsf(tmpdir, simple_script):
"""Assert that num_cpu from the ERT configuration is passed on to the bsub
command used to submit jobs to LSF"""
Expand All @@ -362,7 +300,7 @@ def test_num_cpu_submitted_correctly_lsf(tmpdir, simple_script):
num_cpus = 4
os.mkdir(DUMMY_CONFIG["run_path"].format(job_id))

job = JobQueueNode(
job = QueueableRealization(
job_script=simple_script,
num_cpu=4,
run_arg=RunArg(
Expand Down
Loading