From 00ad5803ff9f4eb353858aae7b03244ccc9940b7 Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Mon, 29 Jul 2024 16:36:33 -0400 Subject: [PATCH] Log each step in the pipeline cleaning up RSE --- .../MicroService/MSUnmerged/MSUnmerged.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py b/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py index 5e797c08eb3..cdf4f956bae 100644 --- a/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py +++ b/src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py @@ -309,6 +309,7 @@ def cleanRSE(self, rse): :param rse: MSUnmergedRSE object to be cleaned :return: The MSUnmergedRSE object """ + self.logger.info("Start cleaning files for RSE: %s.", rse['name']) # Create the gfal2 context object: try: @@ -538,6 +539,7 @@ def consRecordAge(self, rse): :return: rse or raises MSUnmergedPlineExit """ rseName = rse['name'] + self.logger.info("Evaluating consistency record agent for RSE: %s.", rse['name']) if rseName not in self.rseConsStats: msg = "RSE: %s Missing in stats records at Rucio Consistency Monitor. " % rseName @@ -615,6 +617,7 @@ def getUnmergedFiles(self, rse): :param rse: The RSE to work on :return: rse """ + self.logger.info("Fetching data from Rucio ConMon for RSE: %s.", rse['name']) rse['files']['allUnmerged'] = self.rucioConMon.getRSEUnmerged(rse['name']) for filePath in rse['files']['allUnmerged']: # Check if what we start with is under /store/unmerged/* @@ -669,8 +672,11 @@ def filterUnmergedFiles(self, rse): :param rse: The RSE to work on :return: rse """ + self.logger.info("Filtering unmerged files for RSE: %s.", rse['name']) rse['dirs']['toDelete'] = rse['dirs']['allUnmerged'] - self.protectedLFNs rse['dirs']['protected'] = rse['dirs']['allUnmerged'] & self.protectedLFNs + self.logger.info("Pre-filter counts for allUnmerged: %s, toDelete: %s, protected: %s.", + len(rse['dirs']['allUnmerged']), len(rse['dirs']['toDelete']), len(rse['dirs']['protected'])) # The following check may seem redundant, but better stay safe than sorry if not (rse['dirs']['toDelete'] | rse['dirs']['protected']) == rse['dirs']['allUnmerged']: @@ -733,6 +739,9 @@ def genFunc(pattern, iterable): # Now apply the filters back to the set in rse['dirs']['toDelete'] rse['dirs']['toDelete'] = set(rse['files']['toDelete'].keys()) + self.logger.info("Post-filter counts for allUnmerged: %s, toDelete: %s, protected: %s.", + len(rse['dirs']['allUnmerged']), len(rse['dirs']['toDelete']), len(rse['dirs']['protected'])) + # Update the counters: rse['counters']['dirsToDelete'] = len(rse['files']['toDelete']) self.logger.info("RSE: %s: %s", rse['name'], twFormat(rse, maxLength=8)) @@ -747,6 +756,7 @@ def getPfn(self, rse): :param rse: The RSE to be checked :return: rse """ + self.logger.info("Fetching PFN map for RSE: %s.", rse['name']) # NOTE: pfnPrefix here is considered the full part of the pfn up to the # beginning of the lfn part rather than just the protocol prefix if rse['files']['allUnmerged']: @@ -771,6 +781,7 @@ def purgeRseObj(self, rse, dumpRSEtoLog=False): :param dumpRSEToLog: Dump the whole RSEobject into the service log. :return: rse """ + self.logger.info("Purging RSE in-memory information for RSE: %s.", rse['name']) msg = "\n----------------------------------------------------------" msg += "\nMSUnmergedRSE: \n%s" msg += "\n----------------------------------------------------------" @@ -788,6 +799,8 @@ def updateRSETimestamps(self, rse, start=True, end=True): :param rse: The RSE to work on :return: rse """ + self.logger.info("Updating timestamps for RSE: %s. With start: %s, end: %s.", rse['name'], start, end) + rseName = rse['name'] currTime = time() @@ -820,6 +833,8 @@ def updateServiceCounters(self, rse): :param pName: The pipeline name whose counters to be updated :return: rse """ + self.logger.info("Updating service counters for RSE: %s.", rse['name']) + pName = self.plineUnmerged.name self.plineCounters[pName]['totalNumFiles'] += rse['counters']['totalNumFiles'] self.plineCounters[pName]['totalNumDirs'] += rse['counters']['totalNumDirs'] @@ -869,7 +884,7 @@ def uploadRSEToMongoDB(self, rse, fullRSEToDB=False, overwrite=True): :return: rse """ try: - self.logger.info("RSE: %s Writing rse data to MongoDB.", rse['name']) + self.logger.info("Uploading RSE information to MongoDB for RSE: %s.", rse['name']) rse.writeRSEToMongoDB(self.msUnmergedColl, fullRSEToDB=fullRSEToDB, overwrite=overwrite, retryCount=self.msConfig['mongoDBRetryCount']) except NotPrimaryError: msg = "Could not write RSE to MongoDB for the maximum of %s mongoDBRetryCounts configured." % self.msConfig['mongoDBRetryCount']