Skip to content

Commit

Permalink
ms-output - do not break out producer and consumer loops in case of g…
Browse files Browse the repository at this point in the history
…eneric error
  • Loading branch information
mapellidario committed Dec 17, 2024
1 parent a69a30d commit 9a702b2
Showing 1 changed file with 36 additions and 8 deletions.
44 changes: 36 additions & 8 deletions src/python/WMCore/MicroService/MSOutput/MSOutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,15 +519,18 @@ def msOutputConsumer(self):
(msPipelineNonRelVal, self.msOutNonRelValColl)]
for pipeColl in pipeCollections:
wfCounters = 0
wfCountersOk = 0
pipeLine = pipeColl[0]
dbColl = pipeColl[1]
pipeLineName = pipeLine.getPipelineName()
for docOut in self.getDocsFromMongo(mQueryDict, dbColl, self.msConfig['limitRequestsPerCycle']):
# FIXME:
# To redefine those exceptions as MSoutputExceptions and
# start using those here so we do not mix with general errors
wfCounters += 1
try:
pipeLine.run(docOut)
wfCountersOk += 1
except (KeyError, TypeError) as ex:
msg = "%s Possibly malformed record in MongoDB. Err: %s. " % (pipeLineName, str(ex))
msg += "Continue to the next document."
Expand All @@ -542,10 +545,12 @@ def msOutputConsumer(self):
msg = "%s General error from pipeline. Err: %s. " % (pipeLineName, str(ex))
msg += "Will retry again in the next cycle."
self.logger.exception(msg)
break
wfCounters += 1
self.logger.info("Processed %d workflows from pipeline: %s", wfCounters, pipeLineName)
wfCounterTotal += wfCounters
workflowname = docOut.get("_id", "")
self.alertGenericError(self.mode, workflowname, msg, str(ex), str(docOut))
continue
self.logger.info("Successfully processed %d workflows from pipeline: %s", wfCountersOk, pipeLineName)
self.logger.info("Failed to proccess %d workflows from pipeline: %s", wfCounters - wfCountersOk, pipeLineName)
wfCounterTotal += wfCountersOk

return wfCounterTotal

Expand Down Expand Up @@ -579,12 +584,12 @@ def msOutputProducer(self, requestRecords):
Functor(self.docCleaner)])
# TODO:
# To generate the object from within the Function scope see above.
counter = 0
counterOk = 0
for request in requestRecords:
counter += 1
pipeLineName = msPipeline.getPipelineName()
try:
msPipeline.run(request)
counterOk += 1
except (KeyError, TypeError) as ex:
msg = "%s Possibly broken read from ReqMgr2 API or other. Err: %s." % (pipeLineName, str(ex))
msg += " Continue to the next document."
Expand All @@ -594,8 +599,10 @@ def msOutputProducer(self, requestRecords):
msg = "%s General Error from pipeline. Err: %s. " % (pipeLineName, str(ex))
msg += "Giving up Now."
self.logger.exception(str(ex))
break
return counter
workflowname = request.get("_id", "")
self.alertGenericError(self.mode, workflowname, msg, str(ex), str(request))
continue
return counterOk

def docTransformer(self, doc):
"""
Expand Down Expand Up @@ -1023,3 +1030,24 @@ def alertDatatierNotFound(self, datatierName, containerName, isRelVal):
self.logger.critical(alertDescription)
if self.msConfig["sendNotification"] and not isRelVal:
self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription, self.alertServiceName)

def alertGenericError(self, caller, workflowname, msg, exMsg, document):
"""
Send an alert to Prometheus in the case of a generic error with ms-output
:param caller: str, indicates if the error comes from Producer or Consumer
:param workflowname: str, representing the workflow name
:param msg: str, context about the error
:param exMsg: str, excetpion message
:param document: str, serialized mongodb document
:return: none
"""
alertName = "ms-output: Generic MSOutput error inside {} while processing workflow '{}'".format(caller, workflowname)
alertSeverity = "high"
alertSummary = "[MSOutput] Generic MSOutput error inside {} while processing workflow '{}'".format(caller, workflowname)
alertDescription = "wf: {}\n\nmsg: {}\n\nex: {}\n\n{}".format(workflowname, msg, exMsg, document)
self.logger.error("%s\n%s\n%s", alertName, alertSummary, alertDescription)
if self.msConfig["sendNotification"]:
self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription, self.alertServiceName)


0 comments on commit 9a702b2

Please sign in to comment.