From 4e9f262333331a7f668cd73140a24b3170425a67 Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Wed, 28 Aug 2019 19:42:02 -0700 Subject: [PATCH 1/2] Update the expired metric to explicitly cover only messages that expire in the queue. A new label is present for prior to queueing. --- src/caduceus/metrics.go | 74 ++++++++++++++++++++----- src/caduceus/mocks_test.go | 4 +- src/caduceus/outboundSender.go | 84 ++++++++++++++++++----------- src/caduceus/outboundSender_test.go | 7 +++ src/caduceus/senderWrapper_test.go | 8 +++ 5 files changed, 132 insertions(+), 45 deletions(-) diff --git a/src/caduceus/metrics.go b/src/caduceus/metrics.go index fad50b7e..cd187c21 100644 --- a/src/caduceus/metrics.go +++ b/src/caduceus/metrics.go @@ -5,18 +5,24 @@ import ( ) const ( - ErrorRequestBodyCounter = "error_request_body_count" - EmptyRequestBodyCounter = "empty_request_body_count" - DeliveryCounter = "delivery_count" - DeliveryRetryCounter = "delivery_retry_count" - SlowConsumerDroppedMsgCounter = "slow_consumer_dropped_message_count" - SlowConsumerCounter = "slow_consumer_cut_off_count" - IncomingQueueDepth = "incoming_queue_depth" - IncomingContentTypeCounter = "incoming_content_type_count" - IncomingEventTypeCounter = "incoming_event_type_count" - DropsDueToInvalidPayload = "drops_due_to_invalid_payload" - OutgoingQueueDepth = "outgoing_queue_depths" - DropsDueToPanic = "drops_due_to_panic" + ErrorRequestBodyCounter = "error_request_body_count" + EmptyRequestBodyCounter = "empty_request_body_count" + DeliveryCounter = "delivery_count" + DeliveryRetryCounter = "delivery_retry_count" + DeliveryRetryMaxGauge = "delivery_retry_max" + SlowConsumerDroppedMsgCounter = "slow_consumer_dropped_message_count" + SlowConsumerCounter = "slow_consumer_cut_off_count" + IncomingQueueDepth = "incoming_queue_depth" + IncomingContentTypeCounter = "incoming_content_type_count" + IncomingEventTypeCounter = "incoming_event_type_count" + DropsDueToInvalidPayload = "drops_due_to_invalid_payload" + 
OutgoingQueueDepth = "outgoing_queue_depths" + DropsDueToPanic = "drops_due_to_panic" + ConsumerRenewalTimeGauge = "consumer_renewal_time" + ConsumerDeliverUntilGauge = "consumer_deliver_until" + ConsumerDropUntilGauge = "consumer_drop_until" + ConsumerDeliveryWorkersGauge = "consumer_delivery_workers" + ConsumerMaxDeliveryWorkersGauge = "consumer_delivery_workers_max" ) func Metrics() []xmetrics.Metric { @@ -53,6 +59,12 @@ func Metrics() []xmetrics.Metric { Type: "counter", LabelNames: []string{"url", "event"}, }, + { + Name: DeliveryRetryMaxGauge, + Help: "Maximum number of delivery retries attempted", + Type: "gauge", + LabelNames: []string{"url"}, + }, { Name: DeliveryCounter, Help: "Count of delivered messages to a url with a status code", @@ -89,19 +101,57 @@ func Metrics() []xmetrics.Metric { Type: "counter", LabelNames: []string{"url"}, }, + { + Name: ConsumerRenewalTimeGauge, + Help: "Time when the consumer data was updated.", + Type: "gauge", + LabelNames: []string{"url"}, + }, + { + Name: ConsumerDeliverUntilGauge, + Help: "Time when the consumer's registration expires and events will be dropped.", + Type: "gauge", + LabelNames: []string{"url"}, + }, + { + Name: ConsumerDropUntilGauge, + Help: "The time after which events going to a customer will be delivered.", + Type: "gauge", + LabelNames: []string{"url"}, + }, + { + Name: ConsumerDeliveryWorkersGauge, + Help: "The number of active delivery workers for a particular customer.", + Type: "gauge", + LabelNames: []string{"url"}, + }, + { + Name: ConsumerMaxDeliveryWorkersGauge, + Help: "The maximum number of delivery workers available for a particular customer.", + Type: "gauge", + LabelNames: []string{"url"}, + }, } } func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSender) { c.deliveryCounter = m.NewCounter(DeliveryCounter) c.deliveryRetryCounter = m.NewCounter(DeliveryRetryCounter) + c.deliveryRetryMaxGauge = m.NewGauge(DeliveryRetryMaxGauge) c.cutOffCounter = 
m.NewCounter(SlowConsumerCounter).With("url", c.id) c.droppedQueueFullCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full") c.droppedExpiredCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired") + c.droppedExpiredBeforeQueueCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired_before_queueing") + c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off") c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config") c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "network_err") c.droppedPanic = m.NewCounter(DropsDueToPanic).With("url", c.id) c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id) c.contentTypeCounter = m.NewCounter(IncomingContentTypeCounter) + c.renewalTimeGauge = m.NewGauge(ConsumerRenewalTimeGauge).With("url", c.id) + c.deliverUntilGauge = m.NewGauge(ConsumerDeliverUntilGauge).With("url", c.id) + c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id) + c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id) + c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id) } diff --git a/src/caduceus/mocks_test.go b/src/caduceus/mocks_test.go index c0523a22..e00798a8 100644 --- a/src/caduceus/mocks_test.go +++ b/src/caduceus/mocks_test.go @@ -119,7 +119,9 @@ func (m *mockGauge) Add(delta float64) { } func (m *mockGauge) Set(value float64) { - m.Called(value) + // We're setting time values & the ROI seems pretty low with this level + // of validation... 
+ //m.Called(value) } func (m *mockGauge) With(labelValues ...string) metrics.Gauge { diff --git a/src/caduceus/outboundSender.go b/src/caduceus/outboundSender.go index 8eb324ea..8fdd7546 100644 --- a/src/caduceus/outboundSender.go +++ b/src/caduceus/outboundSender.go @@ -115,37 +115,44 @@ type OutboundSender interface { // CaduceusOutboundSender is the outbound sender object. type CaduceusOutboundSender struct { - id string - urls *ring.Ring - listener webhook.W - deliverUntil time.Time - dropUntil time.Time - sender func(*http.Request) (*http.Response, error) - events []*regexp.Regexp - matcher []*regexp.Regexp - queueSize int - queue chan *wrp.Message - deliveryRetries int - deliveryInterval time.Duration - deliveryCounter metrics.Counter - deliveryRetryCounter metrics.Counter - droppedQueueFullCounter metrics.Counter - droppedCutoffCounter metrics.Counter - droppedExpiredCounter metrics.Counter - droppedNetworkErrCounter metrics.Counter - droppedInvalidConfig metrics.Counter - droppedPanic metrics.Counter - cutOffCounter metrics.Counter - contentTypeCounter metrics.Counter - queueDepthGauge metrics.Gauge - eventType metrics.Counter - wg sync.WaitGroup - cutOffPeriod time.Duration - workers semaphore.Interface - maxWorkers int - failureMsg FailureMessage - logger log.Logger - mutex sync.RWMutex + id string + urls *ring.Ring + listener webhook.W + deliverUntil time.Time + dropUntil time.Time + sender func(*http.Request) (*http.Response, error) + events []*regexp.Regexp + matcher []*regexp.Regexp + queueSize int + queue chan *wrp.Message + deliveryRetries int + deliveryInterval time.Duration + deliveryCounter metrics.Counter + deliveryRetryCounter metrics.Counter + droppedQueueFullCounter metrics.Counter + droppedCutoffCounter metrics.Counter + droppedExpiredCounter metrics.Counter + droppedExpiredBeforeQueueCounter metrics.Counter + droppedNetworkErrCounter metrics.Counter + droppedInvalidConfig metrics.Counter + droppedPanic metrics.Counter + cutOffCounter 
metrics.Counter + contentTypeCounter metrics.Counter + queueDepthGauge metrics.Gauge + renewalTimeGauge metrics.Gauge + deliverUntilGauge metrics.Gauge + dropUntilGauge metrics.Gauge + maxWorkersGauge metrics.Gauge + currentWorkersGauge metrics.Gauge + deliveryRetryMaxGauge metrics.Gauge + eventType metrics.Counter + wg sync.WaitGroup + cutOffPeriod time.Duration + workers semaphore.Interface + maxWorkers int + failureMsg FailureMessage + logger log.Logger + mutex sync.RWMutex } // New creates a new OutboundSender object from the factory, or returns an error. @@ -270,6 +277,8 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { } } + obs.renewalTimeGauge.Set(float64(time.Now().Unix())) + // write/update obs obs.mutex.Lock() @@ -281,12 +290,15 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { obs.listener.FailureURL = wh.FailureURL obs.deliverUntil = wh.Until + obs.deliverUntilGauge.Set(float64(obs.deliverUntil.Unix())) + obs.events = events // update default deliver retry count for sender if wh.Config.MaxRetryCount != 0 { obs.deliveryRetries = wh.Config.MaxRetryCount } + obs.deliveryRetryMaxGauge.Set(float64(obs.deliveryRetries)) // if matcher list is empty set it nil for Queue() logic obs.matcher = nil @@ -314,6 +326,9 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { offset-- } + // Update this here in case we make this configurable later + obs.maxWorkersGauge.Set(float64(obs.maxWorkers)) + obs.mutex.Unlock() return @@ -328,12 +343,14 @@ func (obs *CaduceusOutboundSender) Shutdown(gentle bool) { obs.mutex.Lock() if false == gentle { obs.deliverUntil = time.Time{} + obs.deliverUntilGauge.Set(float64(obs.deliverUntil.Unix())) } obs.mutex.Unlock() obs.wg.Wait() obs.mutex.Lock() obs.deliverUntil = time.Time{} + obs.deliverUntilGauge.Set(float64(obs.deliverUntil.Unix())) obs.mutex.Unlock() } @@ -433,7 +450,7 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti if false == 
now.Before(deliverUntil) { debugLog.Log(logging.MessageKey(), "Outside delivery window", "now", now, "before", deliverUntil, "after", dropUntil) - obs.droppedExpiredCounter.Add(1.0) + obs.droppedExpiredBeforeQueueCounter.Add(1.0) return false } @@ -469,6 +486,7 @@ func (obs *CaduceusOutboundSender) dispatcher() { continue } obs.workers.Acquire() + obs.currentWorkersGauge.Add(1.0) go obs.send(urls, secret, accept, msg) } @@ -489,6 +507,7 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri "id", obs.id, "panic", r) } obs.workers.Release() + obs.currentWorkersGauge.Add(-1.0) }() payload := msg.Payload @@ -594,6 +613,7 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri func (obs *CaduceusOutboundSender) queueOverflow() { obs.mutex.Lock() obs.dropUntil = time.Now().Add(obs.cutOffPeriod) + obs.dropUntilGauge.Set(float64(obs.dropUntil.Unix())) secret := obs.listener.Config.Secret failureMsg := obs.failureMsg failureURL := obs.listener.FailureURL diff --git a/src/caduceus/outboundSender_test.go b/src/caduceus/outboundSender_test.go index ed6c6957..57d90293 100644 --- a/src/caduceus/outboundSender_test.go +++ b/src/caduceus/outboundSender_test.go @@ -120,6 +120,7 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "queue_full"}).Return(fakeDroppedSlow) fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "cut_off"}).Return(fakeDroppedSlow) fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "expired"}).Return(fakeDroppedSlow) + fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "expired_before_queueing"}).Return(fakeDroppedSlow) fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "invalid_config"}).Return(fakeDroppedSlow) fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "network_err"}).Return(fakeDroppedSlow) fakeDroppedSlow.On("Add", 
1.0).Return() @@ -155,6 +156,12 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeContentType) fakeRegistry.On("NewCounter", DropsDueToPanic).Return(fakePanicDrop) fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeQdepth) + fakeRegistry.On("NewGauge", DeliveryRetryMaxGauge).Return(fakeQdepth) + fakeRegistry.On("NewGauge", ConsumerRenewalTimeGauge).Return(fakeQdepth) + fakeRegistry.On("NewGauge", ConsumerDeliverUntilGauge).Return(fakeQdepth) + fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeQdepth) + fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeQdepth) + fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeQdepth) return &OutboundSenderFactory{ Listener: w, diff --git a/src/caduceus/senderWrapper_test.go b/src/caduceus/senderWrapper_test.go index c962d898..ab838613 100644 --- a/src/caduceus/senderWrapper_test.go +++ b/src/caduceus/senderWrapper_test.go @@ -85,11 +85,13 @@ func getFakeFactory() *SenderWrapperFactory { On("With", []string{"url", "http://localhost:8888/foo", "reason", "cut_off"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo", "reason", "queue_full"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo", "reason", "expired"}).Return(fakeIgnore). + On("With", []string{"url", "http://localhost:8888/foo", "reason", "expired_before_queueing"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo", "reason", "network_err"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo", "reason", "invalid_config"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:9999/foo", "reason", "cut_off"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:9999/foo", "reason", "queue_full"}).Return(fakeIgnore). 
On("With", []string{"url", "http://localhost:9999/foo", "reason", "expired"}).Return(fakeIgnore). + On("With", []string{"url", "http://localhost:9999/foo", "reason", "expired_before_queueing"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:9999/foo", "reason", "network_err"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:9999/foo", "reason", "invalid_config"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore). @@ -113,6 +115,12 @@ func getFakeFactory() *SenderWrapperFactory { fakeRegistry.On("NewCounter", IncomingEventTypeCounter).Return(fakeIgnore) fakeRegistry.On("NewCounter", DropsDueToPanic).Return(fakeIgnore) fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeGauge) + fakeRegistry.On("NewGauge", DeliveryRetryMaxGauge).Return(fakeGauge) + fakeRegistry.On("NewGauge", ConsumerRenewalTimeGauge).Return(fakeGauge) + fakeRegistry.On("NewGauge", ConsumerDeliverUntilGauge).Return(fakeGauge) + fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeGauge) + fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeGauge) + fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeGauge) return &SenderWrapperFactory{ NumWorkersPerSender: 10, From 80d71df277cc8308bab939120e04faff57612d19 Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Wed, 28 Aug 2019 19:48:11 -0700 Subject: [PATCH 2/2] Update the changelog. --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 26d08e62..86021a90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Added +- Metrics to support debugging the problem found by GH Issue #145: gauges `delivery_retry_max`, `consumer_renewal_time`, `consumer_deliver_until`, `consumer_drop_until`, `consumer_delivery_workers`, and `consumer_delivery_workers_max`, plus an `expired_before_queueing` reason on `slow_consumer_dropped_message_count` (the `expired` reason now covers only messages that expire in the queue). ## [v0.1.5] fixed build upload