From 208c7c112371889f9a34008d0e0600810f8d47e4 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 7 May 2024 08:56:20 -0400 Subject: [PATCH] [release-v1.11] fix: triggers use correct consumer group id generated from template (#3779) (#1038) (#1041) * fix: triggers use correct consumer group id generated from template (#3779) * add rekt test to catch trigger cg being wrong * fix cm key name * fix: trigger controller now watches for changes to the config-kafka-features cm * fixed unit test * also watch cm for namespaced triggers * goimports * don't use manifest.InstallYamlFS because it deletes the config-kafka-features cm after the test --------- * cleanup: consumergroup template test does not modify configmap now (#3782) * cleanup: consumergroup template test does not modify configmap now * account for different possible cm keys --------- * update configmap in CI to test non default values * Update openshift/e2e-common.sh * clone correct SO branch --------- Signed-off-by: Calum Murray Co-authored-by: Pierangelo Di Pilato Co-authored-by: Pierangelo Di Pilato --- .../pkg/reconciler/trigger/controller.go | 8 ++ .../pkg/reconciler/trigger/controller_test.go | 4 + .../trigger/namespaced_controller.go | 11 +++ .../trigger/namespaced_controller_test.go | 4 + .../reconciler/trigger/namespaced_trigger.go | 3 +- .../trigger/namespaced_trigger_test.go | 2 + openshift/e2e-common.sh | 13 ++- ...-kafka-default-consumergroup-template.yaml | 4 + ...e-kafka-update-consumergroup-template.yaml | 4 + test/config-kafka-features/new-cg-id.yaml | 26 ++++++ test/config-kafka-features/restore-cg-id.yaml | 26 ++++++ test/e2e_new/broker_sasl_ssl_test.go | 12 +++ test/reconciler-tests.sh | 8 ++ test/rekt/features/broker_auth.go | 82 ++++++++++++++++++- 14 files changed, 204 insertions(+), 3 deletions(-) create mode 100644 openshift/knative-kafka-default-consumergroup-template.yaml create mode 100644 openshift/knative-kafka-update-consumergroup-template.yaml create mode 100644 test/config-kafka-features/new-cg-id.yaml create mode 100644 test/config-kafka-features/restore-cg-id.yaml diff --git a/control-plane/pkg/reconciler/trigger/controller.go b/control-plane/pkg/reconciler/trigger/controller.go index b9b78f32c1..4347dca7ef 100644 --- a/control-plane/pkg/reconciler/trigger/controller.go +++ b/control-plane/pkg/reconciler/trigger/controller.go @@ -124,6 +124,14 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf impl.FilteredGlobalResync(filterTriggers(reconciler.BrokerLister, kafka.BrokerClass, FinalizerName), triggerInformer.Informer()) } + kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) { + reconciler.KafkaFeatureFlags.Reset(value) + if globalResync != nil { + globalResync(nil) + } + }) + kafkaConfigStore.WatchConfigs(watcher) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { if globalResync != nil { globalResync(nil) diff --git a/control-plane/pkg/reconciler/trigger/controller_test.go b/control-plane/pkg/reconciler/trigger/controller_test.go index 7fb6b03ad3..d3e2154a8e 100644 --- a/control-plane/pkg/reconciler/trigger/controller_test.go +++ b/control-plane/pkg/reconciler/trigger/controller_test.go @@ -47,6 +47,10 @@ func TestNewController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "config-features", }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-kafka-features", + }, }), &config.Env{}) if controller == nil { t.Error("failed to create controller: ") diff --git a/control-plane/pkg/reconciler/trigger/namespaced_controller.go b/control-plane/pkg/reconciler/trigger/namespaced_controller.go index d6f4a1ab45..0ca7459221 100644 --- a/control-plane/pkg/reconciler/trigger/namespaced_controller.go +++ b/control-plane/pkg/reconciler/trigger/namespaced_controller.go @@ -20,6 +20,8 @@ import ( "context" "github.com/IBM/sarama" + + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset" "k8s.io/client-go/tools/cache" @@ -85,6 +87,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con NewKafkaClient: sarama.NewClient, NewKafkaClusterAdminClient: sarama.NewClusterAdmin, InitOffsetsFunc: offset.InitOffsets, + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options { @@ -115,6 +118,14 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con impl.GlobalResync(brokerInformer.Informer()) } + kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) { + reconciler.KafkaFeatureFlags.Reset(value) + if globalResync != nil { + globalResync(nil) + } + }) + kafkaConfigStore.WatchConfigs(watcher) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { if globalResync != nil { globalResync(nil) diff --git a/control-plane/pkg/reconciler/trigger/namespaced_controller_test.go b/control-plane/pkg/reconciler/trigger/namespaced_controller_test.go index 0907bd11bf..4f1a540dd9 100644 --- a/control-plane/pkg/reconciler/trigger/namespaced_controller_test.go +++ b/control-plane/pkg/reconciler/trigger/namespaced_controller_test.go @@ -42,6 +42,10 @@ func TestNewNamespacedController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "config-features", }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-kafka-features", + }, }), &config.Env{}) if controller == nil { t.Error("failed to create controller: ") diff --git a/control-plane/pkg/reconciler/trigger/namespaced_trigger.go b/control-plane/pkg/reconciler/trigger/namespaced_trigger.go index 8752f501aa..5d0e2bd31b 100644 --- a/control-plane/pkg/reconciler/trigger/namespaced_trigger.go +++ b/control-plane/pkg/reconciler/trigger/namespaced_trigger.go @@ -48,6 +48,7 @@ type NamespacedReconciler struct { NewKafkaClusterAdminClient kafka.NewClusterAdminClientFunc NewKafkaClient kafka.NewClientFunc InitOffsetsFunc kafka.InitOffsetsFunc + KafkaFeatureFlags *apisconfig.KafkaFeatureFlags } func (r *NamespacedReconciler) ReconcileKind(ctx context.Context, trigger *eventing.Trigger) reconciler.Event { @@ -91,7 +92,7 @@ func (r *NamespacedReconciler) createReconcilerForTriggerInstance(trigger *event // override BrokerClass: kafka.NamespacedBrokerClass, DataPlaneConfigMapLabeler: kafka.NamespacedDataplaneLabelConfigmapOption, - KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), + KafkaFeatureFlags: r.KafkaFeatureFlags, NewKafkaClusterAdminClient: r.NewKafkaClusterAdminClient, NewKafkaClient: r.NewKafkaClient, InitOffsetsFunc: r.InitOffsetsFunc, diff --git a/control-plane/pkg/reconciler/trigger/namespaced_trigger_test.go b/control-plane/pkg/reconciler/trigger/namespaced_trigger_test.go index 91f8b01001..d89857484e 100644 --- a/control-plane/pkg/reconciler/trigger/namespaced_trigger_test.go +++ b/control-plane/pkg/reconciler/trigger/namespaced_trigger_test.go @@ -38,6 +38,7 @@ import ( triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger" reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" kafkatesting "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/testing" @@ -198,6 +199,7 @@ func useNamespacedTable(t *testing.T, table TableTest, env *config.Env) { T: t, }, nil }, + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, tracker.New(func(name types.NamespacedName) {}, 0)) diff --git a/openshift/e2e-common.sh b/openshift/e2e-common.sh index 86dad307e7..b082769723 100755 --- a/openshift/e2e-common.sh +++ b/openshift/e2e-common.sh @@ -56,6 +56,13 @@ function timeout() { function install_serverless() { header "Installing Serverless Operator" + GO111MODULE=off go get -u github.com/openshift-knative/hack/cmd/sobranch + + local release + release=$(yq r "${SCRIPT_DIR}/project.yaml" project.tag) + release=${release/knative-/} + so_branch=$( $(go env GOPATH)/bin/sobranch --upstream-version "${release}") + cat <