forked from PanDAWMS/pilot3
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Paul Nilsson
committed
Mar 8, 2024
1 parent
836bc15
commit cc35a2e
Showing
2 changed files
with
72 additions
and
75 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,36 +17,46 @@ | |
# under the License. | ||
# | ||
# Authors: | ||
# - Paul Nilsson, [email protected], 2018-23 | ||
# - Paul Nilsson, [email protected], 2018-24 | ||
|
||
"""Functions for building job metrics.""" | ||
|
||
import logging | ||
import os | ||
import re | ||
import logging | ||
from typing import Any | ||
|
||
from pilot.api import analytics | ||
from pilot.common.exception import FileHandlingFailure | ||
from pilot.util.config import config | ||
from pilot.util.jobmetrics import get_job_metrics_entry | ||
from pilot.util.features import MachineFeatures, JobFeatures | ||
from pilot.util.filehandling import find_last_line, read_file | ||
from pilot.util.features import ( | ||
MachineFeatures, | ||
JobFeatures | ||
) | ||
from pilot.util.filehandling import ( | ||
find_last_line, | ||
read_file | ||
) | ||
from pilot.util.math import float_to_rounded_string | ||
|
||
from .cpu import get_core_count | ||
from .common import get_db_info, get_resimevents | ||
from .common import ( | ||
get_db_info, | ||
get_resimevents | ||
) | ||
from .utilities import get_memory_monitor_output_filename | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def get_job_metrics_string(job, extra={}): | ||
def get_job_metrics_string(job: Any, extra: dict = {}) -> str: | ||
""" | ||
Get the job metrics string. | ||
:param job: job object | ||
:param job: job object (Any) | ||
:param extra: any extra information to be added (dict) | ||
:return: job metrics (string). | ||
:return: job metrics (str). | ||
""" | ||
|
||
job_metrics = "" | ||
|
||
# report core count (will also set corecount in job object) | ||
|
@@ -78,7 +88,7 @@ def get_job_metrics_string(job, extra={}): | |
job_metrics += get_job_metrics_entry("resimevents", job.resimevents) | ||
|
||
# get the max disk space used by the payload (at the end of a job) | ||
if job.state == "finished" or job.state == "failed" or job.state == "holding": | ||
if job.state in {"finished", "failed", "holding"}: | ||
max_space = job.get_max_workdir_size() | ||
zero = 0 | ||
|
||
|
@@ -114,14 +124,13 @@ def get_job_metrics_string(job, extra={}): | |
return job_metrics | ||
|
||
|
||
def get_trace_exit_code(workdir): | ||
def get_trace_exit_code(workdir: str) -> str: | ||
""" | ||
Look for any rucio trace curl problems using an env var and a file. | ||
:param workdir: payload work directory (str) | ||
:return: curl exit code (str). | ||
""" | ||
|
||
trace_exit_code = os.environ.get('RUCIO_TRACE_ERROR', '0') | ||
if trace_exit_code == '0': | ||
# look for rucio_trace_error_file in case middleware container is used | ||
|
@@ -137,21 +146,21 @@ def get_trace_exit_code(workdir): | |
return trace_exit_code | ||
|
||
|
||
def add_features(job_metrics, corecount, add=[]): | ||
def add_features(job_metrics: str, corecount: int, add: list = []) -> str: | ||
""" | ||
Add job and machine feature data to the job metrics if available | ||
Add job and machine feature data to the job metrics if available. | ||
If a non-empty add list is specified, only include the corresponding features. If empty/not specified, add all. | ||
:param job_metrics: job metrics (string). | ||
:param corecount: core count (int). | ||
:param add: features to be added (list). | ||
:return: updated job metrics (string). | ||
:param job_metrics: job metrics (str) | ||
:param corecount: core count (int) | ||
:param add: features to be added (list) | ||
:return: updated job metrics (str). | ||
""" | ||
|
||
if job_metrics and not job_metrics.endswith(' '): | ||
job_metrics += ' ' | ||
|
||
def add_sub_features(job_metrics, features_dic, add=[]): | ||
def add_sub_features(features_dic, add=[]): | ||
features_str = '' | ||
for key in features_dic.keys(): | ||
if add and key not in add: | ||
|
@@ -176,48 +185,49 @@ def add_sub_features(job_metrics, features_dic, add=[]): | |
logger.warning(f'cannot process hs06 machine feature: {exc} (hs06={hs06}, total_cpu={total_cpu}, corecount={corecount})') | ||
features_list = [machinefeatures, jobfeatures] | ||
for feature_item in features_list: | ||
features_str = add_sub_features(job_metrics, feature_item, add=add) | ||
features_str = add_sub_features(feature_item, add=add) | ||
if features_str: | ||
job_metrics += features_str | ||
|
||
return job_metrics | ||
|
||
|
||
def add_analytics_data(job_metrics, workdir, state): | ||
def add_analytics_data(job_metrics: str, workdir: str, state: str) -> str: | ||
""" | ||
Add the memory leak+chi2 analytics data to the job metrics. | ||
:param job_metrics: job metrics (string). | ||
:param workdir: work directory (string). | ||
:param state: job state (string). | ||
:return: updated job metrics (string). | ||
:param job_metrics: job metrics (str) | ||
:param workdir: work directory (str) | ||
:param state: job state (str) | ||
:return: updated job metrics (str). | ||
""" | ||
|
||
path = os.path.join(workdir, get_memory_monitor_output_filename()) | ||
if os.path.exists(path): | ||
client = analytics.Analytics() | ||
# do not include tails on final update | ||
tails = False if (state == "finished" or state == "failed" or state == "holding") else True | ||
tails = not (state in {"finished", "failed", "holding"}) | ||
data = client.get_fitted_data(path, tails=tails) | ||
slope = data.get("slope", "") | ||
chi2 = data.get("chi2", "") | ||
intersect = data.get("intersect", "") | ||
if slope != "": | ||
job_metrics += get_job_metrics_entry("leak", slope) | ||
if chi2 != "": | ||
job_metrics += get_job_metrics_entry("chi2", chi2) | ||
if intersect != "": | ||
job_metrics += get_job_metrics_entry("intersect", intersect) | ||
|
||
return job_metrics | ||
|
||
|
||
def add_event_number(job_metrics, workdir): | ||
def add_event_number(job_metrics: str, workdir: str) -> str: | ||
""" | ||
Extract event number from file and add to job metrics if it exists | ||
Extract event number from file and add to job metrics if it exists. | ||
:param job_metrics: job metrics (string). | ||
:param workdir: work directory (string). | ||
:return: updated job metrics (string). | ||
:param job_metrics: job metrics (str) | ||
:param workdir: work directory (str) | ||
:return: updated job metrics (str). | ||
""" | ||
|
||
path = os.path.join(workdir, 'eventLoopHeartBeat.txt') | ||
if os.path.exists(path): | ||
last_line = find_last_line(path) | ||
|
@@ -231,7 +241,7 @@ def add_event_number(job_metrics, workdir): | |
return job_metrics | ||
|
||
|
||
def get_job_metrics(job, extra={}): | ||
def get_job_metrics(job: Any, extra: dict = {}) -> str: | ||
""" | ||
Return a properly formatted job metrics string. | ||
The format of the job metrics string is defined by the server. It will be reported to the server during updateJob. | ||
|
@@ -269,19 +279,18 @@ def get_job_metrics(job, extra={}): | |
return job_metrics | ||
|
||
|
||
def get_number_in_string(line, pattern=r'\ done\ processing\ event\ \#(\d+)\,'): | ||
def get_number_in_string(line: str, pattern: str = r'\ done\ processing\ event\ \#(\d+)\,') -> int: | ||
""" | ||
Extract a number from the given string. | ||
E.g. file eventLoopHeartBeat.txt contains | ||
done processing event #20166959, run #276689 22807 events read so far <<<=== | ||
This function will return 20166959 as in int. | ||
:param line: line from a file (string). | ||
:param pattern: reg ex pattern (raw string). | ||
:param line: line from a file (str) | ||
:param pattern: reg ex pattern (raw str) | ||
:return: extracted number (int). | ||
""" | ||
|
||
event_number = None | ||
match = re.search(pattern, line) | ||
if match: | ||
|