From eebc222395625370b01cf40e8a3006925feabc56 Mon Sep 17 00:00:00 2001 From: Jonathan Karlsen Date: Wed, 22 Nov 2023 13:09:24 +0100 Subject: [PATCH] Add exception handling python lsf driver --- src/ert/job_queue/driver.py | 178 +++++++++++++++++++++++++++++------- 1 file changed, 144 insertions(+), 34 deletions(-) diff --git a/src/ert/job_queue/driver.py b/src/ert/job_queue/driver.py index 858d04a90ef..5e7c6ecaa2a 100644 --- a/src/ert/job_queue/driver.py +++ b/src/ert/job_queue/driver.py @@ -1,10 +1,18 @@ import asyncio import logging import os +import re import shlex -import shutil from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple +from typing import ( + TYPE_CHECKING, + Awaitable, + Callable, + Dict, + List, + Optional, + Tuple, +) from ert.config.parsing.queue_system import QueueSystem @@ -117,64 +125,143 @@ async def kill(self, realization: "RealizationState") -> None: class LSFDriver(Driver): + LSF_STATUSES = [ + "PEND", + "SSUSP", + "PSUSP", + "USUSP", + "RUN", + "EXIT", + "ZOMBI", + "DONE", + "PDONE", + "UNKWN", + ] + def __init__(self, queue_options: Optional[List[Tuple[str, str]]]): super().__init__(queue_options) self._realstate_to_lsfid: Dict["RealizationState", str] = {} self._lsfid_to_realstate: Dict[str, "RealizationState"] = {} + self._max_attempt = 100 + self._statuses = {} self._submit_processes: Dict[ "RealizationState", "asyncio.subprocess.Process" ] = {} - + self._retry_sleep_period = 3 self._currently_polling = False + async def run_with_retries( + self, func: Callable[[None], Awaitable], error_msg: str = "" + ): + current_attempt = 0 + while current_attempt < self._max_attempt: + current_attempt += 1 + try: + function_output = await func() + if function_output: + return function_output + await asyncio.sleep(self._retry_sleep_period) + except asyncio.CancelledError as e: + logger.error(e) + await asyncio.sleep(self._retry_sleep_period) + raise RuntimeError(error_msg) + async def submit(self, realization: "RealizationState") -> None: - submit_cmd: List[str] = [ - "bsub", + submit_cmd = self.parse_submit_cmd( "-J", f"poly_{realization.realization.run_arg.iens}", str(realization.realization.job_script), str(realization.realization.run_arg.runpath), + ) + await self.run_with_retries( + lambda: self._submit(submit_cmd, realization=realization), + error_msg="Maximum number of submit errors exceeded\n", + ) + + async def _submit( + self, submit_command: list[str], realization: "RealizationState" + ) -> bool: + result = await self.run_shell_command(submit_command, command_name="bsub") + if not result: + return False + + (process, output, error) = result + self._submit_processes[realization] = process + lsf_id_match = re.match( + "Job <\\d+> is submitted to \\w+ queue <\\w+>\\.", output.decode() + ) + if lsf_id_match is None: + logger.error(f"Could not parse lsf id from: {output.decode()}") + return False + lsf_id = lsf_id_match.group(0) + self._realstate_to_lsfid[realization] = lsf_id + self._lsfid_to_realstate[lsf_id] = realization + realization.accept() + logger.info(f"Submitted job {realization} and got LSF JOBID {lsf_id}") + return True + + def parse_submit_cmd(self, *additional_parameters) -> List[str]: + submit_cmd = [ + self.get_option("BSUB_CMD") if self.has_option("BSUB_CMD") else "bsub" ] + if self.has_option("LSF_QUEUE"): + submit_cmd += ["-q", self.get_option("LSF_QUEUE")] + + return submit_cmd + list(additional_parameters) + + async def run_shell_command( + self, command_to_run: list[str], command_name="" + ) -> (asyncio.subprocess.Process, str, str): process = await asyncio.create_subprocess_exec( - *submit_cmd, + *command_to_run, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - self._submit_processes[realization] = process - - # Wait for submit process to finish: - output, error = await process.communicate() - print(output) # FLAKY ALERT, we seem to get empty - print(error) - - try: - lsf_id = str(output).split(" ")[1].replace("<", "").replace(">", "") - self._realstate_to_lsfid[realization] = lsf_id - self._lsfid_to_realstate[lsf_id] = realization - realization.accept() - print(f"Submitted job {realization} and got LSF JOBID {lsf_id}") - except Exception: - # We should probably retry the submission, bsub stdout seems flaky. - print(f"ERROR: Could not parse lsf id from: {output!r}") + output, _error = await process.communicate() + if process.returncode != 0: + logger.error( + ( + f"{command_name} returned non-zero exitcode: {process.returncode}\n" + f"{output.decode()}\n" + f"{_error.decode()}" + ) + ) + return None + return (process, output, _error) async def poll_statuses(self) -> None: if self._currently_polling: + logger.debug("Already polling status elsewhere") return - self._currently_polling = True if not self._realstate_to_lsfid: # Nothing has been submitted yet. + logger.warning("Skipped polling due to no jobs submitted") return - poll_cmd = ["bjobs"] + list(self._realstate_to_lsfid.values()) - assert shutil.which(poll_cmd[0]) # does not propagate back.. - process = await asyncio.create_subprocess_exec( - *poll_cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - output, _error = await process.communicate() + poll_cmd = [ + str(self.get_option("BJOBS_CMD")) + if self.has_option("BJOBS_CMD") + else "bjobs" + ] + list(self._realstate_to_lsfid.values()) + try: + await self.run_with_retries(lambda: self._poll_statuses(poll_cmd)) + # suppress runtime error + except RuntimeError: + return + except ValueError as e: + # raise this value error as runtime error + raise RuntimeError from e + + async def _poll_statuses(self, poll_cmd: str) -> bool: + self._currently_polling = True + result = await self.run_shell_command(poll_cmd, command_name="bjobs") + + if result is None: + return False + (_, output, _) = result + for line in output.decode(encoding="utf-8").split("\n"): if "JOBID" in line: continue @@ -185,7 +272,8 @@ async def poll_statuses(self) -> None: continue if tokens[0] not in self._lsfid_to_realstate: # A LSF id we know nothing of, this should not happen. - continue + raise ValueError(f"Found unknown job id ({tokens[0]})") + realstate = self._lsfid_to_realstate[tokens[0]] if tokens[2] == "PEND" and str(realstate.current_state.id) == "WAITING": @@ -207,9 +295,31 @@ async def poll_statuses(self) -> None: realstate.runend() if tokens[2] == "DONE" and str(realstate.current_state.id) == "RUNNING": realstate.runend() + if tokens[2] not in LSFDriver.LSF_STATUSES: + raise ValueError( + f"The lsf_status {tokens[2]} for job {tokens[0]} was not recognized\n" + ) self._currently_polling = False + return True async def kill(self, realization: "RealizationState") -> None: - print(f"would like to kill {realization}") - pass + lsf_job_id = self._realstate_to_lsfid[realization] + logger.debug(f"Attempting to kill {lsf_job_id=}") + kill_cmd = [ + self.get_option("BKILL_CMD") if self.has_option("BKILL_CMD") else "bkill", + lsf_job_id, + ] + await self.run_with_retries( + lambda: self._kill(kill_cmd, realization, lsf_job_id), + error_msg="Maximum number of kill errors exceeded\n", + ) + + async def _kill( + self, kill_cmd, realization: "RealizationState", lsf_job_id: int + ) -> bool: + result = await self.run_shell_command(kill_cmd, "bkill") + if result is None: + return False + realization.verify_kill() + logger.info(f"Successfully killed job {lsf_job_id}")