Skip to content

Commit

Permalink
Log each step in the pipeline cleaning up RSE
Browse files Browse the repository at this point in the history
  • Loading branch information
amaltaro committed Jul 29, 2024
1 parent bd337e7 commit 00ad580
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def cleanRSE(self, rse):
:param rse: MSUnmergedRSE object to be cleaned
:return: The MSUnmergedRSE object
"""
self.logger.info("Start cleaning files for RSE: %s.", rse['name'])

# Create the gfal2 context object:
try:
Expand Down Expand Up @@ -538,6 +539,7 @@ def consRecordAge(self, rse):
:return: rse or raises MSUnmergedPlineExit
"""
rseName = rse['name']
self.logger.info("Evaluating consistency record agent for RSE: %s.", rse['name'])

if rseName not in self.rseConsStats:
msg = "RSE: %s Missing in stats records at Rucio Consistency Monitor. " % rseName
Expand Down Expand Up @@ -615,6 +617,7 @@ def getUnmergedFiles(self, rse):
:param rse: The RSE to work on
:return: rse
"""
self.logger.info("Fetching data from Rucio ConMon for RSE: %s.", rse['name'])
rse['files']['allUnmerged'] = self.rucioConMon.getRSEUnmerged(rse['name'])
for filePath in rse['files']['allUnmerged']:
# Check if what we start with is under /store/unmerged/*
Expand Down Expand Up @@ -669,8 +672,11 @@ def filterUnmergedFiles(self, rse):
:param rse: The RSE to work on
:return: rse
"""
self.logger.info("Filtering unmerged files for RSE: %s.", rse['name'])
rse['dirs']['toDelete'] = rse['dirs']['allUnmerged'] - self.protectedLFNs
rse['dirs']['protected'] = rse['dirs']['allUnmerged'] & self.protectedLFNs
self.logger.info("Pre-filter counts for allUnmerged: %s, toDelete: %s, protected: %s.",
len(rse['dirs']['allUnmerged']), len(rse['dirs']['toDelete']), len(rse['dirs']['protected']))

# The following check may seem redundant, but better stay safe than sorry
if not (rse['dirs']['toDelete'] | rse['dirs']['protected']) == rse['dirs']['allUnmerged']:
Expand Down Expand Up @@ -733,6 +739,9 @@ def genFunc(pattern, iterable):
# Now apply the filters back to the set in rse['dirs']['toDelete']
rse['dirs']['toDelete'] = set(rse['files']['toDelete'].keys())

self.logger.info("Post-filter counts for allUnmerged: %s, toDelete: %s, protected: %s.",
len(rse['dirs']['allUnmerged']), len(rse['dirs']['toDelete']), len(rse['dirs']['protected']))

# Update the counters:
rse['counters']['dirsToDelete'] = len(rse['files']['toDelete'])
self.logger.info("RSE: %s: %s", rse['name'], twFormat(rse, maxLength=8))
Expand All @@ -747,6 +756,7 @@ def getPfn(self, rse):
:param rse: The RSE to be checked
:return: rse
"""
self.logger.info("Fetching PFN map for RSE: %s.", rse['name'])
# NOTE: pfnPrefix here is considered the full part of the pfn up to the
# beginning of the lfn part rather than just the protocol prefix
if rse['files']['allUnmerged']:
Expand All @@ -771,6 +781,7 @@ def purgeRseObj(self, rse, dumpRSEtoLog=False):
:param dumpRSEToLog: Dump the whole RSEobject into the service log.
:return: rse
"""
self.logger.info("Purging RSE in-memory information for RSE: %s.", rse['name'])
msg = "\n----------------------------------------------------------"
msg += "\nMSUnmergedRSE: \n%s"
msg += "\n----------------------------------------------------------"
Expand All @@ -788,6 +799,8 @@ def updateRSETimestamps(self, rse, start=True, end=True):
:param rse: The RSE to work on
:return: rse
"""
self.logger.info("Updating timestamps for RSE: %s. With start: %s, end: %s.", rse['name'], start, end)

rseName = rse['name']
currTime = time()

Expand Down Expand Up @@ -820,6 +833,8 @@ def updateServiceCounters(self, rse):
:param pName: The pipeline name whose counters to be updated
:return: rse
"""
self.logger.info("Updating service counters for RSE: %s.", rse['name'])

pName = self.plineUnmerged.name
self.plineCounters[pName]['totalNumFiles'] += rse['counters']['totalNumFiles']
self.plineCounters[pName]['totalNumDirs'] += rse['counters']['totalNumDirs']
Expand Down Expand Up @@ -869,7 +884,7 @@ def uploadRSEToMongoDB(self, rse, fullRSEToDB=False, overwrite=True):
:return: rse
"""
try:
self.logger.info("RSE: %s Writing rse data to MongoDB.", rse['name'])
self.logger.info("Uploading RSE information to MongoDB for RSE: %s.", rse['name'])
rse.writeRSEToMongoDB(self.msUnmergedColl, fullRSEToDB=fullRSEToDB, overwrite=overwrite, retryCount=self.msConfig['mongoDBRetryCount'])
except NotPrimaryError:
msg = "Could not write RSE to MongoDB for the maximum of %s mongoDBRetryCounts configured." % self.msConfig['mongoDBRetryCount']
Expand Down

0 comments on commit 00ad580

Please sign in to comment.