diff --git a/src/python/WMCore/MicroService/MSOutput/MSOutput.py b/src/python/WMCore/MicroService/MSOutput/MSOutput.py index e989f32938..0b31215177 100644 --- a/src/python/WMCore/MicroService/MSOutput/MSOutput.py +++ b/src/python/WMCore/MicroService/MSOutput/MSOutput.py @@ -519,6 +519,7 @@ def msOutputConsumer(self): (msPipelineNonRelVal, self.msOutNonRelValColl)] for pipeColl in pipeCollections: wfCounters = 0 + wfCountersOk = 0 pipeLine = pipeColl[0] dbColl = pipeColl[1] pipeLineName = pipeLine.getPipelineName() @@ -526,8 +527,10 @@ def msOutputConsumer(self): # FIXME: # To redefine those exceptions as MSoutputExceptions and # start using those here so we do not mix with general errors + wfCounters += 1 try: pipeLine.run(docOut) + wfCountersOk += 1 except (KeyError, TypeError) as ex: msg = "%s Possibly malformed record in MongoDB. Err: %s. " % (pipeLineName, str(ex)) msg += "Continue to the next document." @@ -542,10 +545,12 @@ def msOutputConsumer(self): msg = "%s General error from pipeline. Err: %s. " % (pipeLineName, str(ex)) msg += "Will retry again in the next cycle." self.logger.exception(msg) - break - wfCounters += 1 - self.logger.info("Processed %d workflows from pipeline: %s", wfCounters, pipeLineName) - wfCounterTotal += wfCounters + workflowname = docOut.get("_id", "") + self.alertGenericError(self.mode, workflowname, msg, str(ex), str(docOut)) + continue + self.logger.info("Successfully processed %d workflows from pipeline: %s", wfCountersOk, pipeLineName) + self.logger.info("Failed to proccess %d workflows from pipeline: %s", wfCounters - wfCountersOk, pipeLineName) + wfCounterTotal += wfCountersOk return wfCounterTotal @@ -579,12 +584,12 @@ def msOutputProducer(self, requestRecords): Functor(self.docCleaner)]) # TODO: # To generate the object from within the Function scope see above. - counter = 0 + counterOk = 0 for request in requestRecords: - counter += 1 pipeLineName = msPipeline.getPipelineName() try: msPipeline.run(request) + counterOk += 1 except (KeyError, TypeError) as ex: msg = "%s Possibly broken read from ReqMgr2 API or other. Err: %s." % (pipeLineName, str(ex)) msg += " Continue to the next document." @@ -594,8 +599,10 @@ def msOutputProducer(self, requestRecords): msg = "%s General Error from pipeline. Err: %s. " % (pipeLineName, str(ex)) msg += "Giving up Now." self.logger.exception(str(ex)) - break - return counter + workflowname = request.get("_id", "") + self.alertGenericError(self.mode, workflowname, msg, str(ex), str(request)) + continue + return counterOk def docTransformer(self, doc): """ @@ -1023,3 +1030,24 @@ def alertDatatierNotFound(self, datatierName, containerName, isRelVal): self.logger.critical(alertDescription) if self.msConfig["sendNotification"] and not isRelVal: self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription, self.alertServiceName) + + def alertGenericError(self, caller, workflowname, msg, exMsg, document): + """ + Send an alert to Prometheus in the case of a generic error with ms-output + + :param caller: str, indicates if the error comes from Producer or Consumer + :param workflowname: str, representing the workflow name + :param msg: str, context about the error + :param exMsg: str, excetpion message + :param document: str, serialized mongodb document + :return: none + """ + alertName = "ms-output: Generic MSOutput error inside {} while processing workflow '{}'".format(caller, workflowname) + alertSeverity = "high" + alertSummary = "[MSOutput] Generic MSOutput error inside {} while processing workflow '{}'".format(caller, workflowname) + alertDescription = "wf: {}\n\nmsg: {}\n\nex: {}\n\n{}".format(workflowname, msg, exMsg, document) + self.logger.error("%s\n%s\n%s", alertName, alertSummary, alertDescription) + if self.msConfig["sendNotification"]: + self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription, self.alertServiceName) + +