diff --git a/caduceus_type.go b/caduceus_type.go index d736c13c..1620889f 100644 --- a/caduceus_type.go +++ b/caduceus_type.go @@ -21,7 +21,6 @@ import ( "go.uber.org/zap" - "github.com/go-kit/kit/metrics" "github.com/xmidt-org/ancla" "github.com/xmidt-org/wrp-go/v3" @@ -55,12 +54,6 @@ type SenderConfig struct { DisablePartnerIDs bool } -type CaduceusMetricsRegistry interface { - NewCounter(name string) metrics.Counter - NewGauge(name string) metrics.Gauge - NewHistogram(name string, buckets int) metrics.Histogram -} - type RequestHandler interface { HandleRequest(workerID int, msg *wrp.Message) } diff --git a/go.mod b/go.mod index 8654b662..adf3cca3 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/xmidt-org/caduceus -go 1.19 +go 1.21 require ( emperror.dev/emperror v0.33.0 diff --git a/go.sum b/go.sum index ec8da82c..66c264e7 100644 --- a/go.sum +++ b/go.sum @@ -2038,6 +2038,7 @@ golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -2428,6 +2429,7 @@ golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= +golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/http.go b/http.go index 8b57766e..b4e96748 100644 --- a/http.go +++ b/http.go @@ -32,6 +32,14 @@ import ( "github.com/xmidt-org/wrp-go/v3" ) +const ( + emptyContentTypeReason = "empty_content_type" + emptyUUIDReason = "empty_uuid" + bothEmptyReason = "empty_uuid_and_content_type" + networkError = "network_err" + unknownEventType = "unknown" +) + // Below is the struct that will implement our ServeHTTP method type ServerHandler struct { *zap.Logger diff --git a/main.go b/main.go index 63baca3a..8aa94139 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,7 @@ import ( "github.com/spf13/viper" "github.com/xmidt-org/ancla" "github.com/xmidt-org/bascule/basculehelper" + "github.com/xmidt-org/caduceus/metrics" "github.com/xmidt-org/candlelight" "github.com/xmidt-org/httpaux/recovery" "github.com/xmidt-org/sallust" @@ -83,7 +84,7 @@ func caduceus(arguments []string) int { f = pflag.NewFlagSet(applicationName, pflag.ContinueOnError) v = viper.New() - logger, metricsRegistry, webPA, err = server.Initialize(applicationName, arguments, f, v, Metrics, AnclaHelperMetrics, basculehelper.AuthCapabilitiesMetrics, basculehelper.AuthValidationMetrics) + logger, metricsRegistry, webPA, err = server.Initialize(applicationName, arguments, f, v, metrics.Metrics, metrics.AnclaHelperMetrics, basculehelper.AuthCapabilitiesMetrics, basculehelper.AuthValidationMetrics) ) if parseErr, done := printVersion(f, arguments); done { @@ -157,19 +158,19 @@ func caduceus(arguments []string) int { senderWrapper: caduceusSenderWrapper, Logger: logger, }, - errorRequests: metricsRegistry.NewCounter(ErrorRequestBodyCounter), - emptyRequests: metricsRegistry.NewCounter(EmptyRequestBodyCounter), - invalidCount: metricsRegistry.NewCounter(DropsDueToInvalidPayload), - incomingQueueDepthMetric: metricsRegistry.NewGauge(IncomingQueueDepth), - modifiedWRPCount: metricsRegistry.NewCounter(ModifiedWRPCounter), + errorRequests: metricsRegistry.NewCounter(metrics.ErrorRequestBodyCounter), + emptyRequests: metricsRegistry.NewCounter(metrics.EmptyRequestBodyCounter), + invalidCount: metricsRegistry.NewCounter(metrics.DropsDueToInvalidPayload), + incomingQueueDepthMetric: metricsRegistry.NewGauge(metrics.IncomingQueueDepth), + modifiedWRPCount: metricsRegistry.NewCounter(metrics.ModifiedWRPCounter), maxOutstanding: 0, // 0 is for the unused `buckets` argument in xmetrics.Registry.NewHistogram - incomingQueueLatency: metricsRegistry.NewHistogram(IncomingQueueLatencyHistogram, 0), + incomingQueueLatency: metricsRegistry.NewHistogram(metrics.IncomingQueueLatencyHistogram, 0), now: time.Now, } caduceusConfig.Webhook.Logger = logger - caduceusConfig.Listener.Measures = NewHelperMeasures(metricsRegistry) + caduceusConfig.Listener.Measures = metrics.NewHelperMeasures(metricsRegistry) argusClientTimeout, err := newArgusClientTimeout(v) if err != nil { fmt.Fprintf(os.Stderr, "Unable to parse argus client timeout config values: %v \n", err) diff --git a/anclaHelper.go b/metrics/anclaHelper.go similarity index 97% rename from anclaHelper.go rename to metrics/anclaHelper.go index dabb7486..7bb69a66 100644 --- a/anclaHelper.go +++ b/metrics/anclaHelper.go @@ -1,4 +1,4 @@ -package main +package metrics import ( "github.com/xmidt-org/ancla" diff --git a/metrics.go b/metrics/metrics.go similarity index 73% rename from metrics.go rename to metrics/metrics.go index a2666983..8debb502 100644 --- a/metrics.go +++ b/metrics/metrics.go @@ -1,8 +1,7 @@ -package main +package metrics import ( "github.com/go-kit/kit/metrics" - // nolint:staticcheck "github.com/xmidt-org/webpa-common/v2/xmetrics" ) @@ -29,13 +28,11 @@ const ( IncomingQueueLatencyHistogram = "incoming_queue_latency_histogram_seconds" ) -const ( - emptyContentTypeReason = "empty_content_type" - emptyUUIDReason = "empty_uuid" - bothEmptyReason = "empty_uuid_and_content_type" - networkError = "network_err" - unknownEventType = "unknown" -) +type CaduceusMetricsRegistry interface { + NewCounter(name string) metrics.Counter + NewGauge(name string) metrics.Gauge + NewHistogram(name string, buckets int) metrics.Histogram +} func Metrics() []xmetrics.Metric { return []xmetrics.Metric{ @@ -160,27 +157,6 @@ func Metrics() []xmetrics.Metric { } } -func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSender) { - c.deliveryCounter = m.NewCounter(DeliveryCounter) - c.deliveryRetryCounter = m.NewCounter(DeliveryRetryCounter) - c.deliveryRetryMaxGauge = m.NewGauge(DeliveryRetryMaxGauge).With("url", c.id) - c.cutOffCounter = m.NewCounter(SlowConsumerCounter).With("url", c.id) - c.droppedQueueFullCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full") - c.droppedExpiredCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired") - c.droppedExpiredBeforeQueueCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired_before_queueing") - - c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off") - c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config") - c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError) - c.droppedPanic = m.NewCounter(DropsDueToPanic).With("url", c.id) - c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id) - c.renewalTimeGauge = m.NewGauge(ConsumerRenewalTimeGauge).With("url", c.id) - c.deliverUntilGauge = m.NewGauge(ConsumerDeliverUntilGauge).With("url", c.id) - c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id) - c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id) - c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id) -} - func NewMetricWrapperMeasures(m CaduceusMetricsRegistry) metrics.Histogram { return m.NewHistogram(QueryDurationHistogram, 11) } diff --git a/metrics_test.go b/metrics/metrics_test.go similarity index 99% rename from metrics_test.go rename to metrics/metrics_test.go index d764b352..4b584010 100644 --- a/metrics_test.go +++ b/metrics/metrics_test.go @@ -15,7 +15,7 @@ * */ -package main +package metrics // Using Caduceus's test suite: // diff --git a/outboundSender.go b/outboundSender.go index d5ec61e8..77a15985 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -38,8 +38,9 @@ import ( "go.uber.org/zap" - "github.com/go-kit/kit/metrics" + kitmetrics "github.com/go-kit/kit/metrics" "github.com/xmidt-org/ancla" + metrics "github.com/xmidt-org/caduceus/metrics" "github.com/xmidt-org/webpa-common/v2/device" "github.com/xmidt-org/webpa-common/v2/semaphore" @@ -95,7 +96,7 @@ type OutboundSenderFactory struct { DeliveryInterval time.Duration // Metrics registry. - MetricsRegistry CaduceusMetricsRegistry + MetricsRegistry metrics.CaduceusMetricsRegistry // The logger to use. Logger *zap.Logger @@ -107,7 +108,7 @@ type OutboundSenderFactory struct { // DisablePartnerIDs dictates whether or not to enforce the partner ID check. DisablePartnerIDs bool - QueryLatency metrics.Histogram + QueryLatency kitmetrics.Histogram } type OutboundSender interface { @@ -130,23 +131,23 @@ type CaduceusOutboundSender struct { queueSize int deliveryRetries int deliveryInterval time.Duration - deliveryCounter metrics.Counter - deliveryRetryCounter metrics.Counter - droppedQueueFullCounter metrics.Counter - droppedCutoffCounter metrics.Counter - droppedExpiredCounter metrics.Counter - droppedExpiredBeforeQueueCounter metrics.Counter - droppedNetworkErrCounter metrics.Counter - droppedInvalidConfig metrics.Counter - droppedPanic metrics.Counter - cutOffCounter metrics.Counter - queueDepthGauge metrics.Gauge - renewalTimeGauge metrics.Gauge - deliverUntilGauge metrics.Gauge - dropUntilGauge metrics.Gauge - maxWorkersGauge metrics.Gauge - currentWorkersGauge metrics.Gauge - deliveryRetryMaxGauge metrics.Gauge + deliveryCounter kitmetrics.Counter + deliveryRetryCounter kitmetrics.Counter + droppedQueueFullCounter kitmetrics.Counter + droppedCutoffCounter kitmetrics.Counter + droppedExpiredCounter kitmetrics.Counter + droppedExpiredBeforeQueueCounter kitmetrics.Counter + droppedNetworkErrCounter kitmetrics.Counter + droppedInvalidConfig kitmetrics.Counter + droppedPanic kitmetrics.Counter + cutOffCounter kitmetrics.Counter + queueDepthGauge kitmetrics.Gauge + renewalTimeGauge kitmetrics.Gauge + deliverUntilGauge kitmetrics.Gauge + dropUntilGauge kitmetrics.Gauge + maxWorkersGauge kitmetrics.Gauge + currentWorkersGauge kitmetrics.Gauge + deliveryRetryMaxGauge kitmetrics.Gauge wg sync.WaitGroup cutOffPeriod time.Duration workers semaphore.Interface @@ -471,7 +472,7 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti // a fresh one, counting any current messages in the queue as dropped. // It should never close a queue, as a queue not referenced anywhere will be // cleaned up by the garbage collector without needing to be closed. -func (obs *CaduceusOutboundSender) Empty(droppedCounter metrics.Counter) { +func (obs *CaduceusOutboundSender) Empty(droppedCounter kitmetrics.Counter) { droppedMsgs := obs.queue.Load().(chan *wrp.Message) obs.queue.Store(make(chan *wrp.Message, obs.queueSize)) droppedCounter.Add(float64(len(droppedMsgs))) @@ -737,3 +738,24 @@ func (obs *CaduceusOutboundSender) queueOverflow() { } } + +func CreateOutbounderMetrics(m metrics.CaduceusMetricsRegistry, c *CaduceusOutboundSender) { + c.deliveryCounter = m.NewCounter(metrics.DeliveryCounter) + c.deliveryRetryCounter = m.NewCounter(metrics.DeliveryRetryCounter) + c.deliveryRetryMaxGauge = m.NewGauge(metrics.DeliveryRetryMaxGauge).With("url", c.id) + c.cutOffCounter = m.NewCounter(metrics.SlowConsumerCounter).With("url", c.id) + c.droppedQueueFullCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full") + c.droppedExpiredCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired") + c.droppedExpiredBeforeQueueCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired_before_queueing") + + c.droppedCutoffCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off") + c.droppedInvalidConfig = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config") + c.droppedNetworkErrCounter = m.NewCounter(metrics.SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError) + c.droppedPanic = m.NewCounter(metrics.DropsDueToPanic).With("url", c.id) + c.queueDepthGauge = m.NewGauge(metrics.OutgoingQueueDepth).With("url", c.id) + c.renewalTimeGauge = m.NewGauge(metrics.ConsumerRenewalTimeGauge).With("url", c.id) + c.deliverUntilGauge = m.NewGauge(metrics.ConsumerDeliverUntilGauge).With("url", c.id) + c.dropUntilGauge = m.NewGauge(metrics.ConsumerDropUntilGauge).With("url", c.id) + c.currentWorkersGauge = m.NewGauge(metrics.ConsumerDeliveryWorkersGauge).With("url", c.id) + c.maxWorkersGauge = m.NewGauge(metrics.ConsumerMaxDeliveryWorkersGauge).With("url", c.id) +} \ No newline at end of file diff --git a/outboundSender_test.go b/outboundSender_test.go index 9569f690..d7489731 100644 --- a/outboundSender_test.go +++ b/outboundSender_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/xmidt-org/ancla" + "github.com/xmidt-org/caduceus/metrics" "github.com/xmidt-org/wrp-go/v3" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -163,20 +164,20 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] // // If a new metric within outboundsender is created it must be added here fakeRegistry := new(mockCaduceusMetricsRegistry) - fakeRegistry.On("NewCounter", DeliveryRetryCounter).Return(fakeDC) - fakeRegistry.On("NewCounter", DeliveryCounter).Return(fakeDC) - fakeRegistry.On("NewCounter", OutgoingQueueDepth).Return(fakeDC) - fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeSlow) - fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeDroppedSlow) - fakeRegistry.On("NewCounter", DropsDueToPanic).Return(fakePanicDrop) - fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeQdepth) - fakeRegistry.On("NewGauge", DeliveryRetryMaxGauge).Return(fakeQdepth) - fakeRegistry.On("NewGauge", ConsumerRenewalTimeGauge).Return(fakeQdepth) - fakeRegistry.On("NewGauge", ConsumerDeliverUntilGauge).Return(fakeQdepth) - fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeQdepth) - fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeQdepth) - fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeQdepth) - fakeRegistry.On("NewHistogram", QueryDurationHistogram).Return(fakeLatency) + fakeRegistry.On("NewCounter", metrics.DeliveryRetryCounter).Return(fakeDC) + fakeRegistry.On("NewCounter", metrics.DeliveryCounter).Return(fakeDC) + fakeRegistry.On("NewCounter", metrics.OutgoingQueueDepth).Return(fakeDC) + fakeRegistry.On("NewCounter", metrics.SlowConsumerCounter).Return(fakeSlow) + fakeRegistry.On("NewCounter", metrics.SlowConsumerDroppedMsgCounter).Return(fakeDroppedSlow) + fakeRegistry.On("NewCounter", metrics.DropsDueToPanic).Return(fakePanicDrop) + fakeRegistry.On("NewGauge", metrics.OutgoingQueueDepth).Return(fakeQdepth) + fakeRegistry.On("NewGauge", metrics.DeliveryRetryMaxGauge).Return(fakeQdepth) + fakeRegistry.On("NewGauge", metrics.ConsumerRenewalTimeGauge).Return(fakeQdepth) + fakeRegistry.On("NewGauge", metrics.ConsumerDeliverUntilGauge).Return(fakeQdepth) + fakeRegistry.On("NewGauge", metrics.ConsumerDropUntilGauge).Return(fakeQdepth) + fakeRegistry.On("NewGauge", metrics.ConsumerDeliveryWorkersGauge).Return(fakeQdepth) + fakeRegistry.On("NewGauge", metrics.ConsumerMaxDeliveryWorkersGauge).Return(fakeQdepth) + fakeRegistry.On("NewHistogram", metrics.QueryDurationHistogram).Return(fakeLatency) return &OutboundSenderFactory{ Listener: w, diff --git a/senderWrapper.go b/senderWrapper.go index e9f2d3f0..318b3ce6 100644 --- a/senderWrapper.go +++ b/senderWrapper.go @@ -21,8 +21,9 @@ import ( "sync" "time" - "github.com/go-kit/kit/metrics" + kitmetrics "github.com/go-kit/kit/metrics" "github.com/xmidt-org/ancla" + "github.com/xmidt-org/caduceus/metrics" "github.com/xmidt-org/wrp-go/v3" "go.uber.org/zap" ) @@ -49,12 +50,12 @@ type SenderWrapperFactory struct { Linger time.Duration // Metrics registry. - MetricsRegistry CaduceusMetricsRegistry + MetricsRegistry metrics.CaduceusMetricsRegistry // The metrics counter for dropped messages due to invalid payloads - DroppedMsgCounter metrics.Counter + DroppedMsgCounter kitmetrics.Counter - EventType metrics.Counter + EventType kitmetrics.Counter // The logger implementation to share with OutboundSenders. Logger *zap.Logger @@ -88,9 +89,9 @@ type CaduceusSenderWrapper struct { logger *zap.Logger mutex sync.RWMutex senders map[string]OutboundSender - metricsRegistry CaduceusMetricsRegistry - eventType metrics.Counter - queryLatency metrics.Histogram + metricsRegistry metrics.CaduceusMetricsRegistry + eventType kitmetrics.Counter + queryLatency kitmetrics.Histogram wg sync.WaitGroup shutdown chan struct{} customPIDs []string @@ -120,8 +121,8 @@ func (swf SenderWrapperFactory) New() (sw SenderWrapper, err error) { return } - caduceusSenderWrapper.queryLatency = NewMetricWrapperMeasures(swf.MetricsRegistry) - caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(IncomingEventTypeCounter) + caduceusSenderWrapper.queryLatency = metrics.NewMetricWrapperMeasures(swf.MetricsRegistry) + caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(metrics.IncomingEventTypeCounter) caduceusSenderWrapper.senders = make(map[string]OutboundSender) caduceusSenderWrapper.shutdown = make(chan struct{}) diff --git a/senderWrapper_test.go b/senderWrapper_test.go index ac5d2d83..e11c8e7a 100644 --- a/senderWrapper_test.go +++ b/senderWrapper_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/xmidt-org/ancla" + "github.com/xmidt-org/caduceus/metrics" "github.com/xmidt-org/webpa-common/v2/adapter" "github.com/xmidt-org/wrp-go/v3" @@ -114,21 +115,21 @@ func getFakeFactory() *SenderWrapperFactory { On("With", []string{"content_type", "other"}).Return(fakeIgnore) fakeRegistry := new(mockCaduceusMetricsRegistry) - fakeRegistry.On("NewCounter", DropsDueToInvalidPayload).Return(fakeDDTIP) - fakeRegistry.On("NewCounter", DeliveryRetryCounter).Return(fakeIgnore) - fakeRegistry.On("NewCounter", DeliveryCounter).Return(fakeIgnore) - fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeIgnore) - fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeIgnore) - fakeRegistry.On("NewCounter", IncomingEventTypeCounter).Return(fakeIgnore) - fakeRegistry.On("NewCounter", DropsDueToPanic).Return(fakeIgnore) - fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeGauge) - fakeRegistry.On("NewGauge", DeliveryRetryMaxGauge).Return(fakeGauge) - fakeRegistry.On("NewGauge", ConsumerRenewalTimeGauge).Return(fakeGauge) - fakeRegistry.On("NewGauge", ConsumerDeliverUntilGauge).Return(fakeGauge) - fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeGauge) - fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeGauge) - fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeGauge) - fakeRegistry.On("NewHistogram", QueryDurationHistogram).Return(fakeLatency) + fakeRegistry.On("NewCounter", metrics.DropsDueToInvalidPayload).Return(fakeDDTIP) + fakeRegistry.On("NewCounter", metrics.DeliveryRetryCounter).Return(fakeIgnore) + fakeRegistry.On("NewCounter", metrics.DeliveryCounter).Return(fakeIgnore) + fakeRegistry.On("NewCounter", metrics.SlowConsumerCounter).Return(fakeIgnore) + fakeRegistry.On("NewCounter", metrics.SlowConsumerDroppedMsgCounter).Return(fakeIgnore) + fakeRegistry.On("NewCounter", metrics.IncomingEventTypeCounter).Return(fakeIgnore) + fakeRegistry.On("NewCounter", metrics.DropsDueToPanic).Return(fakeIgnore) + fakeRegistry.On("NewGauge", metrics.OutgoingQueueDepth).Return(fakeGauge) + fakeRegistry.On("NewGauge", metrics.DeliveryRetryMaxGauge).Return(fakeGauge) + fakeRegistry.On("NewGauge", metrics.ConsumerRenewalTimeGauge).Return(fakeGauge) + fakeRegistry.On("NewGauge", metrics.ConsumerDeliverUntilGauge).Return(fakeGauge) + fakeRegistry.On("NewGauge", metrics.ConsumerDropUntilGauge).Return(fakeGauge) + fakeRegistry.On("NewGauge", metrics.ConsumerDeliveryWorkersGauge).Return(fakeGauge) + fakeRegistry.On("NewGauge", metrics.ConsumerMaxDeliveryWorkersGauge).Return(fakeGauge) + fakeRegistry.On("NewHistogram", metrics.QueryDurationHistogram).Return(fakeLatency) return &SenderWrapperFactory{ NumWorkersPerSender: 10,