diff --git a/src/caduceus/caduceus_type.go b/src/caduceus/caduceus_type.go index 921241c3..41890ddc 100644 --- a/src/caduceus/caduceus_type.go +++ b/src/caduceus/caduceus_type.go @@ -62,8 +62,8 @@ type CaduceusHandler struct { } func (ch *CaduceusHandler) HandleRequest(workerID int, msg *wrp.Message) { - logging.Info(ch).Log("workerID", workerID, logging.MessageKey(), "Worker received a request, now passing"+ " to sender") + ch.senderWrapper.Queue(msg) } diff --git a/src/caduceus/http.go b/src/caduceus/http.go index f2da2c05..7b6c9665 100644 --- a/src/caduceus/http.go +++ b/src/caduceus/http.go @@ -25,7 +25,7 @@ import ( "github.com/Comcast/wrp-go/wrp" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" - "github.com/satori/go.uuid" + uuid "github.com/satori/go.uuid" ) // Below is the struct that will implement our ServeHTTP method @@ -89,6 +89,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R debugLog.Log(messageKey, "Invalid payload format.\n") return } + sh.caduceusHandler.HandleRequest(0, fixWrp(msg)) // return a 202 diff --git a/src/caduceus/http_test.go b/src/caduceus/http_test.go index b74874cc..e1d941a3 100644 --- a/src/caduceus/http_test.go +++ b/src/caduceus/http_test.go @@ -18,6 +18,7 @@ package main import ( "bytes" + "fmt" "net/http" "net/http/httptest" "testing" @@ -65,6 +66,8 @@ func exampleRequest(list ...string) *http.Request { } func TestServerHandler(t *testing.T) { + fmt.Print("TestingeServerHandler") + assert := assert.New(t) logger := logging.DefaultLogger() @@ -100,6 +103,8 @@ func TestServerHandler(t *testing.T) { } func TestServerHandlerFixWrp(t *testing.T) { + fmt.Printf("TestServerHandlerFixWrp") + assert := assert.New(t) logger := logging.DefaultLogger() @@ -113,6 +118,11 @@ func TestServerHandlerFixWrp(t *testing.T) { fakeQueueDepth := new(mockGauge) fakeQueueDepth.On("Add", mock.AnythingOfType("float64")).Return().Times(2) + fakeIncomingContentTypeCount := new(mockCounter) + fakeIncomingContentTypeCount.On("With", []string{"content_type", "application/msgpack"}).Return(fakeIncomingContentTypeCount) + fakeIncomingContentTypeCount.On("With", []string{"content_type", ""}).Return(fakeIncomingContentTypeCount) + fakeIncomingContentTypeCount.On("Add", 1.0).Return() + serverWrapper := &ServerHandler{ Logger: logger, caduceusHandler: fakeHandler, @@ -135,6 +145,8 @@ func TestServerHandlerFixWrp(t *testing.T) { } func TestServerHandlerFull(t *testing.T) { + fmt.Printf("TestServerHandlerFull") + assert := assert.New(t) logger := logging.DefaultLogger() @@ -167,6 +179,8 @@ func TestServerHandlerFull(t *testing.T) { } func TestServerEmptyPayload(t *testing.T) { + fmt.Printf("TestServerEmptyPayLoad") + assert := assert.New(t) var buffer bytes.Buffer @@ -204,6 +218,8 @@ func TestServerEmptyPayload(t *testing.T) { } func TestServerUnableToReadBody(t *testing.T) { + fmt.Printf("TestServerUnableToReadBody") + assert := assert.New(t) var buffer bytes.Buffer @@ -243,6 +259,8 @@ func TestServerUnableToReadBody(t *testing.T) { } func TestServerInvalidBody(t *testing.T) { + fmt.Printf("TestServerInvalidBody") + assert := assert.New(t) r := bytes.NewReader([]byte("Invalid payload.")) diff --git a/src/caduceus/metrics.go b/src/caduceus/metrics.go index 70bea8d5..95f17ff5 100644 --- a/src/caduceus/metrics.go +++ b/src/caduceus/metrics.go @@ -84,3 +84,16 @@ func Metrics() []xmetrics.Metric { }, } } + +func CreateOutbounderMetrics(m CaduceusMetricsRegistry, c *CaduceusOutboundSender) { + c.deliveryCounter = m.NewCounter(DeliveryCounter) + c.deliveryRetryCounter = m.NewCounter(DeliveryRetryCounter) + c.cutOffCounter = m.NewCounter(SlowConsumerCounter).With("url", c.id) + c.droppedQueueFullCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "queue_full") + c.droppedExpiredCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "expired") + c.droppedCutoffCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "cut_off") + c.droppedInvalidConfig = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "invalid_config") + c.droppedNetworkErrCounter = m.NewCounter(SlowConsumerDroppedMsgCounter).With("url", c.id, "reason", "network_err") + c.queueDepthGauge = m.NewGauge(OutgoingQueueDepth).With("url", c.id) + c.contentTypeCounter = m.NewCounter(IncomingContentTypeCounter) +} diff --git a/src/caduceus/outboundSender.go b/src/caduceus/outboundSender.go index f6d983d7..df42c5fd 100644 --- a/src/caduceus/outboundSender.go +++ b/src/caduceus/outboundSender.go @@ -132,6 +132,7 @@ type CaduceusOutboundSender struct { droppedNetworkErrCounter metrics.Counter droppedInvalidConfig metrics.Counter cutOffCounter metrics.Counter + contentTypeCounter metrics.Counter queueDepthGauge metrics.Gauge eventType metrics.Counter wg sync.WaitGroup @@ -187,29 +188,7 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { // Don't share the secret with others when there is an error. caduceusOutboundSender.failureMsg.Original.Config.Secret = "XxxxxX" - caduceusOutboundSender.deliveryCounter = osf.MetricsRegistry.NewCounter(DeliveryCounter) - caduceusOutboundSender.deliveryRetryCounter = osf.MetricsRegistry.NewCounter(DeliveryRetryCounter) - - caduceusOutboundSender.cutOffCounter = osf.MetricsRegistry. - NewCounter(SlowConsumerCounter).With("url", caduceusOutboundSender.id) - - caduceusOutboundSender.droppedQueueFullCounter = osf.MetricsRegistry. - NewCounter(SlowConsumerDroppedMsgCounter).With("url", caduceusOutboundSender.id, "reason", "queue_full") - - caduceusOutboundSender.droppedExpiredCounter = osf.MetricsRegistry. - NewCounter(SlowConsumerDroppedMsgCounter).With("url", caduceusOutboundSender.id, "reason", "expired") - - caduceusOutboundSender.droppedCutoffCounter = osf.MetricsRegistry. - NewCounter(SlowConsumerDroppedMsgCounter).With("url", caduceusOutboundSender.id, "reason", "cut_off") - - caduceusOutboundSender.droppedInvalidConfig = osf.MetricsRegistry. - NewCounter(SlowConsumerDroppedMsgCounter).With("url", caduceusOutboundSender.id, "reason", "invalid_config") - - caduceusOutboundSender.droppedNetworkErrCounter = osf.MetricsRegistry. - NewCounter(SlowConsumerDroppedMsgCounter).With("url", caduceusOutboundSender.id, "reason", "network_err") - - caduceusOutboundSender.queueDepthGauge = osf.MetricsRegistry. - NewGauge(OutgoingQueueDepth).With("url", caduceusOutboundSender.id) + CreateOutbounderMetrics(osf.MetricsRegistry, caduceusOutboundSender) // Give us some head room so that we don't block when we get near the // completely full point. @@ -441,6 +420,7 @@ func (obs *CaduceusOutboundSender) dispatcher() { continue } obs.workers.Acquire() + go obs.send(secret, accept, msg) } @@ -459,6 +439,8 @@ func (obs *CaduceusOutboundSender) send(secret, acceptType string, msg *wrp.Mess body := payload var payloadReader *bytes.Reader + obs.contentTypeCounter.With("content_type", strings.TrimLeft(msg.ContentType, "application/")).Add(1.0) + // Use the internal content type unless the accept type is wrp contentType := msg.ContentType switch acceptType { @@ -588,6 +570,8 @@ func (obs *CaduceusOutboundSender) queueOverflow() { req.Header.Set("X-Webpa-Signature", sig) } + // record content type, json. + obs.contentTypeCounter.With("content_type", "json").Add(1.0) resp, err := obs.sender(req) if nil != err { // Failure diff --git a/src/caduceus/outboundSender_test.go b/src/caduceus/outboundSender_test.go index 761d64aa..fda1e558 100644 --- a/src/caduceus/outboundSender_test.go +++ b/src/caduceus/outboundSender_test.go @@ -25,6 +25,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/go-kit/kit/log" "github.com/stretchr/testify/assert" + //"github.com/stretchr/testify/mock" "io" "io/ioutil" @@ -123,18 +124,30 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "network_err"}).Return(fakeDroppedSlow) fakeDroppedSlow.On("Add", 1.0).Return() - // test queue depth + // IncomingContentType cases + fakeContentType := new(mockCounter) + fakeContentType.On("With", []string{"content_type", "msgpack"}).Return(fakeContentType) + fakeContentType.On("With", []string{"content_type", "json"}).Return(fakeContentType) + fakeContentType.On("With", []string{"content_type", "http"}).Return(fakeContentType) + fakeContentType.On("With", []string{"content_type", "other"}).Return(fakeContentType) + fakeContentType.On("Add", 1.0).Return() + + // QueueDepth case fakeQdepth := new(mockGauge) fakeQdepth.On("With", []string{"url", w.Config.URL}).Return(fakeQdepth) fakeQdepth.On("Add", 1.0).Return().On("Add", -1.0).Return() - // build a registry and register all fake metrics + // Build a registry and register all fake metrics, these are synymous with the metrics in + // metrics.go + // + // If a new metric within outboundsender is created it must be added here fakeRegistry := new(mockCaduceusMetricsRegistry) fakeRegistry.On("NewCounter", DeliveryRetryCounter).Return(fakeDC) fakeRegistry.On("NewCounter", DeliveryCounter).Return(fakeDC) + fakeRegistry.On("NewCounter", OutgoingQueueDepth).Return(fakeDC) fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeSlow) fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeDroppedSlow) - //fakeRegistry.On("NewCounter", IncomingEventTypeCounter).Return(fakeEventType) + fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeContentType) fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeQdepth) return &OutboundSenderFactory{ @@ -175,19 +188,35 @@ func TestSimpleWrp(t *testing.T) { // queue case 1 req := simpleRequest() req.Destination = "event:iot" - fmt.Printf("Queue case 1:\n %v\n", spew.Sprint(req.Destination)) + fmt.Printf("Queue case 1:\n %v\n", spew.Sprint(req)) obs.Queue(req) - r2 := simpleRequest() - r2.Destination = "event:test" - fmt.Printf("\nQueue case 2:\n %v\n", spew.Sprint(r2.Destination)) - obs.Queue(r2) + req = simpleRequest() + req.Destination = "event:test" + fmt.Printf("\nQueue case 2:\n %v\n", spew.Sprint(req)) + obs.Queue(req) // queue case 3 - r3 := simpleRequest() - r3.Destination = "event:no-match" - fmt.Printf("\nQueue case 3:\n %v\n", spew.Sprint(r3.Destination)) - obs.Queue(r3) + req = simpleRequest() + req.Destination = "event:no-match" + fmt.Printf("\nQueue case 3:\n %v\n", spew.Sprint(req)) + obs.Queue(req) + + // queue case 4 + req = simpleRequest() + req.ContentType = "application/json" + fmt.Printf("\nQueue case 3:\n %v\n", spew.Sprint(req)) + obs.Queue(req) + + req = simpleRequest() + req.ContentType = "application/http" + fmt.Printf("\nQueue case 4:\n %v\n", spew.Sprint(req)) + obs.Queue(req) + + req = simpleRequest() + req.ContentType = "unknown" + fmt.Printf("\nQueue case 4:\n %v\n", spew.Sprint(req)) + obs.Queue(req) obs.Shutdown(true) diff --git a/src/caduceus/senderWrapper.go b/src/caduceus/senderWrapper.go index fbb69ced..9617500d 100644 --- a/src/caduceus/senderWrapper.go +++ b/src/caduceus/senderWrapper.go @@ -52,8 +52,6 @@ type SenderWrapperFactory struct { // Metrics registry. MetricsRegistry CaduceusMetricsRegistry - ContentTypeCounter metrics.Counter - // The metrics counter for dropped messages due to invalid payloads DroppedMsgCounter metrics.Counter diff --git a/src/caduceus/senderWrapper_test.go b/src/caduceus/senderWrapper_test.go index c29cac84..a91a1024 100644 --- a/src/caduceus/senderWrapper_test.go +++ b/src/caduceus/senderWrapper_test.go @@ -65,10 +65,6 @@ func (t *swTransport) RoundTrip(req *http.Request) (*http.Response, error) { } func getFakeFactory() *SenderWrapperFactory { - fakeICTC := new(mockCounter) - fakeICTC.On("With", []string{"content_type", "msgpack"}).Return(fakeICTC). - On("With", []string{"content_type", "unknown"}).Return(fakeICTC). - On("Add", 1.0).Return() fakeDDTIP := new(mockCounter) fakeDDTIP.On("Add", 1.0).Return() @@ -101,7 +97,11 @@ func getFakeFactory() *SenderWrapperFactory { On("With", []string{"event", "iot"}).Return(fakeIgnore). On("With", []string{"event", "test/extra-stuff"}).Return(fakeIgnore). On("With", []string{"event", "bob/magic/dog"}).Return(fakeIgnore). - On("With", []string{"event", "unknown"}).Return(fakeIgnore) + On("With", []string{"event", "unknown"}).Return(fakeIgnore). + On("With", []string{"content_type", "msgpack"}).Return(fakeIgnore). + On("With", []string{"content_type", "json"}).Return(fakeIgnore). + On("With", []string{"content_type", "http"}).Return(fakeIgnore). + On("With", []string{"content_type", "other"}).Return(fakeIgnore) fakeRegistry := new(mockCaduceusMetricsRegistry) fakeRegistry.On("NewCounter", DropsDueToInvalidPayload).Return(fakeDDTIP) @@ -109,7 +109,7 @@ func getFakeFactory() *SenderWrapperFactory { fakeRegistry.On("NewCounter", DeliveryCounter).Return(fakeIgnore) fakeRegistry.On("NewCounter", SlowConsumerCounter).Return(fakeIgnore) fakeRegistry.On("NewCounter", SlowConsumerDroppedMsgCounter).Return(fakeIgnore) - fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeICTC) + fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeIgnore) fakeRegistry.On("NewCounter", IncomingEventTypeCounter).Return(fakeIgnore) fakeRegistry.On("NewGauge", OutgoingQueueDepth).Return(fakeGauge)