Skip to content

Commit

Permalink
Pydocstyle updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Nilsson committed Nov 19, 2023
1 parent fa4f37b commit c18a4fc
Showing 1 changed file with 74 additions and 92 deletions.
166 changes: 74 additions & 92 deletions pilot/control/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import traceback
import queue
from re import findall, split
from typing import Any
from typing import Any, TextIO

from pilot.control.payloads import (
generic,
Expand Down Expand Up @@ -69,7 +69,7 @@ def control(queues: Any, traces: Any, args: Any):
:param queues: internal queues for job handling (Any)
:param traces: tuple containing internal pilot states (Any)
:param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc) (Any)
:param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc) (Any).
"""
targets = {'validate_pre': validate_pre, 'execute_payloads': execute_payloads, 'validate_post': validate_post,
'failed_post': failed_post, 'run_realtimelog': run_realtimelog}
Expand Down Expand Up @@ -117,17 +117,18 @@ def control(queues: Any, traces: Any, args: Any):
logger.info('[payload] control thread has finished')


def validate_pre(queues, traces, args):
def validate_pre(queues: Any, traces: Any, args: Any):
"""
Get a Job object from the "payloads" queue and validate it.
Thread.
If the payload is successfully validated (user defined), the Job object is placed in the "validated_payloads" queue,
otherwise it is placed in the "failed_payloads" queue.
:param queues: internal queues for job handling.
:param traces: tuple containing internal pilot states.
:param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
:return:
:param queues: internal queues for job handling (Any)
:param traces: tuple containing internal pilot states (Any)
:param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc) (Any).
"""
while not args.graceful_stop.is_set():
time.sleep(0.5)
Expand All @@ -150,14 +151,13 @@ def validate_pre(queues, traces, args):
logger.info('[payload] validate_pre thread has finished')


def _validate_payload(job):
def _validate_payload(job: Any) -> bool:
"""
Perform validation tests for the payload.
Perform user validation tests for the payload.
:param job: job object.
:return: boolean.
:param job: job object (Any)
:return: boolean (bool).
"""

status = True

# perform user specific validation
Expand All @@ -172,27 +172,28 @@ def _validate_payload(job):
return status


def get_payload_executor(args: Any, job: Any, out: TextIO, err: TextIO, traces: Any) -> Any:
    """
    Get the payload executor instance for different payloads.

    The executor type is selected from the job's workflow flags: event service
    (also used by the native HPO workflow), event service merge, or the generic
    fallback for everything else.

    :param args: Pilot arguments object (Any)
    :param job: job object (Any)
    :param out: stdout file object (TextIO)
    :param err: stderr file object (TextIO)
    :param traces: traces object (Any)
    :return: instance of a payload executor (Any).
    """
    if job.is_eventservice:  # True for native HPO workflow as well
        return eventservice.Executor(args, job, out, err, traces)
    if job.is_eventservicemerge:
        return eventservicemerge.Executor(args, job, out, err, traces)

    return generic.Executor(args, job, out, err, traces)


def execute_payloads(queues, traces, args): # noqa: C901
def execute_payloads(queues: Any, traces: Any, args: Any): # noqa: C901
"""
Execute queued payloads.
Expand All @@ -202,12 +203,10 @@ def execute_payloads(queues, traces, args): # noqa: C901
is started, the thread will wait for it to finish and then check for any failures. A successfully completed job is
placed in the "finished_payloads" queue, and a failed job will be placed in the "failed_payloads" queue.
:param queues: internal queues for job handling.
:param traces: tuple containing internal pilot states.
:param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
:return:
:param queues: internal queues for job handling (Any)
:param traces: tuple containing internal pilot states (Any)
:param args: Pilot arguments object (e.g. containing queue name, queuedata dictionary, etc) (Any).
"""

job = None
while not args.graceful_stop.is_set():
time.sleep(0.5)
Expand Down Expand Up @@ -333,14 +332,13 @@ def execute_payloads(queues, traces, args): # noqa: C901
logger.info('[payload] execute_payloads thread has finished')


def extract_error_info(error):
def extract_error_info(error: str) -> (int, str):
"""
Extract the error code and diagnostics from an error exception.
:param error: exception string.
:return: error code (int), diagnostics (string).
:param error: exception string (str)
:return: error code (int), diagnostics (str).
"""

error_code = errors.INTERNALPILOTPROBLEM
diagnostics = f'full exception: {error}'

Expand All @@ -356,26 +354,12 @@ def extract_error_info(error):
return error_code, diagnostics


def get_transport(catchall):
"""
Extract the transport/protocol from catchall if present.
:param catchall: PQ.catchall field (string).
:return: transport (string).
"""

transport = ''

return transport


def get_rtlogging():
def get_rtlogging() -> str:
"""
Return the proper rtlogging value from the experiment specific plug-in or the config file.
:return: rtlogging (str).
"""

rtlogging = None
pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
try:
Expand All @@ -388,21 +372,21 @@ def get_rtlogging():
return rtlogging


def get_logging_info(job, args):
def get_logging_info(job: Any, args: Any) -> dict:
"""
Extract the logging type/protocol/url/port from catchall if present, or from args fields.
Returns a dictionary with the format: {'logging_type': .., 'protocol': .., 'url': .., 'port': .., 'logname': ..}
If the provided debug_command contains a tail instruction ('tail log_file_name'), the pilot will locate
the log file and use that for RT logging (full path).
Note: the returned dictionary can be built with either args (has priority) or catchall info.
:param job: job object.
:param args: args object.
:return: info dictionary (logging_type (string), protocol (string), url (string), port (int)).
:param job: job object (Any)
:param args: Pilot arguments object (Any)
:return: info dictionary (logging_type (string), protocol (string), url (string), port (int)) (dict).
"""

info_dic = {}

if not job.realtimelogging:
Expand Down Expand Up @@ -461,17 +445,16 @@ def get_logging_info(job, args):
return info_dic


def find_log_to_tail(debug_command, workdir, args, is_analysis):
def find_log_to_tail(debug_command: str, workdir: str, args: Any, is_analysis: bool) -> str:
"""
Find the log file to tail in the RT logging.
:param debug_command: requested debug command (string).
:param workdir: job working directory (string).
:param args: pilot args object.
:param is_analysis: True for user jobs (Bool).
:return: path to log file (string).
:param debug_command: requested debug command (str)
:param workdir: job working directory (str)
:param args: Pilot arguments object (Any)
:param is_analysis: True for user jobs, False otherwise (bool)
:return: path to log file (str).
"""

path = ""
filename = ""
counter = 0
Expand Down Expand Up @@ -503,18 +486,17 @@ def find_log_to_tail(debug_command, workdir, args, is_analysis):
return logf


def run_realtimelog(queues, traces, args): # noqa: C901
def run_realtimelog(queues: Any, traces: Any, args: Any): # noqa: C901
"""
Validate finished payloads.
If payload finished correctly, add the job to the data_out queue. If it failed, add it to the data_out queue as
well but only for log stage-out (in failed_post() below).
:param queues: internal queues for job handling.
:param traces: tuple containing internal pilot states.
:param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
:return:
:param queues: internal queues for job handling (Any)
:param traces: tuple containing internal pilot states (Any)
:param args: Pilot arguments object (e.g. containing queue name, queuedata dictionary, etc) (Any).
"""

info_dic = None
while not args.graceful_stop.is_set():
time.sleep(0.5)
Expand Down Expand Up @@ -587,30 +569,28 @@ def run_realtimelog(queues, traces, args): # noqa: C901
logger.info('[payload] run_realtimelog thread has finished')


def set_cpu_consumption_time(job: Any):
    """
    Measure the CPU consumption time and store it on the job object.

    The measured value (seconds since job.t0) is rounded to an integer before
    being stored; unit and conversion factor are set alongside it.

    :param job: job object (Any).
    """
    # unit and conversion factor are fixed for this measurement
    job.cpuconsumptionunit = "s"
    job.cpuconversionfactor = 1.0

    cpuconsumptiontime = get_cpu_consumption_time(job.t0)
    job.cpuconsumptiontime = int(round(cpuconsumptiontime))

    logger.info(f'CPU consumption time: {cpuconsumptiontime} {job.cpuconsumptionunit} (rounded to {job.cpuconsumptiontime} {job.cpuconsumptionunit})')


def perform_initial_payload_error_analysis(job, exit_code):
def perform_initial_payload_error_analysis(job: Any, exit_code: int):
"""
Perform an initial analysis of the payload.
Singularity/apptainer errors are caught here.
:param job: job object.
:param exit_code: exit code from payload execution.
:return:
:param job: job object (Any)
:param exit_code: exit code from payload execution (int).
"""

if exit_code != 0:
logger.warning(f'main payload execution returned non-zero exit code: {exit_code}')

Expand Down Expand Up @@ -687,14 +667,13 @@ def perform_initial_payload_error_analysis(job, exit_code):
job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(errors.COREDUMP)


def scan_for_memory_errors(subprocesses):
def scan_for_memory_errors(subprocesses: list) -> str:
"""
Scan for memory errors in dmesg messages.
:param subprocesses: list of payload subprocesses.
:return: error diagnostics (string).
:param subprocesses: list of payload subprocesses (list)
:return: error diagnostics (str).
"""

diagnostics = ""
search_str = 'Memory cgroup out of memory'
for pid in subprocesses:
Expand All @@ -714,16 +693,16 @@ def scan_for_memory_errors(subprocesses):
return diagnostics


def set_error_code_from_stderr(msg, fatal):
def set_error_code_from_stderr(msg: str, fatal: bool) -> int:
"""
Identify specific errors in stderr and set the corresponding error code.
The function returns 0 if no error is recognized.
:param msg: stderr (string).
:param fatal: boolean flag if fatal error among warning messages in stderr.
:param msg: stderr (str)
:param fatal: boolean flag if fatal error among warning messages in stderr (bool)
:return: error code (int).
"""

exit_code = 0
error_map = {errors.SINGULARITYNEWUSERNAMESPACE: "Failed invoking the NEWUSER namespace runtime",
errors.SINGULARITYFAILEDUSERNAMESPACE: "Failed to create user namespace",
Expand All @@ -744,18 +723,19 @@ def set_error_code_from_stderr(msg, fatal):
return exit_code


def validate_post(queues, traces, args):
def validate_post(queues: Any, traces: Any, args: Any):
"""
Validate finished payloads.
Thread.
If payload finished correctly, add the job to the data_out queue. If it failed, add it to the data_out queue as
well but only for log stage-out (in failed_post() below).
:param queues: internal queues for job handling.
:param traces: tuple containing internal pilot states.
:param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
:return:
:param queues: internal queues for job handling (Any)
:param traces: tuple containing internal pilot states (Any)
:param args: Pilot arguments object (e.g. containing queue name, queuedata dictionary, etc) (Any).
"""

while not args.graceful_stop.is_set():
time.sleep(0.5)
# finished payloads
Expand All @@ -780,17 +760,19 @@ def validate_post(queues, traces, args):
logger.info('[payload] validate_post thread has finished')


def failed_post(queues, traces, args):
def failed_post(queues: Any, traces: Any, args: Any):
"""
Get a Job object from the "failed_payloads" queue. Set the pilot state to "stakeout" and the stageout field to
Handle failed jobs.
Thread.
Get a Job object from the "failed_payloads" queue. Set the pilot state to "stageout" and the stageout field to
"log", and add the Job object to the "data_out" queue.
:param queues: internal queues for job handling.
:param traces: tuple containing internal pilot states.
:param args: Pilot arguments (e.g. containing queue name, queuedata dictionary, etc).
:return:
:param queues: internal queues for job handling (Any)
:param traces: tuple containing internal pilot states (Any)
:param args: Pilot arguments object (e.g. containing queue name, queuedata dictionary, etc) (Any).
"""

while not args.graceful_stop.is_set():
time.sleep(0.5)
# finished payloads
Expand Down

0 comments on commit c18a4fc

Please sign in to comment.