Skip to content

Commit

Permalink
Merge pull request #428 from xmidt-org/metrics-package
Browse files Browse the repository at this point in the history
added a metrics package and moved metrics files to package
  • Loading branch information
maurafortino authored Nov 3, 2023
2 parents 0188094 + f06d97f commit 47fd0b6
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 107 deletions.
7 changes: 0 additions & 7 deletions caduceus_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"go.uber.org/zap"

"github.com/go-kit/kit/metrics"
"github.com/xmidt-org/ancla"

"github.com/xmidt-org/wrp-go/v3"
Expand Down Expand Up @@ -55,12 +54,6 @@ type SenderConfig struct {
DisablePartnerIDs bool
}

type CaduceusMetricsRegistry interface {
NewCounter(name string) metrics.Counter
NewGauge(name string) metrics.Gauge
NewHistogram(name string, buckets int) metrics.Histogram
}

type RequestHandler interface {
HandleRequest(workerID int, msg *wrp.Message)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/xmidt-org/caduceus

go 1.19
go 1.21

require (
emperror.dev/emperror v0.33.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2038,6 +2038,7 @@ golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -2428,6 +2429,7 @@ golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
8 changes: 8 additions & 0 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ import (
"github.com/xmidt-org/wrp-go/v3"
)

const (
emptyContentTypeReason = "empty_content_type"
emptyUUIDReason = "empty_uuid"
bothEmptyReason = "empty_uuid_and_content_type"
networkError = "network_err"
unknownEventType = "unknown"
)

// Below is the struct that will implement our ServeHTTP method
type ServerHandler struct {
*zap.Logger
Expand Down
17 changes: 9 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/spf13/viper"
"github.com/xmidt-org/ancla"
"github.com/xmidt-org/bascule/basculehelper"
"github.com/xmidt-org/caduceus/metrics"
"github.com/xmidt-org/candlelight"
"github.com/xmidt-org/httpaux/recovery"
"github.com/xmidt-org/sallust"
Expand Down Expand Up @@ -83,7 +84,7 @@ func caduceus(arguments []string) int {
f = pflag.NewFlagSet(applicationName, pflag.ContinueOnError)
v = viper.New()

logger, metricsRegistry, webPA, err = server.Initialize(applicationName, arguments, f, v, Metrics, AnclaHelperMetrics, basculehelper.AuthCapabilitiesMetrics, basculehelper.AuthValidationMetrics)
logger, metricsRegistry, webPA, err = server.Initialize(applicationName, arguments, f, v, metrics.Metrics, metrics.AnclaHelperMetrics, basculehelper.AuthCapabilitiesMetrics, basculehelper.AuthValidationMetrics)
)

if parseErr, done := printVersion(f, arguments); done {
Expand Down Expand Up @@ -157,19 +158,19 @@ func caduceus(arguments []string) int {
senderWrapper: caduceusSenderWrapper,
Logger: logger,
},
errorRequests: metricsRegistry.NewCounter(ErrorRequestBodyCounter),
emptyRequests: metricsRegistry.NewCounter(EmptyRequestBodyCounter),
invalidCount: metricsRegistry.NewCounter(DropsDueToInvalidPayload),
incomingQueueDepthMetric: metricsRegistry.NewGauge(IncomingQueueDepth),
modifiedWRPCount: metricsRegistry.NewCounter(ModifiedWRPCounter),
errorRequests: metricsRegistry.NewCounter(metrics.ErrorRequestBodyCounter),
emptyRequests: metricsRegistry.NewCounter(metrics.EmptyRequestBodyCounter),
invalidCount: metricsRegistry.NewCounter(metrics.DropsDueToInvalidPayload),
incomingQueueDepthMetric: metricsRegistry.NewGauge(metrics.IncomingQueueDepth),
modifiedWRPCount: metricsRegistry.NewCounter(metrics.ModifiedWRPCounter),
maxOutstanding: 0,
// 0 is for the unused `buckets` argument in xmetrics.Registry.NewHistogram
incomingQueueLatency: metricsRegistry.NewHistogram(IncomingQueueLatencyHistogram, 0),
incomingQueueLatency: metricsRegistry.NewHistogram(metrics.IncomingQueueLatencyHistogram, 0),
now: time.Now,
}

caduceusConfig.Webhook.Logger = logger
caduceusConfig.Listener.Measures = NewHelperMeasures(metricsRegistry)
caduceusConfig.Listener.Measures = metrics.NewHelperMeasures(metricsRegistry)
argusClientTimeout, err := newArgusClientTimeout(v)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to parse argus client timeout config values: %v \n", err)
Expand Down
2 changes: 1 addition & 1 deletion anclaHelper.go → metrics/anclaHelper.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package metrics

import (
"github.com/xmidt-org/ancla"
Expand Down
36 changes: 6 additions & 30 deletions metrics.go → metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package main
package metrics

import (
"github.com/go-kit/kit/metrics"
// nolint:staticcheck
"github.com/xmidt-org/webpa-common/v2/xmetrics"
)

Expand All @@ -29,13 +28,11 @@ const (
IncomingQueueLatencyHistogram = "incoming_queue_latency_histogram_seconds"
)

const (
emptyContentTypeReason = "empty_content_type"
emptyUUIDReason = "empty_uuid"
bothEmptyReason = "empty_uuid_and_content_type"
networkError = "network_err"
unknownEventType = "unknown"
)
type CaduceusMetricsRegistry interface {
NewCounter(name string) metrics.Counter
NewGauge(name string) metrics.Gauge
NewHistogram(name string, buckets int) metrics.Histogram
}

func Metrics() []xmetrics.Metric {
return []xmetrics.Metric{
Expand Down Expand Up @@ -160,27 +157,6 @@ func Metrics() []xmetrics.Metric {
}
}

func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSender) {
c.deliveryCounter = m.NewCounter(DeliveryCounter)
c.deliveryRetryCounter = m.NewCounter(DeliveryRetryCounter)
c.deliveryRetryMaxGauge = m.NewGauge(DeliveryRetryMaxGauge).With("url", c.id)
c.cutOffCounter = m.NewCounter(SlowConsumerCounter).With("url", c.id)
c.droppedQueueFullCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full")
c.droppedExpiredCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired")
c.droppedExpiredBeforeQueueCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired_before_queueing")

c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off")
c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config")
c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError)
c.droppedPanic = m.NewCounter(DropsDueToPanic).With("url", c.id)
c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id)
c.renewalTimeGauge = m.NewGauge(ConsumerRenewalTimeGauge).With("url", c.id)
c.deliverUntilGauge = m.NewGauge(ConsumerDeliverUntilGauge).With("url", c.id)
c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id)
c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id)
c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id)
}

func NewMetricWrapperMeasures(m CaduceusMetricsRegistry) metrics.Histogram {
return m.NewHistogram(QueryDurationHistogram, 11)
}
2 changes: 1 addition & 1 deletion metrics_test.go → metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
*/

package main
package metrics

// Using Caduceus's test suite:
//
Expand Down
64 changes: 43 additions & 21 deletions outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ import (

"go.uber.org/zap"

"github.com/go-kit/kit/metrics"
kitmetrics "github.com/go-kit/kit/metrics"
"github.com/xmidt-org/ancla"
metrics "github.com/xmidt-org/caduceus/metrics"
"github.com/xmidt-org/webpa-common/v2/device"

"github.com/xmidt-org/webpa-common/v2/semaphore"
Expand Down Expand Up @@ -95,7 +96,7 @@ type OutboundSenderFactory struct {
DeliveryInterval time.Duration

// Metrics registry.
MetricsRegistry CaduceusMetricsRegistry
MetricsRegistry metrics.CaduceusMetricsRegistry

// The logger to use.
Logger *zap.Logger
Expand All @@ -107,7 +108,7 @@ type OutboundSenderFactory struct {
// DisablePartnerIDs dictates whether or not to enforce the partner ID check.
DisablePartnerIDs bool

QueryLatency metrics.Histogram
QueryLatency kitmetrics.Histogram
}

type OutboundSender interface {
Expand All @@ -130,23 +131,23 @@ type CaduceusOutboundSender struct {
queueSize int
deliveryRetries int
deliveryInterval time.Duration
deliveryCounter metrics.Counter
deliveryRetryCounter metrics.Counter
droppedQueueFullCounter metrics.Counter
droppedCutoffCounter metrics.Counter
droppedExpiredCounter metrics.Counter
droppedExpiredBeforeQueueCounter metrics.Counter
droppedNetworkErrCounter metrics.Counter
droppedInvalidConfig metrics.Counter
droppedPanic metrics.Counter
cutOffCounter metrics.Counter
queueDepthGauge metrics.Gauge
renewalTimeGauge metrics.Gauge
deliverUntilGauge metrics.Gauge
dropUntilGauge metrics.Gauge
maxWorkersGauge metrics.Gauge
currentWorkersGauge metrics.Gauge
deliveryRetryMaxGauge metrics.Gauge
deliveryCounter kitmetrics.Counter
deliveryRetryCounter kitmetrics.Counter
droppedQueueFullCounter kitmetrics.Counter
droppedCutoffCounter kitmetrics.Counter
droppedExpiredCounter kitmetrics.Counter
droppedExpiredBeforeQueueCounter kitmetrics.Counter
droppedNetworkErrCounter kitmetrics.Counter
droppedInvalidConfig kitmetrics.Counter
droppedPanic kitmetrics.Counter
cutOffCounter kitmetrics.Counter
queueDepthGauge kitmetrics.Gauge
renewalTimeGauge kitmetrics.Gauge
deliverUntilGauge kitmetrics.Gauge
dropUntilGauge kitmetrics.Gauge
maxWorkersGauge kitmetrics.Gauge
currentWorkersGauge kitmetrics.Gauge
deliveryRetryMaxGauge kitmetrics.Gauge
wg sync.WaitGroup
cutOffPeriod time.Duration
workers semaphore.Interface
Expand Down Expand Up @@ -471,7 +472,7 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti
// a fresh one, counting any current messages in the queue as dropped.
// It should never close a queue, as a queue not referenced anywhere will be
// cleaned up by the garbage collector without needing to be closed.
func (obs *CaduceusOutboundSender) Empty(droppedCounter metrics.Counter) {
func (obs *CaduceusOutboundSender) Empty(droppedCounter kitmetrics.Counter) {
droppedMsgs := obs.queue.Load().(chan *wrp.Message)
obs.queue.Store(make(chan *wrp.Message, obs.queueSize))
droppedCounter.Add(float64(len(droppedMsgs)))
Expand Down Expand Up @@ -737,3 +738,24 @@ func (obs *CaduceusOutboundSender) queueOverflow() {

}
}

func CreateOutbounderMetrics(m metrics.CaduceusMetricsRegistry, c *CaduceusOutboundSender) {
c.deliveryCounter = m.NewCounter(metrics.DeliveryCounter)
c.deliveryRetryCounter = m.NewCounter(metrics.DeliveryRetryCounter)
c.deliveryRetryMaxGauge = m.NewGauge(metrics.DeliveryRetryMaxGauge).With("url", c.id)
c.cutOffCounter = m.NewCounter(metrics.SlowConsumerCounter).With("url", c.id)
c.droppedQueueFullCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full")
c.droppedExpiredCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired")
c.droppedExpiredBeforeQueueCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired_before_queueing")

c.droppedCutoffCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off")
c.droppedInvalidConfig = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config")
c.droppedNetworkErrCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError)
c.droppedPanic = m.NewCounter(metrics.DropsDueToPanic).With("url", c.id)
c.queueDepthGauge = m.NewGauge(metrics.OutgoingQueueDepth).With("url", c.id)
c.renewalTimeGauge = m.NewGauge(metrics.ConsumerRenewalTimeGauge).With("url", c.id)
c.deliverUntilGauge = m.NewGauge(metrics.ConsumerDeliverUntilGauge).With("url", c.id)
c.dropUntilGauge = m.NewGauge(metrics.ConsumerDropUntilGauge).With("url", c.id)
c.currentWorkersGauge = m.NewGauge(metrics.ConsumerDeliveryWorkersGauge).With("url", c.id)
c.maxWorkersGauge = m.NewGauge(metrics.ConsumerMaxDeliveryWorkersGauge).With("url", c.id)
}
29 changes: 15 additions & 14 deletions outboundSender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/xmidt-org/ancla"
"github.com/xmidt-org/caduceus/metrics"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -163,20 +164,20 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []
//
// If a new metric within outboundsender is created it must be added here
fakeRegistry := new(mockCaduceusMetricsRegistry)
fakeRegistry.On("NewCounter", DeliveryRetryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", DeliveryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", OutgoingQueueDepth).Return(fakeDC)
fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeSlow)
fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeDroppedSlow)
fakeRegistry.On("NewCounter", DropsDueToPanic).Return(fakePanicDrop)
fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeQdepth)
fakeRegistry.On("NewGauge", DeliveryRetryMaxGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerRenewalTimeGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerDeliverUntilGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeQdepth)
fakeRegistry.On("NewHistogram", QueryDurationHistogram).Return(fakeLatency)
fakeRegistry.On("NewCounter", metrics.DeliveryRetryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", metrics.DeliveryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", metrics.OutgoingQueueDepth).Return(fakeDC)
fakeRegistry.On("NewCounter", metrics.SlowConsumerCounter).Return(fakeSlow)
fakeRegistry.On("NewCounter", metrics.SlowConsumerDroppedMsgCounter).Return(fakeDroppedSlow)
fakeRegistry.On("NewCounter", metrics.DropsDueToPanic).Return(fakePanicDrop)
fakeRegistry.On("NewGauge", metrics.OutgoingQueueDepth).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.DeliveryRetryMaxGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerRenewalTimeGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerDeliverUntilGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerDropUntilGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerDeliveryWorkersGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerMaxDeliveryWorkersGauge).Return(fakeQdepth)
fakeRegistry.On("NewHistogram", metrics.QueryDurationHistogram).Return(fakeLatency)

return &OutboundSenderFactory{
Listener: w,
Expand Down
19 changes: 10 additions & 9 deletions senderWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
"sync"
"time"

"github.com/go-kit/kit/metrics"
kitmetrics "github.com/go-kit/kit/metrics"
"github.com/xmidt-org/ancla"
"github.com/xmidt-org/caduceus/metrics"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
)
Expand All @@ -49,12 +50,12 @@ type SenderWrapperFactory struct {
Linger time.Duration

// Metrics registry.
MetricsRegistry CaduceusMetricsRegistry
MetricsRegistry metrics.CaduceusMetricsRegistry

// The metrics counter for dropped messages due to invalid payloads
DroppedMsgCounter metrics.Counter
DroppedMsgCounter kitmetrics.Counter

EventType metrics.Counter
EventType kitmetrics.Counter

// The logger implementation to share with OutboundSenders.
Logger *zap.Logger
Expand Down Expand Up @@ -88,9 +89,9 @@ type CaduceusSenderWrapper struct {
logger *zap.Logger
mutex sync.RWMutex
senders map[string]OutboundSender
metricsRegistry CaduceusMetricsRegistry
eventType metrics.Counter
queryLatency metrics.Histogram
metricsRegistry metrics.CaduceusMetricsRegistry
eventType kitmetrics.Counter
queryLatency kitmetrics.Histogram
wg sync.WaitGroup
shutdown chan struct{}
customPIDs []string
Expand Down Expand Up @@ -120,8 +121,8 @@ func (swf SenderWrapperFactory) New() (sw SenderWrapper, err error) {
return
}

caduceusSenderWrapper.queryLatency = NewMetricWrapperMeasures(swf.MetricsRegistry)
caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(IncomingEventTypeCounter)
caduceusSenderWrapper.queryLatency = metrics.NewMetricWrapperMeasures(swf.MetricsRegistry)
caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(metrics.IncomingEventTypeCounter)

caduceusSenderWrapper.senders = make(map[string]OutboundSender)
caduceusSenderWrapper.shutdown = make(chan struct{})
Expand Down
Loading

0 comments on commit 47fd0b6

Please sign in to comment.