diff --git a/src/caduceus/caduceus_type.go b/src/caduceus/caduceus_type.go index 4d6ad674..921241c3 100644 --- a/src/caduceus/caduceus_type.go +++ b/src/caduceus/caduceus_type.go @@ -20,7 +20,7 @@ import ( "time" "github.com/Comcast/webpa-common/logging" - "github.com/Comcast/webpa-common/wrp" + "github.com/Comcast/wrp-go/wrp" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" ) diff --git a/src/caduceus/caduceus_type_test.go b/src/caduceus/caduceus_type_test.go index fcfcb253..3b057799 100644 --- a/src/caduceus/caduceus_type_test.go +++ b/src/caduceus/caduceus_type_test.go @@ -20,7 +20,7 @@ import ( "testing" "github.com/Comcast/webpa-common/logging" - "github.com/Comcast/webpa-common/wrp" + "github.com/Comcast/wrp-go/wrp" "github.com/stretchr/testify/mock" ) diff --git a/src/caduceus/http.go b/src/caduceus/http.go index 60a21093..f2da2c05 100644 --- a/src/caduceus/http.go +++ b/src/caduceus/http.go @@ -22,7 +22,7 @@ import ( "sync/atomic" "github.com/Comcast/webpa-common/logging" - "github.com/Comcast/webpa-common/wrp" + "github.com/Comcast/wrp-go/wrp" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" "github.com/satori/go.uuid" diff --git a/src/caduceus/metrics.go b/src/caduceus/metrics.go index 10d2b0e9..70bea8d5 100644 --- a/src/caduceus/metrics.go +++ b/src/caduceus/metrics.go @@ -13,6 +13,7 @@ const ( SlowConsumerCounter = "slow_consumer_cut_off_count" IncomingQueueDepth = "incoming_queue_depth" IncomingContentTypeCounter = "incoming_content_type_count" + IncomingEventTypeCounter = "incoming_event_type_count" DropsDueToInvalidPayload = "drops_due_to_invalid_payload" OutgoingQueueDepth = "outgoing_queue_depths" ) @@ -75,5 +76,11 @@ func Metrics() []xmetrics.Metric { Type: "gauge", LabelNames: []string{"url"}, }, + { + Name: IncomingEventTypeCounter, + Help: "Incoming count of events by event type", + Type: "counter", + LabelNames: []string{"event"}, + }, } } diff --git a/src/caduceus/metrics_test.go b/src/caduceus/metrics_test.go index ac5957c2..d764b352 100644 --- a/src/caduceus/metrics_test.go +++ b/src/caduceus/metrics_test.go @@ -17,6 +17,22 @@ package main +// Using Caduceus's test suite: +// +// If you are testing a new metric the followng process needs to be done below: +// 1. Create a fake, mockMetric i.e fakeEventType := new(mockCounter) +// 2. If your metric type has yet to be included in mockCaduceusMetricRegistry within mocks.go +// add your metric type to mockCaduceusMetricRegistry +// 3. Trigger the On method on that "mockMetric" with various different cases of that metric, +// in both senderWrapper_test.go and/or outboundSender_test.go +// i.e: +// case 1: On("With", []string{"event", iot} +// case 2: On("With", []string{"event", unknown} +// Tests for all possible event_types that will be sent to the metrics Desc. If all cases arn't +// included tests will fail. +// 4. Mimic the metric behavior using On i.e if your specific metric is a counter: +// fakeSlow.On("Add", 1.0).Return() + import ( "testing" diff --git a/src/caduceus/mocks_test.go b/src/caduceus/mocks_test.go index aef15d4b..a9fb3ad6 100644 --- a/src/caduceus/mocks_test.go +++ b/src/caduceus/mocks_test.go @@ -22,7 +22,7 @@ import ( "github.com/Comcast/webpa-common/health" "github.com/Comcast/webpa-common/webhook" - "github.com/Comcast/webpa-common/wrp" + "github.com/Comcast/wrp-go/wrp" "github.com/go-kit/kit/metrics" "github.com/stretchr/testify/mock" ) @@ -72,10 +72,6 @@ func (m *mockOutboundSender) RetiredSince() time.Time { return arguments.Get(0).(time.Time) } -func (m *mockOutboundSender) Queue(msg *wrp.Message) { - m.Called(msg) -} - // mockSenderWrapper needs to mock things that the `SenderWrapper` does type mockSenderWrapper struct { mock.Mock diff --git a/src/caduceus/outboundSender.go b/src/caduceus/outboundSender.go index c25f2ba3..3a8606d3 100644 --- a/src/caduceus/outboundSender.go +++ b/src/caduceus/outboundSender.go @@ -38,9 +38,9 @@ import ( "github.com/Comcast/webpa-common/logging" "github.com/Comcast/webpa-common/semaphore" "github.com/Comcast/webpa-common/webhook" - "github.com/Comcast/webpa-common/wrp" - "github.com/Comcast/webpa-common/wrp/wrphttp" "github.com/Comcast/webpa-common/xhttp" + "github.com/Comcast/wrp-go/wrp" + "github.com/Comcast/wrp-go/wrp/wrphttp" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" ) @@ -52,12 +52,6 @@ const failureText = `Unfortunately, your endpoint is not able to keep up with th `capacity to handle notifications, or reduce the number of notifications ` + `you have requested.` -var ( - // eventPattern is the precompiled regex that selects the top level event - // classifier - eventPattern = regexp.MustCompile(`^event:(?P[^/]+)`) -) - // outboundRequest stores the outgoing request and assorted data that has been // collected so far (to save on processing the information again). type outboundRequest struct { @@ -139,6 +133,7 @@ type CaduceusOutboundSender struct { droppedInvalidConfig metrics.Counter cutOffCounter metrics.Counter queueDepthGauge metrics.Gauge + eventType metrics.Counter wg sync.WaitGroup cutOffPeriod time.Duration workers semaphore.Interface @@ -342,7 +337,6 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) { now := time.Now() var debugLog = logging.Debug(obs.logger) - if false == obs.isValidTimeWindow(now, dropUntil, deliverUntil) { return } @@ -352,6 +346,7 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) { debugLog.Log(logging.MessageKey(), fmt.Sprintf("Regex did not match. got != expected: '%s' != '%s'\n", msg.Destination, eventRegex.String())) + continue } @@ -511,11 +506,7 @@ func (obs *CaduceusOutboundSender) send(secret, acceptType string, msg *wrp.Mess } // find the event "short name" - match := eventPattern.FindStringSubmatch(msg.Destination) - event := "unknown" - if match != nil { - event = match[1] - } + event := msg.FindEventStringSubMatch() retryOptions := xhttp.RetryOptions{ Logger: obs.logger, diff --git a/src/caduceus/outboundSender_test.go b/src/caduceus/outboundSender_test.go index eb4e3d91..8a31556f 100644 --- a/src/caduceus/outboundSender_test.go +++ b/src/caduceus/outboundSender_test.go @@ -19,8 +19,10 @@ package main import ( "bytes" "fmt" + "github.com/Comcast/webpa-common/webhook" - "github.com/Comcast/webpa-common/wrp" + "github.com/Comcast/wrp-go/wrp" + "github.com/davecgh/go-spew/spew" "github.com/go-kit/kit/log" "github.com/stretchr/testify/assert" //"github.com/stretchr/testify/mock" @@ -56,6 +58,21 @@ func simpleSetup(trans *transport, cutOffPeriod time.Duration, matcher []string) return simpleFactorySetup(trans, cutOffPeriod, matcher).New() } +// simpleFactorySetup sets up a outboundSender with metrics. +// +// Using Caduceus's test suite +// +// If you are testing a new metric it needs to be created in this process below. +// 1. Create a fake, mockMetric i.e fakeEventType := new(mockCounter) +// 2. If your metric type has yet to be included in mockCaduceusMetricRegistry within mocks.go +// add your metric type to mockCaduceusMetricRegistry +// 3. Trigger the On method on that "mockMetric" with various different cases of that metric, +// in both senderWrapper_test.go and outboundSender_test.go +// i.e: +// case 1: On("With", []string{"event", iot} +// case 2: On("With", []string{"event", unknown} +// 4. Mimic the metric behavior using On: +// fakeSlow.On("Add", 1.0).Return() func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []string) *OutboundSenderFactory { if nil == trans.fn { trans.fn = func(req *http.Request, count int) (resp *http.Response, err error) { @@ -75,6 +92,7 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] w.Config.Secret = "123456" w.Matcher.DeviceId = matcher + // test dc metric fakeDC := new(mockCounter) fakeDC.On("With", []string{"url", w.Config.URL, "code", "200", "event", "test"}).Return(fakeDC). On("With", []string{"url", w.Config.URL, "code", "200", "event", "iot"}).Return(fakeDC). @@ -90,10 +108,12 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] fakeDC.On("Add", 1.0).Return() fakeDC.On("Add", 0.0).Return() + // test slow metric fakeSlow := new(mockCounter) fakeSlow.On("With", []string{"url", w.Config.URL}).Return(fakeSlow) fakeSlow.On("Add", 1.0).Return() + // test dropped metric fakeDroppedSlow := new(mockCounter) fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "queue_full"}).Return(fakeDroppedSlow) fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "cut_off"}).Return(fakeDroppedSlow) @@ -102,15 +122,18 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "network_err"}).Return(fakeDroppedSlow) fakeDroppedSlow.On("Add", 1.0).Return() + // test queue depth fakeQdepth := new(mockGauge) fakeQdepth.On("With", []string{"url", w.Config.URL}).Return(fakeQdepth) fakeQdepth.On("Add", 1.0).Return().On("Add", -1.0).Return() + // build a registry and register all fake metrics fakeRegistry := new(mockCaduceusMetricsRegistry) fakeRegistry.On("NewCounter", DeliveryRetryCounter).Return(fakeDC) fakeRegistry.On("NewCounter", DeliveryCounter).Return(fakeDC) fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeSlow) fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeDroppedSlow) + //fakeRegistry.On("NewCounter", IncomingEventTypeCounter).Return(fakeEventType) fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeQdepth) return &OutboundSenderFactory{ @@ -137,24 +160,32 @@ func simpleRequest() *wrp.Message { // Simple test that covers the normal successful case with no extra matchers func TestSimpleWrp(t *testing.T) { + fmt.Printf("\n\nTestingSimpleWRP:\n\n") assert := assert.New(t) trans := &transport{} + + fmt.Printf("SimpleSetup:\n") obs, err := simpleSetup(trans, time.Second, nil) assert.NotNil(obs) assert.Nil(err) + // queue case 1 req := simpleRequest() req.Destination = "event:iot" + fmt.Printf("Queue case 1:\n %v\n", spew.Sprint(req.Destination)) obs.Queue(req) r2 := simpleRequest() r2.Destination = "event:test" + fmt.Printf("\nQueue case 2:\n %v\n", spew.Sprint(r2.Destination)) obs.Queue(r2) + // queue case 3 r3 := simpleRequest() r3.Destination = "event:no-match" + fmt.Printf("\nQueue case 3:\n %v\n", spew.Sprint(r3.Destination)) obs.Queue(r3) obs.Shutdown(true) diff --git a/src/caduceus/senderWrapper.go b/src/caduceus/senderWrapper.go index 42011680..a8035114 100644 --- a/src/caduceus/senderWrapper.go +++ b/src/caduceus/senderWrapper.go @@ -23,7 +23,7 @@ import ( "time" "github.com/Comcast/webpa-common/webhook" - "github.com/Comcast/webpa-common/wrp" + "github.com/Comcast/wrp-go/wrp" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" ) @@ -52,12 +52,13 @@ type SenderWrapperFactory struct { // Metrics registry. MetricsRegistry CaduceusMetricsRegistry - // The metrics counter for content-type ContentTypeCounter metrics.Counter // The metrics counter for dropped messages due to invalid payloads DroppedMsgCounter metrics.Counter + EventType metrics.Counter + // The logger implementation to share with OutboundSenders. Logger log.Logger @@ -84,6 +85,7 @@ type CaduceusSenderWrapper struct { mutex sync.RWMutex senders map[string]OutboundSender metricsRegistry CaduceusMetricsRegistry + eventType metrics.Counter wg sync.WaitGroup shutdown chan struct{} } @@ -107,6 +109,8 @@ func (swf SenderWrapperFactory) New() (sw SenderWrapper, err error) { return } + caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(IncomingEventTypeCounter) + caduceusSenderWrapper.senders = make(map[string]OutboundSender) caduceusSenderWrapper.shutdown = make(chan struct{}) @@ -163,6 +167,9 @@ func (sw *CaduceusSenderWrapper) Update(list []webhook.W) { // function performs the fan-out and filtering to multiple possible endpoints. func (sw *CaduceusSenderWrapper) Queue(msg *wrp.Message) { sw.mutex.RLock() + + sw.eventType.With("event", msg.FindEventStringSubMatch()) + for _, v := range sw.senders { v.Queue(msg) } diff --git a/src/caduceus/senderWrapper_test.go b/src/caduceus/senderWrapper_test.go index db08df37..c29cac84 100644 --- a/src/caduceus/senderWrapper_test.go +++ b/src/caduceus/senderWrapper_test.go @@ -18,15 +18,16 @@ package main import ( "bytes" - "github.com/Comcast/webpa-common/logging" - "github.com/Comcast/webpa-common/webhook" - "github.com/Comcast/webpa-common/wrp" - "github.com/stretchr/testify/assert" "net/http" "sync" "sync/atomic" "testing" "time" + + "github.com/Comcast/webpa-common/logging" + "github.com/Comcast/webpa-common/webhook" + "github.com/Comcast/wrp-go/wrp" + "github.com/stretchr/testify/assert" ) type result struct { @@ -45,6 +46,8 @@ type swTransport struct { } func (t *swTransport) RoundTrip(req *http.Request) (*http.Response, error) { + + // atomic.AddInt32(&t.i, 1) r := result{URL: req.URL.String(), @@ -94,15 +97,20 @@ func getFakeFactory() *SenderWrapperFactory { On("With", []string{"url", "http://localhost:9999/foo", "reason", "network_err"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:9999/foo", "reason", "invalid_config"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore) + On("With", []string{"url", "http://localhost:9999/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore). + On("With", []string{"event", "iot"}).Return(fakeIgnore). + On("With", []string{"event", "test/extra-stuff"}).Return(fakeIgnore). + On("With", []string{"event", "bob/magic/dog"}).Return(fakeIgnore). + On("With", []string{"event", "unknown"}).Return(fakeIgnore) fakeRegistry := new(mockCaduceusMetricsRegistry) - fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeICTC) fakeRegistry.On("NewCounter", DropsDueToInvalidPayload).Return(fakeDDTIP) fakeRegistry.On("NewCounter", DeliveryRetryCounter).Return(fakeIgnore) fakeRegistry.On("NewCounter", DeliveryCounter).Return(fakeIgnore) fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeIgnore) fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeIgnore) + fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeICTC) + fakeRegistry.On("NewCounter", IncomingEventTypeCounter).Return(fakeIgnore) fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeGauge) return &SenderWrapperFactory{ @@ -130,6 +138,7 @@ func TestInvalidLinger(t *testing.T) { // 1. Remove the limitation of 5min as the only timeout // -or- // 2. Add a mock for the webhook implementation + func TestSwSimple(t *testing.T) { assert := assert.New(t) @@ -161,7 +170,6 @@ func TestSwSimple(t *testing.T) { assert.NotNil(sw) // No listeners - sw.Queue(iot) sw.Queue(iot) sw.Queue(iot) @@ -192,6 +200,7 @@ func TestSwSimple(t *testing.T) { sw.Update(list) // Send iot message + sw.Queue(iot) // Send test message diff --git a/src/glide.lock b/src/glide.lock index bc193337..1fbbb53b 100644 --- a/src/glide.lock +++ b/src/glide.lock @@ -1,5 +1,5 @@ hash: 8e0b5dc0f4fcee573e27644aa4b665502fecb68c941dad21430d3f491ee8519d -updated: 2019-02-21T10:14:46.1291513-08:00 +updated: 2019-03-19T17:17:52.093918-06:00 imports: - name: github.com/armon/go-metrics version: 783273d703149aaeb9897cf58613d5af48861c25 @@ -44,8 +44,6 @@ imports: - name: github.com/Comcast/webpa-common version: ee36b7e7561c779aa65a66fbef35b46720486987 subpackages: - - capacitor - - clock - concurrent - convey - convey/conveyhttp @@ -61,19 +59,23 @@ imports: - server - service - service/consul - - service/monitor - service/servicecfg - service/zk - types - webhook - webhook/aws - wrp - - wrp/wrphttp - wrp/wrpmeta - xhttp + - xhttp/gate - xlistener - xmetrics - xviper +- name: github.com/Comcast/wrp-go + version: a1be47de16c373712b073ce49ee15f88e593a155 + subpackages: + - wrp + - wrp/wrphttp - name: github.com/davecgh/go-spew version: 8991bc29aa16c548c550c7ff78260e27b9ab7c73 subpackages: @@ -109,7 +111,7 @@ imports: - name: github.com/go-logfmt/logfmt version: 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 - name: github.com/golang/protobuf - version: 9eb2c01ac278a5d89ce4b2be68fe4500955d8179 + version: 925541529c1fa6821df4e44ce2723319eb2be768 subpackages: - proto - name: github.com/gorilla/context