diff --git a/pilot/api/analytics.py b/pilot/api/analytics.py index 56d82865..ed11abe1 100644 --- a/pilot/api/analytics.py +++ b/pilot/api/analytics.py @@ -59,8 +59,8 @@ def fit(self, x: list, y: list, model: str = "linear") -> Any: """ try: self._fit = Fit(x=x, y=y, model=model) - except Exception as e: - raise UnknownException(e) + except Exception as exc: + raise UnknownException(exc) from exc return self._fit @@ -71,12 +71,10 @@ def slope(self) -> float: :raises NotDefined: exception thrown if fit is not defined. :return: slope (float). """ - if self._fit: - slope = self._fit.slope() - else: + if not self._fit: raise NotDefined("Fit has not been defined") - return slope + return self._fit.slope() def intersect(self) -> float: """ @@ -87,10 +85,8 @@ def intersect(self) -> float: """ if not self._fit: raise NotDefined("Fit has not been defined") - else: - _intersect = self._fit.intersect() - return _intersect + return self._fit.intersect() def chi2(self) -> float: """ @@ -99,12 +95,10 @@ def chi2(self) -> float: :raises NotDefined: exception thrown if fit is not defined :return: chi2 (float). """ - if self._fit: - x2 = self._fit.chi2() - else: + if not self._fit: raise NotDefined("Fit has not been defined") - return x2 + return self._fit.chi2() def get_table(self, filename: str, header: str = "", separator: str = "\t", convert_to_float: bool = True) -> dict: """ @@ -139,7 +133,8 @@ def get_fitted_data( :return: {"slope": slope, "chi2": chi2} (dict). """ slope = "" - chi2 = "" + intersect = "" + _chi2 = "" table = self.get_table(filename) if table: @@ -198,24 +193,18 @@ def get_fitted_data( fit = self.fit(x, y) _slope = self.slope() except Exception as exc: - logger.warning( - "failed to fit data, x=%s, y=%s: %s", str(x), str(y), exc - ) + logger.warning(f"failed to fit data, x={x}, y={y}: {exc}") else: if _slope: - slope = float_to_rounded_string( - fit.slope(), precision=precision - ) - chi2 = float_to_rounded_string(fit.chi2(), precision=precision) + slope = float_to_rounded_string(fit.slope(), precision=precision) + intersect = float_to_rounded_string(fit.intersect(), precision=precision) + _chi2 = float_to_rounded_string(fit.chi2(), precision=precision) if slope != "": logger.info( - "current memory leak: %s B/s (using %d data points, chi2=%s)", - slope, - len(x), - chi2, + f"current memory leak: {slope} B/s (using {len(x)} data points, chi2={_chi2})" ) - return {"slope": slope, "chi2": chi2} + return {"slope": slope, "chi2": _chi2, "intersect": intersect} def find_limit( self, _x, _y, _chi2_org, norg, change_limit=0.25, edge="right", steps=5 @@ -244,8 +233,8 @@ def find_limit( if change < change_limit: found = True break - else: - _chi2_prev = _chi2 + + _chi2_prev = _chi2 if edge == "right": if not found: @@ -254,13 +243,12 @@ def find_limit( else: limit = len(_x) - 1 logger.info(f"right removable region: {limit}") + elif not found: + limit = 0 + logger.info("left removable region not found") else: - if not found: - limit = 0 - logger.info("left removable region not found") - else: - limit = iterations * 10 - logger.info(f"left removable region: {limit}") + limit = iterations * 10 + logger.info(f"left removable region: {limit}") return limit @@ -293,7 +281,7 @@ def extract_from_table(self, table, x_name, y_name): return x, y -class Fit(object): +class Fit(): """Low-level fitting class.""" _model = "linear" # fitting model diff --git a/pilot/user/atlas/jobmetrics.py b/pilot/user/atlas/jobmetrics.py index 493d36b1..6115ecdc 100644 --- a/pilot/user/atlas/jobmetrics.py +++ b/pilot/user/atlas/jobmetrics.py @@ -17,36 +17,46 @@ # under the License. # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-24 +"""Functions for building job metrics.""" + +import logging import os import re -import logging +from typing import Any from pilot.api import analytics from pilot.common.exception import FileHandlingFailure from pilot.util.config import config from pilot.util.jobmetrics import get_job_metrics_entry -from pilot.util.features import MachineFeatures, JobFeatures -from pilot.util.filehandling import find_last_line, read_file +from pilot.util.features import ( + MachineFeatures, + JobFeatures +) +from pilot.util.filehandling import ( + find_last_line, + read_file +) from pilot.util.math import float_to_rounded_string - from .cpu import get_core_count -from .common import get_db_info, get_resimevents +from .common import ( + get_db_info, + get_resimevents +) from .utilities import get_memory_monitor_output_filename logger = logging.getLogger(__name__) -def get_job_metrics_string(job, extra={}): +def get_job_metrics_string(job: Any, extra: dict = {}) -> str: """ Get the job metrics string. - :param job: job object + :param job: job object (Any) :param extra: any extra information to be added (dict) - :return: job metrics (string). + :return: job metrics (str). """ - job_metrics = "" # report core count (will also set corecount in job object) @@ -78,7 +88,7 @@ def get_job_metrics_string(job, extra={}): job_metrics += get_job_metrics_entry("resimevents", job.resimevents) # get the max disk space used by the payload (at the end of a job) - if job.state == "finished" or job.state == "failed" or job.state == "holding": + if job.state in {"finished", "failed", "holding"}: max_space = job.get_max_workdir_size() zero = 0 @@ -114,14 +124,13 @@ def get_job_metrics_string(job, extra={}): return job_metrics -def get_trace_exit_code(workdir): +def get_trace_exit_code(workdir: str) -> str: """ Look for any rucio trace curl problems using an env var and a file. :param workdir: payload work directory (str) :return: curl exit code (str). """ - trace_exit_code = os.environ.get('RUCIO_TRACE_ERROR', '0') if trace_exit_code == '0': # look for rucio_trace_error_file in case middleware container is used @@ -137,21 +146,21 @@ def get_trace_exit_code(workdir): return trace_exit_code -def add_features(job_metrics, corecount, add=[]): +def add_features(job_metrics: str, corecount: int, add: list = []) -> str: """ - Add job and machine feature data to the job metrics if available + Add job and machine feature data to the job metrics if available. + If a non-empty add list is specified, only include the corresponding features. If empty/not specified, add all. - :param job_metrics: job metrics (string). - :param corecount: core count (int). - :param add: features to be added (list). - :return: updated job metrics (string). + :param job_metrics: job metrics (str) + :param corecount: core count (int) + :param add: features to be added (list) + :return: updated job metrics (str). """ - if job_metrics and not job_metrics.endswith(' '): job_metrics += ' ' - def add_sub_features(job_metrics, features_dic, add=[]): + def add_sub_features(features_dic, add=[]): features_str = '' for key in features_dic.keys(): if add and key not in add: @@ -176,48 +185,49 @@ def add_sub_features(job_metrics, features_dic, add=[]): logger.warning(f'cannot process hs06 machine feature: {exc} (hs06={hs06}, total_cpu={total_cpu}, corecount={corecount})') features_list = [machinefeatures, jobfeatures] for feature_item in features_list: - features_str = add_sub_features(job_metrics, feature_item, add=add) + features_str = add_sub_features(feature_item, add=add) if features_str: job_metrics += features_str return job_metrics -def add_analytics_data(job_metrics, workdir, state): +def add_analytics_data(job_metrics: str, workdir: str, state: str) -> str: """ Add the memory leak+chi2 analytics data to the job metrics. - :param job_metrics: job metrics (string). - :param workdir: work directory (string). - :param state: job state (string). - :return: updated job metrics (string). + :param job_metrics: job metrics (str) + :param workdir: work directory (str) + :param state: job state (str) + :return: updated job metrics (str). """ - path = os.path.join(workdir, get_memory_monitor_output_filename()) if os.path.exists(path): client = analytics.Analytics() # do not include tails on final update - tails = False if (state == "finished" or state == "failed" or state == "holding") else True + tails = not (state in {"finished", "failed", "holding"}) data = client.get_fitted_data(path, tails=tails) slope = data.get("slope", "") chi2 = data.get("chi2", "") + intersect = data.get("intersect", "") if slope != "": job_metrics += get_job_metrics_entry("leak", slope) if chi2 != "": job_metrics += get_job_metrics_entry("chi2", chi2) + if intersect != "": + job_metrics += get_job_metrics_entry("intersect", intersect) return job_metrics -def add_event_number(job_metrics, workdir): +def add_event_number(job_metrics: str, workdir: str) -> str: """ - Extract event number from file and add to job metrics if it exists + Extract event number from file and add to job metrics if it exists. - :param job_metrics: job metrics (string). - :param workdir: work directory (string). - :return: updated job metrics (string). + :param job_metrics: job metrics (str) + :param workdir: work directory (str) + :return: updated job metrics (str). """ - path = os.path.join(workdir, 'eventLoopHeartBeat.txt') if os.path.exists(path): last_line = find_last_line(path) @@ -231,7 +241,7 @@ def add_event_number(job_metrics, workdir): return job_metrics -def get_job_metrics(job, extra={}): +def get_job_metrics(job: Any, extra: dict = {}) -> str: """ Return a properly formatted job metrics string. The format of the job metrics string is defined by the server. It will be reported to the server during updateJob. @@ -269,7 +279,7 @@ def get_job_metrics(job, extra={}): return job_metrics -def get_number_in_string(line, pattern=r'\ done\ processing\ event\ \#(\d+)\,'): +def get_number_in_string(line: str, pattern: str = r'\ done\ processing\ event\ \#(\d+)\,') -> int: """ Extract a number from the given string. @@ -277,11 +287,10 @@ def get_number_in_string(line, pattern=r'\ done\ processing\ event\ \#(\d+)\,'): done processing event #20166959, run #276689 22807 events read so far <<<=== This function will return 20166959 as in int. - :param line: line from a file (string). - :param pattern: reg ex pattern (raw string). + :param line: line from a file (str) + :param pattern: reg ex pattern (raw str) :return: extracted number (int). """ - event_number = None match = re.search(pattern, line) if match: