Skip to content

Commit

Permalink
Fix unit test job queue run_done_callback
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-eq committed Dec 5, 2023
1 parent 1ef5692 commit 151446e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 37 deletions.
57 changes: 21 additions & 36 deletions tests/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def mock_bsub(tmp_path):
with open("job_paths", "a+", encoding="utf-8") as jobs_file:
jobs_file.write(f"{run_path}\\n")
# debug purposes
# to verify which arguments were passed to bsub
with open("bsub_log", "a+", encoding="utf-8") as f:
f.write(f"{' '.join(sys.argv)}\\n")
Expand All @@ -58,7 +58,7 @@ def mock_bsub(tmp_path):
if "exit.sh" in sys.argv:
exit(1)
if "gargled_return.sh" in sys.argv:
if "garbled_return.sh" in sys.argv:
print("wait,this_is_not_a_valid_return_format")
else:
_id = str(random.randint(0, 10000000))
Expand Down Expand Up @@ -137,7 +137,9 @@ def create_fake_bjobs_result(dir: str, job_id: str, status: str):


@pytest.mark.asyncio
async def test_submit_failure_script_exit(mock_bsub, caplog, tmpdir, monkeypatch):
async def test_that_submit_fails_when_bsub_fails(
mock_bsub, caplog, tmpdir, monkeypatch
):
monkeypatch.chdir(tmpdir)
lsf_driver = LSFDriver(None)
lsf_driver = LSFDriver(queue_options=[("BSUB_CMD", tmpdir / "mock_bsub")])
Expand All @@ -153,14 +155,14 @@ async def test_submit_failure_script_exit(mock_bsub, caplog, tmpdir, monkeypatch
job_paths = Path("job_paths").read_text(encoding="utf-8").strip().split("\n")

# should try command 3 times before exiting
assert len(job_paths) == 3
assert len(job_paths) == lsf_driver._max_attempt

output = caplog.text
assert len(re.findall("bsub returned non-zero exitcode: 1", output)) == 3


@pytest.mark.asyncio
async def test_submit_failure_badly_formated_return(
async def test_that_submit_fails_when_bsub_returns_badly_formated_response(
mock_bsub, caplog, tmpdir, monkeypatch
):
monkeypatch.chdir(tmpdir)
Expand All @@ -169,7 +171,7 @@ async def test_submit_failure_badly_formated_return(
lsf_driver._retry_sleep_period = 0

mock_realization_state = MockRealizationState()
mock_realization_state.realization.job_script = "gargled_return.sh"
mock_realization_state.realization.job_script = "garbled_return.sh"

with pytest.raises(RuntimeError, match="Maximum number of submit errors exceeded"):
await lsf_driver.submit(mock_realization_state)
Expand Down Expand Up @@ -205,34 +207,17 @@ async def test_submit_success(mock_bsub, caplog, tmpdir, monkeypatch):


@pytest.mark.asyncio
async def test_poll_statuses_while_already_polling(
mock_bjobs, caplog, tmpdir, monkeypatch
):
monkeypatch.chdir(tmpdir)

lsf_driver = LSFDriver(None)
lsf_driver._currently_polling = True

await lsf_driver.poll_statuses()

# Should not call bjobs
assert not Path("bjobs_logs").exists()

output = caplog.text
assert output == ""
assert lsf_driver._currently_polling


@pytest.mark.asyncio
async def test_poll_statuses_before_submitting_jobs():
async def test_that_poll_statuses_does_not_fails_on_run_before_submitting_jobs():
lsf_driver = LSFDriver(None)

# should not crash
await lsf_driver.poll_statuses()


@pytest.mark.asyncio
async def test_poll_statuses_bjobs_exit_code_1(mock_bjobs, caplog, tmpdir, monkeypatch):
async def test_that_poll_statuses_ignores_failing_bjobs(
mock_bjobs, caplog, tmpdir, monkeypatch
):
monkeypatch.chdir(tmpdir)

lsf_driver = LSFDriver(queue_options=[("BJOBS_CMD", tmpdir / "mock_bjobs")])
Expand Down Expand Up @@ -262,7 +247,7 @@ async def test_poll_statuses_bjobs_exit_code_1(mock_bjobs, caplog, tmpdir, monke


@pytest.mark.asyncio
async def test_poll_statuses_bjobs_returning_unknown_job_id(
async def test_that_poll_statuses_ignores_bjobs_returning_unknown_job_id(
mock_bjobs, tmpdir, monkeypatch
):
monkeypatch.chdir(tmpdir)
Expand Down Expand Up @@ -290,7 +275,7 @@ async def test_poll_statuses_bjobs_returning_unknown_job_id(


@pytest.mark.asyncio
async def test_poll_statuses_bjobs_returning_unrecognized_status(
async def test_that_poll_statuses_fails_on_bjobs_returning_unrecognized_status(
mock_bjobs, tmpdir, monkeypatch
):
monkeypatch.chdir(tmpdir)
Expand All @@ -317,7 +302,7 @@ async def test_poll_statuses_bjobs_returning_unrecognized_status(


@pytest.mark.asyncio
async def test_poll_statuses_bjobs_returning_updated_state(
async def test_that_poll_statuses_runs_bjobs_and_updates_state_correctly(
mock_bjobs, tmpdir, monkeypatch
):
monkeypatch.chdir(tmpdir)
Expand All @@ -344,7 +329,7 @@ async def test_poll_statuses_bjobs_returning_updated_state(


@pytest.mark.asyncio
async def test_kill_bkill_non_existent_jobid_exit_code_1(
async def test_that_kill_fails_when_bkill_fails_on_non_existent_jobid(
mock_bkill, caplog, tmpdir, monkeypatch
):
monkeypatch.chdir(tmpdir)
Expand Down Expand Up @@ -378,7 +363,7 @@ async def test_kill_bkill_non_existent_jobid_exit_code_1(
[[("BSUB_CMD", "/bin/mock/bsub")], ["/bin/mock/bsub"]],
],
)
def test_lsf_build_submit_cmd_adds_driver_options(
def test_that_lsf_build_submit_cmd_adds_queue_options(
options: list[Tuple[str, str]], expected_list
):
lsf_driver = LSFDriver(options)
Expand All @@ -389,7 +374,7 @@ def test_lsf_build_submit_cmd_adds_driver_options(
@pytest.mark.parametrize(
"additional_parameters", [[["test0", "test2", "/home/test3.py"]], [[3, 2]], [[]]]
)
def test_lsf_build_submit_cmd_adds_additional_parameters(
def test_that_lsf_build_submit_cmd_adds_additional_parameters(
additional_parameters: list[str],
):
lsf_driver = LSFDriver(None)
Expand Down Expand Up @@ -419,7 +404,7 @@ def test_lsf_build_submit_cmd_adds_additional_parameters(
],
],
)
def test_lsf_build_submit_cmd_adds_additional_parameters_after_options(
def test_that_build_submit_cmd_adds_additional_parameters_after_options(
options: list[tuple[str, str]],
additional_parameters: list[str],
expected_list: list[str],
Expand All @@ -434,7 +419,7 @@ def test_lsf_build_submit_cmd_adds_additional_parameters_after_options(
[[[("LSF_QUEUE", "test_queue")], ["-q test_queue"]]],
)
@pytest.mark.asyncio
async def test_lsf_submit_lsf_queue_option_is_added(
async def test_that_bsub_is_called_with_queue_options(
driver_options: list[Tuple[str, str]],
expected_bsub_options: list[str],
mock_bsub,
Expand Down Expand Up @@ -501,7 +486,7 @@ def mock_bjobs(tmp_path):
print("JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME")
timestamp = str(datetime.datetime.now())
timestamp = str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# File written from the mocked bsub command which provides us with
# the path to where the job actually runs and where we can find i.e
# the job_id and status
Expand Down
3 changes: 2 additions & 1 deletion tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
import os
import re
import stat
import time
from pathlib import Path
Expand Down Expand Up @@ -196,7 +197,7 @@ async def test_run_done_callback(
await asyncio.gather(execute_task)
assert scheduler.count_realization_state(expected_state) == scheduler.queue_size
for realstate in scheduler._realizations:
assert realstate._callback_status_msg == "foo"
assert re.search("foo", realstate._callback_status_msg)


def test_add_dispatch_info(tmpdir, monkeypatch, simple_script):
Expand Down

0 comments on commit 151446e

Please sign in to comment.