Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log LSF execution host to Azure #8826

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,15 @@ class JobData:
iens: int
job_state: AnyJob
submitted_timestamp: float
exec_hosts: str = "-"


def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]:
data: Dict[str, JobState] = {}
for line in bjobs_output.splitlines():
tokens = line.split(sep="^")
if len(tokens) == 2:
job_id, job_state = tokens
if len(tokens) == 3:
job_id, job_state, _ = tokens
if job_state not in get_args(JobState):
logger.error(
f"Unknown state {job_state} obtained from "
Expand All @@ -127,6 +128,16 @@ def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]:
return data


def parse_bjobs_exec_hosts(bjobs_output: str) -> Dict[str, str]:
data: Dict[str, str] = {}
for line in bjobs_output.splitlines():
tokens = line.split(sep="^")
if len(tokens) == 3:
job_id, _, exec_hosts = tokens
data[job_id] = exec_hosts
return data


def build_resource_requirement_string(
exclude_hosts: Sequence[str],
realization_memory: int,
Expand Down Expand Up @@ -423,7 +434,7 @@ async def poll(self) -> None:
str(self._bjobs_cmd),
"-noheader",
"-o",
"jobid stat delimiter='^'",
"jobid stat exec_host delimiter='^'",
*current_jobids,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
Expand All @@ -440,6 +451,9 @@ async def poll(self) -> None:
f"bjobs gave returncode {process.returncode} and error {stderr.decode()}"
)
bjobs_states = _parse_jobs_dict(parse_bjobs(stdout.decode(errors="ignore")))
self.update_and_log_exec_hosts(
parse_bjobs_exec_hosts(stdout.decode(errors="ignore"))
)

job_ids_found_in_bjobs_output = set(bjobs_states.keys())
if (
Expand Down Expand Up @@ -491,7 +505,6 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
logger.info(f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed")
exit_code = await self._get_exit_code(job_id)
event = FinishedEvent(iens=iens, returncode=exit_code)

elif isinstance(new_state, FinishedJobSuccess):
logger.info(
f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded"
Expand Down Expand Up @@ -606,6 +619,14 @@ async def _poll_once_by_bhist(
self._bhist_cache_timestamp = time.time()
return _parse_jobs_dict(jobs)

def update_and_log_exec_hosts(self, bjobs_exec_hosts: Dict[str, str]) -> None:
for job_id, exec_hosts in bjobs_exec_hosts.items():
if self._jobs[job_id].exec_hosts == "-":
logger.info(
f"Realization {self._jobs[job_id].iens} was assigned to host: {exec_hosts}"
)
self._jobs[job_id].exec_hosts = exec_hosts

def _build_resource_requirement_arg(self, realization_memory: int) -> List[str]:
resource_requirement_string = build_resource_requirement_string(
self._exclude_hosts,
Expand Down
2 changes: 1 addition & 1 deletion tests/ert/unit_tests/scheduler/bin/bjobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_parser() -> argparse.ArgumentParser:


def bjobs_formatter(jobstats: List[Job]) -> str:
return "".join([f"{job.job_id}^{job.job_state}\n" for job in jobstats])
return "".join([f"{job.job_id}^{job.job_state}^-\n" for job in jobstats])


def read(path: Path, default: Optional[str] = None) -> Optional[str]:
Expand Down
66 changes: 55 additions & 11 deletions tests/ert/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
filter_job_ids_on_submission_time,
parse_bhist,
parse_bjobs,
parse_bjobs_exec_hosts,
)
from tests.ert.utils import poll, wait_until

Expand Down Expand Up @@ -428,13 +429,13 @@ def test_parse_bjobs_gives_empty_result_on_random_input(some_text):
"bjobs_output, expected",
[
pytest.param(
"1^RUN",
"1^RUN^-",
{"1": "RUN"},
id="basic",
),
pytest.param("1^DONE", {"1": "DONE"}, id="done"),
pytest.param("1^DONE^-", {"1": "DONE"}, id="done"),
pytest.param(
"1^DONE\n2^RUN",
"1^DONE^-\n2^RUN^-",
{"1": "DONE", "2": "RUN"},
id="two_jobs",
),
Expand All @@ -444,13 +445,42 @@ def test_parse_bjobs_happy_path(bjobs_output, expected):
assert parse_bjobs(bjobs_output) == expected


@pytest.mark.parametrize(
"bjobs_output, expected",
[
pytest.param(
"1^RUN^abc-comp01",
{"1": "abc-comp01"},
id="one_host",
),
pytest.param(
"1^DONE^abc-comp02\n2^RUN^-",
{"1": "abc-comp02", "2": "-"},
id="two_hosts_output",
),
],
)
def test_parse_bjobs_exec_hosts_happy_path(bjobs_output, expected):
assert parse_bjobs_exec_hosts(bjobs_output) == expected


@given(
st.integers(min_value=1),
nonempty_string_without_whitespace(),
st.from_type(JobState),
)
def test_parse_bjobs(job_id, username, job_state):
assert parse_bjobs(f"{job_id}^{job_state}") == {str(job_id): job_state}
def test_parse_bjobs(job_id, job_state):
assert parse_bjobs(f"{job_id}^{job_state}^-") == {str(job_id): job_state}


@given(
st.integers(min_value=1),
st.from_type(JobState),
nonempty_string_without_whitespace(),
)
def test_parse_bjobs_exec_host(job_id, job_state, exec_host):
assert parse_bjobs_exec_hosts(f"{job_id}^{job_state}^{exec_host}") == {
str(job_id): exec_host
}


@given(nonempty_string_without_whitespace().filter(lambda x: x not in valid_jobstates))
Expand All @@ -460,15 +490,15 @@ def test_parse_bjobs_invalid_state_is_ignored(random_state):

def test_parse_bjobs_invalid_state_is_logged(caplog):
# (cannot combine caplog with hypothesis)
parse_bjobs("1^FOO")
parse_bjobs("1^FOO^-")
assert "Unknown state FOO" in caplog.text


@pytest.mark.parametrize(
"bjobs_script, expectation",
[
pytest.param(
"echo '1^DONE'; exit 0",
"echo '1^DONE^-'; exit 0",
does_not_raise(),
id="all-good",
),
Expand All @@ -484,13 +514,13 @@ def test_parse_bjobs_invalid_state_is_logged(caplog):
id="empty_cluster_specific_id",
),
pytest.param(
"echo '1^DONE'; echo 'Job <2> is not found' >&2 ; exit 255",
"echo '1^DONE^-'; echo 'Job <2> is not found' >&2 ; exit 255",
# If we have some success and some failures, actual command returns 255
does_not_raise(),
id="error_for_irrelevant_job_id",
),
pytest.param(
"echo '2^DONE'",
"echo '2^DONE^-'",
pytest.raises(asyncio.TimeoutError),
id="wrong-job-id",
),
Expand All @@ -500,7 +530,7 @@ def test_parse_bjobs_invalid_state_is_logged(caplog):
id="exit-1",
),
pytest.param(
"echo '1^DONE'; exit 1",
"echo '1^DONE^-'; exit 1",
# (this is not observed in reality)
does_not_raise(),
id="correct_output_but_exitcode_1",
Expand Down Expand Up @@ -979,6 +1009,20 @@ def not_found_bjobs(monkeypatch, tmp_path):
bjobs_path.chmod(bjobs_path.stat().st_mode | stat.S_IEXEC)


async def test_bjobs_exec_host_logs_only_once(tmp_path, job_name, caplog):
caplog.set_level(logging.INFO)
os.chdir(tmp_path)
driver = LsfDriver()
await driver.submit(0, "sh", "-c", "sleep 1", name=job_name)

job_id = next(iter(driver._jobs.keys()))
driver.update_and_log_exec_hosts({job_id: "COMP-01"})
driver.update_and_log_exec_hosts({job_id: "COMP-02"})

await poll(driver, {0})
assert caplog.text.count("was assigned to host:") == 1


async def test_lsf_stdout_file(tmp_path, job_name):
os.chdir(tmp_path)
driver = LsfDriver()
Expand Down