chore: simplify metrics package #575

Merged
23 changes: 3 additions & 20 deletions internal/metrics/metrics.go
@@ -74,10 +74,10 @@ const (
MessageDroppedCode = "message_dropped"
)

// MetricsIn will be populated automatically by the ProvideMetrics function
// and then used to populate the Metrics struct
type MetricsIn struct {
// Metrics will be used to set up the metrics for each sink.
type Metrics struct {
fx.In

QueryLatency prometheus.ObserverVec `name:"query_duration_histogram_seconds"`
DeliveryCounter *prometheus.CounterVec `name:"delivery_count"`
DeliveryRetryCounter *prometheus.CounterVec `name:"delivery_retry_count"`
@@ -93,23 +93,6 @@ type MetricsIn struct {
ConsumerRenewalTimeGauge *prometheus.GaugeVec `name:"consumer_renewal_time"`
}

// Metrics will be used to set up the metrics for each sink
type Metrics struct {
DeliveryCounter *prometheus.CounterVec
DeliveryRetryCounter *prometheus.CounterVec
DeliveryRetryMaxGauge *prometheus.GaugeVec
CutOffCounter *prometheus.CounterVec
SlowConsumerDroppedMsgCounter *prometheus.CounterVec
DropsDueToPanic *prometheus.CounterVec
ConsumerDeliverUntilGauge *prometheus.GaugeVec
ConsumerDropUntilGauge *prometheus.GaugeVec
ConsumerDeliveryWorkersGauge *prometheus.GaugeVec
ConsumerMaxDeliveryWorkersGauge *prometheus.GaugeVec
OutgoingQueueDepth *prometheus.GaugeVec
ConsumerRenewalTimeGauge *prometheus.GaugeVec
QueryLatency prometheus.ObserverVec
}

// TODO: do these need to be annotated/broken into groups based on where the metrics are being used/called
func Provide() fx.Option {
return fx.Options(
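A minimal sketch (not part of this diff) of how the consolidated `Metrics` struct gets populated: with `fx.In` plus `name:` tags, fx injects each named metric directly, so the separate plain struct and the copy step it required can go away. The metric options, registry wiring, and label values below are illustrative assumptions, not the project's actual setup.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/fx"
)

// Stand-in for the consolidated metrics.Metrics type: fx fills each field
// from whatever value was provided under the matching `name` tag.
type Metrics struct {
	fx.In

	DeliveryCounter *prometheus.CounterVec `name:"delivery_count"`
}

func main() {
	app := fx.New(
		// Provide a named *prometheus.CounterVec; in the real service this
		// comes from the metrics package, not an inline constructor.
		fx.Provide(fx.Annotate(
			func() *prometheus.CounterVec {
				return prometheus.NewCounterVec(
					prometheus.CounterOpts{Name: "delivery_count", Help: "delivery count"},
					[]string{"url", "reason", "code", "event"},
				)
			},
			fx.ResultTags(`name:"delivery_count"`),
		)),
		fx.Invoke(func(m Metrics) {
			// Labels are curried at the call site instead of being copied
			// into a second, pre-labelled struct.
			m.DeliveryCounter.With(prometheus.Labels{
				"url": "http://example.test", "reason": "200", "code": "200", "event": "online",
			}).Add(1)
			fmt.Println("recorded one delivery")
		}),
	)
	if err := app.Err(); err != nil {
		panic(err)
	}
}
```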
18 changes: 9 additions & 9 deletions internal/sink/sink.go
@@ -179,7 +179,7 @@ func (whs Webhooks) Send(secret, acceptType string, msg *wrp.Message) error {
func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
defer func() {
if r := recover(); nil != r {
// s.droppedPanic.Add(1.0)
// s.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
v1.logger.Error("goroutine send() panicked", zap.String("id", v1.id), zap.Any("panic", r))
}
// s.workers.Release()
@@ -208,7 +208,7 @@ func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
req, err := http.NewRequest("POST", v1.urls.Value.(string), payloadReader)
if err != nil {
// Report drop
// s.droppedInvalidConfig.Add(1.0)
// s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "invalid_config"}).Add(1.0)
v1.logger.Error("Invalid URL", zap.String(metrics.UrlLabel, v1.urls.Value.(string)), zap.String("id", v1.id), zap.Error(err))
return err
}
@@ -254,8 +254,8 @@ func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
logger := v1.logger
if err != nil {
// Report failure
//TODO: add droppedMessage to webhook metrics and remove from sink sender?
// v1.droppedMessage.Add(1.0)
//TODO: add SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) to webhook metrics and remove from sink sender?
// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).Add(1.0)
reason = metrics.GetDoErrReason(err)
if resp != nil {
code = strconv.Itoa(resp.StatusCode)
@@ -264,7 +264,7 @@ func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
logger = v1.logger.With(zap.String(metrics.ReasonLabel, reason), zap.Error(err))
deliveryCounterLabels = []string{metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason, metrics.CodeLabel, code, metrics.EventLabel, event}
fmt.Print(deliveryCounterLabels)
// v1.droppedMessage.With(metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason).Add(1)
// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).With(metrics.UrlLabel, req.URL.String(), metrics.ReasonLabel, reason).Add(1)
logger.Error("Dropped Network Error", zap.Error(err))
return err
} else {
@@ -305,8 +305,8 @@ func (v1 *WebhookV1) updateRequest(urls *ring.Ring) func(*http.Request) *http.Re
tmp, err := url.Parse(urls.Value.(string))
if err != nil {
v1.logger.Error("failed to update url", zap.String(metrics.UrlLabel, urls.Value.(string)), zap.Error(err))
//TODO: do we add droppedMessage metric to webhook and remove from sink sender?
// v1.droppedMessage.With(metrics.UrlLabel, request.URL.String(), metrics.ReasonLabel, metrics.UpdateRequestURLFailedReason).Add(1)
//TODO: do we add SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}) metric to webhook and remove from sink sender?
// v1.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError}).With(metrics.UrlLabel, request.URL.String(), metrics.ReasonLabel, metrics.UpdateRequestURLFailedReason).Add(1)
}
request.URL = tmp
return request
@@ -318,7 +318,7 @@ func (v1 *WebhookV1) onAttempt(request *http.Request, event string) retry.OnAtte
return func(attempt retry.Attempt[*http.Response]) {
if attempt.Retries > 0 {
fmt.Print(event)
// s.deliveryRetryCounter.With(prometheus.Labels{UrlLabel: v1.id, EventLabel: event}).Add(1.0)
// s.DeliveryRetryCounter.With(prometheus.Labels{UrlLabel: v1.id, EventLabel: event}).Add(1.0)
v1.logger.Debug("retrying HTTP transaction", zap.String(metrics.UrlLabel, request.URL.String()), zap.Error(attempt.Err), zap.Int("retry", attempt.Retries+1), zap.Int("statusCode", attempt.Result.StatusCode))
}

@@ -377,7 +377,7 @@ func (k *Kafka) send(secret string, acceptType string, msg *wrp.Message) error {

defer func() {
if r := recover(); nil != r {
// s.droppedPanic.Add(1.0)
// s.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
//TODO: should we be using the RegistrationV2 id for this (canonical_name)
//or should we have an id for the specific kafka instance that failed?
k.logger.Error("goroutine send() panicked", zap.String("id", k.id), zap.Any("panic", r))
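For reference, a small, self-contained sketch of the recover-and-count pattern the commented-out lines in this file point at; the metric name, label, and wiring are hypothetical stand-ins, not code from this repository.

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
)

// send shows the pattern: a panic inside the goroutine is recovered,
// counted under the sender's id, and logged instead of crashing the process.
func send(id string, dropsDueToPanic *prometheus.CounterVec, logger *zap.Logger) {
	defer func() {
		if r := recover(); r != nil {
			dropsDueToPanic.With(prometheus.Labels{"url": id}).Add(1.0)
			logger.Error("goroutine send() panicked", zap.String("id", id), zap.Any("panic", r))
		}
	}()
	panic("simulated failure")
}

func main() {
	drops := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "drops_due_to_panic", Help: "panics per sender"},
		[]string{"url"},
	)
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()
	send("sender-1", drops, logger)
}
```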
90 changes: 24 additions & 66 deletions internal/sink/sinkSender.go
@@ -65,27 +65,7 @@ type Sender struct {
failureMessage FailureMessage
listener ancla.Register
matcher Matcher
SinkMetrics
}

type SinkMetrics struct {
deliverUntilGauge prometheus.Gauge
deliveryRetryMaxGauge prometheus.Gauge
renewalTimeGauge prometheus.Gauge
maxWorkersGauge prometheus.Gauge
// deliveryCounter prometheus.CounterVec
deliveryRetryCounter *prometheus.CounterVec
droppedQueueFullCounter prometheus.Counter
droppedCutoffCounter prometheus.Counter
droppedExpiredCounter prometheus.Counter
droppedExpiredBeforeQueueCounter prometheus.Counter
droppedMessage prometheus.Counter
droppedInvalidConfig prometheus.Counter
droppedPanic prometheus.Counter
cutOffCounter prometheus.Counter
queueDepthGauge prometheus.Gauge
dropUntilGauge prometheus.Gauge
currentWorkersGauge prometheus.Gauge
metrics.Metrics
}

func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {
@@ -122,6 +102,7 @@ func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {
deliveryRetries: w.config.DeliveryRetries,
deliveryInterval: w.config.DeliveryInterval,
maxWorkers: w.config.NumWorkersPerSender,
Metrics: w.metrics,
failureMessage: FailureMessage{
Original: l,
Text: failureText,
@@ -133,9 +114,8 @@ func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {
disablePartnerIDs: w.config.DisablePartnerIDs,
}

s.CreateMetrics(w.metrics)
s.queueDepthGauge.Set(0)
s.currentWorkersGauge.Set(0)
s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(0)
s.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(0)

// Don't share the secret with others when there is an error.
hideSecret(s.failureMessage.Original)
@@ -157,21 +137,21 @@ func (s *Sender) Update(l ancla.Register) (err error) {
s.matcher, err = NewMatcher(l, s.logger)
s.sink = NewSink(s.config, s.logger, l)

s.renewalTimeGauge.Set(float64(time.Now().Unix()))
s.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(time.Now().Unix()))

s.mutex.Lock()
defer s.mutex.Unlock()

s.deliverUntil = l.GetUntil()
s.deliverUntilGauge.Set(float64(s.deliverUntil.Unix()))
s.deliveryRetryMaxGauge.Set(float64(s.deliveryRetries))
s.ConsumerDeliverUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.deliverUntil.Unix()))
s.DeliveryRetryMaxGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.deliveryRetries))

s.id = l.GetId()
s.listener = l
s.failureMessage.Original = l

// Update this here in case we make this configurable later
s.maxWorkersGauge.Set(float64(s.maxWorkers))
s.ConsumerMaxDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.maxWorkers))

return
}
@@ -214,12 +194,12 @@ func (s *Sender) Queue(msg *wrp.Message) {
//TODO: this code will be changing; it will take away the queue and send to the sink interface (not the webhook interface)
select {
case s.queue.Load().(chan *wrp.Message) <- msg:
s.queueDepthGauge.Add(1.0)
s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)
s.logger.Debug("event added to outbound queue", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))
default:
s.logger.Debug("queue full. event dropped", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))
s.queueOverflow()
s.droppedQueueFullCounter.Add(1.0)
s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "queue_full"}).Add(1.0)
}
}

@@ -231,15 +211,15 @@ func (s *Sender) Shutdown(gentle bool) {
// need to close the channel we're going to replace, in case it doesn't
// have any events in it.
close(s.queue.Load().(chan *wrp.Message))
s.Empty(s.droppedExpiredCounter)
s.Empty(s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired"}))
}
close(s.queue.Load().(chan *wrp.Message))
s.wg.Wait()

s.mutex.Lock()
s.deliverUntil = time.Time{}
s.deliverUntilGauge.Set(float64(s.deliverUntil.Unix()))
s.queueDepthGauge.Set(0) //just in case
s.ConsumerDeliverUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.deliverUntil.Unix()))
s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(0) //just in case
s.mutex.Unlock()
}

@@ -270,13 +250,13 @@ func overlaps(sl1 []string, sl2 []string) bool {
func (s *Sender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool {
if !now.After(dropUntil) {
// client was cut off
s.droppedCutoffCounter.Add(1.0)
s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"}).Add(1.0)
return false
}

if !now.Before(deliverUntil) {
// outside delivery window
s.droppedExpiredBeforeQueueCounter.Add(1.0)
s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired_before_queueing"}).Add(1.0)
return false
}

@@ -291,7 +271,7 @@ func (s *Sender) Empty(droppedCounter prometheus.Counter) {
droppedMsgs := s.queue.Load().(chan *wrp.Message)
s.queue.Store(make(chan *wrp.Message, s.queueSize))
droppedCounter.Add(float64(len(droppedMsgs)))
s.queueDepthGauge.Set(0.0)
s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(0.0)
}

// queueOverflow handles the logic of what to do when a queue overflows:
@@ -304,7 +284,7 @@ func (s *Sender) queueOverflow() {
return
}
s.dropUntil = time.Now().Add(s.cutOffPeriod)
s.dropUntilGauge.Set(float64(s.dropUntil.Unix()))
s.ConsumerDropUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Set(float64(s.dropUntil.Unix()))
//TODO: need to figure this out
// secret := s.listener.Webhook.Config.Secret
secret := "placeholderSecret"
@@ -314,11 +294,11 @@ func (s *Sender) queueOverflow() {
failureURL := "placeholderURL"
s.mutex.Unlock()

s.cutOffCounter.Add(1.0)
s.CutOffCounter.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)

// We empty the queue but don't close the channel, because we're not
// shutting down.
s.Empty(s.droppedCutoffCounter)
s.Empty(s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"}))

msg, err := json.Marshal(failureMsg)
if nil != err {
@@ -368,7 +348,6 @@ func (s *Sender) queueOverflow() {
if nil != resp.Body {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()

}
}

@@ -380,7 +359,6 @@ func (s *Sender) dispatcher() {
ok bool
)

Loop:
for {
// Always pull a new queue in case we have been cutoff or are shutting
// down.
@@ -407,9 +385,9 @@
// This is only true when a queue is empty and closed, which for us
// only happens on Shutdown().
if !ok {
break Loop
break
}
s.queueDepthGauge.Add(-1.0)
s.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(-1.0)
s.mutex.RLock()
deliverUntil := s.deliverUntil
dropUntil := s.dropUntil
@@ -420,15 +398,15 @@
now := time.Now()

if now.Before(dropUntil) {
s.droppedCutoffCounter.Add(1.0)
s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"}).Add(1.0)
continue
}
if now.After(deliverUntil) {
s.Empty(s.droppedExpiredCounter)
s.Empty(s.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired"}))
continue
}
s.workers.Acquire()
s.currentWorkersGauge.Add(1.0)
s.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id}).Add(1.0)

go s.sink.Send(secret, accept, msg)
}
@@ -437,26 +415,6 @@ Loop:
}
}

func (s *Sender) CreateMetrics(m metrics.Metrics) {
s.deliveryRetryCounter = m.DeliveryRetryCounter
s.deliveryRetryMaxGauge = m.DeliveryRetryMaxGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.cutOffCounter = m.CutOffCounter.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.droppedQueueFullCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "queue_full"})
s.droppedExpiredCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired"})
s.droppedExpiredBeforeQueueCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "expired_before_queueing"})
s.droppedCutoffCounter = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "cut_off"})
s.droppedInvalidConfig = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: "invalid_config"})
s.droppedMessage = m.SlowConsumerDroppedMsgCounter.With(prometheus.Labels{metrics.UrlLabel: s.id, metrics.ReasonLabel: metrics.NetworkError})
s.droppedPanic = m.DropsDueToPanic.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.queueDepthGauge = m.OutgoingQueueDepth.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.renewalTimeGauge = m.ConsumerRenewalTimeGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.deliverUntilGauge = m.ConsumerDeliverUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.dropUntilGauge = m.ConsumerDropUntilGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.currentWorkersGauge = m.ConsumerDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.maxWorkersGauge = m.ConsumerMaxDeliveryWorkersGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})

}

func getPartnerIds(l ancla.Register) ([]string, error) {
switch v := l.(type) {
case *ancla.RegistryV1:
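With the `SinkMetrics` struct and `CreateMetrics` gone, labels are curried at each call site instead of being stored in pre-labelled fields. A sketch of that pattern, using a hypothetical inline counter rather than the injected one; note that `CounterVec.With` returns a `prometheus.Counter`, which is why a curried handle can still be handed to helpers like `Empty`.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Hypothetical stand-in for SlowConsumerDroppedMsgCounter; the real vec
	// is injected via metrics.Metrics rather than constructed inline.
	dropped := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "slow_consumer_dropped_message_count", Help: "dropped messages"},
		[]string{"url", "reason"},
	)

	id := "http://sender.example"

	// CounterVec.With returns a prometheus.Counter, so a curried handle can
	// still be passed to helpers that accept prometheus.Counter (e.g. Empty).
	expiredDrops := dropped.With(prometheus.Labels{"url": id, "reason": "expired"})
	expiredDrops.Add(3) // e.g. three messages drained from a closed queue

	fmt.Println("recorded expired drops for", id)
}
```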
25 changes: 4 additions & 21 deletions internal/sink/sinkWrapper.go
@@ -26,11 +26,12 @@ import (
type WrapperIn struct {
fx.In

Tracing candlelight.Tracing
Config Config
Metrics metrics.Metrics
Tracing candlelight.Tracing
Config Config
// Metrics metrics.MetricsIn
EventType *prometheus.CounterVec `name:"incoming_event_type_count"`
Logger *zap.Logger
metrics.Metrics
}

// SinkWrapper interface is needed for unit testing.
@@ -65,24 +66,6 @@ type wrapper struct {

func Provide() fx.Option {
return fx.Provide(
func(in metrics.MetricsIn) metrics.Metrics {
senderMetrics := metrics.Metrics{
DeliveryCounter: in.DeliveryCounter,
DeliveryRetryCounter: in.DeliveryRetryCounter,
DeliveryRetryMaxGauge: in.DeliveryRetryMaxGauge,
CutOffCounter: in.CutOffCounter,
SlowConsumerDroppedMsgCounter: in.SlowConsumerDroppedMsgCounter,
DropsDueToPanic: in.DropsDueToPanic,
ConsumerDeliverUntilGauge: in.ConsumerDeliverUntilGauge,
ConsumerDropUntilGauge: in.ConsumerDropUntilGauge,
ConsumerDeliveryWorkersGauge: in.ConsumerDeliveryWorkersGauge,
ConsumerMaxDeliveryWorkersGauge: in.ConsumerMaxDeliveryWorkersGauge,
OutgoingQueueDepth: in.OutgoingQueueDepth,
ConsumerRenewalTimeGauge: in.ConsumerRenewalTimeGauge,
QueryLatency: in.QueryLatency,
}
return senderMetrics
},
func(in WrapperIn) (Wrapper, error) {
w, err := NewWrapper(in)
return w, err
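The removal of the `MetricsIn` → `Metrics` conversion provider works because embedding one `fx.In` parameter struct inside another flattens its fields into the outer struct, which is what `WrapperIn` now does with `metrics.Metrics`. A compressed sketch of that mechanic with hypothetical names:

```go
package main

import (
	"fmt"

	"go.uber.org/fx"
)

// Inner plays the role of metrics.Metrics: an fx.In parameter struct with
// a named dependency.
type Inner struct {
	fx.In

	Greeting string `name:"greeting"`
}

// Outer plays the role of WrapperIn: embedding Inner flattens its fields
// into Outer, so no hand-written conversion provider is needed.
type Outer struct {
	fx.In

	Inner
}

func main() {
	app := fx.New(
		fx.Provide(fx.Annotate(
			func() string { return "hello" },
			fx.ResultTags(`name:"greeting"`),
		)),
		fx.Invoke(func(o Outer) { fmt.Println(o.Greeting) }),
	)
	if err := app.Err(); err != nil {
		panic(err)
	}
}
```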