From 25b61b70fcda1a6235e80c7f7007bdd24e8e7f93 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Wed, 19 Jun 2024 19:10:14 +0200 Subject: [PATCH 1/4] clean up finaliseOnMaster --- ganga/GangaDirac/Lib/Backends/DiracBase.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/ganga/GangaDirac/Lib/Backends/DiracBase.py b/ganga/GangaDirac/Lib/Backends/DiracBase.py index 71daa242a9..f291cd87b3 100644 --- a/ganga/GangaDirac/Lib/Backends/DiracBase.py +++ b/ganga/GangaDirac/Lib/Backends/DiracBase.py @@ -41,7 +41,6 @@ configDirac = getConfig('DIRAC') -default_finaliseOnMaster = configDirac['default_finaliseOnMaster'] default_downloadOutputSandbox = configDirac['default_downloadOutputSandbox'] default_unpackOutputSandbox = configDirac['default_unpackOutputSandbox'] logger = getLogger() @@ -131,8 +130,6 @@ class DiracBase(IBackend): 'credential_requirements': ComponentItem('CredentialRequirement', defvalue=DiracProxy), 'blockSubmit': SimpleItem(defvalue=True, doc='Shall we use the block submission?'), - 'finaliseOnMaster': SimpleItem(defvalue=default_finaliseOnMaster, - doc='Finalise the subjobs all in one go when they are all finished.'), 'downloadSandbox': SimpleItem(defvalue=default_downloadOutputSandbox, doc='Do you want to download the output sandbox when the job finalises.'), 'unpackOutputSandbox': SimpleItem(defvalue=default_unpackOutputSandbox, hidden=True, @@ -1084,15 +1081,6 @@ async def _internal_job_finalisation(job, updated_dirac_status): job (Job): Thi is the job we want to finalise updated_dirac_status (str): String representing the Ganga finalisation state of the job failed/completed """ - if job.backend.finaliseOnMaster and job.master and updated_dirac_status == 'completed': - job.updateStatus('completing') - allComplete = True - for sj in job.master.subjobs: - if sj.status not in ['completing', 'failed', 'killed', 'removed', 'completed']: - allComplete = False - break - if allComplete: - DiracBase.finalise_jobs(job.master.subjobs, job.master.backend.downloadSandbox) if updated_dirac_status == 'completed': try: From b743e25df17773b730b99e06b583613f2fd6c294 Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Wed, 19 Jun 2024 19:20:54 +0200 Subject: [PATCH 2/4] add optional bkquery retry_limit --- ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py b/ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py index 6312ce76bf..4efa99afd6 100644 --- a/ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py +++ b/ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py @@ -100,6 +100,8 @@ class BKQuery(GangaObject): doc='Return the data set, even if all the LFNs are archived') schema['SMOG2'] = SimpleItem(defvalue='', typelist=['str', 'list'], doc='Specify the state of SMOG2') + schema['retry_limit'] = SimpleItem(defvalue=1, typelist=['int'], + doc='Number of times to retry the DIRAC commands') _schema = Schema(Version(1, 2), schema) _category = 'query' _name = "BKQuery" @@ -135,7 +137,8 @@ def getDatasetMetadata(self): self.type, self.startDate, self.endDate, self.selection, self.SMOG2) try: - value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements) + value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements, + retry_limit = self.retry_limit) except GangaDiracError as err: return {'OK': False, 'Value': str(err)} @@ -176,7 +179,7 @@ def getDataset(self, compressed=True, SE=None): if isType(self.dqflag, knownLists): cmd = "getDataset('%s',%s,'%s','%s','%s','%s', %s)" % (self.path, self.dqflag, self.type, self.startDate, self.endDate, self.selection, self.SMOG2) - result = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements) + result = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements, retry_limit = self.retry_limit) logger.debug("Finished Running Command") files = [] value = result @@ -205,7 +208,8 @@ def getDataset(self, compressed=True, SE=None): if isMC and self.check_archived: logger.debug('Detected an MC data set. Checking if it has been archived') all_reps = get_result("getReplicas(%s)" % files, 'Get replica error.', - credential_requirements=self.credential_requirements) + credential_requirements=self.credential_requirements, + retry_limit = self.retry_limit) if 'Successful' in all_reps: all_ses = set([]) for _lfn, _repz in all_reps['Successful'].items(): @@ -214,7 +218,8 @@ def getDataset(self, compressed=True, SE=None): all_archived = True for _se in all_ses: is_archived = get_result("isSEArchive('%s')" % _se, 'Check archive error.', - credential_requirements=self.credential_requirements) + credential_requirements=self.credential_requirements, + retry_limit = self.retry_limit) if not is_archived: all_archived = False break @@ -294,7 +299,8 @@ def getDatasetMetadata(self): return None cmd = 'bkQueryDict(%s)' % self.dict try: - value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements) + value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements, + retry_limit = self.retry_limit) except GangaDiracError as err: return {'OK': False, 'Value': {}} @@ -316,7 +322,8 @@ def getDataset(self): if not self.dict: return None cmd = 'bkQueryDict(%s)' % self.dict - value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements) + value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements, + retry_limit = self.retry_limit) files = [] if 'LFNs' in value: From 8161d9f942b0fb45bfdadb25aed369003db167e0 Mon Sep 17 00:00:00 2001 From: mesmith75 Date: Wed, 19 Jun 2024 17:22:07 +0000 Subject: [PATCH 3/4] autopep8 action fixes --- ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py | 34 ++++++++++++---------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py b/ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py index 4efa99afd6..844ad58abf 100644 --- a/ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py +++ b/ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py @@ -1,4 +1,4 @@ -#\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\# +# \/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\# import os import datetime from GangaCore.Core.exceptions import GangaException @@ -14,7 +14,7 @@ from GangaLHCb.Lib.LHCbDataset import LHCbDataset, LHCbCompressedDataset from GangaLHCb.Lib.Backends.Dirac import filterLFNsBySE logger = getLogger() -#\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\# +# \/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\# class BKQuery(GangaObject): @@ -101,7 +101,7 @@ class BKQuery(GangaObject): schema['SMOG2'] = SimpleItem(defvalue='', typelist=['str', 'list'], doc='Specify the state of SMOG2') schema['retry_limit'] = SimpleItem(defvalue=1, typelist=['int'], - doc='Number of times to retry the DIRAC commands') + doc='Number of times to retry the DIRAC commands') _schema = Schema(Version(1, 2), schema) _category = 'query' _name = "BKQuery" @@ -129,16 +129,16 @@ def getDatasetMetadata(self): msg = 'selection not supported for type="%s".' % self.type raise GangaException(msg) cmd = "getDataset('%s','%s','%s','%s','%s','%s', '%s')" % (self.path, self.dqflag, - self.type, self.startDate, self.endDate, self.selection, self.SMOG2) + self.type, self.startDate, self.endDate, self.selection, self.SMOG2) from GangaCore.GPIDev.Lib.GangaList.GangaList import GangaList knownLists = [tuple, list, GangaList] if isType(self.dqflag, knownLists): cmd = "getDataset('%s',%s,'%s','%s','%s','%s', '%s')" % (self.path, self.dqflag, - self.type, self.startDate, self.endDate, self.selection, self.SMOG2) + self.type, self.startDate, self.endDate, self.selection, self.SMOG2) try: value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements, - retry_limit = self.retry_limit) + retry_limit=self.retry_limit) except GangaDiracError as err: return {'OK': False, 'Value': str(err)} @@ -173,13 +173,17 @@ def getDataset(self, compressed=True, SE=None): msg = 'selection not supported for type="%s".' % self.type raise GangaException(msg) cmd = "getDataset('%s','%s','%s','%s','%s','%s', %s)" % (self.path, self.dqflag, - self.type, self.startDate, self.endDate, self.selection, self.SMOG2) + self.type, self.startDate, self.endDate, self.selection, self.SMOG2) from GangaCore.GPIDev.Lib.GangaList.GangaList import GangaList knownLists = [tuple, list, GangaList] if isType(self.dqflag, knownLists): cmd = "getDataset('%s',%s,'%s','%s','%s','%s', %s)" % (self.path, self.dqflag, self.type, self.startDate, - self.endDate, self.selection, self.SMOG2) - result = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements, retry_limit = self.retry_limit) + self.endDate, self.selection, self.SMOG2) + result = get_result( + cmd, + 'BK query error.', + credential_requirements=self.credential_requirements, + retry_limit=self.retry_limit) logger.debug("Finished Running Command") files = [] value = result @@ -209,7 +213,7 @@ def getDataset(self, compressed=True, SE=None): logger.debug('Detected an MC data set. Checking if it has been archived') all_reps = get_result("getReplicas(%s)" % files, 'Get replica error.', credential_requirements=self.credential_requirements, - retry_limit = self.retry_limit) + retry_limit=self.retry_limit) if 'Successful' in all_reps: all_ses = set([]) for _lfn, _repz in all_reps['Successful'].items(): @@ -219,7 +223,7 @@ def getDataset(self, compressed=True, SE=None): for _se in all_ses: is_archived = get_result("isSEArchive('%s')" % _se, 'Check archive error.', credential_requirements=self.credential_requirements, - retry_limit = self.retry_limit) + retry_limit=self.retry_limit) if not is_archived: all_archived = False break @@ -246,7 +250,7 @@ def getDataset(self, compressed=True, SE=None): return addProxy(ds) -#\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\# +# \/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\# class BKQueryDict(GangaObject): @@ -300,7 +304,7 @@ def getDatasetMetadata(self): cmd = 'bkQueryDict(%s)' % self.dict try: value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements, - retry_limit = self.retry_limit) + retry_limit=self.retry_limit) except GangaDiracError as err: return {'OK': False, 'Value': {}} @@ -323,7 +327,7 @@ def getDataset(self): return None cmd = 'bkQueryDict(%s)' % self.dict value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements, - retry_limit = self.retry_limit) + retry_limit=self.retry_limit) files = [] if 'LFNs' in value: @@ -338,4 +342,4 @@ def getDataset(self): return addProxy(ds) -#\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\# +# \/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\# From bcd3b4588c7f159467303c8bba9a3b14d394ee7a Mon Sep 17 00:00:00 2001 From: Mark Smith Date: Thu, 20 Jun 2024 10:16:29 +0100 Subject: [PATCH 4/4] Update BKQuery.py --- ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py b/ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py index 844ad58abf..db1f58f540 100644 --- a/ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py +++ b/ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py @@ -1,12 +1,9 @@ # \/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\# -import os -import datetime from GangaCore.Core.exceptions import GangaException from GangaCore.GPIDev.Schema import Schema, Version, SimpleItem, ComponentItem from GangaCore.GPIDev.Base import GangaObject -from GangaCore.GPIDev.Base.Proxy import isType, stripProxy, addProxy +from GangaCore.GPIDev.Base.Proxy import isType, addProxy from GangaCore.GPIDev.Credentials import require_credential -from GangaDirac.Lib.Credentials.DiracProxy import DiracProxy from GangaDirac.Lib.Backends.DiracUtils import get_result from GangaDirac.Lib.Utilities.DiracUtilities import GangaDiracError from GangaDirac.Lib.Files.DiracFile import DiracFile @@ -129,12 +126,14 @@ def getDatasetMetadata(self): msg = 'selection not supported for type="%s".' % self.type raise GangaException(msg) cmd = "getDataset('%s','%s','%s','%s','%s','%s', '%s')" % (self.path, self.dqflag, - self.type, self.startDate, self.endDate, self.selection, self.SMOG2) + self.type, self.startDate, self.endDate, + self.selection, self.SMOG2) from GangaCore.GPIDev.Lib.GangaList.GangaList import GangaList knownLists = [tuple, list, GangaList] if isType(self.dqflag, knownLists): cmd = "getDataset('%s',%s,'%s','%s','%s','%s', '%s')" % (self.path, self.dqflag, - self.type, self.startDate, self.endDate, self.selection, self.SMOG2) + self.type, self.startDate, self.endDate, + self.selection, self.SMOG2) try: value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements, @@ -173,7 +172,8 @@ def getDataset(self, compressed=True, SE=None): msg = 'selection not supported for type="%s".' % self.type raise GangaException(msg) cmd = "getDataset('%s','%s','%s','%s','%s','%s', %s)" % (self.path, self.dqflag, - self.type, self.startDate, self.endDate, self.selection, self.SMOG2) + self.type, self.startDate, self.endDate, + self.selection, self.SMOG2) from GangaCore.GPIDev.Lib.GangaList.GangaList import GangaList knownLists = [tuple, list, GangaList] if isType(self.dqflag, knownLists): @@ -229,10 +229,12 @@ def getDataset(self, compressed=True, SE=None): break if all_archived and not self.ignore_archived: raise GangaDiracError( - "All the files are only available on archive SEs. It is likely the data set has been archived. Contact data management to request that it be staged") + "All the files are only available on archive SEs. It is likely the data set has been archived. " + "Contact data management to request that it be staged") elif all_archived: logger.warning( - "All the files are only available on archive SEs. It is likely the data set has been archived. Contact data management to request that it be staged") + "All the files are only available on archive SEs. It is likely the data set has been archived. " + "Contact data management to request that it be staged") if compressed: ds = LHCbCompressedDataset(files) @@ -305,7 +307,7 @@ def getDatasetMetadata(self): try: value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements, retry_limit=self.retry_limit) - except GangaDiracError as err: + except GangaDiracError: return {'OK': False, 'Value': {}} files = []