Skip to content

Commit

Permalink
pydocstyle and pylint updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Nilsson committed Nov 3, 2023
1 parent 26f2553 commit 8f305a7
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 67 deletions.
10 changes: 5 additions & 5 deletions pilot/control/payloads/eventservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ def __init__(self, args: Any, job: Any, out: TextIO, err: TextIO, traces: Any):
"""
Set initial values.
:param args: args object
:param job: job object
:param out: stdout file object
:param err: stderr file object
:param traces: traces object.
:param args: args object (Any)
:param job: job object (Any)
:param out: stdout file object (TextIO)
:param err: stderr file object (TextIO)
:param traces: traces object (Any).
"""
super().__init__(args, job, out, err, traces)

Expand Down
10 changes: 5 additions & 5 deletions pilot/control/payloads/eventservicemerge.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ def __init__(self, args: Any, job: Any, out: TextIO, err: TextIO, traces: Any):
"""
Set initial values.
:param args: args object
:param job: job object
:param out: stdout file object
:param err: stderr file object
:param traces: traces object.
:param args: args object (Any)
:param job: job object (Any)
:param out: stdout file object (TextIO)
:param err: stderr file object (TextIO)
:param traces: traces object (Any).
"""
super().__init__(args, job, out, err, traces)

Expand Down
111 changes: 54 additions & 57 deletions pilot/control/payloads/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ class Executor():
"""Executor class for generic payloads."""

def __init__(self, args: Any, job: Any, out: TextIO, err: TextIO, traces: Any):
"""
Set initial values.
:param args: args object (Any)
:param job: job object (Any)
:param out: stdout file object (TextIO)
:param err: stderr file object (TextIO)
:param traces: traces object (Any).
"""
self.__args = args
self.__job = job
self.__out = out # payload stdout file object
Expand All @@ -88,11 +97,11 @@ def get_job(self):
"""
return self.__job

def pre_setup(self, job):
def pre_setup(self, job: Any):
"""
Run pre setup functions.
:param job: job object.
:param job: job object (Any).
"""
# write time stamps to pilot timing file
update_time = time.time()
Expand Down Expand Up @@ -224,9 +233,8 @@ def utility_after_payload_started(self, job: Any):
"""
Run utility functions after payload started.
:param job: job object
:param job: job object (Any).
"""

# get the payload command from the user specific code
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0)
Expand Down Expand Up @@ -351,7 +359,6 @@ def execute_utility_command(self, cmd: str, job: Any, label: str) -> int:
:param label: command label (str)
:return: exit code (int).
"""

exit_code, stdout, stderr = execute(cmd, workdir=job.workdir, cwd=job.workdir, usecontainer=False)
if exit_code:
ignored_exit_codes = [160, 161, 162]
Expand Down Expand Up @@ -387,7 +394,6 @@ def write_utility_output(self, workdir: str, step: str, stdout: str, stderr: str
:param stdout: command stdout (str)
:param stderr: command stderr (str)
"""

# dump to file
try:
name_stdout = step + '_stdout.txt'
Expand Down Expand Up @@ -446,9 +452,9 @@ def run_command(self, cmd: str, label: str = "") -> Any:
Execute the given command and return the process info.
:param cmd: command (str)
:param label: label (str)
:return: subprocess object (Any).
"""

if label:
logger.info(f'\n\n{label}:\n\n{cmd}\n')
if label == 'coprocess':
Expand Down Expand Up @@ -478,11 +484,12 @@ def run_command(self, cmd: str, label: str = "") -> Any:

def run_payload(self, job: Any, cmd: str, out: Any, err: Any) -> Any:
"""
Setup and execute the main payload process.
Set up and execute the main payload process.
REFACTOR using run_command()
:param job: job object
:param job: job object (Any)
:param cmd: command (str)
:param out: (currently not used; deprecated)
:param err: (currently not used; deprecated)
:return: proc (subprocess returned by Popen()).
Expand Down Expand Up @@ -535,7 +542,7 @@ def cut_str_from(_cmd: str, _str: str) -> str:

def cut_str_from_last_semicolon(_cmd: str) -> str:
"""
Cut the string from the last semicolon
Cut the string from the last semicolon.
NOTE: this will not work if jobParams also contain ;
Expand Down Expand Up @@ -572,7 +579,6 @@ def wait_graceful(self, args: Any, proc: Any) -> int:
:param proc: subprocess object (Any)
:return: exit code (int).
"""

breaker = False
exit_code = None
iteration = 0
Expand Down Expand Up @@ -636,10 +642,10 @@ def run_preprocess(self, job: Any):
"""
Run any preprocess payloads.
:param job: job object.
:return:
:param job: job object (Any)
:return: exit code (int)
:raises: Exception.
"""

exit_code = 0

try:
Expand All @@ -651,7 +657,7 @@ def run_preprocess(self, job: Any):

if cmd_before_payload:
cmd_before_payload = job.setup + cmd_before_payload
logger.info("\n\npreprocess execution command:\n\n%s\n", cmd_before_payload)
logger.info(f"\n\npreprocess execution command:\n\n{cmd_before_payload}\n")
exit_code = self.execute_utility_command(cmd_before_payload, job, 'preprocess')
if exit_code == 160:
logger.warning('no more HP points - time to abort processing loop')
Expand All @@ -662,35 +668,34 @@ def run_preprocess(self, job: Any):
elif exit_code:
# set error code
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PREPROCESSFAILURE)
logger.fatal('cannot continue since preprocess failed: exit_code=%d', exit_code)
logger.fatal(f'cannot continue since preprocess failed: exit_code={exit_code}')
else:
# in case the preprocess produced a command, chmod it
path = os.path.join(job.workdir, job.containeroptions.get('containerExec', 'does_not_exist'))
if os.path.exists(path):
logger.debug('chmod 0o755: %s', path)
os.chmod(path, 0o755)

return exit_code

def should_verify_setup(self):
"""
Should the setup command be verified?
Determine if the setup command should be verified.
:return: Boolean.
:return: should verify setup (bool).
"""

pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__('pilot.user.%s.setup' % pilot_user, globals(), locals(), [pilot_user], 0)
user = __import__(f'pilot.user.{pilot_user}.setup', globals(), locals(), [pilot_user], 0)
return user.should_verify_setup(self.__job)

def run(self): # noqa: C901
def run(self) -> int: # noqa: C901
"""
Run all payload processes (including pre- and post-processes, and utilities).
In the case of HPO jobs, this function will loop over all processes until the preprocess returns a special
exit code.
:return:
"""
:return: exit code (int).
"""
diagnostics = ''

# get the payload command from the user specific code
Expand Down Expand Up @@ -749,8 +754,8 @@ def run(self): # noqa: C901
iteration = 0
while True:

logger.info('payload iteration loop #%d', iteration + 1)
os.environ['PILOT_EXEC_ITERATION_COUNT'] = '%s' % iteration
logger.info(f'payload iteration loop #{iteration + 1}')
os.environ['PILOT_EXEC_ITERATION_COUNT'] = f'{iteration}'
#if self.__args.debug:
# show_memory_usage()

Expand All @@ -759,7 +764,7 @@ def run(self): # noqa: C901
exit_code = self.run_preprocess(self.__job)
jobparams_post = self.__job.jobparams
if exit_code:
if exit_code >= 160 and exit_code <= 162:
if 160 <= exit_code <= 162:
exit_code = 0
# wipe the output file list since there won't be any new files
# any output files from previous iterations, should have been transferred already
Expand Down Expand Up @@ -787,7 +792,7 @@ def run(self): # noqa: C901
if proc is None:
# run the post-process command even if there was no main payload
if os.environ.get('HARVESTER_HOROVOD', '') != '':
logger.info('No need to execute any main payload')
logger.info('no need to execute any main payload')
exit_code = self.run_utility_after_payload_finished(exit_code, True, UTILITY_AFTER_PAYLOAD_FINISHED2)
self.post_payload(self.__job)
else:
Expand All @@ -806,7 +811,7 @@ def run(self): # noqa: C901
# allow for a secondary command to be started after the payload (e.g. a coprocess)
utility_cmd = self.get_utility_command(order=UTILITY_AFTER_PAYLOAD_STARTED2)
if utility_cmd:
logger.debug('starting utility command: %s', utility_cmd)
logger.debug(f'starting utility command: {utility_cmd}')
label = 'coprocess' if 'coprocess' in utility_cmd else None
proc_co = self.run_command(utility_cmd, label=label)

Expand All @@ -823,18 +828,18 @@ def run(self): # noqa: C901
else:
state = 'finished' if exit_code == 0 else 'failed'
set_pilot_state(job=self.__job, state=state)
logger.info('\n\nfinished pid=%s exit_code=%s state=%s\n', proc.pid, exit_code, self.__job.state)
logger.info(f'\n\nfinished pid={proc.pid} exit_code={exit_code} state={self.__job.state}\n')

# stop the utility command (e.g. a coprocess if necessary
if proc_co:
logger.debug('stopping utility command: %s', utility_cmd)
logger.debug(f'stopping utility command: {utility_cmd}')
kill_processes(proc_co.pid)

if exit_code is None:
logger.warning('detected unset exit_code from wait_graceful - reset to -1')
exit_code = -1

for order in [UTILITY_AFTER_PAYLOAD_FINISHED, UTILITY_AFTER_PAYLOAD_FINISHED2]:
for order in (UTILITY_AFTER_PAYLOAD_FINISHED, UTILITY_AFTER_PAYLOAD_FINISHED2):
exit_code = self.run_utility_after_payload_finished(exit_code, state, order)

# keep track of post-payload timing
Expand All @@ -857,19 +862,18 @@ def run(self): # noqa: C901

return exit_code, diagnostics

def run_utility_after_payload_finished(self, exit_code, state, order):
def run_utility_after_payload_finished(self, exit_code: int, state: str, order: str) -> int:
"""
Run utility command after the main payload has finished.
In horovod mode, select the corresponding post-process. Otherwise, select different post-process (e.g. Xcache).
In horovod mode, select the corresponding post-process. Otherwise, select different post-process (e.g. Xcache).
The order constant can be UTILITY_AFTER_PAYLOAD_FINISHED, UTILITY_AFTER_PAYLOAD_FINISHED2
:param exit_code: transform exit code (int).
:param state: payload state; finished/failed (string).
:param order: constant used for utility selection (constant).
:param exit_code: transform exit code (int)
:param state: payload state; finished/failed (str)
:param order: constant used for utility selection (str)
:return: exit code (int).
"""

_exit_code = 0
try:
cmd_after_payload, label, ignore_failure = self.utility_after_payload_finished(self.__job, order)
Expand All @@ -879,10 +883,10 @@ def run_utility_after_payload_finished(self, exit_code, state, order):
else:
if cmd_after_payload and self.__job.postprocess and state != 'failed':
cmd_after_payload = self.__job.setup + cmd_after_payload
logger.info("\n\npostprocess execution command:\n\n%s\n", cmd_after_payload)
logger.info(f"\n\npostprocess execution command:\n\n{cmd_after_payload}\n")
_exit_code = self.execute_utility_command(cmd_after_payload, self.__job, label)
elif cmd_after_payload:
logger.info("\n\npostprocess execution command:\n\n%s\n", cmd_after_payload)
logger.info(f"\n\npostprocess execution command:\n\n{cmd_after_payload}\n")
_exit_code = self.execute_utility_command(cmd_after_payload, self.__job, label)

# only set a new non-zero exit code if exit_code was not already set and ignore_failure is False
Expand All @@ -893,14 +897,9 @@ def run_utility_after_payload_finished(self, exit_code, state, order):
return exit_code

def stop_utilities(self):
"""
Stop any running utilities.
:return:
"""

"""Stop any running utilities."""
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0)
user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0)

for utcmd in list(self.__job.utilities.keys()):
utproc = self.__job.utilities[utcmd][0]
Expand All @@ -917,18 +916,17 @@ def stop_utilities(self):
logger.info(f'utility process {utproc.pid} cleanup finished with status={status}')
user.post_utility_command_action(utcmd, self.__job)

def kill_and_wait_for_process(self, pid, user, utcmd):
def kill_and_wait_for_process(self, pid: int, user: str, utcmd: str) -> int:
"""
Kill utility process and wait for it to finish.
:param pid: process id (int)
:param user: pilot user/experiment (str)
:param utcmd: utility command (str)
:return: process exit status (int, None).
:return: process exit status (int or None).
"""

sig = user.get_utility_command_kill_signal(utcmd)
logger.info("stopping utility process \'%s\' with signal %d", utcmd, sig)
logger.info(f"stopping utility process \'{utcmd}\' with signal {sig}")

try:
# Send SIGUSR1 signal to the process
Expand Down Expand Up @@ -1000,17 +998,16 @@ def kill_and_wait_for_process(self, pid, user, utcmd):
# logger.warning(f"exception caught: {exc}")
# return None

def rename_log_files(self, iteration):
def rename_log_files(self, iteration: int):
"""
Rename log files.
:param iteration:
:return:
:param iteration: iteration (int).
"""

names = [self.__preprocess_stdout_name, self.__preprocess_stderr_name,
self.__postprocess_stdout_name, self.__postprocess_stderr_name]
for name in names:
if os.path.exists(name):
os.rename(name, name + '%d' % iteration)
os.rename(name, name + f'{iteration}')
else:
logger.warning('cannot rename %s since it does not exist', name)
logger.warning(f'cannot rename {name} since it does not exist')

0 comments on commit 8f305a7

Please sign in to comment.