diff --git a/CHANGELOG.md b/CHANGELOG.md index ff4bf8db..d5016a3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [Unreleased] +- Added latency metric, which Tracks the time spent waiting on outbound client URLs to respond. [#312](https://github.com/xmidt-org/caduceus/pull/312) ## [v0.6.6] - Fix a missing return after an invalid utf8 string is handled. [#315](https://github.com/xmidt-org/caduceus/pull/315) diff --git a/caduceus_type.go b/caduceus_type.go index 6ae51155..f5c15c58 100644 --- a/caduceus_type.go +++ b/caduceus_type.go @@ -57,6 +57,7 @@ type SenderConfig struct { type CaduceusMetricsRegistry interface { NewCounter(name string) metrics.Counter NewGauge(name string) metrics.Gauge + NewHistogram(name string, buckets int) metrics.Histogram } type RequestHandler interface { diff --git a/httpClient.go b/httpClient.go new file mode 100644 index 00000000..e6a96216 --- /dev/null +++ b/httpClient.go @@ -0,0 +1,66 @@ +package main + +import ( + "errors" + "net/http" + "strconv" + "time" + + "github.com/go-kit/kit/metrics" +) + +var ( + errNilHistogram = errors.New("histogram cannot be nil") +) + +type httpClient interface { + Do(*http.Request) (*http.Response, error) +} + +func nopHTTPClient(next httpClient) httpClient { + return next +} + +// DoerFunc implements HTTPClient +type doerFunc func(*http.Request) (*http.Response, error) + +func (d doerFunc) Do(req *http.Request) (*http.Response, error) { + return d(req) +} + +type metricWrapper struct { + now func() time.Time + queryLatency metrics.Histogram +} + +func newMetricWrapper(now func() time.Time, queryLatency metrics.Histogram) (*metricWrapper, error) { + if now == nil { + now = time.Now + } + if queryLatency == nil { + return nil, errNilHistogram + } + return &metricWrapper{ + now: now, + queryLatency: queryLatency, + }, nil +} + +func (m *metricWrapper) roundTripper(next httpClient) httpClient { + return doerFunc(func(req *http.Request) (*http.Response, error) { + startTime := m.now() + resp, err := next.Do(req) + endTime := m.now() + code := networkError + + if err == nil { + code = strconv.Itoa(resp.StatusCode) + } + + // find time difference, add to metric + var latency = endTime.Sub(startTime) + m.queryLatency.With("code", code).Observe(latency.Seconds()) + + return resp, err + }) +} diff --git a/httpClient_test.go b/httpClient_test.go new file mode 100644 index 00000000..bb1898f7 --- /dev/null +++ b/httpClient_test.go @@ -0,0 +1,178 @@ +/** + * Copyright 2022 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package main + +import ( + "errors" + "io" + "io/ioutil" + "net/http" + "testing" + "time" + + "github.com/go-kit/kit/metrics" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRoundTripper(t *testing.T) { + errTest := errors.New("test error") + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) + + tests := []struct { + description string + startTime time.Time + endTime time.Time + expectedCode string + request *http.Request + expectedErr error + expectedResponse *http.Response + }{ + { + description: "Success", + startTime: date1, + endTime: date2, + expectedCode: "200", + request: exampleRequest(1), + expectedErr: nil, + expectedResponse: &http.Response{ + StatusCode: 200, + }, + }, + { + description: "503 Service Unavailable", + startTime: date1, + endTime: date2, + expectedCode: "503", + request: exampleRequest(1), + expectedErr: nil, + expectedResponse: &http.Response{ + StatusCode: 503, + }, + }, + { + description: "Network Error", + startTime: date1, + endTime: date2, + expectedCode: "network_err", + request: exampleRequest(1), + expectedErr: errTest, + expectedResponse: nil, + }, + } + + for _, tc := range tests { + + t.Run(tc.description, func(t *testing.T) { + + fakeTime := mockTime(tc.startTime, tc.endTime) + fakeHandler := new(mockHandler) + fakeHist := new(mockHistogram) + histogramFunctionCall := []string{"code", tc.expectedCode} + fakeLatency := date2.Sub(date1) + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() + + // Create a roundtripper with mock time and mock histogram + m, err := newMetricWrapper(fakeTime, fakeHist) + require.NoError(t, err) + require.NotNil(t, m) + + client := doerFunc(func(*http.Request) (*http.Response, error) { + + return tc.expectedResponse, tc.expectedErr + }) + + c := m.roundTripper(client) + resp, err := c.Do(tc.request) + + if tc.expectedErr == nil { + // Read and close response body + if resp.Body != nil { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + } + assert.NoError(t, err) + } else { + assert.ErrorIs(t, tc.expectedErr, err) + } + + // Check the histogram and expectations + fakeHandler.AssertExpectations(t) + fakeHist.AssertExpectations(t) + + }) + } + +} + +func TestNewMetricWrapper(t *testing.T) { + + tests := []struct { + description string + expectedErr error + fakeTime func() time.Time + fakeHistogram metrics.Histogram + }{ + { + description: "Success", + expectedErr: nil, + fakeTime: time.Now, + fakeHistogram: &mockHistogram{}, + }, + { + description: "Nil Histogram", + expectedErr: errNilHistogram, + fakeTime: time.Now, + fakeHistogram: nil, + }, + { + description: "Nil Time", + expectedErr: nil, + fakeTime: nil, + fakeHistogram: &mockHistogram{}, + }, + } + + for _, tc := range tests { + + t.Run(tc.description, func(t *testing.T) { + + // Make function call + mw, err := newMetricWrapper(tc.fakeTime, tc.fakeHistogram) + + if tc.expectedErr == nil { + // Check for no errors + assert.NoError(t, err) + require.NotNil(t, mw) + + // Check that the time and histogram aren't nil + assert.NotNil(t, mw.now) + assert.NotNil(t, mw.queryLatency) + return + } + + // with error checks + assert.Nil(t, mw) + assert.ErrorIs(t, err, tc.expectedErr) + + }) + } + +} diff --git a/main.go b/main.go index 253e97dc..4e5a0e21 100644 --- a/main.go +++ b/main.go @@ -133,10 +133,10 @@ func caduceus(arguments []string) int { DeliveryInterval: caduceusConfig.Sender.DeliveryInterval, MetricsRegistry: metricsRegistry, Logger: logger, - Sender: (&http.Client{ + Sender: doerFunc((&http.Client{ Transport: tr, Timeout: caduceusConfig.Sender.ClientTimeout, - }).Do, + }).Do), CustomPIDs: caduceusConfig.Sender.CustomPIDs, DisablePartnerIDs: caduceusConfig.Sender.DisablePartnerIDs, }.New() diff --git a/metrics.go b/metrics.go index fa2c4f0c..7d82b9e4 100644 --- a/metrics.go +++ b/metrics.go @@ -1,6 +1,7 @@ package main import ( + "github.com/go-kit/kit/metrics" "github.com/xmidt-org/webpa-common/v2/xmetrics" ) @@ -23,12 +24,14 @@ const ( ConsumerDropUntilGauge = "consumer_drop_until" ConsumerDeliveryWorkersGauge = "consumer_delivery_workers" ConsumerMaxDeliveryWorkersGauge = "consumer_delivery_workers_max" + QueryDurationSecondsHistogram = "query_duration_seconds_histogram" ) const ( emptyContentTypeReason = "empty_content_type" emptyUUIDReason = "empty_uuid" bothEmptyReason = "empty_uuid_and_content_type" + networkError = "network_err" ) func Metrics() []xmetrics.Metric { @@ -137,6 +140,13 @@ func Metrics() []xmetrics.Metric { Type: "gauge", LabelNames: []string{"url"}, }, + { + Name: QueryDurationSecondsHistogram, + Help: "A histogram of latencies for queries.", + Type: "histogram", + LabelNames: []string{"url", "code"}, + Buckets: []float64{0.0625, 0.125, .25, .5, 1, 5, 10, 20, 40, 80, 160}, + }, } } @@ -151,7 +161,7 @@ func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSende c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off") c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config") - c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "network_err") + c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError) c.droppedPanic = m.NewCounter(DropsDueToPanic).With("url", c.id) c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id) c.renewalTimeGauge = m.NewGauge(ConsumerRenewalTimeGauge).With("url", c.id) @@ -160,3 +170,7 @@ func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSende c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id) c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id) } + +func NewMetricWrapperMeasures(m CaduceusMetricsRegistry) metrics.Histogram { + return m.NewHistogram(QueryDurationSecondsHistogram, 11) +} diff --git a/mocks_test.go b/mocks_test.go index 17dc9000..1aeed03a 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -17,6 +17,7 @@ package main import ( + "time" "unicode/utf8" "github.com/go-kit/kit/metrics" @@ -95,6 +96,25 @@ func (m *mockGauge) With(labelValues ...string) metrics.Gauge { return args.Get(0).(metrics.Gauge) } +// mockHistogram provides the mock implementation of the metrics.Histogram object +type mockHistogram struct { + mock.Mock +} + +func (m *mockHistogram) Observe(value float64) { + m.Called(value) +} + +func (m *mockHistogram) With(labelValues ...string) metrics.Histogram { + for _, v := range labelValues { + if !utf8.ValidString(v) { + panic("not UTF-8") + } + } + m.Called(labelValues) + return m +} + // mockCaduceusMetricsRegistry provides the mock implementation of the // CaduceusMetricsRegistry object type mockCaduceusMetricsRegistry struct { @@ -110,3 +130,20 @@ func (m *mockCaduceusMetricsRegistry) NewGauge(name string) metrics.Gauge { args := m.Called(name) return args.Get(0).(metrics.Gauge) } + +func (m *mockCaduceusMetricsRegistry) NewHistogram(name string, buckets int) metrics.Histogram { + args := m.Called(name) + return args.Get(0).(metrics.Histogram) +} + +// mockTime provides two mock time values +func mockTime(one, two time.Time) func() time.Time { + var called bool + return func() time.Time { + if called { + return two + } + called = true + return one + } +} diff --git a/outboundSender.go b/outboundSender.go index 8e8abc2c..7cc0e818 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -72,7 +72,11 @@ type OutboundSenderFactory struct { Listener ancla.InternalWebhook // The http client Do() function to use for outbound requests. - Sender func(*http.Request) (*http.Response, error) + // Sender func(*http.Request) (*http.Response, error) + Sender httpClient + + // + ClientMiddleware func(httpClient) httpClient // The number of delivery workers to create and use. NumWorkers int @@ -103,6 +107,8 @@ type OutboundSenderFactory struct { // DisablePartnerIDs dictates whether or not to enforce the partner ID check. DisablePartnerIDs bool + + QueryLatency metrics.Histogram } type OutboundSender interface { @@ -119,7 +125,7 @@ type CaduceusOutboundSender struct { listener ancla.InternalWebhook deliverUntil time.Time dropUntil time.Time - sender func(*http.Request) (*http.Response, error) + sender httpClient events []*regexp.Regexp matcher []*regexp.Regexp queueSize int @@ -152,6 +158,7 @@ type CaduceusOutboundSender struct { queue atomic.Value customPIDs []string disablePartnerIDs bool + clientMiddleware func(httpClient) httpClient } // New creates a new OutboundSender object from the factory, or returns an error. @@ -160,6 +167,10 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { return } + if nil == osf.ClientMiddleware { + osf.ClientMiddleware = nopHTTPClient + } + if nil == osf.Sender { err = errors.New("nil Sender()") return @@ -199,6 +210,7 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { }, customPIDs: osf.CustomPIDs, disablePartnerIDs: osf.DisablePartnerIDs, + clientMiddleware: osf.ClientMiddleware, } // Don't share the secret with others when there is an error. @@ -656,7 +668,11 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri "event.source", msg.Source, "event.destination", msg.Destination, ) - resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req) + + retryer := xhttp.RetryTransactor(retryOptions, obs.sender.Do) + client := obs.clientMiddleware(doerFunc(retryer)) + resp, err := client.Do(req) + code := "failure" l := obs.logger if nil != err { @@ -740,7 +756,7 @@ func (obs *CaduceusOutboundSender) queueOverflow() { req.Header.Set("X-Webpa-Signature", sig) } - resp, err := obs.sender(req) + resp, err := obs.sender.Do(req) if nil != err { // Failure errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification", "notification", @@ -760,5 +776,6 @@ func (obs *CaduceusOutboundSender) queueOverflow() { if nil != resp.Body { io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() + } } diff --git a/outboundSender_test.go b/outboundSender_test.go index 8b9ea35d..562ecab7 100644 --- a/outboundSender_test.go +++ b/outboundSender_test.go @@ -145,6 +145,12 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] fakePanicDrop.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakePanicDrop) fakePanicDrop.On("Add", 1.0).Return() + // Fake Latency + fakeLatency := new(mockHistogram) + fakeLatency.On("With", []string{"url", w.Webhook.Config.URL, "code", "200"}).Return(fakeLatency) + fakeLatency.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakeLatency) + fakeLatency.On("Observe", 1.0).Return() + // Build a registry and register all fake metrics, these are synymous with the metrics in // metrics.go // @@ -163,10 +169,11 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeQdepth) fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeQdepth) fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeQdepth) + fakeRegistry.On("NewHistogram", QueryDurationSecondsHistogram).Return(fakeLatency) return &OutboundSenderFactory{ Listener: w, - Sender: (&http.Client{Transport: trans}).Do, + Sender: doerFunc((&http.Client{Transport: trans}).Do), CutOffPeriod: cutOffPeriod, NumWorkers: 10, QueueSize: 10, @@ -583,7 +590,7 @@ func TestInvalidEventRegex(t *testing.T) { obs, err := OutboundSenderFactory{ Listener: w, - Sender: (&http.Client{}).Do, + Sender: doerFunc((&http.Client{}).Do), NumWorkers: 10, QueueSize: 10, Logger: log.NewNopLogger(), @@ -609,7 +616,7 @@ func TestInvalidUrl(t *testing.T) { obs, err := OutboundSenderFactory{ Listener: w, - Sender: (&http.Client{}).Do, + Sender: doerFunc((&http.Client{}).Do), NumWorkers: 10, QueueSize: 10, Logger: log.NewNopLogger(), @@ -627,7 +634,7 @@ func TestInvalidUrl(t *testing.T) { obs, err = OutboundSenderFactory{ Listener: w2, - Sender: (&http.Client{}).Do, + Sender: doerFunc((&http.Client{}).Do), NumWorkers: 10, QueueSize: 10, Logger: log.NewNopLogger(), @@ -665,7 +672,7 @@ func TestInvalidLogger(t *testing.T) { trans := &transport{} obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w - obsf.Sender = (&http.Client{}).Do + obsf.Sender = doerFunc((&http.Client{}).Do) obsf.Logger = nil obs, err := obsf.New() @@ -690,7 +697,7 @@ func TestFailureURL(t *testing.T) { trans := &transport{} obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w - obsf.Sender = (&http.Client{}).Do + obsf.Sender = doerFunc((&http.Client{}).Do) obs, err := obsf.New() assert.Nil(obs) assert.NotNil(err) @@ -711,7 +718,7 @@ func TestInvalidEvents(t *testing.T) { trans := &transport{} obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w - obsf.Sender = (&http.Client{}).Do + obsf.Sender = doerFunc((&http.Client{}).Do) obs, err := obsf.New() assert.Nil(obs) @@ -728,7 +735,7 @@ func TestInvalidEvents(t *testing.T) { obsf = simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w2 - obsf.Sender = (&http.Client{}).Do + obsf.Sender = doerFunc((&http.Client{}).Do) obs, err = obsf.New() assert.Nil(obs) @@ -763,7 +770,7 @@ func TestUpdate(t *testing.T) { trans := &transport{} obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w1 - obsf.Sender = (&http.Client{}).Do + obsf.Sender = doerFunc((&http.Client{}).Do) obs, err := obsf.New() assert.Nil(err) @@ -798,7 +805,7 @@ func TestOverflowNoFailureURL(t *testing.T) { obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w obsf.Logger = logger - obsf.Sender = (&http.Client{}).Do + obsf.Sender = doerFunc((&http.Client{}).Do) obs, err := obsf.New() assert.Nil(err) diff --git a/senderWrapper.go b/senderWrapper.go index 719889c5..313ff3a9 100644 --- a/senderWrapper.go +++ b/senderWrapper.go @@ -18,7 +18,6 @@ package main import ( "errors" - "net/http" "sync" "time" @@ -61,7 +60,7 @@ type SenderWrapperFactory struct { Logger log.Logger // The http client Do() function to share with OutboundSenders. - Sender func(*http.Request) (*http.Response, error) + Sender httpClient // CustomPIDs is a custom list of allowed PartnerIDs that will be used if a message // has no partner IDs. @@ -79,7 +78,7 @@ type SenderWrapper interface { // CaduceusSenderWrapper contains no external parameters. type CaduceusSenderWrapper struct { - sender func(*http.Request) (*http.Response, error) + sender httpClient numWorkersPerSender int queueSizePerSender int deliveryRetries int @@ -91,6 +90,7 @@ type CaduceusSenderWrapper struct { senders map[string]OutboundSender metricsRegistry CaduceusMetricsRegistry eventType metrics.Counter + queryLatency metrics.Histogram wg sync.WaitGroup shutdown chan struct{} customPIDs []string @@ -120,6 +120,7 @@ func (swf SenderWrapperFactory) New() (sw SenderWrapper, err error) { return } + caduceusSenderWrapper.queryLatency = NewMetricWrapperMeasures(swf.MetricsRegistry) caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(IncomingEventTypeCounter) caduceusSenderWrapper.senders = make(map[string]OutboundSender) @@ -148,6 +149,7 @@ func (sw *CaduceusSenderWrapper) Update(list []ancla.InternalWebhook) { Logger: sw.logger, CustomPIDs: sw.customPIDs, DisablePartnerIDs: sw.disablePartnerIDs, + QueryLatency: sw.queryLatency, } ids := make([]struct { @@ -167,6 +169,12 @@ func (sw *CaduceusSenderWrapper) Update(list []ancla.InternalWebhook) { sender, ok := sw.senders[inValue.ID] if !ok { osf.Listener = inValue.Listener + metricWrapper, err := newMetricWrapper(time.Now, osf.QueryLatency.With("url", inValue.ID)) + + if err != nil { + continue + } + osf.ClientMiddleware = metricWrapper.roundTripper obs, err := osf.New() if nil == err { sw.senders[inValue.ID] = obs diff --git a/senderWrapper_test.go b/senderWrapper_test.go index 07b9f415..d1cb2ed9 100644 --- a/senderWrapper_test.go +++ b/senderWrapper_test.go @@ -76,6 +76,14 @@ func getFakeFactory() *SenderWrapperFactory { On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeGauge). On("With", []string{"url", "http://localhost:9999/foo"}).Return(fakeGauge) + // Fake Latency + fakeLatency := new(mockHistogram) + fakeLatency.On("With", []string{"url", "http://localhost:8888/foo", "code", "200"}).Return(fakeLatency) + fakeLatency.On("With", []string{"url", "http://localhost:9999/foo", "code", "200"}).Return(fakeLatency) + fakeLatency.On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeLatency) + fakeLatency.On("With", []string{"url", "http://localhost:9999/foo"}).Return(fakeLatency) + fakeLatency.On("Observe", 1.0).Return() + fakeIgnore := new(mockCounter) fakeIgnore.On("Add", 1.0).Return().On("Add", 0.0).Return(). On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeIgnore). @@ -120,6 +128,7 @@ func getFakeFactory() *SenderWrapperFactory { fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeGauge) fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeGauge) fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeGauge) + fakeRegistry.On("NewHistogram", QueryDurationSecondsHistogram).Return(fakeLatency) return &SenderWrapperFactory{ NumWorkersPerSender: 10, @@ -169,7 +178,7 @@ func TestSwSimple(t *testing.T) { trans := &swTransport{} swf := getFakeFactory() - swf.Sender = trans.RoundTrip + swf.Sender = doerFunc((&http.Client{}).Do) swf.Linger = 1 * time.Second sw, err := swf.New()