From 60b0519ba46be36db407d501e1cb87b710ea5691 Mon Sep 17 00:00:00 2001 From: Viktor Singer Date: Thu, 13 Jun 2024 09:40:22 +0200 Subject: [PATCH 1/2] allow setting deny_purge on stream --- controllers/jetstream/stream.go | 7 ++++++- deploy/crds.yml | 4 ++++ pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go | 1 + .../applyconfiguration/jetstream/v1beta2/streamspec.go | 9 +++++++++ .../generated/informers/externalversions/factory.go | 10 ++++++++++ 5 files changed, 30 insertions(+), 1 deletion(-) diff --git a/controllers/jetstream/stream.go b/controllers/jetstream/stream.go index 5c5844cb..ad5f6059 100644 --- a/controllers/jetstream/stream.go +++ b/controllers/jetstream/stream.go @@ -418,11 +418,16 @@ func createStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e opts = append(opts, jsm.DenyDelete()) } + if spec.DenyPurge { + opts = append(opts, jsm.DenyPurge()) + } + if spec.DiscardPerSubject { opts = append(opts, jsm.DiscardNewPerSubject()) } + if spec.FirstSequence != 0 { - opts = append(opts, jsm.FirstSequence(uint64(spec.FirstSequence))) + opts = append(opts, jsm.FirstSequence(spec.FirstSequence)) } if spec.Metadata != nil { diff --git a/deploy/crds.yml b/deploy/crds.yml index 203be8f8..bcfb9a13 100644 --- a/deploy/crds.yml +++ b/deploy/crds.yml @@ -262,6 +262,10 @@ spec: description: When true, restricts the ability to delete messages from a stream via the API. Cannot be changed once set to true. type: boolean default: false + denyPurge: + description: When true, restricts the ability to purge a stream via the API. Cannot be changed once set to true. + type: boolean + default: false discardPerSubject: description: Allows to discard messages on a subject basis. type: boolean diff --git a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go index c93063a4..1128d1c4 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/streamtypes.go @@ -27,6 +27,7 @@ type StreamSpec struct { AllowRollup bool `json:"allowRollup"` Creds string `json:"creds"` DenyDelete bool `json:"denyDelete"` + DenyPurge bool `json:"denyPurge"` Description string `json:"description"` DiscardPerSubject bool `json:"discardPerSubject"` PreventDelete bool `json:"preventDelete"` diff --git a/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/streamspec.go b/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/streamspec.go index e5d310b0..e52a845a 100644 --- a/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/streamspec.go +++ b/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/streamspec.go @@ -27,6 +27,7 @@ type StreamSpecApplyConfiguration struct { AllowRollup *bool `json:"allowRollup,omitempty"` Creds *string `json:"creds,omitempty"` DenyDelete *bool `json:"denyDelete,omitempty"` + DenyPurge *bool `json:"denyPurge,omitempty"` Description *string `json:"description,omitempty"` DiscardPerSubject *bool `json:"discardPerSubject,omitempty"` PreventDelete *bool `json:"preventDelete,omitempty"` @@ -104,6 +105,14 @@ func (b *StreamSpecApplyConfiguration) WithDenyDelete(value bool) *StreamSpecApp return b } +// WithDenyPurge sets the DenyPurge field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the DenyPurge field is set to the value of the last call. +func (b *StreamSpecApplyConfiguration) WithDenyPurge(value bool) *StreamSpecApplyConfiguration { + b.DenyPurge = &value + return b +} + // WithDescription sets the Description field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the Description field is set to the value of the last call. diff --git a/pkg/jetstream/generated/informers/externalversions/factory.go b/pkg/jetstream/generated/informers/externalversions/factory.go index a2f4c654..9bc8ae93 100644 --- a/pkg/jetstream/generated/informers/externalversions/factory.go +++ b/pkg/jetstream/generated/informers/externalversions/factory.go @@ -39,6 +39,7 @@ type sharedInformerFactory struct { lock sync.Mutex defaultResync time.Duration customResync map[reflect.Type]time.Duration + transform cache.TransformFunc informers map[reflect.Type]cache.SharedIndexInformer // startedInformers is used for tracking which informers have been started. @@ -77,6 +78,14 @@ func WithNamespace(namespace string) SharedInformerOption { } } +// WithTransform sets a transform on all informers. +func WithTransform(transform cache.TransformFunc) SharedInformerOption { + return func(factory *sharedInformerFactory) *sharedInformerFactory { + factory.transform = transform + return factory + } +} + // NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces. func NewSharedInformerFactory(client versioned.Interface, defaultResync time.Duration) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync) @@ -181,6 +190,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal } informer = newFunc(f.client, resyncPeriod) + informer.SetTransform(f.transform) f.informers[informerType] = informer return informer From 7d0b6a3d9e38723bc6802561fde4814e7b81f3d2 Mon Sep 17 00:00:00 2001 From: Viktor Singer Date: Thu, 13 Jun 2024 09:56:00 +0200 Subject: [PATCH 2/2] set denyPurge on stream update --- controllers/jetstream/stream.go | 1 + 1 file changed, 1 insertion(+) diff --git a/controllers/jetstream/stream.go b/controllers/jetstream/stream.go index ad5f6059..7dc2ec89 100644 --- a/controllers/jetstream/stream.go +++ b/controllers/jetstream/stream.go @@ -491,6 +491,7 @@ func updateStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e Duplicates: duplicates, AllowDirect: spec.AllowDirect, DenyDelete: spec.DenyDelete, + DenyPurge: spec.DenyPurge, RollupAllowed: spec.AllowRollup, FirstSeq: spec.FirstSequence, SubjectTransform: subjectTransform,