ms-output - do not break out producer and consumer loops #12194

Open - wants to merge 1 commit into master
44 changes: 36 additions & 8 deletions src/python/WMCore/MicroService/MSOutput/MSOutput.py
@@ -519,15 +519,18 @@ def msOutputConsumer(self):
(msPipelineNonRelVal, self.msOutNonRelValColl)]
for pipeColl in pipeCollections:
wfCounters = 0
wfCountersOk = 0
pipeLine = pipeColl[0]
dbColl = pipeColl[1]
pipeLineName = pipeLine.getPipelineName()
for docOut in self.getDocsFromMongo(mQueryDict, dbColl, self.msConfig['limitRequestsPerCycle']):
# FIXME:
# To redefine those exceptions as MSoutputExceptions and
# start using those here so we do not mix with general errors
wfCounters += 1
try:
pipeLine.run(docOut)
wfCountersOk += 1
except (KeyError, TypeError) as ex:
msg = "%s Possibly malformed record in MongoDB. Err: %s. " % (pipeLineName, str(ex))
msg += "Continue to the next document."
@@ -542,10 +545,12 @@ def msOutputConsumer(self):
msg = "%s General error from pipeline. Err: %s. " % (pipeLineName, str(ex))
msg += "Will retry again in the next cycle."
self.logger.exception(msg)
break
wfCounters += 1
self.logger.info("Processed %d workflows from pipeline: %s", wfCounters, pipeLineName)
wfCounterTotal += wfCounters
workflowname = docOut.get("_id", "")
self.alertGenericError(self.mode, workflowname, msg, str(ex), str(docOut))
continue
self.logger.info("Successfully processed %d workflows from pipeline: %s", wfCountersOk, pipeLineName)
self.logger.info("Failed to process %d workflows from pipeline: %s", wfCounters - wfCountersOk, pipeLineName)
wfCounterTotal += wfCountersOk

return wfCounterTotal

@@ -579,12 +584,12 @@ def msOutputProducer(self, requestRecords):
Functor(self.docCleaner)])
# TODO:
# To generate the object from within the Function scope see above.
counter = 0
counterOk = 0
for request in requestRecords:
counter += 1
pipeLineName = msPipeline.getPipelineName()
try:
msPipeline.run(request)
counterOk += 1
except (KeyError, TypeError) as ex:
msg = "%s Possibly broken read from ReqMgr2 API or other. Err: %s." % (pipeLineName, str(ex))
msg += " Continue to the next document."
@@ -594,8 +599,10 @@ def msOutputProducer(self, requestRecords):
msg = "%s General Error from pipeline. Err: %s. " % (pipeLineName, str(ex))
msg += "Giving up Now."
self.logger.exception(str(ex))
break
return counter
workflowname = request.get("_id", "")
self.alertGenericError(self.mode, workflowname, msg, str(ex), str(request))
continue
return counterOk

def docTransformer(self, doc):
"""
@@ -1023,3 +1030,24 @@ def alertDatatierNotFound(self, datatierName, containerName, isRelVal):
self.logger.critical(alertDescription)
if self.msConfig["sendNotification"] and not isRelVal:
self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription, self.alertServiceName)

def alertGenericError(self, caller, workflowname, msg, exMsg, document):
"""
Send an alert to Prometheus in the case of a generic error with ms-output

:param caller: str, indicates if the error comes from Producer or Consumer
:param workflowname: str, representing the workflow name
:param msg: str, context about the error
:param exMsg: str, exception message
:param document: str, serialized mongodb document
:return: none
"""
alertName = "ms-output: Generic MSOutput error inside {} while processing workflow '{}'".format(caller, workflowname)
alertSeverity = "high"
alertSummary = "[MSOutput] Generic MSOutput error inside {} while processing workflow '{}'".format(caller, workflowname)
alertDescription = "wf: {}\n\nmsg: {}\n\nex: {}\n\n{}".format(workflowname, msg, exMsg, document)
self.logger.error("%s\n%s\n%s", alertName, alertSummary, alertDescription)
if self.msConfig["sendNotification"]:
self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription, self.alertServiceName)
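
To summarize the behavioural change in the diff above in one place, here is a condensed, self-contained sketch (an illustration only, not the actual WMCore code): on a generic pipeline error the loop now alerts and continues with the next document instead of breaking out, and successes are counted separately from attempts.

def run_pipeline(pipeline, documents, alert):
    """Process every document; on failure, alert and move on instead of breaking out."""
    total = 0
    ok = 0
    for doc in documents:
        total += 1
        try:
            pipeline(doc)             # stands in for pipeLine.run(docOut)
            ok += 1
        except Exception as exc:      # before this change, a generic error meant `break`
            alert(doc.get("_id", ""), str(exc), str(doc))
            continue                  # keep going with the remaining documents
    print("Successfully processed %d workflows" % ok)
    print("Failed to process %d workflows" % (total - ok))
    return ok

def flaky_pipeline(doc):
    """Toy pipeline that fails for exactly one document."""
    if doc["_id"] == "wf2":
        raise RuntimeError("boom")

if __name__ == "__main__":
    docs = [{"_id": "wf1"}, {"_id": "wf2"}, {"_id": "wf3"}]
    run_pipeline(flaky_pipeline, docs,
                 lambda wf, err, doc: print("ALERT for %s: %s" % (wf, err)))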
Contributor:

Should we explicitly set an expiration time for this alert? Without it, what is the default value?

Member Author:

The configuration is here https://gitlab.cern.ch/cmsmonitoring/cmsmon-configs/-/blob/master/alertmanager/alertmanager.yaml
and we use this function:

def sendAlert(self, alertName, severity, summary, description, service, tag="wmcore", endSecs=600, generatorURL=""):

so:

  • we send an alert with the tag wmcore
  • the route uses the tag to match the alert and redirect it to "dmwm admins" here
  • the "dmwm admins" receiver is configured to send an email to our egroup here
  • after endSecs = 600 (i.e. 10 min) the alert is silenced (this overrides the global resolve timeout of 5 min); see the sketch after this list
  • if the same alert is sent before repeat_interval: 2h, then alertmanager will not send the same notification again
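
For context on what endSecs does, a rough, hedged sketch of the mechanism follows. It is an assumption based on the standard Alertmanager v2 API, not the actual WMCore sendAlert implementation, and the function and parameter names are invented for illustration: the alert is pushed with an endsAt timestamp in the future, so after endSecs seconds Alertmanager treats it as resolved.

import json
from datetime import datetime, timedelta, timezone
from urllib.request import Request, urlopen

def push_alert(am_url, name, severity, summary, description, service,
               tag="wmcore", end_secs=600):
    """Post one alert to Alertmanager with an explicit expiration (endsAt)."""
    now = datetime.now(timezone.utc)
    payload = [{
        "labels": {"alertname": name, "severity": severity,
                   "tag": tag, "service": service},
        "annotations": {"summary": summary, "description": description},
        "startsAt": now.isoformat(),
        "endsAt": (now + timedelta(seconds=end_secs)).isoformat(),
    }]
    req = Request(am_url + "/api/v2/alerts",
                  data=json.dumps(payload).encode("utf-8"),
                  headers={"Content-Type": "application/json"})
    with urlopen(req) as resp:   # Alertmanager replies with HTTP 200 on success
        return resp.status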

Contributor:

Thank you, Dario. Where did you get the repeat_interval configuration from?

I am undecided whether we should make this re-raising interval larger or not, out of concern about spam.
Would 12h be a better option?

Member Author:

it's always the same file, here

we can change it to 12h if:

  • we create a new receiver for alertmanager (for example dmwm-admins-12h), and we override the default values (for example setting 12h for the repeat interval)
  • in msoutput.py, we override the value of the tag, say wmcore12h (a hypothetical sketch follows after this list)
  • we create a new route for alertmanager that matches the tag wmcore12h and sends the alert to dmwm-admins-12h
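
A hypothetical sketch of the second bullet (the tag value wmcore12h comes from this discussion and does not exist anywhere yet): the only change needed on the MSOutput side would be to pass a non-default tag when sending the alert, so a dedicated Alertmanager route can match it and apply the longer repeat_interval.

# hypothetical call-site change inside alertGenericError, reusing the existing
# sendAlert signature shown above; "wmcore12h" is only a proposed tag name
if self.msConfig["sendNotification"]:
    self.sendAlert(alertName, alertSeverity, alertSummary, alertDescription,
                   self.alertServiceName, tag="wmcore12h")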

at this point it would be beneficial to have a broader discussion on our alerts, because we could take this opportunity to improve the situation across the board.

Contributor:

Thank you for these details.
Given that we would have to either change the default repeat_interval value or fork it for dmwm, I would suggest leaving this for one of the monitoring-related issues that we are discussing and considering for Q1. I fully agree that a discussion on that is important, so I suggest keeping it out of these developments.