add handler metrics to bus and saga #101
Merge the 1.0.2 release into master
merge v1.x into master
merge v1.x into master
…on), add registration for saga handlers
gbus/metrics/handler_metrics.go
Outdated
return &HandlerMetrics{
	result: promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: GrabbitNamespace,
This will add the grabbit prefix to the metric name.
gbus/metrics/handler_metrics.go
Outdated
}

func RunHandlerWithMetric(handleMessage func() error, handlerName string, logger logrus.FieldLogger) error {
	handlerMetrics, ok := handlerMetricsByHandlerName[handlerName]
Should access to the map be synchronized?
Why do I need to synchronize reads?
A read can conflict with a concurrent write, and a plain Go map is not safe for that. Maybe change the type to sync.Map instead of the regular map and put it behind you.
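The sync.Map suggestion above can be sketched roughly as follows. This is a minimal illustration using only the standard library, not the code that was merged: `handlerCounters`, `countersFor`, `runWithCounters`, `successes`, and `failures` are hypothetical names, and plain atomic integers stand in for the Prometheus collectors used in the PR.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// handlerCounters is a hypothetical stand-in for the PR's HandlerMetrics type.
type handlerCounters struct {
	success int64
	failure int64
}

// registry maps handler name -> *handlerCounters.
// sync.Map is safe for concurrent reads and writes; a plain map is not.
var registry sync.Map

// errHandlerFailed is a sample error used by the demo in main.
var errHandlerFailed = errors.New("handler failed")

// countersFor returns the counters for a handler, creating them on first use.
// LoadOrStore guarantees that all goroutines end up sharing the same value.
func countersFor(handlerName string) *handlerCounters {
	actual, _ := registry.LoadOrStore(handlerName, &handlerCounters{})
	return actual.(*handlerCounters)
}

// runWithCounters invokes handle and records whether it succeeded or failed.
func runWithCounters(handlerName string, handle func() error) error {
	c := countersFor(handlerName)
	err := handle()
	if err != nil {
		atomic.AddInt64(&c.failure, 1)
	} else {
		atomic.AddInt64(&c.success, 1)
	}
	return err
}

// successes and failures read the current counter values for a handler.
func successes(handlerName string) int64 {
	return atomic.LoadInt64(&countersFor(handlerName).success)
}

func failures(handlerName string) int64 {
	return atomic.LoadInt64(&countersFor(handlerName).failure)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = runWithCounters("OrderHandler", func() error {
				if i%10 == 0 {
					return errHandlerFailed
				}
				return nil
			})
		}(i)
	}
	wg.Wait()
	fmt.Println("success:", successes("OrderHandler"), "failure:", failures("OrderHandler"))
}
```

A `sync.RWMutex` around a regular map would work as well; `sync.Map` tends to fit when keys are written once and read many times, which is the likely access pattern for a per-handler registry.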
gbus/abstractions.go
Outdated
@@ -109,6 +112,12 @@ type HandlerRegister interface {
//MessageHandler signature for all command handlers
type MessageHandler func(invocation Invocation, message *BusMessage) error

func (mg MessageHandler) Name() string {
It doesn't feel right that an implementation is defined in the abstractions.go file.
Maybe extract both the MessageHandler type and the Name() method to a message_handler.go file?
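For context, one way a `Name()` method on a function type can be written is sketched below. The diff above does not show the method body, so this is an assumption rather than the PR's implementation: it resolves the name through the standard `reflect` and `runtime` packages, and `Invocation`, `BusMessage`, and `handleOrder` are empty placeholders standing in for the real gbus types and a real handler.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
)

// Invocation and BusMessage are empty placeholders for the gbus types.
type Invocation interface{}
type BusMessage struct{}

// MessageHandler mirrors the signature shown in the diff.
type MessageHandler func(invocation Invocation, message *BusMessage) error

// Name returns the fully qualified function name reported by the runtime,
// or an empty string if the handler is nil or cannot be resolved.
func (mg MessageHandler) Name() string {
	fn := runtime.FuncForPC(reflect.ValueOf(mg).Pointer())
	if fn == nil {
		return ""
	}
	return fn.Name()
}

// handleOrder is a hypothetical handler used only for this demo.
func handleOrder(invocation Invocation, message *BusMessage) error {
	return nil
}

func main() {
	// Prints the package-qualified name of handleOrder.
	fmt.Println(MessageHandler(handleOrder).Name())
}
```

Keeping the type and its method together in one file, as the comment proposes, leaves abstractions.go with declarations only.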
gbus/builder/builder.go
Outdated
@@ -107,6 +107,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
		panic(err)
	}
}
This file contains no changes (other than a blank line); it should be reverted and not included in the PR.
gbus/metrics/handler_metrics.go
Outdated
	return err
}

func ReportHandlerExceededMaxRetries(handlerName string, logger logrus.FieldLogger) {
I do not think this metric should be reported on handlers but rather on rejected messages.
In the current setup, and with the above code, if a message is rejected it may result in multiple metrics being reported for the same message, which seems wrong.
I think this should live in a message_metrics.go file and be named ReportRejectedMessage.
@danielwitz, @vladshub WDYT?
Regardless, the metric for failed message handling might be reported multiple times for the same message; I didn't find a way around that.
In the current setup, if the handler exceeds the retry count, the failure metric is increased by that count and the exceeded-retries metric is increased only once.
Rejected message might be a better name, though.
The handler can indeed be invoked more than once; that is why the rejected message metric should be reported only once, after all retries fail.
gbus/worker.go
Outdated
@@ -321,6 +320,9 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) {
	if err == nil {
		_ = worker.ack(delivery)
	} else {
		for _, handler := range handlers {
			metrics.ReportHandlerExceededMaxRetries(handler.Name(), worker.log())
I think this metric should be replaced with a ReportMessageRejected metric that is called only once, when the message is rejected, and not per handler.
And the report should include the message name rather than the handler name?
I think maybe we should have both, because it sits on the line between the messages domain and the handlers domain.
@danielwitz what benefit do we gain from reporting this metric at the handler level? Won't it just create information overload that won't be used?
@rhinof I thought about it some more and came to the conclusion that a metric about rejected messages is exactly what we wanted.
I'll add that metric instead of ExceededMaxRetries.
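The outcome of this thread, failures counted per handler attempt and rejections counted once per message, can be illustrated with a small standard-library sketch. Everything here is hypothetical (the `counters` struct, `processMessage`, the `maxAttempts` parameter, and the sample handler); it does not reproduce grabbit's worker or its Prometheus metrics, only the counting rule discussed above.

```go
package main

import (
	"errors"
	"fmt"
)

// counters holds two hypothetical metrics. Plain maps are fine here because
// this sketch runs on a single goroutine.
type counters struct {
	handlerFailures  map[string]int // keyed by handler name, one per failed attempt
	rejectedMessages map[string]int // keyed by message name, one per rejected message
}

func newCounters() *counters {
	return &counters{
		handlerFailures:  map[string]int{},
		rejectedMessages: map[string]int{},
	}
}

// handler pairs a name with the function to invoke.
type handler struct {
	name   string
	handle func() error
}

// alwaysFail is a sample handler function that never succeeds.
func alwaysFail() error { return errors.New("boom") }

// runHandlers invokes the handlers in order and stops at the first failure,
// recording that failure against the handler that produced it.
func runHandlers(c *counters, handlers []handler) error {
	for _, h := range handlers {
		if err := h.handle(); err != nil {
			c.handlerFailures[h.name]++
			return err
		}
	}
	return nil
}

// processMessage retries the handlers up to maxAttempts times. The rejected
// counter is incremented once, only after every attempt has failed.
func processMessage(c *counters, messageName string, handlers []handler, maxAttempts int) bool {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err := runHandlers(c, handlers); err == nil {
			return true // the message would be acked here
		}
	}
	c.rejectedMessages[messageName]++
	return false // the message would be rejected here
}

func main() {
	c := newCounters()
	handlers := []handler{{name: "OrderHandler", handle: alwaysFail}}
	ok := processMessage(c, "OrderCreated", handlers, 3)
	fmt.Println(ok, c.handlerFailures["OrderHandler"], c.rejectedMessages["OrderCreated"])
}
```

With three attempts and a handler that always fails, the handler failure count reaches three while the rejected message count stays at one, which is the behavior the reviewers converge on.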
* add handler metrics to bus and saga (#101) * add handler metrics to bus and saga + tests * fix build * add 0 to the default buckets to catch fast message handling * PR correction - changed latency to summary(removed bucket configuration), add registration for saga handlers * PR correction - getting logger as a param * PR correction - new line in eof * PR corrections message handler + sync.map + latency as summary * add rejected messages metric * dead letter handler should reject messages on failures and rollbacks and ack on commit success (#105) * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * return an error from the saga store when deleting a saga if saga can not (#110) be found In order to deal with concurrent deletes of the sage saga instance we would wan't to indicate that deleting the saga failed if the saga is not stored so callers can take proper action * Persisted timeouts (#107) * decouple transaction manager from glue * moved timeout manager to gbus/tx package * initial commit in order to support persisted timeouts * first working version of a mysql persisted timeout manager * fixing ci lint errors * refactored ensure schema of timeout manager * cleanup timeout manager when bs shuts down * fixing formatting issues * changed logging level from Info to Debug when inserting a new timeout * resusing timeouts tablename (PR review) * renamed AcceptTimeoutFunction to SetTimeoutFunction on the TimeoutManager interface (PR review) * refactored glue to implement the Logged inetrface and use the GLogged helper struct * locking timeout record before executing timeout In order to prevent having a timeout beeing executed twice due to two concurrent grabbit instances running the same service a lock (FOR UPDATE) has been placed on the 
timeout record in the scope of the executing transaction * Commiting the select transaction when querying for pending timeouts * feat(timeout:lock): This is done in order to reduce contention and allow for parallel processing in case of multiple grabbit instances * Enable returning a message back from the dead to the queue (#112) * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * return to q * return to q * return to q * return to q * return dead to q * allow no retries * test - resend dead to queue * test - resend dead to queue * test - resend dead to queue * test - resend dead to queue - fixes after cr * test - resend dead to queue - fixes after cr * test - resend dead to queue - fixes after cr * added metric report on saga timeout (#114) 1) added reporting saga timeouts to the glue component 2) fixed mysql timeoutmanager error when trying to clear a timeout * Added documentation for grabbit metrics (#117) * added initial documentation for grabbit metrics * including metrics section in readme.md * fixing goreportcard issues (#118) * removed logging a warning when worker message channel returns an error (#116) * corrected saga metrics name and added to metrics documentation (#119) * corrected saga metrics name and added documentatio * corrected saga metric name * corrected typos * removed non transactional bus mode (#120)
* add handler metrics to bus and saga (#101) * add handler metrics to bus and saga + tests * fix build * add 0 to the default buckets to catch fast message handling * PR correction - changed latency to summary(removed bucket configuration), add registration for saga handlers * PR correction - getting logger as a param * PR correction - new line in eof * PR corrections message handler + sync.map + latency as summary * add rejected messages metric * dead letter handler should reject messages on failures and rollbacks and ack on commit success (#105) * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * return an error from the saga store when deleting a saga if saga can not (#110) be found In order to deal with concurrent deletes of the sage saga instance we would wan't to indicate that deleting the saga failed if the saga is not stored so callers can take proper action * Persisted timeouts (#107) * decouple transaction manager from glue * moved timeout manager to gbus/tx package * initial commit in order to support persisted timeouts * first working version of a mysql persisted timeout manager * fixing ci lint errors * refactored ensure schema of timeout manager * cleanup timeout manager when bs shuts down * fixing formatting issues * changed logging level from Info to Debug when inserting a new timeout * resusing timeouts tablename (PR review) * renamed AcceptTimeoutFunction to SetTimeoutFunction on the TimeoutManager interface (PR review) * refactored glue to implement the Logged inetrface and use the GLogged helper struct * locking timeout record before executing timeout In order to prevent having a timeout beeing executed twice due to two concurrent grabbit instances running the same service a lock (FOR UPDATE) has been placed on the 
timeout record in the scope of the executing transaction * Commiting the select transaction when querying for pending timeouts * feat(timeout:lock): This is done in order to reduce contention and allow for parallel processing in case of multiple grabbit instances * Enable returning a message back from the dead to the queue (#112) * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * return to q * return to q * return to q * return to q * return dead to q * allow no retries * test - resend dead to queue * test - resend dead to queue * test - resend dead to queue * test - resend dead to queue - fixes after cr * test - resend dead to queue - fixes after cr * test - resend dead to queue - fixes after cr * added metric report on saga timeout (#114) 1) added reporting saga timeouts to the glue component 2) fixed mysql timeoutmanager error when trying to clear a timeout * Added documentation for grabbit metrics (#117) * added initial documentation for grabbit metrics * including metrics section in readme.md * fixing goreportcard issues (#118) * removed logging a warning when worker message channel returns an error (#116) * corrected saga metrics name and added to metrics documentation (#119) * corrected saga metrics name and added documentatio * corrected saga metric name * corrected typos * removed non transactional bus mode (#120) * remove fields * remove fields * go fmt and go lint error fixes to improve goreportcard (#126) * go fmt on some files * go fmt * added comments on exported types * cunsume the messages channel via ranging over the channel to prevent (#125) empty delivreies * Migrations functionality (#111) * implement 
migrations * implement migrations * implement migrations * implement migrations * implement migrations * migrations * migrations * migrations * migrations * migrations * migrations * migrations * fix tests error * add migrations * migrations - timeout table migration * test - resend dead to queue - fixes after cr * migraration to grabbit (use forked migrator) * remove fields * remove fields * remove fields * remove fields * touch
* add handler metrics to bus and saga (#101) * add handler metrics to bus and saga + tests * fix build * add 0 to the default buckets to catch fast message handling * PR correction - changed latency to summary(removed bucket configuration), add registration for saga handlers * PR correction - getting logger as a param * PR correction - new line in eof * PR corrections message handler + sync.map + latency as summary * add rejected messages metric * dead letter handler should reject messages on failures and rollbacks and ack on commit success (#105) * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * dead letter handler should reject messages on failures and rollbacks * return an error from the saga store when deleting a saga if saga can not (#110) be found In order to deal with concurrent deletes of the sage saga instance we would wan't to indicate that deleting the saga failed if the saga is not stored so callers can take proper action * Persisted timeouts (#107) * decouple transaction manager from glue * moved timeout manager to gbus/tx package * initial commit in order to support persisted timeouts * first working version of a mysql persisted timeout manager * fixing ci lint errors * refactored ensure schema of timeout manager * cleanup timeout manager when bs shuts down * fixing formatting issues * changed logging level from Info to Debug when inserting a new timeout * resusing timeouts tablename (PR review) * renamed AcceptTimeoutFunction to SetTimeoutFunction on the TimeoutManager interface (PR review) * refactored glue to implement the Logged inetrface and use the GLogged helper struct * locking timeout record before executing timeout In order to prevent having a timeout beeing executed twice due to two concurrent grabbit instances running the same service a lock (FOR UPDATE) has been placed on the 
timeout record in the scope of the executing transaction * Commiting the select transaction when querying for pending timeouts * feat(timeout:lock): This is done in order to reduce contention and allow for parallel processing in case of multiple grabbit instances * Enable returning a message back from the dead to the queue (#112) * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * enable sending raw messages * return to q * return to q * return to q * return to q * return dead to q * allow no retries * test - resend dead to queue * test - resend dead to queue * test - resend dead to queue * test - resend dead to queue - fixes after cr * test - resend dead to queue - fixes after cr * test - resend dead to queue - fixes after cr * added metric report on saga timeout (#114) 1) added reporting saga timeouts to the glue component 2) fixed mysql timeoutmanager error when trying to clear a timeout * Added documentation for grabbit metrics (#117) * added initial documentation for grabbit metrics * including metrics section in readme.md * fixing goreportcard issues (#118) * removed logging a warning when worker message channel returns an error (#116) * corrected saga metrics name and added to metrics documentation (#119) * corrected saga metrics name and added documentatio * corrected saga metric name * corrected typos * removed non transactional bus mode (#120) * remove fields * remove fields * go fmt and go lint error fixes to improve goreportcard (#126) * go fmt on some files * go fmt * added comments on exported types * cunsume the messages channel via ranging over the channel to prevent (#125) empty delivreies * Migrations functionality (#111) * implement 
migrations * implement migrations * implement migrations * implement migrations * implement migrations * migrations * migrations * migrations * migrations * migrations * migrations * migrations * fix tests error * add migrations * migrations - timeout table migration * test - resend dead to queue - fixes after cr * migraration to grabbit (use forked migrator) * remove fields * remove fields * remove fields * remove fields * sanitize migrations table name (#130) * more linting fixes for goreportcard (#129) * added metrics on deadLetterHandler, refactored HandleDeadLetter inter… (#122) * added metrics on deadLetterHandler, refactored HandleDeadLetter interface to receive new DeadLetterMessageHandler type * fix dead letter test and a build error * added documentation for DeadLetterMessageHandler, also fixed poison spelling throughout code * retrigger build * align migrations table name with grabbit convention (#140) * Improved tracing and added documentation (#142) * Support handling raw message (#138) * added call to worker.span.Finish() when exiting processMessage (#145) * bug fix - when a deadletterhandler panics grabbit fails to reject the… (#136) * bug fix - when a deadletterhandler panics grabbit fails to reject the message * bug fix - when a deadletterhandler panics grabbit fails to reject the message * BPINFRA125 - MERGE MASTER INTO BRANCH * calling channel.Cancel when worker is stopped (#149) * Handle empty body messages (#147) * fixing golint warnings from goreport card (#150) * more golint fixes (#152)
Adding:
Relates to this issue