From 06f6a6a07d1c67d019563cfa1611c77925033e7b Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Sat, 24 Feb 2024 19:27:18 -0500 Subject: [PATCH 1/6] patch: improve event delivery related metrics --- http.go | 4 +- httpClient.go | 47 ++++++++++++++++++++--- httpClient_test.go | 29 +++++++------- http_test.go | 16 ++++---- metrics.go | 85 ++++++++++++++++++++++++++---------------- outboundSender.go | 20 +++++++--- outboundSender_test.go | 52 +++++++++++++------------- senderWrapper.go | 4 +- senderWrapper_test.go | 58 ++++++++++++++-------------- 9 files changed, 192 insertions(+), 123 deletions(-) diff --git a/http.go b/http.go index 07cbacd9..6aa978c2 100644 --- a/http.go +++ b/http.go @@ -125,7 +125,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) { endTime := sh.now() - sh.incomingQueueLatency.With("event", eventType).Observe(endTime.Sub(startTime).Seconds()) + sh.incomingQueueLatency.With(eventLabel, eventType).Observe(endTime.Sub(startTime).Seconds()) } func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message { @@ -150,7 +150,7 @@ func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message { } if reason != "" { - sh.modifiedWRPCount.With("reason", reason).Add(1.0) + sh.modifiedWRPCount.With(reasonLabel, reason).Add(1.0) } return msg diff --git a/httpClient.go b/httpClient.go index 07033f5a..428b724f 100644 --- a/httpClient.go +++ b/httpClient.go @@ -4,7 +4,9 @@ package main import ( + "context" "errors" + "net" "net/http" "strconv" "time" @@ -54,16 +56,51 @@ func (m *metricWrapper) roundTripper(next httpClient) httpClient { startTime := m.now() resp, err := next.Do(req) endTime := m.now() - code := networkError - - if err == nil { + code := genericDoReason + reason := noErr + if err != nil { + reason = getDoErrReason(err) + if resp != nil { + code = strconv.Itoa(resp.StatusCode) + } + } else { code = strconv.Itoa(resp.StatusCode) } // find time difference, add to metric - var latency = endTime.Sub(startTime) - m.queryLatency.With("code", code).Observe(latency.Seconds()) + m.queryLatency.With(urlLabel, req.URL.String(), reasonLabel, reason, codeLabel, code).Observe(endTime.Sub(startTime).Seconds()) return resp, err }) } + +func getDoErrReason(e error) string { + var d *net.DNSError + if e == nil { + return noErr + } + if errors.Is(e, context.DeadlineExceeded) { + return deadlineExceededReason + } else if errors.Is(e, context.Canceled) { + return contextCanceledReason + } else if errors.Is(e, &net.AddrError{}) { + return addressErrReason + } else if errors.Is(e, &net.ParseError{}) { + return parseAddrErrReason + } else if errors.Is(e, net.InvalidAddrError("")) { + return invalidAddrReason + } else if errors.As(e, &d) { + if d.IsNotFound { + return hostNotFoundReason + } + return dnsErrReason + } else if errors.Is(e, net.ErrClosed) { + return connClosedReason + } else if errors.Is(e, &net.OpError{}) { + return opErrReason + } else if errors.Is(e, net.UnknownNetworkError("")) { + return networkErrReason + } + + return genericDoReason +} diff --git a/httpClient_test.go b/httpClient_test.go index 19fbc596..e50e1756 100644 --- a/httpClient_test.go +++ b/httpClient_test.go @@ -7,6 +7,7 @@ import ( "errors" "io" "net/http" + "strconv" "testing" "time" @@ -16,10 +17,8 @@ import ( ) func TestRoundTripper(t *testing.T) { - errTest := errors.New("test error") date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - tests := []struct { description string startTime time.Time @@ -33,32 +32,34 @@ func TestRoundTripper(t *testing.T) { description: "Success", startTime: date1, endTime: date2, - expectedCode: "200", + expectedCode: strconv.Itoa(http.StatusOK), request: exampleRequest(1), expectedErr: nil, expectedResponse: &http.Response{ - StatusCode: 200, + StatusCode: http.StatusOK, }, }, { description: "503 Service Unavailable", startTime: date1, endTime: date2, - expectedCode: "503", + expectedCode: strconv.Itoa(http.StatusServiceUnavailable), request: exampleRequest(1), expectedErr: nil, expectedResponse: &http.Response{ - StatusCode: 503, + StatusCode: http.StatusServiceUnavailable, }, }, { - description: "Network Error", - startTime: date1, - endTime: date2, - expectedCode: "network_err", - request: exampleRequest(1), - expectedErr: errTest, - expectedResponse: nil, + description: "Network Error", + startTime: date1, + endTime: date2, + expectedCode: strconv.Itoa(http.StatusServiceUnavailable), + request: exampleRequest(1), + expectedErr: errors.New(genericDoReason), + expectedResponse: &http.Response{ + StatusCode: http.StatusServiceUnavailable, + }, }, } @@ -69,7 +70,7 @@ func TestRoundTripper(t *testing.T) { fakeTime := mockTime(tc.startTime, tc.endTime) fakeHandler := new(mockHandler) fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"code", tc.expectedCode} + histogramFunctionCall := []string{urlLabel, tc.request.URL.String(), reasonLabel, getDoErrReason(tc.expectedErr), codeLabel, tc.expectedCode} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() diff --git a/http_test.go b/http_test.go index caea6bd3..dff71c4c 100644 --- a/http_test.go +++ b/http_test.go @@ -106,7 +106,7 @@ func TestServerHandler(t *testing.T) { fakeTime := mockTime(tc.startTime, tc.endTime) fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", tc.expectedEventType} + histogramFunctionCall := []string{eventLabel, tc.expectedEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -162,11 +162,11 @@ func TestServerHandlerFixWrp(t *testing.T) { fakeIncomingContentTypeCount.On("Add", 1.0).Return() fakeModifiedWRPCount := new(mockCounter) - fakeModifiedWRPCount.On("With", []string{"reason", bothEmptyReason}).Return(fakeIncomingContentTypeCount).Once() + fakeModifiedWRPCount.On("With", []string{reasonLabel, bothEmptyReason}).Return(fakeIncomingContentTypeCount).Once() fakeModifiedWRPCount.On("Add", 1.0).Return().Once() fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", "bob"} + histogramFunctionCall := []string{eventLabel, "bob"} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -215,7 +215,7 @@ func TestServerHandlerFull(t *testing.T) { fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", unknownEventType} + histogramFunctionCall := []string{eventLabel, unknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -270,7 +270,7 @@ func TestServerEmptyPayload(t *testing.T) { fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", unknownEventType} + histogramFunctionCall := []string{eventLabel, unknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -325,7 +325,7 @@ func TestServerUnableToReadBody(t *testing.T) { fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(4) fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", unknownEventType} + histogramFunctionCall := []string{eventLabel, unknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -380,7 +380,7 @@ func TestServerInvalidBody(t *testing.T) { fakeInvalidCount.On("Add", mock.AnythingOfType("float64")).Return().Once() fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"event", unknownEventType} + histogramFunctionCall := []string{eventLabel, unknownEventType} fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() @@ -415,7 +415,7 @@ func TestHandlerUnsupportedMediaType(t *testing.T) { date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) - histogramFunctionCall := []string{"event", unknownEventType} + histogramFunctionCall := []string{eventLabel, unknownEventType} fakeLatency := date2.Sub(date1) assert := assert.New(t) diff --git a/metrics.go b/metrics.go index 7d575db2..0ec90b10 100644 --- a/metrics.go +++ b/metrics.go @@ -35,8 +35,29 @@ const ( emptyContentTypeReason = "empty_content_type" emptyUUIDReason = "empty_uuid" bothEmptyReason = "empty_uuid_and_content_type" - networkError = "network_err" unknownEventType = "unknown" + + // metric labels + + codeLabel = "code" + urlLabel = "url" + eventLabel = "event" + reasonLabel = "reason" + + // metric label values + + genericDoReason = "do_error" + deadlineExceededReason = "context_deadline_exceeded" + contextCanceledReason = "context_canceled" + addressErrReason = "address_error" + parseAddrErrReason = "parse_address_error" + invalidAddrReason = "invalid_address" + dnsErrReason = "dns_error" + hostNotFoundReason = "host_not_found" + connClosedReason = "connection_closed" + opErrReason = "op_error" + networkErrReason = "unknown_network_err" + noErr = "no_err" ) func Metrics() []xmetrics.Metric { @@ -65,98 +86,98 @@ func Metrics() []xmetrics.Metric { Name: ModifiedWRPCounter, Help: "Number of times a WRP was modified by Caduceus", Type: "counter", - LabelNames: []string{"reason"}, + LabelNames: []string{reasonLabel}, }, { Name: DeliveryRetryCounter, Help: "Number of delivery retries made", Type: "counter", - LabelNames: []string{"url", "event"}, + LabelNames: []string{urlLabel, eventLabel}, }, { Name: DeliveryRetryMaxGauge, Help: "Maximum number of delivery retries attempted", Type: "gauge", - LabelNames: []string{"url"}, + LabelNames: []string{urlLabel}, }, { Name: DeliveryCounter, Help: "Count of delivered messages to a url with a status code", Type: "counter", - LabelNames: []string{"url", "code", "event"}, + LabelNames: []string{urlLabel, reasonLabel, codeLabel, eventLabel}, }, { Name: SlowConsumerDroppedMsgCounter, Help: "Count of dropped messages due to a slow consumer", Type: "counter", - LabelNames: []string{"url", "reason"}, + LabelNames: []string{urlLabel, reasonLabel}, }, { Name: SlowConsumerCounter, Help: "Count of the number of times a consumer has been deemed too slow and is cut off.", Type: "counter", - LabelNames: []string{"url"}, + LabelNames: []string{urlLabel}, }, { Name: OutgoingQueueDepth, Help: "The depth of the queue per outgoing url.", Type: "gauge", - LabelNames: []string{"url"}, + LabelNames: []string{urlLabel}, }, { Name: IncomingEventTypeCounter, Help: "Incoming count of events by event type", Type: "counter", - LabelNames: []string{"event"}, + LabelNames: []string{eventLabel}, }, { Name: DropsDueToPanic, Help: "The outgoing message delivery pipeline panicked.", Type: "counter", - LabelNames: []string{"url"}, + LabelNames: []string{urlLabel}, }, { Name: ConsumerRenewalTimeGauge, Help: "Time when the consumer data was updated.", Type: "gauge", - LabelNames: []string{"url"}, + LabelNames: []string{urlLabel}, }, { Name: ConsumerDeliverUntilGauge, Help: "Time when the consumer's registration expires and events will be dropped.", Type: "gauge", - LabelNames: []string{"url"}, + LabelNames: []string{urlLabel}, }, { Name: ConsumerDropUntilGauge, Help: "The time after which events going to a customer will be delivered.", Type: "gauge", - LabelNames: []string{"url"}, + LabelNames: []string{urlLabel}, }, { Name: ConsumerDeliveryWorkersGauge, Help: "The number of active delivery workers for a particular customer.", Type: "gauge", - LabelNames: []string{"url"}, + LabelNames: []string{urlLabel}, }, { Name: ConsumerMaxDeliveryWorkersGauge, Help: "The maximum number of delivery workers available for a particular customer.", Type: "gauge", - LabelNames: []string{"url"}, + LabelNames: []string{urlLabel}, }, { Name: QueryDurationHistogram, Help: "A histogram of latencies for queries.", Type: "histogram", - LabelNames: []string{"url", "code"}, + LabelNames: []string{urlLabel, reasonLabel, codeLabel}, Buckets: []float64{0.0625, 0.125, .25, .5, 1, 5, 10, 20, 40, 80, 160}, }, { Name: IncomingQueueLatencyHistogram, Help: "A histogram of latencies for the incoming queue.", Type: "histogram", - LabelNames: []string{"event"}, + LabelNames: []string{eventLabel}, Buckets: []float64{0.0625, 0.125, .25, .5, 1, 5, 10, 20, 40, 80, 160}, }, } @@ -165,22 +186,22 @@ func Metrics() []xmetrics.Metric { func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSender) { c.deliveryCounter = m.NewCounter(DeliveryCounter) c.deliveryRetryCounter = m.NewCounter(DeliveryRetryCounter) - c.deliveryRetryMaxGauge = m.NewGauge(DeliveryRetryMaxGauge).With("url", c.id) - c.cutOffCounter = m.NewCounter(SlowConsumerCounter).With("url", c.id) - c.droppedQueueFullCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full") - c.droppedExpiredCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired") - c.droppedExpiredBeforeQueueCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired_before_queueing") + c.deliveryRetryMaxGauge = m.NewGauge(DeliveryRetryMaxGauge).With(urlLabel, c.id) + c.cutOffCounter = m.NewCounter(SlowConsumerCounter).With(urlLabel, c.id) + c.droppedQueueFullCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With(urlLabel, c.id, reasonLabel, "queue_full") + c.droppedExpiredCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With(urlLabel, c.id, reasonLabel, "expired") + c.droppedExpiredBeforeQueueCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With(urlLabel, c.id, reasonLabel, "expired_before_queueing") - c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off") - c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config") - c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError) - c.droppedPanic = m.NewCounter(DropsDueToPanic).With("url", c.id) - c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id) - c.renewalTimeGauge = m.NewGauge(ConsumerRenewalTimeGauge).With("url", c.id) - c.deliverUntilGauge = m.NewGauge(ConsumerDeliverUntilGauge).With("url", c.id) - c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id) - c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id) - c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id) + c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With(urlLabel, c.id, reasonLabel, "cut_off") + c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With(urlLabel, c.id, reasonLabel, "invalid_config") + c.droppedMessage = m.NewCounter(SlowConsumerDroppedMsgCounter) + c.droppedPanic = m.NewCounter(DropsDueToPanic).With(urlLabel, c.id) + c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With(urlLabel, c.id) + c.renewalTimeGauge = m.NewGauge(ConsumerRenewalTimeGauge).With(urlLabel, c.id) + c.deliverUntilGauge = m.NewGauge(ConsumerDeliverUntilGauge).With(urlLabel, c.id) + c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With(urlLabel, c.id) + c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With(urlLabel, c.id) + c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With(urlLabel, c.id) } func NewMetricWrapperMeasures(m CaduceusMetricsRegistry) metrics.Histogram { diff --git a/outboundSender.go b/outboundSender.go index ac67c2d7..f6cfa89e 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -122,7 +122,7 @@ type CaduceusOutboundSender struct { droppedCutoffCounter metrics.Counter droppedExpiredCounter metrics.Counter droppedExpiredBeforeQueueCounter metrics.Counter - droppedNetworkErrCounter metrics.Counter + droppedMessage metrics.Counter droppedInvalidConfig metrics.Counter droppedPanic metrics.Counter cutOffCounter metrics.Counter @@ -612,7 +612,7 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri Logger: obs.logger, Retries: obs.deliveryRetries, Interval: obs.deliveryInterval, - Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event), + Counter: obs.deliveryRetryCounter.With(urlLabel, obs.id, eventLabel, event), // Always retry on failures up to the max count. ShouldRetry: xhttp.ShouldRetry, ShouldRetryStatus: xhttp.RetryCodes, @@ -636,12 +636,18 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri client := obs.clientMiddleware(doerFunc(retryer)) resp, err := client.Do(req) - code := "failure" + code := genericDoReason + reason := noErr l := obs.logger if nil != err { // Report failure - obs.droppedNetworkErrCounter.Add(1.0) - l = obs.logger.With(zap.Error(err)) + reason = getDoErrReason(err) + if resp != nil { + code = strconv.Itoa(resp.StatusCode) + } + + obs.droppedMessage.With(urlLabel, req.URL.String(), reasonLabel, reason).Add(1) + l = obs.logger.With(zap.String(reasonLabel, reason), zap.Error(err)) } else { // Report Result code = strconv.Itoa(resp.StatusCode) @@ -652,8 +658,10 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri io.Copy(io.Discard, resp.Body) resp.Body.Close() } + + obs.deliveryCounter.With(urlLabel, req.URL.String(), reasonLabel, reason, codeLabel, code, eventLabel, event).Add(1.0) } - obs.deliveryCounter.With("url", obs.id, "code", code, "event", event).Add(1.0) + l.Debug("event sent-ish", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination), zap.String("code", code), zap.String("url", req.URL.String())) } diff --git a/outboundSender_test.go b/outboundSender_test.go index 803ae639..81c4d175 100644 --- a/outboundSender_test.go +++ b/outboundSender_test.go @@ -23,6 +23,8 @@ import ( "time" ) +// TODO Improve all of these tests + // Make a simple RoundTrip implementation that let's me short-circuit the network type transport struct { i int32 @@ -60,8 +62,8 @@ func simpleSetup(trans *transport, cutOffPeriod time.Duration, matcher []string) // 3. Trigger the On method on that "mockMetric" with various different cases of that metric, // in both senderWrapper_test.go and outboundSender_test.go // i.e: -// case 1: On("With", []string{"event", iot} -// case 2: On("With", []string{"event", unknown} +// case 1: On("With", []string{eventLabel, iot} +// case 2: On("With", []string{eventLabel, unknown} // 4. Mimic the metric behavior using On: // fakeSlow.On("Add", 1.0).Return() func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []string) *OutboundSenderFactory { @@ -90,34 +92,34 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] // test dc metric fakeDC := new(mockCounter) - fakeDC.On("With", []string{"url", w.Webhook.Config.URL, "code", "200", "event", "test"}).Return(fakeDC). - On("With", []string{"url", w.Webhook.Config.URL, "code", "200", "event", "iot"}).Return(fakeDC). - On("With", []string{"url", w.Webhook.Config.URL, "code", "200", "event", "unknown"}).Return(fakeDC). - On("With", []string{"url", w.Webhook.Config.URL, "code", "failure", "event", "iot"}).Return(fakeDC). - On("With", []string{"url", w.Webhook.Config.URL, "event", "test"}).Return(fakeDC). - On("With", []string{"url", w.Webhook.Config.URL, "event", "iot"}).Return(fakeDC). - On("With", []string{"url", w.Webhook.Config.URL, "event", "unknown"}).Return(fakeDC). - On("With", []string{"url", w.Webhook.Config.URL, "code", "201"}).Return(fakeDC). - On("With", []string{"url", w.Webhook.Config.URL, "code", "202"}).Return(fakeDC). - On("With", []string{"url", w.Webhook.Config.URL, "code", "204"}).Return(fakeDC). - On("With", []string{"url", w.Webhook.Config.URL, "code", "429", "event", "iot"}).Return(fakeDC). - On("With", []string{"url", w.Webhook.Config.URL, "code", "failure"}).Return(fakeDC) + fakeDC.On("With", []string{urlLabel, w.Webhook.Config.URL, codeLabel, "200", eventLabel, "test"}).Return(fakeDC). + On("With", []string{urlLabel, w.Webhook.Config.URL, codeLabel, "200", eventLabel, "iot"}).Return(fakeDC). + On("With", []string{urlLabel, w.Webhook.Config.URL, codeLabel, "200", eventLabel, "unknown"}).Return(fakeDC). + On("With", []string{urlLabel, w.Webhook.Config.URL, codeLabel, "failure", eventLabel, "iot"}).Return(fakeDC). + On("With", []string{urlLabel, w.Webhook.Config.URL, eventLabel, "test"}).Return(fakeDC). + On("With", []string{urlLabel, w.Webhook.Config.URL, eventLabel, "iot"}).Return(fakeDC). + On("With", []string{urlLabel, w.Webhook.Config.URL, eventLabel, "unknown"}).Return(fakeDC). + On("With", []string{urlLabel, w.Webhook.Config.URL, codeLabel, "201"}).Return(fakeDC). + On("With", []string{urlLabel, w.Webhook.Config.URL, codeLabel, "202"}).Return(fakeDC). + On("With", []string{urlLabel, w.Webhook.Config.URL, codeLabel, "204"}).Return(fakeDC). + On("With", []string{urlLabel, w.Webhook.Config.URL, codeLabel, "429", eventLabel, "iot"}).Return(fakeDC). + On("With", []string{urlLabel, w.Webhook.Config.URL, codeLabel, "failure"}).Return(fakeDC) fakeDC.On("Add", 1.0).Return() fakeDC.On("Add", 0.0).Return() // test slow metric fakeSlow := new(mockCounter) - fakeSlow.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakeSlow) + fakeSlow.On("With", []string{urlLabel, w.Webhook.Config.URL}).Return(fakeSlow) fakeSlow.On("Add", 1.0).Return() // test dropped metric fakeDroppedSlow := new(mockCounter) - fakeDroppedSlow.On("With", []string{"url", w.Webhook.Config.URL, "reason", "queue_full"}).Return(fakeDroppedSlow) - fakeDroppedSlow.On("With", []string{"url", w.Webhook.Config.URL, "reason", "cut_off"}).Return(fakeDroppedSlow) - fakeDroppedSlow.On("With", []string{"url", w.Webhook.Config.URL, "reason", "expired"}).Return(fakeDroppedSlow) - fakeDroppedSlow.On("With", []string{"url", w.Webhook.Config.URL, "reason", "expired_before_queueing"}).Return(fakeDroppedSlow) - fakeDroppedSlow.On("With", []string{"url", w.Webhook.Config.URL, "reason", "invalid_config"}).Return(fakeDroppedSlow) - fakeDroppedSlow.On("With", []string{"url", w.Webhook.Config.URL, "reason", "network_err"}).Return(fakeDroppedSlow) + fakeDroppedSlow.On("With", []string{urlLabel, w.Webhook.Config.URL, reasonLabel, "queue_full"}).Return(fakeDroppedSlow) + fakeDroppedSlow.On("With", []string{urlLabel, w.Webhook.Config.URL, reasonLabel, "cut_off"}).Return(fakeDroppedSlow) + fakeDroppedSlow.On("With", []string{urlLabel, w.Webhook.Config.URL, reasonLabel, "expired"}).Return(fakeDroppedSlow) + fakeDroppedSlow.On("With", []string{urlLabel, w.Webhook.Config.URL, reasonLabel, "expired_before_queueing"}).Return(fakeDroppedSlow) + fakeDroppedSlow.On("With", []string{urlLabel, w.Webhook.Config.URL, reasonLabel, "invalid_config"}).Return(fakeDroppedSlow) + fakeDroppedSlow.On("With", []string{urlLabel, w.Webhook.Config.URL, reasonLabel, "network_err"}).Return(fakeDroppedSlow) fakeDroppedSlow.On("Add", mock.Anything).Return() // IncomingContentType cases @@ -130,18 +132,18 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] // QueueDepth case fakeQdepth := new(mockGauge) - fakeQdepth.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakeQdepth) + fakeQdepth.On("With", []string{urlLabel, w.Webhook.Config.URL}).Return(fakeQdepth) fakeQdepth.On("Add", 1.0).Return().On("Add", -1.0).Return() // DropsDueToPanic case fakePanicDrop := new(mockCounter) - fakePanicDrop.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakePanicDrop) + fakePanicDrop.On("With", []string{urlLabel, w.Webhook.Config.URL}).Return(fakePanicDrop) fakePanicDrop.On("Add", 1.0).Return() // Fake Latency fakeLatency := new(mockHistogram) - fakeLatency.On("With", []string{"url", w.Webhook.Config.URL, "code", "200"}).Return(fakeLatency) - fakeLatency.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakeLatency) + fakeLatency.On("With", []string{urlLabel, w.Webhook.Config.URL, codeLabel, "200"}).Return(fakeLatency) + fakeLatency.On("With", []string{urlLabel, w.Webhook.Config.URL}).Return(fakeLatency) fakeLatency.On("Observe", 1.0).Return() // Build a registry and register all fake metrics, these are synymous with the metrics in diff --git a/senderWrapper.go b/senderWrapper.go index 02683797..4122511b 100644 --- a/senderWrapper.go +++ b/senderWrapper.go @@ -155,7 +155,7 @@ func (sw *CaduceusSenderWrapper) Update(list []ancla.InternalWebhook) { sender, ok := sw.senders[inValue.ID] if !ok { osf.Listener = inValue.Listener - metricWrapper, err := newMetricWrapper(time.Now, osf.QueryLatency.With("url", inValue.ID)) + metricWrapper, err := newMetricWrapper(time.Now, osf.QueryLatency) if err != nil { continue @@ -177,7 +177,7 @@ func (sw *CaduceusSenderWrapper) Queue(msg *wrp.Message) { sw.mutex.RLock() defer sw.mutex.RUnlock() - sw.eventType.With("event", msg.FindEventStringSubMatch()).Add(1) + sw.eventType.With(eventLabel, msg.FindEventStringSubMatch()).Add(1) for _, v := range sw.senders { v.Queue(msg) diff --git a/senderWrapper_test.go b/senderWrapper_test.go index 4c48e764..c6e95ddc 100644 --- a/senderWrapper_test.go +++ b/senderWrapper_test.go @@ -58,42 +58,42 @@ func getFakeFactory() *SenderWrapperFactory { fakeGauge := new(mockGauge) fakeGauge.On("Add", 1.0).Return(). On("Add", -1.0).Return(). - //On("With", []string{"url", "unknown"}).Return(fakeGauge). - On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeGauge). - On("With", []string{"url", "http://localhost:9999/foo"}).Return(fakeGauge) + //On("With", []string{urlLabel, "unknown"}).Return(fakeGauge). + On("With", []string{urlLabel, "http://localhost:8888/foo"}).Return(fakeGauge). + On("With", []string{urlLabel, "http://localhost:9999/foo"}).Return(fakeGauge) // Fake Latency fakeLatency := new(mockHistogram) - fakeLatency.On("With", []string{"url", "http://localhost:8888/foo", "code", "200"}).Return(fakeLatency) - fakeLatency.On("With", []string{"url", "http://localhost:9999/foo", "code", "200"}).Return(fakeLatency) - fakeLatency.On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeLatency) - fakeLatency.On("With", []string{"url", "http://localhost:9999/foo"}).Return(fakeLatency) + fakeLatency.On("With", []string{urlLabel, "http://localhost:8888/foo", codeLabel, "200"}).Return(fakeLatency) + fakeLatency.On("With", []string{urlLabel, "http://localhost:9999/foo", codeLabel, "200"}).Return(fakeLatency) + fakeLatency.On("With", []string{urlLabel, "http://localhost:8888/foo"}).Return(fakeLatency) + fakeLatency.On("With", []string{urlLabel, "http://localhost:9999/foo"}).Return(fakeLatency) fakeLatency.On("Observe", 1.0).Return() fakeIgnore := new(mockCounter) fakeIgnore.On("Add", 1.0).Return().On("Add", 0.0).Return(). - On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "event", "unknown"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "event", "unknown"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "reason", "cut_off"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "reason", "queue_full"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "reason", "expired"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "reason", "expired_before_queueing"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "reason", "network_err"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "reason", "invalid_config"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "reason", "cut_off"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "reason", "queue_full"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "reason", "expired"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "reason", "expired_before_queueing"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "reason", "network_err"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "reason", "invalid_config"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore). - On("With", []string{"event", "iot"}).Return(fakeIgnore). - On("With", []string{"event", "test/extra-stuff"}).Return(fakeIgnore). - On("With", []string{"event", "bob/magic/dog"}).Return(fakeIgnore). - On("With", []string{"event", "unknown"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:8888/foo"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:9999/foo"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:8888/foo", eventLabel, "unknown"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:9999/foo", eventLabel, "unknown"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:8888/foo", reasonLabel, "cut_off"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:8888/foo", reasonLabel, "queue_full"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:8888/foo", reasonLabel, "expired"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:8888/foo", reasonLabel, "expired_before_queueing"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:8888/foo", reasonLabel, "network_err"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:8888/foo", reasonLabel, "invalid_config"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:9999/foo", reasonLabel, "cut_off"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:9999/foo", reasonLabel, "queue_full"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:9999/foo", reasonLabel, "expired"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:9999/foo", reasonLabel, "expired_before_queueing"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:9999/foo", reasonLabel, "network_err"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:9999/foo", reasonLabel, "invalid_config"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:8888/foo", codeLabel, "200", eventLabel, "unknown"}).Return(fakeIgnore). + On("With", []string{urlLabel, "http://localhost:9999/foo", codeLabel, "200", eventLabel, "unknown"}).Return(fakeIgnore). + On("With", []string{eventLabel, "iot"}).Return(fakeIgnore). + On("With", []string{eventLabel, "test/extra-stuff"}).Return(fakeIgnore). + On("With", []string{eventLabel, "bob/magic/dog"}).Return(fakeIgnore). + On("With", []string{eventLabel, "unknown"}).Return(fakeIgnore). On("With", []string{"content_type", "msgpack"}).Return(fakeIgnore). On("With", []string{"content_type", "json"}).Return(fakeIgnore). On("With", []string{"content_type", "http"}).Return(fakeIgnore). From 2e49bb65b2d274a239a96d30a6bfd6baa10b368a Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Mon, 26 Feb 2024 17:19:38 -0500 Subject: [PATCH 2/6] feat: improve deliveryCounter metric --- httpClient.go | 28 ++++++++++++++-------------- metrics.go | 5 ++++- outboundSender.go | 12 +++++++----- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/httpClient.go b/httpClient.go index 428b724f..26cadbc2 100644 --- a/httpClient.go +++ b/httpClient.go @@ -56,8 +56,9 @@ func (m *metricWrapper) roundTripper(next httpClient) httpClient { startTime := m.now() resp, err := next.Do(req) endTime := m.now() + code := genericDoReason - reason := noErr + reason := noErrReason if err != nil { reason = getDoErrReason(err) if resp != nil { @@ -74,31 +75,30 @@ func (m *metricWrapper) roundTripper(next httpClient) httpClient { }) } -func getDoErrReason(e error) string { +func getDoErrReason(err error) string { var d *net.DNSError - if e == nil { - return noErr - } - if errors.Is(e, context.DeadlineExceeded) { + if err == nil { + return noErrReason + } else if errors.Is(err, context.DeadlineExceeded) { return deadlineExceededReason - } else if errors.Is(e, context.Canceled) { + } else if errors.Is(err, context.Canceled) { return contextCanceledReason - } else if errors.Is(e, &net.AddrError{}) { + } else if errors.Is(err, &net.AddrError{}) { return addressErrReason - } else if errors.Is(e, &net.ParseError{}) { + } else if errors.Is(err, &net.ParseError{}) { return parseAddrErrReason - } else if errors.Is(e, net.InvalidAddrError("")) { + } else if errors.Is(err, net.InvalidAddrError("")) { return invalidAddrReason - } else if errors.As(e, &d) { + } else if errors.As(err, &d) { if d.IsNotFound { return hostNotFoundReason } return dnsErrReason - } else if errors.Is(e, net.ErrClosed) { + } else if errors.Is(err, net.ErrClosed) { return connClosedReason - } else if errors.Is(e, &net.OpError{}) { + } else if errors.Is(err, &net.OpError{}) { return opErrReason - } else if errors.Is(e, net.UnknownNetworkError("")) { + } else if errors.Is(err, net.UnknownNetworkError("")) { return networkErrReason } diff --git a/metrics.go b/metrics.go index 0ec90b10..6e412d1f 100644 --- a/metrics.go +++ b/metrics.go @@ -57,7 +57,10 @@ const ( connClosedReason = "connection_closed" opErrReason = "op_error" networkErrReason = "unknown_network_err" - noErr = "no_err" + noErrReason = "no_err" + + // dropped message codes + messageDroppedCode = "message_dropped" ) func Metrics() []xmetrics.Metric { diff --git a/outboundSender.go b/outboundSender.go index f6cfa89e..970c7a20 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -636,8 +636,9 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri client := obs.clientMiddleware(doerFunc(retryer)) resp, err := client.Do(req) - code := genericDoReason - reason := noErr + var deliveryCounterLabels []string + code := messageDroppedCode + reason := noErrReason l := obs.logger if nil != err { // Report failure @@ -646,12 +647,12 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri code = strconv.Itoa(resp.StatusCode) } - obs.droppedMessage.With(urlLabel, req.URL.String(), reasonLabel, reason).Add(1) l = obs.logger.With(zap.String(reasonLabel, reason), zap.Error(err)) + deliveryCounterLabels = []string{urlLabel, req.URL.String(), reasonLabel, reason, codeLabel, code, eventLabel, event} + obs.droppedMessage.With(urlLabel, req.URL.String(), reasonLabel, reason).Add(1) } else { // Report Result code = strconv.Itoa(resp.StatusCode) - // read until the response is complete before closing to allow // connection reuse if nil != resp.Body { @@ -659,9 +660,10 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri resp.Body.Close() } - obs.deliveryCounter.With(urlLabel, req.URL.String(), reasonLabel, reason, codeLabel, code, eventLabel, event).Add(1.0) + deliveryCounterLabels = []string{urlLabel, req.URL.String(), reasonLabel, reason, codeLabel, code, eventLabel, event} } + obs.deliveryCounter.With(deliveryCounterLabels...).Add(1.0) l.Debug("event sent-ish", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination), zap.String("code", code), zap.String("url", req.URL.String())) } From e3a550bf98318ad3a1bdb64c2566e7f68bbe2627 Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Tue, 27 Feb 2024 10:04:19 -0500 Subject: [PATCH 3/6] feat: add missing droppedMessage count --- metrics.go | 27 ++++++++++++++------------- outboundSender.go | 1 + 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/metrics.go b/metrics.go index 6e412d1f..f3b30599 100644 --- a/metrics.go +++ b/metrics.go @@ -45,19 +45,20 @@ const ( reasonLabel = "reason" // metric label values - - genericDoReason = "do_error" - deadlineExceededReason = "context_deadline_exceeded" - contextCanceledReason = "context_canceled" - addressErrReason = "address_error" - parseAddrErrReason = "parse_address_error" - invalidAddrReason = "invalid_address" - dnsErrReason = "dns_error" - hostNotFoundReason = "host_not_found" - connClosedReason = "connection_closed" - opErrReason = "op_error" - networkErrReason = "unknown_network_err" - noErrReason = "no_err" + // dropped messages reasons + genericDoReason = "do_error" + deadlineExceededReason = "context_deadline_exceeded" + contextCanceledReason = "context_canceled" + addressErrReason = "address_error" + parseAddrErrReason = "parse_address_error" + invalidAddrReason = "invalid_address" + dnsErrReason = "dns_error" + hostNotFoundReason = "host_not_found" + connClosedReason = "connection_closed" + opErrReason = "op_error" + networkErrReason = "unknown_network_err" + updateRequestURLFailedReason = "update_request_url_failed" + noErrReason = "no_err" // dropped message codes messageDroppedCode = "message_dropped" diff --git a/outboundSender.go b/outboundSender.go index 970c7a20..6d6ecabe 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -623,6 +623,7 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri urls = urls.Next() tmp, err := url.Parse(urls.Value.(string)) if err != nil { + obs.droppedMessage.With(urlLabel, req.URL.String(), reasonLabel, updateRequestURLFailedReason).Add(1) obs.logger.Error("failed to update url", zap.String("url", urls.Value.(string)), zap.Error(err)) return } From 2d0cea7fca26ae57dcaad9a24488fe81bb2b523b Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Tue, 27 Feb 2024 11:20:11 -0500 Subject: [PATCH 4/6] feat: add EOF dropped message reasoning --- httpClient.go | 3 +++ metrics.go | 27 ++++++++++++++------------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/httpClient.go b/httpClient.go index 26cadbc2..f047ff2f 100644 --- a/httpClient.go +++ b/httpClient.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "strconv" + "strings" "time" "github.com/go-kit/kit/metrics" @@ -100,6 +101,8 @@ func getDoErrReason(err error) string { return opErrReason } else if errors.Is(err, net.UnknownNetworkError("")) { return networkErrReason + } else if strings.TrimSpace(strings.ToLower(err.Error())) == "eof" { + return connectionUnexpectedlyClosedEOFReason } return genericDoReason diff --git a/metrics.go b/metrics.go index f3b30599..7aecfc1e 100644 --- a/metrics.go +++ b/metrics.go @@ -46,19 +46,20 @@ const ( // metric label values // dropped messages reasons - genericDoReason = "do_error" - deadlineExceededReason = "context_deadline_exceeded" - contextCanceledReason = "context_canceled" - addressErrReason = "address_error" - parseAddrErrReason = "parse_address_error" - invalidAddrReason = "invalid_address" - dnsErrReason = "dns_error" - hostNotFoundReason = "host_not_found" - connClosedReason = "connection_closed" - opErrReason = "op_error" - networkErrReason = "unknown_network_err" - updateRequestURLFailedReason = "update_request_url_failed" - noErrReason = "no_err" + genericDoReason = "do_error" + deadlineExceededReason = "context_deadline_exceeded" + contextCanceledReason = "context_canceled" + addressErrReason = "address_error" + parseAddrErrReason = "parse_address_error" + invalidAddrReason = "invalid_address" + dnsErrReason = "dns_error" + hostNotFoundReason = "host_not_found" + connClosedReason = "connection_closed" + opErrReason = "op_error" + networkErrReason = "unknown_network_err" + updateRequestURLFailedReason = "update_request_url_failed" + connectionUnexpectedlyClosedEOFReason = "connection_unexpectedly_closed_eof" + noErrReason = "no_err" // dropped message codes messageDroppedCode = "message_dropped" From e2b43e93367371902366b3c04bf92f00bb320060 Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Tue, 27 Feb 2024 11:31:58 -0500 Subject: [PATCH 5/6] Update httpClient.go --- httpClient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/httpClient.go b/httpClient.go index f047ff2f..33502ae7 100644 --- a/httpClient.go +++ b/httpClient.go @@ -101,7 +101,7 @@ func getDoErrReason(err error) string { return opErrReason } else if errors.Is(err, net.UnknownNetworkError("")) { return networkErrReason - } else if strings.TrimSpace(strings.ToLower(err.Error())) == "eof" { + } else if err := errors.Unwrap(err).Error(); strings.TrimSpace(strings.ToLower(err)) == "eof" { return connectionUnexpectedlyClosedEOFReason } From 8d4d453aede953a99f20c909b06ec0fbf4540227 Mon Sep 17 00:00:00 2001 From: Owen Cabalceta Date: Tue, 27 Feb 2024 11:37:15 -0500 Subject: [PATCH 6/6] chore: typo --- httpClient.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/httpClient.go b/httpClient.go index 33502ae7..6bb8025f 100644 --- a/httpClient.go +++ b/httpClient.go @@ -8,6 +8,7 @@ import ( "errors" "net" "net/http" + "net/url" "strconv" "strings" "time" @@ -101,8 +102,10 @@ func getDoErrReason(err error) string { return opErrReason } else if errors.Is(err, net.UnknownNetworkError("")) { return networkErrReason - } else if err := errors.Unwrap(err).Error(); strings.TrimSpace(strings.ToLower(err)) == "eof" { - return connectionUnexpectedlyClosedEOFReason + } else if err, ok := err.(*url.Error); ok { + if strings.TrimSpace(strings.ToLower(err.Unwrap().Error())) == "eof" { + return connectionUnexpectedlyClosedEOFReason + } } return genericDoReason