Skip to content

Commit

Permalink
Looking for JobCreator bug
Browse files Browse the repository at this point in the history
  • Loading branch information
LinaresToine committed Apr 29, 2024
1 parent 8621dbb commit c3e370c
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 14 deletions.
8 changes: 6 additions & 2 deletions src/python/WMComponent/JobCreator/JobCreatorPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,15 @@ def pollSubscriptions(self):
"""
logging.info("Beginning JobCreator.pollSubscriptions() cycle.")
myThread = threading.currentThread()

logging.info('TEST: jobCreator.pollSubscriptions 1')
# First, get list of Subscriptions
subscriptions = self.subscriptionList.execute()

logging.info('TEST: jobCreator.pollSubscriptions 2')
# Okay, now we have a list of subscriptions
for subscriptionID in subscriptions:
logging.info('TEST: jobCreator.pollSubscriptions 3. subscriptionID is {}'.format(subscriptionID))
wmbsSubscription = Subscription(id=subscriptionID)
logging.info('TEST: jobCreator.pollSubscriptions 4. Subscription is {}'.format(wmbsSubscription))
try:
wmbsSubscription.load()
except IndexError:
Expand All @@ -431,8 +433,10 @@ def pollSubscriptions(self):
continue

workflow = Workflow(id=wmbsSubscription["workflow"].id)
logging.info('TEST: jobCreator.pollSubscriptions 5. workflow is {}'.format(workflow))
workflow.load()
wmbsSubscription['workflow'] = workflow
logging.info('TEST: jobCreator.pollSubscriptions 6. wmbsSubscription is {}'.format(wmbsSubscription))
wmWorkload = retrieveWMSpec(workflow=workflow)

if not workflow.task or not wmWorkload:
Expand Down
12 changes: 7 additions & 5 deletions src/python/WMComponent/RetryManager/Modifier/BaseModifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(self, config):
self.backupPath = "oldSandboxes/"
self.sandboxPath = None
self.config = config
self.dataDict = {}

def loadPKL(self, pklFile):
with open(pklFile, 'rb') as file:
Expand All @@ -33,11 +34,12 @@ def loadPKL(self, pklFile):
def savePKL(self, pklFile, data):
with open(pklFile, 'wb') as file:
pickle.dump(data, file)

def loadJobPKL(self, pklFile):
if self.data is None:
self.job = load


def getDataDict(self):
return self.dataDict

def updateDataDict(self, key, value):
self.dataDict[key] = value

def updateSandbox(self, jobPKL, workload): # Not using workload?
date = datetime.datetime.now().strftime("%y%m%d%H%M%S")
Expand Down
16 changes: 9 additions & 7 deletions src/python/WMComponent/RetryManager/Modifier/MemoryModifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@


class MemoryModifier(BaseModifier):
def __init__(self):
self.taskMemory = {}

def changeSandbox(self, jobPKL, newMemory):
"""
_changeSandbox_
Expand Down Expand Up @@ -93,16 +92,19 @@ def changeMemory(self, job, settings):

newMemory = self.getNewMemory(jobPKL, settings)
taskPath = self.getTaskPath(jobPKL)
taskMemory = self.getDataDict()
logging.info('CURRENT TASK is {}'.format(taskPath))
logging.info('1. DICTIONARY is {}'.format(taskMemory))

if not taskPath in self.taskMemory:
self.taskMemory[taskPath] = job['estimatedMemoryUsage']
if not taskPath in taskMemory:
self.updateDataDict(key=taskPath, value=job['estimatedMemoryUsage'])

self.changeJobPkl(pklFile, jobPKL, newMemory)

if self.taskMemory[taskPath] < newMemory:
logging.info('2. DICTIONARY is {}'.format(taskMemory))
if taskMemory[taskPath] < newMemory:
self.changeMemoryForTask(taskPath, jobPKL, newMemory)
self.taskMemory[taskPath] = newMemory

taskMemory[taskPath] = newMemory
logging.info('Old maxPSS: %d. New maxPSS: %d', job['estimatedMemoryUsage'], newMemory)

def modifyJob(self, job):
Expand Down

0 comments on commit c3e370c

Please sign in to comment.