diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9093bf0e..2affe57f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 ## [Unreleased]
 
+## [v0.2.6]
+- reduced time from when cutoff is sent to when queue is emptied
+
 ## [v0.2.5]
 - fix emptying queue when received cutoff [#188](https://github.com/xmidt-org/caduceus/issues/188)
 - add queue full check to prevent push event into queue if already full [#189](https://github.com/xmidt-org/caduceus/issues/189)
@@ -79,7 +82,8 @@ fixed build upload
 ### Added
 - Initial creation
 
-[Unreleased]: https://github.com/Comcast/caduceus/compare/v0.2.5...HEAD
+[Unreleased]: https://github.com/Comcast/caduceus/compare/v0.2.6...HEAD
+[v0.2.6]: https://github.com/Comcast/caduceus/compare/v0.2.5...v0.2.6
 [v0.2.5]: https://github.com/Comcast/caduceus/compare/v0.2.4...v0.2.5
 [v0.2.4]: https://github.com/Comcast/caduceus/compare/v0.2.3...v0.2.4
 [v0.2.3]: https://github.com/Comcast/caduceus/compare/v0.2.2...v0.2.3
diff --git a/outboundSender.go b/outboundSender.go
index 95a7fc21..e2970d81 100644
--- a/outboundSender.go
+++ b/outboundSender.go
@@ -34,6 +34,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/go-kit/kit/log"
@@ -124,7 +125,6 @@ type CaduceusOutboundSender struct {
 	events           []*regexp.Regexp
 	matcher          []*regexp.Regexp
 	queueSize        int
-	queue            chan *wrp.Message
 	deliveryRetries  int
 	deliveryInterval time.Duration
 	deliveryCounter  metrics.Counter
@@ -154,6 +154,7 @@ type CaduceusOutboundSender struct {
 	logger     log.Logger
 	mutex      sync.RWMutex
 	queueEmpty bool
+	queue      atomic.Value
 }
 
 // New creates a new OutboundSender object from the factory, or returns an error.
@@ -203,9 +204,7 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) {
 
 	CreateOutbounderMetrics(osf.MetricsRegistry, caduceusOutboundSender)
 
-	// Give us some head room so that we don't block when we get near the
-	// completely full point.
-	caduceusOutboundSender.queue = make(chan *wrp.Message, osf.QueueSize)
+	caduceusOutboundSender.queue.Store(make(chan *wrp.Message, osf.QueueSize))
 
 	if err = caduceusOutboundSender.Update(osf.Listener); nil != err {
 		return
@@ -331,8 +330,7 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) {
 // abruptly based on the gentle parameter. If gentle is false, all queued
 // messages will be dropped without an attempt to send made.
 func (obs *CaduceusOutboundSender) Shutdown(gentle bool) {
-	close(obs.queue)
-
+	close(obs.queue.Load().(chan *wrp.Message))
 	obs.mutex.Lock()
 	if false == gentle {
 		obs.deliverUntil = time.Time{}
@@ -353,7 +351,6 @@ func (obs *CaduceusOutboundSender) RetiredSince() time.Time {
 	obs.mutex.RLock()
 	deliverUntil := obs.deliverUntil
 	obs.mutex.RUnlock()
-
 	return deliverUntil
 }
 
@@ -417,7 +414,7 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) {
 	*/
 	if matchDevice {
 		select {
-		case obs.queue <- msg:
+		case obs.queue.Load().(chan *wrp.Message) <- msg:
 			obs.queueDepthGauge.Add(1.0)
 			debugLog.Log(logging.MessageKey(), "WRP Sent to obs queue", "url", obs.id)
 		default:
@@ -449,54 +446,58 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti
 }
 
 func (obs *CaduceusOutboundSender) Empty() {
-	for !obs.queueEmpty {
-		select {
-		case <-obs.queue:
-			obs.queueDepthGauge.Add(-1.0)
-		default:
-			obs.queueEmpty = true
-		}
-	}
+
+	obs.queue.Store(make(chan *wrp.Message, obs.queueSize))
+	obs.queueDepthGauge.Set(0.0)
+	obs.queueEmpty = true
+	return
 }
 
 func (obs *CaduceusOutboundSender) dispatcher() {
 	defer obs.wg.Done()
+	var (
+		msg            *wrp.Message
+		urls           *ring.Ring
+		secret, accept string
+		ok             bool
+	)
 
-	for msg := range obs.queue {
-		obs.queueDepthGauge.Add(-1.0)
-		obs.mutex.RLock()
-		if !obs.queueEmpty {
-			obs.Empty()
-		}
-		urls := obs.urls
-		// Move to the next URL to try 1st the next time.
-		obs.urls = obs.urls.Next()
-
-		deliverUntil := obs.deliverUntil
-		dropUntil := obs.dropUntil
-		secret := obs.listener.Config.Secret
-		accept := obs.listener.Config.ContentType
-		obs.mutex.RUnlock()
-
-		now := time.Now()
-
-		if now.Before(dropUntil) {
-			obs.droppedCutoffCounter.Add(1.0)
-			continue
-		}
-		if now.After(deliverUntil) {
+Loop:
+	for {
+		msgQueue := obs.queue.Load().(chan *wrp.Message)
+		select {
+		case msg, ok = <-msgQueue:
+			if !ok {
+				break Loop
+			}
+			obs.queueDepthGauge.Add(-1.0)
+			obs.mutex.RLock()
+			urls = obs.urls
+			// Move to the next URL to try 1st the next time.
+			obs.urls = obs.urls.Next()
+			deliverUntil := obs.deliverUntil
+			dropUntil := obs.dropUntil
+			secret = obs.listener.Config.Secret
+			accept = obs.listener.Config.ContentType
+			obs.mutex.RUnlock()
+
+			now := time.Now()
+
+			if now.Before(dropUntil) {
+				obs.droppedCutoffCounter.Add(1.0)
+				break Loop
+			}
+			if now.After(deliverUntil) {
+				obs.droppedExpiredCounter.Add(1.0)
+				continue
+			}
+			obs.workers.Acquire()
+			obs.currentWorkersGauge.Add(1.0)
 
-			obs.droppedExpiredCounter.Add(1.0)
-			continue
+			go obs.send(urls, secret, accept, msg)
 		}
-		obs.workers.Acquire()
-		obs.currentWorkersGauge.Add(1.0)
-
-		go obs.send(urls, secret, accept, msg)
 	}
-
-	// Grab all the workers to make sure they are done.
 	for i := 0; i < obs.maxWorkers; i++ {
 		obs.workers.Acquire()
 	}
@@ -621,6 +622,7 @@ func (obs *CaduceusOutboundSender) queueOverflow() {
 		obs.mutex.Unlock()
 		return
 	}
+	obs.queueEmpty = false
 	obs.dropUntil = time.Now().Add(obs.cutOffPeriod)
 	obs.dropUntilGauge.Set(float64(obs.dropUntil.Unix()))
 	secret := obs.listener.Config.Secret
@@ -633,10 +635,11 @@ func (obs *CaduceusOutboundSender) queueOverflow() {
 		errorLog = logging.Error(obs.logger)
 	)
 
-	obs.queueEmpty = false
 	obs.cutOffCounter.Add(1.0)
 	debugLog.Log(logging.MessageKey(), "Queue overflowed", "url", obs.id)
 
+	obs.Empty()
+
 	msg, err := json.Marshal(failureMsg)
 	if nil != err {
 		errorLog.Log(logging.MessageKey(), "Cut-off notification json.Marshall failed", "failureMessage", obs.failureMsg,
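
Reviewer note (not part of the patch): the core change above replaces the `queue chan *wrp.Message` field with a channel held in a `sync/atomic.Value`, so a cutoff can empty the queue by atomically storing a fresh channel rather than draining messages one at a time. Below is a minimal, self-contained sketch of that pattern under stated assumptions; the `sender`, `item`, `enqueue`, and `empty` names are illustrative only and are not Caduceus APIs.

```go
// Sketch of the queue-swap pattern this diff applies: the queue channel lives
// in an atomic.Value, so "emptying" it is a single Store of a fresh channel.
// All names here are hypothetical and exist only for illustration.
package main

import (
	"fmt"
	"sync/atomic"
)

type item struct {
	payload string
}

type sender struct {
	queueSize int
	queue     atomic.Value // always holds a chan *item
}

func newSender(size int) *sender {
	s := &sender{queueSize: size}
	s.queue.Store(make(chan *item, size))
	return s
}

// enqueue loads the current channel and attempts a non-blocking send,
// mirroring the select/default shape used in Queue().
func (s *sender) enqueue(i *item) bool {
	select {
	case s.queue.Load().(chan *item) <- i:
		return true
	default:
		return false // queue full: caller would take the cutoff path
	}
}

// empty swaps in a fresh channel; whatever was queued in the old channel is
// abandoned and reclaimed by the garbage collector once nothing reads it.
func (s *sender) empty() {
	s.queue.Store(make(chan *item, s.queueSize))
}

func main() {
	s := newSender(2)
	s.enqueue(&item{"a"})
	s.enqueue(&item{"b"})
	fmt.Println("queued:", len(s.queue.Load().(chan *item))) // 2
	s.empty()
	fmt.Println("after empty:", len(s.queue.Load().(chan *item))) // 0
}
```

The trade-off is that messages left in the replaced channel are simply dropped rather than counted out individually, which is what lets the new `Empty()` return in constant time instead of looping over the queue contents.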