From 472b2b8eaa86e295aa321cb81ce26e05e712dadf Mon Sep 17 00:00:00 2001 From: gargidb <46933813+gargidb@users.noreply.github.com> Date: Wed, 1 Apr 2020 14:15:09 -0700 Subject: [PATCH] Caduceus queue (#190) * updated changlog * updated changelog * changes to queue * queue full check * code changes * fixes --- CHANGELOG.md | 2 ++ outboundSender.go | 73 ++++++++++++++++++++++++++++------------------- 2 files changed, 46 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c428bef..b1241f3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [Unreleased] +- fix emptying queue when received cutoff [#188](https://github.com/xmidt-org/caduceus/issues/188) +- add queue full check to prevent push event into queue if already full [#189](https://github.com/xmidt-org/caduceus/issues/189) ## [v0.2.4] - added docker automation [#184](https://github.com/xmidt-org/caduceus/pull/184) diff --git a/outboundSender.go b/outboundSender.go index cc2e01de..95a7fc21 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -153,6 +153,7 @@ type CaduceusOutboundSender struct { failureMsg FailureMessage logger log.Logger mutex sync.RWMutex + queueEmpty bool } // New creates a new OutboundSender object from the factory, or returns an error. @@ -194,6 +195,7 @@ func (osf OutboundSenderFactory) New() (obs OutboundSender, err error) { QueueSize: osf.QueueSize, Workers: osf.NumWorkers, }, + queueEmpty: true, } // Don't share the secret with others when there is an error. @@ -392,38 +394,36 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) { } } /* - // if the device id matches then we want to look through all the metadata - // and make sure that the obs metadata matches the metadata provided - if matchDevice { - for key, val := range metaData { - if matchers, ok := matcher[key]; ok { - for _, deviceRegex := range matchers { - matchDevice = false - if deviceRegex.MatchString(val) { - matchDevice = true - break - } - } - - // metadata was provided but did not match our expectations, - // so it is time to drop the message - if !matchDevice { - break - } - } - } - } + // if the device id matches then we want to look through all the metadata + // and make sure that the obs metadata matches the metadata provided + if matchDevice { + for key, val := range metaData { + if matchers, ok := matcher[key]; ok { + for _, deviceRegex := range matchers { + matchDevice = false + if deviceRegex.MatchString(val) { + matchDevice = true + break + } + } + // metadata was provided but did not match our expectations, + // so it is time to drop the message + if !matchDevice { + break + } + } + } + } */ if matchDevice { - if len(obs.queue) < obs.queueSize { + select { + case obs.queue <- msg: obs.queueDepthGauge.Add(1.0) - obs.queue <- msg debugLog.Log(logging.MessageKey(), "WRP Sent to obs queue", "url", obs.id) - // a regex was matched, no need to check further matches - break + default: + obs.queueOverflow() + obs.droppedQueueFullCounter.Add(1.0) } - obs.queueOverflow() - obs.droppedQueueFullCounter.Add(1.0) } } } @@ -431,7 +431,7 @@ func (obs *CaduceusOutboundSender) Queue(msg *wrp.Message) { func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool { var debugLog = logging.Debug(obs.logger) - if false == now.After(dropUntil) { + if false == now.After(dropUntil) || !obs.queueEmpty { debugLog.Log(logging.MessageKey(), "Client has been cut off", "now", now, "before", deliverUntil, "after", dropUntil) obs.droppedCutoffCounter.Add(1.0) @@ -448,13 +448,27 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti return true } +func (obs *CaduceusOutboundSender) Empty() { + for !obs.queueEmpty { + select { + case <-obs.queue: + obs.queueDepthGauge.Add(-1.0) + default: + obs.queueEmpty = true + } + } + return +} + func (obs *CaduceusOutboundSender) dispatcher() { defer obs.wg.Done() for msg := range obs.queue { obs.queueDepthGauge.Add(-1.0) - obs.mutex.RLock() + if !obs.queueEmpty { + obs.Empty() + } urls := obs.urls // Move to the next URL to try 1st the next time. obs.urls = obs.urls.Next() @@ -619,6 +633,7 @@ func (obs *CaduceusOutboundSender) queueOverflow() { errorLog = logging.Error(obs.logger) ) + obs.queueEmpty = false obs.cutOffCounter.Add(1.0) debugLog.Log(logging.MessageKey(), "Queue overflowed", "url", obs.id)