From 7a3f526542b56a0bca3323bc7c207535ded7fdbf Mon Sep 17 00:00:00 2001 From: LinaresToine Date: Sun, 28 Apr 2024 20:09:54 -0500 Subject: [PATCH] Looking for JobCreator bug --- .../WMComponent/JobCreator/JobCreatorPoller.py | 8 ++++++-- .../RetryManager/Modifier/BaseModifier.py | 12 +++++++----- .../RetryManager/Modifier/MemoryModifier.py | 16 +++++++++------- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/python/WMComponent/JobCreator/JobCreatorPoller.py b/src/python/WMComponent/JobCreator/JobCreatorPoller.py index ade229396b..1332169873 100644 --- a/src/python/WMComponent/JobCreator/JobCreatorPoller.py +++ b/src/python/WMComponent/JobCreator/JobCreatorPoller.py @@ -412,13 +412,15 @@ def pollSubscriptions(self): """ logging.info("Beginning JobCreator.pollSubscriptions() cycle.") myThread = threading.currentThread() - + logging.info('TEST: jobCreator.pollSubscriptions 1') # First, get list of Subscriptions subscriptions = self.subscriptionList.execute() - + logging.info('TEST: jobCreator.pollSubscriptions 2') # Okay, now we have a list of subscriptions for subscriptionID in subscriptions: + logging.info('TEST: jobCreator.pollSubscriptions 3. subscriptionID is {}'.format(subscriptionID)) wmbsSubscription = Subscription(id=subscriptionID) + logging.info('TEST: jobCreator.pollSubscriptions 4. Subscription is {}'.format(wmbsSubscription)) try: wmbsSubscription.load() except IndexError: @@ -431,8 +433,10 @@ def pollSubscriptions(self): continue workflow = Workflow(id=wmbsSubscription["workflow"].id) + logging.info('TEST: jobCreator.pollSubscriptions 5. workflow is {}'.format(workflow)) workflow.load() wmbsSubscription['workflow'] = workflow + logging.info('TEST: jobCreator.pollSubscriptions 6. wmbsSubscription is {}'.format(wmbsSubscription)) wmWorkload = retrieveWMSpec(workflow=workflow) if not workflow.task or not wmWorkload: diff --git a/src/python/WMComponent/RetryManager/Modifier/BaseModifier.py b/src/python/WMComponent/RetryManager/Modifier/BaseModifier.py index 172f8585ce..2a8554e1f1 100644 --- a/src/python/WMComponent/RetryManager/Modifier/BaseModifier.py +++ b/src/python/WMComponent/RetryManager/Modifier/BaseModifier.py @@ -24,6 +24,7 @@ def __init__(self, config): self.backupPath = "oldSandboxes/" self.sandboxPath = None self.config = config + self.dataDict = {} def loadPKL(self, pklFile): with open(pklFile, 'rb') as file: @@ -33,11 +34,12 @@ def loadPKL(self, pklFile): def savePKL(self, pklFile, data): with open(pklFile, 'wb') as file: pickle.dump(data, file) - - def loadJobPKL(self, pklFile): - if self.data is None: - self.job = load - + + def getDataDict(self): + return self.dataDict + + def updateDataDict(self, key, value): + self.dataDict[key] = value def updateSandbox(self, jobPKL, workload): # Not using workload? date = datetime.datetime.now().strftime("%y%m%d%H%M%S") diff --git a/src/python/WMComponent/RetryManager/Modifier/MemoryModifier.py b/src/python/WMComponent/RetryManager/Modifier/MemoryModifier.py index 429643a272..3151939acb 100644 --- a/src/python/WMComponent/RetryManager/Modifier/MemoryModifier.py +++ b/src/python/WMComponent/RetryManager/Modifier/MemoryModifier.py @@ -21,8 +21,7 @@ class MemoryModifier(BaseModifier): - def __init__(self): - self.taskMemory = {} + def changeSandbox(self, jobPKL, newMemory): """ _changeSandbox_ @@ -93,16 +92,19 @@ def changeMemory(self, job, settings): newMemory = self.getNewMemory(jobPKL, settings) taskPath = self.getTaskPath(jobPKL) + taskMemory = self.getDataDict() + logging.info('CURRENT TASK is {}'.format(taskPath)) + logging.info('1. DICTIONARY is {}'.format(taskMemory)) - if not taskPath in self.taskMemory: - self.taskMemory[taskPath] = job['estimatedMemoryUsage'] + if not taskPath in taskMemory: + self.updateDataDict(key=taskPath, value=job['estimatedMemoryUsage']) self.changeJobPkl(pklFile, jobPKL, newMemory) - if self.taskMemory[taskPath] < newMemory: + logging.info('2. DICTIONARY is {}'.format(taskMemory)) + if taskMemory[taskPath] < newMemory: self.changeMemoryForTask(taskPath, jobPKL, newMemory) - self.taskMemory[taskPath] = newMemory - + taskMemory[taskPath] = newMemory logging.info('Old maxPSS: %d. New maxPSS: %d', job['estimatedMemoryUsage'], newMemory) def modifyJob(self, job):