diff --git a/src/python/WMCore/Storage/Backends/GFAL2Impl.py b/src/python/WMCore/Storage/Backends/GFAL2Impl.py index 4b00d16591..1835e49591 100644 --- a/src/python/WMCore/Storage/Backends/GFAL2Impl.py +++ b/src/python/WMCore/Storage/Backends/GFAL2Impl.py @@ -5,6 +5,7 @@ """ import argparse import os +import logging from WMCore.Storage.Registry import registerStageOutImpl from WMCore.Storage.StageOutImpl import StageOutImpl @@ -24,7 +25,24 @@ def __init__(self, stagein=False): # Next commands after separation are executed without env -i and this leads us with # mixed environment with COMP and system python. # GFAL2 is not build under COMP environment and it had failures with mixed environment. - self.setups = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'" + self.setups = "env -i JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'" # Default initialization, it is tweaked in createStageOutCommand depending on the authentication method + self.removeCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}') + self.copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure {checksum} {options} {source} {destination}' + self.copyCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy ' + self.copyOpts) + + def adjustSetup(self, auth_method=None): + """ + Adjust the `self.setups` based on the selected authentication method and regenerate commands. + """ + if auth_method == "X509": + self.setups = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'" + elif auth_method == "TOKEN": + self.setups = "env -i BEARER_TOKEN=$(cat $BEARER_TOKEN_FILE) JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'" + else: + logging.info("Warning! Running gfal without either a X509 certificate or a token!") + self.setups = "env -i JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'" + + # Regenerate dependent commands self.removeCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}') self.copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure {checksum} {options} {source} {destination}' self.copyCommand = self.setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy ' + self.copyOpts) @@ -113,7 +131,7 @@ def buildCopyCommandDict(self, sourcePFN, targetPFN, options=None, checksums=Non return copyCommandDict - def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None): + def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None, auth_method=None): """ Create gfal-cp command for stageOut @@ -121,8 +139,12 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No :targetPFN: str, destination PFN :options: str, additional options for gfal-cp :checksums: dict, collect checksums according to the algorithms saved as keys + :auth_method: str, the authentication method to be used ("X509", "TOKEN", or None) """ + # Adjust the setup + self.adjustSetup(auth_method) + # Construct the gfal-cp command copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums) copyCommand = self.copyCommand.format_map(copyCommandDict) result = "#!/bin/bash\n" + copyCommand @@ -137,11 +159,11 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No {remove_command} fi exit $EXIT_STATUS - """.format(remove_command=self.createRemoveFileCommand(targetPFN)) + """.format(remove_command=self.createRemoveFileCommand(targetPFN)) return result - def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None): + def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None, auth_method=None): """ Debug a failed gfal-cp command for stageOut, without re-running it, providing information on the environment and the certifications @@ -150,8 +172,13 @@ def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=N :targetPFN: str, destination PFN :options: str, additional options for gfal-cp :checksums: dict, collect checksums according to the algorithms saved as keys + :auth_method: str, the authentication method to be used ("X509", "TOKEN", or None) """ + # Adjust the setup + self.adjustSetup(auth_method) + + # Build the gfal-cp command for debugging purposes copyCommandDict = self.buildCopyCommandDict(sourcePFN, targetPFN, options, checksums) copyCommand = self.copyCommand.format_map(copyCommandDict) diff --git a/src/python/WMCore/Storage/StageOutImpl.py b/src/python/WMCore/Storage/StageOutImpl.py index 46573aec4b..4289bfa2b1 100644 --- a/src/python/WMCore/Storage/StageOutImpl.py +++ b/src/python/WMCore/Storage/StageOutImpl.py @@ -132,7 +132,7 @@ def createOutputDirectory(self, targetPFN): If no directory is required, do not implement this method """ - def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None): + def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=None, auth_method=None): """ _createStageOutCommand_ @@ -142,7 +142,7 @@ def createStageOutCommand(self, sourcePFN, targetPFN, options=None, checksums=No """ raise NotImplementedError("StageOutImpl.createStageOutCommand") - def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None): + def createDebuggingCommand(self, sourcePFN, targetPFN, options=None, checksums=None, auth_method=None): """ Build a shell command that will report in the logs the details about failing stageOut commands @@ -178,10 +178,9 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None): This operator does the actual stage out by invoking the overridden plugin methods of the derived object. - - """ - # // + + # // # // Generate the source PFN from the plain PFN if needed # // sourcePFN = self.createSourceName(protocol, inputPFN) @@ -189,7 +188,7 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None): # destination may also need PFN changed # i.e. if we are staging in a file from an SE targetPFN = self.createTargetName(protocol, targetPFN) - # // + # // # // Create the output directory if implemented # // for retryCount in range(self.numRetries + 1): @@ -203,30 +202,64 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None): msg += "Error details:\n{}\n".format(str(ex)) logging.error(msg) if retryCount == self.numRetries: - # // + # // # // last retry, propagate exception # // + logging.error("Maximum retries exhausted when trying to create the output directory") raise ex time.sleep(self.retryPause) # // # // Create the command to be used. # // - command = self.createStageOutCommand(sourcePFN, targetPFN, options, checksums) - # // + try: + command = self.createStageOutCommand(sourcePFN, targetPFN, options, checksums, auth_method="TOKEN") + except TypeError as ex: + logging.warning("Falling back to default createStageOutCommand due to: %s", str(ex)) + command = self.createStageOutCommand(sourcePFN, targetPFN, options, checksums) + # // # // Run the command # // stageOutEx = None # variable to store the possible StageOutError for retryCount in range(self.numRetries + 1): try: - logging.info("Running the stage out...") + logging.info("Running the stage out with tokens (attempt %d)...", retryCount + 1) self.executeCommand(command) + logging.info("Command to run: %s", command) + logging.info("Stage-out succeeded with the current environment.") break + except StageOutError as ex: - msg = "Attempt {} to stage out failed.\n".format(retryCount) + msg = "Attempt {} to stage out failed with default setup.\n".format(retryCount) msg += "Error details:\n{}\n".format(str(ex)) logging.error(msg) + + logging.info("Retrying with authentication-safe logic...") + + # Authentication-safe fallback logic + if os.getenv("X509_USER_PROXY"): + logging.info("Retrying with X509_USER_PROXY after unsetting BEARER_TOKEN...") + os.system("unset BEARER_TOKEN; unset BEARER_TOKEN_FILE") + command = self.createStageOutCommand(sourcePFN, targetPFN, options, checksums, auth_method="X509") + try: + self.executeCommand(command) + logging.info("Stage-out succeeded with X509 after unsetting BEARER_TOKEN.") + return + except StageOutError as fallbackEx: + logging.warning("Fallback with X509_USER_PROXY failed:\n%s", str(fallbackEx)) + + if os.getenv("BEARER_TOKEN") or os.getenv("BEARER_TOKEN_FILE"): + logging.info("Retrying with BEARER_TOKEN after unsetting X509_USER_PROXY...") + os.system("unset X509_USER_PROXY") + command = self.createStageOutCommand(sourcePFN, targetPFN, options, checksums, auth_method="TOKEN") + try: + self.executeCommand(command) + logging.info("Stage-out succeeded with TOKEN after unsetting X509_USER_PROXY.") + return + except StageOutError as fallbackEx: + logging.warning("Fallback with BEARER_TOKEN failed:\n%s", str(fallbackEx)) + if retryCount == self.numRetries: # Last retry, propagate the information outside of the for loop stageOutEx = ex @@ -236,6 +269,6 @@ def __call__(self, protocol, inputPFN, targetPFN, options=None, checksums=None): # This block will now always be executed after retries are exhausted if stageOutEx is not None: logging.error("Maximum number of retries exhausted. Further details on the failed command reported below.") - command = self.createDebuggingCommand(sourcePFN, targetPFN, options, checksums) + command = self.createDebuggingCommand(sourcePFN, targetPFN, options, checksums, auth_method="TOKEN") self.executeCommand(command) raise stageOutEx from None diff --git a/test/python/WMCore_t/Storage_t/Backends_t/GFAL2Impl_t.py b/test/python/WMCore_t/Storage_t/Backends_t/GFAL2Impl_t.py index d82a504416..58a5e9fa84 100644 --- a/test/python/WMCore_t/Storage_t/Backends_t/GFAL2Impl_t.py +++ b/test/python/WMCore_t/Storage_t/Backends_t/GFAL2Impl_t.py @@ -14,11 +14,12 @@ def setUp(self): def testInit(self): testGFAL2Impl = GFAL2Impl() - removeCommand = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c " \ - "'. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}'" - copyCommand = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '" \ - ". $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p " \ - "-v --abort-on-failure {checksum} {options} {source} {destination}'" + # The default setup without a token + removeCommand = "env -i JOBSTARTDIR=$JOBSTARTDIR bash -c " \ + "'. $JOBSTARTDIR/startup_environment.sh; date; gfal-rm -t 600 {}'" + copyCommand = "env -i JOBSTARTDIR=$JOBSTARTDIR bash -c '" \ + ". $JOBSTARTDIR/startup_environment.sh; date; gfal-copy -t 2400 -T 2400 -p " \ + "-v --abort-on-failure {checksum} {options} {source} {destination}'" self.assertEqual(removeCommand, testGFAL2Impl.removeCommand) self.assertEqual(copyCommand, testGFAL2Impl.copyCommand) @@ -79,10 +80,23 @@ def testCreateRemoveFileCommand_removeCommand(self, mock_path): def testCreateStageOutCommand_stageIn(self, mock_createRemoveFileCommand): self.GFAL2Impl.stageIn = True mock_createRemoveFileCommand.return_value = "targetPFN2" - result = self.GFAL2Impl.createStageOutCommand("sourcePFN", "targetPFN") + + # Call createStageOutCommand with auth_method='TOKEN' + result = self.GFAL2Impl.createStageOutCommand( + "sourcePFN", "targetPFN", auth_method='TOKEN' + ) + + # Generate the expected result with auth_method='TOKEN' expectedResult = self.getStageOutCommandResult( - self.getCopyCommandDict("-K adler32", "", "sourcePFN", "targetPFN"), "targetPFN2") + self.getCopyCommandDict("-K adler32", "", "sourcePFN", "targetPFN"), + "targetPFN2", + auth_method="TOKEN" + ) + + # Assert that the removeFileCommand was called correctly mock_createRemoveFileCommand.assert_called_with("targetPFN") + + # Compare the expected and actual result self.assertEqual(expectedResult, result) @mock.patch('WMCore.Storage.Backends.GFAL2Impl.GFAL2Impl.createRemoveFileCommand') @@ -94,20 +108,37 @@ def testCreateStageOutCommand_options(self, mock_createRemoveFileCommand): mock_createRemoveFileCommand.assert_called_with("file:targetPFN") self.assertEqual(expectedResult, result) - def getCopyCommandDict(self, checksum, options, source, destination): - copyCommandDict = {'checksum': '', 'options': '', 'source': '', 'destination': ''} - copyCommandDict['checksum'] = checksum - copyCommandDict['options'] = options - copyCommandDict['source'] = source - copyCommandDict['destination'] = destination + def getCopyCommandDict(self, checksum, options, source, destination, auth_method=None): + """ + Generate a dictionary for the gfal-copy command, dynamically adjusting for auth_method. + """ + copyCommandDict = { + 'checksum': checksum, + 'options': options, + 'source': source, + 'destination': destination + } return copyCommandDict - def getStageOutCommandResult(self, copyCommandDict, createRemoveFileCommandResult): + def getStageOutCommandResult(self, copyCommandDict, createRemoveFileCommandResult, auth_method=None): + """ + Generate the expected result for the gfal-copy command, including dynamic adjustments for auth_method. + """ + # Adjust the setup based on auth_method + if auth_method == "X509": + setups = "env -i X509_USER_PROXY=$X509_USER_PROXY JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'" + elif auth_method == "TOKEN": + setups = "env -i BEARER_TOKEN=$(cat $BEARER_TOKEN_FILE) JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'" + else: + setups = "env -i JOBSTARTDIR=$JOBSTARTDIR bash -c '{}'" + + # Build the copy command dynamically + copyOpts = '-t 2400 -T 2400 -p -v --abort-on-failure {checksum} {options} {source} {destination}' + copyCommand = setups.format('. $JOBSTARTDIR/startup_environment.sh; date; gfal-copy ' + copyOpts) + + # Construct the full result result = "#!/bin/bash\n" - - copyCommand = self.copyCommand.format_map(copyCommandDict) - result += copyCommand - + result += copyCommand.format_map(copyCommandDict) result += """ EXIT_STATUS=$? echo "gfal-copy exit status: $EXIT_STATUS" @@ -118,7 +149,7 @@ def getStageOutCommandResult(self, copyCommandDict, createRemoveFileCommandResul fi exit $EXIT_STATUS """.format(remove_command=createRemoveFileCommandResult) - + return result @mock.patch('WMCore.Storage.Backends.GFAL2Impl.os.path') diff --git a/test/python/WMCore_t/Storage_t/StageOutImpl_t.py b/test/python/WMCore_t/Storage_t/StageOutImpl_t.py index 05a161544e..17a0fe3965 100644 --- a/test/python/WMCore_t/Storage_t/StageOutImpl_t.py +++ b/test/python/WMCore_t/Storage_t/StageOutImpl_t.py @@ -91,7 +91,7 @@ def testCallable(self, mock_executeCommand, mock_createStageOutCommand, mock_cre mock_createSourceName.assert_called_with("protocol", "inputPFN") mock_createTargetName.assert_called_with("protocol", "targetPFN") mock_createOutputDirectory.assert_called_with("targetPFN") - mock_createStageOutCommand.assert_called_with("sourcePFN", "targetPFN", None, None) + mock_createStageOutCommand.assert_called_with("sourcePFN", "targetPFN", None, None, auth_method='TOKEN') mock_executeCommand.assert_called_with("command") @mock.patch('WMCore.Storage.StageOutImpl.StageOutImpl.createSourceName') @@ -111,7 +111,7 @@ def testCallable_StageOutError(self, mock_time, mock_executeCommand, mock_create mock_createSourceName.assert_called_with("protocol", "inputPFN") mock_createTargetName.assert_called_with("protocol", "targetPFN") mock_createOutputDirectory.assert_called_with("targetPFN") - mock_createStageOutCommand.assert_called_with("sourcePFN", "targetPFN", None, None) + mock_createStageOutCommand.assert_called_with("sourcePFN", "targetPFN", None, None, auth_method='TOKEN') mock_executeCommand.assert_called_with("command") calls = [call(600), call(600), call(600), call(600)] mock_time.sleep.assert_has_calls(calls)