Profile RucioConMon memory
fix parameters
amaltaro committed Aug 30, 2024
1 parent 707fcee commit 570d1f6
Showing 2 changed files with 94 additions and 29 deletions.
66 changes: 66 additions & 0 deletions bin/testRucioConMonMem.py
@@ -0,0 +1,66 @@
import os
import sys
import logging
from memory_profiler import profile
from WMCore.Services.RucioConMon.RucioConMon import RucioConMon

RSE_NAME = "T2_AT_Vienna"
RUCIO_CONMON_URL = "https://cmsweb.cern.ch/rucioconmon/unmerged"

def loggerSetup(logLevel=logging.INFO):
    logger = logging.getLogger(__name__)
    outHandler = logging.StreamHandler(sys.stdout)
    outHandler.setFormatter(logging.Formatter("%(asctime)s:%(levelname)s:%(module)s: %(message)s"))
    outHandler.setLevel(logLevel)
    logger.addHandler(outHandler)
    logger.setLevel(logLevel)
    return logger


profileFp = open('getUnmergedFiles.log', 'w+')
@profile(stream=profileFp)
def getUnmergedFiles(rucioConMon, logger, compressed=False):
    dirs = set()
    counter = 0
    logger.info("Fetching data from Rucio ConMon for RSE: %s.", RSE_NAME)
    for lfn in rucioConMon.getRSEUnmerged(RSE_NAME, zipped=compressed):
        dirPath = _cutPath(lfn)
        dirs.add(dirPath)
        counter += 1
    logger.info(f"Total files received: {counter}, unique dirs: {len(dirs)}")
    return dirs


def _cutPath(filePath):
    newPath = []
    root = filePath
    while True:
        root, tail = os.path.split(root)
        if tail:
            newPath.append(tail)
        else:
            newPath.append(root)
            break
    newPath.reverse()
    # Cut/slice the path to the level/element required.
    newPath = newPath[:7]
    # Build the path out of all that is found up to the deepest level in the LFN tree
    finalPath = os.path.join(*newPath)
    return finalPath


def main():
    logger = loggerSetup()
    zipped = False
    rucioConMon = RucioConMon(RUCIO_CONMON_URL, logger=logger)
    logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}")
    getUnmergedFiles(rucioConMon, logger, compressed=zipped)

    zipped = True
    logger.info(f"Fetching unmerged dump for RSE: {RSE_NAME} with compressed data: {zipped}")
    getUnmergedFiles(rucioConMon, logger, compressed=zipped)
    logger.info("Done!")


if __name__ == "__main__":
    sys.exit(main())
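
The script above leans on memory_profiler's @profile decorator with a stream argument to write a per-line memory report to a log file. Below is a minimal, self-contained sketch of that same pattern; the function name, log file name and sample LFNs are made up for illustration and are not part of this commit:

from memory_profiler import profile
import os

# Hypothetical example: direct the per-line memory report to a dedicated log file,
# mirroring the @profile(stream=...) usage in testRucioConMonMem.py above.
reportFp = open('example_profile.log', 'w+')


@profile(stream=reportFp)
def buildDirectorySet(lfns):
    """Collect the unique parent directories of a list of LFNs."""
    dirs = set()
    for lfn in lfns:
        dirs.add(os.path.dirname(lfn))
    return dirs


if __name__ == "__main__":
    # Made-up LFNs, used only to exercise the profiled function.
    sampleLfns = ["/store/unmerged/campaign/dataset/tier/block%d/file.root" % i for i in range(1000)]
    buildDirectorySet(sampleLfns)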
57 changes: 28 additions & 29 deletions src/python/WMCore/Services/RucioConMon/RucioConMon.py
@@ -14,6 +14,7 @@
 import gzip
 import json
 import logging
+from memory_profiler import profile

 from WMCore.Services.Service import Service
 from Utils.Utilities import decodeBytesToUnicode
@@ -40,14 +41,13 @@ def __init__(self, url, logger=None, configDict=None):
         super(RucioConMon, self).__init__(configDict)
         self['logger'].debug("Initializing RucioConMon with url: %s", self['endpoint'])

-    def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False):
+    def _getResult(self, uri, callname="", clearCache=False, args=None):
         """
         Either fetch data from the cache file or query the data-service
         :param uri: The endpoint uri
         :param callname: alias for caller function
         :param clearCache: parameter to control the cache behavior
         :param args: additional parameters to HTTP request call
-        :param binary: specifies request for binary object from HTTP requests (e.g. zipped content)
         :return: A dictionary
         """

@@ -68,31 +68,26 @@ def _getResult(self, uri, callname="", clearCache=False, args=None, binary=False
         if clearCache:
             self.clearCache(cachedApi, args)
         results = '{}'  # explicitly define results which will be loaded by json.loads below
-        if binary:
-            with self.refreshCache(cachedApi, apiUrl, decoder=False, binary=True) as istream:
-                results = gzip.decompress(istream.read())
-            return results
-        else:
-            with self.refreshCache(cachedApi, apiUrl) as istream:
-                results = istream.read()
-
-        results = json.loads(results)
-        return results
+        with self.refreshCache(cachedApi, apiUrl, decoder=True, binary=False) as istream:
+            results = istream.read()
+        return json.loads(results)

-    def _getResultZipped(self, uri, callname="", clearCache=True, args=None):
+    def _getResultZipped(self, uri, callname="", clearCache=True):
         """
-        This method is retrieving a zipped file from the uri privided, instead
-        of the normal json
+        This method retrieves gzipped content, instead of the standard json format.
         :param uri: The endpoint uri
         :param callname: alias for caller function
         :param clearCache: parameter to control the cache behavior
-        :param args: additional parameters to HTTP request call
-        :return: a list of LFNs
+        :return: yields a single record from the data retrieved
         """
-        data = self._getResult(uri, callname, clearCache, args, binary=True)
-        # convert bytes which we received upstream to string
-        data = decodeBytesToUnicode(data)
-        return [f for f in data.split('\n') if f]
+        cachedApi = callname
+        if clearCache:
+            self.clearCache(cachedApi)
+
+        with self.refreshCache(cachedApi, uri, decoder=False, binary=True) as istream:
+            for line in istream:
+                line = decodeBytesToUnicode(line).replace("\n", "")
+                yield line

     def getRSEStats(self):
         """
@@ -104,25 +99,29 @@ def getRSEStats(self):
         rseStats = self._getResult(uri, callname='stats')
         return rseStats

+    profileFp = open('getRSEUnmerged.log', 'w+')
+    @profile(stream=profileFp)
     def getRSEUnmerged(self, rseName, zipped=False):
         """
         Gets the list of all unmerged files in an RSE
         :param rseName: The RSE whose list of unmerged files to be retrieved
         :param zipped: If True the interface providing the zipped lists will be called
-        :return: A list of unmerged files for the RSE in question
+        :return: a generator of unmerged files for the RSE in question
         """
         # NOTE: The default API provided by Rucio Consistency Monitor is in a form of a
         # zipped file/stream. Currently we are using the newly provided json API
         # But in in case we figure out the data is too big we may need to
         # implement the method with the zipped API and use disc cache for
         # reading/streaming from file. This will prevent any set arithmetic
         # in the future.
-        if not zipped:
-            uri = "files?rse=%s&format=json" % rseName
-            rseUnmerged = self._getResult(uri, callname=rseName)
-            return rseUnmerged
-        else:
+        if zipped:
             uri = "files?rse=%s&format=raw" % rseName
             callname = '{}.zipped'.format(rseName)
-            rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True)
-            return rseUnmerged
+            rseUnmerged = self._getResultZipped(uri, callname=callname, clearCache=True)
+        else:
+            uri = "files?rse=%s&format=json" % rseName
+            callname = '{}.json'.format(rseName)
+            rseUnmerged = self._getResult(uri, callname=callname)
+        # now lazily return items
+        for item in rseUnmerged:
+            yield item
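
With this change getRSEUnmerged() yields records lazily instead of returning a fully materialized list, so callers need to iterate over the result rather than index or slice it. A minimal usage sketch, assuming the same Rucio ConMon endpoint and RSE used in the test script above:

import logging
from WMCore.Services.RucioConMon.RucioConMon import RucioConMon

logger = logging.getLogger(__name__)
rucioConMon = RucioConMon("https://cmsweb.cern.ch/rucioconmon/unmerged", logger=logger)

# Consume the generator lazily; only one LFN record is held in memory at a time.
counter = 0
for lfn in rucioConMon.getRSEUnmerged("T2_AT_Vienna", zipped=True):
    counter += 1
logger.info("Found %d unmerged LFNs at %s", counter, "T2_AT_Vienna")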
