Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log LSF execution host to Azure #8826

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,15 @@ class JobData:
iens: int
job_state: AnyJob
submitted_timestamp: float
exec_hosts: str = "-"


def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]:
data: Dict[str, JobState] = {}
for line in bjobs_output.splitlines():
tokens = line.split(sep="^")
if len(tokens) == 2:
job_id, job_state = tokens
if len(tokens) == 3:
job_id, job_state, _ = tokens
if job_state not in get_args(JobState):
logger.error(
f"Unknown state {job_state} obtained from "
Expand All @@ -127,6 +128,17 @@ def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]:
return data


def parse_bjobs_exec_hosts(bjobs_output: str) -> Dict[str, str]:
data: Dict[str, str] = {}
for line in bjobs_output.splitlines():
tokens = line.split(sep="^")
if len(tokens) == 3:
job_id, _, exec_hosts = tokens
if exec_hosts != "-":
data[job_id] = exec_hosts
return data


def build_resource_requirement_string(
exclude_hosts: Sequence[str],
realization_memory: int,
Expand Down Expand Up @@ -423,7 +435,7 @@ async def poll(self) -> None:
str(self._bjobs_cmd),
"-noheader",
"-o",
"jobid stat delimiter='^'",
"jobid stat exec_host delimiter='^'",
*current_jobids,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
Expand All @@ -440,6 +452,14 @@ async def poll(self) -> None:
f"bjobs gave returncode {process.returncode} and error {stderr.decode()}"
)
bjobs_states = _parse_jobs_dict(parse_bjobs(stdout.decode(errors="ignore")))
bjobs_exec_hosts = parse_bjobs_exec_hosts(stdout.decode(errors="ignore"))

for jobid, exec_hosts in bjobs_exec_hosts.items():
if self._jobs[jobid].exec_hosts == "-":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition is never true in the tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rewrote this to get better coverage. Also, this is the default value for exec_hosts, and the value bjobs will yield when no node is yet assigned for the job.
A re-run of codecov would have been nice.

logger.info(
f"Realization {self._jobs[jobid].iens} was executed on host: {exec_hosts}"
andreas-el marked this conversation as resolved.
Show resolved Hide resolved
)
self._jobs[jobid].exec_hosts = exec_hosts

job_ids_found_in_bjobs_output = set(bjobs_states.keys())
if (
Expand Down Expand Up @@ -491,7 +511,6 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
logger.info(f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed")
exit_code = await self._get_exit_code(job_id)
event = FinishedEvent(iens=iens, returncode=exit_code)

elif isinstance(new_state, FinishedJobSuccess):
logger.info(
f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded"
Expand Down
2 changes: 1 addition & 1 deletion tests/ert/unit_tests/scheduler/bin/bjobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_parser() -> argparse.ArgumentParser:


def bjobs_formatter(jobstats: List[Job]) -> str:
return "".join([f"{job.job_id}^{job.job_state}\n" for job in jobstats])
return "".join([f"{job.job_id}^{job.job_state}^-\n" for job in jobstats])


def read(path: Path, default: Optional[str] = None) -> Optional[str]:
Expand Down
53 changes: 42 additions & 11 deletions tests/ert/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
filter_job_ids_on_submission_time,
parse_bhist,
parse_bjobs,
parse_bjobs_exec_hosts,
)
from tests.ert.utils import poll, wait_until

Expand Down Expand Up @@ -428,13 +429,13 @@ def test_parse_bjobs_gives_empty_result_on_random_input(some_text):
"bjobs_output, expected",
[
pytest.param(
"1^RUN",
"1^RUN^-",
{"1": "RUN"},
id="basic",
),
pytest.param("1^DONE", {"1": "DONE"}, id="done"),
pytest.param("1^DONE^-", {"1": "DONE"}, id="done"),
pytest.param(
"1^DONE\n2^RUN",
"1^DONE^-\n2^RUN^-",
{"1": "DONE", "2": "RUN"},
id="two_jobs",
),
Expand All @@ -444,13 +445,43 @@ def test_parse_bjobs_happy_path(bjobs_output, expected):
assert parse_bjobs(bjobs_output) == expected


@pytest.mark.parametrize(
"bjobs_output, expected",
[
pytest.param(
"1^RUN^st-vgrid01",
andreas-el marked this conversation as resolved.
Show resolved Hide resolved
{"1": "st-vgrid01"},
id="one_host",
),
pytest.param("1^DONE^-", {}, id="no_host"),
pytest.param(
"1^DONE^st-vgrid02\n2^RUN^-",
{"1": "st-vgrid02"},
id="only_one_host_outputs",
),
],
)
def test_parse_bjobs_exec_hosts_happy_path(bjobs_output, expected):
assert parse_bjobs_exec_hosts(bjobs_output) == expected


@given(
st.integers(min_value=1),
st.from_type(JobState),
)
def test_parse_bjobs(job_id, job_state):
assert parse_bjobs(f"{job_id}^{job_state}^-") == {str(job_id): job_state}


@given(
st.integers(min_value=1),
nonempty_string_without_whitespace(),
st.from_type(JobState),
nonempty_string_without_whitespace(),
)
def test_parse_bjobs(job_id, username, job_state):
assert parse_bjobs(f"{job_id}^{job_state}") == {str(job_id): job_state}
def test_parse_bjobs_exec_host(job_id, job_state, exec_host):
assert parse_bjobs_exec_hosts(f"{job_id}^{job_state}^{exec_host}") == {
str(job_id): exec_host
}


@given(nonempty_string_without_whitespace().filter(lambda x: x not in valid_jobstates))
Expand All @@ -460,15 +491,15 @@ def test_parse_bjobs_invalid_state_is_ignored(random_state):

def test_parse_bjobs_invalid_state_is_logged(caplog):
# (cannot combine caplog with hypothesis)
parse_bjobs("1^FOO")
parse_bjobs("1^FOO^-")
assert "Unknown state FOO" in caplog.text


@pytest.mark.parametrize(
"bjobs_script, expectation",
[
pytest.param(
"echo '1^DONE'; exit 0",
"echo '1^DONE^-'; exit 0",
does_not_raise(),
id="all-good",
),
Expand All @@ -484,13 +515,13 @@ def test_parse_bjobs_invalid_state_is_logged(caplog):
id="empty_cluster_specific_id",
),
pytest.param(
"echo '1^DONE'; echo 'Job <2> is not found' >&2 ; exit 255",
"echo '1^DONE^-'; echo 'Job <2> is not found' >&2 ; exit 255",
# If we have some success and some failures, actual command returns 255
does_not_raise(),
id="error_for_irrelevant_job_id",
),
pytest.param(
"echo '2^DONE'",
"echo '2^DONE^-'",
pytest.raises(asyncio.TimeoutError),
id="wrong-job-id",
),
Expand All @@ -500,7 +531,7 @@ def test_parse_bjobs_invalid_state_is_logged(caplog):
id="exit-1",
),
pytest.param(
"echo '1^DONE'; exit 1",
"echo '1^DONE^-'; exit 1",
# (this is not observed in reality)
does_not_raise(),
id="correct_output_but_exitcode_1",
Expand Down