Skip to content

Commit

Permalink
Fix lsf integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-eq committed Dec 4, 2023
1 parent 347ecca commit 1ef5692
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 63 deletions.
12 changes: 6 additions & 6 deletions src/ert/scheduler/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,9 @@ async def run_with_retries(
function_output = await func()
if function_output:
return function_output
await asyncio.sleep(self._retry_sleep_period)
except asyncio.CancelledError as e:
logger.error(e)
await asyncio.sleep(self._retry_sleep_period)
await asyncio.sleep(self._retry_sleep_period)
raise RuntimeError(error_msg)

async def submit(self, realization: "RealizationState") -> None:
Expand All @@ -177,12 +176,12 @@ async def _submit(
(process, output, error) = result
self._submit_processes[realization] = process
lsf_id_match = re.match(
"Job <\\d+> is submitted to \\w+ queue <\\w+>\\.", output.decode()
"Job <(\\d+)> is submitted to \\w+ queue <\\w+>\\.", output.decode()
)
if lsf_id_match is None:
logger.error(f"Could not parse lsf id from: {output.decode()}")
return False
lsf_id = lsf_id_match.group(0)
lsf_id = lsf_id_match.group(1)
self._realstate_to_lsfid[realization] = lsf_id
self._lsfid_to_realstate[lsf_id] = realization
realization.accept()
Expand All @@ -205,6 +204,7 @@ async def run_shell_command(
stderr=asyncio.subprocess.PIPE,
)
output, _error = await process.communicate()
logger.debug(output)
if process.returncode != 0:
logger.error(
(
Expand Down Expand Up @@ -237,7 +237,7 @@ async def poll_statuses(self) -> None:
return
except ValueError as e:
# raise this value error as runtime error
raise RuntimeError(e)
raise RuntimeError(e) from e

async def _poll_statuses(self, poll_cmd: List[str]) -> bool:
self._currently_polling = True
Expand All @@ -257,7 +257,7 @@ async def _poll_statuses(self, poll_cmd: List[str]) -> bool:
continue
if tokens[0] not in self._lsfid_to_realstate:
# A LSF id we know nothing of, this should not happen.
raise ValueError(f"Found unknown job id ({tokens[0]})")
continue

realstate = self._lsfid_to_realstate[tokens[0]]

Expand Down
14 changes: 12 additions & 2 deletions src/ert/scheduler/realization_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import pathlib
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Callable, List, Optional
from typing import TYPE_CHECKING, Callable, List, Optional, Set

from lxml import etree
from statemachine import State, StateMachine
Expand Down Expand Up @@ -65,6 +65,7 @@ def __init__(
self.retries_left: int = retries
self._max_submit = retries + 1
self._callback_status_msg: str = ""
self.background_tasks: Set[asyncio.Task[None]] = set()
super().__init__()

allocate = UNKNOWN.to(NOT_ACTIVE)
Expand Down Expand Up @@ -117,7 +118,16 @@ def on_enter_state(self, target: RealizationState) -> None:
asyncio.create_task(self.jobqueue._statechanges_to_publish.put(change))

def on_enter_SUBMITTED(self) -> None:
    """Kick off an asynchronous submit when entering the SUBMITTED state.

    A reference to the created task is kept in ``background_tasks`` so the
    event loop cannot garbage-collect it mid-flight; the done-callback
    removes the reference again once the submit task has finished.
    """
    submit_task = asyncio.create_task(self._submit_async())
    self.background_tasks.add(submit_task)
    submit_task.add_done_callback(self.background_tasks.discard)

async def _submit_async(self) -> None:
try:
await self.jobqueue.driver.submit(self)
except Exception:
self.somethingwentwrong()
raise

def on_enter_RUNNING(self) -> None:
self.start_time = datetime.datetime.now()
Expand Down
213 changes: 158 additions & 55 deletions tests/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
from pathlib import Path
from textwrap import dedent
from threading import BoundedSemaphore
from typing import Any, Dict, List
from unittest.mock import AsyncMock, MagicMock, patch
from typing import Dict, List, Tuple
from typing import Any, Dict, List, Tuple
from unittest.mock import MagicMock

import pytest

Expand All @@ -22,7 +21,6 @@
from ert.scheduler.driver import LSFDriver
from ert.storage import EnsembleAccessor


DUMMY_CONFIG: Dict[str, Any] = {
"job_script": "job_script.py",
"num_cpu": 1,
Expand All @@ -37,7 +35,6 @@
an actual cluster node might have done."""



@pytest.fixture
def mock_bsub(tmp_path):
script_path = tmp_path / "mock_bsub"
Expand Down Expand Up @@ -97,46 +94,6 @@ def mock_bkill(tmp_path):
os.chmod(script_path, 0o755)


@pytest.fixture
def mock_bjobs(tmp_path):
    """Install a mock ``bjobs`` executable in *tmp_path*.

    The generated script appends its argv to ``bjobs_log`` (so tests can
    inspect what was actually passed to bjobs), prints an LSF-style header
    line, then echoes the contents of the ``mocked_result`` file.  If the
    mocked result contains an ``exit`` token the script exits with code 1.
    """
    script_body = "#!/usr/bin/env python3" + dedent(
        """
        import datetime
        import json
        import os.path
        import sys
        # Just to have a log for test purposes what is actually thrown
        # towards the bjobs command
        with open("bjobs_log", "a+", encoding="utf-8") as f:
            f.write(f"{str(sys.argv)}\\n")
        print("JOBID\tUSER\tSTAT\tQUEUE\tFROM_HOST\tEXEC_HOST\tJOB_NAME\tSUBMIT_TIME")
        # Statuses LSF can give us
        # "PEND"
        # "SSUSP"
        # "PSUSP"
        # "USUSP"
        # "RUN"
        # "EXIT"
        # "ZOMBI" : does not seem to be available from the api.
        # "DONE"
        # "PDONE" : Post-processor is done.
        # "UNKWN"
        with open("mocked_result", mode="r+", encoding="utf-8") as result_line:
            result = result_line.read()
            if "exit" in result.split("\t"):
                exit(1)
            print(result)
        """
    )
    script_path = tmp_path / "mock_bjobs"
    script_path.write_text(script_body, encoding="utf-8")
    script_path.chmod(0o755)


class MockStateHandler:
id = "SUBMITTED"

Expand Down Expand Up @@ -255,7 +212,7 @@ async def test_poll_statuses_while_already_polling(

lsf_driver = LSFDriver(None)
lsf_driver._currently_polling = True

await lsf_driver.poll_statuses()

# Should not call bjobs
Expand Down Expand Up @@ -320,9 +277,8 @@ async def test_poll_statuses_bjobs_returning_unknown_job_id(
lsf_driver._realstate_to_lsfid[mock_realization_state] = "valid_job_id"
lsf_driver._lsfid_to_realstate["valid_job_id"] = mock_realization_state

# should print out and ignore the unknown job id
with pytest.raises(RuntimeError, match="Found unknown job id \\(unknown_job_id\\)"):
await lsf_driver.poll_statuses()
# Should ignore the unknown job id
await lsf_driver.poll_statuses()

bjobs_logs = Path("bjobs_log").read_text(encoding="utf-8").strip().split("\n")

Expand Down Expand Up @@ -402,10 +358,8 @@ async def test_kill_bkill_non_existent_jobid_exit_code_1(
await lsf_driver.kill(mock_realization_state)

output = caplog.text
out_log = output.split("\n")
job_ids_from_file = Path("job_ids").read_text(encoding="utf-8").strip().split("\n")
assert len(job_ids_from_file) == lsf_driver._max_attempt
print(f"{out_log=}")
assert (
len(re.findall("bkill: jobid non_existent_jobid not found", output))
== lsf_driver._max_attempt
Expand Down Expand Up @@ -489,7 +443,9 @@ async def test_lsf_submit_lsf_queue_option_is_added(
):
monkeypatch.chdir(tmpdir)

lsf_driver = LSFDriver(queue_options=[("BSUB_CMD", tmpdir / "mock_bsub"), *driver_options])
lsf_driver = LSFDriver(
queue_options=[("BSUB_CMD", tmpdir / "mock_bsub"), *driver_options]
)

mock_realization_state = MockRealizationState()
mock_realization_state.realization.job_script = "valid_script.sh"
Expand All @@ -510,7 +466,7 @@ def copy_lsf_poly_case(copy_poly_case, tmp_path):

config = [
"JOBNAME poly_%d\n",
"QUEUE_SYSTEM LSF\n",
"QUEUE_SYSTEM LSF\n",
"QUEUE_OPTION LSF MAX_RUNNING 10\n",
f"QUEUE_OPTION LSF BJOBS_CMD {tmp_path}/mock_bjobs\n",
f"QUEUE_OPTION LSF BSUB_CMD {tmp_path}/mock_bsub\n",
Expand All @@ -527,10 +483,157 @@ def copy_lsf_poly_case(copy_poly_case, tmp_path):
fh.writelines(config)


@pytest.mark.skip(reason="Needs reimplementation")
@pytest.fixture
def mock_bjobs(tmp_path):
    """Install a mock ``bjobs`` executable in *tmp_path* for integration tests.

    The generated script has two modes:

    * If no ``job_paths`` file exists (i.e. ``mock_bsub`` has not recorded any
      submissions), it echoes the contents of ``mocked_result`` and exits with
      code 1 when that result contains an ``exit`` token.
    * Otherwise it walks every recorded runpath, reads the job id that
      ``mock_bsub`` stored in ``lsf_info.json``, and prints an LSF-style
      status line per job — ``DONE`` when an ``OK`` file exists in the
      runpath, ``RUN`` otherwise.

    All invocations are logged to ``bjobs_log`` so tests can assert on the
    arguments ERT passed to bjobs.
    """
    script = "#!/usr/bin/env python3" + dedent(
        """
        import datetime
        import json
        import os.path
        import sys
        import time
        # Just to have a log for test purposes what is actually thrown
        # towards the bjobs command
        with open("bjobs_log", "a+", encoding="utf-8") as f:
            f.write(f"{str(sys.argv)}\\n")
        print("JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME")
        timestamp = str(datetime.datetime.now())
        # File written from the mocked bsub command which provides us with
        # the path to where the job actually runs and where we can find i.e
        # the job_id and status
        if not os.path.isfile("job_paths"):
            with open("mocked_result", mode="r+", encoding="utf-8") as result_line:
                result = result_line.read()
                if "exit" in result.split("\t"):
                    exit(1)
                print(result)
        else:
            with open("job_paths", encoding="utf-8") as job_paths_file:
                job_paths = job_paths_file.read().splitlines()
                for line in job_paths:
                    if not os.path.isfile(line + "/lsf_info.json"):
                        continue
                    # ERT has picked up the mocked response from mock_bsub and
                    # written the id to file
                    with open(line + "/lsf_info.json") as id_file:
                        _id = json.load(id_file)["job_id"]
                    # Statuses LSF can give us
                    # "PEND"
                    # "SSUSP"
                    # "PSUSP"
                    # "USUSP"
                    # "RUN"
                    # "EXIT"
                    # "ZOMBI" : does not seem to be available from the api.
                    # "DONE"
                    # "PDONE" : Post-processor is done.
                    # "UNKWN"
                    status = "RUN"
                    if os.path.isfile(f"{line}/OK"):
                        status = "DONE"
                    # Together with the headerline this is actually how LSF is
                    # providing its statuses on the job and how we are picking these
                    # up. In this mocked version i just check if the job is done
                    # with the OK file and then print that status for the job_id
                    # retrieved from the same runpath.
                    print(
                        f"{_id} pytest {status} normal"
                        f" mock_host mock_exec_host poly_0 {timestamp}"
                    )
        """
    )
    script_path = tmp_path / "mock_bjobs"
    with open(script_path, "w", encoding="utf-8") as fh:
        fh.write(script)

    os.chmod(script_path, 0o755)


def make_failing_bsub(script_path, success_script):
    """Write a mock ``bsub`` to *script_path* that fails ~30% of the time.

    The generated script exits with status 1 when a uniform random draw
    exceeds 0.7 and otherwise delegates to *success_script*.  By relying on
    the queue's submit-retry logic, all runs should still complete before
    the retry limits are exhausted.

    Parameters
    ----------
    script_path:
        Path where the mocked bsub executable is written.
    success_script:
        Path of the script to delegate to on the non-failing path.
    """
    script_path.write_text(
        "#!/usr/bin/env python3"
        + dedent(
            f"""
            import random
            import sys
            import subprocess
            num = random.random()
            if num > 0.7:
                exit(1)
            # Delegate via the running interpreter rather than a bare
            # "python", which is not on PATH on many python3-only systems
            # and would make every submit fail with FileNotFoundError.
            subprocess.call([sys.executable, "{success_script}"] + sys.argv)
            """
        )
    )

    os.chmod(script_path, 0o755)


def make_mock_bsub(script_path):
    """Write a mock ``bsub`` executable to *script_path*.

    The generated script records the submitted runpath in ``job_paths``,
    logs its argv to ``bsub_log``, prints an LSF-style submit confirmation
    with a randomly generated job id, writes that id into
    ``lsf_info.json`` inside the runpath, and finally launches the
    job-dispatch script for the run.
    """
    content = "#!/usr/bin/env python3" + dedent(
        """
        import random
        import subprocess
        import sys
        import json
        from pathlib import Path
        job_dispatch_path = sys.argv[-2]
        run_path = sys.argv[-1]
        # Write a file with the runpaths to where the jobs are running and
        # writing information we later need when providing statuses for the
        # jobs through the mocked bjobs command
        with open("job_paths", "a+", encoding="utf-8") as jobs_file:
            jobs_file.write(f"{run_path}\\n")
        # Just a log for testpurposes showing what is thrown against the
        # bsub command
        with open("bsub_log", "a+", encoding="utf-8") as f:
            f.write(f"{str(sys.argv)}\\n")
        # Assigning a "unique" job id for each submitted job and print. This
        # is how LSF provide response to ERT with the ID of the job.
        _id = str(random.randint(0, 10000000))
        print(f"Job <{_id}> is submitted to default queue <normal>.")
        with open(Path(run_path,"lsf_info.json"), mode="a+", encoding="utf-8") as f:
            lsf_data = {
                "job_id": _id
            }
            f.write(json.dumps(lsf_data))
        # Launch job-dispatch
        subprocess.Popen([job_dispatch_path, run_path])
        """
    )
    script_path.write_text(content)
    script_path.chmod(0o755)


@pytest.fixture(params=["success", "fail"])
def mock_bsub_integration_test(request, tmp_path):
    """Parametrized fixture providing either an always-succeeding or a
    randomly-failing mock ``bsub`` in *tmp_path*.

    In the "fail" variant the failing bsub delegates to a separately
    written always-succeeding script on its non-failing path.
    """
    if request.param != "success":
        make_mock_bsub(tmp_path / "success_bsub")
        return make_failing_bsub(tmp_path / "mock_bsub", tmp_path / "success_bsub")
    return make_mock_bsub(tmp_path / "mock_bsub")


@pytest.mark.usefixtures(
"copy_lsf_poly_case",
"mock_bsub",
"mock_bsub_integration_test",
"mock_bjobs",
"mock_start_server",
)
Expand Down

0 comments on commit 1ef5692

Please sign in to comment.