Skip to content

Commit

Permalink
Remove sub-directories before going full scale with file deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
amaltaro committed Oct 2, 2024
1 parent 9c9a4d6 commit f01c556
Showing 1 changed file with 94 additions and 52 deletions.
146 changes: 94 additions & 52 deletions src/python/WMCore/MicroService/MSUnmerged/MSUnmerged.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,11 @@ def cleanRSE(self, rse):
The method to implement the actual deletion of files for an RSE.
Order of deletion attempts is:
1. top directory
2. list sub-directories and files
3. remove each file (unlink)
4. remove the (now) empty sub-directories
5. try to remove the top directory again
2. sub-directories
3. list sub-directories and files
4. remove each file (unlink)
5. remove the (now) empty sub-directories
6. try to remove the top directory again
:param rse: MSUnmergedRSE object to be cleaned
:return: The MSUnmergedRSE object
"""
Expand Down Expand Up @@ -339,63 +340,75 @@ def cleanRSE(self, rse):
if not self.msConfig['enableRealMode']:
self.logger.info("DRY-RUN: would delete directory PFN: %s for RSE: %s", dirPfn, rse['name'])
else:
# The following two bool flags are to track the success for directory removal
# during all consecutive attempts/steps of cleaning the current branch.
filesDeletedSuccess = 0
filesDeletedFail = 0

# Initially try to delete the whole directory even before emptying its content:
self.logger.info("Trying to remove the whole directory: %s", dirPfn)
rmdirSuccess = self._rmDir(ctx, dirPfn)

if rmdirSuccess:
self.logger.info("Directory successfully removed: %s", dirPfn)
rse['counters']['dirsDeletedSuccess'] += 1
else:
self.logger.info("Failed to remove the whole directory. Listing its content and deleting file by file.")
listFiles = self._listDir(ctx, dirPfn)
self.logger.info("Starting deletion of %s files:", len(listFiles))
for pfnSlice in list(grouper(listFiles, self.msConfig["filesToDeleteSliceSize"])):
self.logger.info("Executing file slice removal for %s files...", len(pfnSlice))
try:
# returns None if deletion was successful
for resp in ctx.unlink(pfnSlice):
if resp is None:
filesDeletedSuccess += 1
else:
filesDeletedFail += 1
errMessage = os.strerror(resp.code)
rse['counters']['gfalErrors'].setdefault(errMessage, 0)
rse['counters']['gfalErrors'][errMessage] += 1
except Exception as ex:
msg = "Error while cleaning RSE: %s. "
msg += "Will retry in the next cycle. Err: %s"
self.logger.exception(msg, rse['name'], str(ex))

self.logger.info("RSE: %s, Dir: %s, filesDeletedSuccess: %s, filesDeletedFail: %s",
rse['name'], dirLfn, filesDeletedSuccess, filesDeletedFail)

# now reverse engineer the deepest directory names and delete each one of them
setDirPfn = set()
for item in listFiles:
setDirPfn.add(os.path.dirname(item))

for subDir in setDirPfn:
rmdirSuccess = self._rmDir(ctx, subDir)
if rmdirSuccess:
self.logger.info("Sub-directory successfully removed: %s", subDir)
else:
self.logger.info("Sub-directory failed to be removed: %s", subDir)

# lastly, try to delete the original directory
if self._rmDir(ctx, dirPfn):
self.logger.info("Finally, directory successfully removed: %s", dirPfn)
rse['dirs']['deletedSuccess'].add(dirLfn)
continue

# Next, try to delete sub-directories then
self.logger.info("Then trying to list and remove all sub-directories for: %s", dirPfn)
rmdirSuccess = self._rmSubDir(ctx, dirPfn)
if rmdirSuccess:
self.logger.info("ALL sub-directories successfully removed. Now deleting the parent: %s", dirPfn)
if self._rmDir(ctx, dirPfn) is True:
rse['counters']['dirsDeletedSuccess'] += 1
rse['dirs']['deletedSuccess'].add(dirLfn)
continue

# IF we are here, that means we did not manage to delete some/all of the directories
msg = "Failed to delete the parent and/or its sub-directories. "
msg += "Going to retrieve a list of files and remove them in bulk operations."
self.logger.warning(msg)
listFiles = self._listDir(ctx, dirPfn)
# The following two bool flags are to track the success for directory removal
# during all consecutive attempts/steps of cleaning the current branch.
filesDeletedSuccess = 0
filesDeletedFail = 0
self.logger.info("Starting deletion of %s files:", len(listFiles))
for pfnSlice in list(grouper(listFiles, self.msConfig["filesToDeleteSliceSize"])):
self.logger.info("Executing file slice removal for %s files...", len(pfnSlice))
try:
# returns None if deletion was successful
for resp in ctx.unlink(pfnSlice):
if resp is None:
filesDeletedSuccess += len(pfnSlice)
else:
filesDeletedFail += len(pfnSlice)
errMessage = os.strerror(resp.code)
rse['counters']['gfalErrors'].setdefault(errMessage, 0)
rse['counters']['gfalErrors'][errMessage] += 1
except Exception as ex:
msg = "Error while cleaning RSE: %s. "
msg += "Will retry in the next cycle. Err: %s"
self.logger.exception(msg, rse['name'], str(ex))

self.logger.info("RSE: %s, Dir: %s, filesDeletedSuccess: %s, filesDeletedFail: %s",
rse['name'], dirLfn, filesDeletedSuccess, filesDeletedFail)

# Now reverse engineer the deepest directory names and delete each one of them, likely all empty dirs
setDirPfn = set()
for item in listFiles:
setDirPfn.add(os.path.dirname(item))
for subDir in setDirPfn:
rmdirSuccess = self._rmDir(ctx, subDir)
if rmdirSuccess:
self.logger.info("Empty sub-directory successfully removed: %s", subDir)
else:
self.logger.info("Directory still fails to be removed: %s", dirPfn)
rse['counters']['dirsDeletedFail'] += 1
rse['dirs']['deletedFail'].add(dirLfn)
self.logger.info("Empty sub-directory failed to be removed: %s", subDir)

# lastly, try to delete the original directory
if self._rmDir(ctx, dirPfn):
self.logger.info("Finally, directory successfully removed: %s", dirPfn)
rse['counters']['dirsDeletedSuccess'] += 1
rse['dirs']['deletedSuccess'].add(dirLfn)
else:
self.logger.info("Directory still fails to be removed: %s", dirPfn)
rse['counters']['dirsDeletedFail'] += 1
rse['dirs']['deletedFail'].add(dirLfn)

# Updating the RSE counters with the newly successfully deleted files
rse['counters']['filesDeletedSuccess'] += filesDeletedSuccess
Expand All @@ -418,6 +431,7 @@ def _rmDir(self, ctx, dirPfn):
"""
try:
# NOTE: For gfal2 rmdir() exit status of 0 is success
self.logger.info("Deleting directory: %s", dirPfn)
rmdirSuccess = ctx.rmdir(dirPfn) == 0
except gfal2.GError as gfalExc:
if gfalExc.code == errno.ENOENT:
Expand All @@ -428,6 +442,34 @@ def _rmDir(self, ctx, dirPfn):
rmdirSuccess = False
return rmdirSuccess

def _rmSubDir(self, ctx, dirPfn):
"""
Similar to the _rmDir method, but it first lists the content of a given
directory, identify directories with a cheap logic and tries to delete
each of the sub-directories.
:param ctx: Gfal Context Manager object.
:param dirPfn: The Pfn of the directory to be removed
:return: Bool: True if the removal was successful, False otherwise
"""
# list with status of the directory removal (True is success, False otherwise)
rm_status = []
try:
self.logger.info("Listing content for parent directory: %s", dirPfn)
for entry in ctx.listdir(dirPfn):
if not entry.endswith(".root"):
# then assume it is a directory
subDir = os.path.join(dirPfn, entry)
rm_status.append(self._rmDir(ctx, subDir))
except gfal2.GError as gfalExc:
self.logger.error("FAILED to list directory: %s: gfalException: %s, gfalErrorCode: %s",
dirPfn, str(gfalExc), gfalExc.code)
return False

self.logger.info("Sub-directories successful deletion for %s and failed for %d directories.",
rm_status.count(True), rm_status.count(False))
# return True (success) only if all sub-directories were deleted
return (rm_status.count(True) == len(rm_status))

def _listDir(self, ctx, dirPfn):
"""
Recursively lists all files in the given directory and its subdirectories.
Expand Down

0 comments on commit f01c556

Please sign in to comment.