diff --git a/pilot/control/payloads/eventservice.py b/pilot/control/payloads/eventservice.py index fd0892db..e1c61cce 100644 --- a/pilot/control/payloads/eventservice.py +++ b/pilot/control/payloads/eventservice.py @@ -41,11 +41,11 @@ def __init__(self, args: Any, job: Any, out: TextIO, err: TextIO, traces: Any): """ Set initial values. - :param args: args object - :param job: job object - :param out: stdout file object - :param err: stderr file object - :param traces: traces object. + :param args: args object (Any) + :param job: job object (Any) + :param out: stdout file object (TextIO) + :param err: stderr file object (TextIO) + :param traces: traces object (Any). """ super().__init__(args, job, out, err, traces) diff --git a/pilot/control/payloads/eventservicemerge.py b/pilot/control/payloads/eventservicemerge.py index 1826d943..05fb44b0 100644 --- a/pilot/control/payloads/eventservicemerge.py +++ b/pilot/control/payloads/eventservicemerge.py @@ -39,11 +39,11 @@ def __init__(self, args: Any, job: Any, out: TextIO, err: TextIO, traces: Any): """ Set initial values. - :param args: args object - :param job: job object - :param out: stdout file object - :param err: stderr file object - :param traces: traces object. + :param args: args object (Any) + :param job: job object (Any) + :param out: stdout file object (TextIO) + :param err: stderr file object (TextIO) + :param traces: traces object (Any). """ super().__init__(args, job, out, err, traces) diff --git a/pilot/control/payloads/generic.py b/pilot/control/payloads/generic.py index c6ace32d..8d6cd068 100644 --- a/pilot/control/payloads/generic.py +++ b/pilot/control/payloads/generic.py @@ -68,6 +68,15 @@ class Executor(): """Executor class for generic payloads.""" def __init__(self, args: Any, job: Any, out: TextIO, err: TextIO, traces: Any): + """ + Set initial values. + + :param args: args object (Any) + :param job: job object (Any) + :param out: stdout file object (TextIO) + :param err: stderr file object (TextIO) + :param traces: traces object (Any). + """ self.__args = args self.__job = job self.__out = out # payload stdout file object @@ -88,11 +97,11 @@ def get_job(self): """ return self.__job - def pre_setup(self, job): + def pre_setup(self, job: Any): """ Run pre setup functions. - :param job: job object. + :param job: job object (Any). """ # write time stamps to pilot timing file update_time = time.time() @@ -224,9 +233,8 @@ def utility_after_payload_started(self, job: Any): """ Run utility functions after payload started. - :param job: job object + :param job: job object (Any). """ - # get the payload command from the user specific code pilot_user = os.environ.get('PILOT_USER', 'generic').lower() user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0) @@ -351,7 +359,6 @@ def execute_utility_command(self, cmd: str, job: Any, label: str) -> int: :param label: command label (str) :return: exit code (int). """ - exit_code, stdout, stderr = execute(cmd, workdir=job.workdir, cwd=job.workdir, usecontainer=False) if exit_code: ignored_exit_codes = [160, 161, 162] @@ -387,7 +394,6 @@ def write_utility_output(self, workdir: str, step: str, stdout: str, stderr: str :param stdout: command stdout (str) :param stderr: command stderr (str) """ - # dump to file try: name_stdout = step + '_stdout.txt' @@ -446,9 +452,9 @@ def run_command(self, cmd: str, label: str = "") -> Any: Execute the given command and return the process info. :param cmd: command (str) + :param label: label (str) :return: subprocess object (Any). """ - if label: logger.info(f'\n\n{label}:\n\n{cmd}\n') if label == 'coprocess': @@ -478,11 +484,12 @@ def run_command(self, cmd: str, label: str = "") -> Any: def run_payload(self, job: Any, cmd: str, out: Any, err: Any) -> Any: """ - Setup and execute the main payload process. + Set up and execute the main payload process. REFACTOR using run_command() - :param job: job object + :param job: job object (Any) + :param cmd: command (str) :param out: (currently not used; deprecated) :param err: (currently not used; deprecated) :return: proc (subprocess returned by Popen()). @@ -535,7 +542,7 @@ def cut_str_from(_cmd: str, _str: str) -> str: def cut_str_from_last_semicolon(_cmd: str) -> str: """ - Cut the string from the last semicolon + Cut the string from the last semicolon. NOTE: this will not work if jobParams also contain ; @@ -572,7 +579,6 @@ def wait_graceful(self, args: Any, proc: Any) -> int: :param proc: subprocess object (Any) :return: exit code (int). """ - breaker = False exit_code = None iteration = 0 @@ -636,10 +642,10 @@ def run_preprocess(self, job: Any): """ Run any preprocess payloads. - :param job: job object. - :return: + :param job: job object (Any) + :return: exit code (int) + :raises: Exception. """ - exit_code = 0 try: @@ -651,7 +657,7 @@ def run_preprocess(self, job: Any): if cmd_before_payload: cmd_before_payload = job.setup + cmd_before_payload - logger.info("\n\npreprocess execution command:\n\n%s\n", cmd_before_payload) + logger.info(f"\n\npreprocess execution command:\n\n{cmd_before_payload}\n") exit_code = self.execute_utility_command(cmd_before_payload, job, 'preprocess') if exit_code == 160: logger.warning('no more HP points - time to abort processing loop') @@ -662,35 +668,34 @@ def run_preprocess(self, job: Any): elif exit_code: # set error code job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.PREPROCESSFAILURE) - logger.fatal('cannot continue since preprocess failed: exit_code=%d', exit_code) + logger.fatal(f'cannot continue since preprocess failed: exit_code={exit_code}') else: # in case the preprocess produced a command, chmod it path = os.path.join(job.workdir, job.containeroptions.get('containerExec', 'does_not_exist')) if os.path.exists(path): - logger.debug('chmod 0o755: %s', path) os.chmod(path, 0o755) return exit_code def should_verify_setup(self): """ - Should the setup command be verified? + Determine if the setup command should be verified. - :return: Boolean. + :return: should verify setup (bool). """ - pilot_user = os.environ.get('PILOT_USER', 'generic').lower() - user = __import__('pilot.user.%s.setup' % pilot_user, globals(), locals(), [pilot_user], 0) + user = __import__(f'pilot.user.{pilot_user}.setup', globals(), locals(), [pilot_user], 0) return user.should_verify_setup(self.__job) - def run(self): # noqa: C901 + def run(self) -> int: # noqa: C901 """ Run all payload processes (including pre- and post-processes, and utilities). + In the case of HPO jobs, this function will loop over all processes until the preprocess returns a special exit code. - :return: - """ + :return: exit code (int). + """ diagnostics = '' # get the payload command from the user specific code @@ -749,8 +754,8 @@ def run(self): # noqa: C901 iteration = 0 while True: - logger.info('payload iteration loop #%d', iteration + 1) - os.environ['PILOT_EXEC_ITERATION_COUNT'] = '%s' % iteration + logger.info(f'payload iteration loop #{iteration + 1}') + os.environ['PILOT_EXEC_ITERATION_COUNT'] = f'{iteration}' #if self.__args.debug: # show_memory_usage() @@ -759,7 +764,7 @@ def run(self): # noqa: C901 exit_code = self.run_preprocess(self.__job) jobparams_post = self.__job.jobparams if exit_code: - if exit_code >= 160 and exit_code <= 162: + if 160 <= exit_code <= 162: exit_code = 0 # wipe the output file list since there won't be any new files # any output files from previous iterations, should have been transferred already @@ -787,7 +792,7 @@ def run(self): # noqa: C901 if proc is None: # run the post-process command even if there was no main payload if os.environ.get('HARVESTER_HOROVOD', '') != '': - logger.info('No need to execute any main payload') + logger.info('no need to execute any main payload') exit_code = self.run_utility_after_payload_finished(exit_code, True, UTILITY_AFTER_PAYLOAD_FINISHED2) self.post_payload(self.__job) else: @@ -806,7 +811,7 @@ def run(self): # noqa: C901 # allow for a secondary command to be started after the payload (e.g. a coprocess) utility_cmd = self.get_utility_command(order=UTILITY_AFTER_PAYLOAD_STARTED2) if utility_cmd: - logger.debug('starting utility command: %s', utility_cmd) + logger.debug(f'starting utility command: {utility_cmd}') label = 'coprocess' if 'coprocess' in utility_cmd else None proc_co = self.run_command(utility_cmd, label=label) @@ -823,18 +828,18 @@ def run(self): # noqa: C901 else: state = 'finished' if exit_code == 0 else 'failed' set_pilot_state(job=self.__job, state=state) - logger.info('\n\nfinished pid=%s exit_code=%s state=%s\n', proc.pid, exit_code, self.__job.state) + logger.info(f'\n\nfinished pid={proc.pid} exit_code={exit_code} state={self.__job.state}\n') # stop the utility command (e.g. a coprocess if necessary if proc_co: - logger.debug('stopping utility command: %s', utility_cmd) + logger.debug(f'stopping utility command: {utility_cmd}') kill_processes(proc_co.pid) if exit_code is None: logger.warning('detected unset exit_code from wait_graceful - reset to -1') exit_code = -1 - for order in [UTILITY_AFTER_PAYLOAD_FINISHED, UTILITY_AFTER_PAYLOAD_FINISHED2]: + for order in (UTILITY_AFTER_PAYLOAD_FINISHED, UTILITY_AFTER_PAYLOAD_FINISHED2): exit_code = self.run_utility_after_payload_finished(exit_code, state, order) # keep track of post-payload timing @@ -857,19 +862,18 @@ def run(self): # noqa: C901 return exit_code, diagnostics - def run_utility_after_payload_finished(self, exit_code, state, order): + def run_utility_after_payload_finished(self, exit_code: int, state: str, order: str) -> int: """ Run utility command after the main payload has finished. - In horovod mode, select the corresponding post-process. Otherwise, select different post-process (e.g. Xcache). + In horovod mode, select the corresponding post-process. Otherwise, select different post-process (e.g. Xcache). The order constant can be UTILITY_AFTER_PAYLOAD_FINISHED, UTILITY_AFTER_PAYLOAD_FINISHED2 - :param exit_code: transform exit code (int). - :param state: payload state; finished/failed (string). - :param order: constant used for utility selection (constant). + :param exit_code: transform exit code (int) + :param state: payload state; finished/failed (str) + :param order: constant used for utility selection (str) :return: exit code (int). """ - _exit_code = 0 try: cmd_after_payload, label, ignore_failure = self.utility_after_payload_finished(self.__job, order) @@ -879,10 +883,10 @@ def run_utility_after_payload_finished(self, exit_code, state, order): else: if cmd_after_payload and self.__job.postprocess and state != 'failed': cmd_after_payload = self.__job.setup + cmd_after_payload - logger.info("\n\npostprocess execution command:\n\n%s\n", cmd_after_payload) + logger.info(f"\n\npostprocess execution command:\n\n{cmd_after_payload}\n") _exit_code = self.execute_utility_command(cmd_after_payload, self.__job, label) elif cmd_after_payload: - logger.info("\n\npostprocess execution command:\n\n%s\n", cmd_after_payload) + logger.info(f"\n\npostprocess execution command:\n\n{cmd_after_payload}\n") _exit_code = self.execute_utility_command(cmd_after_payload, self.__job, label) # only set a new non-zero exit code if exit_code was not already set and ignore_failure is False @@ -893,14 +897,9 @@ def run_utility_after_payload_finished(self, exit_code, state, order): return exit_code def stop_utilities(self): - """ - Stop any running utilities. - - :return: - """ - + """Stop any running utilities.""" pilot_user = os.environ.get('PILOT_USER', 'generic').lower() - user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) + user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0) for utcmd in list(self.__job.utilities.keys()): utproc = self.__job.utilities[utcmd][0] @@ -917,18 +916,17 @@ def stop_utilities(self): logger.info(f'utility process {utproc.pid} cleanup finished with status={status}') user.post_utility_command_action(utcmd, self.__job) - def kill_and_wait_for_process(self, pid, user, utcmd): + def kill_and_wait_for_process(self, pid: int, user: str, utcmd: str) -> int: """ Kill utility process and wait for it to finish. :param pid: process id (int) :param user: pilot user/experiment (str) :param utcmd: utility command (str) - :return: process exit status (int, None). + :return: process exit status (int or None). """ - sig = user.get_utility_command_kill_signal(utcmd) - logger.info("stopping utility process \'%s\' with signal %d", utcmd, sig) + logger.info(f"stopping utility process \'{utcmd}\' with signal {sig}") try: # Send SIGUSR1 signal to the process @@ -1000,17 +998,16 @@ def kill_and_wait_for_process(self, pid, user, utcmd): # logger.warning(f"exception caught: {exc}") # return None - def rename_log_files(self, iteration): + def rename_log_files(self, iteration: int): """ + Rename log files. - :param iteration: - :return: + :param iteration: iteration (int). """ - names = [self.__preprocess_stdout_name, self.__preprocess_stderr_name, self.__postprocess_stdout_name, self.__postprocess_stderr_name] for name in names: if os.path.exists(name): - os.rename(name, name + '%d' % iteration) + os.rename(name, name + f'{iteration}') else: - logger.warning('cannot rename %s since it does not exist', name) + logger.warning(f'cannot rename {name} since it does not exist')