Skip to content

Commit

Permalink
Merge pull request #363 from SOF3/disable-unbounded-queue
Browse files Browse the repository at this point in the history
fix(audit/webhook): set a limit to the webhook queue length
  • Loading branch information
SOF3 authored Sep 30, 2024
2 parents 890b117 + e4d75a1 commit df250b2
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 7 deletions.
26 changes: 23 additions & 3 deletions pkg/audit/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,18 @@ func init() {

type options struct {
enable bool

subscriberQueueCapacity int
}

func (options *options) Setup(fs *pflag.FlagSet) {
fs.BoolVar(&options.enable, "audit-webhook-enable", false, "enable audit webhook")
fs.BoolVar(
&options.enable,
"audit-webhook-subscriber-queue-capacity",
false,
"maximum number of events buffered in the webhook, set to 0 for an unbounded buffer (may cause OOM)",
)
}

func (options *options) EnableFlag() *bool { return &options.enable }
Expand Down Expand Up @@ -76,7 +84,7 @@ type webhook struct {

type namedQueue[T any] struct {
name string
queue *channel.UnboundedQueue[T]
queue channel.Queue[T]
}

type requestMetric struct {
Expand Down Expand Up @@ -184,15 +192,27 @@ func (webhook *webhook) Close(ctx context.Context) error {
}

func (webhook *webhook) AddSubscriber(name string) <-chan *audit.Message {
queue := channel.NewUnboundedQueue[*audit.Message](1)
var queue channel.Queue[*audit.Message]
if capacity := webhook.options.subscriberQueueCapacity; capacity != 0 {
queue = make(channel.BoundedQueue[*audit.Message], capacity)
} else {
queue = channel.NewUnboundedQueue[*audit.Message](1)
}

channel.InitMetricLoop(queue, webhook.Metrics, &queueMetricTags{Name: name})

webhook.subscribers = append(webhook.subscribers, namedQueue[*audit.Message]{name: name, queue: queue})
return queue.Receiver()
}

func (webhook *webhook) AddRawSubscriber(name string) <-chan *audit.RawMessage {
queue := channel.NewUnboundedQueue[*audit.RawMessage](1)
var queue channel.Queue[*audit.RawMessage]
if capacity := webhook.options.subscriberQueueCapacity; capacity != 0 {
queue = make(channel.BoundedQueue[*audit.RawMessage], capacity)
} else {
queue = channel.NewUnboundedQueue[*audit.RawMessage](1)
}

channel.InitMetricLoop(queue, webhook.Metrics, &queueMetricTags{Name: name})

webhook.rawSubscribers = append(webhook.rawSubscribers, namedQueue[*audit.RawMessage]{name: name, queue: queue})
Expand Down
56 changes: 52 additions & 4 deletions pkg/util/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,52 @@ import (
"github.com/kubewharf/kelemetry/pkg/util/shutdown"
)

type Queue[T any] interface {
Send(obj T)
Receiver() <-chan T

Length() int
GetAndResetLength() int

Close()
}

type BoundedQueue[T any] chan T

func (ch BoundedQueue[T]) Length() int {
return len(ch)
}

func (ch BoundedQueue[T]) GetAndResetLength() int {
return len(ch) // for simplicity we just return the channel length
}

func (ch BoundedQueue[T]) Receiver() <-chan T {
return ch
}

func (ch BoundedQueue[T]) Send(obj T) {
ch <- obj
}

func (ch BoundedQueue[T]) Close() {
close(ch)
}

func _[T any](uq BoundedQueue[T]) Queue[T] {
return uq
}

// UnboundedQueue is an unbounded channel.
type UnboundedQueue[T any] struct {
deque *Deque[T]
notifier chan<- struct{}
receiver <-chan T
Close context.CancelFunc
closeFn context.CancelFunc
}

func _[T any](uq *UnboundedQueue[T]) Queue[T] {
return uq
}

// Creates a new UnboundedQueue with the specified initial capacity.
Expand All @@ -48,7 +88,7 @@ func NewUnboundedQueue[T any](initialCapacity int) *UnboundedQueue[T] {
deque: deque,
notifier: notifier,
receiver: receiver,
Close: cancelFunc,
closeFn: cancelFunc,
}
}

Expand All @@ -61,8 +101,16 @@ func (uq *UnboundedQueue[T]) Length() int {
return uq.deque.Len()
}

func InitMetricLoop[T any, TagsT metrics.Tags](uq *UnboundedQueue[T], metricsClient metrics.Client, tags TagsT) {
metrics.NewMonitor(metricsClient, tags, func() float64 { return float64(uq.deque.GetAndResetLength()) })
func (uq *UnboundedQueue[T]) Close() {
uq.closeFn()
}

func (uq *UnboundedQueue[T]) GetAndResetLength() int {
return uq.deque.GetAndResetLength()
}

func InitMetricLoop[T any, TagsT metrics.Tags](uq Queue[T], metricsClient metrics.Client, tags TagsT) {
metrics.NewMonitor(metricsClient, tags, func() float64 { return float64(uq.Length()) })
}

// Sends an item to the queue.
Expand Down

0 comments on commit df250b2

Please sign in to comment.