Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make python lsf driver handle crashes and add lsf queue option #6662

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 151 additions & 38 deletions src/ert/job_queue/driver.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
import asyncio
import logging
import os
import re
import shlex
import shutil
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
List,
Optional,
Tuple,
)

from ert.config.parsing.queue_system import QueueSystem

Expand Down Expand Up @@ -117,64 +126,144 @@


class LSFDriver(Driver):
# Job states accepted in the status column of the poll output;
# `_poll_statuses` raises ValueError for any state not listed here.
LSF_STATUSES = [
"PEND",
"SSUSP",
"PSUSP",
"USUSP",
"RUN",
"EXIT",
"ZOMBI",
"DONE",
"PDONE",
"UNKWN",
]

def __init__(self, queue_options: Optional[List[Tuple[str, str]]]):
    """Initialise the driver with empty job bookkeeping and retry settings.

    Args:
        queue_options: Option (name, value) pairs, forwarded unchanged to
            the base driver.
    """
    super().__init__(queue_options)

    # Retry policy used by run_with_retries: number of attempts and the
    # pause (passed to asyncio.sleep) between them.
    self._max_attempt: int = 100
    self._retry_sleep_period = 3

    # Guard flag consulted by poll_statuses to avoid overlapping polls.
    self._currently_polling = False

    # Two-way mapping between realizations and their LSF job ids.
    self._lsfid_to_realstate: Dict[str, "RealizationState"] = {}
    self._realstate_to_lsfid: Dict["RealizationState", str] = {}

    # The most recent submit subprocess started for each realization.
    self._submit_processes: Dict[
        "RealizationState", "asyncio.subprocess.Process"
    ] = {}

async def run_with_retries(
    self, func: Callable[[], Awaitable[Any]], error_msg: str = ""
) -> Any:
    """Await ``func`` repeatedly until it yields a truthy result.

    Args:
        func: Zero-argument callable returning an awaitable. A falsy result
            is treated as a failed attempt and triggers a retry.
        error_msg: Message of the RuntimeError raised when all attempts fail.

    Returns:
        The first truthy value produced by ``func``.

    Raises:
        RuntimeError: If ``func`` gave a falsy result ``self._max_attempt``
            times in a row.

    Exceptions raised by ``func`` propagate to the caller. This includes
    ``asyncio.CancelledError``: cancellation must not be swallowed, or a
    cancelled task would keep retrying for the full attempt budget.
    """
    for attempt in range(1, self._max_attempt + 1):
        function_output = await func()
        if function_output:
            return function_output
        # No point in sleeping after the final attempt; fail right away.
        if attempt < self._max_attempt:
            await asyncio.sleep(self._retry_sleep_period)
    raise RuntimeError(error_msg)

async def submit(self, realization: "RealizationState") -> None:
submit_cmd: List[str] = [
"bsub",
"-J",
f"poly_{realization.realization.run_arg.iens}",
str(realization.realization.job_script),
str(realization.realization.run_arg.runpath),
submit_cmd = self.build_submit_cmd(
[
"-J",
f"poly_{realization.realization.run_arg.iens}",
str(realization.realization.job_script),
str(realization.realization.run_arg.runpath),
]
)
await self.run_with_retries(
lambda: self._submit(submit_cmd, realization=realization),

Check failure on line 180 in src/ert/job_queue/driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Cannot infer type of lambda

Check failure on line 180 in src/ert/job_queue/driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Argument 1 to "run_with_retries" of "LSFDriver" has incompatible type "Callable[[], Coroutine[Any, Any, bool]]"; expected "Callable[[Any], Awaitable[Any]]"
error_msg="Maximum number of submit errors exceeded\n",
)

async def _submit(
    self, submit_command: List[str], realization: "RealizationState"
) -> bool:
    """Run the submit command once and register the resulting LSF job id.

    Args:
        submit_command: Full argument vector for the submit executable.
        realization: The realization being submitted; it is accepted and
            recorded in the id mappings on success.

    Returns:
        True when the job id was parsed and recorded, False when the command
        failed or its output could not be parsed (so the caller may retry).
    """
    result = await self.run_shell_command(submit_command, command_name="bsub")
    if not result:
        return False

    process, output, _error = result
    self._submit_processes[realization] = process
    stdout = output.decode()
    # Capture only the numeric id: group(0) would be the whole message, which
    # is useless as a job id when polling or killing. The word before "queue"
    # (e.g. "default") is optional since it is not always printed.
    lsf_id_match = re.match(
        r"Job <(\d+)> is submitted to (?:\w+ )?queue <[^>]+>\.", stdout
    )
    if lsf_id_match is None:
        logger.error(f"Could not parse lsf id from: {stdout}")
        return False
    lsf_id = lsf_id_match.group(1)
    self._realstate_to_lsfid[realization] = lsf_id
    self._lsfid_to_realstate[lsf_id] = realization
    realization.accept()
    logger.info(f"Submitted job {realization} and got LSF JOBID {lsf_id}")
    return True

def build_submit_cmd(self, args: List[str]) -> List[str]:
    """Assemble the submit command line from the configured queue options.

    Args:
        args: Arguments appended after the executable and any queue selection.

    Returns:
        A new list: the ``BSUB_CMD`` option (or ``"bsub"`` when unset),
        followed by ``-q <LSF_QUEUE>`` when that option is set, followed by
        ``args``. The input list is not modified.
    """
    # Coerce option values to str, as is done for BJOBS_CMD when polling, so
    # the result is always a valid argument vector for subprocess execution.
    executable = "bsub"
    if self.has_option("BSUB_CMD"):
        executable = str(self.get_option("BSUB_CMD"))

    submit_cmd = [executable]
    if self.has_option("LSF_QUEUE"):
        submit_cmd.extend(["-q", str(self.get_option("LSF_QUEUE"))])

    return submit_cmd + args

async def run_shell_command(
self, command_to_run: List[str], command_name: str=""
) -> Optional[Tuple[asyncio.subprocess.Process, bytes, bytes]]:
process = await asyncio.create_subprocess_exec(
*submit_cmd,
*command_to_run,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
self._submit_processes[realization] = process

# Wait for submit process to finish:
output, error = await process.communicate()
print(output) # FLAKY ALERT, we seem to get empty
print(error)

try:
lsf_id = str(output).split(" ")[1].replace("<", "").replace(">", "")
self._realstate_to_lsfid[realization] = lsf_id
self._lsfid_to_realstate[lsf_id] = realization
realization.accept()
print(f"Submitted job {realization} and got LSF JOBID {lsf_id}")
except Exception:
# We should probably retry the submission, bsub stdout seems flaky.
print(f"ERROR: Could not parse lsf id from: {output!r}")
output, _error = await process.communicate()
if process.returncode != 0:
logger.error(
(
f"{command_name} returned non-zero exitcode: {process.returncode}\n"
f"{output.decode()}\n"
f"{_error.decode()}"
)
)
return None
return (process, output, _error)

async def poll_statuses(self) -> None:
if self._currently_polling:
logger.debug("Already polling status elsewhere")
return
self._currently_polling = True

if not self._realstate_to_lsfid:
# Nothing has been submitted yet.
logger.warning("Skipped polling due to no jobs submitted")
return

poll_cmd = ["bjobs"] + list(self._realstate_to_lsfid.values())
assert shutil.which(poll_cmd[0]) # does not propagate back..
process = await asyncio.create_subprocess_exec(
*poll_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
output, _error = await process.communicate()
poll_cmd = [
str(self.get_option("BJOBS_CMD"))
if self.has_option("BJOBS_CMD")
else "bjobs"
] + list(self._realstate_to_lsfid.values())
try:
await self.run_with_retries(lambda: self._poll_statuses(poll_cmd))

Check failure on line 251 in src/ert/job_queue/driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Cannot infer type of lambda

Check failure on line 251 in src/ert/job_queue/driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Argument 1 to "run_with_retries" of "LSFDriver" has incompatible type "Callable[[], Coroutine[Any, Any, bool]]"; expected "Callable[[Any], Awaitable[Any]]"
# suppress runtime error
except RuntimeError:
return
except ValueError as e:
# raise this value error as runtime error
raise RuntimeError from e

async def _poll_statuses(self, poll_cmd: List[str]) -> bool:
self._currently_polling = True
result = await self.run_shell_command(poll_cmd, command_name="bjobs")

if result is None:
return False
(_, output, _) = result

for line in output.decode(encoding="utf-8").split("\n"):
if "JOBID" in line:
continue
Expand All @@ -185,7 +274,8 @@
continue
if tokens[0] not in self._lsfid_to_realstate:
# A LSF id we know nothing of, this should not happen.
continue
raise ValueError(f"Found unknown job id ({tokens[0]})")

realstate = self._lsfid_to_realstate[tokens[0]]

if tokens[2] == "PEND" and str(realstate.current_state.id) == "WAITING":
Expand All @@ -207,9 +297,32 @@
realstate.runend()
if tokens[2] == "DONE" and str(realstate.current_state.id) == "RUNNING":
realstate.runend()
if tokens[2] not in LSFDriver.LSF_STATUSES:
raise ValueError(
f"The lsf_status {tokens[2]} for job {tokens[0]} was not recognized\n"
)

self._currently_polling = False
return True

async def kill(self, realization: "RealizationState") -> None:
print(f"would like to kill {realization}")
pass
lsf_job_id = self._realstate_to_lsfid[realization]
logger.debug(f"Attempting to kill {lsf_job_id=}")
kill_cmd = [
self.get_option("BKILL_CMD") if self.has_option("BKILL_CMD") else "bkill",
lsf_job_id,
]
await self.run_with_retries(
lambda: self._kill(kill_cmd, realization, lsf_job_id),

Check failure on line 316 in src/ert/job_queue/driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Cannot infer type of lambda
error_msg="Maximum number of kill errors exceeded\n",
)

async def _kill(
    self, kill_cmd: List[str], realization: "RealizationState", lsf_job_id: str
) -> bool:
    """Run the kill command once and confirm the kill on the realization.

    Args:
        kill_cmd: Full argument vector for the kill executable.
        realization: The realization whose job is being killed.
        lsf_job_id: Job id, used only in the success log message.

    Returns:
        True when the command succeeded, False when ``run_shell_command``
        reported failure by returning None (so the caller may retry).
    """
    outcome = await self.run_shell_command(kill_cmd, "bkill")
    if outcome is not None:
        realization.verify_kill()
        logger.info(f"Successfully killed job {lsf_job_id}")
        return True
    return False
Loading