Skip to content

Commit

Permalink
Caduceus queue (#190)
Browse files Browse the repository at this point in the history
* updated changlog

* updated changelog

* changes to queue

* queue full check

* code changes

* fixes
  • Loading branch information
gargidb authored Apr 1, 2020
1 parent 9803961 commit 472b2b8
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 29 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
- fix emptying queue when received cutoff [#188](https://github.com/xmidt-org/caduceus/issues/188)
- add queue full check to prevent push event into queue if already full [#189](https://github.com/xmidt-org/caduceus/issues/189)

## [v0.2.4]
- added docker automation [#184](https://github.com/xmidt-org/caduceus/pull/184)
Expand Down
73 changes: 44 additions & 29 deletions outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type CaduceusOutboundSender struct {
failureMsg FailureMessage
logger log.Logger
mutex sync.RWMutex
queueEmpty bool
}

// New creates a new OutboundSender object from the factory, or returns an error.
Expand Down Expand Up @@ -194,6 +195,7 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) {
QueueSize: osf.QueueSize,
Workers: osf.NumWorkers,
},
queueEmpty: true,
}

// Don't share the secret with others when there is an error.
Expand Down Expand Up @@ -392,46 +394,44 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) {
}
}
/*
// if the device id matches then we want to look through all the metadata
// and make sure that the obs metadata matches the metadata provided
if matchDevice {
for key, val := range metaData {
if matchers, ok := matcher[key]; ok {
for _, deviceRegex := range matchers {
matchDevice = false
if deviceRegex.MatchString(val) {
matchDevice = true
break
}
}
// metadata was provided but did not match our expectations,
// so it is time to drop the message
if !matchDevice {
break
}
}
}
}
// if the device id matches then we want to look through all the metadata
// and make sure that the obs metadata matches the metadata provided
if matchDevice {
for key, val := range metaData {
if matchers, ok := matcher[key]; ok {
for _, deviceRegex := range matchers {
matchDevice = false
if deviceRegex.MatchString(val) {
matchDevice = true
break
}
}
// metadata was provided but did not match our expectations,
// so it is time to drop the message
if !matchDevice {
break
}
}
}
}
*/
if matchDevice {
if len(obs.queue) < obs.queueSize {
select {
case obs.queue <- msg:
obs.queueDepthGauge.Add(1.0)
obs.queue <- msg
debugLog.Log(logging.MessageKey(), "WRP Sent to obs queue", "url", obs.id)
// a regex was matched, no need to check further matches
break
default:
obs.queueOverflow()
obs.droppedQueueFullCounter.Add(1.0)
}
obs.queueOverflow()
obs.droppedQueueFullCounter.Add(1.0)
}
}
}

func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool {
var debugLog = logging.Debug(obs.logger)

if false == now.After(dropUntil) {
if false == now.After(dropUntil) || !obs.queueEmpty {
debugLog.Log(logging.MessageKey(), "Client has been cut off",
"now", now, "before", deliverUntil, "after", dropUntil)
obs.droppedCutoffCounter.Add(1.0)
Expand All @@ -448,13 +448,27 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti
return true
}

func (obs *CaduceusOutboundSender) Empty() {
for !obs.queueEmpty {
select {
case <-obs.queue:
obs.queueDepthGauge.Add(-1.0)
default:
obs.queueEmpty = true
}
}
return
}

func (obs *CaduceusOutboundSender) dispatcher() {
defer obs.wg.Done()

for msg := range obs.queue {
obs.queueDepthGauge.Add(-1.0)

obs.mutex.RLock()
if !obs.queueEmpty {
obs.Empty()
}
urls := obs.urls
// Move to the next URL to try 1st the next time.
obs.urls = obs.urls.Next()
Expand Down Expand Up @@ -619,6 +633,7 @@ func (obs *CaduceusOutboundSender) queueOverflow() {
errorLog = logging.Error(obs.logger)
)

obs.queueEmpty = false
obs.cutOffCounter.Add(1.0)
debugLog.Log(logging.MessageKey(), "Queue overflowed", "url", obs.id)

Expand Down

0 comments on commit 472b2b8

Please sign in to comment.