Skip to content

Commit

Permalink
Add exception handling python lsf driver
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-eq authored and berland committed Nov 27, 2023
1 parent e0c871b commit eebc222
Showing 1 changed file with 144 additions and 34 deletions.
178 changes: 144 additions & 34 deletions src/ert/job_queue/driver.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import asyncio
import logging
import os
import re
import shlex
import shutil
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from typing import (
TYPE_CHECKING,
Awaitable,
Callable,
Dict,
List,
Optional,
Tuple,
)

from ert.config.parsing.queue_system import QueueSystem

Expand Down Expand Up @@ -117,64 +125,143 @@ async def kill(self, realization: "RealizationState") -> None:


class LSFDriver(Driver):
LSF_STATUSES = [
"PEND",
"SSUSP",
"PSUSP",
"USUSP",
"RUN",
"EXIT",
"ZOMBI",
"DONE",
"PDONE",
"UNKWN",
]

def __init__(self, queue_options: Optional[List[Tuple[str, str]]]):
super().__init__(queue_options)

self._realstate_to_lsfid: Dict["RealizationState", str] = {}
self._lsfid_to_realstate: Dict[str, "RealizationState"] = {}
self._max_attempt = 100
self._statuses = {}

Check failure on line 147 in src/ert/job_queue/driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Need type annotation for "_statuses" (hint: "_statuses: Dict[<type>, <type>] = ...")
self._submit_processes: Dict[
"RealizationState", "asyncio.subprocess.Process"
] = {}

self._retry_sleep_period = 3
self._currently_polling = False

async def run_with_retries(

Check failure on line 154 in src/ert/job_queue/driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Function is missing a return type annotation
self, func: Callable[[None], Awaitable], error_msg: str = ""

Check failure on line 155 in src/ert/job_queue/driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Missing type parameters for generic type "Awaitable"
):
current_attempt = 0
while current_attempt < self._max_attempt:
current_attempt += 1
try:
function_output = await func()

Check failure on line 161 in src/ert/job_queue/driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Too few arguments
if function_output:
return function_output
await asyncio.sleep(self._retry_sleep_period)
except asyncio.CancelledError as e:
logger.error(e)
await asyncio.sleep(self._retry_sleep_period)
raise RuntimeError(error_msg)

async def submit(self, realization: "RealizationState") -> None:
submit_cmd: List[str] = [
"bsub",
submit_cmd = self.parse_submit_cmd(
"-J",
f"poly_{realization.realization.run_arg.iens}",
str(realization.realization.job_script),
str(realization.realization.run_arg.runpath),
)
await self.run_with_retries(
lambda: self._submit(submit_cmd, realization=realization),

Check failure on line 178 in src/ert/job_queue/driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Cannot infer type of lambda

Check failure on line 178 in src/ert/job_queue/driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.11)

Argument 1 to "run_with_retries" of "LSFDriver" has incompatible type "Callable[[], Coroutine[Any, Any, bool]]"; expected "Callable[[None], Awaitable[Any]]"
error_msg="Maximum number of submit errors exceeded\n",
)

async def _submit(
self, submit_command: list[str], realization: "RealizationState"
) -> bool:
result = await self.run_shell_command(submit_command, command_name="bsub")
if not result:
return False

(process, output, error) = result
self._submit_processes[realization] = process
lsf_id_match = re.match(
"Job <\\d+> is submitted to \\w+ queue <\\w+>\\.", output.decode()
)
if lsf_id_match is None:
logger.error(f"Could not parse lsf id from: {output.decode()}")
return False
lsf_id = lsf_id_match.group(0)
self._realstate_to_lsfid[realization] = lsf_id
self._lsfid_to_realstate[lsf_id] = realization
realization.accept()
logger.info(f"Submitted job {realization} and got LSF JOBID {lsf_id}")
return True

def parse_submit_cmd(self, *additional_parameters) -> List[str]:
submit_cmd = [
self.get_option("BSUB_CMD") if self.has_option("BSUB_CMD") else "bsub"
]
if self.has_option("LSF_QUEUE"):
submit_cmd += ["-q", self.get_option("LSF_QUEUE")]

return submit_cmd + list(additional_parameters)

async def run_shell_command(
self, command_to_run: list[str], command_name=""
) -> (asyncio.subprocess.Process, str, str):
process = await asyncio.create_subprocess_exec(
*submit_cmd,
*command_to_run,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
self._submit_processes[realization] = process

# Wait for submit process to finish:
output, error = await process.communicate()
print(output) # FLAKY ALERT, we seem to get empty
print(error)

try:
lsf_id = str(output).split(" ")[1].replace("<", "").replace(">", "")
self._realstate_to_lsfid[realization] = lsf_id
self._lsfid_to_realstate[lsf_id] = realization
realization.accept()
print(f"Submitted job {realization} and got LSF JOBID {lsf_id}")
except Exception:
# We should probably retry the submission, bsub stdout seems flaky.
print(f"ERROR: Could not parse lsf id from: {output!r}")
output, _error = await process.communicate()
if process.returncode != 0:
logger.error(
(
f"{command_name} returned non-zero exitcode: {process.returncode}\n"
f"{output.decode()}\n"
f"{_error.decode()}"
)
)
return None
return (process, output, _error)

async def poll_statuses(self) -> None:
if self._currently_polling:
logger.debug("Already polling status elsewhere")
return
self._currently_polling = True

if not self._realstate_to_lsfid:
# Nothing has been submitted yet.
logger.warning("Skipped polling due to no jobs submitted")
return

poll_cmd = ["bjobs"] + list(self._realstate_to_lsfid.values())
assert shutil.which(poll_cmd[0]) # does not propagate back..
process = await asyncio.create_subprocess_exec(
*poll_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
output, _error = await process.communicate()
poll_cmd = [
str(self.get_option("BJOBS_CMD"))
if self.has_option("BJOBS_CMD")
else "bjobs"
] + list(self._realstate_to_lsfid.values())
try:
await self.run_with_retries(lambda: self._poll_statuses(poll_cmd))
# suppress runtime error
except RuntimeError:
return
except ValueError as e:
# raise this value error as runtime error
raise RuntimeError from e

async def _poll_statuses(self, poll_cmd: str) -> bool:
self._currently_polling = True
result = await self.run_shell_command(poll_cmd, command_name="bjobs")

if result is None:
return False
(_, output, _) = result

for line in output.decode(encoding="utf-8").split("\n"):
if "JOBID" in line:
continue
Expand All @@ -185,7 +272,8 @@ async def poll_statuses(self) -> None:
continue
if tokens[0] not in self._lsfid_to_realstate:
# A LSF id we know nothing of, this should not happen.
continue
raise ValueError(f"Found unknown job id ({tokens[0]})")

realstate = self._lsfid_to_realstate[tokens[0]]

if tokens[2] == "PEND" and str(realstate.current_state.id) == "WAITING":
Expand All @@ -207,9 +295,31 @@ async def poll_statuses(self) -> None:
realstate.runend()
if tokens[2] == "DONE" and str(realstate.current_state.id) == "RUNNING":
realstate.runend()
if tokens[2] not in LSFDriver.LSF_STATUSES:
raise ValueError(
f"The lsf_status {tokens[2]} for job {tokens[0]} was not recognized\n"
)

self._currently_polling = False
return True

async def kill(self, realization: "RealizationState") -> None:
print(f"would like to kill {realization}")
pass
lsf_job_id = self._realstate_to_lsfid[realization]
logger.debug(f"Attempting to kill {lsf_job_id=}")
kill_cmd = [
self.get_option("BKILL_CMD") if self.has_option("BKILL_CMD") else "bkill",
lsf_job_id,
]
await self.run_with_retries(
lambda: self._kill(kill_cmd, realization, lsf_job_id),
error_msg="Maximum number of kill errors exceeded\n",
)

async def _kill(
self, kill_cmd, realization: "RealizationState", lsf_job_id: int
) -> bool:
result = await self.run_shell_command(kill_cmd, "bkill")
if result is None:
return False
realization.verify_kill()
logger.info(f"Successfully killed job {lsf_job_id}")

0 comments on commit eebc222

Please sign in to comment.