From bda2eaa48e5a96a48ababa520ab9dd32e58a130e Mon Sep 17 00:00:00 2001
From: Paul Nilsson
Date: Fri, 8 Dec 2023 19:17:29 +0100
Subject: [PATCH] Pydocstyle and pylint updates

---
 pilot/user/atlas/common.py | 552 ++++++++++++++++++-------------------
 1 file changed, 263 insertions(+), 289 deletions(-)

diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py
index 67044e31..0db1fa5b 100644
--- a/pilot/user/atlas/common.py
+++ b/pilot/user/atlas/common.py
@@ -36,27 +36,6 @@
 # from tarfile import ExFileObject
 
-from .container import create_root_container_command
-from .dbrelease import get_dbrelease_version, create_dbrelease
-from .setup import (
-    should_pilot_prepare_setup,
-    is_standard_atlas_job,
-    get_asetup,
-    set_inds,
-    get_analysis_trf,
-    get_payload_environment_variables,
-    replace_lfns_with_turls,
-)
-from .utilities import (
-    get_memory_monitor_setup,
-    get_network_monitor_setup,
-    post_memory_monitor_action,
-    get_memory_monitor_summary_filename,
-    get_prefetcher_setup,
-    get_benchmark_setup,
-    get_memory_monitor_output_filename,
-    get_metadata_dict_from_txt,
-)
 from pilot.util.auxiliary import (
     get_resource_name,
     get_key_value,
@@ -102,6 +81,28 @@
 )
 from pilot.util.tracereport import TraceReport
 
+from .container import create_root_container_command
+from .dbrelease import get_dbrelease_version, create_dbrelease
+from .setup import (
+    should_pilot_prepare_setup,
+    is_standard_atlas_job,
+    get_asetup,
+    set_inds,
+    get_analysis_trf,
+    get_payload_environment_variables,
+    replace_lfns_with_turls,
+)
+from .utilities import (
+    get_memory_monitor_setup,
+    get_network_monitor_setup,
+    post_memory_monitor_action,
+    get_memory_monitor_summary_filename,
+    get_prefetcher_setup,
+    get_benchmark_setup,
+    get_memory_monitor_output_filename,
+    get_metadata_dict_from_txt,
+)
+
 logger = logging.getLogger(__name__)
 
 errors = ErrorCodes()
@@ -129,7 +130,7 @@ def sanity_check() -> int:
     return 0
 
 
-def validate(job) -> bool:
+def validate(job: Any) -> bool:
     """
     Perform user specific payload/job validation.
 
@@ -184,7 +185,8 @@ def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, l
     :param indata: list of FileSpec (list)
     :param workdir: working directory (str)
     :param nthreads: number of concurrent file open threads (int)
-    :return: exit code (int), diagnostics (str), not opened files (list).
+    :return: exit code (int), diagnostics (str), not opened files (list)
+    :raises PilotException: in case of pilot error.
""" exitcode = 0 diagnostics = "" @@ -222,48 +224,49 @@ def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, l diagnostics = f'cannot perform file open test - pilot source copy failed: {exc}' logger.warning(diagnostics) return exitcode, diagnostics, not_opened + + # correct the path when containers have been used + final_script_path = os.path.join('.', script) + + _cmd = get_file_open_command(final_script_path, turls, nthreads) + cmd = create_root_container_command(workdir, _cmd) + + timeout = get_timeout_for_remoteio(indata) + logger.info(f'executing file open verification script (timeout={timeout}):\n\n\'{cmd}\'\n\n') + + exitcode, stdout, stderr = execute(cmd, usecontainer=False, timeout=timeout) + if config.Pilot.remotefileverification_log: + fpath = os.path.join(workdir, config.Pilot.remotefileverification_log) + write_file(fpath, stdout + stderr, mute=False) + logger.info(f'remote file open finished with ec={exitcode}') + + # error handling + if exitcode: + # first check for apptainer errors + _exitcode = errors.resolve_transform_error(exitcode, stdout + stderr) + if _exitcode != exitcode: # a better error code was found (COMMANDTIMEDOUT error will be passed through) + return _exitcode, stderr, not_opened + + # note: if the remote files could still be opened the reported error should not be REMOTEFILEOPENTIMEDOUT + _exitcode, diagnostics, not_opened = parse_remotefileverification_dictionary(workdir) + if not _exitcode: + logger.info('remote file could still be opened in spite of previous error') + elif _exitcode: + if exitcode == errors.COMMANDTIMEDOUT and _exitcode == errors.REMOTEFILECOULDNOTBEOPENED: + exitcode = errors.REMOTEFILEOPENTIMEDOUT + elif exitcode == errors.COMMANDTIMEDOUT and _exitcode == errors.REMOTEFILEDICTDOESNOTEXIST: + exitcode = errors.REMOTEFILEOPENTIMEDOUT + diagnostics = f'remote file open command was timed-out and: {diagnostics}' # cannot give further info + else: # REMOTEFILECOULDNOTBEOPENED + exitcode = _exitcode else: - # correct the path when containers have been used - final_script_path = os.path.join('.', script) - - _cmd = get_file_open_command(final_script_path, turls, nthreads) - cmd = create_root_container_command(workdir, _cmd) - - timeout = get_timeout_for_remoteio(indata) - logger.info(f'executing file open verification script (timeout={timeout}):\n\n\'{cmd}\'\n\n') - - exitcode, stdout, stderr = execute(cmd, usecontainer=False, timeout=timeout) - if config.Pilot.remotefileverification_log: - fpath = os.path.join(workdir, config.Pilot.remotefileverification_log) - write_file(fpath, stdout + stderr, mute=False) - logger.info(f'remote file open finished with ec={exitcode}') - - # error handling - if exitcode: - # first check for apptainer errors - _exitcode = errors.resolve_transform_error(exitcode, stdout + stderr) - if _exitcode != exitcode: # a better error code was found (COMMANDTIMEDOUT error will be passed through) - return _exitcode, stderr, not_opened - - # note: if the remote files could still be opened the reported error should not be REMOTEFILEOPENTIMEDOUT - _exitcode, diagnostics, not_opened = parse_remotefileverification_dictionary(workdir) - if not _exitcode: - logger.info('remote file could still be opened in spite of previous error') - elif _exitcode: - if exitcode == errors.COMMANDTIMEDOUT and _exitcode == errors.REMOTEFILECOULDNOTBEOPENED: - exitcode = errors.REMOTEFILEOPENTIMEDOUT - elif exitcode == errors.COMMANDTIMEDOUT and _exitcode == errors.REMOTEFILEDICTDOESNOTEXIST: - exitcode = 
-                        diagnostics = f'remote file open command was timed-out and: {diagnostics}'  # cannot give further info
-                    else:  # REMOTEFILECOULDNOTBEOPENED
-                        exitcode = _exitcode
-            else:
-                exitcode, diagnostics, not_opened = parse_remotefileverification_dictionary(workdir)
+            exitcode, diagnostics, not_opened = parse_remotefileverification_dictionary(workdir)
     else:
         logger.info('nothing to verify (for remote files)')
 
     if exitcode:
         logger.warning(f'remote file open exit code: {exitcode}')
+
     return exitcode, diagnostics, not_opened
 
 
@@ -275,12 +278,14 @@ def get_timeout_for_remoteio(indata: list) -> int:
     :return: timeout in seconds (int).
     """
     remote_io = [fspec.status == 'remote_io' for fspec in indata]
+
     return len(remote_io) * 30 + 600
 
 
 def parse_remotefileverification_dictionary(workdir: str) -> (int, str, list):
     """
     Verify that all files could be remotely opened.
+
     Note: currently ignoring if remote file dictionary doesn't exist.
 
     :param workdir: work directory needed for opening remote file dictionary (str)
@@ -328,6 +333,8 @@ def get_file_open_command(script_path: str, turls: str, nthreads: int,
     :param script_path: path to script (str)
     :param turls: comma-separated turls (str)
     :param nthreads: number of concurrent file open threads (int)
+    :param stdout: stdout file name (str)
+    :param stderr: stderr file name (str)
     :return: comma-separated list of turls (str).
     """
     cmd = f"{script_path} --turls=\'{turls}\' -w {os.path.dirname(script_path)} -t {nthreads}"
@@ -700,11 +707,10 @@ def get_generic_payload_command(cmd: str, job: Any, preparesetup: bool, userjob:
             cmd = f"python {job.homepackage}/{job.transformation} {job.jobparams}"
         else:
             cmd = job.jobparams
+    elif preparesetup:
+        cmd = f"python {job.transformation} {job.jobparams}"
     else:
-        if preparesetup:
-            cmd = f"python {job.transformation} {job.jobparams}"
-        else:
-            cmd = job.jobparams
+        cmd = job.jobparams
 
     return cmd
 
@@ -948,7 +954,7 @@ def get_guids_from_jobparams(jobparams: str, infiles: list, infilesguids: list)
     return guidlist
 
 
-def test_job_data(job):
+def test_job_data(job: Any):
     """
     Test function to verify that the job object contains the expected data.
 
@@ -1008,6 +1014,8 @@ def test_job_data(job):
 
 def update_job_data(job: Any):
     """
+    Update the job object.
+
     This function can be used to update/add data to the job object.
     E.g. user specific information can be extracted from other job object fields. In the case of ATLAS,
     information is extracted from the metadata field and
@@ -1058,14 +1066,13 @@ def update_job_data(job: Any):
             verify_output_files(job)
         except Exception as exc:
             logger.warning(f'exception caught while trying verify output files: {exc}')
+    elif not job.allownooutput:  # i.e. if it's an empty list/string, do nothing
+        logger.debug((
+            "will not try to extract output files from jobReport "
+            "for user job (and allowNoOut list is empty)"))
     else:
-        if not job.allownooutput:  # i.e. if it's an empty list/string, do nothing
-            logger.debug((
-                "will not try to extract output files from jobReport "
-                "for user job (and allowNoOut list is empty)"))
-        else:
-            # remove the files listed in allowNoOutput if they don't exist
-            remove_no_output_files(job)
+        # remove the files listed in allowNoOutput if they don't exist
+        remove_no_output_files(job)
 
     validate_output_data(job)
 
@@ -1117,14 +1124,16 @@ def naming_convention_pattern() -> str:
     """
     Return a regular expression pattern in case the output file name should be verified.
 
-    pattern=re.compile(r'^[A-Za-z0-9][A-Za-z0-9\\.\\-\\_]{1,250}$')
+    Pattern as below in the return statement will match the following file names:
     re.findall(pattern, 'AOD.29466419._001462.pool.root.1')
     ['AOD.29466419._001462.pool.root.1']
 
     :return: raw string (str).
     """
     max_filename_size = 250
-    return r'^[A-Za-z0-9][A-Za-z0-9\\.\\-\\_]{1,%s}$' % max_filename_size
+
+    # pydocstyle does not like the backslash in the following line, but it is needed
+    return fr"^[A-Za-z0-9][A-Za-z0-9.\-_]{{1,{max_filename_size}}}$"
 
 
 def get_stageout_label(job: Any):
@@ -1139,15 +1148,14 @@ def get_stageout_label(job: Any):
     if job.is_eventservice:
         logger.info('event service payload, will only stage-out log')
         stageout = "log"
-    else:
+    elif 'exeErrorCode' in job.metadata:
         # handle any error codes
-        if 'exeErrorCode' in job.metadata:
-            job.exeerrorcode = job.metadata['exeErrorCode']
-            if job.exeerrorcode == 0:
-                stageout = "all"
-            else:
-                logger.info(f'payload failed: exeErrorCode={job.exeerrorcode}')
-                stageout = "log"
+        job.exeerrorcode = job.metadata['exeErrorCode']
+        if job.exeerrorcode == 0:
+            stageout = "all"
+        else:
+            logger.info(f'payload failed: exeErrorCode={job.exeerrorcode}')
+            stageout = "log"
 
     return stageout
 
@@ -1228,7 +1236,7 @@ def discover_new_output(name_pattern: str, workdir: str) -> dict:
             # get checksum
             try:
                 checksum = calculate_checksum(path, algorithm=config.File.checksum_type)
-            except (FileHandlingFailure, NotImplementedError, Exception) as exc:
+            except (FileHandlingFailure, NotImplementedError) as exc:
                 logger.warning(f'failed to create file info (filesize={filesize}) for lfn={lfn}: {exc}')
             else:
                 if filesize and checksum:
@@ -1241,11 +1249,12 @@ def discover_new_output(name_pattern: str, workdir: str) -> dict:
 
 def extract_output_file_guids(job: Any) -> None:
     """
-    Extract output file info from the job report and make sure all guids\
-    are assigned (use job report value if present, otherwise generate the guid.\
+    Extract output file info from the job report and make sure all guids are assigned.
+
+    Use job report value if present, otherwise generate the guid.
     Note: guid generation is done later, not in this function since
     this function might not be called if metadata info is not found prior
-    to the call).
+    to the call.
 
     :param job: job object (Any)
     :return: None.
@@ -1297,11 +1306,10 @@ def extract_output_file_guids(job: Any) -> None:
             if fspec.guid != data[fspec.lfn].guid:
                 fspec.guid = data[fspec.lfn].guid
                 logger.debug(f'reset guid={fspec.guid} for lfn={fspec.lfn}')
+            elif fspec.guid:
+                logger.debug(f'verified guid={fspec.guid} for lfn={fspec.lfn}')
             else:
-                if fspec.guid:
-                    logger.debug(f'verified guid={fspec.guid} for lfn={fspec.lfn}')
-                else:
-                    logger.warning(f'guid not set for lfn={fspec.lfn}')
+                logger.warning(f'guid not set for lfn={fspec.lfn}')
     #if extra:
     #logger.info('found extra output files in job report,
     # will overwrite output file list: extra=%s' % extra)
@@ -1426,6 +1434,7 @@ def verify_extracted_output_files(output: list, lfns_jobdef: list, job: Any) ->
             nentries = output_jobrep[lfn]
             if nentries == "UNDEFINED":
                 logger.warning(f'encountered file with nentries=UNDEFINED - will ignore {lfn}')
+
             elif nentries is None:
                 if lfn not in job.allownooutput:
                     logger.warning(f'output file {lfn} is listed in job report, but has no events and '
@@ -1434,6 +1443,7 @@ def verify_extracted_output_files(output: list, lfns_jobdef: list, job: Any) ->
                     logger.warning(f'output file {lfn} is listed in job report, nentries is None and is listed in '
                                    f'allowNoOutput - remove from stage-out')
                     remove_from_stageout(lfn, job)
+
             elif nentries == 0:
                 if lfn not in job.allownooutput:
                     logger.warning(f'output file {lfn} is listed in job report, has zero events and '
@@ -1443,7 +1453,7 @@ def verify_extracted_output_files(output: list, lfns_jobdef: list, job: Any) ->
                                    f'allowNoOutput - remove from stage-out')
                     remove_from_stageout(lfn, job)
 
-            elif type(nentries) is int and nentries:
+            elif isinstance(nentries, int) and nentries:
                 logger.info(f'output file {lfn} has {nentries} event(s)')
                 nevents += nentries
             else:  # should not reach this step
@@ -1559,12 +1569,12 @@ def get(self, path: str, dst_dict: dict, dst_key: str):
         return
 
 
-def parse_jobreport_data(job_report):  # noqa: C901
+def parse_jobreport_data(job_report: dict) -> dict:  # noqa: C901
     """
     Parse a job report and extract relevant fields.
 
-    :param job_report:
-    :return:
+    :param job_report: job report dictionary (dict)
+    :return: work_attributes (dict).
     """
     work_attributes = {}
     if job_report is None or not any(job_report):
@@ -1581,7 +1591,7 @@ def parse_jobreport_data(job_report):  # noqa: C901
     if "ATHENA_PROC_NUMBER" in os.environ:
         logger.debug(f"ATHENA_PROC_NUMBER: {os.environ['ATHENA_PROC_NUMBER']}")
         work_attributes['core_count'] = int(os.environ['ATHENA_PROC_NUMBER'])
-        core_count = int(os.environ['ATHENA_PROC_NUMBER'])
+        core_count = os.environ['ATHENA_PROC_NUMBER']
 
     dictq = DictQuery(job_report)
     dictq.get("resource/transform/processedEvents", work_attributes, "nEvents")
@@ -1627,14 +1637,13 @@ def parse_jobreport_data(job_report):  # noqa: C901
     return work_attributes
 
 
-def get_executor_dictionary(jobreport_dictionary):
+def get_executor_dictionary(jobreport_dictionary: dict) -> dict:
     """
     Extract the 'executor' dictionary from with a job report.
 
-    :param jobreport_dictionary:
-    :return: executor_dictionary
+    :param jobreport_dictionary: job report dictionary (dict)
+    :return: executor_dictionary (dict).
     """
-
     executor_dictionary = {}
 
     if jobreport_dictionary != {}:
@@ -1650,15 +1659,15 @@ def get_executor_dictionary(jobreport_dictionary):
     return executor_dictionary
 
 
-def get_resimevents(jobreport_dictionary):
+def get_resimevents(jobreport_dictionary: dict) -> int or None:
     """
     Extract and add up the resimevents from the job report.
+
     This information is reported with the jobMetrics.
 
-    :param jobreport_dictionary: job report dictionary.
-    :return: resimevents (int or None)
+    :param jobreport_dictionary: job report dictionary (dict)
+    :return: resimevents (int or None).
     """
-
     resimevents = None
 
     executor_dictionary = get_executor_dictionary(jobreport_dictionary)
@@ -1675,18 +1684,18 @@ def get_resimevents(jobreport_dictionary):
     return resimevents
 
 
-def get_db_info(jobreport_dictionary):
+def get_db_info(jobreport_dictionary) -> (int, int):
     """
     Extract and add up the DB info from the job report.
+
     This information is reported with the jobMetrics.
     Note: this function adds up the different dbData and dbTime's in the
     different executor steps. In modern job reports this might have been
     done already by the transform and stored in dbDataTotal and dbTimeTotal.
 
-    :param jobreport_dictionary: job report dictionary.
-    :return: db_time (int), db_data (long)
+    :param jobreport_dictionary: job report dictionary (dict)
+    :return: db_time (int), db_data (int).
     """
-
     db_time = 0
     db_data = 0
@@ -1711,14 +1720,15 @@ def get_db_info(jobreport_dictionary):
     return db_time, db_data
 
 
-def get_db_info_str(db_time, db_data):
+def get_db_info_str(db_time: int, db_data: int) -> (str, str):
     """
     Convert db_time, db_data to strings.
+
     E.g. dbData="105077960", dbTime="251.42".
 
-    :param db_time: time (s)
-    :param db_data: long integer
-    :return: db_time_s, db_data_s (strings)
+    :param db_time: time in seconds (int)
+    :param db_data: long integer (int)
+    :return: db_time_s (str), db_data_s (str).
     """
     zero = 0
@@ -1733,18 +1743,17 @@ def get_db_info_str(db_time, db_data):
     return db_time_s, db_data_s
 
 
-def get_cpu_times(jobreport_dictionary):
+def get_cpu_times(jobreport_dictionary: dict) -> (str, int, float):
     """
     Extract and add up the total CPU times from the job report.
+
     E.g. ('s', 5790L, 1.0).
     Note: this function is used with Event Service jobs
 
-    :param jobreport_dictionary:
-    :return: cpu_conversion_unit (unit), total_cpu_time,
-        conversion_factor (output consistent with set_time_consumed())
+    :param jobreport_dictionary: job report dictionary (dict)
+    :return: cpu_conversion_unit (str), total_cpu_time (int), conversion_factor (output consistent with set_time_consumed()) (float).
     """
-
     total_cpu_time = 0
 
     executor_dictionary = get_executor_dictionary(jobreport_dictionary)
@@ -1763,27 +1772,26 @@ def get_cpu_times(jobreport_dictionary):
     return cpu_conversion_unit, total_cpu_time, conversion_factor
 
 
-def get_exit_info(jobreport_dictionary):
+def get_exit_info(jobreport_dictionary: dict) -> (int, str):
     """
     Return the exit code (exitCode) and exit message (exitMsg).
+
     E.g. (0, 'OK').
 
     :param jobreport_dictionary:
-    :return: exit_code, exit_message
+    :return: exit_code (int), exit_message (str).
     """
+    return jobreport_dictionary.get('exitCode'), jobreport_dictionary.get('exitMsg')
 
-    return jobreport_dictionary['exitCode'], jobreport_dictionary['exitMsg']
-
 
-def cleanup_looping_payload(workdir):
+def cleanup_looping_payload(workdir: str):
     """
     Run a special cleanup for looping payloads.
+
     Remove any root and tmp files.
 
+    :param workdir: working directory (str).
""" - for (root, _, files) in os.walk(workdir): for filename in files: if 'pool.root' in filename: @@ -1792,17 +1800,16 @@ def cleanup_looping_payload(workdir): remove(path) -def cleanup_payload(workdir, outputfiles=None, removecores=True): +def cleanup_payload(workdir: str, outputfiles: list = None, removecores: bool = True): """ - Cleanup of payload (specifically AthenaMP) sub directories prior to log file creation. + Clean up payload (specifically AthenaMP) sub-directories prior to log file creation. + Also remove core dumps. - :param workdir: working directory (string). - :param outputfiles: list of output files. - :param removecores: remove core files if True (Boolean). - :return: + :param workdir: working directory (str) + :param outputfiles: list of output files (list) + :param removecores: remove core files if True (bool). """ - if outputfiles is None: outputfiles = [] @@ -1826,14 +1833,12 @@ def cleanup_payload(workdir, outputfiles=None, removecores=True): remove(path) -def get_redundant_path(): +def get_redundant_path() -> str: """ - Return the path to the file containing the redundant files - and directories to be removed prior to log file creation. + Return the path to the file containing the redundant files and directories to be removed prior to log file creation. - :return: file path (string). + :return: file path (str). """ - filename = config.Pilot.redundant # correct /cvmfs if necessary @@ -1843,16 +1848,16 @@ def get_redundant_path(): return filename -def get_redundants(): +def get_redundants() -> list: """ Get list of redundant files and directories (to be removed). + The function will return the content of an external file. It that can't be read, then a list defined in this function will be returned instead. Any updates to the external file must be propagated to this function. - :return: files and directories list + :return: files and directories (list). """ - # try to read the list from the external file filename = get_redundant_path() @@ -1941,16 +1946,14 @@ def get_redundants(): return dir_list -def remove_archives(workdir): +def remove_archives(workdir: str): """ - Explicitly remove any soft linked archives (.a files) since - they will be dereferenced by the tar command + Explicitly remove any soft linked archives (.a files) since they will be dereferenced by the tar command. + (--dereference option). - :param workdir: working directory (string) - :return: + :param workdir: working directory (str). """ - matches = [] for root, _, filenames in os.walk(workdir): for filename in fnmatch.filter(filenames, '*.a'): @@ -1963,14 +1966,12 @@ def remove_archives(workdir): remove(match) -def cleanup_broken_links(workdir): +def cleanup_broken_links(workdir: str): """ Run a second pass to clean up any broken links prior to log file creation. - :param workdir: working directory (string) - :return: + :param workdir: working directory (str). """ - broken = [] for root, _, files in os.walk(workdir): for filename in files: @@ -1989,28 +1990,25 @@ def cleanup_broken_links(workdir): remove(brok) -def list_work_dir(workdir): +def list_work_dir(workdir: str): """ Execute ls -lF for the given directory and dump to log. - :param workdir: directory name (string). + :param workdir: directory name (str). """ - cmd = f'ls -lF {workdir}' _, stdout, stderr = execute(cmd) logger.debug(f'{stdout}:\n' + stderr) -def remove_special_files(workdir, dir_list, outputfiles): +def remove_special_files(workdir: str, dir_list: list, outputfiles: list): """ Remove list of special files from the workdir. 
-    :param workdir: work directory (string).
-    :param dir_list: list of special files (list).
+    :param workdir: work directory (str)
+    :param dir_list: list of special files (list)
     :param outputfiles: output files (list).
-    :return:
     """
-
     # note: these should be partial file/dir names, not containing any wildcards
     exceptions_list = ["runargs", "runwrapper", "jobReport", "log.", "xrdlog"]
@@ -2041,19 +2039,17 @@ def remove_special_files(workdir, dir_list, outputfiles):
             remove_dir_tree(item)
 
 
-def remove_redundant_files(workdir, outputfiles=None, piloterrors=[], debugmode=False):
+def remove_redundant_files(workdir: str, outputfiles: list = None, piloterrors: list = [], debugmode: bool = False):
     """
     Remove redundant files and directories prior to creating the log file.
 
     Note: in debug mode, any core files should not be removed before creating the log.
 
-    :param workdir: working directory (string).
-    :param outputfiles: list of protected output files (list).
-    :param errors: list of Pilot assigned error codes (list).
-    :param debugmode: True if debug mode has been switched on (Boolean).
-    :return:
+    :param workdir: working directory (str)
+    :param outputfiles: list of protected output files (list)
+    :param piloterrors: list of Pilot assigned error codes (list)
+    :param debugmode: True if debug mode has been switched on (bool).
     """
-
     if outputfiles is None:
         outputfiles = []
@@ -2108,17 +2104,16 @@ def remove_redundant_files(workdir, outputfiles=None, piloterrors=[], debugmode=
     list_work_dir(workdir)
 
 
-def download_command(process, workdir):
+def download_command(process: dict, workdir: str) -> dict:
     """
     Download the pre/postprocess commands if necessary.
 
     Process FORMAT: {'command': , 'args': , 'label': }
 
-    :param process: pre/postprocess dictionary.
-    :param workdir: job workdir (string).
-    :return: updated pre/postprocess dictionary.
+    :param process: pre/postprocess dictionary (dict)
+    :param workdir: job workdir (str)
+    :return: updated pre/postprocess dictionary (dict).
    """
-
     cmd = process.get('command', '')
 
     # download the command if necessary
@@ -2135,14 +2130,13 @@ def download_command(process, workdir):
     return process
 
 
-def get_utility_commands(order=None, job=None):
+def get_utility_commands(order: int = None, job: Any = None) -> dict or None:
     """
-    Return a dictionary of utility commands and arguments to be executed
-    in parallel with the payload. This could e.g. be memory and network
-    monitor commands. A separate function can be used to determine the
-    corresponding command setups using the utility command name. If the
-    optional order parameter is set, the function should return the list
-    of corresponding commands.
+    Return a dictionary of utility commands and arguments to be executed in parallel with the payload.
+
+    This could e.g. be memory and network monitor commands. A separate function can be used to determine the
+    corresponding command setups using the utility command name. If the optional order parameter is set, the
+    function should return the list of corresponding commands.
 
     For example:
 
@@ -2164,7 +2158,6 @@ def get_utility_commands(order=None, job=None):
     :param job: optional job object.
     :return: dictionary of utilities to be executed in parallel with the payload.
""" - if order == UTILITY_BEFORE_PAYLOAD and job.preprocess: return get_precopostprocess_command(job.preprocess, job.workdir, 'preprocess') @@ -2201,36 +2194,36 @@ def get_utility_commands(order=None, job=None): return None -def get_precopostprocess_command(process, workdir, label): +def get_precopostprocess_command(process: dict, workdir: str, label: str) -> dict: """ Return the pre/co/post-process command dictionary. Command FORMAT: {'command': , 'args': , 'label': } The returned command has the structure: { 'command': , } - :param process: pre/co/post-process (dictionary). - :param workdir: working directory (string). - :param label: label (string). - :return: command (dictionary). - """ + :param process: pre/co/post-process (dict) + :param workdir: working directory (str) + :param label: label (str) + :return: command (dict). + """ com = {} if process.get('command', ''): com = download_command(process, workdir) com['label'] = label com['ignore_failure'] = False + return com -def get_utility_after_payload_started(): +def get_utility_after_payload_started() -> dict: """ Return the command dictionary for the utility after the payload has started. Command FORMAT: {'command': , 'args': , 'label': } - :return: command (dictionary). + :return: command (dict). """ - com = {} try: cmd = config.Pilot.utility_after_payload_started @@ -2239,39 +2232,38 @@ def get_utility_after_payload_started(): else: if cmd: com = {'command': cmd, 'args': '', 'label': cmd.lower(), 'ignore_failure': True} + return com -def get_xcache_command(catchall, workdir, jobid, label, xcache_function): +def get_xcache_command(catchall: str, workdir: str, jobid: str, label: str, xcache_function: Any) -> dict: """ Return the proper xcache command for either activation or deactivation. Command FORMAT: {'command': , 'args': , 'label': } - :param catchall: queuedata catchall field (string). - :param workdir: job working directory (string). - :param jobid: PanDA job id (string). - :param label: label (string). - :param xcache_function: activation/deactivation function name (function). - :return: command (dictionary). + :param catchall: queuedata catchall field (str) + :param workdir: job working directory (str) + :param jobid: PanDA job id (str) + :param label: label (str) + :param xcache_function: activation/deactivation function name (Any) + :return: command (dict). """ - com = {} if 'pilotXcache' in catchall: com = xcache_function(jobid=jobid, workdir=workdir) com['label'] = label com['ignore_failure'] = True + return com -def post_prestagein_utility_command(**kwargs): +def post_prestagein_utility_command(**kwargs: dict): """ Execute any post pre-stage-in utility commands. - :param kwargs: kwargs (dictionary). - :return: + :param kwargs: kwargs (dict). """ - label = kwargs.get('label', 'unknown_label') stdout = kwargs.get('output', None) @@ -2288,14 +2280,12 @@ def post_prestagein_utility_command(**kwargs): logger.debug(f'cmd={cmd}:\n\n{_stdout}\n\n') -def xcache_proxy(output): +def xcache_proxy(output: str): """ Extract env vars from xcache stdout and set them. - :param output: command output (string). - :return: + :param output: command output (str). 
""" - # loop over each line in the xcache stdout and identify the needed environmental variables for line in output.split('\n'): if 'ALRB_XCACHE_PROXY' in line: @@ -2326,34 +2316,31 @@ def xcache_proxy(output): ) -def set_xcache_var(line, name='', pattern=''): +def set_xcache_var(line: str, name: str = '', pattern: str = ''): """ Extract the value of a given environmental variable from a given stdout line. - :param line: line from stdout to be investigated (string). - :param name: name of env var (string). - :param pattern: regex pattern (string). - :return: + :param line: line from stdout to be investigated (str) + :param name: name of env var (str) + :param pattern: regular expression pattern (str). """ - pattern = re.compile(pattern) result = re.findall(pattern, line) if result: os.environ[name] = result[0] -def xcache_activation_command(workdir='', jobid=''): +def xcache_activation_command(workdir: str = '', jobid: str = '') -> dict: """ Return the xcache service activation command. Note: the workdir is not used here, but the function prototype needs it in the called (xcache_activation_command needs it). - :param workdir: unused work directory - do not remove (string). - :param jobid: PanDA job id to guarantee that xcache process is unique (int). - :return: xcache command (string). + :param workdir: unused work directory - do not remove (str) + :param jobid: PanDA job id to guarantee that xcache process is unique (int) + :return: xcache command (str). """ - # a successful startup will set ALRB_XCACHE_PROXY and ALRB_XCACHE_PROXY_REMOTE # so any file access with root://... should be replaced with one of # the above (depending on whether you are on the same machine or not) @@ -2370,20 +2357,20 @@ def xcache_activation_command(workdir='', jobid=''): return {'command': command, 'args': ''} -def xcache_deactivation_command(workdir='', jobid=''): +def xcache_deactivation_command(workdir: str = '', jobid: str = '') -> dict: """ Return the xcache service deactivation command. + This service should be stopped after the payload has finished. Copy the messages log before shutting down. Note: the job id is not used here, but the function prototype needs it in the called (xcache_activation_command needs it). - :param workdir: payload work directory (string). - :param jobid: unused job id - do not remove (string). - :return: xcache command (string). + :param workdir: payload work directory (str) + :param jobid: unused job id - do not remove (str) + :return: xcache command (dict). """ - path = os.environ.get('ALRB_XCACHE_LOG', None) if path and os.path.exists(path): logger.debug(f'copying xcache messages log file ({path}) to work dir ({workdir})') @@ -2403,17 +2390,17 @@ def xcache_deactivation_command(workdir='', jobid=''): return {'command': command, 'args': '-p $ALRB_XCACHE_MYPROCESS'} -def get_utility_command_setup(name, job, setup=None): +def get_utility_command_setup(name: str, job: Any, setup: str = None) -> str: """ Return the proper setup for the given utility command. + If a payload setup is specified, then the utility command string should be prepended to it. - :param name: name of utility (string). - :param job: job object. - :param setup: optional payload setup string. - :return: utility command setup (string). + :param name: name of utility (str) + :param job: job object (Any) + :param setup: optional payload setup string (str) + :return: utility command setup (str). 
""" - if name == 'MemoryMonitor': # must know if payload is running in a container or not # (enables search for pid in ps output) @@ -2461,14 +2448,13 @@ def get_utility_command_setup(name, job, setup=None): return "" -def get_utility_command_execution_order(name): +def get_utility_command_execution_order(name: str) -> int: """ - Should the given utility command be executed before or after the payload? + Determine if the given utility command should be executed before or after the payload. - :param name: utility name (string). - :return: execution order constant. + :param name: utility name (str) + :return: execution order constant (int). """ - # example implementation if name == 'NetworkMonitor': return UTILITY_WITH_PAYLOAD @@ -2477,56 +2463,52 @@ def get_utility_command_execution_order(name): return UTILITY_AFTER_PAYLOAD_STARTED logger.warning(f'unknown utility name: {name}') + return UTILITY_AFTER_PAYLOAD_STARTED -def post_utility_command_action(name, job): +def post_utility_command_action(name: str, job: Any): """ Perform post action for given utility command. - :param name: name of utility command (string). - :param job: job object. - :return: + :param name: name of utility command (str) + :param job: job object (Any). """ - if name == 'NetworkMonitor': pass elif name == 'MemoryMonitor': post_memory_monitor_action(job) -def get_utility_command_kill_signal(name): +def get_utility_command_kill_signal(name: str) -> int: """ Return the proper kill signal used to stop the utility command. - :param name: name of utility command (string). - :return: kill signal + :param name: name of utility command (str) + :return: kill signal (int). """ - # note that the NetworkMonitor does not require killing (to be confirmed) return SIGUSR1 if name == 'MemoryMonitor' else SIGTERM -def get_utility_command_output_filename(name, selector=None): +def get_utility_command_output_filename(name: str, selector: bool = None) -> str: """ Return the filename to the output of the utility command. - :param name: utility name (string). - :param selector: optional special conditions flag (boolean). - :return: filename (string). + :param name: utility name (str) + :param selector: optional special conditions flag (bool) + :return: filename (str). """ - return get_memory_monitor_summary_filename(selector=selector) if name == 'MemoryMonitor' else "" -def verify_lfn_length(outdata): +def verify_lfn_length(outdata: list) -> (int, str): """ Make sure that the LFNs are all within the allowed length. - :param outdata: FileSpec object. - :return: error code (int), diagnostics (string). + :param outdata: list of FileSpec objects (list) + :return: error code (int), diagnostics (str). """ - exitcode = 0 diagnostics = "" max_length = 255 @@ -2542,14 +2524,12 @@ def verify_lfn_length(outdata): return exitcode, diagnostics -def verify_ncores(corecount): +def verify_ncores(corecount: int): """ - Verify that nCores settings are correct + Verify that nCores settings are correct. :param corecount: number of cores (int). - :return: """ - try: del os.environ['ATHENA_PROC_NUMBER_JOB'] logger.debug("unset existing ATHENA_PROC_NUMBER_JOB") @@ -2577,17 +2557,17 @@ def verify_ncores(corecount): f"(ATHENA_PROC_NUMBER will not be overwritten)") -def verify_job(job): +def verify_job(job: Any) -> bool: """ Verify job parameters for specific errors. 
+
     Note: in case of problem, the function should set the corresponding pilot error code using:
     job.piloterrorcodes, job.piloterrordiags = errors.add_error_code(error.get_error_code())
 
-    :param job: job object
-    :return: Boolean.
+    :param job: job object (Any)
+    :return: True if verified, False otherwise (bool).
     """
-
     status = False
 
     # are LFNs of correct lengths?
@@ -2605,27 +2585,24 @@ def verify_job(job):
     return status
 
 
-def update_stagein(job):
+def update_stagein(job: Any):
     """
     Skip DBRelease files during stage-in.
 
-    :param job: job object.
-    :return:
+    :param job: job object (Any).
     """
-
     for fspec in job.indata:
         if 'DBRelease' in fspec.lfn:
             fspec.status = 'no_transfer'
 
 
-def get_metadata(workdir):
+def get_metadata(workdir: str) -> dict or None:
     """
     Return the metadata from file.
 
-    :param workdir: work directory (string)
-    :return:
+    :param workdir: work directory (str)
+    :return: metadata (dict).
     """
-
     path = os.path.join(workdir, config.Payload.jobreport)
     metadata = read_file(path) if os.path.exists(path) else None
     logger.debug(f'metadata={metadata}')
@@ -2633,26 +2610,24 @@ def get_metadata(workdir):
     return metadata
 
 
-def should_update_logstash(frequency=10):
+def should_update_logstash(frequency: int = 10) -> bool:
     """
-    Should logstash be updated with prmon dictionary?
+    Determine if logstash should be updated with prmon dictionary.
 
-    :param frequency:
-    :return: return True once per 'frequency' times.
+    :param frequency: update frequency (int)
+    :return: return True once per 'frequency' times (bool).
     """
     return randint(0, frequency - 1) == 0
 
 
-def update_server(job):
+def update_server(job: Any) -> None:
     """
     Perform any user specific server actions.
     E.g. this can be used to send special information to a logstash.
 
-    :param job: job object.
-    :return:
+    :param job: job object (Any).
     """
-
     # attempt to read memory_monitor_output.txt and convert it to json
     if not should_update_logstash():
         logger.debug('no need to update logstash for this job')
@@ -2692,15 +2667,15 @@ def update_server(job):
         msg = 'no prmon json available - cannot send anything to logstash server'
         logger.warning(msg)
 
+    return
+
 
-def preprocess_debug_command(job):
+def preprocess_debug_command(job: Any):
     """
     Pre-process the debug command in debug mode.
 
-    :param job: Job object.
-    :return:
+    :param job: Job object (Any).
     """
-
     # Should the pilot do the setup or does jobPars already contain the information?
     preparesetup = should_pilot_prepare_setup(job.noexecstrcnv, job.jobparams)
     # get the general setup command and then verify it if required
@@ -2715,8 +2690,10 @@ def preprocess_debug_command(job):
     job.debug_command = cmd + job.debug_command
 
 
-def process_debug_command(debug_command, pandaid):
+def process_debug_command(debug_command: str, pandaid: str) -> str:
     """
+    Process the debug command in debug mode.
+
     In debug mode, the server can send a special debug command to the piloti
     via the updateJob backchannel. This function can be used to process that
     command, i.e. to identify a proper pid to debug (which is unknown
     to the server).
 
     For gdb, the server might send a command with gdb option --pid %. The pilot
     need to replace the % with the proper pid. The default (hardcoded) process
     will be that of athena.py. The pilot will find the corresponding pid.
 
-    :param debug_command: debug command (string).
-    :param pandaid: PanDA id (string).
-    :return: updated debug command (string).
+    :param debug_command: debug command (str)
+    :param pandaid: PanDA id (str)
+    :return: updated debug command (str).
""" - if '--pid %' not in debug_command: return debug_command - pandaid_pid = None - # replace the % with the pid for athena.py # note: if athena.py is not yet running, the --pid % will remain. # Otherwise the % will be replaced by the pid first find the pid @@ -2782,23 +2756,23 @@ def process_debug_command(debug_command, pandaid): return debug_command -def allow_timefloor(submitmode): +def allow_timefloor(submitmode: str) -> bool: """ - Should the timefloor mechanism (multi-jobs) be allowed for the given submit mode? + Decide if the timefloor mechanism (for multi-jobs) should be allowed for the given submit mode. - :param submitmode: submit mode (string). + :param submitmode: submit mode (str) + :return: always True for ATLAS (bool). """ - return True -def get_pilot_id(jobid): +def get_pilot_id(jobid: int) -> str: """ Get the pilot id from the environment variable GTAG. + Update if necessary (not for ATLAS since we want the same pilot id for all multi-jobs). - :param jobid: PanDA job id - UNUSED (int). - :return: pilot id (string). + :param jobid: PanDA job id - UNUSED (int) + :return: pilot id (str). """ - return os.environ.get("GTAG", "unknown")