Skip to content

Commit

Permalink
Fix some typing
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Nov 27, 2023
1 parent 7565ab1 commit 3d12038
Showing 1 changed file with 19 additions and 16 deletions.
35 changes: 19 additions & 16 deletions src/ert/job_queue/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
Expand Down Expand Up @@ -143,17 +144,16 @@ def __init__(self, queue_options: Optional[List[Tuple[str, str]]]):

self._realstate_to_lsfid: Dict["RealizationState", str] = {}
self._lsfid_to_realstate: Dict[str, "RealizationState"] = {}
self._max_attempt = 100
self._statuses = {}
self._max_attempt: int = 100
self._submit_processes: Dict[
"RealizationState", "asyncio.subprocess.Process"
] = {}
self._retry_sleep_period = 3
self._currently_polling = False

async def run_with_retries(
self, func: Callable[[None], Awaitable], error_msg: str = ""
):
self, func: Callable[[Any], Awaitable[Any]], error_msg: str = ""
) -> None:
current_attempt = 0
while current_attempt < self._max_attempt:
current_attempt += 1
Expand All @@ -168,19 +168,21 @@ async def run_with_retries(
raise RuntimeError(error_msg)

async def submit(self, realization: "RealizationState") -> None:
submit_cmd = self.parse_submit_cmd(
"-J",
f"poly_{realization.realization.run_arg.iens}",
str(realization.realization.job_script),
str(realization.realization.run_arg.runpath),
submit_cmd = self.build_submit_cmd(
[
"-J",
f"poly_{realization.realization.run_arg.iens}",
str(realization.realization.job_script),
str(realization.realization.run_arg.runpath),
]
)
await self.run_with_retries(
lambda: self._submit(submit_cmd, realization=realization),
error_msg="Maximum number of submit errors exceeded\n",
)

async def _submit(
self, submit_command: list[str], realization: "RealizationState"
self, submit_command: List[str], realization: "RealizationState"
) -> bool:
result = await self.run_shell_command(submit_command, command_name="bsub")
if not result:
Expand All @@ -201,18 +203,18 @@ async def _submit(
logger.info(f"Submitted job {realization} and got LSF JOBID {lsf_id}")
return True

def parse_submit_cmd(self, *additional_parameters) -> List[str]:
def build_submit_cmd(self, args: List[str]) -> List[str]:
submit_cmd = [
self.get_option("BSUB_CMD") if self.has_option("BSUB_CMD") else "bsub"
]
if self.has_option("LSF_QUEUE"):
submit_cmd += ["-q", self.get_option("LSF_QUEUE")]

return submit_cmd + list(additional_parameters)
return submit_cmd + args

async def run_shell_command(
self, command_to_run: list[str], command_name=""
) -> (asyncio.subprocess.Process, str, str):
self, command_to_run: List[str], command_name: str=""
) -> Optional[Tuple[asyncio.subprocess.Process, bytes, bytes]]:
process = await asyncio.create_subprocess_exec(
*command_to_run,
stdout=asyncio.subprocess.PIPE,
Expand Down Expand Up @@ -254,7 +256,7 @@ async def poll_statuses(self) -> None:
# raise this value error as runtime error
raise RuntimeError from e

async def _poll_statuses(self, poll_cmd: str) -> bool:
async def _poll_statuses(self, poll_cmd: List[str]) -> bool:
self._currently_polling = True
result = await self.run_shell_command(poll_cmd, command_name="bjobs")

Expand Down Expand Up @@ -316,10 +318,11 @@ async def kill(self, realization: "RealizationState") -> None:
)

async def _kill(
self, kill_cmd, realization: "RealizationState", lsf_job_id: int
self, kill_cmd: List[str], realization: "RealizationState", lsf_job_id: str
) -> bool:
result = await self.run_shell_command(kill_cmd, "bkill")
if result is None:
return False
realization.verify_kill()
logger.info(f"Successfully killed job {lsf_job_id}")
return True

0 comments on commit 3d12038

Please sign in to comment.