From 5fc50ec748a167b24f0cd8584b2ae235f5483acd Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Fri, 15 Feb 2019 18:15:31 -0800 Subject: [PATCH 1/4] Remove the outbound worker pool and replace it with short lived goroutines and a maximum limit configuration. --- CHANGELOG.md | 4 + src/caduceus/metrics.go | 2 +- src/caduceus/outboundSender.go | 295 ++++++++++------------------ src/caduceus/outboundSender_test.go | 13 +- src/caduceus/senderWrapper_test.go | 17 +- src/caduceus/simpleCounter.go | 38 ---- src/caduceus/simpleCounter_test.go | 40 ---- src/glide.yaml | 2 +- 8 files changed, 125 insertions(+), 286 deletions(-) delete mode 100644 src/caduceus/simpleCounter.go delete mode 100644 src/caduceus/simpleCounter_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8be62ec2..91e6accd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [Unreleased] +- Remove the worker pool as a fixed number of workers per endpoint and simply cap + the maximum number. +- Fix for webhook shallow copy bug. +- Fix for delivering events as json or msgpack based events - Fix for webhook update for all fields - Fix for retry logic so all failures are retried the specified number of times - Fix for waiting for DNS to resolve prior to listening for webhook updates diff --git a/src/caduceus/metrics.go b/src/caduceus/metrics.go index 521d4578..10d2b0e9 100644 --- a/src/caduceus/metrics.go +++ b/src/caduceus/metrics.go @@ -49,7 +49,7 @@ func Metrics() []xmetrics.Metric { Name: DeliveryRetryCounter, Help: "Number of delivery retries made", Type: "counter", - LabelNames: []string{"url", "code", "event"}, + LabelNames: []string{"url", "event"}, }, { Name: DeliveryCounter, diff --git a/src/caduceus/outboundSender.go b/src/caduceus/outboundSender.go index efafd843..f7bac4de 100644 --- a/src/caduceus/outboundSender.go +++ b/src/caduceus/outboundSender.go @@ -24,7 +24,6 @@ import ( "encoding/json" "errors" "fmt" - "hash" "io" "io/ioutil" "net/http" @@ -33,11 +32,11 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/Comcast/webpa-common/device" "github.com/Comcast/webpa-common/logging" + "github.com/Comcast/webpa-common/semaphore" "github.com/Comcast/webpa-common/webhook" "github.com/Comcast/webpa-common/wrp" "github.com/Comcast/webpa-common/wrp/wrphttp" @@ -125,8 +124,6 @@ type CaduceusOutboundSender struct { deliverUntil time.Time dropUntil time.Time sender func(*http.Request) (*http.Response, error) - secret []byte - secretChan chan []byte events []*regexp.Regexp matcher []*regexp.Regexp queueSize int @@ -144,10 +141,11 @@ type CaduceusOutboundSender struct { queueDepthGauge metrics.Gauge wg sync.WaitGroup cutOffPeriod time.Duration + workers semaphore.Interface + maxWorkers int failureMsg FailureMessage logger log.Logger mutex sync.RWMutex - shutdownChan chan bool } // New creates a new OutboundSender object from the factory, or returns an error. @@ -175,13 +173,13 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { id: osf.Listener.Config.URL, listener: osf.Listener, sender: osf.Sender, - secretChan: make(chan []byte, 10), queueSize: osf.QueueSize, cutOffPeriod: osf.CutOffPeriod, deliverUntil: osf.Listener.Until, logger: osf.Logger, deliveryRetries: osf.DeliveryRetries, deliveryInterval: osf.DeliveryInterval, + maxWorkers: osf.NumWorkers, failureMsg: FailureMessage{ Original: osf.Listener, Text: failureText, @@ -189,7 +187,6 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { QueueSize: osf.QueueSize, Workers: osf.NumWorkers, }, - shutdownChan: make(chan bool, 10), } // Don't share the secret with others when there is an error. @@ -227,12 +224,12 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { return } - caduceusOutboundSender.wg.Add(osf.NumWorkers) - for i := 0; i < osf.NumWorkers; i++ { - go caduceusOutboundSender.worker(i) - } + caduceusOutboundSender.workers = semaphore.New(caduceusOutboundSender.maxWorkers) + caduceusOutboundSender.wg.Add(1) + go caduceusOutboundSender.dispatcher() obs = caduceusOutboundSender + return } @@ -262,12 +259,6 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { return } - // Make the secret bytes if available - var secret []byte - if "" != wh.Config.Secret { - secret = []byte(wh.Config.Secret) - } - // Create the matcher regex objects matcher := []*regexp.Regexp{} for _, item := range wh.Matcher.DeviceId { @@ -294,7 +285,6 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { // Don't share the secret with others when there is an error. obs.failureMsg.Original.Config.Secret = "XxxxxX" - obs.secret = secret obs.listener.FailureURL = wh.FailureURL obs.deliverUntil = wh.Until obs.events = events @@ -307,8 +297,6 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { obs.mutex.Unlock() - obs.secretChan <- secret - return } @@ -316,11 +304,7 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) { // abruptly based on the gentle parameter. If gentle is false, all queued // messages will be dropped without an attempt to send made. func (obs *CaduceusOutboundSender) Shutdown(gentle bool) { - obs.shutdownChan <- true - close(obs.queue) - close(obs.secretChan) - close(obs.shutdownChan) obs.mutex.Lock() if false == gentle { @@ -425,189 +409,124 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) { } } -// helper function to get the right delivery counter to increment -func (obs *CaduceusOutboundSender) getCounter(c metrics.Counter, status int) metrics.Counter { - if -1 == status { - return c.With("url", obs.id, "code", "failure") - } - - s := strconv.Itoa(status) - return c.With("url", obs.id, "code", s) -} - -type secretHash struct { - value atomic.Value -} - -func (sh *secretHash) set(h *hash.Hash) { - sh.value.Store(h) -} - -func (sh *secretHash) get() *hash.Hash { - if h, ok := sh.value.Load().(*hash.Hash); ok { - return h - } - - return nil -} - -// worker is the routine that actually takes the queued messages and delivers -// them to the listeners outside webpa -func (obs *CaduceusOutboundSender) worker(id int) { +func (obs *CaduceusOutboundSender) dispatcher() { defer obs.wg.Done() - // Make a local copy of the hmac - var h secretHash - h.set(new(hash.Hash)) - - // routine that will listen for secret changes - // if a change comes in, both the local secret copy and the hash are updated - go func(sc chan []byte, shutdown chan bool) { - for { - select { - case secret := <-sc: - // Create the base sha1 hash object for each thread - if nil != secret { - t := hmac.New(sha1.New, secret) - h.set(&t) - } else { - h.set(new(hash.Hash)) - } - case <-shutdown: - return - } - } - }(obs.secretChan, obs.shutdownChan) - - // Setup the retry structs once - simpleCounter := &SimpleCounter{} - - retryOptions := xhttp.RetryOptions{ - Logger: obs.logger, - Retries: obs.deliveryRetries, - Interval: obs.deliveryInterval, - Counter: simpleCounter, - // Always retry on failures up to the max count. - ShouldRetry: func(error) bool { return true }, - } - - // Only optimize the successful answers - delivered200 := obs.getCounter(obs.deliveryCounter, 200) - delivered201 := obs.getCounter(obs.deliveryCounter, 201) - delivered202 := obs.getCounter(obs.deliveryCounter, 202) - delivered204 := obs.getCounter(obs.deliveryCounter, 204) - retries200 := obs.getCounter(obs.deliveryRetryCounter, 200) - retries201 := obs.getCounter(obs.deliveryRetryCounter, 201) - retries202 := obs.getCounter(obs.deliveryRetryCounter, 202) - retries204 := obs.getCounter(obs.deliveryRetryCounter, 204) - for msg := range obs.queue { obs.queueDepthGauge.Add(-1.0) + obs.mutex.RLock() deliverUntil := obs.deliverUntil dropUntil := obs.dropUntil + secret := obs.listener.Config.Secret obs.mutex.RUnlock() now := time.Now() + if now.After(dropUntil) { if now.Before(deliverUntil) { - payload := msg.Payload - var payloadReader *bytes.Reader - if obs.listener.Config.ContentType == "wrp" { - buffer := bytes.NewBuffer([]byte{}) - var f wrp.Format - switch msg.ContentType { - case "json": - f = wrp.JSON - default: - f = wrp.Msgpack - } - encoder := wrp.NewEncoder(buffer, f) - encoder.Encode(msg) - payloadReader = bytes.NewReader(buffer.Bytes()) - } else { - payloadReader = bytes.NewReader(payload) - } - req, err := http.NewRequest("POST", obs.id, payloadReader) - if nil != err { - // Report drop - obs.droppedInvalidConfig.Add(1.0) - logging.Error(obs.logger).Log(logging.MessageKey(), "Invalid URL", "url", obs.id, - logging.ErrorKey(), err) - } else { - req.Header.Set("Content-Type", msg.ContentType) + obs.workers.Acquire() + go obs.send(secret, msg) + } else { + obs.droppedExpiredCounter.Add(1.0) + } + } else { + obs.droppedCutoffCounter.Add(1.0) + } + } - // Add x-Midt-* headers - wrphttp.AddMessageHeaders(req.Header, msg) + // Grab all the workers to make sure they are done. + for i := 0; i < obs.maxWorkers; i++ { + obs.workers.Acquire() + } +} - // Provide the old headers for now - req.Header.Set("X-Webpa-Event", strings.TrimPrefix(msg.Destination, "event:")) - req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID) +// worker is the routine that actually takes the queued messages and delivers +// them to the listeners outside webpa +func (obs *CaduceusOutboundSender) send(secret string, msg *wrp.Message) { + defer obs.workers.Release() + + payload := msg.Payload + var payloadReader *bytes.Reader + if obs.listener.Config.ContentType == "wrp" { + // WTS - I'm not sure if this is correct. + buffer := bytes.NewBuffer([]byte{}) + var f wrp.Format + switch msg.ContentType { + case "json": + f = wrp.JSON + default: + f = wrp.Msgpack + } + encoder := wrp.NewEncoder(buffer, f) + encoder.Encode(msg) + payloadReader = bytes.NewReader(buffer.Bytes()) + } else { + payloadReader = bytes.NewReader(payload) + } - // Add the device id without the trailing service - id, _ := device.ParseID(msg.Source) - req.Header.Set("X-Webpa-Device-Id", string(id)) - req.Header.Set("X-Webpa-Device-Name", string(id)) + req, err := http.NewRequest("POST", obs.id, payloadReader) + if nil != err { + // Report drop + obs.droppedInvalidConfig.Add(1.0) + logging.Error(obs.logger).Log(logging.MessageKey(), "Invalid URL", "url", obs.id, + logging.ErrorKey(), err) + } else { + req.Header.Set("Content-Type", msg.ContentType) - // get the latest secret hash - sh := *h.get() + // Add x-Midt-* headers + wrphttp.AddMessageHeaders(req.Header, msg) - if nil != sh { - sh.Reset() - sh.Write(payload) - sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(sh.Sum(nil))) - req.Header.Set("X-Webpa-Signature", sig) - } + // Provide the old headers for now + req.Header.Set("X-Webpa-Event", strings.TrimPrefix(msg.Destination, "event:")) + req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID) - // Setup the retry logic - simpleCounter.Count = 0.0 + // Add the device id without the trailing service + id, _ := device.ParseID(msg.Source) + req.Header.Set("X-Webpa-Device-Id", string(id)) + req.Header.Set("X-Webpa-Device-Name", string(id)) - // find the event "short name" - match := eventPattern.FindStringSubmatch(msg.Destination) - event := "unknown" - if match != nil { - event = match[1] - } + // Apply the secret - // Send it - resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req) - if nil != err { - // Report failure - obs.getCounter(obs.deliveryCounter, -1).With("event", event).Add(1.0) - obs.droppedNetworkErrCounter.Add(1.0) - } else { - // Report Result - switch resp.StatusCode { - case 200: - delivered200.With("event", event).Add(1.0) - retries200.With("event", event).Add(simpleCounter.Count) - case 201: - delivered201.With("event", event).Add(1.0) - retries201.With("event", event).Add(simpleCounter.Count) - case 202: - delivered202.With("event", event).Add(1.0) - retries202.With("event", event).Add(simpleCounter.Count) - case 204: - delivered204.With("event", event).Add(1.0) - retries204.With("event", event).Add(simpleCounter.Count) - default: - obs.getCounter(obs.deliveryCounter, resp.StatusCode).With("event", event).Add(1.0) - obs.getCounter(obs.deliveryRetryCounter, resp.StatusCode).With("event", event).Add(simpleCounter.Count) - } + if "" != secret { + s := hmac.New(sha1.New, []byte(secret)) + s.Write(payload) + sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(s.Sum(nil))) + req.Header.Set("X-Webpa-Signature", sig) + } - // read until the response is complete before closing to allow - // connection reuse - if nil != resp.Body { - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - } - } - } - } else { - obs.droppedExpiredCounter.Add(1.0) - } + // find the event "short name" + match := eventPattern.FindStringSubmatch(msg.Destination) + event := "unknown" + if match != nil { + event = match[1] + } + + retryOptions := xhttp.RetryOptions{ + Logger: obs.logger, + Retries: obs.deliveryRetries, + Interval: obs.deliveryInterval, + Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event), + // Always retry on failures up to the max count. + ShouldRetry: func(error) bool { return true }, + } + + // Send it + resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req) + if nil != err { + // Report failure + obs.deliveryCounter.With("url", obs.id, "code", "failure", "event", event).Add(1.0) + obs.droppedNetworkErrCounter.Add(1.0) } else { - obs.droppedCutoffCounter.Add(1.0) + // Report Result + obs.deliveryCounter.With("url", obs.id, "code", strconv.Itoa(resp.StatusCode), "event", event).Add(1.0) + + // read until the response is complete before closing to allow + // connection reuse + if nil != resp.Body { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + } } } } @@ -616,7 +535,7 @@ func (obs *CaduceusOutboundSender) worker(id int) { func (obs *CaduceusOutboundSender) queueOverflow() { obs.mutex.Lock() obs.dropUntil = time.Now().Add(obs.cutOffPeriod) - secret := obs.secret + secret := obs.listener.Config.Secret failureMsg := obs.failureMsg failureURL := obs.listener.FailureURL obs.mutex.Unlock() @@ -648,8 +567,8 @@ func (obs *CaduceusOutboundSender) queueOverflow() { } else { req.Header.Set("Content-Type", "application/json") - if nil != obs.secret { - h := hmac.New(sha1.New, secret) + if "" != secret { + h := hmac.New(sha1.New, []byte(secret)) h.Write(msg) sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(h.Sum(nil))) req.Header.Set("X-Webpa-Signature", sig) diff --git a/src/caduceus/outboundSender_test.go b/src/caduceus/outboundSender_test.go index f290e7c6..eb4e3d91 100644 --- a/src/caduceus/outboundSender_test.go +++ b/src/caduceus/outboundSender_test.go @@ -76,14 +76,17 @@ func simpleFactorySetup(trans *transport, cutOffPeriod time.Duration, matcher [] w.Matcher.DeviceId = matcher fakeDC := new(mockCounter) - fakeDC.On("With", []string{"url", w.Config.URL, "code", "200"}).Return(fakeDC). + fakeDC.On("With", []string{"url", w.Config.URL, "code", "200", "event", "test"}).Return(fakeDC). + On("With", []string{"url", w.Config.URL, "code", "200", "event", "iot"}).Return(fakeDC). + On("With", []string{"url", w.Config.URL, "code", "200", "event", "unknown"}).Return(fakeDC). + On("With", []string{"url", w.Config.URL, "code", "failure", "event", "iot"}).Return(fakeDC). + On("With", []string{"url", w.Config.URL, "event", "test"}).Return(fakeDC). + On("With", []string{"url", w.Config.URL, "event", "iot"}).Return(fakeDC). + On("With", []string{"url", w.Config.URL, "event", "unknown"}).Return(fakeDC). On("With", []string{"url", w.Config.URL, "code", "201"}).Return(fakeDC). On("With", []string{"url", w.Config.URL, "code", "202"}).Return(fakeDC). On("With", []string{"url", w.Config.URL, "code", "204"}).Return(fakeDC). - On("With", []string{"url", w.Config.URL, "code", "failure"}).Return(fakeDC). - On("With", []string{"event", "iot"}).Return(fakeDC). - On("With", []string{"event", "unknown"}).Return(fakeDC). - On("With", []string{"event", "test"}).Return(fakeDC) + On("With", []string{"url", w.Config.URL, "code", "failure"}).Return(fakeDC) fakeDC.On("Add", 1.0).Return() fakeDC.On("Add", 0.0).Return() diff --git a/src/caduceus/senderWrapper_test.go b/src/caduceus/senderWrapper_test.go index 88fb206d..db08df37 100644 --- a/src/caduceus/senderWrapper_test.go +++ b/src/caduceus/senderWrapper_test.go @@ -79,9 +79,10 @@ func getFakeFactory() *SenderWrapperFactory { fakeIgnore := new(mockCounter) fakeIgnore.On("Add", 1.0).Return().On("Add", 0.0).Return(). - On("With", []string{"url", "unknown"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:9999/foo"}).Return(fakeIgnore). + On("With", []string{"url", "http://localhost:8888/foo", "event", "unknown"}).Return(fakeIgnore). + On("With", []string{"url", "http://localhost:9999/foo", "event", "unknown"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo", "reason", "cut_off"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo", "reason", "queue_full"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:8888/foo", "reason", "expired"}).Return(fakeIgnore). @@ -92,18 +93,8 @@ func getFakeFactory() *SenderWrapperFactory { On("With", []string{"url", "http://localhost:9999/foo", "reason", "expired"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:9999/foo", "reason", "network_err"}).Return(fakeIgnore). On("With", []string{"url", "http://localhost:9999/foo", "reason", "invalid_config"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "code", "200"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "code", "201"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "code", "202"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:8888/foo", "code", "204"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "code", "200"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "code", "201"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "code", "202"}).Return(fakeIgnore). - On("With", []string{"url", "http://localhost:9999/foo", "code", "204"}).Return(fakeIgnore). - On("With", []string{"event", "test/extra-stuff"}).Return(fakeIgnore). - On("With", []string{"event", "wrp"}).Return(fakeIgnore). - On("With", []string{"event", "unknown"}).Return(fakeIgnore). - On("With", []string{"event", "iot"}).Return(fakeIgnore) + On("With", []string{"url", "http://localhost:8888/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore). + On("With", []string{"url", "http://localhost:9999/foo", "code", "200", "event", "unknown"}).Return(fakeIgnore) fakeRegistry := new(mockCaduceusMetricsRegistry) fakeRegistry.On("NewCounter", IncomingContentTypeCounter).Return(fakeICTC) diff --git a/src/caduceus/simpleCounter.go b/src/caduceus/simpleCounter.go deleted file mode 100644 index cbf69513..00000000 --- a/src/caduceus/simpleCounter.go +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright 2018 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package main - -import "github.com/go-kit/kit/metrics" - -// This is a non-concurrent safe counter that lets a single goroutine aggregate -// a metric before adding them to a larger correlated metric. -type SimpleCounter struct { - // The active count - Count float64 -} - -// With implements Counter. -func (s *SimpleCounter) With(labelValues ...string) metrics.Counter { - return s -} - -// Add implements Counter. -func (s *SimpleCounter) Add(delta float64) { - if 0.0 < delta { - s.Count += delta - } -} diff --git a/src/caduceus/simpleCounter_test.go b/src/caduceus/simpleCounter_test.go deleted file mode 100644 index 08701795..00000000 --- a/src/caduceus/simpleCounter_test.go +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Copyright 2018 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package main - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestSimpleCounter(t *testing.T) { - assert := assert.New(t) - - s := &SimpleCounter{Count: 0} - newS := s.With("foo", "bar") - assert.True(s == newS) - - s.Add(1.0) - assert.True(1.0 == s.Count) - - s.Add(10.0) - assert.True(11.0 == s.Count) - - s.Add(-10.0) - assert.True(11.0 == s.Count) -} diff --git a/src/glide.yaml b/src/glide.yaml index 682a6609..504c57c0 100644 --- a/src/glide.yaml +++ b/src/glide.yaml @@ -1,6 +1,6 @@ package: . import: - package: github.com/Comcast/webpa-common - version: 6ca7d6c5e78ebba17483f5d0e61c7c119b43619a + version: 0f087c5390cb60f4bdff2a27d1858c5e44755ce4 - package: github.com/satori/go.uuid version: f58768cc1a7a7e77a3bd49e98cdd21419399b6a3 From e3d47f19ee65369e18913bfb06194a30415b7a91 Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Fri, 15 Feb 2019 18:28:16 -0800 Subject: [PATCH 2/4] Add the glide lock file. --- src/glide.lock | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/glide.lock b/src/glide.lock index 3aed32b1..7014496f 100644 --- a/src/glide.lock +++ b/src/glide.lock @@ -1,5 +1,5 @@ -hash: 5c9aa43d5410004080968bf0948bf1f1ee619d6269cb56ed1e499e57cd3a4fe7 -updated: 2018-10-11T15:02:53.818112-07:00 +hash: 98a3ab8aa409ebc6af88888bdb48f1cf25af970ed8ea0a3bbcaae1d01f7d51dc +updated: 2019-02-14T18:47:17.113403999-08:00 imports: - name: github.com/aws/aws-sdk-go version: 7be45195c3af1b54a609812f90c05a7e492e2491 @@ -38,11 +38,12 @@ imports: subpackages: - linux - name: github.com/Comcast/webpa-common - version: 6ca7d6c5e78ebba17483f5d0e61c7c119b43619a + version: 0f087c5390cb60f4bdff2a27d1858c5e44755ce4 subpackages: - concurrent - convey - convey/conveyhttp + - convey/conveymetric - device - health - logging @@ -50,17 +51,19 @@ imports: - secure - secure/handler - secure/key + - semaphore - server - types - webhook - webhook/aws - wrp - wrp/wrphttp + - wrp/wrpmeta - xhttp - xlistener - xmetrics - name: github.com/davecgh/go-spew - version: 346938d642f2ec3594ed81d874461961cd0faa76 + version: 8991bc29aa16c548c550c7ff78260e27b9ab7c73 subpackages: - spew - name: github.com/fsnotify/fsnotify @@ -68,7 +71,7 @@ imports: - name: github.com/go-ini/ini version: bda519ae5f4cbc60d391ff8610711627a08b86ae - name: github.com/go-kit/kit - version: 4dc7be5d2d12881735283bcab7352178e190fc71 + version: 12210fb6ace19e0496167bb3e667dcd91fa9f69b subpackages: - endpoint - log @@ -89,10 +92,8 @@ imports: - util/conn - name: github.com/go-logfmt/logfmt version: 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 -- name: github.com/go-stack/stack - version: 817915b46b97fd7bb80e8ab6b69f01a53ac3eebf - name: github.com/golang/protobuf - version: b4deda0973fb4c70b50d226b1af49f3da59f5265 + version: 9eb2c01ac278a5d89ce4b2be68fe4500955d8179 subpackages: - proto - name: github.com/gorilla/context @@ -106,6 +107,7 @@ imports: subpackages: - hcl/ast - hcl/parser + - hcl/printer - hcl/scanner - hcl/strconv - hcl/token @@ -113,7 +115,7 @@ imports: - json/scanner - json/token - name: github.com/influxdata/influxdb - version: d977c0ac2494a59d72f41dc277771a3d297b8e98 + version: 48797873ee24fc1dcb5a63f23474d3210f6d68c5 subpackages: - client/v2 - models @@ -135,11 +137,11 @@ imports: - name: github.com/miekg/dns version: ba6747e8a94115e9dc7738afb87850687611df1b - name: github.com/mitchellh/mapstructure - version: f15292f7a699fcc1a38a80977f80a046874ba8ac + version: bb74f1db0675b241733089d5a1faa5dd8b0ef57b - name: github.com/pelletier/go-toml - version: 603baefff989777996bf283da430d693e78eba3a + version: c01d1270ff3e442a8a57cddc1c92dc1138598194 - name: github.com/pmezard/go-difflib - version: d8ed2627bdf02c080bf22230dbb337003b7aba2d + version: 792786c7400a136282c1664665ae0a8db921c6c2 subpackages: - difflib - name: github.com/prometheus/client_golang @@ -148,7 +150,7 @@ imports: - prometheus - prometheus/promhttp - name: github.com/prometheus/client_model - version: 5c3871d89910bfb32f5fcab2aa4b9ec68e65a99f + version: 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c subpackages: - go - name: github.com/prometheus/common @@ -182,9 +184,9 @@ imports: - name: github.com/spf13/pflag version: e57e3eeb33f795204c1ca35f56c44f83227c6e66 - name: github.com/spf13/viper - version: 25b30aa063fc18e48662b86996252eabdcf2f0c7 + version: 6d33b5a963d922d182c91e8a1c88d81fd150cfd4 - name: github.com/stretchr/objx - version: cbeaeb16a013161a98496fad62933b1d21786672 + version: 9e1dfc121bca96d392da5d00591953bdb54ab306 - name: github.com/stretchr/testify version: 12b6f73e6084dad08a7c6e575284b177ecafbc71 subpackages: @@ -193,13 +195,13 @@ imports: - mock - require - name: github.com/ugorji/go - version: 00a57e09e383d445aeef6c6cd642969dc4360231 + version: e5e69e061d4f7ee3a69b793cf9c1b41afe21918e subpackages: - codec - name: github.com/VividCortex/gohistogram version: 51564d9861991fb0ad0f531c99ef602d0f9866e6 - name: golang.org/x/crypto - version: e3636079e1a4c1f337f212cc5cd2aca108f6c900 + version: 74369b46fc6756741c016591724fd1cb8e26845f subpackages: - ed25519 - ed25519/internal/edwards25519 @@ -212,11 +214,11 @@ imports: - ipv4 - ipv6 - name: golang.org/x/sys - version: ac767d655b305d4e9612f5f6e33120b9176c4ad4 + version: 7138fd3d9dc8335c567ca206f4333fb75eb05d56 subpackages: - unix - name: golang.org/x/text - version: f21a4dfb5e38f5895301dc265a8def02365cc3d0 + version: 5cec4b58c438bd98288aeb248bab2c1840713d21 subpackages: - transform - unicode/norm From 9ea0513f49cd63b6b49469b576b76f9a9d180f98 Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Mon, 18 Feb 2019 09:37:44 -0800 Subject: [PATCH 3/4] Fix the race condition. --- src/caduceus/outboundSender.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/caduceus/outboundSender.go b/src/caduceus/outboundSender.go index f7bac4de..7de36aae 100644 --- a/src/caduceus/outboundSender.go +++ b/src/caduceus/outboundSender.go @@ -419,6 +419,7 @@ func (obs *CaduceusOutboundSender) dispatcher() { deliverUntil := obs.deliverUntil dropUntil := obs.dropUntil secret := obs.listener.Config.Secret + accept := obs.listener.Config.ContentType obs.mutex.RUnlock() now := time.Now() @@ -426,7 +427,7 @@ func (obs *CaduceusOutboundSender) dispatcher() { if now.After(dropUntil) { if now.Before(deliverUntil) { obs.workers.Acquire() - go obs.send(secret, msg) + go obs.send(secret, accept, msg) } else { obs.droppedExpiredCounter.Add(1.0) } @@ -443,22 +444,17 @@ func (obs *CaduceusOutboundSender) dispatcher() { // worker is the routine that actually takes the queued messages and delivers // them to the listeners outside webpa -func (obs *CaduceusOutboundSender) send(secret string, msg *wrp.Message) { +func (obs *CaduceusOutboundSender) send(secret, acceptType string, msg *wrp.Message) { defer obs.workers.Release() payload := msg.Payload var payloadReader *bytes.Reader - if obs.listener.Config.ContentType == "wrp" { - // WTS - I'm not sure if this is correct. + + if acceptType == "wrp" { + // WTS - We should pass the original, raw WRP event instead of + // re-encoding it. buffer := bytes.NewBuffer([]byte{}) - var f wrp.Format - switch msg.ContentType { - case "json": - f = wrp.JSON - default: - f = wrp.Msgpack - } - encoder := wrp.NewEncoder(buffer, f) + encoder := wrp.NewEncoder(buffer, wrp.Msgpack) encoder.Encode(msg) payloadReader = bytes.NewReader(buffer.Bytes()) } else { From 07c37ed8022dd97dcbb2f5fedec44be1c456901d Mon Sep 17 00:00:00 2001 From: Weston Schmidt Date: Mon, 18 Feb 2019 13:06:03 -0800 Subject: [PATCH 4/4] Simplify if/else condition statements to make the code more readable. --- src/caduceus/outboundSender.go | 289 +++++++++++++++++---------------- 1 file changed, 153 insertions(+), 136 deletions(-) diff --git a/src/caduceus/outboundSender.go b/src/caduceus/outboundSender.go index 7de36aae..a0accb64 100644 --- a/src/caduceus/outboundSender.go +++ b/src/caduceus/outboundSender.go @@ -343,70 +343,83 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) { var debugLog = logging.Debug(obs.logger) - if now.After(dropUntil) { - if now.Before(deliverUntil) { - for _, eventRegex := range events { - if eventRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) { - matchDevice := (nil == matcher) - if nil != matcher { - for _, deviceRegex := range matcher { - if deviceRegex.MatchString(msg.Source) { + if false == obs.isValidTimeWindow(now, dropUntil, deliverUntil) { + return + } + + for _, eventRegex := range events { + if false == eventRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) { + debugLog.Log(logging.MessageKey(), + fmt.Sprintf("Regex did not match. got != expected: '%s' != '%s'\n", + msg.Destination, eventRegex.String())) + continue + } + + matchDevice := (nil == matcher) + if nil != matcher { + for _, deviceRegex := range matcher { + if deviceRegex.MatchString(msg.Source) { + matchDevice = true + break + } + } + } + /* + // if the device id matches then we want to look through all the metadata + // and make sure that the obs metadata matches the metadata provided + if matchDevice { + for key, val := range metaData { + if matchers, ok := matcher[key]; ok { + for _, deviceRegex := range matchers { + matchDevice = false + if deviceRegex.MatchString(val) { matchDevice = true break } } - } - /* - // if the device id matches then we want to look through all the metadata - // and make sure that the obs metadata matches the metadata provided - if matchDevice { - for key, val := range metaData { - if matchers, ok := matcher[key]; ok { - for _, deviceRegex := range matchers { - matchDevice = false - if deviceRegex.MatchString(val) { - matchDevice = true - break - } - } - - // metadata was provided but did not match our expectations, - // so it is time to drop the message - if !matchDevice { - break - } - } - } - } - */ - if matchDevice { - if len(obs.queue) < obs.queueSize { - obs.queueDepthGauge.Add(1.0) - obs.queue <- msg - debugLog.Log(logging.MessageKey(), "WRP Sent to obs queue", "url", obs.id) - // a regex was matched, no need to check further matches + + // metadata was provided but did not match our expectations, + // so it is time to drop the message + if !matchDevice { break - } else { - obs.queueOverflow() - obs.droppedQueueFullCounter.Add(1.0) } } - } else { - debugLog.Log(logging.MessageKey(), - fmt.Sprintf("Regex did not match. got != expected: '%s' != '%s'\n", - msg.Destination, eventRegex.String())) } } - } else { - debugLog.Log(logging.MessageKey(), "Outside delivery window", - "now", now, "before", deliverUntil, "after", dropUntil) - obs.droppedExpiredCounter.Add(1.0) + */ + if matchDevice { + if len(obs.queue) < obs.queueSize { + obs.queueDepthGauge.Add(1.0) + obs.queue <- msg + debugLog.Log(logging.MessageKey(), "WRP Sent to obs queue", "url", obs.id) + // a regex was matched, no need to check further matches + break + } else { + obs.queueOverflow() + obs.droppedQueueFullCounter.Add(1.0) + } } - } else { + } +} + +func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool { + var debugLog = logging.Debug(obs.logger) + + if false == now.After(dropUntil) { debugLog.Log(logging.MessageKey(), "Client has been cut off", "now", now, "before", deliverUntil, "after", dropUntil) obs.droppedCutoffCounter.Add(1.0) + return false + } + + if false == now.Before(deliverUntil) { + debugLog.Log(logging.MessageKey(), "Outside delivery window", + "now", now, "before", deliverUntil, "after", dropUntil) + obs.droppedExpiredCounter.Add(1.0) + return false } + + return true } func (obs *CaduceusOutboundSender) dispatcher() { @@ -467,64 +480,66 @@ func (obs *CaduceusOutboundSender) send(secret, acceptType string, msg *wrp.Mess obs.droppedInvalidConfig.Add(1.0) logging.Error(obs.logger).Log(logging.MessageKey(), "Invalid URL", "url", obs.id, logging.ErrorKey(), err) - } else { - req.Header.Set("Content-Type", msg.ContentType) + return + } - // Add x-Midt-* headers - wrphttp.AddMessageHeaders(req.Header, msg) + req.Header.Set("Content-Type", msg.ContentType) - // Provide the old headers for now - req.Header.Set("X-Webpa-Event", strings.TrimPrefix(msg.Destination, "event:")) - req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID) + // Add x-Midt-* headers + wrphttp.AddMessageHeaders(req.Header, msg) - // Add the device id without the trailing service - id, _ := device.ParseID(msg.Source) - req.Header.Set("X-Webpa-Device-Id", string(id)) - req.Header.Set("X-Webpa-Device-Name", string(id)) + // Provide the old headers for now + req.Header.Set("X-Webpa-Event", strings.TrimPrefix(msg.Destination, "event:")) + req.Header.Set("X-Webpa-Transaction-Id", msg.TransactionUUID) - // Apply the secret + // Add the device id without the trailing service + id, _ := device.ParseID(msg.Source) + req.Header.Set("X-Webpa-Device-Id", string(id)) + req.Header.Set("X-Webpa-Device-Name", string(id)) - if "" != secret { - s := hmac.New(sha1.New, []byte(secret)) - s.Write(payload) - sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(s.Sum(nil))) - req.Header.Set("X-Webpa-Signature", sig) - } + // Apply the secret - // find the event "short name" - match := eventPattern.FindStringSubmatch(msg.Destination) - event := "unknown" - if match != nil { - event = match[1] - } + if "" != secret { + s := hmac.New(sha1.New, []byte(secret)) + s.Write(payload) + sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(s.Sum(nil))) + req.Header.Set("X-Webpa-Signature", sig) + } - retryOptions := xhttp.RetryOptions{ - Logger: obs.logger, - Retries: obs.deliveryRetries, - Interval: obs.deliveryInterval, - Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event), - // Always retry on failures up to the max count. - ShouldRetry: func(error) bool { return true }, - } + // find the event "short name" + match := eventPattern.FindStringSubmatch(msg.Destination) + event := "unknown" + if match != nil { + event = match[1] + } - // Send it - resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req) - if nil != err { - // Report failure - obs.deliveryCounter.With("url", obs.id, "code", "failure", "event", event).Add(1.0) - obs.droppedNetworkErrCounter.Add(1.0) - } else { - // Report Result - obs.deliveryCounter.With("url", obs.id, "code", strconv.Itoa(resp.StatusCode), "event", event).Add(1.0) - - // read until the response is complete before closing to allow - // connection reuse - if nil != resp.Body { - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - } + retryOptions := xhttp.RetryOptions{ + Logger: obs.logger, + Retries: obs.deliveryRetries, + Interval: obs.deliveryInterval, + Counter: obs.deliveryRetryCounter.With("url", obs.id, "event", event), + // Always retry on failures up to the max count. + ShouldRetry: func(error) bool { return true }, + } + + // Send it + resp, err := xhttp.RetryTransactor(retryOptions, obs.sender)(req) + code := "failure" + if nil != err { + // Report failure + obs.droppedNetworkErrCounter.Add(1.0) + } else { + // Report Result + code = strconv.Itoa(resp.StatusCode) + + // read until the response is complete before closing to allow + // connection reuse + if nil != resp.Body { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() } } + obs.deliveryCounter.With("url", obs.id, "code", code, "event", event).Add(1.0) } // queueOverflow handles the logic of what to do when a queue overflows @@ -548,47 +563,49 @@ func (obs *CaduceusOutboundSender) queueOverflow() { if nil != err { errorLog.Log(logging.MessageKey(), "Cut-off notification json.Marshall failed", "failureMessage", obs.failureMsg, "for", obs.id, logging.ErrorKey(), err) - } else { - errorLog.Log(logging.MessageKey(), "Cut-off notification", "failureMessage", msg, "for", obs.id) + return + } + errorLog.Log(logging.MessageKey(), "Cut-off notification", "failureMessage", msg, "for", obs.id) - // Send a "you've been cut off" warning message - if "" != failureURL { + // Send a "you've been cut off" warning message + if "" == failureURL { + errorLog.Log(logging.MessageKey(), "No cut-off notification URL specified", "for", obs.id) + return + } - payload := bytes.NewReader(msg) - req, err := http.NewRequest("POST", failureURL, payload) - if nil != err { - // Failure - errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification", "notification", - failureURL, "for", obs.id, logging.ErrorKey(), err) - } else { - req.Header.Set("Content-Type", "application/json") + payload := bytes.NewReader(msg) + req, err := http.NewRequest("POST", failureURL, payload) + if nil != err { + // Failure + errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification", "notification", + failureURL, "for", obs.id, logging.ErrorKey(), err) + return + } + req.Header.Set("Content-Type", "application/json") - if "" != secret { - h := hmac.New(sha1.New, []byte(secret)) - h.Write(msg) - sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(h.Sum(nil))) - req.Header.Set("X-Webpa-Signature", sig) - } + if "" != secret { + h := hmac.New(sha1.New, []byte(secret)) + h.Write(msg) + sig := fmt.Sprintf("sha1=%s", hex.EncodeToString(h.Sum(nil))) + req.Header.Set("X-Webpa-Signature", sig) + } - resp, err := obs.sender(req) - if nil != err { - // Failure - errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification", "notification", - failureURL, "for", obs.id, logging.ErrorKey(), err) - } else { - if nil == resp { - // Failure - errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification, nil response", - "notification", failureURL) - } else { - // Success - logging.Info(obs.logger).Log("Able to send cut-off notification", "url", failureURL, - "status", resp.Status) - } - } - } - } else { - errorLog.Log(logging.MessageKey(), "No cut-off notification URL specified", "for", obs.id) - } + resp, err := obs.sender(req) + if nil != err { + // Failure + errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification", "notification", + failureURL, "for", obs.id, logging.ErrorKey(), err) + return } + + if nil == resp { + // Failure + errorLog.Log(logging.MessageKey(), "Unable to send cut-off notification, nil response", + "notification", failureURL) + return + } + + // Success + logging.Info(obs.logger).Log("Able to send cut-off notification", "url", failureURL, + "status", resp.Status) }