chore: simplify metrics package
- reduce the overall cognitive load of the metrics package and how it's used
- reduce redundancy (re-declaring metric-related structs and fields)
denopink committed Nov 5, 2024
1 parent abb3c3e commit 5b4b8e0
Showing 4 changed files with 40 additions and 116 deletions.
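
The change described in the commit message reduces to a single pattern: one fx-injected Metrics struct that consumers embed, with labels applied at the point where a metric is used rather than in a separate setup step. The following self-contained sketch illustrates that shape; the metric name, the "url"/"reason" label values, and the recordDrop helper are stand-ins for the real constants and call sites in internal/metrics and internal/sink, not code from this commit.

package main

import "github.com/prometheus/client_golang/prometheus"

// Stand-ins for metrics.UrlLabel / metrics.ReasonLabel; the real string values
// live in internal/metrics and are assumed here.
const (
	UrlLabel    = "url"
	ReasonLabel = "reason"
)

// Metrics mirrors the consolidated internal/metrics.Metrics struct (one field shown).
type Metrics struct {
	SlowConsumerDroppedMsgCounter *prometheus.CounterVec
}

// Sender embeds Metrics instead of redeclaring pre-labeled counter fields.
type Sender struct {
	id string
	Metrics
}

func (s *Sender) recordDrop(reason string) {
	// Labels are attached at the call site, not in a CreateMetrics step.
	s.SlowConsumerDroppedMsgCounter.
		With(prometheus.Labels{UrlLabel: s.id, ReasonLabel: reason}).
		Add(1)
}

func main() {
	m := Metrics{
		SlowConsumerDroppedMsgCounter: prometheus.NewCounterVec(
			prometheus.CounterOpts{Name: "slow_consumer_dropped_message_count"},
			[]string{UrlLabel, ReasonLabel},
		),
	}
	s := &Sender{id: "example-webhook", Metrics: m}
	s.recordDrop("queue_full")
}

The diffs below remove the duplicated MetricsIn/Metrics pair and the per-field CreateMetrics plumbing in favor of this shape.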
23 changes: 3 additions & 20 deletions internal/metrics/metrics.go
@@ -74,10 +74,10 @@ const (
MessageDroppedCode = "message_dropped"
)

// MetricsIn will be populated automatically by the ProvideMetrics function
// and then used to populate the Metrics struct
type MetricsIn struct {
// Metrics will be used to set up the metrics for each sink.
type Metrics struct {
fx.In

QueryLatency prometheus.ObserverVec `name:"query_duration_histogram_seconds"`
DeliveryCounter *prometheus.CounterVec `name:"delivery_count"`
DeliveryRetryCounter *prometheus.CounterVec `name:"delivery_retry_count"`
@@ -93,23 +93,6 @@ type MetricsIn struct {
ConsumerRenewalTimeGauge *prometheus.GaugeVec `name:"consumer_renewal_time"`
}

// Metrics will be used to set up the metrics for each sink
type Metrics struct {
DeliveryCounter *prometheus.CounterVec
DeliveryRetryCounter *prometheus.CounterVec
DeliveryRetryMaxGauge *prometheus.GaugeVec
CutOffCounter *prometheus.CounterVec
SlowConsumerDroppedMsgCounter *prometheus.CounterVec
DropsDueToPanic *prometheus.CounterVec
ConsumerDeliverUntilGauge *prometheus.GaugeVec
ConsumerDropUntilGauge *prometheus.GaugeVec
ConsumerDeliveryWorkersGauge *prometheus.GaugeVec
ConsumerMaxDeliveryWorkersGauge *prometheus.GaugeVec
OutgoingQueueDepth *prometheus.GaugeVec
ConsumerRenewalTimeGauge *prometheus.GaugeVec
QueryLatency prometheus.ObserverVec
}

// TODO: do these need to be annotated/broken into groups based on where the metrics are being used/called
func Provide() fx.Option {
return fx.Options(
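Provide() is cut off in this view, but the mechanism the consolidated struct relies on is fx's named components: providers publish collectors under a name, and any fx.In struct whose fields carry matching `name:"..."` tags gets them injected. The sketch below shows that wiring end to end; the fx.Annotated provider, the label set, and the use of fx.Invoke are illustrative assumptions, not the project's actual Provide() implementation.

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/fx"
)

// Metrics shows one field of the consolidated struct from the diff above.
type Metrics struct {
	fx.In
	DeliveryCounter *prometheus.CounterVec `name:"delivery_count"`
}

func main() {
	app := fx.New(
		fx.Provide(
			// Assumed provider: the real Provide() presumably registers these
			// collectors with a Prometheus registry as well; omitted here.
			fx.Annotated{
				Name: "delivery_count",
				Target: func() *prometheus.CounterVec {
					return prometheus.NewCounterVec(
						prometheus.CounterOpts{Name: "delivery_count"},
						[]string{"url", "reason", "code", "event"},
					)
				},
			},
		),
		fx.Invoke(func(m Metrics) {
			// The named component lands in the tagged field.
			m.DeliveryCounter.With(prometheus.Labels{
				"url": "example", "reason": "ok", "code": "200", "event": "online",
			}).Inc()
		}),
	)
	_ = app // app.Run() would start the application; not needed for this sketch.
}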
18 changes: 9 additions & 9 deletions internal/sink/sink.go
@@ -179,7 +179,7 @@ func (whs Webhooks) Send(secret, acceptType string, msg *wrp.Message) error {
func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
defer func() {
if r := recover(); nil != r {
// s.droppedPanic.Add(1.0)
// s.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
v1.logger.Error("goroutine send() panicked", zap.String("id", v1.id), zap.Any("panic", r))
}
// s.workers.Release()
@@ -208,7 +208,7 @@ func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
req, err := http.NewRequest("POST", v1.urls.Value.(string), payloadReader)
if err != nil {
// Report drop
// s.droppedInvalidConfig.Add(1.0)
// s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "invalid_config"}).Add(1.0)
v1.logger.Error("Invalid URL", zap.String(metrics.UrlLabel, v1.urls.Value.(string)), zap.String("id", v1.id), zap.Error(err))
return err
}
@@ -254,8 +254,8 @@ func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
logger := v1.logger
if err != nil {
// Report failure
//TODO: add droppedMessage to webhook metrics and remove from sink sender?
// v1.droppedMessage.Add(1.0)
//TODO: add SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) to webhook metrics and remove from sink sender?
// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).Add(1.0)
reason = metrics.GetDoErrReason(err)
if resp != nil {
code = strconv.Itoa(resp.StatusCode)
@@ -264,7 +264,7 @@ func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
logger = v1.logger.With(zap.String(metrics.ReasonLabel, reason), zap.Error(err))
deliveryCounterLabels = []string{metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason, metrics.CodeLabel, code, metrics.EventLabel, event}
fmt.Print(deliveryCounterLabels)
// v1.droppedMessage.With(metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason).Add(1)
// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).With(metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason).Add(1)
logger.Error("Dropped Network Error", zap.Error(err))
return err
} else {
@@ -305,8 +305,8 @@ func (v1 *WebhookV1) updateRequest(urls *ring.Ring) func(*http.Request) *http.Re
tmp, err := url.Parse(urls.Value.(string))
if err != nil {
v1.logger.Error("failed to update url", zap.String(metrics.UrlLabel, urls.Value.(string)), zap.Error(err))
//TODO: do we add droppedMessage metric to webhook and remove from sink sender?
// v1.droppedMessage.With(metrics.UrlLabel, request.URL.String(), metrics.ReasonLabel, metrics.UpdateRequestURLFailedReason).Add(1)
//TODO: do we add SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) metric to webhook and remove from sink sender?
// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).With(metrics.UrlLabel, request.URL.String(), metrics.ReasonLabel, metrics.UpdateRequestURLFailedReason).Add(1)
}
request.URL = tmp
return request
@@ -318,7 +318,7 @@ func (v1 *WebhookV1) onAttempt(request *http.Request, event string) retry.OnAtte
return func(attempt retry.Attempt[*http.Response]) {
if attempt.Retries > 0 {
fmt.Print(event)
// s.deliveryRetryCounter.With(prometheus.Labels{UrlLabel: v1.id, EventLabel: event}).Add(1.0)
// s.DeliveryRetryCounter.With(prometheus.Labels{UrlLabel: v1.id, EventLabel: event}).Add(1.0)
v1.logger.Debug("retrying HTTP transaction", zap.String(metrics.UrlLabel, request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", attempt.Result.StatusCode))
}

@@ -377,7 +377,7 @@ func (k *Kafka) send(secret string, acceptType string, msg *wrp.Message) error {

defer func() {
if r := recover(); nil != r {
// s.droppedPanic.Add(1.0)
// s.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
//TODO: should we be using the RegistrationV2 id for this (canonical_name)
//or should we have an id for the specific kafka instance that failed?
k.logger.Error("goroutine send() panicked", zap.String("id", k.id), zap.Any("panic", r))
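Several of the commented-out lines above mark where per-sender panic accounting would land once the webhook and Kafka types carry the shared counters. The mechanics are just a deferred recover plus a labeled counter increment, as in the following self-contained sketch; the counter name, label key, and send helper are stand-ins rather than code from this repository.

package main

import (
	"log"

	"github.com/prometheus/client_golang/prometheus"
)

// Stand-in for the DropsDueToPanic vector from internal/metrics.
var dropsDueToPanic = prometheus.NewCounterVec(
	prometheus.CounterOpts{Name: "drops_due_to_panic"},
	[]string{"url"},
)

func send(id string) {
	defer func() {
		if r := recover(); r != nil {
			// Count the dropped delivery instead of crashing the worker.
			dropsDueToPanic.With(prometheus.Labels{"url": id}).Add(1)
			log.Printf("send() panicked: id=%s panic=%v", id, r)
		}
	}()
	panic("simulated delivery failure")
}

func main() {
	send("example-webhook")
}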
90 changes: 24 additions & 66 deletions internal/sink/sinkSender.go
@@ -65,27 +65,7 @@ type Sender struct {
failureMessage FailureMessage
listener ancla.Register
matcher Matcher
SinkMetrics
}

type SinkMetrics struct {
deliverUntilGauge prometheus.Gauge
deliveryRetryMaxGauge prometheus.Gauge
renewalTimeGauge prometheus.Gauge
maxWorkersGauge prometheus.Gauge
// deliveryCounter prometheus.CounterVec
deliveryRetryCounter *prometheus.CounterVec
droppedQueueFullCounter prometheus.Counter
droppedCutoffCounter prometheus.Counter
droppedExpiredCounter prometheus.Counter
droppedExpiredBeforeQueueCounter prometheus.Counter
droppedMessage prometheus.Counter
droppedInvalidConfig prometheus.Counter
droppedPanic prometheus.Counter
cutOffCounter prometheus.Counter
queueDepthGauge prometheus.Gauge
dropUntilGauge prometheus.Gauge
currentWorkersGauge prometheus.Gauge
metrics.Metrics
}

func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {
@@ -122,6 +102,7 @@ func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {
deliveryRetries: w.config.DeliveryRetries,
deliveryInterval: w.config.DeliveryInterval,
maxWorkers: w.config.NumWorkersPerSender,
Metrics: w.metrics,
failureMessage: FailureMessage{
Original: l,
Text: failureText,
@@ -133,9 +114,8 @@ func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {
disablePartnerIDs: w.config.DisablePartnerIDs,
}

s.CreateMetrics(w.metrics)
s.queueDepthGauge.Set(0)
s.currentWorkersGauge.Set(0)
s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(0)
s.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(0)

// Don't share the secret with others when there is an error.
hideSecret(s.failureMessage.Original)
@@ -157,21 +137,21 @@ func (s *Sender) Update(l ancla.Register) (err error) {
s.matcher, err = NewMatcher(l, s.logger)
s.sink = NewSink(s.config, s.logger, l)

s.renewalTimeGauge.Set(float64(time.Now().Unix()))
s.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(time.Now().Unix()))

s.mutex.Lock()
defer s.mutex.Unlock()

s.deliverUntil = l.GetUntil()
s.deliverUntilGauge.Set(float64(s.deliverUntil.Unix()))
s.deliveryRetryMaxGauge.Set(float64(s.deliveryRetries))
s.ConsumerDeliverUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.deliverUntil.Unix()))
s.DeliveryRetryMaxGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.deliveryRetries))

s.id = l.GetId()
s.listener = l
s.failureMessage.Original = l

// Update this here in case we make this configurable later
s.maxWorkersGauge.Set(float64(s.maxWorkers))
s.ConsumerMaxDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.maxWorkers))

return
}
@@ -214,12 +194,12 @@ func (s *Sender) Queue(msg *wrp.Message) {
//TODO: this code will be changing; it will take away the queue and send to the sink interface (not the webhook interface)
select {
case s.queue.Load().(chan *wrp.Message) <- msg:
s.queueDepthGauge.Add(1.0)
s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
s.logger.Debug("event added to outbound queue", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))
default:
s.logger.Debug("queue full. event dropped", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))
s.queueOverflow()
s.droppedQueueFullCounter.Add(1.0)
s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "queue_full"}).Add(1.0)
}
}

@@ -231,15 +211,15 @@ func (s *Sender) Shutdown(gentle bool) {
// need to close the channel we're going to replace, in case it doesn't
// have any events in it.
close(s.queue.Load().(chan *wrp.Message))
s.Empty(s.droppedExpiredCounter)
s.Empty(s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired"}))
}
close(s.queue.Load().(chan *wrp.Message))
s.wg.Wait()

s.mutex.Lock()
s.deliverUntil = time.Time{}
s.deliverUntilGauge.Set(float64(s.deliverUntil.Unix()))
s.queueDepthGauge.Set(0) //just in case
s.ConsumerDeliverUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.deliverUntil.Unix()))
s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(0) //just in case
s.mutex.Unlock()
}

@@ -270,13 +250,13 @@ func overlaps(sl1 []string, sl2 []string) bool {
func (s *Sender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool {
if !now.After(dropUntil) {
// client was cut off
s.droppedCutoffCounter.Add(1.0)
s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"}).Add(1.0)
return false
}

if !now.Before(deliverUntil) {
// outside delivery window
s.droppedExpiredBeforeQueueCounter.Add(1.0)
s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired_before_queueing"}).Add(1.0)
return false
}

@@ -291,7 +271,7 @@ func (s *Sender) Empty(droppedCounter prometheus.Counter) {
droppedMsgs := s.queue.Load().(chan *wrp.Message)
s.queue.Store(make(chan *wrp.Message, s.queueSize))
droppedCounter.Add(float64(len(droppedMsgs)))
s.queueDepthGauge.Set(0.0)
s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(0.0)
}

// queueOverflow handles the logic of what to do when a queue overflows:
Expand All @@ -304,7 +284,7 @@ func (s *Sender) queueOverflow() {
return
}
s.dropUntil = time.Now().Add(s.cutOffPeriod)
s.dropUntilGauge.Set(float64(s.dropUntil.Unix()))
s.ConsumerDropUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.dropUntil.Unix()))
//TODO: need to figure this out
// secret := s.listener.Webhook.Config.Secret
secret := "placeholderSecret"
@@ -314,11 +294,11 @@ func (s *Sender) queueOverflow() {
failureURL := "placeholderURL"
s.mutex.Unlock()

s.cutOffCounter.Add(1.0)
s.CutOffCounter.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)

// We empty the queue but don't close the channel, because we're not
// shutting down.
s.Empty(s.droppedCutoffCounter)
s.Empty(s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"}))

msg, err := json.Marshal(failureMsg)
if nil != err {
@@ -368,7 +348,6 @@ func (s *Sender) queueOverflow() {
if nil != resp.Body {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()

}
}

@@ -380,7 +359,6 @@ func (s *Sender) dispatcher() {
ok bool
)

Loop:
for {
// Always pull a new queue in case we have been cutoff or are shutting
// down.
@@ -407,9 +385,9 @@ Loop:
// This is only true when a queue is empty and closed, which for us
// only happens on Shutdown().
if !ok {
break Loop
break
}
s.queueDepthGauge.Add(-1.0)
s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(-1.0)
s.mutex.RLock()
deliverUntil := s.deliverUntil
dropUntil := s.dropUntil
@@ -420,15 +398,15 @@ Loop:
now := time.Now()

if now.Before(dropUntil) {
s.droppedCutoffCounter.Add(1.0)
s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"}).Add(1.0)
continue
}
if now.After(deliverUntil) {
s.Empty(s.droppedExpiredCounter)
s.Empty(s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired"}))
continue
}
s.workers.Acquire()
s.currentWorkersGauge.Add(1.0)
s.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)

go s.sink.Send(secret, accept, msg)
}
@@ -437,26 +415,6 @@ Loop:
}
}

func (s *Sender) CreateMetrics(m metrics.Metrics) {
s.deliveryRetryCounter = m.DeliveryRetryCounter
s.deliveryRetryMaxGauge = m.DeliveryRetryMaxGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.cutOffCounter = m.CutOffCounter.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.droppedQueueFullCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "queue_full"})
s.droppedExpiredCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired"})
s.droppedExpiredBeforeQueueCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired_before_queueing"})
s.droppedCutoffCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"})
s.droppedInvalidConfig = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "invalid_config"})
s.droppedMessage = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError})
s.droppedPanic = m.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.queueDepthGauge = m.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.renewalTimeGauge = m.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.deliverUntilGauge = m.ConsumerDeliverUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.dropUntilGauge = m.ConsumerDropUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.currentWorkersGauge = m.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.maxWorkersGauge = m.ConsumerMaxDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})

}

func getPartnerIds(l ancla.Register) ([]string, error) {
switch v := l.(type) {
case *ancla.RegistryV1:
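One detail worth noting in sinkSender.go: Empty still takes a prometheus.Counter, and CounterVec.With returns exactly that interface, so each call site can now build the labeled child it needs (for example "expired" versus "cut_off") instead of relying on fields pre-bound in CreateMetrics. A self-contained sketch of that interaction, with illustrative metric and label names:

package main

import "github.com/prometheus/client_golang/prometheus"

// Stand-in for the shared SlowConsumerDroppedMsgCounter vector.
var slowConsumerDropped = prometheus.NewCounterVec(
	prometheus.CounterOpts{Name: "slow_consumer_dropped_message_count"},
	[]string{"url", "reason"},
)

// empty mirrors Sender.Empty: it only needs something it can Add() to.
func empty(queue chan struct{}, droppedCounter prometheus.Counter) {
	droppedCounter.Add(float64(len(queue)))
}

func main() {
	q := make(chan struct{}, 10)
	q <- struct{}{}
	q <- struct{}{}

	// The call site picks the reason label for the dropped messages.
	empty(q, slowConsumerDropped.With(prometheus.Labels{
		"url": "example-webhook", "reason": "expired",
	}))
}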
25 changes: 4 additions & 21 deletions internal/sink/sinkWrapper.go
@@ -26,11 +26,12 @@ import (
type WrapperIn struct {
fx.In

Tracing candlelight.Tracing
Config Config
Metrics metrics.Metrics
Tracing candlelight.Tracing
Config Config
// Metrics metrics.MetricsIn
EventType *prometheus.CounterVec `name:"incoming_event_type_count"`
Logger *zap.Logger
metrics.Metrics
}

// SinkWrapper interface is needed for unit testing.
@@ -65,24 +66,6 @@ type wrapper struct {

func Provide() fx.Option {
return fx.Provide(
func(in metrics.MetricsIn) metrics.Metrics {
senderMetrics := metrics.Metrics{
DeliveryCounter: in.DeliveryCounter,
DeliveryRetryCounter: in.DeliveryRetryCounter,
DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge,
CutOffCounter: in.CutOffCounter,
SlowConsumerDroppedMsgCounter: in.SlowConsumerDroppedMsgCounter,
DropsDueToPanic: in.DropsDueToPanic,
ConsumerDeliverUntilGauge: in.ConsumerDeliverUntilGauge,
ConsumerDropUntilGauge: in.ConsumerDropUntilGauge,
ConsumerDeliveryWorkersGauge: in.ConsumerDeliveryWorkersGauge,
ConsumerMaxDeliveryWorkersGauge: in.ConsumerMaxDeliveryWorkersGauge,
OutgoingQueueDepth: in.OutgoingQueueDepth,
ConsumerRenewalTimeGauge: in.ConsumerRenewalTimeGauge,
QueryLatency: in.QueryLatency,
}
return senderMetrics
},
func(in WrapperIn) (Wrapper, error) {
w, err := NewWrapper(in)
return w, err
