Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: simplify Sender #573

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 41 additions & 49 deletions internal/sink/sinkSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,7 @@ type FailureMessage struct {
Workers int `json:"worker_count"`
}

type Sender interface {
Update(ancla.Register) error
Shutdown(bool)
RetiredSince() (time.Time, error)
Queue(*wrp.Message)
}
type sender struct {
type Sender struct {
id string
queueSize int
deliveryRetries int
Expand Down Expand Up @@ -94,11 +88,12 @@ type SinkMetrics struct {
currentWorkersGauge prometheus.Gauge
}

func NewSender(w *wrapper, l ancla.Register) (s *sender, err error) {
func NewSender(w *wrapper, l ancla.Register) (s *Sender, err error) {

if w.clientMiddleware == nil {
w.clientMiddleware = client.NopClient
}

if w.client == nil {
err = errors.New("nil Client")
return
Expand All @@ -115,13 +110,13 @@ func NewSender(w *wrapper, l ancla.Register) (s *sender, err error) {
}
id := l.GetId()

s = &sender{
s = &Sender{
id: id,
listener: l,
queueSize: w.config.QueueSizePerSender,
deliverUntil: l.GetUntil(),
logger: w.logger,
config: w.config, //TODO: need to figure out which config options are used for just sender, just sink, and both
config: w.config, //TODO: need to figure out which config options are used for just Sender, just sink, and both
// dropUntil: where is this being set in old caduceus?,
cutOffPeriod: w.config.CutOffPeriod,
deliveryRetries: w.config.DeliveryRetries,
Expand Down Expand Up @@ -158,7 +153,7 @@ func NewSender(w *wrapper, l ancla.Register) (s *sender, err error) {
return
}

func (s *sender) Update(l ancla.Register) (err error) {
func (s *Sender) Update(l ancla.Register) (err error) {
s.matcher, err = NewMatcher(l, s.logger)
s.sink = NewSink(s.config, s.logger, l)

Expand All @@ -185,7 +180,7 @@ func (s *sender) Update(l ancla.Register) (err error) {
// of messages to deliver. The request is checked to see if it matches the
// criteria before being accepted or silently dropped.
// TODO: can pass in message along with webhook information
func (s *sender) Queue(msg *wrp.Message) {
func (s *Sender) Queue(msg *wrp.Message) {
s.mutex.RLock()
deliverUntil := s.deliverUntil
dropUntil := s.dropUntil
Expand Down Expand Up @@ -231,7 +226,7 @@ func (s *sender) Queue(msg *wrp.Message) {
// Shutdown causes the CaduceusOutboundSender to stop its activities either gently or
// abruptly based on the gentle parameter. If gentle is false, all queued
// messages will be dropped without an attempt to send made.
func (s *sender) Shutdown(gentle bool) {
func (s *Sender) Shutdown(gentle bool) {
if !gentle {
// need to close the channel we're going to replace, in case it doesn't
// have any events in it.
Expand All @@ -250,7 +245,7 @@ func (s *sender) Shutdown(gentle bool) {

// RetiredSince returns the time the CaduceusOutboundSender retired (which could be in
// the future).
func (s *sender) RetiredSince() (time.Time, error) {
func (s *Sender) RetiredSince() (time.Time, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()
deliverUntil := s.deliverUntil
Expand All @@ -272,7 +267,7 @@ func overlaps(sl1 []string, sl2 []string) bool {
return false
}

func (s *sender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool {
func (s *Sender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool {
if !now.After(dropUntil) {
// client was cut off
s.droppedCutoffCounter.Add(1.0)
Expand All @@ -292,7 +287,7 @@ func (s *sender) isValidTimeWindow(now, dropUntil, deliverUntil time.Time) bool
// a fresh one, counting any current messages in the queue as dropped.
// It should never close a queue, as a queue not referenced anywhere will be
// cleaned up by the garbage collector without needing to be closed.
func (s *sender) Empty(droppedCounter prometheus.Counter) {
func (s *Sender) Empty(droppedCounter prometheus.Counter) {
droppedMsgs := s.queue.Load().(chan *wrp.Message)
s.queue.Store(make(chan *wrp.Message, s.queueSize))
droppedCounter.Add(float64(len(droppedMsgs)))
Expand All @@ -302,7 +297,7 @@ func (s *sender) Empty(droppedCounter prometheus.Counter) {
// queueOverflow handles the logic of what to do when a queue overflows:
// cutting off the webhook for a time and sending a cut off notification
// to the failure URL.
func (s *sender) queueOverflow() {
func (s *Sender) queueOverflow() {
s.mutex.Lock()
if time.Now().Before(s.dropUntil) {
s.mutex.Unlock()
Expand Down Expand Up @@ -377,7 +372,7 @@ func (s *sender) queueOverflow() {
}
}

func (s *sender) dispatcher() {
func (s *Sender) dispatcher() {
defer s.wg.Done()
var (
msg *wrp.Message
Expand All @@ -390,8 +385,6 @@ Loop:
// Always pull a new queue in case we have been cutoff or are shutting
// down.
msgQueue := s.queue.Load().(chan *wrp.Message)
// nolint:gosimple
select {
// The dispatcher cannot get stuck blocking here forever (caused by an
// empty queue that is replaced and then Queue() starts adding to the
// new queue) because:
Expand All @@ -410,42 +403,41 @@ Loop:
// - If the first queue has messages, we drop a message as expired
// pull in the new queue which is empty and closed, break the
// loop, gather workers, and exit.
case msg, ok = <-msgQueue:
// This is only true when a queue is empty and closed, which for us
// only happens on Shutdown().
if !ok {
break Loop
}
s.queueDepthGauge.Add(-1.0)
s.mutex.RLock()
deliverUntil := s.deliverUntil
dropUntil := s.dropUntil
// secret = s.listener.Webhook.Config.Secret
// accept = s.listener.Webhook.Config.ContentType
s.mutex.RUnlock()

now := time.Now()

if now.Before(dropUntil) {
s.droppedCutoffCounter.Add(1.0)
continue
}
if now.After(deliverUntil) {
s.Empty(s.droppedExpiredCounter)
continue
}
s.workers.Acquire()
s.currentWorkersGauge.Add(1.0)

go s.sink.Send(secret, accept, msg)
msg, ok = <-msgQueue
// This is only true when a queue is empty and closed, which for us
// only happens on Shutdown().
if !ok {
break Loop
}
s.queueDepthGauge.Add(-1.0)
s.mutex.RLock()
deliverUntil := s.deliverUntil
dropUntil := s.dropUntil
// secret = s.listener.Webhook.Config.Secret
// accept = s.listener.Webhook.Config.ContentType
s.mutex.RUnlock()

now := time.Now()

if now.Before(dropUntil) {
s.droppedCutoffCounter.Add(1.0)
continue
}
if now.After(deliverUntil) {
s.Empty(s.droppedExpiredCounter)
continue
}
s.workers.Acquire()
s.currentWorkersGauge.Add(1.0)

go s.sink.Send(secret, accept, msg)
}
for i := 0; i < s.maxWorkers; i++ {
s.workers.Acquire()
}
}

func (s *sender) CreateMetrics(m metrics.Metrics) {
func (s *Sender) CreateMetrics(m metrics.Metrics) {
s.deliveryRetryCounter = m.DeliveryRetryCounter
s.deliveryRetryMaxGauge = m.DeliveryRetryMaxGauge.With(prometheus.Labels{metrics.UrlLabel: s.id})
s.cutOffCounter = m.CutOffCounter.With(prometheus.Labels{metrics.UrlLabel: s.id})
Expand Down
10 changes: 5 additions & 5 deletions internal/sink/sinkWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type wrapper struct {
config Config

mutex sync.RWMutex
senders map[string]Sender
senders map[string]*Sender
eventType *prometheus.CounterVec
wg sync.WaitGroup
shutdown chan struct{}
Expand Down Expand Up @@ -111,7 +111,7 @@ func NewWrapper(in WrapperIn) (wr Wrapper, err error) {
w = nil
return
}
w.senders = make(map[string]Sender)
w.senders = make(map[string]*Sender)
w.shutdown = make(chan struct{})

w.wg.Add(1)
Expand Down Expand Up @@ -158,7 +158,7 @@ func (w *wrapper) Update(list []ancla.Register) {
for _, inValue := range ids {
sender, ok := w.senders[inValue.ID]
if !ok {
var ss Sender
var ss *Sender
var err error

listener := inValue.Listener
Expand Down Expand Up @@ -238,12 +238,12 @@ func undertaker(w *wrapper) {
}
}

func createDeadlist(w *wrapper, threshold time.Time) (map[string]Sender, error) {
func createDeadlist(w *wrapper, threshold time.Time) (map[string]*Sender, error) {
if w == nil || threshold.IsZero() {
return nil, nil
}

deadList := make(map[string]Sender)
deadList := make(map[string]*Sender)
w.mutex.Lock()
defer w.mutex.Unlock()
for k, v := range w.senders {
Expand Down
Loading