Skip to content

Commit

Permalink
Merge branch 'develop' into kill_fix
Browse files Browse the repository at this point in the history
  • Loading branch information
mesmith75 authored Jun 20, 2024
2 parents 2d96d5f + d447bcb commit b3302a4
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 33 deletions.
12 changes: 0 additions & 12 deletions ganga/GangaDirac/Lib/Backends/DiracBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@


configDirac = getConfig('DIRAC')
default_finaliseOnMaster = configDirac['default_finaliseOnMaster']
default_downloadOutputSandbox = configDirac['default_downloadOutputSandbox']
default_unpackOutputSandbox = configDirac['default_unpackOutputSandbox']
logger = getLogger()
Expand Down Expand Up @@ -131,8 +130,6 @@ class DiracBase(IBackend):
'credential_requirements': ComponentItem('CredentialRequirement', defvalue=DiracProxy),
'blockSubmit': SimpleItem(defvalue=True,
doc='Shall we use the block submission?'),
'finaliseOnMaster': SimpleItem(defvalue=default_finaliseOnMaster,
doc='Finalise the subjobs all in one go when they are all finished.'),
'downloadSandbox': SimpleItem(defvalue=default_downloadOutputSandbox,
doc='Do you want to download the output sandbox when the job finalises.'),
'unpackOutputSandbox': SimpleItem(defvalue=default_unpackOutputSandbox, hidden=True,
Expand Down Expand Up @@ -1084,15 +1081,6 @@ async def _internal_job_finalisation(job, updated_dirac_status):
job (Job): Thi is the job we want to finalise
updated_dirac_status (str): String representing the Ganga finalisation state of the job failed/completed
"""
if job.backend.finaliseOnMaster and job.master and updated_dirac_status == 'completed':
job.updateStatus('completing')
allComplete = True
for sj in job.master.subjobs:
if sj.status not in ['completing', 'failed', 'killed', 'removed', 'completed']:
allComplete = False
break
if allComplete:
DiracBase.finalise_jobs(job.master.subjobs, job.master.backend.downloadSandbox)

if updated_dirac_status == 'completed':
try:
Expand Down
55 changes: 34 additions & 21 deletions ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
#\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\#
import os
import datetime
# \/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\#
from GangaCore.Core.exceptions import GangaException
from GangaCore.GPIDev.Schema import Schema, Version, SimpleItem, ComponentItem
from GangaCore.GPIDev.Base import GangaObject
from GangaCore.GPIDev.Base.Proxy import isType, stripProxy, addProxy
from GangaCore.GPIDev.Base.Proxy import isType, addProxy
from GangaCore.GPIDev.Credentials import require_credential
from GangaDirac.Lib.Credentials.DiracProxy import DiracProxy
from GangaDirac.Lib.Backends.DiracUtils import get_result
from GangaDirac.Lib.Utilities.DiracUtilities import GangaDiracError
from GangaDirac.Lib.Files.DiracFile import DiracFile
from GangaCore.Utility.logging import getLogger
from GangaLHCb.Lib.LHCbDataset import LHCbDataset, LHCbCompressedDataset
from GangaLHCb.Lib.Backends.Dirac import filterLFNsBySE
logger = getLogger()
#\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\#
# \/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\#


class BKQuery(GangaObject):
Expand Down Expand Up @@ -100,6 +97,8 @@ class BKQuery(GangaObject):
doc='Return the data set, even if all the LFNs are archived')
schema['SMOG2'] = SimpleItem(defvalue='', typelist=['str', 'list'],
doc='Specify the state of SMOG2')
schema['retry_limit'] = SimpleItem(defvalue=1, typelist=['int'],
doc='Number of times to retry the DIRAC commands')
_schema = Schema(Version(1, 2), schema)
_category = 'query'
_name = "BKQuery"
Expand Down Expand Up @@ -127,15 +126,18 @@ def getDatasetMetadata(self):
msg = 'selection not supported for type="%s".' % self.type
raise GangaException(msg)
cmd = "getDataset('%s','%s','%s','%s','%s','%s', '%s')" % (self.path, self.dqflag,
self.type, self.startDate, self.endDate, self.selection, self.SMOG2)
self.type, self.startDate, self.endDate,
self.selection, self.SMOG2)
from GangaCore.GPIDev.Lib.GangaList.GangaList import GangaList
knownLists = [tuple, list, GangaList]
if isType(self.dqflag, knownLists):
cmd = "getDataset('%s',%s,'%s','%s','%s','%s', '%s')" % (self.path, self.dqflag,
self.type, self.startDate, self.endDate, self.selection, self.SMOG2)
self.type, self.startDate, self.endDate,
self.selection, self.SMOG2)

try:
value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements)
value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements,
retry_limit=self.retry_limit)
except GangaDiracError as err:
return {'OK': False, 'Value': str(err)}

Expand Down Expand Up @@ -170,13 +172,18 @@ def getDataset(self, compressed=True, SE=None):
msg = 'selection not supported for type="%s".' % self.type
raise GangaException(msg)
cmd = "getDataset('%s','%s','%s','%s','%s','%s', %s)" % (self.path, self.dqflag,
self.type, self.startDate, self.endDate, self.selection, self.SMOG2)
self.type, self.startDate, self.endDate,
self.selection, self.SMOG2)
from GangaCore.GPIDev.Lib.GangaList.GangaList import GangaList
knownLists = [tuple, list, GangaList]
if isType(self.dqflag, knownLists):
cmd = "getDataset('%s',%s,'%s','%s','%s','%s', %s)" % (self.path, self.dqflag, self.type, self.startDate,
self.endDate, self.selection, self.SMOG2)
result = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements)
self.endDate, self.selection, self.SMOG2)
result = get_result(
cmd,
'BK query error.',
credential_requirements=self.credential_requirements,
retry_limit=self.retry_limit)
logger.debug("Finished Running Command")
files = []
value = result
Expand Down Expand Up @@ -205,7 +212,8 @@ def getDataset(self, compressed=True, SE=None):
if isMC and self.check_archived:
logger.debug('Detected an MC data set. Checking if it has been archived')
all_reps = get_result("getReplicas(%s)" % files, 'Get replica error.',
credential_requirements=self.credential_requirements)
credential_requirements=self.credential_requirements,
retry_limit=self.retry_limit)
if 'Successful' in all_reps:
all_ses = set([])
for _lfn, _repz in all_reps['Successful'].items():
Expand All @@ -214,16 +222,19 @@ def getDataset(self, compressed=True, SE=None):
all_archived = True
for _se in all_ses:
is_archived = get_result("isSEArchive('%s')" % _se, 'Check archive error.',
credential_requirements=self.credential_requirements)
credential_requirements=self.credential_requirements,
retry_limit=self.retry_limit)
if not is_archived:
all_archived = False
break
if all_archived and not self.ignore_archived:
raise GangaDiracError(
"All the files are only available on archive SEs. It is likely the data set has been archived. Contact data management to request that it be staged")
"All the files are only available on archive SEs. It is likely the data set has been archived. "
"Contact data management to request that it be staged")
elif all_archived:
logger.warning(
"All the files are only available on archive SEs. It is likely the data set has been archived. Contact data management to request that it be staged")
"All the files are only available on archive SEs. It is likely the data set has been archived. "
"Contact data management to request that it be staged")

if compressed:
ds = LHCbCompressedDataset(files)
Expand All @@ -241,7 +252,7 @@ def getDataset(self, compressed=True, SE=None):

return addProxy(ds)

#\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\#
# \/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\#


class BKQueryDict(GangaObject):
Expand Down Expand Up @@ -294,8 +305,9 @@ def getDatasetMetadata(self):
return None
cmd = 'bkQueryDict(%s)' % self.dict
try:
value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements)
except GangaDiracError as err:
value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements,
retry_limit=self.retry_limit)
except GangaDiracError:
return {'OK': False, 'Value': {}}

files = []
Expand All @@ -316,7 +328,8 @@ def getDataset(self):
if not self.dict:
return None
cmd = 'bkQueryDict(%s)' % self.dict
value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements)
value = get_result(cmd, 'BK query error.', credential_requirements=self.credential_requirements,
retry_limit=self.retry_limit)

files = []
if 'LFNs' in value:
Expand All @@ -331,4 +344,4 @@ def getDataset(self):

return addProxy(ds)

#\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\#
# \/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\#

0 comments on commit b3302a4

Please sign in to comment.