Skip to content

Commit

Permalink
Introduce LogsFanout abstraction
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <[email protected]>
  • Loading branch information
tpaschalis committed Feb 6, 2023
1 parent 7927df7 commit b99e09f
Showing 1 changed file with 52 additions and 0 deletions.
52 changes: 52 additions & 0 deletions component/common/loki/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,55 @@ func AddLabelsMiddleware(additionalLabels model.LabelSet) EntryMiddleware {
})
})
}

// LogsFanout is an abstraction for fanning out log entries to multiple
// receivers.
type LogsFanout struct {
mut sync.RWMutex
receivers []LogsReceiver
}

// NewFanout creates new LogsFanout.
func NewFanout(r []LogsReceiver) *LogsFanout {
return &LogsFanout{
receivers: r,
}
}

// UpdateReceivers updates where LogsFanout should send entries to.
func (f *LogsFanout) UpdateReceivers(r []LogsReceiver) {
f.mut.Lock()
f.receivers = r
f.mut.Unlock()
}

// Send fans out entries to each receiver, blocking until the entry is through.
func (f *LogsFanout) Send(ctx context.Context, e Entry) {
f.mut.RLock()
receivers := f.receivers
f.mut.RUnlock()
for _, r := range receivers {
select {
case r <- e:
case <-ctx.Done():
return
}
}
}

// SendWithTimeout fans out entries to each receiver, waiting up to d time for
// blocked receivers.
func (f *LogsFanout) SendWithTimeout(ctx context.Context, e Entry, d time.Duration) {
f.mut.RLock()
receivers := f.receivers
f.mut.RUnlock()
for _, r := range receivers {
select {
case r <- e:
case <-ctx.Done():
return
case <-time.After(d):
return
}
}
}

0 comments on commit b99e09f

Please sign in to comment.