Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of the token-safe retry logic for gfal #12191

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
35 changes: 31 additions & 4 deletions src/python/WMCore/Storage/Backends/GFAL2Impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
import argparse
import os
import logging

from WMCore.Storage.Registry import registerStageOutImpl
from WMCore.Storage.StageOutImpl import StageOutImpl
Expand All @@ -24,7 +25,24 @@ def __init__(self, stagein=False):
# Next commands after separation are executed without env -i and this leads us with
# mixed environment with COMP and system python.
# GFAL2 is not build under COMP environment and it had failures with mixed environment.
self.setups = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'"
self.setups = "env -i JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'" # Default initialization, it is tweaked in createStageOutCommand depending on the authentication method
self.removeCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}')
self.copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure {checksum} {options} {source} {destination}'
self.copyCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy ' + self.copyOpts)

def adjustSetup(self, auth_method=None):
    """
    Select the environment wrapper used for gfal commands and rebuild the
    command templates that are derived from it.

    Every wrapper has the form "env -i <VARS> bash -c '{}'"; only the
    variables listed in the wrapper string are handed to the bash command.

    :auth_method: str, the authentication method to be used:
        "X509"  - forward X509_USER_PROXY to the command
        "TOKEN" - set BEARER_TOKEN from the content of the file named by
                  $BEARER_TOKEN_FILE (the `cat` runs when the shell command
                  is executed, not when this method is called)
        any other value (including None) - forward no credential at all

    Side effects: overwrites self.setups, self.removeCommand, self.copyOpts
    and self.copyCommand.
    """
    if auth_method == "X509":
        authEnv = "X509_USER_PROXY=$X509_USER_PROXY "
    elif auth_method == "TOKEN":
        # NOTE(review): if BEARER_TOKEN_FILE is unset in the calling shell,
        # `cat` receives no file argument - confirm callers guarantee it is set.
        authEnv = "BEARER_TOKEN=$(cat $BEARER_TOKEN_FILE) "
    else:
        # Emitted at WARNING level so that it is not filtered out together
        # with routine INFO messages.
        logging.warning("Warning! Running gfal without either a X509 certificate or a token!")
        authEnv = ""

    self.setups = "env -i " + authEnv + "JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'"

    # Regenerate dependent commands, they embed the wrapper chosen above
    startup = '. $JOBSTARTDIR/startup_environment.sh; date; '
    self.removeCommand = self.setups.format(startup + 'gfal-rm -t 600 {}')
    self.copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure {checksum} {options} {source} {destination}'
    self.copyCommand = self.setups.format(startup + 'gfal-copy ' + self.copyOpts)
Expand Down Expand Up @@ -113,16 +131,20 @@ def buildCopyCommandDict(self, sourcePFN, targetPFN, options=None, checksums=Non

return copyCommandDict

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None, auth_method=None):
"""
Create gfal-cp command for stageOut

:sourcePFN: str, PFN of the source file
:targetPFN: str, destination PFN
:options: str, additional options for gfal-cp
:checksums: dict, collect checksums according to the algorithms saved as keys
:auth_method: str, the authentication method to be used ("X509", "TOKEN", or None)
"""
# Adjust the setup
self.adjustSetup(auth_method)

# Construct the gfal-cp command
copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums)
copyCommand = self.copyCommand.format_map(copyCommandDict)
result = "#!/bin/bash\n" + copyCommand
Expand All @@ -137,11 +159,11 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No
{remove_command}
fi
exit $EXIT_STATUS
""".format(remove_command=self.createRemoveFileCommand(targetPFN))
""".format(remove_command=self.createRemoveFileCommand(targetPFN))

return result

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None, auth_method=None):
"""
Debug a failed gfal-cp command for stageOut, without re-running it,
providing information on the environment and the certifications
Expand All @@ -150,8 +172,13 @@ def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=N
:targetPFN: str, destination PFN
:options: str, additional options for gfal-cp
:checksums: dict, collect checksums according to the algorithms saved as keys
:auth_method: str, the authentication method to be used ("X509", "TOKEN", or None)
"""

# Adjust the setup
self.adjustSetup(auth_method)

# Build the gfal-cp command for debugging purposes
copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums)
copyCommand = self.copyCommand.format_map(copyCommandDict)

Expand Down
57 changes: 45 additions & 12 deletions src/python/WMCore/Storage/StageOutImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def createOutputDirectory(self, targetPFN):
If no directory is required, do not implement this method
"""

def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None, auth_method=None):
"""
_createStageOutCommand_

Expand All @@ -142,7 +142,7 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No
"""
raise NotImplementedError("StageOutImpl.createStageOutCommand")

def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None):
def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None, auth_method=None):
"""
Build a shell command that will report in the logs the details about
failing stageOut commands
Expand Down Expand Up @@ -178,18 +178,17 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None):

This operator does the actual stage out by invoking the overridden
plugin methods of the derived object.


"""
# //

# //
# // Generate the source PFN from the plain PFN if needed
# //
sourcePFN = self.createSourceName(protocol, inputPFN)

# destination may also need PFN changed
# i.e. if we are staging in a file from an SE
targetPFN = self.createTargetName(protocol, targetPFN)
# //
# //
# // Create the output directory if implemented
# //
for retryCount in range(self.numRetries + 1):
Expand All @@ -203,30 +202,64 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None):
msg += "Error details:\n{}\n".format(str(ex))
logging.error(msg)
if retryCount == self.numRetries:
# //
# //
# // last retry, propagate exception
# //
logging.error("Maximum retries exhausted when trying to create the output directory")
raise ex
time.sleep(self.retryPause)

# //
# // Create the command to be used.
# //
command = self.createStageOutCommand(sourcePFN, targetPFN, options, checksums)
# //
try:
command = self.createStageOutCommand(sourcePFN, targetPFN, options, checksums, auth_method="TOKEN")
except TypeError as ex:
logging.warning("Falling back to default createStageOutCommand due to: %s", str(ex))
command = self.createStageOutCommand(sourcePFN, targetPFN, options, checksums)
# //
# // Run the command
# //

stageOutEx = None # variable to store the possible StageOutError
for retryCount in range(self.numRetries + 1):
try:
logging.info("Running the stage out...")
logging.info("Running the stage out with tokens (attempt %d)...", retryCount + 1)
self.executeCommand(command)
logging.info("Command to run: %s", command)
logging.info("Stage-out succeeded with the current environment.")
break

except StageOutError as ex:
msg = "Attempt {} to stage out failed.\n".format(retryCount)
msg = "Attempt {} to stage out failed with default setup.\n".format(retryCount)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amaltaro @stlammel By default, do we want to set the BEARER_TOKEN env var to force trying with token authentication?

msg += "Error details:\n{}\n".format(str(ex))
logging.error(msg)

logging.info("Retrying with authentication-safe logic...")

# Authentication-safe fallback logic
if os.getenv("X509_USER_PROXY"):
logging.info("Retrying with X509_USER_PROXY after unsetting BEARER_TOKEN...")
os.system("unset BEARER_TOKEN; unset BEARER_TOKEN_FILE")
command = self.createStageOutCommand(sourcePFN, targetPFN, options, checksums, auth_method="X509")
try:
self.executeCommand(command)
logging.info("Stage-out succeeded with X509 after unsetting BEARER_TOKEN.")
return
except StageOutError as fallbackEx:
logging.warning("Fallback with X509_USER_PROXY failed:\n%s", str(fallbackEx))

if os.getenv("BEARER_TOKEN") or os.getenv("BEARER_TOKEN_FILE"):
logging.info("Retrying with BEARER_TOKEN after unsetting X509_USER_PROXY...")
os.system("unset X509_USER_PROXY")
command = self.createStageOutCommand(sourcePFN, targetPFN, options, checksums, auth_method="TOKEN")
try:
self.executeCommand(command)
logging.info("Stage-out succeeded with TOKEN after unsetting X509_USER_PROXY.")
return
except StageOutError as fallbackEx:
logging.warning("Fallback with BEARER_TOKEN failed:\n%s", str(fallbackEx))

if retryCount == self.numRetries:
# Last retry, propagate the information outside of the for loop
stageOutEx = ex
Expand All @@ -236,6 +269,6 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None):
# This block will now always be executed after retries are exhausted
if stageOutEx is not None:
logging.error("Maximum number of retries exhausted. Further details on the failed command reported below.")
command = self.createDebuggingCommand(sourcePFN, targetPFN, options, checksums)
command = self.createDebuggingCommand(sourcePFN, targetPFN, options, checksums, auth_method="TOKEN")
self.executeCommand(command)
raise stageOutEx from None
69 changes: 50 additions & 19 deletions test/python/WMCore_t/Storage_t/Backends_t/GFAL2Impl_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ def setUp(self):

def testInit(self):
testGFAL2Impl = GFAL2Impl()
removeCommand = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c " \
"'. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}'"
copyCommand = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '" \
". $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p " \
"-v --abort-on-failure {checksum} {options} {source} {destination}'"
# The default setup without a token
removeCommand = "env -i JOBSTARTDIR=$JOBSTARTDIR bash -c " \
"'. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}'"
copyCommand = "env -i JOBSTARTDIR=$JOBSTARTDIR bash -c '" \
". $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p " \
"-v --abort-on-failure {checksum} {options} {source} {destination}'"
self.assertEqual(removeCommand, testGFAL2Impl.removeCommand)
self.assertEqual(copyCommand, testGFAL2Impl.copyCommand)

Expand Down Expand Up @@ -79,10 +80,23 @@ def testCreateRemoveFileCommand_removeCommand(self, mock_path):
def testCreateStageOutCommand_stageIn(self, mock_createRemoveFileCommand):
self.GFAL2Impl.stageIn = True
mock_createRemoveFileCommand.return_value = "targetPFN2"
result = self.GFAL2Impl.createStageOutCommand("sourcePFN", "targetPFN")

# Call createStageOutCommand with auth_method='TOKEN'
result = self.GFAL2Impl.createStageOutCommand(
"sourcePFN", "targetPFN", auth_method='TOKEN'
)

# Generate the expected result with auth_method='TOKEN'
expectedResult = self.getStageOutCommandResult(
self.getCopyCommandDict("-K adler32", "", "sourcePFN", "targetPFN"), "targetPFN2")
self.getCopyCommandDict("-K adler32", "", "sourcePFN", "targetPFN"),
"targetPFN2",
auth_method="TOKEN"
)

# Assert that the removeFileCommand was called correctly
mock_createRemoveFileCommand.assert_called_with("targetPFN")

# Compare the expected and actual result
self.assertEqual(expectedResult, result)

@mock.patch('WMCore.Storage.Backends.GFAL2Impl.GFAL2Impl.createRemoveFileCommand')
Expand All @@ -94,20 +108,37 @@ def testCreateStageOutCommand_options(self, mock_createRemoveFileCommand):
mock_createRemoveFileCommand.assert_called_with("file:targetPFN")
self.assertEqual(expectedResult, result)

def getCopyCommandDict(self, checksum, options, source, destination):
copyCommandDict = {'checksum': '', 'options': '', 'source': '', 'destination': ''}
copyCommandDict['checksum'] = checksum
copyCommandDict['options'] = options
copyCommandDict['source'] = source
copyCommandDict['destination'] = destination
def getCopyCommandDict(self, checksum, options, source, destination, auth_method=None):
    """
    Assemble the placeholder values used to fill in the gfal-copy command template.

    :checksum: str, checksum option string
    :options: str, additional options string
    :source: str, source PFN
    :destination: str, destination PFN
    :auth_method: accepted by the signature, but not used when building the dictionary
    :returns: dict with the keys 'checksum', 'options', 'source' and 'destination'
    """
    fieldNames = ('checksum', 'options', 'source', 'destination')
    fieldValues = (checksum, options, source, destination)
    return dict(zip(fieldNames, fieldValues))

def getStageOutCommandResult(self, copyCommandDict, createRemoveFileCommandResult):
def getStageOutCommandResult(self, copyCommandDict, createRemoveFileCommandResult, auth_method=None):
"""
Generate the expected result for the gfal-copy command, including dynamic adjustments for auth_method.
"""
# Adjust the setup based on auth_method
if auth_method == "X509":
setups = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'"
elif auth_method == "TOKEN":
setups = "env -i BEARER_TOKEN=$(cat $BEARER_TOKEN_FILE) JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'"
else:
setups = "env -i JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'"

# Build the copy command dynamically
copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure {checksum} {options} {source} {destination}'
copyCommand = setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy ' + copyOpts)

# Construct the full result
result = "#!/bin/bash\n"

copyCommand = self.copyCommand.format_map(copyCommandDict)
result += copyCommand

result += copyCommand.format_map(copyCommandDict)
result += """
EXIT_STATUS=$?
echo "gfal-copy exit status: $EXIT_STATUS"
Expand All @@ -118,7 +149,7 @@ def getStageOutCommandResult(self, copyCommandDict, createRemoveFileCommandResul
fi
exit $EXIT_STATUS
""".format(remove_command=createRemoveFileCommandResult)

return result

@mock.patch('WMCore.Storage.Backends.GFAL2Impl.os.path')
Expand Down
4 changes: 2 additions & 2 deletions test/python/WMCore_t/Storage_t/StageOutImpl_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def testCallable(self, mock_executeCommand, mock_createStageOutCommand, mock_cre
mock_createSourceName.assert_called_with("protocol", "inputPFN")
mock_createTargetName.assert_called_with("protocol", "targetPFN")
mock_createOutputDirectory.assert_called_with("targetPFN")
mock_createStageOutCommand.assert_called_with("sourcePFN", "targetPFN", None, None)
mock_createStageOutCommand.assert_called_with("sourcePFN", "targetPFN", None, None, auth_method='TOKEN')
mock_executeCommand.assert_called_with("command")

@mock.patch('WMCore.Storage.StageOutImpl.StageOutImpl.createSourceName')
Expand All @@ -111,7 +111,7 @@ def testCallable_StageOutError(self, mock_time, mock_executeCommand, mock_create
mock_createSourceName.assert_called_with("protocol", "inputPFN")
mock_createTargetName.assert_called_with("protocol", "targetPFN")
mock_createOutputDirectory.assert_called_with("targetPFN")
mock_createStageOutCommand.assert_called_with("sourcePFN", "targetPFN", None, None)
mock_createStageOutCommand.assert_called_with("sourcePFN", "targetPFN", None, None, auth_method='TOKEN')
mock_executeCommand.assert_called_with("command")
calls = [call(600), call(600), call(600), call(600)]
mock_time.sleep.assert_has_calls(calls)
Expand Down