diff --git a/pilot/control/job.py b/pilot/control/job.py index 0ccddbb2..1c492aa8 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -924,7 +924,7 @@ def add_timing_and_extracts(data: dict, job: Any, state: str, args: Any): def add_memory_info(data: dict, workdir: str, name: str = ""): """ - Add memory information (if available) to the data structure that will be sent to the server with job updates + Add memory information (if available) to the data structure that will be sent to the server with job updates. Note: this function updates the data dictionary. @@ -2478,7 +2478,6 @@ def update_server(job: Any, args: Any) -> None: :param job: job object (Any) :param args: Pilot arguments object (Any). """ - if job.completed: logger.warning('job has already completed - cannot send another final update') return @@ -2602,11 +2601,16 @@ def fast_monitor_tasks(job: Any) -> int: return exit_code -def message_listener(queues, traces, args): +def message_listener(queues: Any, traces: Any, args: Any): """ + Listen for messages from ActiveMQ. - """ + Thread. + :param queues: internal queues for job handling (Any) + :param traces: tuple containing internal pilot states (Any) + :param args: Pilot arguments object (e.g. containing queue name, queuedata dictionary, etc) (Any) + """ while not args.graceful_stop.is_set() and args.subscribe_to_msgsvc: # listen for a message and add it to the messages queue @@ -2649,18 +2653,18 @@ def message_listener(queues, traces, args): logger.info('[job] message listener thread has finished') -def fast_job_monitor(queues, traces, args): +def fast_job_monitor(queues: Any, traces: Any, args: Any) -> None: """ Fast monitoring of job parameters. + Thread. + This function can be used for monitoring processes below the one minute threshold of the normal job_monitor thread. - :param queues: internal queues for job handling. - :param traces: tuple containing internal pilot states. - :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc). - :return: + :param queues: internal queues for job handling (Any) + :param traces: tuple containing internal pilot states (Any) + :param args: Pilot arguments object (e.g. containing queue name, queuedata dictionary, etc) (Any) """ - # peeking and current time; peeking_time gets updated if and when jobs are being monitored, update_time is only # used for sending the heartbeat and is updated after a server update #peeking_time = int(time.time()) @@ -2715,20 +2719,21 @@ def fast_job_monitor(queues, traces, args): logger.info('[job] fast job monitor thread has finished') -def job_monitor(queues, traces, args): # noqa: C901 +def job_monitor(queues: Any, traces: Any, args: Any): # noqa: C901 """ - Monitoring of job parameters. + Monitor job parameters. + + Thread. + This function monitors certain job parameters, such as job looping, at various time intervals. The main loop is executed once a minute, while individual verifications may be executed at any time interval (>= 1 minute). E.g. looping jobs are checked once every ten minutes (default) and the heartbeat is sent once every 30 minutes. Memory usage is checked once a minute. - :param queues: internal queues for job handling. - :param traces: tuple containing internal pilot states. - :param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc). - :return: + :param queues: internal queues for job handling (Any) + :param traces: tuple containing internal pilot states (Any) + :param args: Pilot arguments object (e.g. containing queue name, queuedata dictionary, etc) (Any) """ - # initialize the monitoring time object mt = MonitoringTime() @@ -2901,10 +2906,6 @@ def job_monitor(queues, traces, args): # noqa: C901 elif os.environ.get('PILOT_JOB_STATE') == 'stagein': logger.info('job monitoring is waiting for stage-in to finish') - #else: - # # check the waiting time in the job monitor. set global graceful_stop if necessary - # if args.workflow != 'stager': - # check_job_monitor_waiting_time(args, peeking_time, abort_override=abort_job) n += 1 @@ -2926,15 +2927,14 @@ def job_monitor(queues, traces, args): # noqa: C901 logger.info('[job] job monitor thread has finished') -def preliminary_server_update(job, args, diagnostics): +def preliminary_server_update(job: Any, args: Any, diagnostics: str): """ Send a quick job update to the server (do not send any error code yet) for a failed job. - :param job: job object - :param args: args object - :param diagnostics: error diagnostics (string). + :param job: job object (Any) + :param args: Pilot arguments object (Any) + :param diagnostics: error diagnostics (str). """ - logger.warning(f'will send preliminary diagnostics (and pretend job is still running)={diagnostics}') piloterrorcode = job.piloterrorcode piloterrorcodes = job.piloterrorcodes @@ -2948,38 +2948,41 @@ def preliminary_server_update(job, args, diagnostics): job.piloterrordiags = piloterrordiags -def get_signal_error(sig): +def get_signal_error(sig: Any) -> int: """ Return a corresponding pilot error code for the given signal. - :param sig: signal. + :param sig: signal (Any) :return: pilot error code (int). """ + try: + _sig = str(sig) # e.g. 'SIGTERM' + except ValueError: + ret = errors.KILLSIGNAL + else: + codes = {'SIGBUS': errors.SIGBUS, + 'SIGQUIT': errors.SIGQUIT, + 'SIGSEGV': errors.SIGSEGV, + 'SIGTERM': errors.SIGTERM, + 'SIGXCPU': errors.SIGXCPU, + 'SIGUSR1': errors.SIGUSR1, + 'USERKILL': errors.USERKILL} + ret = codes.get(_sig) if _sig in codes else errors.KILLSIGNAL - _sig = str(sig) # e.g. 'SIGTERM' - codes = {'SIGBUS': errors.SIGBUS, - 'SIGQUIT': errors.SIGQUIT, - 'SIGSEGV': errors.SIGSEGV, - 'SIGTERM': errors.SIGTERM, - 'SIGXCPU': errors.SIGXCPU, - 'SIGUSR1': errors.SIGUSR1, - 'USERKILL': errors.USERKILL} - ret = codes.get(_sig) if _sig in codes else errors.KILLSIGNAL return ret -def download_new_proxy(role='production', proxy_type='', workdir=''): +def download_new_proxy(role: str = 'production', proxy_type: str = '', workdir: str = '') -> int: """ - The production proxy has expired, try to download a new one. + Download a new production proxy, since it has expired. If it fails to download and verify a new proxy, return the NOVOMSPROXY error. - :param role: role, 'production' or 'user' (string). - :param proxy_type: proxy type, e.g. unified (string). - :param workdir: payload work directory (string). + :param role: role, 'production' or 'user' (str) + :param proxy_type: proxy type, e.g. unified (str) + :param workdir: payload work directory (str) :return: exit code (int). """ - exit_code = 0 x509 = os.environ.get('X509_USER_PROXY', '') logger.info(f'attempt to download a new proxy (proxy_type={proxy_type})') @@ -3006,72 +3009,41 @@ def download_new_proxy(role='production', proxy_type='', workdir=''): return exit_code -def send_heartbeat_if_time(job, args, update_time): +def send_heartbeat_if_time(job: Any, args: Any, update_time: float) -> int: """ Send a heartbeat to the server if it is time to do so. - :param job: job object. - :param args: args object. - :param update_time: last update time (from time.time()). - :return: possibly updated update_time (from time.time()). + :param job: job object (Any) + :param args: Pilot arguments object (Any) + :param update_time: last update time (from time.time()) (float) + :return: possibly updated update_time, converted to int (from time.time()) (int). """ - if job.completed: logger.info('job already completed - will not send any further updates') - return update_time - - if int(time.time()) - update_time >= get_heartbeat_period(job.debug and job.debug_command): + elif int(time.time()) - update_time >= get_heartbeat_period(job.debug and job.debug_command): # check for state==running here, and send explicit 'running' in send_state, rather than sending job.state # since the job state can actually change in the meantime by another thread # job.completed will anyway be checked in https::send_update() if job.serverstate != 'finished' and job.serverstate != 'failed' and job.state == 'running': logger.info('will send heartbeat for job in \'running\' state') send_state(job, args, 'running') - update_time = int(time.time()) - - return update_time - - -def check_job_monitor_waiting_time(args, peeking_time, abort_override=False): - """ - Check the waiting time in the job monitor. - Set global graceful_stop if necessary. - - :param args: args object. - :param peeking_time: time when monitored_payloads queue was peeked into (int). - :return: - """ - - waiting_time = int(time.time()) - peeking_time - msg = f'no jobs in monitored_payloads queue (waited for {waiting_time} s)' - if waiting_time > 60 * 60: - msg += ' - aborting' - # abort = True - #else: - # abort = False - if logger: - logger.warning(msg) + update_time = time.time() else: - print(msg) - #if abort or abort_override: - # # do not set graceful stop if pilot has not finished sending the final job update - # # i.e. wait until SERVER_UPDATE is DONE_FINAL - # check_for_final_server_update(args.update_server) - # args.graceful_stop.set() + logger.info('will not send any job update') + + return int(update_time) -def fail_monitored_job(job, exit_code, diagnostics, queues, traces): +def fail_monitored_job(job: Any, exit_code: int, diagnostics: str, queues: Any, traces: Any): """ Fail a monitored job. - :param job: job object - :param exit_code: exit code from job_monitor_tasks (int). - :param diagnostics: pilot error diagnostics (string). - :param queues: queues object. - :param traces: traces object. - :return: + :param job: job object (Any) + :param exit_code: exit code from job_monitor_tasks (int) + :param diagnostics: pilot error diagnostics (str) + :param queues: queues object (Any) + :param traces: traces object (Any). """ - set_pilot_state(job=job, state="failed") job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code, msg=diagnostics) job.piloterrordiag = diagnostics @@ -3080,15 +3052,14 @@ def fail_monitored_job(job, exit_code, diagnostics, queues, traces): logger.info('aborting job monitoring since job state=%s', job.state) -def make_job_report(job): +def make_job_report(job: Any): """ Make a summary report for the given job. + This function is called when the job has completed. - :param job: job object. - :return: + :param job: job object (Any). """ - logger.info('') logger.info('job summary report') logger.info('--------------------------------------------------')