From c7b5e01d9e292b8f844ac6a6235909890f90ac54 Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Mon, 25 Apr 2022 16:16:54 -0500 Subject: [PATCH 01/15] Track Latency and test changes --- caduceus_type.go | 1 + http.go | 1 + main.go | 1 + metrics.go | 9 +++++++++ mocks_test.go | 24 ++++++++++++++++++++++++ outboundSender.go | 23 ++++++++++++++++++++++- outboundSender_test.go | 6 ++++++ senderWrapper_test.go | 7 +++++++ 8 files changed, 71 insertions(+), 1 deletion(-) diff --git a/caduceus_type.go b/caduceus_type.go index 6ae51155..f5c15c58 100644 --- a/caduceus_type.go +++ b/caduceus_type.go @@ -57,6 +57,7 @@ type SenderConfig struct { type CaduceusMetricsRegistry interface { NewCounter(name string) metrics.Counter NewGauge(name string) metrics.Gauge + NewHistogram(name string, buckets int) metrics.Histogram } type RequestHandler interface { diff --git a/http.go b/http.go index 0ec1c88f..23543d39 100644 --- a/http.go +++ b/http.go @@ -38,6 +38,7 @@ type ServerHandler struct { invalidCount metrics.Counter incomingQueueDepthMetric metrics.Gauge modifiedWRPCount metrics.Counter + querylatency metrics.Histogram incomingQueueDepth int64 maxOutstanding int64 } diff --git a/main.go b/main.go index c73597d0..6325b385 100644 --- a/main.go +++ b/main.go @@ -156,6 +156,7 @@ func caduceus(arguments []string) int { invalidCount: metricsRegistry.NewCounter(DropsDueToInvalidPayload), incomingQueueDepthMetric: metricsRegistry.NewGauge(IncomingQueueDepth), modifiedWRPCount: metricsRegistry.NewCounter(ModifiedWRPCounter), + querylatency: metricsRegistry.NewHistogram(QueryDurationSecondsHistogram, 11), maxOutstanding: 0, } diff --git a/metrics.go b/metrics.go index fa2c4f0c..38a92659 100644 --- a/metrics.go +++ b/metrics.go @@ -23,6 +23,7 @@ const ( ConsumerDropUntilGauge = "consumer_drop_until" ConsumerDeliveryWorkersGauge = "consumer_delivery_workers" ConsumerMaxDeliveryWorkersGauge = "consumer_delivery_workers_max" + QueryDurationSecondsHistogram = "query_duration_seconds_histogram" ) const ( @@ -137,6 +138,13 @@ func Metrics() []xmetrics.Metric { Type: "gauge", LabelNames: []string{"url"}, }, + { + Name: QueryDurationSecondsHistogram, + Help: "A histogram of latencies for queries.", + Type: "histogram", + LabelNames: []string{"url", "code"}, + Buckets: []float64{0.0625, 0.125, .25, .5, 1, 5, 10, 20, 40, 80, 160}, + }, } } @@ -159,4 +167,5 @@ func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSende c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id) c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id) c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id) + c.querylatency = m.NewHistogram(QueryDurationSecondsHistogram, 11).With("url", c.id, "code", "200") } diff --git a/mocks_test.go b/mocks_test.go index 17dc9000..986243d5 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -95,6 +95,25 @@ func (m *mockGauge) With(labelValues ...string) metrics.Gauge { return args.Get(0).(metrics.Gauge) } +// mockHistogram provides the mock implementation of the metrics.Histogram object +type mockHistogram struct { + mock.Mock +} + +func (m *mockHistogram) Observe(value float64) { + m.Called(value) +} + +func (m *mockHistogram) With(labelValues ...string) metrics.Histogram { + for _, v := range labelValues { + if !utf8.ValidString(v) { + panic("not UTF-8") + } + } + args := m.Called(labelValues) + return args.Get(0).(metrics.Histogram) +} + // mockCaduceusMetricsRegistry provides the mock implementation of the // CaduceusMetricsRegistry object type mockCaduceusMetricsRegistry struct { @@ -110,3 +129,8 @@ func (m *mockCaduceusMetricsRegistry) NewGauge(name string) metrics.Gauge { args := m.Called(name) return args.Get(0).(metrics.Gauge) } + +func (m *mockCaduceusMetricsRegistry) NewHistogram(name string, buckets int) metrics.Histogram { + args := m.Called(name) + return args.Get(0).(metrics.Histogram) +} diff --git a/outboundSender.go b/outboundSender.go index 8e8abc2c..cfaa3863 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -40,6 +40,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/go-kit/kit/metrics" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/xmidt-org/ancla" "github.com/xmidt-org/webpa-common/v2/device" "github.com/xmidt-org/webpa-common/v2/logging" @@ -142,6 +143,7 @@ type CaduceusOutboundSender struct { maxWorkersGauge metrics.Gauge currentWorkersGauge metrics.Gauge deliveryRetryMaxGauge metrics.Gauge + querylatency metrics.Histogram wg sync.WaitGroup cutOffPeriod time.Duration workers semaphore.Interface @@ -656,7 +658,10 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri "event.source", msg.Source, "event.destination", msg.Destination, ) - resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req) + + respFunc := xhttp.RetryTransactor(retryOptions, obs.sender) + roundTripperFunc := obs.metricRoundTripper(respFunc, time.Now) + resp, err := respFunc(req) code := "failure" l := obs.logger if nil != err { @@ -672,6 +677,7 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri if nil != resp.Body { io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() + roundTripperFunc(req) } } obs.deliveryCounter.With("url", obs.id, "code", code, "event", event).Add(1.0) @@ -762,3 +768,18 @@ func (obs *CaduceusOutboundSender) queueOverflow() { resp.Body.Close() } } + +func (obs *CaduceusOutboundSender) metricRoundTripper(next promhttp.RoundTripperFunc, now func() time.Time) promhttp.RoundTripperFunc { + if now == nil { + now = time.Now + } + return func(req *http.Request) (*http.Response, error) { + resp, err := next(req) + + // find time difference, add to metric + var latency = time.Since(now()) + obs.querylatency.Observe(float64(latency)) + + return resp, err + } +} diff --git a/outboundSender_test.go b/outboundSender_test.go index 8b9ea35d..05e9315c 100644 --- a/outboundSender_test.go +++ b/outboundSender_test.go @@ -145,6 +145,11 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] fakePanicDrop.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakePanicDrop) fakePanicDrop.On("Add", 1.0).Return() + // Fake + fakeLatency := new(mockHistogram) + fakeLatency.On("With", []string{"url", w.Webhook.Config.URL, "code", "200"}).Return(fakeLatency) + fakeLatency.On("Observe", 1.0).Return() + // Build a registry and register all fake metrics, these are synymous with the metrics in // metrics.go // @@ -163,6 +168,7 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeQdepth) fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeQdepth) fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeQdepth) + fakeRegistry.On("NewHistogram", QueryDurationSecondsHistogram).Return(fakeLatency) return &OutboundSenderFactory{ Listener: w, diff --git a/senderWrapper_test.go b/senderWrapper_test.go index 07b9f415..c1d926cd 100644 --- a/senderWrapper_test.go +++ b/senderWrapper_test.go @@ -69,6 +69,12 @@ func getFakeFactory() *SenderWrapperFactory { fakeDDTIP := new(mockCounter) fakeDDTIP.On("Add", 1.0).Return() + // Fake + fakeLatency := new(mockHistogram) + fakeLatency.On("With", []string{"url", "http://localhost:8888/foo", "code", "200"}).Return(fakeLatency) + fakeLatency.On("With", []string{"url", "http://localhost:9999/foo", "code", "200"}).Return(fakeLatency) + fakeLatency.On("Observe", 1.0).Return() + fakeGauge := new(mockGauge) fakeGauge.On("Add", 1.0).Return(). On("Add", -1.0).Return(). @@ -120,6 +126,7 @@ func getFakeFactory() *SenderWrapperFactory { fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeGauge) fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeGauge) fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeGauge) + fakeRegistry.On("NewHistogram", QueryDurationSecondsHistogram).Return(fakeLatency) return &SenderWrapperFactory{ NumWorkersPerSender: 10, From 06711ee2ad28686e6158dbeb9c1fc0109b357cf6 Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Wed, 27 Apr 2022 13:21:57 -0500 Subject: [PATCH 02/15] Refactoring the latency methods to HttpClient --- HttpClient.go | 52 ++++++++++++++++++++++++++++++++++++++++++ caduceus_type.go | 2 +- metrics.go | 1 - mocks_test.go | 29 ++++++----------------- outboundSender.go | 27 ++++++---------------- outboundSender_test.go | 6 ----- senderWrapper_test.go | 7 ------ 7 files changed, 67 insertions(+), 57 deletions(-) create mode 100644 HttpClient.go diff --git a/HttpClient.go b/HttpClient.go new file mode 100644 index 00000000..9dc5b3c0 --- /dev/null +++ b/HttpClient.go @@ -0,0 +1,52 @@ +package main + +import ( + "errors" + "net/http" + "time" + + "github.com/go-kit/kit/metrics" +) + +type HTTPClient interface { + Do(*http.Request) (*http.Response, error) +} + +// DoerFunc implements HTTPClient +type DoerFunc func(*http.Request) (*http.Response, error) + +func (d DoerFunc) Do(req *http.Request) (*http.Response, error) { + return d(req) +} + +type metricWrapper struct { + now func() time.Time + queryLatency metrics.Histogram +} + +func newMetricWrapper(now func() time.Time, queryLatency metrics.Histogram) (*metricWrapper, error) { + if now == nil { + now = time.Now + } + if queryLatency == nil { + return nil, errors.New("histogram cannot be nil") + } + return &metricWrapper{ + now: now, + queryLatency: queryLatency, + }, nil +} + +func (m *metricWrapper) roundTripper(next HTTPClient) HTTPClient { + return DoerFunc(func(req *http.Request) (*http.Response, error) { + startTime := m.now() + resp, err := next.Do(req) + endTime := m.now() + + // find time difference, add to metric + var latency = endTime.Sub(startTime) + m.queryLatency.Observe(latency.Seconds()) + + return resp, err + }) +} diff --git a/caduceus_type.go b/caduceus_type.go index f5c15c58..bc08a51f 100644 --- a/caduceus_type.go +++ b/caduceus_type.go @@ -57,7 +57,7 @@ type SenderConfig struct { type CaduceusMetricsRegistry interface { NewCounter(name string) metrics.Counter NewGauge(name string) metrics.Gauge - NewHistogram(name string, buckets int) metrics.Histogram + // NewHistogram(name string, buckets int) metrics.Histogram } type RequestHandler interface { diff --git a/metrics.go b/metrics.go index 38a92659..ae5f9393 100644 --- a/metrics.go +++ b/metrics.go @@ -167,5 +167,4 @@ func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSende c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id) c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id) c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id) - c.querylatency = m.NewHistogram(QueryDurationSecondsHistogram, 11).With("url", c.id, "code", "200") } diff --git a/mocks_test.go b/mocks_test.go index 986243d5..12f0ecff 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -95,24 +95,14 @@ func (m *mockGauge) With(labelValues ...string) metrics.Gauge { return args.Get(0).(metrics.Gauge) } -// mockHistogram provides the mock implementation of the metrics.Histogram object -type mockHistogram struct { - mock.Mock -} - -func (m *mockHistogram) Observe(value float64) { - m.Called(value) -} +// // mockHistogram provides the mock implementation of the metrics.Histogram object +// type mockHistogram struct { +// mock.Mock +// } -func (m *mockHistogram) With(labelValues ...string) metrics.Histogram { - for _, v := range labelValues { - if !utf8.ValidString(v) { - panic("not UTF-8") - } - } - args := m.Called(labelValues) - return args.Get(0).(metrics.Histogram) -} +// func (m *mockHistogram) Observe(value float64) { +// m.Called(value) +// } // mockCaduceusMetricsRegistry provides the mock implementation of the // CaduceusMetricsRegistry object @@ -129,8 +119,3 @@ func (m *mockCaduceusMetricsRegistry) NewGauge(name string) metrics.Gauge { args := m.Called(name) return args.Get(0).(metrics.Gauge) } - -func (m *mockCaduceusMetricsRegistry) NewHistogram(name string, buckets int) metrics.Histogram { - args := m.Called(name) - return args.Get(0).(metrics.Histogram) -} diff --git a/outboundSender.go b/outboundSender.go index cfaa3863..ac1e6c3c 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -40,7 +40,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/go-kit/kit/metrics" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/xmidt-org/ancla" "github.com/xmidt-org/webpa-common/v2/device" "github.com/xmidt-org/webpa-common/v2/logging" @@ -154,6 +153,8 @@ type CaduceusOutboundSender struct { queue atomic.Value customPIDs []string disablePartnerIDs bool + Sender HTTPClient + clientMiddleware func(HTTPClient) HTTPClient } // New creates a new OutboundSender object from the factory, or returns an error. @@ -659,9 +660,10 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri "event.destination", msg.Destination, ) - respFunc := xhttp.RetryTransactor(retryOptions, obs.sender) - roundTripperFunc := obs.metricRoundTripper(respFunc, time.Now) - resp, err := respFunc(req) + retryer := xhttp.RetryTransactor(retryOptions, obs.sender) + roundTripper := obs.clientMiddleware(DoerFunc(retryer)) + resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req) + roundTripper.Do(req) code := "failure" l := obs.logger if nil != err { @@ -677,7 +679,7 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri if nil != resp.Body { io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() - roundTripperFunc(req) + roundTripper.Do(req) } } obs.deliveryCounter.With("url", obs.id, "code", code, "event", event).Add(1.0) @@ -768,18 +770,3 @@ func (obs *CaduceusOutboundSender) queueOverflow() { resp.Body.Close() } } - -func (obs *CaduceusOutboundSender) metricRoundTripper(next promhttp.RoundTripperFunc, now func() time.Time) promhttp.RoundTripperFunc { - if now == nil { - now = time.Now - } - return func(req *http.Request) (*http.Response, error) { - resp, err := next(req) - - // find time difference, add to metric - var latency = time.Since(now()) - obs.querylatency.Observe(float64(latency)) - - return resp, err - } -} diff --git a/outboundSender_test.go b/outboundSender_test.go index 05e9315c..8b9ea35d 100644 --- a/outboundSender_test.go +++ b/outboundSender_test.go @@ -145,11 +145,6 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] fakePanicDrop.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakePanicDrop) fakePanicDrop.On("Add", 1.0).Return() - // Fake - fakeLatency := new(mockHistogram) - fakeLatency.On("With", []string{"url", w.Webhook.Config.URL, "code", "200"}).Return(fakeLatency) - fakeLatency.On("Observe", 1.0).Return() - // Build a registry and register all fake metrics, these are synymous with the metrics in // metrics.go // @@ -168,7 +163,6 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeQdepth) fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeQdepth) fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeQdepth) - fakeRegistry.On("NewHistogram", QueryDurationSecondsHistogram).Return(fakeLatency) return &OutboundSenderFactory{ Listener: w, diff --git a/senderWrapper_test.go b/senderWrapper_test.go index c1d926cd..07b9f415 100644 --- a/senderWrapper_test.go +++ b/senderWrapper_test.go @@ -69,12 +69,6 @@ func getFakeFactory() *SenderWrapperFactory { fakeDDTIP := new(mockCounter) fakeDDTIP.On("Add", 1.0).Return() - // Fake - fakeLatency := new(mockHistogram) - fakeLatency.On("With", []string{"url", "http://localhost:8888/foo", "code", "200"}).Return(fakeLatency) - fakeLatency.On("With", []string{"url", "http://localhost:9999/foo", "code", "200"}).Return(fakeLatency) - fakeLatency.On("Observe", 1.0).Return() - fakeGauge := new(mockGauge) fakeGauge.On("Add", 1.0).Return(). On("Add", -1.0).Return(). @@ -126,7 +120,6 @@ func getFakeFactory() *SenderWrapperFactory { fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeGauge) fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeGauge) fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeGauge) - fakeRegistry.On("NewHistogram", QueryDurationSecondsHistogram).Return(fakeLatency) return &SenderWrapperFactory{ NumWorkersPerSender: 10, From e0f642c0ed72feb0451dd1ca49f799615b231c56 Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Mon, 2 May 2022 12:55:36 -0500 Subject: [PATCH 03/15] Updated to pass unit tests --- main.go | 10 ++++------ outboundSender.go | 28 ++++++++++++++++++---------- outboundSender_test.go | 20 ++++++++++---------- senderWrapper.go | 5 ++--- senderWrapper_test.go | 2 +- 5 files changed, 35 insertions(+), 30 deletions(-) diff --git a/main.go b/main.go index 6325b385..4e442b8d 100644 --- a/main.go +++ b/main.go @@ -123,6 +123,7 @@ func caduceus(arguments []string) int { otelhttp.WithTracerProvider(tracing.TracerProvider()), ) + var client HTTPClient caduceusSenderWrapper, err := SenderWrapperFactory{ NumWorkersPerSender: caduceusConfig.Sender.NumWorkersPerSender, QueueSizePerSender: caduceusConfig.Sender.QueueSizePerSender, @@ -132,12 +133,9 @@ func caduceus(arguments []string) int { DeliveryInterval: caduceusConfig.Sender.DeliveryInterval, MetricsRegistry: metricsRegistry, Logger: logger, - Sender: (&http.Client{ - Transport: tr, - Timeout: caduceusConfig.Sender.ClientTimeout, - }).Do, - CustomPIDs: caduceusConfig.Sender.CustomPIDs, - DisablePartnerIDs: caduceusConfig.Sender.DisablePartnerIDs, + Sender: client, + CustomPIDs: caduceusConfig.Sender.CustomPIDs, + DisablePartnerIDs: caduceusConfig.Sender.DisablePartnerIDs, }.New() if err != nil { diff --git a/outboundSender.go b/outboundSender.go index ac1e6c3c..5a5a04e0 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -72,7 +72,11 @@ type OutboundSenderFactory struct { Listener ancla.InternalWebhook // The http client Do() function to use for outbound requests. - Sender func(*http.Request) (*http.Response, error) + // Sender func(*http.Request) (*http.Response, error) + Sender HTTPClient + + // + ClientMiddleware func(HTTPClient) HTTPClient // The number of delivery workers to create and use. NumWorkers int @@ -119,7 +123,7 @@ type CaduceusOutboundSender struct { listener ancla.InternalWebhook deliverUntil time.Time dropUntil time.Time - sender func(*http.Request) (*http.Response, error) + sender HTTPClient events []*regexp.Regexp matcher []*regexp.Regexp queueSize int @@ -142,7 +146,6 @@ type CaduceusOutboundSender struct { maxWorkersGauge metrics.Gauge currentWorkersGauge metrics.Gauge deliveryRetryMaxGauge metrics.Gauge - querylatency metrics.Histogram wg sync.WaitGroup cutOffPeriod time.Duration workers semaphore.Interface @@ -153,7 +156,6 @@ type CaduceusOutboundSender struct { queue atomic.Value customPIDs []string disablePartnerIDs bool - Sender HTTPClient clientMiddleware func(HTTPClient) HTTPClient } @@ -163,6 +165,10 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { return } + if nil == osf.ClientMiddleware { + osf.ClientMiddleware = NopHTTPClient + } + if nil == osf.Sender { err = errors.New("nil Sender()") return @@ -202,6 +208,7 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { }, customPIDs: osf.CustomPIDs, disablePartnerIDs: osf.DisablePartnerIDs, + clientMiddleware: osf.ClientMiddleware, } // Don't share the secret with others when there is an error. @@ -228,6 +235,10 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { return } +func NopHTTPClient(next HTTPClient) HTTPClient { + return next +} + // Update applies user configurable values for the outbound sender when a // webhook is registered func (obs *CaduceusOutboundSender) Update(wh ancla.InternalWebhook) (err error) { @@ -660,10 +671,8 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri "event.destination", msg.Destination, ) - retryer := xhttp.RetryTransactor(retryOptions, obs.sender) - roundTripper := obs.clientMiddleware(DoerFunc(retryer)) - resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req) - roundTripper.Do(req) + retryer := xhttp.RetryTransactor(retryOptions, obs.sender.Do) + resp, err := retryer(req) code := "failure" l := obs.logger if nil != err { @@ -679,7 +688,6 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri if nil != resp.Body { io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() - roundTripper.Do(req) } } obs.deliveryCounter.With("url", obs.id, "code", code, "event", event).Add(1.0) @@ -748,7 +756,7 @@ func (obs *CaduceusOutboundSender) queueOverflow() { req.Header.Set("X-Webpa-Signature", sig) } - resp, err := obs.sender(req) + resp, err := obs.sender.Do(req) if nil != err { // Failure errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification", "notification", diff --git a/outboundSender_test.go b/outboundSender_test.go index 8b9ea35d..2fcab104 100644 --- a/outboundSender_test.go +++ b/outboundSender_test.go @@ -166,7 +166,7 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] return &OutboundSenderFactory{ Listener: w, - Sender: (&http.Client{Transport: trans}).Do, + Sender: DoerFunc((&http.Client{Transport: trans}).Do), CutOffPeriod: cutOffPeriod, NumWorkers: 10, QueueSize: 10, @@ -583,7 +583,7 @@ func TestInvalidEventRegex(t *testing.T) { obs, err := OutboundSenderFactory{ Listener: w, - Sender: (&http.Client{}).Do, + Sender: DoerFunc((&http.Client{}).Do), NumWorkers: 10, QueueSize: 10, Logger: log.NewNopLogger(), @@ -609,7 +609,7 @@ func TestInvalidUrl(t *testing.T) { obs, err := OutboundSenderFactory{ Listener: w, - Sender: (&http.Client{}).Do, + Sender: DoerFunc((&http.Client{}).Do), NumWorkers: 10, QueueSize: 10, Logger: log.NewNopLogger(), @@ -627,7 +627,7 @@ func TestInvalidUrl(t *testing.T) { obs, err = OutboundSenderFactory{ Listener: w2, - Sender: (&http.Client{}).Do, + Sender: DoerFunc((&http.Client{}).Do), NumWorkers: 10, QueueSize: 10, Logger: log.NewNopLogger(), @@ -665,7 +665,7 @@ func TestInvalidLogger(t *testing.T) { trans := &transport{} obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w - obsf.Sender = (&http.Client{}).Do + obsf.Sender = DoerFunc((&http.Client{}).Do) obsf.Logger = nil obs, err := obsf.New() @@ -690,7 +690,7 @@ func TestFailureURL(t *testing.T) { trans := &transport{} obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w - obsf.Sender = (&http.Client{}).Do + obsf.Sender = DoerFunc((&http.Client{}).Do) obs, err := obsf.New() assert.Nil(obs) assert.NotNil(err) @@ -711,7 +711,7 @@ func TestInvalidEvents(t *testing.T) { trans := &transport{} obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w - obsf.Sender = (&http.Client{}).Do + obsf.Sender = DoerFunc((&http.Client{}).Do) obs, err := obsf.New() assert.Nil(obs) @@ -728,7 +728,7 @@ func TestInvalidEvents(t *testing.T) { obsf = simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w2 - obsf.Sender = (&http.Client{}).Do + obsf.Sender = DoerFunc((&http.Client{}).Do) obs, err = obsf.New() assert.Nil(obs) @@ -763,7 +763,7 @@ func TestUpdate(t *testing.T) { trans := &transport{} obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w1 - obsf.Sender = (&http.Client{}).Do + obsf.Sender = DoerFunc((&http.Client{}).Do) obs, err := obsf.New() assert.Nil(err) @@ -798,7 +798,7 @@ func TestOverflowNoFailureURL(t *testing.T) { obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w obsf.Logger = logger - obsf.Sender = (&http.Client{}).Do + obsf.Sender = DoerFunc((&http.Client{}).Do) obs, err := obsf.New() assert.Nil(err) diff --git a/senderWrapper.go b/senderWrapper.go index 3ff1cd5e..e541ac2f 100644 --- a/senderWrapper.go +++ b/senderWrapper.go @@ -18,7 +18,6 @@ package main import ( "errors" - "net/http" "sync" "time" @@ -61,7 +60,7 @@ type SenderWrapperFactory struct { Logger log.Logger // The http client Do() function to share with OutboundSenders. - Sender func(*http.Request) (*http.Response, error) + Sender HTTPClient // CustomPIDs is a custom list of allowed PartnerIDs that will be used if a message // has no partner IDs. @@ -79,7 +78,7 @@ type SenderWrapper interface { // CaduceusSenderWrapper contains no external parameters. type CaduceusSenderWrapper struct { - sender func(*http.Request) (*http.Response, error) + sender HTTPClient numWorkersPerSender int queueSizePerSender int deliveryRetries int diff --git a/senderWrapper_test.go b/senderWrapper_test.go index 07b9f415..3534cdbb 100644 --- a/senderWrapper_test.go +++ b/senderWrapper_test.go @@ -169,7 +169,7 @@ func TestSwSimple(t *testing.T) { trans := &swTransport{} swf := getFakeFactory() - swf.Sender = trans.RoundTrip + swf.Sender = DoerFunc((&http.Client{}).Do) swf.Linger = 1 * time.Second sw, err := swf.New() From 1646ea2d30f5502c21091ab9f085bd384d070e3b Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Tue, 3 May 2022 18:53:41 -0500 Subject: [PATCH 04/15] Updated outbound sender tests --- caduceus_type.go | 2 +- metrics.go | 1 + mocks_test.go | 27 +++++++++++++++++++++------ outboundSender.go | 1 + outboundSender_test.go | 6 ++++++ senderWrapper_test.go | 7 +++++++ 6 files changed, 37 insertions(+), 7 deletions(-) diff --git a/caduceus_type.go b/caduceus_type.go index bc08a51f..f5c15c58 100644 --- a/caduceus_type.go +++ b/caduceus_type.go @@ -57,7 +57,7 @@ type SenderConfig struct { type CaduceusMetricsRegistry interface { NewCounter(name string) metrics.Counter NewGauge(name string) metrics.Gauge - // NewHistogram(name string, buckets int) metrics.Histogram + NewHistogram(name string, buckets int) metrics.Histogram } type RequestHandler interface { diff --git a/metrics.go b/metrics.go index ae5f9393..38a92659 100644 --- a/metrics.go +++ b/metrics.go @@ -167,4 +167,5 @@ func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSende c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id) c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id) c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id) + c.querylatency = m.NewHistogram(QueryDurationSecondsHistogram, 11).With("url", c.id, "code", "200") } diff --git a/mocks_test.go b/mocks_test.go index 12f0ecff..66d004ce 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -96,13 +96,23 @@ func (m *mockGauge) With(labelValues ...string) metrics.Gauge { } // // mockHistogram provides the mock implementation of the metrics.Histogram object -// type mockHistogram struct { -// mock.Mock -// } +type mockHistogram struct { + mock.Mock +} + +func (m *mockHistogram) Observe(value float64) { + m.Called(value) +} -// func (m *mockHistogram) Observe(value float64) { -// m.Called(value) -// } +func (m *mockHistogram) With(labelValues ...string) metrics.Histogram { + for _, v := range labelValues { + if !utf8.ValidString(v) { + panic("not UTF-8") + } + } + args := m.Called(labelValues) + return args.Get(0).(metrics.Histogram) +} // mockCaduceusMetricsRegistry provides the mock implementation of the // CaduceusMetricsRegistry object @@ -119,3 +129,8 @@ func (m *mockCaduceusMetricsRegistry) NewGauge(name string) metrics.Gauge { args := m.Called(name) return args.Get(0).(metrics.Gauge) } + +func (m *mockCaduceusMetricsRegistry) NewHistogram(name string, buckets int) metrics.Histogram { + args := m.Called(name) + return args.Get(0).(metrics.Histogram) +} diff --git a/outboundSender.go b/outboundSender.go index 5a5a04e0..0b657bca 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -146,6 +146,7 @@ type CaduceusOutboundSender struct { maxWorkersGauge metrics.Gauge currentWorkersGauge metrics.Gauge deliveryRetryMaxGauge metrics.Gauge + querylatency metrics.Histogram wg sync.WaitGroup cutOffPeriod time.Duration workers semaphore.Interface diff --git a/outboundSender_test.go b/outboundSender_test.go index 2fcab104..3447a79a 100644 --- a/outboundSender_test.go +++ b/outboundSender_test.go @@ -145,6 +145,11 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] fakePanicDrop.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakePanicDrop) fakePanicDrop.On("Add", 1.0).Return() + // Fake Latency + fakeLatency := new(mockHistogram) + fakeLatency.On("With", []string{"url", w.Webhook.Config.URL, "code", "200"}).Return(fakeLatency) + fakeLatency.On("Observe", 1.0).Return() + // Build a registry and register all fake metrics, these are synymous with the metrics in // metrics.go // @@ -163,6 +168,7 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeQdepth) fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeQdepth) fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeQdepth) + fakeRegistry.On("NewHistogram", QueryDurationSecondsHistogram).Return(fakeLatency) return &OutboundSenderFactory{ Listener: w, diff --git a/senderWrapper_test.go b/senderWrapper_test.go index 3534cdbb..6b514cb7 100644 --- a/senderWrapper_test.go +++ b/senderWrapper_test.go @@ -76,6 +76,12 @@ func getFakeFactory() *SenderWrapperFactory { On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeGauge). On("With", []string{"url", "http://localhost:9999/foo"}).Return(fakeGauge) + // Fake Latency + fakeLatency := new(mockHistogram) + fakeLatency.On("With", []string{"url", "http://localhost:8888/foo", "code", "200"}).Return(fakeLatency) + fakeLatency.On("With", []string{"url", "http://localhost:9999/foo", "code", "200"}).Return(fakeLatency) + fakeLatency.On("Observe", 1.0).Return() + fakeIgnore := new(mockCounter) fakeIgnore.On("Add", 1.0).Return().On("Add", 0.0).Return(). On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeIgnore). @@ -120,6 +126,7 @@ func getFakeFactory() *SenderWrapperFactory { fakeRegistry.On("NewGauge", ConsumerDropUntilGauge).Return(fakeGauge) fakeRegistry.On("NewGauge", ConsumerDeliveryWorkersGauge).Return(fakeGauge) fakeRegistry.On("NewGauge", ConsumerMaxDeliveryWorkersGauge).Return(fakeGauge) + fakeRegistry.On("NewHistogram", QueryDurationSecondsHistogram).Return(fakeLatency) return &SenderWrapperFactory{ NumWorkersPerSender: 10, From 0f7054a6222a7f47d485b97328fb4560ccd36f92 Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Wed, 4 May 2022 15:58:29 -0500 Subject: [PATCH 05/15] Corrected httpclient sent to sender wrapper --- main.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/main.go b/main.go index 4054c7c8..5538914b 100644 --- a/main.go +++ b/main.go @@ -123,7 +123,6 @@ func caduceus(arguments []string) int { otelhttp.WithTracerProvider(tracing.TracerProvider()), ) - var client HTTPClient caduceusSenderWrapper, err := SenderWrapperFactory{ NumWorkersPerSender: caduceusConfig.Sender.NumWorkersPerSender, QueueSizePerSender: caduceusConfig.Sender.QueueSizePerSender, @@ -133,9 +132,12 @@ func caduceus(arguments []string) int { DeliveryInterval: caduceusConfig.Sender.DeliveryInterval, MetricsRegistry: metricsRegistry, Logger: logger, - Sender: client, - CustomPIDs: caduceusConfig.Sender.CustomPIDs, - DisablePartnerIDs: caduceusConfig.Sender.DisablePartnerIDs, + Sender: DoerFunc((&http.Client{ + Transport: tr, + Timeout: caduceusConfig.Sender.ClientTimeout, + }).Do), + CustomPIDs: caduceusConfig.Sender.CustomPIDs, + DisablePartnerIDs: caduceusConfig.Sender.DisablePartnerIDs, }.New() if err != nil { @@ -154,7 +156,6 @@ func caduceus(arguments []string) int { invalidCount: metricsRegistry.NewCounter(DropsDueToInvalidPayload), incomingQueueDepthMetric: metricsRegistry.NewGauge(IncomingQueueDepth), modifiedWRPCount: metricsRegistry.NewCounter(ModifiedWRPCounter), - querylatency: metricsRegistry.NewHistogram(QueryDurationSecondsHistogram, 11), maxOutstanding: 0, } From 34dac0d87e5bd0eea727480216ec2c974c4a2ffe Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Fri, 6 May 2022 14:48:18 -0500 Subject: [PATCH 06/15] Reworked latency tracking solution --- HttpClient.go | 14 +++++++++++--- http.go | 1 - metrics.go | 4 ++-- outboundSender.go | 16 +++++++++++----- outboundSender_test.go | 1 + senderWrapper.go | 23 +++++++++++++++++++++-- senderWrapper_test.go | 2 ++ 7 files changed, 48 insertions(+), 13 deletions(-) diff --git a/HttpClient.go b/HttpClient.go index 9dc5b3c0..49235b33 100644 --- a/HttpClient.go +++ b/HttpClient.go @@ -3,12 +3,13 @@ package main import ( "errors" "net/http" + "strconv" "time" "github.com/go-kit/kit/metrics" ) -type HTTPClient interface { +type httpClient interface { Do(*http.Request) (*http.Response, error) } @@ -37,15 +38,22 @@ func newMetricWrapper(now func() time.Time, queryLatency metrics.Histogram) (*me }, nil } -func (m *metricWrapper) roundTripper(next HTTPClient) HTTPClient { +func (m *metricWrapper) roundTripper(next httpClient) httpClient { return DoerFunc(func(req *http.Request) (*http.Response, error) { startTime := m.now() resp, err := next.Do(req) endTime := m.now() + var code string + + if err != nil { + code = networkError + } + + code = strconv.Itoa(resp.StatusCode) // find time difference, add to metric var latency = endTime.Sub(startTime) - m.queryLatency.Observe(latency.Seconds()) + m.queryLatency.With("code", code).Observe(latency.Seconds()) return resp, err }) diff --git a/http.go b/http.go index 23543d39..0ec1c88f 100644 --- a/http.go +++ b/http.go @@ -38,7 +38,6 @@ type ServerHandler struct { invalidCount metrics.Counter incomingQueueDepthMetric metrics.Gauge modifiedWRPCount metrics.Counter - querylatency metrics.Histogram incomingQueueDepth int64 maxOutstanding int64 } diff --git a/metrics.go b/metrics.go index 38a92659..38130895 100644 --- a/metrics.go +++ b/metrics.go @@ -30,6 +30,7 @@ const ( emptyContentTypeReason = "empty_content_type" emptyUUIDReason = "empty_uuid" bothEmptyReason = "empty_uuid_and_content_type" + networkError = "network_err" ) func Metrics() []xmetrics.Metric { @@ -159,7 +160,7 @@ func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSende c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off") c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config") - c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "network_err") + c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", networkError) c.droppedPanic = m.NewCounter(DropsDueToPanic).With("url", c.id) c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id) c.renewalTimeGauge = m.NewGauge(ConsumerRenewalTimeGauge).With("url", c.id) @@ -167,5 +168,4 @@ func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSende c.dropUntilGauge = m.NewGauge(ConsumerDropUntilGauge).With("url", c.id) c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id) c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id) - c.querylatency = m.NewHistogram(QueryDurationSecondsHistogram, 11).With("url", c.id, "code", "200") } diff --git a/outboundSender.go b/outboundSender.go index 0b657bca..22d63af9 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -73,10 +73,10 @@ type OutboundSenderFactory struct { // The http client Do() function to use for outbound requests. // Sender func(*http.Request) (*http.Response, error) - Sender HTTPClient + Sender httpClient // - ClientMiddleware func(HTTPClient) HTTPClient + ClientMiddleware func(httpClient) httpClient // The number of delivery workers to create and use. NumWorkers int @@ -107,6 +107,8 @@ type OutboundSenderFactory struct { // DisablePartnerIDs dictates whether or not to enforce the partner ID check. DisablePartnerIDs bool + + QueryLatency metrics.Histogram } type OutboundSender interface { @@ -123,7 +125,7 @@ type CaduceusOutboundSender struct { listener ancla.InternalWebhook deliverUntil time.Time dropUntil time.Time - sender HTTPClient + sender httpClient events []*regexp.Regexp matcher []*regexp.Regexp queueSize int @@ -157,7 +159,7 @@ type CaduceusOutboundSender struct { queue atomic.Value customPIDs []string disablePartnerIDs bool - clientMiddleware func(HTTPClient) HTTPClient + clientMiddleware func(httpClient) httpClient } // New creates a new OutboundSender object from the factory, or returns an error. @@ -236,7 +238,7 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { return } -func NopHTTPClient(next HTTPClient) HTTPClient { +func NopHTTPClient(next httpClient) httpClient { return next } @@ -674,6 +676,8 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri retryer := xhttp.RetryTransactor(retryOptions, obs.sender.Do) resp, err := retryer(req) + //hist, err := newMetricWrapper(nil, obs.querylatency) + // thisTrip := hist.roundTripper(obs.sender) code := "failure" l := obs.logger if nil != err { @@ -689,6 +693,7 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri if nil != resp.Body { io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() + // thisTrip.Do(req) } } obs.deliveryCounter.With("url", obs.id, "code", code, "event", event).Add(1.0) @@ -777,5 +782,6 @@ func (obs *CaduceusOutboundSender) queueOverflow() { if nil != resp.Body { io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() + } } diff --git a/outboundSender_test.go b/outboundSender_test.go index 3447a79a..c1133a38 100644 --- a/outboundSender_test.go +++ b/outboundSender_test.go @@ -148,6 +148,7 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] // Fake Latency fakeLatency := new(mockHistogram) fakeLatency.On("With", []string{"url", w.Webhook.Config.URL, "code", "200"}).Return(fakeLatency) + fakeLatency.On("With", []string{"url", w.Webhook.Config.URL}).Return(fakeLatency) fakeLatency.On("Observe", 1.0).Return() // Build a registry and register all fake metrics, these are synymous with the metrics in diff --git a/senderWrapper.go b/senderWrapper.go index e541ac2f..04f948b0 100644 --- a/senderWrapper.go +++ b/senderWrapper.go @@ -60,7 +60,7 @@ type SenderWrapperFactory struct { Logger log.Logger // The http client Do() function to share with OutboundSenders. - Sender HTTPClient + Sender httpClient // CustomPIDs is a custom list of allowed PartnerIDs that will be used if a message // has no partner IDs. @@ -68,6 +68,8 @@ type SenderWrapperFactory struct { // DisablePartnerIDs dictates whether or not to enforce the partner ID check. DisablePartnerIDs bool + + Querylatency metrics.Histogram } type SenderWrapper interface { @@ -78,7 +80,7 @@ type SenderWrapper interface { // CaduceusSenderWrapper contains no external parameters. type CaduceusSenderWrapper struct { - sender HTTPClient + sender httpClient numWorkersPerSender int queueSizePerSender int deliveryRetries int @@ -90,10 +92,12 @@ type CaduceusSenderWrapper struct { senders map[string]OutboundSender metricsRegistry CaduceusMetricsRegistry eventType metrics.Counter + queryLatency metrics.Histogram wg sync.WaitGroup shutdown chan struct{} customPIDs []string disablePartnerIDs bool + clientMiddleware func(httpClient) httpClient } // New produces a new SenderWrapper implemented by CaduceusSenderWrapper @@ -119,6 +123,9 @@ func (swf SenderWrapperFactory) New() (sw SenderWrapper, err error) { return } + measure := NewMetricWrapperMeasures(swf.MetricsRegistry) + caduceusSenderWrapper.queryLatency = measure + caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(IncomingEventTypeCounter) caduceusSenderWrapper.senders = make(map[string]OutboundSender) @@ -131,6 +138,10 @@ func (swf SenderWrapperFactory) New() (sw SenderWrapper, err error) { return } +func NewMetricWrapperMeasures(m CaduceusMetricsRegistry) metrics.Histogram { + return m.NewHistogram(QueryDurationSecondsHistogram, 11) +} + // Update is called when we get changes to our webhook listeners with either // additions, or updates. This code takes care of building new OutboundSenders // and maintaining the existing OutboundSenders. @@ -147,6 +158,8 @@ func (sw *CaduceusSenderWrapper) Update(list []ancla.InternalWebhook) { Logger: sw.logger, CustomPIDs: sw.customPIDs, DisablePartnerIDs: sw.disablePartnerIDs, + ClientMiddleware: sw.clientMiddleware, + QueryLatency: sw.queryLatency, } ids := make([]struct { @@ -164,6 +177,12 @@ func (sw *CaduceusSenderWrapper) Update(list []ancla.InternalWebhook) { sender, ok := sw.senders[inValue.ID] if !ok { osf.Listener = inValue.Listener + metricWrapper, err := newMetricWrapper(time.Now, osf.QueryLatency.With("url", inValue.ID)) + + if err != nil { + continue + } + osf.ClientMiddleware = metricWrapper.roundTripper obs, err := osf.New() if nil == err { sw.senders[inValue.ID] = obs diff --git a/senderWrapper_test.go b/senderWrapper_test.go index 6b514cb7..112f1f90 100644 --- a/senderWrapper_test.go +++ b/senderWrapper_test.go @@ -80,6 +80,8 @@ func getFakeFactory() *SenderWrapperFactory { fakeLatency := new(mockHistogram) fakeLatency.On("With", []string{"url", "http://localhost:8888/foo", "code", "200"}).Return(fakeLatency) fakeLatency.On("With", []string{"url", "http://localhost:9999/foo", "code", "200"}).Return(fakeLatency) + fakeLatency.On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeLatency) + fakeLatency.On("With", []string{"url", "http://localhost:9999/foo"}).Return(fakeLatency) fakeLatency.On("Observe", 1.0).Return() fakeIgnore := new(mockCounter) From 9dc9457a5747799c76e7c63624224f90fa5cdd8e Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Tue, 10 May 2022 00:54:28 -0500 Subject: [PATCH 07/15] Reworked so outboundSender uses clientMiddleware --- HttpClient.go | 8 +++----- outboundSender.go | 7 +++---- senderWrapper.go | 8 +------- 3 files changed, 7 insertions(+), 16 deletions(-) diff --git a/HttpClient.go b/HttpClient.go index 49235b33..ab229c71 100644 --- a/HttpClient.go +++ b/HttpClient.go @@ -43,14 +43,12 @@ func (m *metricWrapper) roundTripper(next httpClient) httpClient { startTime := m.now() resp, err := next.Do(req) endTime := m.now() - var code string + code := networkError - if err != nil { - code = networkError + if err == nil { + code = strconv.Itoa(resp.StatusCode) } - code = strconv.Itoa(resp.StatusCode) - // find time difference, add to metric var latency = endTime.Sub(startTime) m.queryLatency.With("code", code).Observe(latency.Seconds()) diff --git a/outboundSender.go b/outboundSender.go index 22d63af9..6fe751f6 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -675,9 +675,9 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri ) retryer := xhttp.RetryTransactor(retryOptions, obs.sender.Do) - resp, err := retryer(req) - //hist, err := newMetricWrapper(nil, obs.querylatency) - // thisTrip := hist.roundTripper(obs.sender) + client := obs.clientMiddleware(DoerFunc(retryer)) + resp, err := client.Do(req) + code := "failure" l := obs.logger if nil != err { @@ -693,7 +693,6 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri if nil != resp.Body { io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() - // thisTrip.Do(req) } } obs.deliveryCounter.With("url", obs.id, "code", code, "event", event).Add(1.0) diff --git a/senderWrapper.go b/senderWrapper.go index 04f948b0..a5344a21 100644 --- a/senderWrapper.go +++ b/senderWrapper.go @@ -68,8 +68,6 @@ type SenderWrapperFactory struct { // DisablePartnerIDs dictates whether or not to enforce the partner ID check. DisablePartnerIDs bool - - Querylatency metrics.Histogram } type SenderWrapper interface { @@ -97,7 +95,6 @@ type CaduceusSenderWrapper struct { shutdown chan struct{} customPIDs []string disablePartnerIDs bool - clientMiddleware func(httpClient) httpClient } // New produces a new SenderWrapper implemented by CaduceusSenderWrapper @@ -123,9 +120,7 @@ func (swf SenderWrapperFactory) New() (sw SenderWrapper, err error) { return } - measure := NewMetricWrapperMeasures(swf.MetricsRegistry) - caduceusSenderWrapper.queryLatency = measure - + caduceusSenderWrapper.queryLatency = NewMetricWrapperMeasures(swf.MetricsRegistry) caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(IncomingEventTypeCounter) caduceusSenderWrapper.senders = make(map[string]OutboundSender) @@ -158,7 +153,6 @@ func (sw *CaduceusSenderWrapper) Update(list []ancla.InternalWebhook) { Logger: sw.logger, CustomPIDs: sw.customPIDs, DisablePartnerIDs: sw.disablePartnerIDs, - ClientMiddleware: sw.clientMiddleware, QueryLatency: sw.queryLatency, } From 0ec338749eaa3db20adfb84d59002e0e5d25a3dc Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Tue, 17 May 2022 14:09:06 -0500 Subject: [PATCH 08/15] Rename HttpClient.go to httpClient.go --- HttpClient.go => httpClient.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename HttpClient.go => httpClient.go (100%) diff --git a/HttpClient.go b/httpClient.go similarity index 100% rename from HttpClient.go rename to httpClient.go From ed60c0327ecf7740abf7d517603d715e55d97a0d Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Tue, 17 May 2022 14:14:29 -0500 Subject: [PATCH 09/15] Refactored code based on PR suggestions --- HttpClient.go | 10 +++++++--- main.go | 2 +- metrics.go | 5 +++++ mocks_test.go | 4 ++-- outboundSender.go | 9 ++------- outboundSender_test.go | 20 ++++++++++---------- senderWrapper.go | 4 ---- senderWrapper_test.go | 2 +- 8 files changed, 28 insertions(+), 28 deletions(-) diff --git a/HttpClient.go b/HttpClient.go index ab229c71..495de55c 100644 --- a/HttpClient.go +++ b/HttpClient.go @@ -13,10 +13,14 @@ type httpClient interface { Do(*http.Request) (*http.Response, error) } +func noHTTPClient(next httpClient) httpClient { + return next +} + // DoerFunc implements HTTPClient -type DoerFunc func(*http.Request) (*http.Response, error) +type doerFunc func(*http.Request) (*http.Response, error) -func (d DoerFunc) Do(req *http.Request) (*http.Response, error) { +func (d doerFunc) Do(req *http.Request) (*http.Response, error) { return d(req) } @@ -39,7 +43,7 @@ func newMetricWrapper(now func() time.Time, queryLatency metrics.Histogram) (*me } func (m *metricWrapper) roundTripper(next httpClient) httpClient { - return DoerFunc(func(req *http.Request) (*http.Response, error) { + return doerFunc(func(req *http.Request) (*http.Response, error) { startTime := m.now() resp, err := next.Do(req) endTime := m.now() diff --git a/main.go b/main.go index 5538914b..8d2fdf35 100644 --- a/main.go +++ b/main.go @@ -132,7 +132,7 @@ func caduceus(arguments []string) int { DeliveryInterval: caduceusConfig.Sender.DeliveryInterval, MetricsRegistry: metricsRegistry, Logger: logger, - Sender: DoerFunc((&http.Client{ + Sender: doerFunc((&http.Client{ Transport: tr, Timeout: caduceusConfig.Sender.ClientTimeout, }).Do), diff --git a/metrics.go b/metrics.go index 38130895..7d82b9e4 100644 --- a/metrics.go +++ b/metrics.go @@ -1,6 +1,7 @@ package main import ( + "github.com/go-kit/kit/metrics" "github.com/xmidt-org/webpa-common/v2/xmetrics" ) @@ -169,3 +170,7 @@ func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSende c.currentWorkersGauge = m.NewGauge(ConsumerDeliveryWorkersGauge).With("url", c.id) c.maxWorkersGauge = m.NewGauge(ConsumerMaxDeliveryWorkersGauge).With("url", c.id) } + +func NewMetricWrapperMeasures(m CaduceusMetricsRegistry) metrics.Histogram { + return m.NewHistogram(QueryDurationSecondsHistogram, 11) +} diff --git a/mocks_test.go b/mocks_test.go index 66d004ce..8abface8 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -110,8 +110,8 @@ func (m *mockHistogram) With(labelValues ...string) metrics.Histogram { panic("not UTF-8") } } - args := m.Called(labelValues) - return args.Get(0).(metrics.Histogram) + m.Called(labelValues) + return m } // mockCaduceusMetricsRegistry provides the mock implementation of the diff --git a/outboundSender.go b/outboundSender.go index 6fe751f6..c5c0a241 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -148,7 +148,6 @@ type CaduceusOutboundSender struct { maxWorkersGauge metrics.Gauge currentWorkersGauge metrics.Gauge deliveryRetryMaxGauge metrics.Gauge - querylatency metrics.Histogram wg sync.WaitGroup cutOffPeriod time.Duration workers semaphore.Interface @@ -169,7 +168,7 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { } if nil == osf.ClientMiddleware { - osf.ClientMiddleware = NopHTTPClient + osf.ClientMiddleware = noHTTPClient } if nil == osf.Sender { @@ -238,10 +237,6 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { return } -func NopHTTPClient(next httpClient) httpClient { - return next -} - // Update applies user configurable values for the outbound sender when a // webhook is registered func (obs *CaduceusOutboundSender) Update(wh ancla.InternalWebhook) (err error) { @@ -675,7 +670,7 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri ) retryer := xhttp.RetryTransactor(retryOptions, obs.sender.Do) - client := obs.clientMiddleware(DoerFunc(retryer)) + client := obs.clientMiddleware(doerFunc(retryer)) resp, err := client.Do(req) code := "failure" diff --git a/outboundSender_test.go b/outboundSender_test.go index c1133a38..562ecab7 100644 --- a/outboundSender_test.go +++ b/outboundSender_test.go @@ -173,7 +173,7 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] return &OutboundSenderFactory{ Listener: w, - Sender: DoerFunc((&http.Client{Transport: trans}).Do), + Sender: doerFunc((&http.Client{Transport: trans}).Do), CutOffPeriod: cutOffPeriod, NumWorkers: 10, QueueSize: 10, @@ -590,7 +590,7 @@ func TestInvalidEventRegex(t *testing.T) { obs, err := OutboundSenderFactory{ Listener: w, - Sender: DoerFunc((&http.Client{}).Do), + Sender: doerFunc((&http.Client{}).Do), NumWorkers: 10, QueueSize: 10, Logger: log.NewNopLogger(), @@ -616,7 +616,7 @@ func TestInvalidUrl(t *testing.T) { obs, err := OutboundSenderFactory{ Listener: w, - Sender: DoerFunc((&http.Client{}).Do), + Sender: doerFunc((&http.Client{}).Do), NumWorkers: 10, QueueSize: 10, Logger: log.NewNopLogger(), @@ -634,7 +634,7 @@ func TestInvalidUrl(t *testing.T) { obs, err = OutboundSenderFactory{ Listener: w2, - Sender: DoerFunc((&http.Client{}).Do), + Sender: doerFunc((&http.Client{}).Do), NumWorkers: 10, QueueSize: 10, Logger: log.NewNopLogger(), @@ -672,7 +672,7 @@ func TestInvalidLogger(t *testing.T) { trans := &transport{} obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w - obsf.Sender = DoerFunc((&http.Client{}).Do) + obsf.Sender = doerFunc((&http.Client{}).Do) obsf.Logger = nil obs, err := obsf.New() @@ -697,7 +697,7 @@ func TestFailureURL(t *testing.T) { trans := &transport{} obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w - obsf.Sender = DoerFunc((&http.Client{}).Do) + obsf.Sender = doerFunc((&http.Client{}).Do) obs, err := obsf.New() assert.Nil(obs) assert.NotNil(err) @@ -718,7 +718,7 @@ func TestInvalidEvents(t *testing.T) { trans := &transport{} obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w - obsf.Sender = DoerFunc((&http.Client{}).Do) + obsf.Sender = doerFunc((&http.Client{}).Do) obs, err := obsf.New() assert.Nil(obs) @@ -735,7 +735,7 @@ func TestInvalidEvents(t *testing.T) { obsf = simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w2 - obsf.Sender = DoerFunc((&http.Client{}).Do) + obsf.Sender = doerFunc((&http.Client{}).Do) obs, err = obsf.New() assert.Nil(obs) @@ -770,7 +770,7 @@ func TestUpdate(t *testing.T) { trans := &transport{} obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w1 - obsf.Sender = DoerFunc((&http.Client{}).Do) + obsf.Sender = doerFunc((&http.Client{}).Do) obs, err := obsf.New() assert.Nil(err) @@ -805,7 +805,7 @@ func TestOverflowNoFailureURL(t *testing.T) { obsf := simpleFactorySetup(trans, time.Second, nil) obsf.Listener = w obsf.Logger = logger - obsf.Sender = DoerFunc((&http.Client{}).Do) + obsf.Sender = doerFunc((&http.Client{}).Do) obs, err := obsf.New() assert.Nil(err) diff --git a/senderWrapper.go b/senderWrapper.go index a5344a21..c2b3db2b 100644 --- a/senderWrapper.go +++ b/senderWrapper.go @@ -133,10 +133,6 @@ func (swf SenderWrapperFactory) New() (sw SenderWrapper, err error) { return } -func NewMetricWrapperMeasures(m CaduceusMetricsRegistry) metrics.Histogram { - return m.NewHistogram(QueryDurationSecondsHistogram, 11) -} - // Update is called when we get changes to our webhook listeners with either // additions, or updates. This code takes care of building new OutboundSenders // and maintaining the existing OutboundSenders. diff --git a/senderWrapper_test.go b/senderWrapper_test.go index 112f1f90..d1cb2ed9 100644 --- a/senderWrapper_test.go +++ b/senderWrapper_test.go @@ -178,7 +178,7 @@ func TestSwSimple(t *testing.T) { trans := &swTransport{} swf := getFakeFactory() - swf.Sender = DoerFunc((&http.Client{}).Do) + swf.Sender = doerFunc((&http.Client{}).Do) swf.Linger = 1 * time.Second sw, err := swf.New() From 4529711744a6260810204c82b0c02b2e759778c6 Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Wed, 18 May 2022 16:30:19 -0500 Subject: [PATCH 10/15] Roundtripper test function added, Successful path --- httpClient_test.go | 86 ++++++++++++++++++++++++++++++++++++++++++++++ mocks_test.go | 27 ++++++++++++++- 2 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 httpClient_test.go diff --git a/httpClient_test.go b/httpClient_test.go new file mode 100644 index 00000000..e72a183f --- /dev/null +++ b/httpClient_test.go @@ -0,0 +1,86 @@ +/** + * Copyright 2022 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package main + +import ( + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestRoundTripper(t *testing.T) { + + tests := []struct { + description string + startTime time.Time + endTime time.Time + expectedResponse int + request *http.Request + }{ + { + description: "Success", + startTime: time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC), + endTime: time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC), + expectedResponse: http.StatusOK, + request: exampleRequest(1), + }, + } + + for _, tc := range tests { + + fakeTime := mockTime(tc.startTime, tc.endTime) + fakeHandler := new(mockHandler) + fakeHist := new(mockHistogram) + fakeHist.On("With", mock.AnythingOfType("[]string")).Return().Times(2) + fakeHist.On("Observe", mock.AnythingOfType("float64")).Return().Times(2) + + t.Run(tc.description, func(t *testing.T) { + + // Create a roundtripper with mock time and mock histogram + m, err := newMetricWrapper(fakeTime, fakeHist) + + // Create an http response + expected := http.Response{ + StatusCode: tc.expectedResponse, + } + + client := &mockHttpClient{ + MockDo: func(*http.Request) (*http.Response, error) { + return &expected, nil + }, + } + + c := m.roundTripper(client) + resp, err := c.Do(tc.request) + + // Check response + assert.Equal(t, resp.StatusCode, tc.expectedResponse) + + // Check Error + assert.Equal(t, err, nil) + + // Check the histogram + fakeHandler.AssertExpectations(t) + + }) + } + +} diff --git a/mocks_test.go b/mocks_test.go index 8abface8..ea8e1e41 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -17,6 +17,8 @@ package main import ( + "net/http" + "time" "unicode/utf8" "github.com/go-kit/kit/metrics" @@ -95,7 +97,7 @@ func (m *mockGauge) With(labelValues ...string) metrics.Gauge { return args.Get(0).(metrics.Gauge) } -// // mockHistogram provides the mock implementation of the metrics.Histogram object +// mockHistogram provides the mock implementation of the metrics.Histogram object type mockHistogram struct { mock.Mock } @@ -134,3 +136,26 @@ func (m *mockCaduceusMetricsRegistry) NewHistogram(name string, buckets int) met args := m.Called(name) return args.Get(0).(metrics.Histogram) } + +// mockTime provides two mock time values +func mockTime(one, two time.Time) func() time.Time { + var called bool + return func() time.Time { + if called { + return two + } + called = true + return one + } +} + +// mockHttpClient provides a mock implementation for the httpClient interface +type mockHttpClient struct { + MockDo func(*http.Request) (*http.Response, error) +} + +//type mockDoerFunc func(*http.Request) (*http.Response, error) + +func (d mockHttpClient) Do(req *http.Request) (*http.Response, error) { + return d.MockDo(req) +} From bfc91c82f506731b21874a3ca9c5829caf20f2a7 Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Thu, 19 May 2022 16:22:38 -0500 Subject: [PATCH 11/15] Added newMetricWrapper tests, made suggested changes --- httpClient_test.go | 92 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 80 insertions(+), 12 deletions(-) diff --git a/httpClient_test.go b/httpClient_test.go index e72a183f..aebf05b5 100644 --- a/httpClient_test.go +++ b/httpClient_test.go @@ -18,12 +18,15 @@ package main import ( + "errors" "net/http" "testing" "time" + "github.com/go-kit/kit/metrics" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) func TestRoundTripper(t *testing.T) { @@ -34,6 +37,7 @@ func TestRoundTripper(t *testing.T) { endTime time.Time expectedResponse int request *http.Request + expectedErr error }{ { description: "Success", @@ -41,32 +45,41 @@ func TestRoundTripper(t *testing.T) { endTime: time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC), expectedResponse: http.StatusOK, request: exampleRequest(1), + expectedErr: nil, + }, + { + description: "503 Service Unavailable", + startTime: time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC), + endTime: time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC), + expectedResponse: http.StatusServiceUnavailable, + request: exampleRequest(1), + expectedErr: nil, }, } for _, tc := range tests { - fakeTime := mockTime(tc.startTime, tc.endTime) - fakeHandler := new(mockHandler) - fakeHist := new(mockHistogram) - fakeHist.On("With", mock.AnythingOfType("[]string")).Return().Times(2) - fakeHist.On("Observe", mock.AnythingOfType("float64")).Return().Times(2) - t.Run(tc.description, func(t *testing.T) { + fakeTime := mockTime(tc.startTime, tc.endTime) + fakeHandler := new(mockHandler) + fakeHist := new(mockHistogram) + fakeHist.On("With", mock.AnythingOfType("[]string")).Return().Times(2) + fakeHist.On("Observe", mock.AnythingOfType("float64")).Return().Times(2) + // Create a roundtripper with mock time and mock histogram m, err := newMetricWrapper(fakeTime, fakeHist) + require.NoError(t, err) + require.NotNil(t, m) // Create an http response expected := http.Response{ StatusCode: tc.expectedResponse, } - client := &mockHttpClient{ - MockDo: func(*http.Request) (*http.Response, error) { - return &expected, nil - }, - } + client := doerFunc(func(*http.Request) (*http.Response, error) { + return &expected, tc.expectedErr + }) c := m.roundTripper(client) resp, err := c.Do(tc.request) @@ -75,7 +88,11 @@ func TestRoundTripper(t *testing.T) { assert.Equal(t, resp.StatusCode, tc.expectedResponse) // Check Error - assert.Equal(t, err, nil) + if tc.expectedErr != nil { + assert.ErrorIs(t, tc.expectedErr, err) + } else { + assert.NoError(t, err) + } // Check the histogram fakeHandler.AssertExpectations(t) @@ -84,3 +101,54 @@ func TestRoundTripper(t *testing.T) { } } + +func TestNewMetricWrapper(t *testing.T) { + + tests := []struct { + description string + expectedErr error + fakeTime func() time.Time + fakeHistogram metrics.Histogram + }{ + { + description: "Success", + expectedErr: nil, + fakeTime: time.Now, + fakeHistogram: &mockHistogram{}, + }, + { + description: "Nil Histogram", + expectedErr: errors.New("histogram cannot be nil"), + fakeTime: time.Now, + fakeHistogram: nil, + }, + { + description: "Nil Time", + expectedErr: nil, + fakeTime: nil, + fakeHistogram: &mockHistogram{}, + }, + } + + for _, tc := range tests { + + t.Run(tc.description, func(t *testing.T) { + + // Make function call + mw, err := newMetricWrapper(tc.fakeTime, tc.fakeHistogram) + + // Check histogram, nil should send error + if tc.fakeHistogram == nil { + assert.Equal(t, tc.expectedErr, err) + } + + // Check time, nil should not error + if tc.expectedErr == nil { + require.NoError(t, err) + require.NotNil(t, mw) + } + + }) + } + +} From a5a95ced4fe229c6943b5a104e2d296872293d1a Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Tue, 24 May 2022 01:48:03 -0500 Subject: [PATCH 12/15] Updated to set histogram code correctly --- httpClient.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/httpClient.go b/httpClient.go index 495de55c..ebdba5de 100644 --- a/httpClient.go +++ b/httpClient.go @@ -3,7 +3,6 @@ package main import ( "errors" "net/http" - "strconv" "time" "github.com/go-kit/kit/metrics" @@ -50,7 +49,7 @@ func (m *metricWrapper) roundTripper(next httpClient) httpClient { code := networkError if err == nil { - code = strconv.Itoa(resp.StatusCode) + code = resp.Status } // find time difference, add to metric From a1ae1fedb9118362ecc4853f19d5d874c2367d09 Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Tue, 24 May 2022 01:48:45 -0500 Subject: [PATCH 13/15] Added network error test case, refactoring --- httpClient_test.go | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/httpClient_test.go b/httpClient_test.go index aebf05b5..e0943e26 100644 --- a/httpClient_test.go +++ b/httpClient_test.go @@ -30,31 +30,42 @@ import ( ) func TestRoundTripper(t *testing.T) { + errTest := errors.New("test error") + date1 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + date2 := time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC) tests := []struct { description string startTime time.Time endTime time.Time - expectedResponse int + expectedResponse string request *http.Request expectedErr error }{ { description: "Success", - startTime: time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC), - endTime: time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC), - expectedResponse: http.StatusOK, + startTime: date1, + endTime: date2, + expectedResponse: "200 OK", request: exampleRequest(1), expectedErr: nil, }, { description: "503 Service Unavailable", - startTime: time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC), - endTime: time.Date(2021, time.Month(2), 21, 1, 10, 30, 45, time.UTC), - expectedResponse: http.StatusServiceUnavailable, + startTime: date1, + endTime: date2, + expectedResponse: "503 Service Unavailable", request: exampleRequest(1), expectedErr: nil, }, + { + description: "Network Error", + startTime: date1, + endTime: date2, + expectedResponse: "network_err", + request: exampleRequest(1), + expectedErr: errTest, + }, } for _, tc := range tests { @@ -64,8 +75,9 @@ func TestRoundTripper(t *testing.T) { fakeTime := mockTime(tc.startTime, tc.endTime) fakeHandler := new(mockHandler) fakeHist := new(mockHistogram) - fakeHist.On("With", mock.AnythingOfType("[]string")).Return().Times(2) - fakeHist.On("Observe", mock.AnythingOfType("float64")).Return().Times(2) + histogramFunctionCall := []string{"code", tc.expectedResponse} + fakeHist.On("With", histogramFunctionCall).Return().Once() + fakeHist.On("Observe", mock.AnythingOfType("float64")).Return().Once() // Create a roundtripper with mock time and mock histogram m, err := newMetricWrapper(fakeTime, fakeHist) @@ -74,7 +86,7 @@ func TestRoundTripper(t *testing.T) { // Create an http response expected := http.Response{ - StatusCode: tc.expectedResponse, + Status: tc.expectedResponse, } client := doerFunc(func(*http.Request) (*http.Response, error) { @@ -85,7 +97,7 @@ func TestRoundTripper(t *testing.T) { resp, err := c.Do(tc.request) // Check response - assert.Equal(t, resp.StatusCode, tc.expectedResponse) + assert.Equal(t, resp.Status, tc.expectedResponse) // Check Error if tc.expectedErr != nil { From 5c65198e2175ca4f46b3ca76d54ea260426644b0 Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Tue, 24 May 2022 11:56:40 -0500 Subject: [PATCH 14/15] Cleaned up unit tests, added changes to changelog --- CHANGELOG.md | 1 + httpClient.go | 6 +++++- httpClient_test.go | 29 +++++++++++++++++------------ 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 08f4d2ed..c0466913 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [Unreleased] +- Added latency metric, which Tracks the time spent waiting on outbound client URLs to respond. [#312](https://github.com/xmidt-org/caduceus/pull/312) ## [v0.6.4] - Fixed issue of endpoints no longer being found. [#311](https://github.com/xmidt-org/caduceus/pull/311) diff --git a/httpClient.go b/httpClient.go index ebdba5de..bb877584 100644 --- a/httpClient.go +++ b/httpClient.go @@ -8,6 +8,10 @@ import ( "github.com/go-kit/kit/metrics" ) +var ( + errNilHistogram = errors.New("histogram cannot be nil") +) + type httpClient interface { Do(*http.Request) (*http.Response, error) } @@ -33,7 +37,7 @@ func newMetricWrapper(now func() time.Time, queryLatency metrics.Histogram) (*me now = time.Now } if queryLatency == nil { - return nil, errors.New("histogram cannot be nil") + return nil, errNilHistogram } return &metricWrapper{ now: now, diff --git a/httpClient_test.go b/httpClient_test.go index e0943e26..116f3d93 100644 --- a/httpClient_test.go +++ b/httpClient_test.go @@ -96,9 +96,6 @@ func TestRoundTripper(t *testing.T) { c := m.roundTripper(client) resp, err := c.Do(tc.request) - // Check response - assert.Equal(t, resp.Status, tc.expectedResponse) - // Check Error if tc.expectedErr != nil { assert.ErrorIs(t, tc.expectedErr, err) @@ -106,8 +103,12 @@ func TestRoundTripper(t *testing.T) { assert.NoError(t, err) } - // Check the histogram + // Check response + assert.Equal(t, resp.Status, tc.expectedResponse) + + // Check the histogram and expectations fakeHandler.AssertExpectations(t) + fakeHist.AssertExpectations(t) }) } @@ -130,7 +131,7 @@ func TestNewMetricWrapper(t *testing.T) { }, { description: "Nil Histogram", - expectedErr: errors.New("histogram cannot be nil"), + expectedErr: errNilHistogram, fakeTime: time.Now, fakeHistogram: nil, }, @@ -149,17 +150,21 @@ func TestNewMetricWrapper(t *testing.T) { // Make function call mw, err := newMetricWrapper(tc.fakeTime, tc.fakeHistogram) - // Check histogram, nil should send error - if tc.fakeHistogram == nil { - assert.Equal(t, tc.expectedErr, err) - } - - // Check time, nil should not error if tc.expectedErr == nil { - require.NoError(t, err) + // Check for no errors + assert.NoError(t, err) require.NotNil(t, mw) + + // Check that the time and histogram aren't nil + assert.NotNil(t, mw.now) + assert.NotNil(t, mw.queryLatency) + return } + // with error checks + assert.Nil(t, mw) + assert.ErrorIs(t, err, tc.expectedErr) + }) } From 25d9d785e7914d85b0ee441333c95cd15c342301 Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Wed, 25 May 2022 13:03:34 -0500 Subject: [PATCH 15/15] Updated unit tests per PR suggestions --- httpClient.go | 7 ++--- httpClient_test.go | 67 +++++++++++++++++++++++++--------------------- mocks_test.go | 12 --------- outboundSender.go | 2 +- 4 files changed, 42 insertions(+), 46 deletions(-) diff --git a/httpClient.go b/httpClient.go index bb877584..e6a96216 100644 --- a/httpClient.go +++ b/httpClient.go @@ -3,20 +3,21 @@ package main import ( "errors" "net/http" + "strconv" "time" "github.com/go-kit/kit/metrics" ) var ( - errNilHistogram = errors.New("histogram cannot be nil") + errNilHistogram = errors.New("histogram cannot be nil") ) type httpClient interface { Do(*http.Request) (*http.Response, error) } -func noHTTPClient(next httpClient) httpClient { +func nopHTTPClient(next httpClient) httpClient { return next } @@ -53,7 +54,7 @@ func (m *metricWrapper) roundTripper(next httpClient) httpClient { code := networkError if err == nil { - code = resp.Status + code = strconv.Itoa(resp.StatusCode) } // find time difference, add to metric diff --git a/httpClient_test.go b/httpClient_test.go index 116f3d93..bb1898f7 100644 --- a/httpClient_test.go +++ b/httpClient_test.go @@ -19,13 +19,14 @@ package main import ( "errors" + "io" + "io/ioutil" "net/http" "testing" "time" "github.com/go-kit/kit/metrics" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -38,33 +39,41 @@ func TestRoundTripper(t *testing.T) { description string startTime time.Time endTime time.Time - expectedResponse string + expectedCode string request *http.Request expectedErr error + expectedResponse *http.Response }{ { - description: "Success", - startTime: date1, - endTime: date2, - expectedResponse: "200 OK", - request: exampleRequest(1), - expectedErr: nil, + description: "Success", + startTime: date1, + endTime: date2, + expectedCode: "200", + request: exampleRequest(1), + expectedErr: nil, + expectedResponse: &http.Response{ + StatusCode: 200, + }, }, { - description: "503 Service Unavailable", - startTime: date1, - endTime: date2, - expectedResponse: "503 Service Unavailable", - request: exampleRequest(1), - expectedErr: nil, + description: "503 Service Unavailable", + startTime: date1, + endTime: date2, + expectedCode: "503", + request: exampleRequest(1), + expectedErr: nil, + expectedResponse: &http.Response{ + StatusCode: 503, + }, }, { description: "Network Error", startTime: date1, endTime: date2, - expectedResponse: "network_err", + expectedCode: "network_err", request: exampleRequest(1), expectedErr: errTest, + expectedResponse: nil, }, } @@ -75,37 +84,35 @@ func TestRoundTripper(t *testing.T) { fakeTime := mockTime(tc.startTime, tc.endTime) fakeHandler := new(mockHandler) fakeHist := new(mockHistogram) - histogramFunctionCall := []string{"code", tc.expectedResponse} + histogramFunctionCall := []string{"code", tc.expectedCode} + fakeLatency := date2.Sub(date1) fakeHist.On("With", histogramFunctionCall).Return().Once() - fakeHist.On("Observe", mock.AnythingOfType("float64")).Return().Once() + fakeHist.On("Observe", fakeLatency.Seconds()).Return().Once() // Create a roundtripper with mock time and mock histogram m, err := newMetricWrapper(fakeTime, fakeHist) require.NoError(t, err) require.NotNil(t, m) - // Create an http response - expected := http.Response{ - Status: tc.expectedResponse, - } - client := doerFunc(func(*http.Request) (*http.Response, error) { - return &expected, tc.expectedErr + + return tc.expectedResponse, tc.expectedErr }) c := m.roundTripper(client) resp, err := c.Do(tc.request) - // Check Error - if tc.expectedErr != nil { - assert.ErrorIs(t, tc.expectedErr, err) - } else { + if tc.expectedErr == nil { + // Read and close response body + if resp.Body != nil { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + } assert.NoError(t, err) + } else { + assert.ErrorIs(t, tc.expectedErr, err) } - // Check response - assert.Equal(t, resp.Status, tc.expectedResponse) - // Check the histogram and expectations fakeHandler.AssertExpectations(t) fakeHist.AssertExpectations(t) diff --git a/mocks_test.go b/mocks_test.go index ea8e1e41..1aeed03a 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -17,7 +17,6 @@ package main import ( - "net/http" "time" "unicode/utf8" @@ -148,14 +147,3 @@ func mockTime(one, two time.Time) func() time.Time { return one } } - -// mockHttpClient provides a mock implementation for the httpClient interface -type mockHttpClient struct { - MockDo func(*http.Request) (*http.Response, error) -} - -//type mockDoerFunc func(*http.Request) (*http.Response, error) - -func (d mockHttpClient) Do(req *http.Request) (*http.Response, error) { - return d.MockDo(req) -} diff --git a/outboundSender.go b/outboundSender.go index c5c0a241..7cc0e818 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -168,7 +168,7 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { } if nil == osf.ClientMiddleware { - osf.ClientMiddleware = noHTTPClient + osf.ClientMiddleware = nopHTTPClient } if nil == osf.Sender {