Skip to content

Commit

Permalink
Add exception handling python lsf driver
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-eq committed Nov 24, 2023
1 parent 67346fb commit a69b87b
Show file tree
Hide file tree
Showing 2 changed files with 602 additions and 145 deletions.
183 changes: 149 additions & 34 deletions src/ert/job_queue/driver.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import asyncio
import logging
import os
import re
import shlex
import shutil
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from typing import (
TYPE_CHECKING,
Awaitable,
Callable,
Dict,
List,
Optional,
Tuple,
)

from ert.config.parsing.queue_system import QueueSystem

Expand All @@ -12,6 +21,9 @@
from ert.job_queue import RealizationState


logger = logging.getLogger(__name__)


class Driver(ABC):
def __init__(
self,
Expand Down Expand Up @@ -104,64 +116,144 @@ async def kill(self, realization: "RealizationState") -> None:


class LSFDriver(Driver):
# LSF job status codes this driver accepts in `bjobs` output. _poll_statuses
# raises ValueError when a job reports a status that is not in this list.
LSF_STATUSES = [
"PEND",
"SSUSP",
"PSUSP",
"USUSP",
"RUN",
"EXIT",
"ZOMBI",
"DONE",
"PDONE",
"UNKWN",
]

def __init__(self, queue_options: Optional[List[Tuple[str, str]]]):
    """Create an LSF driver with empty job bookkeeping.

    Args:
        queue_options: Optional list of string pairs, handed unchanged to
            the base-class initializer.
    """
    super().__init__(queue_options)

    # Retry policy consumed by run_with_retries().
    self._max_attempt = 100
    self._retry_sleep_period = 3

    # Guard flag inspected at the top of poll_statuses().
    self._currently_polling = False

    # Two-way lookup between realizations and the LSF job ids they received.
    self._lsfid_to_realstate: Dict[str, "RealizationState"] = {}
    self._realstate_to_lsfid: Dict["RealizationState", str] = {}

    # Handle of the submit subprocess spawned for each realization.
    self._submit_processes: Dict[
        "RealizationState", "asyncio.subprocess.Process"
    ] = {}

    # NOTE(review): neither attribute below is read by the methods visible
    # in this part of the file -- confirm they are still needed.
    self._MAX_ERROR_COUNT = 100
    self._statuses = {}

async def run_with_retries(
    self, func: Callable[[], Awaitable], error_msg: str = ""
):
    """Await ``func()`` until it yields a truthy result or attempts run out.

    ``func`` is called with no arguments at most ``self._max_attempt`` times.
    Between two attempts the coroutine sleeps ``self._retry_sleep_period``
    seconds; no sleep is done after the last attempt.

    Args:
        func: Zero-argument callable returning an awaitable. A truthy
            result means success, a falsy result means "try again".
        error_msg: Message of the RuntimeError raised when every attempt
            returned a falsy result.

    Returns:
        The first truthy value produced by ``func``.

    Raises:
        RuntimeError: If no attempt produced a truthy value.
        asyncio.CancelledError: Propagated untouched, so that cancelling
            the surrounding task actually stops the retry loop instead of
            being logged and retried.
    """
    for attempt in range(1, self._max_attempt + 1):
        function_output = await func()
        if function_output:
            return function_output
        # Only wait when another attempt will follow; sleeping right before
        # raising would just delay the error.
        if attempt < self._max_attempt:
            await asyncio.sleep(self._retry_sleep_period)
    raise RuntimeError(error_msg)

async def submit(self, realization: "RealizationState") -> None:
    """Submit the job of ``realization`` to LSF, retrying on failure.

    The command line is built by ``parse_submit_cmd`` and names the job
    ``poly_<iens>``; the job script and the runpath are passed as strings.
    The actual submission is done by ``_submit`` under
    ``run_with_retries``.

    Args:
        realization: The realization whose job script should be submitted.

    Raises:
        RuntimeError: From ``run_with_retries`` when every submit attempt
            failed.
    """
    submit_cmd = self.parse_submit_cmd(
        "-J",
        f"poly_{realization.realization.run_arg.iens}",
        str(realization.realization.job_script),
        str(realization.realization.run_arg.runpath),
    )
    await self.run_with_retries(
        lambda: self._submit(submit_cmd, realization=realization),
        error_msg="Maximum number of submit errors exceeded\n",
    )

async def _submit(
    self, submit_command: list[str], realization: "RealizationState"
) -> bool:
    """Run one submit attempt and record the LSF job id on success.

    Args:
        submit_command: Complete argument vector of the submit command.
        realization: The realization being submitted.

    Returns:
        True when the command succeeded and a job id could be extracted
        from its output, otherwise False (so the caller may retry).
    """
    result = await self.run_shell_command(submit_command, command_name="bsub")
    if result is None:
        return False

    process, output, _error = result
    self._submit_processes[realization] = process
    submit_output = output.decode()
    # Capture only the numeric id. The word between "to" and "queue"
    # ("default") is optional because bsub leaves it out when a queue was
    # requested explicitly with -q.
    lsf_id_match = re.search(
        r"Job <(\d+)> is submitted to (?:\w+ )?queue <[^>]+>", submit_output
    )
    if lsf_id_match is None:
        logger.error(f"Could not parse lsf id from: {submit_output}")
        return False
    # group(1) is the job id; group(0) would be the whole sentence, which
    # is useless as an argument to bjobs/bkill.
    lsf_id = lsf_id_match.group(1)
    self._realstate_to_lsfid[realization] = lsf_id
    self._lsfid_to_realstate[lsf_id] = realization
    realization.accept()
    logger.info(f"Submitted job {realization} and got LSF JOBID {lsf_id}")
    return True

def parse_submit_cmd(self, *additional_parameters) -> List[str]:
    """Assemble the argument vector for a job submission.

    The executable is the ``BSUB_CMD`` option when set, otherwise ``bsub``.
    When the ``LSF_QUEUE`` option is set, ``-q <queue>`` follows the
    executable. ``additional_parameters`` are appended last, in order.
    """
    if self.has_option("BSUB_CMD"):
        executable = self.get_option("BSUB_CMD")
    else:
        executable = "bsub"

    queue_args = []
    if self.has_option("LSF_QUEUE"):
        queue_args = ["-q", self.get_option("LSF_QUEUE")]

    return [executable, *queue_args, *additional_parameters]

async def run_shell_command(
    self, command_to_run: list[str], command_name=""
) -> Optional[Tuple[asyncio.subprocess.Process, bytes, bytes]]:
    """Run a command as a subprocess and collect its output.

    Args:
        command_to_run: Argument vector; the first element is the
            executable. No shell is involved.
        command_name: Label used in the error log when the command fails.

    Returns:
        ``(process, stdout, stderr)`` with the raw output bytes when the
        command exited with code 0, or None (after logging an error) when
        it exited with a non-zero code.
    """
    process = await asyncio.create_subprocess_exec(
        *command_to_run,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    output, error_output = await process.communicate()
    if process.returncode != 0:
        # errors="replace": undecodable bytes in the output must not turn
        # the error report itself into a UnicodeDecodeError.
        logger.error(
            (
                f"{command_name} returned non-zero exitcode: {process.returncode}\n"
                f"{output.decode(errors='replace')}\n"
                f"{error_output.decode(errors='replace')}"
            )
        )
        return None
    return (process, output, error_output)

async def poll_statuses(self) -> None:
    """Poll LSF once for the state of every submitted job.

    Does nothing when a poll is already in progress or when no job has been
    submitted yet. Otherwise runs ``_poll_statuses`` under
    ``run_with_retries`` with the ``BJOBS_CMD`` option (default ``bjobs``)
    followed by all known LSF job ids.

    Raises:
        RuntimeError: When ``_poll_statuses`` raised a ValueError. A
            RuntimeError from exhausted retries is suppressed instead.
    """
    if self._currently_polling:
        logger.debug("Already polling status elsewhere")
        return

    if not self._realstate_to_lsfid:
        # Nothing has been submitted yet.
        logger.warning("Skipped polling due to no jobs submitted")
        return

    # Take the flag only once we really start polling, and always release
    # it in ``finally``; otherwise a single early return or failure would
    # make every later call bail out with "Already polling".
    self._currently_polling = True
    try:
        poll_cmd = [
            str(self.get_option("BJOBS_CMD"))
            if self.has_option("BJOBS_CMD")
            else "bjobs"
        ] + list(self._realstate_to_lsfid.values())
        await self.run_with_retries(lambda: self._poll_statuses(poll_cmd))
    except RuntimeError:
        # Retries exhausted: give up on this poll without raising.
        return
    except ValueError as e:
        # Surface the ValueError to callers as a RuntimeError, keeping the
        # original exception as the cause.
        raise RuntimeError(e) from e
    finally:
        self._currently_polling = False

async def _poll_statuses(self, poll_cmd: str) -> bool:
self._currently_polling = True
result = await self.run_shell_command(poll_cmd, command_name="bjobs")

if result is None:
return False
(_, output, _) = result

for line in output.decode(encoding="utf-8").split("\n"):
if "JOBID" in line:
continue
Expand All @@ -172,7 +264,8 @@ async def poll_statuses(self) -> None:
continue
if tokens[0] not in self._lsfid_to_realstate:
# A LSF id we know nothing of, this should not happen.
continue
raise ValueError(f"Found unknown job id ({tokens[0]})")

realstate = self._lsfid_to_realstate[tokens[0]]

if tokens[2] == "PEND" and str(realstate.current_state.id) == "WAITING":
Expand All @@ -194,9 +287,31 @@ async def poll_statuses(self) -> None:
realstate.runend()
if tokens[2] == "DONE" and str(realstate.current_state.id) == "RUNNING":
realstate.runend()
if tokens[2] not in LSFDriver.LSF_STATUSES:
raise ValueError(
f"The lsf_status {tokens[2]} for job {tokens[0]} was not recognized\n"
)

self._currently_polling = False
return True

async def kill(self, realization: "RealizationState") -> None:
    """Kill the LSF job that belongs to ``realization``, retrying on failure.

    The command is the ``BKILL_CMD`` option (default ``bkill``) followed by
    the job id recorded for the realization; it is run by ``_kill`` under
    ``run_with_retries``.

    Args:
        realization: The realization whose job should be killed.

    Raises:
        KeyError: If no LSF job id is recorded for ``realization``.
        RuntimeError: From ``run_with_retries`` when every kill attempt
            failed.
    """
    lsf_job_id = self._realstate_to_lsfid[realization]
    logger.debug(f"Attempting to kill {lsf_job_id=}")
    kill_cmd = [
        self.get_option("BKILL_CMD") if self.has_option("BKILL_CMD") else "bkill",
        lsf_job_id,
    ]
    await self.run_with_retries(
        lambda: self._kill(kill_cmd, realization, lsf_job_id),
        error_msg="Maximum number of kill errors exceeded\n",
    )

async def _kill(
    self, kill_cmd, realization: "RealizationState", lsf_job_id: str
) -> bool:
    """Run one kill attempt for a single LSF job.

    Args:
        kill_cmd: Complete argument vector of the kill command.
        realization: The realization whose job is being killed.
        lsf_job_id: LSF job id, used in the log message.

    Returns:
        True when the kill command succeeded, False when it failed (so the
        caller may retry).
    """
    result = await self.run_shell_command(kill_cmd, "bkill")
    if result is None:
        return False
    realization.verify_kill()
    logger.info(f"Successfully killed job {lsf_job_id}")
    # Report success explicitly: falling off the end returns None, which
    # run_with_retries treats as a failed attempt and retries.
    return True
Loading

0 comments on commit a69b87b

Please sign in to comment.