Skip to content

Commit

Permalink
Merge pull request #43 from Comcast/fix/correct-payload
Browse files Browse the repository at this point in the history
Change the output delivery so it delivers the expected payload & not …
  • Loading branch information
njharter authored Aug 14, 2017
2 parents b34517d + fdae2eb commit 25da7b1
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 46 deletions.
2 changes: 1 addition & 1 deletion src/caduceus/caduceusProfiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (cp *caduceusProfiler) process(raw []interface{}) (rv interface{}) {
for i, rawElement := range raw {
telemetryData := rawElement.(CaduceusTelemetry)

tonnage += telemetryData.PayloadSize
tonnage += telemetryData.RawPayloadSize

latency[i] = telemetryData.TimeSent.Sub(telemetryData.TimeReceived).Nanoseconds()
processingTime[i] = telemetryData.TimeOutboundAccepted.Sub(telemetryData.TimeReceived).Nanoseconds()
Expand Down
8 changes: 4 additions & 4 deletions src/caduceus/caduceusProfiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestCaduceusProfilerFactory(t *testing.T) {

func TestCaduceusProfiler(t *testing.T) {
assert := assert.New(t)
testMsg := CaduceusTelemetry{PayloadSize: 12}
testMsg := CaduceusTelemetry{RawPayloadSize: 12}
testData := make([]interface{}, 0)
testData = append(testData, testMsg)

Expand Down Expand Up @@ -129,7 +129,7 @@ func TestCaduceusProfiler(t *testing.T) {
testResults := testProfiler.Report()

assert.Equal(1, len(testResults))
assert.Equal(CaduceusTelemetry{PayloadSize: 12}, testResults[0].(CaduceusTelemetry))
assert.Equal(CaduceusTelemetry{RawPayloadSize: 12}, testResults[0].(CaduceusTelemetry))

fakeRing.AssertExpectations(t)
})
Expand All @@ -141,12 +141,12 @@ func TestCaduceusProfiler(t *testing.T) {
func TestCaduceusProfilerProcess(t *testing.T) {
set := []CaduceusTelemetry{
{
PayloadSize: 100,
RawPayloadSize: 100,
TimeReceived: time.Unix(1000, 0),
TimeSent: time.Unix(1000, 10),
},
{
PayloadSize: 200,
RawPayloadSize: 200,
TimeReceived: time.Unix(1010, 0),
TimeSent: time.Unix(1010, 12),
},
Expand Down
13 changes: 8 additions & 5 deletions src/caduceus/caduceus_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/secure"
"github.com/Comcast/webpa-common/secure/key"
"github.com/Comcast/webpa-common/wrp"
"time"
)

Expand Down Expand Up @@ -48,10 +49,12 @@ type JWTValidator struct {

// Below is the struct we're using to create a request to caduceus
type CaduceusRequest struct {
Payload []byte
ContentType string
TargetURL string
Telemetry CaduceusTelemetry
RawPayload []byte
PayloadAsWrp *wrp.Message
OutgoingPayload []byte
ContentType string
TargetURL string
Telemetry CaduceusTelemetry
}

const (
Expand All @@ -61,7 +64,7 @@ const (
)

type CaduceusTelemetry struct {
PayloadSize int
RawPayloadSize int
TimeReceived time.Time
TimeAccepted time.Time
TimeSentToOutbound time.Time
Expand Down
6 changes: 3 additions & 3 deletions src/caduceus/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
targetURL := request.URL.String()

caduceusRequest := CaduceusRequest{
Payload: payload,
RawPayload: payload,
ContentType: contentType,
TargetURL: targetURL,
Telemetry: stats,
}

caduceusRequest.Telemetry.PayloadSize = len(payload)
caduceusRequest.Telemetry.RawPayloadSize = len(payload)
caduceusRequest.Telemetry.TimeAccepted = time.Now()

err = sh.doJob(func(workerID int) { sh.caduceusHandler.HandleRequest(workerID, caduceusRequest) })
Expand All @@ -96,7 +96,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
response.WriteHeader(http.StatusAccepted)
response.Write([]byte("Request placed on to queue.\n"))
sh.Trace("Request placed on to queue.")
sh.caduceusHealth.IncrementBucket(caduceusRequest.Telemetry.PayloadSize)
sh.caduceusHealth.IncrementBucket(caduceusRequest.Telemetry.RawPayloadSize)
}
}

Expand Down
47 changes: 34 additions & 13 deletions src/caduceus/outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type OutboundSender interface {
Shutdown(bool)
RetiredSince() time.Time
QueueJSON(CaduceusRequest, string, string, string)
QueueWrp(CaduceusRequest, map[string]string, string, string, string)
QueueWrp(CaduceusRequest)
}

// CaduceusOutboundSender is the outbound sender object.
Expand Down Expand Up @@ -263,6 +263,7 @@ func (obs *CaduceusOutboundSender) QueueJSON(req CaduceusRequest,
}
if matchDevice {
if len(obs.queue) < obs.queueSize {
req.OutgoingPayload = req.RawPayload
outboundReq := outboundRequest{
req: req,
event: eventType,
Expand All @@ -285,8 +286,7 @@ func (obs *CaduceusOutboundSender) QueueJSON(req CaduceusRequest,
// QueueWrp is given a request to evaluate and optionally enqueue in the list
// of messages to deliver. The request is checked to see if it matches the
// criteria before being accepted or silently dropped.
func (obs *CaduceusOutboundSender) QueueWrp(req CaduceusRequest, metaData map[string]string,
eventType, deviceID, transID string) {
func (obs *CaduceusOutboundSender) QueueWrp(req CaduceusRequest) {
obs.mutex.RLock()
deliverUntil := obs.deliverUntil
dropUntil := obs.dropUntil
Expand All @@ -296,11 +296,11 @@ func (obs *CaduceusOutboundSender) QueueWrp(req CaduceusRequest, metaData map[st

if now.Before(deliverUntil) && now.After(dropUntil) {
for _, eventRegex := range obs.events {
if eventRegex.MatchString(eventType) {
if eventRegex.MatchString(req.PayloadAsWrp.Destination) {
matchDevice := (nil == obs.matcher)
if nil != obs.matcher {
for _, deviceRegex := range obs.matcher {
if deviceRegex.MatchString(deviceID) {
if deviceRegex.MatchString(req.PayloadAsWrp.Source) {
matchDevice = true
break
}
Expand Down Expand Up @@ -331,11 +331,27 @@ func (obs *CaduceusOutboundSender) QueueWrp(req CaduceusRequest, metaData map[st
*/
if matchDevice {
if len(obs.queue) < obs.queueSize {
// TODO we should break this code out into a function set a object
// creation time & choose if we should send the raw WRP message
// or if we should send just the payload + headers in the "HTTP"
// form.

// TODO we should really transcode from WRP into an HTTP format if
// asked for, otherwise we should deliver the WRP in that form.
req.OutgoingPayload = req.PayloadAsWrp.Payload

// Default to "application/json" if there is no content type, otherwise
// use the one the source specified.
ct := req.PayloadAsWrp.ContentType
if "" == ct {
ct = "application/json"
}

outboundReq := outboundRequest{req: req,
event: eventType,
transID: transID,
deviceID: deviceID,
contentType: "application/msgpack",
event: req.PayloadAsWrp.Destination,
transID: req.PayloadAsWrp.TransactionUUID,
deviceID: req.PayloadAsWrp.Source,
contentType: ct,
}
outboundReq.req.Telemetry.TimeOutboundAccepted = time.Now()
obs.queue <- outboundReq
Expand All @@ -345,7 +361,7 @@ func (obs *CaduceusOutboundSender) QueueWrp(req CaduceusRequest, metaData map[st
}
}
} else {
obs.logger.Trace(fmt.Sprintf("Regex did not match. got != expected: '%s' != '%s'\n", eventType, eventRegex.String()))
obs.logger.Trace(fmt.Sprintf("Regex did not match. got != expected: '%s' != '%s'\n", req.PayloadAsWrp.Destination, eventRegex.String()))
}
}
} else {
Expand Down Expand Up @@ -375,20 +391,25 @@ func (obs *CaduceusOutboundSender) worker(id int) {

now := time.Now()
if now.Before(deliverUntil) && now.After(dropUntil) {
payload := bytes.NewReader(work.req.Payload)
req, err := http.NewRequest("POST", obs.listener.Config.URL, payload)
payload := work.req.OutgoingPayload
payloadReader := bytes.NewReader(payload)
req, err := http.NewRequest("POST", obs.listener.Config.URL, payloadReader)
if nil != err {
// Report drop
obs.logger.Error("http.NewRequest(\"POST\", '%s', payload) failed: %s", obs.listener.Config.URL, err)
} else {
req.Header.Set("Content-Type", work.contentType)

// Provide the old headers for now
req.Header.Set("X-Webpa-Event", work.event)
req.Header.Set("X-Webpa-Transaction-Id", work.transID)
req.Header.Set("X-Webpa-Device-Id", work.deviceID)

// TODO Add the conversions to the new Xmidt headers

if nil != h {
h.Reset()
h.Write(work.req.Payload)
h.Write(payload)
sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(h.Sum(nil)))
req.Header.Set("X-Webpa-Signature", sig)
}
Expand Down
66 changes: 51 additions & 15 deletions src/caduceus/outboundSender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/Comcast/webpa-common/logging"
"github.com/Comcast/webpa-common/webhook"
"github.com/Comcast/webpa-common/wrp"
"github.com/stretchr/testify/assert"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -71,7 +72,7 @@ func simpleSetup(trans *transport, cutOffPeriod time.Duration, matcher []string)

func simpleJSONRequest() CaduceusRequest {
req := CaduceusRequest{
Payload: []byte("Hello, world."),
RawPayload: []byte("Hello, world."),
ContentType: "application/json",
TargetURL: "http://foo.com/api/v2/notification/device/mac:112233445566/event/iot",
}
Expand All @@ -81,9 +82,10 @@ func simpleJSONRequest() CaduceusRequest {

func simpleWrpRequest() CaduceusRequest {
req := CaduceusRequest{
Payload: []byte("Hello, world."),
ContentType: "application/wrp",
TargetURL: "http://foo.com/api/v2/notification/device/mac:112233445566/event/iot",
RawPayload: []byte("Hello, world."),
PayloadAsWrp: &wrp.Message{},
ContentType: "application/wrp",
TargetURL: "http://foo.com/api/v2/notification/device/mac:112233445566/event/iot",
}

return req
Expand Down Expand Up @@ -169,9 +171,17 @@ func TestSimpleWrp(t *testing.T) {

req := simpleWrpRequest()

obs.QueueWrp(req, nil, "iot", "mac:112233445566", "1234")
obs.QueueWrp(req, nil, "test", "mac:112233445566", "1234")
obs.QueueWrp(req, nil, "no-match", "mac:112233445566", "1234")
req.PayloadAsWrp.Source = "mac:112233445566"
req.PayloadAsWrp.TransactionUUID = "1234"

req.PayloadAsWrp.Destination = "iot"
obs.QueueWrp(req)

req.PayloadAsWrp.Destination = "test"
obs.QueueWrp(req)

req.PayloadAsWrp.Destination = "no-match"
obs.QueueWrp(req)

obs.Shutdown(true)

Expand All @@ -191,10 +201,23 @@ func TestSimpleWrpWithMatchers(t *testing.T) {

req := simpleWrpRequest()

obs.QueueWrp(req, nil, "iot", "mac:112233445565", "1234")
obs.QueueWrp(req, nil, "test", "mac:112233445566", "1234")
obs.QueueWrp(req, nil, "iot", "mac:112233445560", "1234")
obs.QueueWrp(req, nil, "test", "mac:112233445560", "1234")
req.PayloadAsWrp.TransactionUUID = "1234"

req.PayloadAsWrp.Source = "mac:112233445566"
req.PayloadAsWrp.Destination = "iot"
obs.QueueWrp(req)

req.PayloadAsWrp.Source = "mac:112233445565"
req.PayloadAsWrp.Destination = "test"
obs.QueueWrp(req)

req.PayloadAsWrp.Source = "mac:112233445560"
req.PayloadAsWrp.Destination = "iot"
obs.QueueWrp(req)

req.PayloadAsWrp.Source = "mac:112233445560"
req.PayloadAsWrp.Destination = "test"
obs.QueueWrp(req)

obs.Shutdown(true)

Expand All @@ -215,10 +238,23 @@ func TestSimpleWrpWithWildcardMatchers(t *testing.T) {

req := simpleWrpRequest()

obs.QueueWrp(req, nil, "iot", "mac:112233445565", "1234")
obs.QueueWrp(req, nil, "test", "mac:112233445566", "1234")
obs.QueueWrp(req, nil, "iot", "mac:112233445560", "1234")
obs.QueueWrp(req, nil, "test", "mac:112233445560", "1234")
req.PayloadAsWrp.TransactionUUID = "1234"

req.PayloadAsWrp.Source = "mac:112233445566"
req.PayloadAsWrp.Destination = "iot"
obs.QueueWrp(req)

req.PayloadAsWrp.Source = "mac:112233445565"
req.PayloadAsWrp.Destination = "test"
obs.QueueWrp(req)

req.PayloadAsWrp.Source = "mac:112233445560"
req.PayloadAsWrp.Destination = "iot"
obs.QueueWrp(req)

req.PayloadAsWrp.Source = "mac:112233445560"
req.PayloadAsWrp.Destination = "test"
obs.QueueWrp(req)

obs.Shutdown(true)

Expand Down
5 changes: 3 additions & 2 deletions src/caduceus/senderWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,13 @@ func (sw *CaduceusSenderWrapper) Queue(req CaduceusRequest) {
}

case "application/msgpack":
decoder := wrp.NewDecoderBytes(req.Payload, wrp.Msgpack)
decoder := wrp.NewDecoderBytes(req.RawPayload, wrp.Msgpack)
message := new(wrp.Message)
if err := decoder.Decode(message); nil == err {
req.PayloadAsWrp = message
sw.mutex.RLock()
for _, v := range sw.senders {
v.QueueWrp(req, message.Metadata, message.Destination, message.Source, message.TransactionUUID)
v.QueueWrp(req)
}
sw.mutex.RUnlock()
}
Expand Down
6 changes: 3 additions & 3 deletions src/caduceus/senderWrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,17 @@ func TestSwSimple(t *testing.T) {
assert.Nil(err)

iot := CaduceusRequest{
Payload: []byte("Hello, world."),
RawPayload: []byte("Hello, world."),
ContentType: "application/json",
TargetURL: "http://foo.com/api/v2/notify/mac:112233445566/event/iot",
}
test := CaduceusRequest{
Payload: []byte("Hello, world."),
RawPayload: []byte("Hello, world."),
ContentType: "application/json",
TargetURL: "http://foo.com/api/v2/notify/mac:112233445566/event/test",
}
wrp := CaduceusRequest{
Payload: buffer.Bytes(),
RawPayload: buffer.Bytes(),
ContentType: "application/msgpack",
TargetURL: "http://foo.com/api/v2/notify/mac:112233445566/event/wrp",
}
Expand Down

0 comments on commit 25da7b1

Please sign in to comment.