From 4b1d57f725ae5c059982a4282b05e46cb5fe8af0 Mon Sep 17 00:00:00 2001 From: chankyin Date: Wed, 25 Sep 2024 16:50:49 +0800 Subject: [PATCH 1/2] fix(audit/webhook): set a limit to the webhook queue length --- pkg/audit/webhook/webhook.go | 21 ++++++++++++-- pkg/util/channel/channel.go | 56 +++++++++++++++++++++++++++++++++--- 2 files changed, 70 insertions(+), 7 deletions(-) diff --git a/pkg/audit/webhook/webhook.go b/pkg/audit/webhook/webhook.go index 56ef23ee..99f12952 100644 --- a/pkg/audit/webhook/webhook.go +++ b/pkg/audit/webhook/webhook.go @@ -40,10 +40,13 @@ func init() { type options struct { enable bool + + subscriberQueueCapacity int } func (options *options) Setup(fs *pflag.FlagSet) { fs.BoolVar(&options.enable, "audit-webhook-enable", false, "enable audit webhook") + fs.BoolVar(&options.enable, "audit-webhook-subscriber-queue-capacity", false, "maximum number of events buffered in the webhook, set to 0 for an unbounded buffer (may cause OOM)") } func (options *options) EnableFlag() *bool { return &options.enable } @@ -76,7 +79,7 @@ type webhook struct { type namedQueue[T any] struct { name string - queue *channel.UnboundedQueue[T] + queue channel.Queue[T] } type requestMetric struct { @@ -184,7 +187,13 @@ func (webhook *webhook) Close(ctx context.Context) error { } func (webhook *webhook) AddSubscriber(name string) <-chan *audit.Message { - queue := channel.NewUnboundedQueue[*audit.Message](1) + var queue channel.Queue[*audit.Message] + if capacity := webhook.options.subscriberQueueCapacity; capacity != 0 { + queue = make(channel.BoundedQueue[*audit.Message], capacity) + } else { + queue = channel.NewUnboundedQueue[*audit.Message](1) + } + channel.InitMetricLoop(queue, webhook.Metrics, &queueMetricTags{Name: name}) webhook.subscribers = append(webhook.subscribers, namedQueue[*audit.Message]{name: name, queue: queue}) @@ -192,7 +201,13 @@ func (webhook *webhook) AddSubscriber(name string) <-chan *audit.Message { } func (webhook *webhook) AddRawSubscriber(name string) <-chan *audit.RawMessage { - queue := channel.NewUnboundedQueue[*audit.RawMessage](1) + var queue channel.Queue[*audit.RawMessage] + if capacity := webhook.options.subscriberQueueCapacity; capacity != 0 { + queue = make(channel.BoundedQueue[*audit.RawMessage], capacity) + } else { + queue = channel.NewUnboundedQueue[*audit.RawMessage](1) + } + channel.InitMetricLoop(queue, webhook.Metrics, &queueMetricTags{Name: name}) webhook.rawSubscribers = append(webhook.rawSubscribers, namedQueue[*audit.RawMessage]{name: name, queue: queue}) diff --git a/pkg/util/channel/channel.go b/pkg/util/channel/channel.go index 8796bccd..7a38214e 100644 --- a/pkg/util/channel/channel.go +++ b/pkg/util/channel/channel.go @@ -26,12 +26,52 @@ import ( "github.com/kubewharf/kelemetry/pkg/util/shutdown" ) +type Queue[T any] interface { + Send(obj T) + Receiver() <-chan T + + Length() int + GetAndResetLength() int + + Close() +} + +type BoundedQueue[T any] chan T + +func (ch BoundedQueue[T]) Length() int { + return len(ch) +} + +func (ch BoundedQueue[T]) GetAndResetLength() int { + return len(ch) // for simplicity we just return the channel length +} + +func (ch BoundedQueue[T]) Receiver() <-chan T{ + return ch +} + +func (ch BoundedQueue[T]) Send(obj T) { + ch <- obj +} + +func (ch BoundedQueue[T]) Close() { + close(ch) +} + +func _[T any](uq BoundedQueue[T]) Queue[T] { + return uq +} + // UnboundedQueue is an unbounded channel. type UnboundedQueue[T any] struct { deque *Deque[T] notifier chan<- struct{} receiver <-chan T - Close context.CancelFunc + closeFn context.CancelFunc +} + +func _[T any](uq *UnboundedQueue[T]) Queue[T] { + return uq } // Creates a new UnboundedQueue with the specified initial capacity. @@ -48,7 +88,7 @@ func NewUnboundedQueue[T any](initialCapacity int) *UnboundedQueue[T] { deque: deque, notifier: notifier, receiver: receiver, - Close: cancelFunc, + closeFn: cancelFunc, } } @@ -61,8 +101,16 @@ func (uq *UnboundedQueue[T]) Length() int { return uq.deque.Len() } -func InitMetricLoop[T any, TagsT metrics.Tags](uq *UnboundedQueue[T], metricsClient metrics.Client, tags TagsT) { - metrics.NewMonitor(metricsClient, tags, func() float64 { return float64(uq.deque.GetAndResetLength()) }) +func (uq *UnboundedQueue[T]) Close() { + uq.closeFn() +} + +func (uq *UnboundedQueue[T]) GetAndResetLength() int { + return uq.deque.GetAndResetLength() +} + +func InitMetricLoop[T any, TagsT metrics.Tags](uq Queue[T], metricsClient metrics.Client, tags TagsT) { + metrics.NewMonitor(metricsClient, tags, func() float64 { return float64(uq.Length()) }) } // Sends an item to the queue. From e4d75a1977066eafcffa6238b1c9910940290b04 Mon Sep 17 00:00:00 2001 From: chankyin Date: Mon, 30 Sep 2024 17:10:15 +0800 Subject: [PATCH 2/2] chore: make fmt --- pkg/audit/webhook/webhook.go | 7 ++++++- pkg/util/channel/channel.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/audit/webhook/webhook.go b/pkg/audit/webhook/webhook.go index 99f12952..7f68e942 100644 --- a/pkg/audit/webhook/webhook.go +++ b/pkg/audit/webhook/webhook.go @@ -46,7 +46,12 @@ type options struct { func (options *options) Setup(fs *pflag.FlagSet) { fs.BoolVar(&options.enable, "audit-webhook-enable", false, "enable audit webhook") - fs.BoolVar(&options.enable, "audit-webhook-subscriber-queue-capacity", false, "maximum number of events buffered in the webhook, set to 0 for an unbounded buffer (may cause OOM)") + fs.BoolVar( + &options.enable, + "audit-webhook-subscriber-queue-capacity", + false, + "maximum number of events buffered in the webhook, set to 0 for an unbounded buffer (may cause OOM)", + ) } func (options *options) EnableFlag() *bool { return &options.enable } diff --git a/pkg/util/channel/channel.go b/pkg/util/channel/channel.go index 7a38214e..dc608c07 100644 --- a/pkg/util/channel/channel.go +++ b/pkg/util/channel/channel.go @@ -46,7 +46,7 @@ func (ch BoundedQueue[T]) GetAndResetLength() int { return len(ch) // for simplicity we just return the channel length } -func (ch BoundedQueue[T]) Receiver() <-chan T{ +func (ch BoundedQueue[T]) Receiver() <-chan T { return ch }