Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added a metrics package and moved metrics files to package #428

Merged
merged 2 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions caduceus_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"go.uber.org/zap"

"github.com/go-kit/kit/metrics"
"github.com/xmidt-org/ancla"

"github.com/xmidt-org/wrp-go/v3"
Expand Down Expand Up @@ -55,12 +54,6 @@ type SenderConfig struct {
DisablePartnerIDs bool
}

// CaduceusMetricsRegistry is the set of instrument constructors this code
// requires from a metrics registry. Each method takes a metric name and
// returns the corresponding go-kit metrics instrument.
type CaduceusMetricsRegistry interface {
// NewCounter returns a counter for the metric called name.
NewCounter(name string) metrics.Counter
// NewGauge returns a gauge for the metric called name.
NewGauge(name string) metrics.Gauge
// NewHistogram returns a histogram for the metric called name. How buckets
// is interpreted is up to the implementation.
NewHistogram(name string, buckets int) metrics.Histogram
}

// RequestHandler is implemented by components that accept a WRP message for
// handling.
type RequestHandler interface {
// HandleRequest hands msg to the handler. workerID presumably identifies
// the worker making the call — confirm against the implementations.
HandleRequest(workerID int, msg *wrp.Message)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/xmidt-org/caduceus

go 1.19
go 1.21

require (
emperror.dev/emperror v0.33.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2038,6 +2038,7 @@ golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -2428,6 +2429,7 @@ golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
8 changes: 8 additions & 0 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ import (
"github.com/xmidt-org/wrp-go/v3"
)

// Short string identifiers shared across the package. networkError is passed
// as the "reason" label value of the dropped-message counter in
// CreateOutbounderMetrics; the remaining values are presumably used as metric
// label values in the same way — confirm against their call sites.
const (
emptyContentTypeReason = "empty_content_type"
emptyUUIDReason = "empty_uuid"
bothEmptyReason = "empty_uuid_and_content_type"
networkError = "network_err"
unknownEventType = "unknown"
)

// Below is the struct that will implement our ServeHTTP method
type ServerHandler struct {
*zap.Logger
Expand Down
17 changes: 9 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/spf13/viper"
"github.com/xmidt-org/ancla"
"github.com/xmidt-org/bascule/basculehelper"
"github.com/xmidt-org/caduceus/metrics"
"github.com/xmidt-org/candlelight"
"github.com/xmidt-org/httpaux/recovery"
"github.com/xmidt-org/sallust"
Expand Down Expand Up @@ -83,7 +84,7 @@ func caduceus(arguments []string) int {
f = pflag.NewFlagSet(applicationName, pflag.ContinueOnError)
v = viper.New()

logger, metricsRegistry, webPA, err = server.Initialize(applicationName, arguments, f, v, Metrics, AnclaHelperMetrics, basculehelper.AuthCapabilitiesMetrics, basculehelper.AuthValidationMetrics)
logger, metricsRegistry, webPA, err = server.Initialize(applicationName, arguments, f, v, metrics.Metrics, metrics.AnclaHelperMetrics, basculehelper.AuthCapabilitiesMetrics, basculehelper.AuthValidationMetrics)
)

if parseErr, done := printVersion(f, arguments); done {
Expand Down Expand Up @@ -157,19 +158,19 @@ func caduceus(arguments []string) int {
senderWrapper: caduceusSenderWrapper,
Logger: logger,
},
errorRequests: metricsRegistry.NewCounter(ErrorRequestBodyCounter),
emptyRequests: metricsRegistry.NewCounter(EmptyRequestBodyCounter),
invalidCount: metricsRegistry.NewCounter(DropsDueToInvalidPayload),
incomingQueueDepthMetric: metricsRegistry.NewGauge(IncomingQueueDepth),
modifiedWRPCount: metricsRegistry.NewCounter(ModifiedWRPCounter),
errorRequests: metricsRegistry.NewCounter(metrics.ErrorRequestBodyCounter),
emptyRequests: metricsRegistry.NewCounter(metrics.EmptyRequestBodyCounter),
invalidCount: metricsRegistry.NewCounter(metrics.DropsDueToInvalidPayload),
incomingQueueDepthMetric: metricsRegistry.NewGauge(metrics.IncomingQueueDepth),
modifiedWRPCount: metricsRegistry.NewCounter(metrics.ModifiedWRPCounter),
maxOutstanding: 0,
// 0 is for the unused `buckets` argument in xmetrics.Registry.NewHistogram
incomingQueueLatency: metricsRegistry.NewHistogram(IncomingQueueLatencyHistogram, 0),
incomingQueueLatency: metricsRegistry.NewHistogram(metrics.IncomingQueueLatencyHistogram, 0),
now: time.Now,
}

caduceusConfig.Webhook.Logger = logger
caduceusConfig.Listener.Measures = NewHelperMeasures(metricsRegistry)
caduceusConfig.Listener.Measures = metrics.NewHelperMeasures(metricsRegistry)
argusClientTimeout, err := newArgusClientTimeout(v)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to parse argus client timeout config values: %v \n", err)
Expand Down
2 changes: 1 addition & 1 deletion anclaHelper.go → metrics/anclaHelper.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package metrics

import (
"github.com/xmidt-org/ancla"
Expand Down
36 changes: 6 additions & 30 deletions metrics.go → metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package main
package metrics

import (
"github.com/go-kit/kit/metrics"
// nolint:staticcheck
"github.com/xmidt-org/webpa-common/v2/xmetrics"
)

Expand All @@ -29,13 +28,11 @@ const (
IncomingQueueLatencyHistogram = "incoming_queue_latency_histogram_seconds"
)

// Short string identifiers shared across the package. networkError is passed
// as the "reason" label value of the dropped-message counter in
// CreateOutbounderMetrics; the remaining values are presumably used as metric
// label values in the same way — confirm against their call sites.
const (
emptyContentTypeReason = "empty_content_type"
emptyUUIDReason = "empty_uuid"
bothEmptyReason = "empty_uuid_and_content_type"
networkError = "network_err"
unknownEventType = "unknown"
)
// CaduceusMetricsRegistry is the set of instrument constructors this code
// requires from a metrics registry. Each method takes a metric name and
// returns the corresponding go-kit metrics instrument.
type CaduceusMetricsRegistry interface {
// NewCounter returns a counter for the metric called name.
NewCounter(name string) metrics.Counter
// NewGauge returns a gauge for the metric called name.
NewGauge(name string) metrics.Gauge
// NewHistogram returns a histogram for the metric called name. How buckets
// is interpreted is up to the implementation.
NewHistogram(name string, buckets int) metrics.Histogram
}

func Metrics() []xmetrics.Metric {
return []xmetrics.Metric{
Expand Down Expand Up @@ -160,27 +157,6 @@ func Metrics() []xmetrics.Metric {
}
}

// CreateOutbounderMetrics assigns every metric field on c from instruments
// obtained through the registry m.
//
// deliveryCounter and deliveryRetryCounter are stored exactly as the registry
// returns them. Every other instrument is bound to this sender with a "url"
// label whose value is c.id.
func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSender) {
c.deliveryCounter = m.NewCounter(DeliveryCounter)
c.deliveryRetryCounter = m.NewCounter(DeliveryRetryCounter)
c.deliveryRetryMaxGauge = m.NewGauge(DeliveryRetryMaxGauge).With("url", c.id)
c.cutOffCounter = m.NewCounter(SlowConsumerCounter).With("url", c.id)
// The dropped-message counters all come from the same metric name and are
// told apart only by the value of their "reason" label.
c.droppedQueueFullCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full")
c.droppedExpiredCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired")
c.droppedExpiredBeforeQueueCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired_before_queueing")

c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off")
c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config")
c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError)
// Panic drops are recorded under their own metric name rather than as
// another "reason" value.
c.droppedPanic = m.NewCounter(DropsDueToPanic).With("url", c.id)
c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id)
c.renewalTimeGauge = m.NewGauge(ConsumerRenewalTimeGauge).With("url", c.id)
c.deliverUntilGauge = m.NewGauge(ConsumerDeliverUntilGauge).With("url", c.id)
c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id)
c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id)
c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id)
}

// NewMetricWrapperMeasures asks the registry m for the histogram named by
// QueryDurationHistogram, passing 11 as the buckets argument, and returns it.
func NewMetricWrapperMeasures(m CaduceusMetricsRegistry) metrics.Histogram {
	const buckets = 11

	queryDuration := m.NewHistogram(QueryDurationHistogram, buckets)
	return queryDuration
}
2 changes: 1 addition & 1 deletion metrics_test.go → metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
*/

package main
package metrics

// Using Caduceus's test suite:
//
Expand Down
64 changes: 43 additions & 21 deletions outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ import (

"go.uber.org/zap"

"github.com/go-kit/kit/metrics"
kitmetrics "github.com/go-kit/kit/metrics"
"github.com/xmidt-org/ancla"
metrics "github.com/xmidt-org/caduceus/metrics"
"github.com/xmidt-org/webpa-common/v2/device"

"github.com/xmidt-org/webpa-common/v2/semaphore"
Expand Down Expand Up @@ -95,7 +96,7 @@ type OutboundSenderFactory struct {
DeliveryInterval time.Duration

// Metrics registry.
MetricsRegistry CaduceusMetricsRegistry
MetricsRegistry metrics.CaduceusMetricsRegistry

// The logger to use.
Logger *zap.Logger
Expand All @@ -107,7 +108,7 @@ type OutboundSenderFactory struct {
// DisablePartnerIDs dictates whether or not to enforce the partner ID check.
DisablePartnerIDs bool

QueryLatency metrics.Histogram
QueryLatency kitmetrics.Histogram
}

type OutboundSender interface {
Expand All @@ -130,23 +131,23 @@ type CaduceusOutboundSender struct {
queueSize int
deliveryRetries int
deliveryInterval time.Duration
deliveryCounter metrics.Counter
deliveryRetryCounter metrics.Counter
droppedQueueFullCounter metrics.Counter
droppedCutoffCounter metrics.Counter
droppedExpiredCounter metrics.Counter
droppedExpiredBeforeQueueCounter metrics.Counter
droppedNetworkErrCounter metrics.Counter
droppedInvalidConfig metrics.Counter
droppedPanic metrics.Counter
cutOffCounter metrics.Counter
queueDepthGauge metrics.Gauge
renewalTimeGauge metrics.Gauge
deliverUntilGauge metrics.Gauge
dropUntilGauge metrics.Gauge
maxWorkersGauge metrics.Gauge
currentWorkersGauge metrics.Gauge
deliveryRetryMaxGauge metrics.Gauge
deliveryCounter kitmetrics.Counter
deliveryRetryCounter kitmetrics.Counter
droppedQueueFullCounter kitmetrics.Counter
droppedCutoffCounter kitmetrics.Counter
droppedExpiredCounter kitmetrics.Counter
droppedExpiredBeforeQueueCounter kitmetrics.Counter
droppedNetworkErrCounter kitmetrics.Counter
droppedInvalidConfig kitmetrics.Counter
droppedPanic kitmetrics.Counter
cutOffCounter kitmetrics.Counter
queueDepthGauge kitmetrics.Gauge
renewalTimeGauge kitmetrics.Gauge
deliverUntilGauge kitmetrics.Gauge
dropUntilGauge kitmetrics.Gauge
maxWorkersGauge kitmetrics.Gauge
currentWorkersGauge kitmetrics.Gauge
deliveryRetryMaxGauge kitmetrics.Gauge
wg sync.WaitGroup
cutOffPeriod time.Duration
workers semaphore.Interface
Expand Down Expand Up @@ -471,7 +472,7 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti
// a fresh one, counting any current messages in the queue as dropped.
// It should never close a queue, as a queue not referenced anywhere will be
// cleaned up by the garbage collector without needing to be closed.
func (obs *CaduceusOutboundSender) Empty(droppedCounter metrics.Counter) {
func (obs *CaduceusOutboundSender) Empty(droppedCounter kitmetrics.Counter) {
droppedMsgs := obs.queue.Load().(chan *wrp.Message)
obs.queue.Store(make(chan *wrp.Message, obs.queueSize))
droppedCounter.Add(float64(len(droppedMsgs)))
Expand Down Expand Up @@ -737,3 +738,24 @@ func (obs *CaduceusOutboundSender) queueOverflow() {

}
}

// CreateOutbounderMetrics wires every per-sender metric field on c to an
// instrument obtained from the registry m.
//
// deliveryCounter and deliveryRetryCounter are stored exactly as the registry
// hands them back. Every other instrument is bound to this sender through a
// "url" label carrying c.id. The dropped-message counters share one metric
// name and differ only in the value of their "reason" label.
func CreateOutbounderMetrics(m metrics.CaduceusMetricsRegistry, c *CaduceusOutboundSender) {
	const (
		urlKey    = "url"
		reasonKey = "reason"
		dropped   = metrics.SlowConsumerDroppedMsgCounter
	)
	id := c.id

	// Delivery instruments.
	c.deliveryCounter = m.NewCounter(metrics.DeliveryCounter)
	c.deliveryRetryCounter = m.NewCounter(metrics.DeliveryRetryCounter)
	c.deliveryRetryMaxGauge = m.NewGauge(metrics.DeliveryRetryMaxGauge).With(urlKey, id)

	// Slow-consumer cut-offs and the per-reason dropped-message counters.
	c.cutOffCounter = m.NewCounter(metrics.SlowConsumerCounter).With(urlKey, id)
	c.droppedQueueFullCounter = m.NewCounter(dropped).With(urlKey, id, reasonKey, "queue_full")
	c.droppedExpiredCounter = m.NewCounter(dropped).With(urlKey, id, reasonKey, "expired")
	c.droppedExpiredBeforeQueueCounter = m.NewCounter(dropped).With(urlKey, id, reasonKey, "expired_before_queueing")
	c.droppedCutoffCounter = m.NewCounter(dropped).With(urlKey, id, reasonKey, "cut_off")
	c.droppedInvalidConfig = m.NewCounter(dropped).With(urlKey, id, reasonKey, "invalid_config")
	c.droppedNetworkErrCounter = m.NewCounter(dropped).With(urlKey, id, reasonKey, networkError)

	// Panic drops live under their own metric name.
	c.droppedPanic = m.NewCounter(metrics.DropsDueToPanic).With(urlKey, id)

	// Queue, time-window, and worker gauges.
	c.queueDepthGauge = m.NewGauge(metrics.OutgoingQueueDepth).With(urlKey, id)
	c.renewalTimeGauge = m.NewGauge(metrics.ConsumerRenewalTimeGauge).With(urlKey, id)
	c.deliverUntilGauge = m.NewGauge(metrics.ConsumerDeliverUntilGauge).With(urlKey, id)
	c.dropUntilGauge = m.NewGauge(metrics.ConsumerDropUntilGauge).With(urlKey, id)
	c.currentWorkersGauge = m.NewGauge(metrics.ConsumerDeliveryWorkersGauge).With(urlKey, id)
	c.maxWorkersGauge = m.NewGauge(metrics.ConsumerMaxDeliveryWorkersGauge).With(urlKey, id)
}
29 changes: 15 additions & 14 deletions outboundSender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/xmidt-org/ancla"
"github.com/xmidt-org/caduceus/metrics"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -163,20 +164,20 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []
//
// If a new metric within outboundsender is created it must be added here
fakeRegistry := new(mockCaduceusMetricsRegistry)
fakeRegistry.On("NewCounter", DeliveryRetryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", DeliveryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", OutgoingQueueDepth).Return(fakeDC)
fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeSlow)
fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeDroppedSlow)
fakeRegistry.On("NewCounter", DropsDueToPanic).Return(fakePanicDrop)
fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeQdepth)
fakeRegistry.On("NewGauge", DeliveryRetryMaxGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerRenewalTimeGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerDeliverUntilGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeQdepth)
fakeRegistry.On("NewHistogram", QueryDurationHistogram).Return(fakeLatency)
fakeRegistry.On("NewCounter", metrics.DeliveryRetryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", metrics.DeliveryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", metrics.OutgoingQueueDepth).Return(fakeDC)
fakeRegistry.On("NewCounter", metrics.SlowConsumerCounter).Return(fakeSlow)
fakeRegistry.On("NewCounter", metrics.SlowConsumerDroppedMsgCounter).Return(fakeDroppedSlow)
fakeRegistry.On("NewCounter", metrics.DropsDueToPanic).Return(fakePanicDrop)
fakeRegistry.On("NewGauge", metrics.OutgoingQueueDepth).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.DeliveryRetryMaxGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerRenewalTimeGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerDeliverUntilGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerDropUntilGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerDeliveryWorkersGauge).Return(fakeQdepth)
fakeRegistry.On("NewGauge", metrics.ConsumerMaxDeliveryWorkersGauge).Return(fakeQdepth)
fakeRegistry.On("NewHistogram", metrics.QueryDurationHistogram).Return(fakeLatency)

return &OutboundSenderFactory{
Listener: w,
Expand Down
19 changes: 10 additions & 9 deletions senderWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
"sync"
"time"

"github.com/go-kit/kit/metrics"
kitmetrics "github.com/go-kit/kit/metrics"
"github.com/xmidt-org/ancla"
"github.com/xmidt-org/caduceus/metrics"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
)
Expand All @@ -49,12 +50,12 @@ type SenderWrapperFactory struct {
Linger time.Duration

// Metrics registry.
MetricsRegistry CaduceusMetricsRegistry
MetricsRegistry metrics.CaduceusMetricsRegistry

// The metrics counter for dropped messages due to invalid payloads
DroppedMsgCounter metrics.Counter
DroppedMsgCounter kitmetrics.Counter

EventType metrics.Counter
EventType kitmetrics.Counter

// The logger implementation to share with OutboundSenders.
Logger *zap.Logger
Expand Down Expand Up @@ -88,9 +89,9 @@ type CaduceusSenderWrapper struct {
logger *zap.Logger
mutex sync.RWMutex
senders map[string]OutboundSender
metricsRegistry CaduceusMetricsRegistry
eventType metrics.Counter
queryLatency metrics.Histogram
metricsRegistry metrics.CaduceusMetricsRegistry
eventType kitmetrics.Counter
queryLatency kitmetrics.Histogram
wg sync.WaitGroup
shutdown chan struct{}
customPIDs []string
Expand Down Expand Up @@ -120,8 +121,8 @@ func (swf SenderWrapperFactory) New() (sw SenderWrapper, err error) {
return
}

caduceusSenderWrapper.queryLatency = NewMetricWrapperMeasures(swf.MetricsRegistry)
caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(IncomingEventTypeCounter)
caduceusSenderWrapper.queryLatency = metrics.NewMetricWrapperMeasures(swf.MetricsRegistry)
caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(metrics.IncomingEventTypeCounter)

caduceusSenderWrapper.senders = make(map[string]OutboundSender)
caduceusSenderWrapper.shutdown = make(chan struct{})
Expand Down
Loading