Skip to content

Commit

Permalink
Merge pull request #131 from githubsands/metric_for_incoming_events
Browse files Browse the repository at this point in the history
caduceus: add metric - all event types before dropped
  • Loading branch information
schmidtw authored Mar 20, 2019
2 parents ea35303 + 4a64478 commit e167daa
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/caduceus/caduceus_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"time"

"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/wrp"
"github.com/Comcast/wrp-go/wrp"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
)
Expand Down
2 changes: 1 addition & 1 deletion src/caduceus/caduceus_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"testing"

"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/wrp"
"github.com/Comcast/wrp-go/wrp"
"github.com/stretchr/testify/mock"
)

Expand Down
2 changes: 1 addition & 1 deletion src/caduceus/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"sync/atomic"

"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/wrp"
"github.com/Comcast/wrp-go/wrp"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/satori/go.uuid"
Expand Down
7 changes: 7 additions & 0 deletions src/caduceus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
SlowConsumerCounter = "slow_consumer_cut_off_count"
IncomingQueueDepth = "incoming_queue_depth"
IncomingContentTypeCounter = "incoming_content_type_count"
IncomingEventTypeCounter = "incoming_event_type_count"
DropsDueToInvalidPayload = "drops_due_to_invalid_payload"
OutgoingQueueDepth = "outgoing_queue_depths"
)
Expand Down Expand Up @@ -75,5 +76,11 @@ func Metrics() []xmetrics.Metric {
Type: "gauge",
LabelNames: []string{"url"},
},
{
Name: IncomingEventTypeCounter,
Help: "Incoming count of events by event type",
Type: "counter",
LabelNames: []string{"event"},
},
}
}
16 changes: 16 additions & 0 deletions src/caduceus/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@

package main

// Using Caduceus's test suite:
//
// If you are testing a new metric the followng process needs to be done below:
// 1. Create a fake, mockMetric i.e fakeEventType := new(mockCounter)
// 2. If your metric type has yet to be included in mockCaduceusMetricRegistry within mocks.go
// add your metric type to mockCaduceusMetricRegistry
// 3. Trigger the On method on that "mockMetric" with various different cases of that metric,
// in both senderWrapper_test.go and/or outboundSender_test.go
// i.e:
// case 1: On("With", []string{"event", iot}
// case 2: On("With", []string{"event", unknown}
// Tests for all possible event_types that will be sent to the metrics Desc. If all cases arn't
// included tests will fail.
// 4. Mimic the metric behavior using On i.e if your specific metric is a counter:
// fakeSlow.On("Add", 1.0).Return()

import (
"testing"

Expand Down
6 changes: 1 addition & 5 deletions src/caduceus/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/Comcast/webpa-common/health"
"github.com/Comcast/webpa-common/webhook"
"github.com/Comcast/webpa-common/wrp"
"github.com/Comcast/wrp-go/wrp"
"github.com/go-kit/kit/metrics"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -72,10 +72,6 @@ func (m *mockOutboundSender) RetiredSince() time.Time {
return arguments.Get(0).(time.Time)
}

func (m *mockOutboundSender) Queue(msg *wrp.Message) {
m.Called(msg)
}

// mockSenderWrapper needs to mock things that the `SenderWrapper` does
type mockSenderWrapper struct {
mock.Mock
Expand Down
19 changes: 5 additions & 14 deletions src/caduceus/outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ import (
"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/semaphore"
"github.com/Comcast/webpa-common/webhook"
"github.com/Comcast/webpa-common/wrp"
"github.com/Comcast/webpa-common/wrp/wrphttp"
"github.com/Comcast/webpa-common/xhttp"
"github.com/Comcast/wrp-go/wrp"
"github.com/Comcast/wrp-go/wrp/wrphttp"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
)
Expand All @@ -52,12 +52,6 @@ const failureText = `Unfortunately, your endpoint is not able to keep up with th
`capacity to handle notifications, or reduce the number of notifications ` +
`you have requested.`

var (
// eventPattern is the precompiled regex that selects the top level event
// classifier
eventPattern = regexp.MustCompile(`^event:(?P<event>[^/]+)`)
)

// outboundRequest stores the outgoing request and assorted data that has been
// collected so far (to save on processing the information again).
type outboundRequest struct {
Expand Down Expand Up @@ -139,6 +133,7 @@ type CaduceusOutboundSender struct {
droppedInvalidConfig metrics.Counter
cutOffCounter metrics.Counter
queueDepthGauge metrics.Gauge
eventType metrics.Counter
wg sync.WaitGroup
cutOffPeriod time.Duration
workers semaphore.Interface
Expand Down Expand Up @@ -342,7 +337,6 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) {
now := time.Now()

var debugLog = logging.Debug(obs.logger)

if false == obs.isValidTimeWindow(now, dropUntil, deliverUntil) {
return
}
Expand All @@ -352,6 +346,7 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) {
debugLog.Log(logging.MessageKey(),
fmt.Sprintf("Regex did not match. got != expected: '%s' != '%s'\n",
msg.Destination, eventRegex.String()))

continue
}

Expand Down Expand Up @@ -511,11 +506,7 @@ func (obs *CaduceusOutboundSender) send(secret, acceptType string, msg *wrp.Mess
}

// find the event "short name"
match := eventPattern.FindStringSubmatch(msg.Destination)
event := "unknown"
if match != nil {
event = match[1]
}
event := msg.FindEventStringSubMatch()

retryOptions := xhttp.RetryOptions{
Logger: obs.logger,
Expand Down
33 changes: 32 additions & 1 deletion src/caduceus/outboundSender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package main
import (
"bytes"
"fmt"

"github.com/Comcast/webpa-common/webhook"
"github.com/Comcast/webpa-common/wrp"
"github.com/Comcast/wrp-go/wrp"
"github.com/davecgh/go-spew/spew"
"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"
//"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -56,6 +58,21 @@ func simpleSetup(trans *transport, cutOffPeriod time.Duration, matcher []string)
return simpleFactorySetup(trans, cutOffPeriod, matcher).New()
}

// simpleFactorySetup sets up a outboundSender with metrics.
//
// Using Caduceus's test suite
//
// If you are testing a new metric it needs to be created in this process below.
// 1. Create a fake, mockMetric i.e fakeEventType := new(mockCounter)
// 2. If your metric type has yet to be included in mockCaduceusMetricRegistry within mocks.go
// add your metric type to mockCaduceusMetricRegistry
// 3. Trigger the On method on that "mockMetric" with various different cases of that metric,
// in both senderWrapper_test.go and outboundSender_test.go
// i.e:
// case 1: On("With", []string{"event", iot}
// case 2: On("With", []string{"event", unknown}
// 4. Mimic the metric behavior using On:
// fakeSlow.On("Add", 1.0).Return()
func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []string) *OutboundSenderFactory {
if nil == trans.fn {
trans.fn = func(req *http.Request, count int) (resp *http.Response, err error) {
Expand All @@ -75,6 +92,7 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []
w.Config.Secret = "123456"
w.Matcher.DeviceId = matcher

// test dc metric
fakeDC := new(mockCounter)
fakeDC.On("With", []string{"url", w.Config.URL, "code", "200", "event", "test"}).Return(fakeDC).
On("With", []string{"url", w.Config.URL, "code", "200", "event", "iot"}).Return(fakeDC).
Expand All @@ -90,10 +108,12 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []
fakeDC.On("Add", 1.0).Return()
fakeDC.On("Add", 0.0).Return()

// test slow metric
fakeSlow := new(mockCounter)
fakeSlow.On("With", []string{"url", w.Config.URL}).Return(fakeSlow)
fakeSlow.On("Add", 1.0).Return()

// test dropped metric
fakeDroppedSlow := new(mockCounter)
fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "queue_full"}).Return(fakeDroppedSlow)
fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "cut_off"}).Return(fakeDroppedSlow)
Expand All @@ -102,15 +122,18 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []
fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "network_err"}).Return(fakeDroppedSlow)
fakeDroppedSlow.On("Add", 1.0).Return()

// test queue depth
fakeQdepth := new(mockGauge)
fakeQdepth.On("With", []string{"url", w.Config.URL}).Return(fakeQdepth)
fakeQdepth.On("Add", 1.0).Return().On("Add", -1.0).Return()

// build a registry and register all fake metrics
fakeRegistry := new(mockCaduceusMetricsRegistry)
fakeRegistry.On("NewCounter", DeliveryRetryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", DeliveryCounter).Return(fakeDC)
fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeSlow)
fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeDroppedSlow)
//fakeRegistry.On("NewCounter", IncomingEventTypeCounter).Return(fakeEventType)
fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeQdepth)

return &OutboundSenderFactory{
Expand All @@ -137,24 +160,32 @@ func simpleRequest() *wrp.Message {

// Simple test that covers the normal successful case with no extra matchers
func TestSimpleWrp(t *testing.T) {
fmt.Printf("\n\nTestingSimpleWRP:\n\n")

assert := assert.New(t)

trans := &transport{}

fmt.Printf("SimpleSetup:\n")
obs, err := simpleSetup(trans, time.Second, nil)
assert.NotNil(obs)
assert.Nil(err)

// queue case 1
req := simpleRequest()
req.Destination = "event:iot"
fmt.Printf("Queue case 1:\n %v\n", spew.Sprint(req.Destination))
obs.Queue(req)

r2 := simpleRequest()
r2.Destination = "event:test"
fmt.Printf("\nQueue case 2:\n %v\n", spew.Sprint(r2.Destination))
obs.Queue(r2)

// queue case 3
r3 := simpleRequest()
r3.Destination = "event:no-match"
fmt.Printf("\nQueue case 3:\n %v\n", spew.Sprint(r3.Destination))
obs.Queue(r3)

obs.Shutdown(true)
Expand Down
11 changes: 9 additions & 2 deletions src/caduceus/senderWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"time"

"github.com/Comcast/webpa-common/webhook"
"github.com/Comcast/webpa-common/wrp"
"github.com/Comcast/wrp-go/wrp"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
)
Expand Down Expand Up @@ -52,12 +52,13 @@ type SenderWrapperFactory struct {
// Metrics registry.
MetricsRegistry CaduceusMetricsRegistry

// The metrics counter for content-type
ContentTypeCounter metrics.Counter

// The metrics counter for dropped messages due to invalid payloads
DroppedMsgCounter metrics.Counter

EventType metrics.Counter

// The logger implementation to share with OutboundSenders.
Logger log.Logger

Expand All @@ -84,6 +85,7 @@ type CaduceusSenderWrapper struct {
mutex sync.RWMutex
senders map[string]OutboundSender
metricsRegistry CaduceusMetricsRegistry
eventType metrics.Counter
wg sync.WaitGroup
shutdown chan struct{}
}
Expand All @@ -107,6 +109,8 @@ func (swf SenderWrapperFactory) New() (sw SenderWrapper, err error) {
return
}

caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(IncomingEventTypeCounter)

caduceusSenderWrapper.senders = make(map[string]OutboundSender)
caduceusSenderWrapper.shutdown = make(chan struct{})

Expand Down Expand Up @@ -163,6 +167,9 @@ func (sw *CaduceusSenderWrapper) Update(list []webhook.W) {
// function performs the fan-out and filtering to multiple possible endpoints.
func (sw *CaduceusSenderWrapper) Queue(msg *wrp.Message) {
sw.mutex.RLock()

sw.eventType.With("event", msg.FindEventStringSubMatch())

for _, v := range sw.senders {
v.Queue(msg)
}
Expand Down
23 changes: 16 additions & 7 deletions src/caduceus/senderWrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ package main

import (
"bytes"
"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/webhook"
"github.com/Comcast/webpa-common/wrp"
"github.com/stretchr/testify/assert"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/webhook"
"github.com/Comcast/wrp-go/wrp"
"github.com/stretchr/testify/assert"
)

type result struct {
Expand All @@ -45,6 +46,8 @@ type swTransport struct {
}

func (t *swTransport) RoundTrip(req *http.Request) (*http.Response, error) {

//
atomic.AddInt32(&t.i, 1)

r := result{URL: req.URL.String(),
Expand Down Expand Up @@ -94,15 +97,20 @@ func getFakeFactory() *SenderWrapperFactory {
On("With", []string{"url", "http://localhost:9999/foo", "reason", "network_err"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:9999/foo", "reason", "invalid_config"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:8888/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:9999/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore)
On("With", []string{"url", "http://localhost:9999/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore).
On("With", []string{"event", "iot"}).Return(fakeIgnore).
On("With", []string{"event", "test/extra-stuff"}).Return(fakeIgnore).
On("With", []string{"event", "bob/magic/dog"}).Return(fakeIgnore).
On("With", []string{"event", "unknown"}).Return(fakeIgnore)

fakeRegistry := new(mockCaduceusMetricsRegistry)
fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeICTC)
fakeRegistry.On("NewCounter", DropsDueToInvalidPayload).Return(fakeDDTIP)
fakeRegistry.On("NewCounter", DeliveryRetryCounter).Return(fakeIgnore)
fakeRegistry.On("NewCounter", DeliveryCounter).Return(fakeIgnore)
fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeIgnore)
fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeIgnore)
fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeICTC)
fakeRegistry.On("NewCounter", IncomingEventTypeCounter).Return(fakeIgnore)
fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeGauge)

return &SenderWrapperFactory{
Expand Down Expand Up @@ -130,6 +138,7 @@ func TestInvalidLinger(t *testing.T) {
// 1. Remove the limitation of 5min as the only timeout
// -or-
// 2. Add a mock for the webhook implementation

func TestSwSimple(t *testing.T) {
assert := assert.New(t)

Expand Down Expand Up @@ -161,7 +170,6 @@ func TestSwSimple(t *testing.T) {
assert.NotNil(sw)

// No listeners

sw.Queue(iot)
sw.Queue(iot)
sw.Queue(iot)
Expand Down Expand Up @@ -192,6 +200,7 @@ func TestSwSimple(t *testing.T) {
sw.Update(list)

// Send iot message

sw.Queue(iot)

// Send test message
Expand Down
Loading

0 comments on commit e167daa

Please sign in to comment.