Skip to content

Commit

Permalink
Merge pull request #101 from Comcast/fix-99
Browse files Browse the repository at this point in the history
This change breaks the expired drop reason from the cut_off reason & …
  • Loading branch information
githubsands authored Sep 28, 2018
2 parents 9a8471a + 46e31be commit cf8cf73
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 113 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Fix for cpu spike after about 10 mintues due to worker go routines not finishing.
- Fix logic for updating webhooks
- Fix for sending the same event multiple times to the same webhook.
- Fix for [issue 99](https://github.com/Comcast/caduceus/issues/99)

## [0.1.1] - 2018-04-06
### Added
Expand Down
240 changes: 127 additions & 113 deletions src/caduceus/outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type CaduceusOutboundSender struct {
deliveryCounter metrics.Counter
deliveryRetryCounter metrics.Counter
droppedQueueFullCounter metrics.Counter
droppedCutoffCounter metrics.Counter
droppedExpiredCounter metrics.Counter
droppedNetworkErrCounter metrics.Counter
droppedInvalidConfig metrics.Counter
Expand Down Expand Up @@ -206,6 +207,9 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) {
caduceusOutboundSender.droppedExpiredCounter = osf.MetricsRegistry.
NewCounter(SlowConsumerDroppedMsgCounter).With("url", caduceusOutboundSender.id, "reason", "expired")

caduceusOutboundSender.droppedCutoffCounter = osf.MetricsRegistry.
NewCounter(SlowConsumerDroppedMsgCounter).With("url", caduceusOutboundSender.id, "reason", "cut_off")

caduceusOutboundSender.droppedInvalidConfig = osf.MetricsRegistry.
NewCounter(SlowConsumerDroppedMsgCounter).With("url", caduceusOutboundSender.id, "reason", "invalid_config")

Expand Down Expand Up @@ -269,7 +273,7 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) {

obsCopy.events = append(obsCopy.events, re)
}
if nil == obsCopy.events || len(obsCopy.events) == 0{
if nil == obsCopy.events || len(obsCopy.events) == 0 {
err = errors.New("Events must not be empty.")
return
}
Expand Down Expand Up @@ -360,63 +364,69 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) {

var debugLog = logging.Debug(obs.logger)

if now.Before(deliverUntil) && now.After(dropUntil) {
for _, eventRegex := range events {
if eventRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) {
matchDevice := (nil == matcher)
if nil != matcher {
for _, deviceRegex := range matcher {
if deviceRegex.MatchString(msg.Source) {
matchDevice = true
break
if now.After(dropUntil) {
if now.Before(deliverUntil) {
for _, eventRegex := range events {
if eventRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) {
matchDevice := (nil == matcher)
if nil != matcher {
for _, deviceRegex := range matcher {
if deviceRegex.MatchString(msg.Source) {
matchDevice = true
break
}
}
}
}
/*
// if the device id matches then we want to look through all the metadata
// and make sure that the obs metadata matches the metadata provided
if matchDevice {
for key, val := range metaData {
if matchers, ok := matcher[key]; ok {
for _, deviceRegex := range matchers {
matchDevice = false
if deviceRegex.MatchString(val) {
matchDevice = true
break
/*
// if the device id matches then we want to look through all the metadata
// and make sure that the obs metadata matches the metadata provided
if matchDevice {
for key, val := range metaData {
if matchers, ok := matcher[key]; ok {
for _, deviceRegex := range matchers {
matchDevice = false
if deviceRegex.MatchString(val) {
matchDevice = true
break
}
}
}
// metadata was provided but did not match our expectations,
// so it is time to drop the message
if !matchDevice {
break
// metadata was provided but did not match our expectations,
// so it is time to drop the message
if !matchDevice {
break
}
}
}
}
*/
if matchDevice {
if len(obs.queue) < obs.queueSize {
obs.queueDepthGauge.Add(1.0)
obs.queue <- msg
debugLog.Log(logging.MessageKey(), "WRP Sent to obs queue", "url", obs.id)
// a regex was matched, no need to check further matches
break
} else {
obs.queueOverflow()
obs.droppedQueueFullCounter.Add(1.0)
}
}
*/
if matchDevice {
if len(obs.queue) < obs.queueSize {
obs.queueDepthGauge.Add(1.0)
obs.queue <- msg
debugLog.Log(logging.MessageKey(), "WRP Sent to obs queue", "url", obs.id)
// a regex was matched, no need to check further matches
break
} else {
obs.queueOverflow()
obs.droppedQueueFullCounter.Add(1.0)
}
} else {
debugLog.Log(logging.MessageKey(),
fmt.Sprintf("Regex did not match. got != expected: '%s' != '%s'\n",
msg.Destination, eventRegex.String()))
}
} else {
debugLog.Log(logging.MessageKey(),
fmt.Sprintf("Regex did not match. got != expected: '%s' != '%s'\n",
msg.Destination, eventRegex.String()))
}
} else {
debugLog.Log(logging.MessageKey(), "Outside delivery window",
"now", now, "before", deliverUntil, "after", dropUntil)
obs.droppedExpiredCounter.Add(1.0)
}
} else {
debugLog.Log(logging.MessageKey(), "Outside delivery window")
debugLog.Log("now", now, "before", deliverUntil, "after", dropUntil)
obs.droppedExpiredCounter.Add(1.0)
debugLog.Log(logging.MessageKey(), "Client has been cut off",
"now", now, "before", deliverUntil, "after", dropUntil)
obs.droppedCutoffCounter.Add(1.0)
}
}

Expand Down Expand Up @@ -504,86 +514,90 @@ func (obs *CaduceusOutboundSender) worker(id int) {
obs.mutex.RUnlock()

now := time.Now()
if now.Before(deliverUntil) && now.After(dropUntil) {
payload := msg.Payload
payloadReader := bytes.NewReader(payload)
req, err := http.NewRequest("POST", obs.id, payloadReader)
if nil != err {
// Report drop
obs.droppedInvalidConfig.Add(1.0)
logging.Error(obs.logger).Log(logging.MessageKey(), "Invalid URL", "url", obs.id,
logging.ErrorKey(), err)
} else {
req.Header.Set("Content-Type", msg.ContentType)
if now.After(dropUntil) {
if now.Before(deliverUntil) {
payload := msg.Payload
payloadReader := bytes.NewReader(payload)
req, err := http.NewRequest("POST", obs.id, payloadReader)
if nil != err {
// Report drop
obs.droppedInvalidConfig.Add(1.0)
logging.Error(obs.logger).Log(logging.MessageKey(), "Invalid URL", "url", obs.id,
logging.ErrorKey(), err)
} else {
req.Header.Set("Content-Type", msg.ContentType)

// Add x-Midt-* headers
wrphttp.AddMessageHeaders(req.Header, msg)
// Add x-Midt-* headers
wrphttp.AddMessageHeaders(req.Header, msg)

// Provide the old headers for now
req.Header.Set("X-Webpa-Event", strings.TrimPrefix(msg.Destination, "event:"))
req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID)
// Provide the old headers for now
req.Header.Set("X-Webpa-Event", strings.TrimPrefix(msg.Destination, "event:"))
req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID)

// Add the device id without the trailing service
id, _ := device.ParseID(msg.Source)
req.Header.Set("X-Webpa-Device-Id", string(id))
req.Header.Set("X-Webpa-Device-Name", string(id))
// Add the device id without the trailing service
id, _ := device.ParseID(msg.Source)
req.Header.Set("X-Webpa-Device-Id", string(id))
req.Header.Set("X-Webpa-Device-Name", string(id))

// get the latest secret hash
sh := *h.get()
// get the latest secret hash
sh := *h.get()

if nil != sh {
sh.Reset()
sh.Write(payload)
sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(sh.Sum(nil)))
req.Header.Set("X-Webpa-Signature", sig)
}

// Setup the retry logic
simpleCounter.Count = 0.0
if nil != sh {
sh.Reset()
sh.Write(payload)
sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(sh.Sum(nil)))
req.Header.Set("X-Webpa-Signature", sig)
}

// find the event "short name"
match := eventPattern.FindStringSubmatch(msg.Destination)
event := "unknown"
if match != nil {
event = match[1]
}
// Setup the retry logic
simpleCounter.Count = 0.0

// Send it
resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req)
if nil != err {
// Report failure
obs.getCounter(obs.deliveryCounter, -1).With("event", event).Add(1.0)
obs.droppedNetworkErrCounter.Add(1.0)
} else {
// Report Result
switch resp.StatusCode {
case 200:
delivered200.With("event", event).Add(1.0)
retries200.With("event", event).Add(simpleCounter.Count)
case 201:
delivered201.With("event", event).Add(1.0)
retries201.With("event", event).Add(simpleCounter.Count)
case 202:
delivered202.With("event", event).Add(1.0)
retries202.With("event", event).Add(simpleCounter.Count)
case 204:
delivered204.With("event", event).Add(1.0)
retries204.With("event", event).Add(simpleCounter.Count)
default:
obs.getCounter(obs.deliveryCounter, resp.StatusCode).With("event", event).Add(1.0)
obs.getCounter(obs.deliveryRetryCounter, resp.StatusCode).With("event", event).Add(simpleCounter.Count)
// find the event "short name"
match := eventPattern.FindStringSubmatch(msg.Destination)
event := "unknown"
if match != nil {
event = match[1]
}

// read until the response is complete before closing to allow
// connection reuse
if nil != resp.Body {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
// Send it
resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req)
if nil != err {
// Report failure
obs.getCounter(obs.deliveryCounter, -1).With("event", event).Add(1.0)
obs.droppedNetworkErrCounter.Add(1.0)
} else {
// Report Result
switch resp.StatusCode {
case 200:
delivered200.With("event", event).Add(1.0)
retries200.With("event", event).Add(simpleCounter.Count)
case 201:
delivered201.With("event", event).Add(1.0)
retries201.With("event", event).Add(simpleCounter.Count)
case 202:
delivered202.With("event", event).Add(1.0)
retries202.With("event", event).Add(simpleCounter.Count)
case 204:
delivered204.With("event", event).Add(1.0)
retries204.With("event", event).Add(simpleCounter.Count)
default:
obs.getCounter(obs.deliveryCounter, resp.StatusCode).With("event", event).Add(1.0)
obs.getCounter(obs.deliveryRetryCounter, resp.StatusCode).With("event", event).Add(simpleCounter.Count)
}

// read until the response is complete before closing to allow
// connection reuse
if nil != resp.Body {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}
}
}
} else {
obs.droppedExpiredCounter.Add(1.0)
}
} else {
obs.droppedExpiredCounter.Add(1.0)
obs.droppedCutoffCounter.Add(1.0)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/caduceus/outboundSender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher []

fakeDroppedSlow := new(mockCounter)
fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "queue_full"}).Return(fakeDroppedSlow)
fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "cut_off"}).Return(fakeDroppedSlow)
fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "expired"}).Return(fakeDroppedSlow)
fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "invalid_config"}).Return(fakeDroppedSlow)
fakeDroppedSlow.On("With", []string{"url", w.Config.URL, "reason", "network_err"}).Return(fakeDroppedSlow)
Expand Down
2 changes: 2 additions & 0 deletions src/caduceus/senderWrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,12 @@ func getFakeFactory() *SenderWrapperFactory {
On("With", []string{"url", "unknown"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:9999/foo"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:8888/foo", "reason", "cut_off"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:8888/foo", "reason", "queue_full"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:8888/foo", "reason", "expired"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:8888/foo", "reason", "network_err"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:8888/foo", "reason", "invalid_config"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:9999/foo", "reason", "cut_off"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:9999/foo", "reason", "queue_full"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:9999/foo", "reason", "expired"}).Return(fakeIgnore).
On("With", []string{"url", "http://localhost:9999/foo", "reason", "network_err"}).Return(fakeIgnore).
Expand Down

0 comments on commit cf8cf73

Please sign in to comment.