Skip to content

Commit

Permalink
Caduceus queue (#193)
Browse files Browse the repository at this point in the history
* updated changlog

* updated changelog

* changes to queue

* queue full check

* code changes

* fixes

* atomic changes

* atomic queue changes

* cleanup

* cleanup

* metric change

* getting updated queue

* cleanup

* emptying queue when no longer can insert in queueOverflow func

* cleanup

* updated changelog

* cleanup

* new version
  • Loading branch information
gargidb authored Apr 16, 2020
1 parent f818b52 commit 74f91d7
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 49 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [v0.2.6]
- reduced time from when cutoff is sent to when queue is emptied

## [v0.2.5]
- fix emptying queue when received cutoff [#188](https://github.com/xmidt-org/caduceus/issues/188)
- add queue full check to prevent push event into queue if already full [#189](https://github.com/xmidt-org/caduceus/issues/189)
Expand Down Expand Up @@ -79,7 +82,8 @@ fixed build upload
### Added
- Initial creation

[Unreleased]: https://github.com/Comcast/caduceus/compare/v0.2.5...HEAD
[Unreleased]: https://github.com/Comcast/caduceus/compare/v0.2.6...HEAD
[v0.2.6]: https://github.com/Comcast/caduceus/compare/v0.2.5...v0.2.6
[v0.2.5]: https://github.com/Comcast/caduceus/compare/v0.2.4...v0.2.5
[v0.2.4]: https://github.com/Comcast/caduceus/compare/v0.2.3...v0.2.4
[v0.2.3]: https://github.com/Comcast/caduceus/compare/v0.2.2...v0.2.3
Expand Down
99 changes: 51 additions & 48 deletions outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -124,7 +125,6 @@ type CaduceusOutboundSender struct {
events []*regexp.Regexp
matcher []*regexp.Regexp
queueSize int
queue chan *wrp.Message
deliveryRetries int
deliveryInterval time.Duration
deliveryCounter metrics.Counter
Expand Down Expand Up @@ -154,6 +154,7 @@ type CaduceusOutboundSender struct {
logger log.Logger
mutex sync.RWMutex
queueEmpty bool
queue atomic.Value
}

// New creates a new OutboundSender object from the factory, or returns an error.
Expand Down Expand Up @@ -203,9 +204,7 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) {

CreateOutbounderMetrics(osf.MetricsRegistry, caduceusOutboundSender)

// Give us some head room so that we don't block when we get near the
// completely full point.
caduceusOutboundSender.queue = make(chan *wrp.Message, osf.QueueSize)
caduceusOutboundSender.queue.Store(make(chan *wrp.Message, osf.QueueSize))

if err = caduceusOutboundSender.Update(osf.Listener); nil != err {
return
Expand Down Expand Up @@ -331,8 +330,7 @@ func (obs *CaduceusOutboundSender) Update(wh webhook.W) (err error) {
// abruptly based on the gentle parameter. If gentle is false, all queued
// messages will be dropped without an attempt to send made.
func (obs *CaduceusOutboundSender) Shutdown(gentle bool) {
close(obs.queue)

close(obs.queue.Load().(chan *wrp.Message))
obs.mutex.Lock()
if false == gentle {
obs.deliverUntil = time.Time{}
Expand All @@ -353,7 +351,6 @@ func (obs *CaduceusOutboundSender) RetiredSince() time.Time {
obs.mutex.RLock()
deliverUntil := obs.deliverUntil
obs.mutex.RUnlock()

return deliverUntil
}

Expand Down Expand Up @@ -417,7 +414,7 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) {
*/
if matchDevice {
select {
case obs.queue <- msg:
case obs.queue.Load().(chan *wrp.Message) <- msg:
obs.queueDepthGauge.Add(1.0)
debugLog.Log(logging.MessageKey(), "WRP Sent to obs queue", "url", obs.id)
default:
Expand Down Expand Up @@ -449,54 +446,58 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti
}

func (obs *CaduceusOutboundSender) Empty() {
for !obs.queueEmpty {
select {
case <-obs.queue:
obs.queueDepthGauge.Add(-1.0)
default:
obs.queueEmpty = true
}
}

obs.queue.Store(make(chan *wrp.Message, obs.queueSize))
obs.queueDepthGauge.Set(0.0)
obs.queueEmpty = true

return
}

func (obs *CaduceusOutboundSender) dispatcher() {
defer obs.wg.Done()
var (
msg *wrp.Message
urls *ring.Ring
secret, accept string
ok bool
)

for msg := range obs.queue {
obs.queueDepthGauge.Add(-1.0)
obs.mutex.RLock()
if !obs.queueEmpty {
obs.Empty()
}
urls := obs.urls
// Move to the next URL to try 1st the next time.
obs.urls = obs.urls.Next()

deliverUntil := obs.deliverUntil
dropUntil := obs.dropUntil
secret := obs.listener.Config.Secret
accept := obs.listener.Config.ContentType
obs.mutex.RUnlock()

now := time.Now()

if now.Before(dropUntil) {
obs.droppedCutoffCounter.Add(1.0)
continue
}
if now.After(deliverUntil) {
Loop:
for {
msgQueue := obs.queue.Load().(chan *wrp.Message)
select {
case msg, ok = <-msgQueue:
if !ok {
break Loop
}
obs.queueDepthGauge.Add(-1.0)
obs.mutex.RLock()
urls = obs.urls
// Move to the next URL to try 1st the next time.
obs.urls = obs.urls.Next()
deliverUntil := obs.deliverUntil
dropUntil := obs.dropUntil
secret = obs.listener.Config.Secret
accept = obs.listener.Config.ContentType
obs.mutex.RUnlock()

now := time.Now()

if now.Before(dropUntil) {
obs.droppedCutoffCounter.Add(1.0)
break Loop
}
if now.After(deliverUntil) {
obs.droppedExpiredCounter.Add(1.0)
continue
}
obs.workers.Acquire()
obs.currentWorkersGauge.Add(1.0)

obs.droppedExpiredCounter.Add(1.0)
continue
go obs.send(urls, secret, accept, msg)
}
obs.workers.Acquire()
obs.currentWorkersGauge.Add(1.0)

go obs.send(urls, secret, accept, msg)
}

// Grab all the workers to make sure they are done.
for i := 0; i < obs.maxWorkers; i++ {
obs.workers.Acquire()
}
Expand Down Expand Up @@ -621,6 +622,7 @@ func (obs *CaduceusOutboundSender) queueOverflow() {
obs.mutex.Unlock()
return
}
obs.queueEmpty = false
obs.dropUntil = time.Now().Add(obs.cutOffPeriod)
obs.dropUntilGauge.Set(float64(obs.dropUntil.Unix()))
secret := obs.listener.Config.Secret
Expand All @@ -633,10 +635,11 @@ func (obs *CaduceusOutboundSender) queueOverflow() {
errorLog = logging.Error(obs.logger)
)

obs.queueEmpty = false
obs.cutOffCounter.Add(1.0)
debugLog.Log(logging.MessageKey(), "Queue overflowed", "url", obs.id)

obs.Empty()

msg, err := json.Marshal(failureMsg)
if nil != err {
errorLog.Log(logging.MessageKey(), "Cut-off notification json.Marshall failed", "failureMessage", obs.failureMsg,
Expand Down

0 comments on commit 74f91d7

Please sign in to comment.