From 570d1f691a3b20298d8433f055b5da56008f152f Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Fri, 30 Aug 2024 11:34:13 -0400 Subject: [PATCH] Profile RucioConMon memory fix parameters --- bin/testRucioConMonMem.py | 66 +++++++++++++++++++ .../Services/RucioConMon/RucioConMon.py | 57 ++++++++-------- 2 files changed, 94 insertions(+), 29 deletions(-) create mode 100644 bin/testRucioConMonMem.py diff --git a/bin/testRucioConMonMem.py b/bin/testRucioConMonMem.py new file mode 100644 index 0000000000..602f3c83ff --- /dev/null +++ b/bin/testRucioConMonMem.py @@ -0,0 +1,66 @@ +import os +import sys +import logging +from memory_profiler import profile +from WMCore.Services.RucioConMon.RucioConMon import RucioConMon + +RSE_NAME = "T2_AT_Vienna" +RUCIO_CONMON_URL = "https://cmsweb.cern.ch/rucioconmon/unmerged" + +def loggerSetup(logLevel=logging.INFO): + logger = logging.getLogger(__name__) + outHandler = logging.StreamHandler(sys.stdout) + outHandler.setFormatter(logging.Formatter("%(asctime)s:%(levelname)s:%(module)s: %(message)s")) + outHandler.setLevel(logLevel) + logger.addHandler(outHandler) + logger.setLevel(logLevel) + return logger + + +profileFp = open('getUnmergedFiles.log', 'w+') +@profile(stream=profileFp) +def getUnmergedFiles(rucioConMon, logger, compressed=False): + dirs = set() + counter = 0 + logger.info("Fetching data from Rucio ConMon for RSE: %s.", RSE_NAME) + for lfn in rucioConMon.getRSEUnmerged(RSE_NAME, zipped=compressed): + dirPath = _cutPath(lfn) + dirs.add(dirPath) + counter =+ 1 + logger.info(f"Total files received: {counter}, unique dirs: {len(dirs)}") + return dirs + + +def _cutPath(filePath): + newPath = [] + root = filePath + while True: + root, tail = os.path.split(root) + if tail: + newPath.append(tail) + else: + newPath.append(root) + break + newPath.reverse() + # Cut/slice the path to the level/element required. + newPath = newPath[:7] + # Build the path out of all that is found up to the deepest level in the LFN tree + finalPath = os.path.join(*newPath) + return finalPath + + +def main(): + logger = loggerSetup() + zipped=False + rucioConMon = RucioConMon(RUCIO_CONMON_URL, logger=logger) + logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}") + getUnmergedFiles(rucioConMon, logger, compressed=zipped) + + zipped=True + logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}") + getUnmergedFiles(rucioConMon, logger, compressed=zipped) + logger.info("Done!") + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/python/WMCore/Services/RucioConMon/RucioConMon.py b/src/python/WMCore/Services/RucioConMon/RucioConMon.py index 5a695e6727..bc3305f48e 100644 --- a/src/python/WMCore/Services/RucioConMon/RucioConMon.py +++ b/src/python/WMCore/Services/RucioConMon/RucioConMon.py @@ -14,6 +14,7 @@ import gzip import json import logging +from memory_profiler import profile from WMCore.Services.Service import Service from Utils.Utilities import decodeBytesToUnicode @@ -40,14 +41,13 @@ def __init__(self, url, logger=None, configDict=None): super(RucioConMon, self).__init__(configDict) self['logger'].debug("Initializing RucioConMon with url: %s", self['endpoint']) - def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False): + def _getResult(self, uri, callname="", clearCache=False, args=None): """ Either fetch data from the cache file or query the data-service :param uri: The endpoint uri :param callname: alias for caller function :param clearCache: parameter to control the cache behavior :param args: additional parameters to HTTP request call - :param binary: specifies request for binary object from HTTP requests (e.g. zipped content) :return: A dictionary """ @@ -68,31 +68,26 @@ def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False if clearCache: self.clearCache(cachedApi, args) results = '{}' # explicitly define results which will be loaded by json.loads below - if binary: - with self.refreshCache(cachedApi, apiUrl, decoder=False, binary=True) as istream: - results = gzip.decompress(istream.read()) - return results - else: - with self.refreshCache(cachedApi, apiUrl) as istream: - results = istream.read() - - results = json.loads(results) - return results + with self.refreshCache(cachedApi, apiUrl, decoder=True, binary=False) as istream: + results = istream.read() + return json.loads(results) - def _getResultZipped(self, uri, callname="", clearCache=True, args=None): + def _getResultZipped(self, uri, callname="", clearCache=True): """ - This method is retrieving a zipped file from the uri privided, instead - of the normal json + This method retrieves gzipped content, instead of the standard json format. :param uri: The endpoint uri :param callname: alias for caller function :param clearCache: parameter to control the cache behavior - :param args: additional parameters to HTTP request call - :return: a list of LFNs + :return: yields a single record from the data retrieved """ - data = self._getResult(uri, callname, clearCache, args, binary=True) - # convert bytes which we received upstream to string - data = decodeBytesToUnicode(data) - return [f for f in data.split('\n') if f] + cachedApi = callname + if clearCache: + self.clearCache(cachedApi) + + with self.refreshCache(cachedApi, uri, decoder=False, binary=True) as istream: + for line in istream: + line = decodeBytesToUnicode(line).replace("\n", "") + yield line def getRSEStats(self): """ @@ -104,12 +99,14 @@ def getRSEStats(self): rseStats = self._getResult(uri, callname='stats') return rseStats + profileFp = open('getRSEUnmerged.log', 'w+') + @profile(stream=profileFp) def getRSEUnmerged(self, rseName, zipped=False): """ Gets the list of all unmerged files in an RSE :param rseName: The RSE whose list of unmerged files to be retrieved :param zipped: If True the interface providing the zipped lists will be called - :return: A list of unmerged files for the RSE in question + :return: a generator of unmerged files for the RSE in question """ # NOTE: The default API provided by Rucio Consistency Monitor is in a form of a # zipped file/stream. Currently we are using the newly provided json API @@ -117,12 +114,14 @@ def getRSEUnmerged(self, rseName, zipped=False): # implement the method with the zipped API and use disc cache for # reading/streaming from file. This will prevent any set arithmetic # in the future. - if not zipped: - uri = "files?rse=%s&format=json" % rseName - rseUnmerged = self._getResult(uri, callname=rseName) - return rseUnmerged - else: + if zipped: uri = "files?rse=%s&format=raw" % rseName callname = '{}.zipped'.format(rseName) - rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True) - return rseUnmerged + rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True) + else: + uri = "files?rse=%s&format=json" % rseName + callname = '{}.json'.format(rseName) + rseUnmerged = self._getResult(uri, callname=callname) + # now lazily return items + for item in rseUnmerged: + yield item