diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index 628deefd..5bdb2686 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -74,10 +74,10 @@ const (
 	MessageDroppedCode = "message_dropped"
 )
 
-// MetricsIn will be populated automatically by the ProvideMetrics function
-// and then used to populate the Metrics struct
-type MetricsIn struct {
+// Metrics will be used to set up the metrics for each sink.
+type Metrics struct {
 	fx.In
+	QueryLatency prometheus.ObserverVec `name:"query_duration_histogram_seconds"`
 	DeliveryCounter *prometheus.CounterVec `name:"delivery_count"`
 	DeliveryRetryCounter *prometheus.CounterVec `name:"delivery_retry_count"`
@@ -93,23 +93,6 @@ type MetricsIn struct {
 	ConsumerRenewalTimeGauge *prometheus.GaugeVec `name:"consumer_renewal_time"`
 }
 
-// Metrics will be used to set up the metrics for each sink
-type Metrics struct {
-	DeliveryCounter *prometheus.CounterVec
-	DeliveryRetryCounter *prometheus.CounterVec
-	DeliveryRetryMaxGauge *prometheus.GaugeVec
-	CutOffCounter *prometheus.CounterVec
-	SlowConsumerDroppedMsgCounter *prometheus.CounterVec
-	DropsDueToPanic *prometheus.CounterVec
-	ConsumerDeliverUntilGauge *prometheus.GaugeVec
-	ConsumerDropUntilGauge *prometheus.GaugeVec
-	ConsumerDeliveryWorkersGauge *prometheus.GaugeVec
-	ConsumerMaxDeliveryWorkersGauge *prometheus.GaugeVec
-	OutgoingQueueDepth *prometheus.GaugeVec
-	ConsumerRenewalTimeGauge *prometheus.GaugeVec
-	QueryLatency prometheus.ObserverVec
-}
-
 // TODO: do these need to be annotated/broken into groups based on where the metrics are being used/called
 func Provide() fx.Option {
 	return fx.Options(
diff --git a/internal/sink/sink.go b/internal/sink/sink.go
index 8af7e476..5c2a5d02 100644
--- a/internal/sink/sink.go
+++ b/internal/sink/sink.go
@@ -179,7 +179,7 @@ func (whs Webhooks) Send(secret, acceptType string, msg *wrp.Message) error {
 func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
 	defer func() {
 		if r := recover(); nil != r {
-			// s.droppedPanic.Add(1.0)
+			// s.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
 			v1.logger.Error("goroutine send() panicked", zap.String("id", v1.id), zap.Any("panic", r))
 		}
 		// s.workers.Release()
@@ -208,7 +208,7 @@ func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
 	req, err := http.NewRequest("POST", v1.urls.Value.(string), payloadReader)
 	if err != nil {
 		// Report drop
-		// s.droppedInvalidConfig.Add(1.0)
+		// s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "invalid_config"}).Add(1.0)
 		v1.logger.Error("Invalid URL", zap.String(metrics.UrlLabel, v1.urls.Value.(string)), zap.String("id", v1.id), zap.Error(err))
 		return err
 	}
@@ -254,8 +254,8 @@ func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
 	logger := v1.logger
 	if err != nil {
 		// Report failure
-		//TODO: add droppedMessage to webhook metrics and remove from sink sender?
-		// v1.droppedMessage.Add(1.0)
+		//TODO: add SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) to webhook metrics and remove from sink sender?
+		// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).Add(1.0)
 		reason = metrics.GetDoErrReason(err)
 		if resp != nil {
 			code = strconv.Itoa(resp.StatusCode)
@@ -264,7 +264,7 @@ func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
 		logger = v1.logger.With(zap.String(metrics.ReasonLabel, reason), zap.Error(err))
 		deliveryCounterLabels = []string{metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason, metrics.CodeLabel, code, metrics.EventLabel, event}
 		fmt.Print(deliveryCounterLabels)
-		// v1.droppedMessage.With(metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason).Add(1)
+		// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).With(metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason).Add(1)
 		logger.Error("Dropped Network Error", zap.Error(err))
 		return err
 	} else {
@@ -305,8 +305,8 @@ func (v1 *WebhookV1) updateRequest(urls *ring.Ring) func(*http.Request) *http.Re
 		tmp, err := url.Parse(urls.Value.(string))
 		if err != nil {
 			v1.logger.Error("failed to update url", zap.String(metrics.UrlLabel, urls.Value.(string)), zap.Error(err))
-			//TODO: do we add droppedMessage metric to webhook and remove from sink sender?
-			// v1.droppedMessage.With(metrics.UrlLabel, request.URL.String(), metrics.ReasonLabel, metrics.UpdateRequestURLFailedReason).Add(1)
+			//TODO: do we add SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) metric to webhook and remove from sink sender?
+			// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).With(metrics.UrlLabel, request.URL.String(), metrics.ReasonLabel, metrics.UpdateRequestURLFailedReason).Add(1)
 		}
 		request.URL = tmp
 		return request
@@ -318,7 +318,7 @@ func (v1 *WebhookV1) onAttempt(request *http.Request, event string) retry.OnAtte
 	return func(attempt retry.Attempt[*http.Response]) {
 		if attempt.Retries > 0 {
 			fmt.Print(event)
-			// s.deliveryRetryCounter.With(prometheus.Labels{UrlLabel: v1.id, EventLabel: event}).Add(1.0)
+			// s.DeliveryRetryCounter.With(prometheus.Labels{UrlLabel: v1.id, EventLabel: event}).Add(1.0)
 			v1.logger.Debug("retrying HTTP transaction", zap.String(metrics.UrlLabel, request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", attempt.Result.StatusCode))
 		}
@@ -377,7 +377,7 @@ func (k *Kafka) send(secret string, acceptType string, msg *wrp.Message) error {
 
 	defer func() {
 		if r := recover(); nil != r {
-			// s.droppedPanic.Add(1.0)
+			// s.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
 			//TODO: should we be using the RegistrationV2 id for this (canonical_name)
 			//or should we have an id for the specific kafka instance that failed?
 			k.logger.Error("goroutine send() panicked", zap.String("id", k.id), zap.Any("panic", r))
diff --git a/internal/sink/sinkSender.go b/internal/sink/sinkSender.go
index cc74c777..cb1d3979 100644
--- a/internal/sink/sinkSender.go
+++ b/internal/sink/sinkSender.go
@@ -65,27 +65,7 @@ type Sender struct {
 	failureMessage FailureMessage
 	listener ancla.Register
 	matcher Matcher
-	SinkMetrics
-}
-
-type SinkMetrics struct {
-	deliverUntilGauge prometheus.Gauge
-	deliveryRetryMaxGauge prometheus.Gauge
-	renewalTimeGauge prometheus.Gauge
-	maxWorkersGauge prometheus.Gauge
-	// deliveryCounter prometheus.CounterVec
-	deliveryRetryCounter *prometheus.CounterVec
-	droppedQueueFullCounter prometheus.Counter
-	droppedCutoffCounter prometheus.Counter
-	droppedExpiredCounter prometheus.Counter
-	droppedExpiredBeforeQueueCounter prometheus.Counter
-	droppedMessage prometheus.Counter
-	droppedInvalidConfig prometheus.Counter
-	droppedPanic prometheus.Counter
-	cutOffCounter prometheus.Counter
-	queueDepthGauge prometheus.Gauge
-	dropUntilGauge prometheus.Gauge
-	currentWorkersGauge prometheus.Gauge
+	metrics.Metrics
 }
 
 func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {
@@ -122,6 +102,7 @@ func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {
 		deliveryRetries: w.config.DeliveryRetries,
 		deliveryInterval: w.config.DeliveryInterval,
 		maxWorkers: w.config.NumWorkersPerSender,
+		Metrics: w.metrics,
 		failureMessage: FailureMessage{
 			Original: l,
 			Text: failureText,
@@ -133,9 +114,8 @@ func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {
 		disablePartnerIDs: w.config.DisablePartnerIDs,
 	}
 
-	s.CreateMetrics(w.metrics)
-	s.queueDepthGauge.Set(0)
-	s.currentWorkersGauge.Set(0)
+	s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(0)
+	s.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(0)
 
 	// Don't share the secret with others when there is an error.
 	hideSecret(s.failureMessage.Original)
@@ -157,21 +137,21 @@ func (s *Sender) Update(l ancla.Register) (err error) {
 	s.matcher, err = NewMatcher(l, s.logger)
 	s.sink = NewSink(s.config, s.logger, l)
 
-	s.renewalTimeGauge.Set(float64(time.Now().Unix()))
+	s.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(time.Now().Unix()))
 
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 
 	s.deliverUntil = l.GetUntil()
-	s.deliverUntilGauge.Set(float64(s.deliverUntil.Unix()))
-	s.deliveryRetryMaxGauge.Set(float64(s.deliveryRetries))
+	s.ConsumerDeliverUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.deliverUntil.Unix()))
+	s.DeliveryRetryMaxGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.deliveryRetries))
 
 	s.id = l.GetId()
 	s.listener = l
 	s.failureMessage.Original = l
 
 	// Update this here in case we make this configurable later
-	s.maxWorkersGauge.Set(float64(s.maxWorkers))
+	s.ConsumerMaxDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.maxWorkers))
 
 	return
 }
@@ -214,12 +194,12 @@ func (s *Sender) Queue(msg *wrp.Message) {
 	//TODO: this code will be changing; it will take away the queue and send to the sink interface (not the webhook interface)
 	select {
 	case s.queue.Load().(chan *wrp.Message) <- msg:
-		s.queueDepthGauge.Add(1.0)
+		s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
 		s.logger.Debug("event added to outbound queue", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))
 	default:
 		s.logger.Debug("queue full. event dropped", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))
 		s.queueOverflow()
-		s.droppedQueueFullCounter.Add(1.0)
+		s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "queue_full"}).Add(1.0)
 	}
 }
 
@@ -231,15 +211,15 @@ func (s *Sender) Shutdown(gentle bool) {
 		// need to close the channel we're going to replace, in case it doesn't
 		// have any events in it.
 		close(s.queue.Load().(chan *wrp.Message))
-		s.Empty(s.droppedExpiredCounter)
+		s.Empty(s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired"}))
 	}
 	close(s.queue.Load().(chan *wrp.Message))
 	s.wg.Wait()
 
 	s.mutex.Lock()
 	s.deliverUntil = time.Time{}
-	s.deliverUntilGauge.Set(float64(s.deliverUntil.Unix()))
-	s.queueDepthGauge.Set(0) //just in case
+	s.ConsumerDeliverUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.deliverUntil.Unix()))
+	s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(0) //just in case
 	s.mutex.Unlock()
 }
 
@@ -270,13 +250,13 @@ func overlaps(sl1 []string, sl2 []string) bool {
 func (s *Sender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool {
 	if !now.After(dropUntil) {
 		// client was cut off
-		s.droppedCutoffCounter.Add(1.0)
+		s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"}).Add(1.0)
 		return false
 	}
 
 	if !now.Before(deliverUntil) {
 		// outside delivery window
-		s.droppedExpiredBeforeQueueCounter.Add(1.0)
+		s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired_before_queueing"}).Add(1.0)
 		return false
 	}
 
@@ -291,7 +271,7 @@ func (s *Sender) Empty(droppedCounter prometheus.Counter) {
 	droppedMsgs := s.queue.Load().(chan *wrp.Message)
 	s.queue.Store(make(chan *wrp.Message, s.queueSize))
 	droppedCounter.Add(float64(len(droppedMsgs)))
-	s.queueDepthGauge.Set(0.0)
+	s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(0.0)
 }
 
 // queueOverflow handles the logic of what to do when a queue overflows:
@@ -304,7 +284,7 @@ func (s *Sender) queueOverflow() {
 		return
 	}
 	s.dropUntil = time.Now().Add(s.cutOffPeriod)
-	s.dropUntilGauge.Set(float64(s.dropUntil.Unix()))
+	s.ConsumerDropUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.dropUntil.Unix()))
 	//TODO: need to figure this out
 	// secret := s.listener.Webhook.Config.Secret
 	secret := "placeholderSecret"
@@ -314,11 +294,11 @@ func (s *Sender) queueOverflow() {
 	failureURL := "placeholderURL"
 	s.mutex.Unlock()
 
-	s.cutOffCounter.Add(1.0)
+	s.CutOffCounter.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
 
 	// We empty the queue but don't close the channel, because we're not
 	// shutting down.
-	s.Empty(s.droppedCutoffCounter)
+	s.Empty(s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"}))
 
 	msg, err := json.Marshal(failureMsg)
 	if nil != err {
@@ -368,7 +348,6 @@ func (s *Sender) queueOverflow() {
 	if nil != resp.Body {
 		io.Copy(io.Discard, resp.Body)
 		resp.Body.Close()
-
 	}
 }
 
@@ -380,7 +359,6 @@ func (s *Sender) dispatcher() {
 		ok bool
 	)
 
-Loop:
 	for {
 		// Always pull a new queue in case we have been cutoff or are shutting
 		// down.
@@ -407,9 +385,9 @@ Loop:
 			// This is only true when a queue is empty and closed, which for us
 			// only happens on Shutdown().
 			if !ok {
-				break Loop
+				break
 			}
-			s.queueDepthGauge.Add(-1.0)
+			s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(-1.0)
 			s.mutex.RLock()
 			deliverUntil := s.deliverUntil
 			dropUntil := s.dropUntil
@@ -420,15 +398,15 @@ Loop:
 			now := time.Now()
 
 			if now.Before(dropUntil) {
-				s.droppedCutoffCounter.Add(1.0)
+				s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"}).Add(1.0)
 				continue
 			}
 			if now.After(deliverUntil) {
-				s.Empty(s.droppedExpiredCounter)
+				s.Empty(s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired"}))
 				continue
 			}
 			s.workers.Acquire()
-			s.currentWorkersGauge.Add(1.0)
+			s.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
 
 			go s.sink.Send(secret, accept, msg)
 		}
@@ -437,26 +415,6 @@ Loop:
 	}
 }
 
-func (s *Sender) CreateMetrics(m metrics.Metrics) {
-	s.deliveryRetryCounter = m.DeliveryRetryCounter
-	s.deliveryRetryMaxGauge = m.DeliveryRetryMaxGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
-	s.cutOffCounter = m.CutOffCounter.With(prometheus.Labels{metrics.UrlLabel: s.id})
-	s.droppedQueueFullCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "queue_full"})
-	s.droppedExpiredCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired"})
-	s.droppedExpiredBeforeQueueCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired_before_queueing"})
-	s.droppedCutoffCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"})
-	s.droppedInvalidConfig = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "invalid_config"})
-	s.droppedMessage = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError})
-	s.droppedPanic = m.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id})
-	s.queueDepthGauge = m.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id})
-	s.renewalTimeGauge = m.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
-	s.deliverUntilGauge = m.ConsumerDeliverUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
-	s.dropUntilGauge = m.ConsumerDropUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
-	s.currentWorkersGauge = m.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
-	s.maxWorkersGauge = m.ConsumerMaxDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
-
-}
-
 func getPartnerIds(l ancla.Register) ([]string, error) {
 	switch v := l.(type) {
 	case *ancla.RegistryV1:
diff --git a/internal/sink/sinkWrapper.go b/internal/sink/sinkWrapper.go
index 34859dc7..d7675997 100644
--- a/internal/sink/sinkWrapper.go
+++ b/internal/sink/sinkWrapper.go
@@ -26,11 +26,12 @@ import (
 type WrapperIn struct {
 	fx.In
 
-	Tracing candlelight.Tracing
-	Config Config
-	Metrics metrics.Metrics
+	Tracing   candlelight.Tracing
+	Config    Config
+	// Metrics metrics.MetricsIn
 	EventType *prometheus.CounterVec `name:"incoming_event_type_count"`
 	Logger *zap.Logger
+	metrics.Metrics
 }
 
 // SinkWrapper interface is needed for unit testing.
@@ -65,24 +66,6 @@ type wrapper struct {
 
 func Provide() fx.Option {
 	return fx.Provide(
-		func(in metrics.MetricsIn) metrics.Metrics {
-			senderMetrics := metrics.Metrics{
-				DeliveryCounter: in.DeliveryCounter,
-				DeliveryRetryCounter: in.DeliveryRetryCounter,
-				DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge,
-				CutOffCounter: in.CutOffCounter,
-				SlowConsumerDroppedMsgCounter: in.SlowConsumerDroppedMsgCounter,
-				DropsDueToPanic: in.DropsDueToPanic,
-				ConsumerDeliverUntilGauge: in.ConsumerDeliverUntilGauge,
-				ConsumerDropUntilGauge: in.ConsumerDropUntilGauge,
-				ConsumerDeliveryWorkersGauge: in.ConsumerDeliveryWorkersGauge,
-				ConsumerMaxDeliveryWorkersGauge: in.ConsumerMaxDeliveryWorkersGauge,
-				OutgoingQueueDepth: in.OutgoingQueueDepth,
-				ConsumerRenewalTimeGauge: in.ConsumerRenewalTimeGauge,
-				QueryLatency: in.QueryLatency,
-			}
-			return senderMetrics
-		},
 		func(in WrapperIn) (Wrapper, error) {
 			w, err := NewWrapper(in)
 			return w, err
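
Note on the wiring this diff relies on: fx populates any struct that embeds fx.In and tags its fields with `name:` directly from matching named provides, which is why the MetricsIn-to-Metrics copy constructor and CreateMetrics can be deleted. A minimal, self-contained sketch of that pattern (metric name, labels, and help text here are illustrative, not taken from this PR):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/fx"
)

// Metrics mirrors the shape of the new metrics.Metrics: fx.In plus named fields.
type Metrics struct {
	fx.In

	DeliveryCounter *prometheus.CounterVec `name:"delivery_count"`
}

func main() {
	app := fx.New(
		// A named provide; fx matches it to the `name:"delivery_count"` tag above.
		fx.Provide(fx.Annotated{
			Name: "delivery_count",
			Target: func() *prometheus.CounterVec {
				return prometheus.NewCounterVec(
					prometheus.CounterOpts{Name: "delivery_count", Help: "count of delivery attempts"},
					[]string{"url", "reason"},
				)
			},
		}),
		// Once injected, the struct doubles as a plain value; this is why
		// Sender can embed metrics.Metrics and call the vectors directly.
		fx.Invoke(func(m Metrics) {
			m.DeliveryCounter.With(prometheus.Labels{"url": "example", "reason": "ok"}).Add(1)
			fmt.Println("metrics wired")
		}),
	)
	if err := app.Err(); err != nil {
		fmt.Println(err)
	}
}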
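A second detail behind the curried calls that replace the old SinkMetrics fields: (*prometheus.CounterVec).With returns a prometheus.Counter, so an expression like s.SlowConsumerDroppedMsgCounter.With(...) still satisfies signatures that expect a plain Counter, such as Sender.Empty in this diff. A small sketch with an illustrative metric name and stand-in function:

package main

import "github.com/prometheus/client_golang/prometheus"

// countDrops stands in for Sender.Empty: it only needs a prometheus.Counter.
func countDrops(dropped prometheus.Counter, n int) {
	dropped.Add(float64(n))
}

func main() {
	vec := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "slow_consumer_dropped_message_count", Help: "dropped messages"},
		[]string{"url", "reason"},
	)
	// With pins the label values and yields a prometheus.Counter.
	countDrops(vec.With(prometheus.Labels{"url": "http://example.com", "reason": "expired"}), 3)
}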