diff --git a/pkg/audit/webhook/webhook.go b/pkg/audit/webhook/webhook.go index 56ef23ee..7f68e942 100644 --- a/pkg/audit/webhook/webhook.go +++ b/pkg/audit/webhook/webhook.go @@ -40,10 +40,18 @@ func init() { type options struct { enable bool + + subscriberQueueCapacity int } func (options *options) Setup(fs *pflag.FlagSet) { fs.BoolVar(&options.enable, "audit-webhook-enable", false, "enable audit webhook") + fs.BoolVar( + &options.enable, + "audit-webhook-subscriber-queue-capacity", + false, + "maximum number of events buffered in the webhook, set to 0 for an unbounded buffer (may cause OOM)", + ) } func (options *options) EnableFlag() *bool { return &options.enable } @@ -76,7 +84,7 @@ type webhook struct { type namedQueue[T any] struct { name string - queue *channel.UnboundedQueue[T] + queue channel.Queue[T] } type requestMetric struct { @@ -184,7 +192,13 @@ func (webhook *webhook) Close(ctx context.Context) error { } func (webhook *webhook) AddSubscriber(name string) <-chan *audit.Message { - queue := channel.NewUnboundedQueue[*audit.Message](1) + var queue channel.Queue[*audit.Message] + if capacity := webhook.options.subscriberQueueCapacity; capacity != 0 { + queue = make(channel.BoundedQueue[*audit.Message], capacity) + } else { + queue = channel.NewUnboundedQueue[*audit.Message](1) + } + channel.InitMetricLoop(queue, webhook.Metrics, &queueMetricTags{Name: name}) webhook.subscribers = append(webhook.subscribers, namedQueue[*audit.Message]{name: name, queue: queue}) @@ -192,7 +206,13 @@ func (webhook *webhook) AddSubscriber(name string) <-chan *audit.Message { } func (webhook *webhook) AddRawSubscriber(name string) <-chan *audit.RawMessage { - queue := channel.NewUnboundedQueue[*audit.RawMessage](1) + var queue channel.Queue[*audit.RawMessage] + if capacity := webhook.options.subscriberQueueCapacity; capacity != 0 { + queue = make(channel.BoundedQueue[*audit.RawMessage], capacity) + } else { + queue = channel.NewUnboundedQueue[*audit.RawMessage](1) + } + channel.InitMetricLoop(queue, webhook.Metrics, &queueMetricTags{Name: name}) webhook.rawSubscribers = append(webhook.rawSubscribers, namedQueue[*audit.RawMessage]{name: name, queue: queue}) diff --git a/pkg/util/channel/channel.go b/pkg/util/channel/channel.go index 8796bccd..dc608c07 100644 --- a/pkg/util/channel/channel.go +++ b/pkg/util/channel/channel.go @@ -26,12 +26,52 @@ import ( "github.com/kubewharf/kelemetry/pkg/util/shutdown" ) +type Queue[T any] interface { + Send(obj T) + Receiver() <-chan T + + Length() int + GetAndResetLength() int + + Close() +} + +type BoundedQueue[T any] chan T + +func (ch BoundedQueue[T]) Length() int { + return len(ch) +} + +func (ch BoundedQueue[T]) GetAndResetLength() int { + return len(ch) // for simplicity we just return the channel length +} + +func (ch BoundedQueue[T]) Receiver() <-chan T { + return ch +} + +func (ch BoundedQueue[T]) Send(obj T) { + ch <- obj +} + +func (ch BoundedQueue[T]) Close() { + close(ch) +} + +func _[T any](uq BoundedQueue[T]) Queue[T] { + return uq +} + // UnboundedQueue is an unbounded channel. type UnboundedQueue[T any] struct { deque *Deque[T] notifier chan<- struct{} receiver <-chan T - Close context.CancelFunc + closeFn context.CancelFunc +} + +func _[T any](uq *UnboundedQueue[T]) Queue[T] { + return uq } // Creates a new UnboundedQueue with the specified initial capacity. @@ -48,7 +88,7 @@ func NewUnboundedQueue[T any](initialCapacity int) *UnboundedQueue[T] { deque: deque, notifier: notifier, receiver: receiver, - Close: cancelFunc, + closeFn: cancelFunc, } } @@ -61,8 +101,16 @@ func (uq *UnboundedQueue[T]) Length() int { return uq.deque.Len() } -func InitMetricLoop[T any, TagsT metrics.Tags](uq *UnboundedQueue[T], metricsClient metrics.Client, tags TagsT) { - metrics.NewMonitor(metricsClient, tags, func() float64 { return float64(uq.deque.GetAndResetLength()) }) +func (uq *UnboundedQueue[T]) Close() { + uq.closeFn() +} + +func (uq *UnboundedQueue[T]) GetAndResetLength() int { + return uq.deque.GetAndResetLength() +} + +func InitMetricLoop[T any, TagsT metrics.Tags](uq Queue[T], metricsClient metrics.Client, tags TagsT) { + metrics.NewMonitor(metricsClient, tags, func() float64 { return float64(uq.Length()) }) } // Sends an item to the queue.