Commit

Pydocstyle updates
Paul Nilsson committed Nov 17, 2023
1 parent 5351adc commit 3593dbe
Showing 1 changed file with 65 additions and 94 deletions.
159 changes: 65 additions & 94 deletions pilot/control/job.py
@@ -924,7 +924,7 @@ def add_timing_and_extracts(data: dict, job: Any, state: str, args: Any):

def add_memory_info(data: dict, workdir: str, name: str = ""):
"""
Add memory information (if available) to the data structure that will be sent to the server with job updates
Add memory information (if available) to the data structure that will be sent to the server with job updates.
Note: this function updates the data dictionary.
@@ -2478,7 +2478,6 @@ def update_server(job: Any, args: Any) -> None:
:param job: job object (Any)
:param args: Pilot arguments object (Any).
"""

if job.completed:
logger.warning('job has already completed - cannot send another final update')
return
@@ -2602,11 +2601,16 @@ def fast_monitor_tasks(job: Any) -> int:
return exit_code


def message_listener(queues, traces, args):
def message_listener(queues: Any, traces: Any, args: Any):
"""
Listen for messages from ActiveMQ.
"""
Thread.
:param queues: internal queues for job handling (Any)
:param traces: tuple containing internal pilot states (Any)
:param args: Pilot arguments object (e.g. containing queue name, queuedata dictionary, etc) (Any)
"""
while not args.graceful_stop.is_set() and args.subscribe_to_msgsvc:

# listen for a message and add it to the messages queue
@@ -2649,18 +2653,18 @@ def message_listener(queues, traces, args):
logger.info('[job] message listener thread has finished')
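
As the new docstring above describes, the listener runs until args.graceful_stop is set and pushes each received message onto an internal queue for other threads to consume. A minimal sketch of that loop shape, with a hypothetical receive_message callable standing in for the actual ActiveMQ client (this is not the pilot's implementation):

import queue
import threading
from typing import Any, Callable, Optional

def listener_loop(messages: "queue.Queue[Any]",
                  graceful_stop: threading.Event,
                  receive_message: Callable[[], Optional[Any]]) -> None:
    """Poll for messages until graceful_stop is set (illustrative sketch only)."""
    while not graceful_stop.is_set():
        msg = receive_message()       # hypothetical helper; returns None when nothing arrived
        if msg is not None:
            messages.put(msg)         # hand the message over to the consuming thread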


def fast_job_monitor(queues, traces, args):
def fast_job_monitor(queues: Any, traces: Any, args: Any) -> None:
"""
Fast monitoring of job parameters.
Thread.
This function can be used for monitoring processes below the one minute threshold of the normal job_monitor thread.
:param queues: internal queues for job handling.
:param traces: tuple containing internal pilot states.
:param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
:return:
:param queues: internal queues for job handling (Any)
:param traces: tuple containing internal pilot states (Any)
:param args: Pilot arguments object (e.g. containing queue name, queuedata dictionary, etc) (Any)
"""

# peeking and current time; peeking_time gets updated if and when jobs are being monitored, update_time is only
# used for sending the heartbeat and is updated after a server update
#peeking_time = int(time.time())
@@ -2715,20 +2719,21 @@ def fast_job_monitor(queues, traces, args):
logger.info('[job] fast job monitor thread has finished')


def job_monitor(queues, traces, args): # noqa: C901
def job_monitor(queues: Any, traces: Any, args: Any): # noqa: C901
"""
Monitoring of job parameters.
Monitor job parameters.
Thread.
This function monitors certain job parameters, such as job looping, at various time intervals. The main loop
is executed once a minute, while individual verifications may be executed at any time interval (>= 1 minute). E.g.
looping jobs are checked once every ten minutes (default) and the heartbeat is sent once every 30 minutes. Memory
usage is checked once a minute.
:param queues: internal queues for job handling.
:param traces: tuple containing internal pilot states.
:param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
:return:
:param queues: internal queues for job handling (Any)
:param traces: tuple containing internal pilot states (Any)
:param args: Pilot arguments object (e.g. containing queue name, queuedata dictionary, etc) (Any)
"""

# initialize the monitoring time object
mt = MonitoringTime()

@@ -2901,10 +2906,6 @@ def job_monitor(queues, traces, args):  # noqa: C901

elif os.environ.get('PILOT_JOB_STATE') == 'stagein':
logger.info('job monitoring is waiting for stage-in to finish')
#else:
# # check the waiting time in the job monitor. set global graceful_stop if necessary
# if args.workflow != 'stager':
# check_job_monitor_waiting_time(args, peeking_time, abort_override=abort_job)

n += 1

@@ -2926,15 +2927,14 @@ def job_monitor(queues, traces, args):  # noqa: C901
logger.info('[job] job monitor thread has finished')
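
The docstring above says the main loop runs once a minute while individual checks run at coarser intervals (looping jobs every ten minutes by default, heartbeats every 30 minutes). A rough sketch of that interval bookkeeping, with hypothetical check_looping and send_heartbeat callables and hard-coded periods that merely illustrate the stated defaults:

import time
import threading
from typing import Callable

LOOPING_CHECK_PERIOD = 10 * 60   # assumed default: check for looping jobs every ten minutes
HEARTBEAT_PERIOD = 30 * 60       # assumed default: send a heartbeat every 30 minutes

def monitor_loop(graceful_stop: threading.Event,
                 check_looping: Callable[[], None],
                 send_heartbeat: Callable[[], None]) -> None:
    """Run the one-minute main loop with coarser-grained checks (illustrative sketch)."""
    last_looping = last_heartbeat = time.time()
    while not graceful_stop.is_set():
        now = time.time()
        if now - last_looping >= LOOPING_CHECK_PERIOD:
            check_looping()              # e.g. detect payloads that stopped producing output
            last_looping = now
        if now - last_heartbeat >= HEARTBEAT_PERIOD:
            send_heartbeat()             # e.g. send a 'running' job update to the server
            last_heartbeat = now
        time.sleep(60)                   # main loop granularity: one minute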


def preliminary_server_update(job, args, diagnostics):
def preliminary_server_update(job: Any, args: Any, diagnostics: str):
"""
Send a quick job update to the server (do not send any error code yet) for a failed job.
:param job: job object
:param args: args object
:param diagnostics: error diagnostics (string).
:param job: job object (Any)
:param args: Pilot arguments object (Any)
:param diagnostics: error diagnostics (str).
"""

logger.warning(f'will send preliminary diagnostics (and pretend job is still running)={diagnostics}')
piloterrorcode = job.piloterrorcode
piloterrorcodes = job.piloterrorcodes
@@ -2948,38 +2948,41 @@ def preliminary_server_update(job, args, diagnostics):
job.piloterrordiags = piloterrordiags
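
Judging from the visible lines, the function stashes the job's error fields before the (elided) server update and restores them afterwards, so the preliminary update carries diagnostics but no final error code. A sketch of that save-and-restore pattern; the reset values and the send step are assumptions about the collapsed part of the hunk:

def send_preliminary_update(job, send_state) -> None:
    """Hide error codes while sending a 'running' update, then restore them (sketch)."""
    saved = (job.piloterrorcode, job.piloterrorcodes, job.piloterrordiags)
    job.piloterrorcode, job.piloterrorcodes, job.piloterrordiags = 0, [], []  # assumed neutral values
    try:
        send_state(job, 'running')   # stand-in for the pilot's send_state(job, args, 'running')
    finally:
        job.piloterrorcode, job.piloterrorcodes, job.piloterrordiags = saved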


def get_signal_error(sig):
def get_signal_error(sig: Any) -> int:
"""
Return a corresponding pilot error code for the given signal.
:param sig: signal.
:param sig: signal (Any)
:return: pilot error code (int).
"""
try:
_sig = str(sig) # e.g. 'SIGTERM'
except ValueError:
ret = errors.KILLSIGNAL
else:
codes = {'SIGBUS': errors.SIGBUS,
'SIGQUIT': errors.SIGQUIT,
'SIGSEGV': errors.SIGSEGV,
'SIGTERM': errors.SIGTERM,
'SIGXCPU': errors.SIGXCPU,
'SIGUSR1': errors.SIGUSR1,
'USERKILL': errors.USERKILL}
ret = codes.get(_sig) if _sig in codes else errors.KILLSIGNAL

_sig = str(sig) # e.g. 'SIGTERM'
codes = {'SIGBUS': errors.SIGBUS,
'SIGQUIT': errors.SIGQUIT,
'SIGSEGV': errors.SIGSEGV,
'SIGTERM': errors.SIGTERM,
'SIGXCPU': errors.SIGXCPU,
'SIGUSR1': errors.SIGUSR1,
'USERKILL': errors.USERKILL}
ret = codes.get(_sig) if _sig in codes else errors.KILLSIGNAL
return ret
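
A side note on the new lookup: since dict.get already accepts a default, the membership test is not strictly needed. An equivalent one-line lookup, shown with placeholder values rather than the real errors.* constants:

# Placeholder constants for illustration only; the pilot uses errors.* attributes.
KILLSIGNAL, SIGTERM_CODE, SIGBUS_CODE = 'KILLSIGNAL', 'SIGTERM_CODE', 'SIGBUS_CODE'
codes = {'SIGTERM': SIGTERM_CODE, 'SIGBUS': SIGBUS_CODE}

ret = codes.get('SIGKILL', KILLSIGNAL)   # unknown signals fall back to the kill-signal code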


def download_new_proxy(role='production', proxy_type='', workdir=''):
def download_new_proxy(role: str = 'production', proxy_type: str = '', workdir: str = '') -> int:
"""
The production proxy has expired, try to download a new one.
Download a new production proxy, since it has expired.
If it fails to download and verify a new proxy, return the NOVOMSPROXY error.
:param role: role, 'production' or 'user' (string).
:param proxy_type: proxy type, e.g. unified (string).
:param workdir: payload work directory (string).
:param role: role, 'production' or 'user' (str)
:param proxy_type: proxy type, e.g. unified (str)
:param workdir: payload work directory (str)
:return: exit code (int).
"""

exit_code = 0
x509 = os.environ.get('X509_USER_PROXY', '')
logger.info(f'attempt to download a new proxy (proxy_type={proxy_type})')
@@ -3006,72 +3009,41 @@ def download_new_proxy(role='production', proxy_type='', workdir=''):
return exit_code
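
The docstring above describes the flow: try to place a fresh proxy at the location given by X509_USER_PROXY and fall back to the NOVOMSPROXY error if the download or verification fails. A schematic version with hypothetical download and verify callables (not the pilot's actual helpers):

import os
from typing import Callable

def refresh_proxy(download: Callable[[str], int],
                  verify: Callable[[str], bool],
                  novomsproxy_error: int) -> int:
    """Download and verify a new proxy file; return 0 on success (illustrative sketch)."""
    x509 = os.environ.get('X509_USER_PROXY', '')   # current proxy location, as in the diff above
    exit_code = download(x509)                     # hypothetical downloader writing to x509
    if exit_code == 0 and not verify(x509):        # hypothetical verification step
        exit_code = novomsproxy_error              # corresponds to errors.NOVOMSPROXY
    return exit_code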


def send_heartbeat_if_time(job, args, update_time):
def send_heartbeat_if_time(job: Any, args: Any, update_time: float) -> int:
"""
Send a heartbeat to the server if it is time to do so.
:param job: job object.
:param args: args object.
:param update_time: last update time (from time.time()).
:return: possibly updated update_time (from time.time()).
:param job: job object (Any)
:param args: Pilot arguments object (Any)
:param update_time: last update time (from time.time()) (float)
:return: possibly updated update_time, converted to int (from time.time()) (int).
"""

if job.completed:
logger.info('job already completed - will not send any further updates')
return update_time

if int(time.time()) - update_time >= get_heartbeat_period(job.debug and job.debug_command):
elif int(time.time()) - update_time >= get_heartbeat_period(job.debug and job.debug_command):
# check for state==running here, and send explicit 'running' in send_state, rather than sending job.state
# since the job state can actually change in the meantime by another thread
# job.completed will anyway be checked in https::send_update()
if job.serverstate != 'finished' and job.serverstate != 'failed' and job.state == 'running':
logger.info('will send heartbeat for job in \'running\' state')
send_state(job, args, 'running')
update_time = int(time.time())

return update_time
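
Since the reworked function returns the (possibly updated) time as an int, a caller has to feed the return value back in on the next iteration. A hypothetical usage fragment, assuming send_heartbeat_if_time and a job/args pair as in job_monitor above:

import time

def heartbeat_driver(job, args) -> None:
    """Hypothetical caller: reuse the returned timestamp on each iteration."""
    update_time = int(time.time())
    while not args.graceful_stop.is_set():
        update_time = send_heartbeat_if_time(job, args, update_time)  # from the diff above
        time.sleep(60)   # same one-minute cadence as the job monitor main loop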


def check_job_monitor_waiting_time(args, peeking_time, abort_override=False):
"""
Check the waiting time in the job monitor.
Set global graceful_stop if necessary.
:param args: args object.
:param peeking_time: time when monitored_payloads queue was peeked into (int).
:return:
"""

waiting_time = int(time.time()) - peeking_time
msg = f'no jobs in monitored_payloads queue (waited for {waiting_time} s)'
if waiting_time > 60 * 60:
msg += ' - aborting'
# abort = True
#else:
# abort = False
if logger:
logger.warning(msg)
update_time = time.time()
else:
print(msg)
#if abort or abort_override:
# # do not set graceful stop if pilot has not finished sending the final job update
# # i.e. wait until SERVER_UPDATE is DONE_FINAL
# check_for_final_server_update(args.update_server)
# args.graceful_stop.set()
logger.info('will not send any job update')

return int(update_time)


def fail_monitored_job(job, exit_code, diagnostics, queues, traces):
def fail_monitored_job(job: Any, exit_code: int, diagnostics: str, queues: Any, traces: Any):
"""
Fail a monitored job.
:param job: job object
:param exit_code: exit code from job_monitor_tasks (int).
:param diagnostics: pilot error diagnostics (string).
:param queues: queues object.
:param traces: traces object.
:return:
:param job: job object (Any)
:param exit_code: exit code from job_monitor_tasks (int)
:param diagnostics: pilot error diagnostics (str)
:param queues: queues object (Any)
:param traces: traces object (Any).
"""

set_pilot_state(job=job, state="failed")
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(exit_code, msg=diagnostics)
job.piloterrordiag = diagnostics
@@ -3080,15 +3052,14 @@ def fail_monitored_job(job, exit_code, diagnostics, queues, traces):
logger.info('aborting job monitoring since job state=%s', job.state)
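
The visible lines show errors.add_error_code() returning the updated pair of parallel lists that fail_monitored_job stores back on the job. A toy sketch of how such a helper might maintain those lists (not the pilot's ErrorCodes implementation):

from typing import List, Optional, Tuple

def add_error_code(code: int,
                   codes: Optional[List[int]] = None,
                   diags: Optional[List[str]] = None,
                   msg: str = '') -> Tuple[List[int], List[str]]:
    """Append an error code and its diagnostic message to parallel lists (toy sketch)."""
    codes = list(codes or [])
    diags = list(diags or [])
    codes.append(code)
    diags.append(msg)
    return codes, diags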


def make_job_report(job):
def make_job_report(job: Any):
"""
Make a summary report for the given job.
This function is called when the job has completed.
:param job: job object.
:return:
:param job: job object (Any).
"""

logger.info('')
logger.info('job summary report')
logger.info('--------------------------------------------------')
