Skip to content

Commit

Permalink
[release-v1.11] fix: triggers use correct consumer group id generated…
Browse files Browse the repository at this point in the history
… from template (knative-extensions#3779) (#1038) (#1041)

* fix: triggers use correct consumer group id generated from template (knative-extensions#3779)

* add rekt test to catch trigger cg being wrong



* fix cm key name



* fix: trigger controller now watches for changes to the config-kafka-features cm



* fixed unit test



* also watch cm for namespaced triggers



* goimports



* don't use manifest.InstallYamlFS because it deletes the config-kafka-features cm after the test



---------



* cleanup: consumergroup template test does not modify configmap now (knative-extensions#3782)

* cleanup: consumergroup template test does not modify configmap now



* account for different possible cm keys



---------



* update configmap in CI to test non default values



* Update openshift/e2e-common.sh

* clone correct SO branch



---------

Signed-off-by: Calum Murray <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
3 people authored May 7, 2024
1 parent 145e6fb commit 208c7c1
Show file tree
Hide file tree
Showing 14 changed files with 204 additions and 3 deletions.
8 changes: 8 additions & 0 deletions control-plane/pkg/reconciler/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
impl.FilteredGlobalResync(filterTriggers(reconciler.BrokerLister, kafka.BrokerClass, FinalizerName), triggerInformer.Informer())
}

kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) {
reconciler.KafkaFeatureFlags.Reset(value)
if globalResync != nil {
globalResync(nil)
}
})
kafkaConfigStore.WatchConfigs(watcher)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func TestNewController(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "config-features",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-kafka-features",
},
}), &config.Env{})
if controller == nil {
t.Error("failed to create controller: <nil>")
Expand Down
11 changes: 11 additions & 0 deletions control-plane/pkg/reconciler/trigger/namespaced_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"

"github.com/IBM/sarama"

apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"

"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -85,6 +87,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
NewKafkaClient: sarama.NewClient,
NewKafkaClusterAdminClient: sarama.NewClusterAdmin,
InitOffsetsFunc: offset.InitOffsets,
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options {
Expand Down Expand Up @@ -115,6 +118,14 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con
impl.GlobalResync(brokerInformer.Informer())
}

kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) {
reconciler.KafkaFeatureFlags.Reset(value)
if globalResync != nil {
globalResync(nil)
}
})
kafkaConfigStore.WatchConfigs(watcher)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func TestNewNamespacedController(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "config-features",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-kafka-features",
},
}), &config.Env{})
if controller == nil {
t.Error("failed to create controller: <nil>")
Expand Down
3 changes: 2 additions & 1 deletion control-plane/pkg/reconciler/trigger/namespaced_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type NamespacedReconciler struct {
NewKafkaClusterAdminClient kafka.NewClusterAdminClientFunc
NewKafkaClient kafka.NewClientFunc
InitOffsetsFunc kafka.InitOffsetsFunc
KafkaFeatureFlags *apisconfig.KafkaFeatureFlags
}

func (r *NamespacedReconciler) ReconcileKind(ctx context.Context, trigger *eventing.Trigger) reconciler.Event {
Expand Down Expand Up @@ -91,7 +92,7 @@ func (r *NamespacedReconciler) createReconcilerForTriggerInstance(trigger *event
// override
BrokerClass: kafka.NamespacedBrokerClass,
DataPlaneConfigMapLabeler: kafka.NamespacedDataplaneLabelConfigmapOption,
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
KafkaFeatureFlags: r.KafkaFeatureFlags,
NewKafkaClusterAdminClient: r.NewKafkaClusterAdminClient,
NewKafkaClient: r.NewKafkaClient,
InitOffsetsFunc: r.InitOffsetsFunc,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1"

apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
kafkatesting "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/testing"
Expand Down Expand Up @@ -198,6 +199,7 @@ func useNamespacedTable(t *testing.T, table TableTest, env *config.Env) {
T: t,
}, nil
},
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, tracker.New(func(name types.NamespacedName) {}, 0))
Expand Down
13 changes: 12 additions & 1 deletion openshift/e2e-common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ function timeout() {
function install_serverless() {
header "Installing Serverless Operator"

GO111MODULE=off go get -u github.com/openshift-knative/hack/cmd/sobranch

local release
release=$(yq r "${SCRIPT_DIR}/project.yaml" project.tag)
release=${release/knative-/}
so_branch=$( $(go env GOPATH)/bin/sobranch --upstream-version "${release}")

cat <<EOF | oc apply -f -
apiVersion: v1
kind: Namespace
Expand All @@ -75,7 +82,7 @@ EOF
export KNATIVE_EVENTING_KAFKA_BROKER_MANIFESTS_DIR

local operator_dir=/tmp/serverless-operator
git clone --branch main https://github.com/openshift-knative/serverless-operator.git $operator_dir
git clone --branch "${so_branch}" https://github.com/openshift-knative/serverless-operator.git $operator_dir || git clone --branch main https://github.com/openshift-knative/serverless-operator.git $operator_dir
export GOPATH=/tmp/go
local failed=0
pushd $operator_dir || return $?
Expand Down Expand Up @@ -148,6 +155,10 @@ function run_e2e_new_tests() {

go_test_e2e -timeout=100m ./test/e2e_new/... "${common_opts[@]}" || return $?
go_test_e2e -timeout=100m ./test/e2e_new_channel/... "${common_opts[@]}" || return $?

oc patch knativekafka --type merge -n "${EVENTING_NAMESPACE}" knative-kafka --patch-file "${SCRIPT_DIR}/knative-kafka-update-consumergroup-template.yaml"
go_test_e2e -timeout=15m ./test/e2e_new -run TestTriggerUsesConsumerGroupIDFromTemplate "${common_opts[@]}" || return $?
oc patch knativekafka --type merge -n "${EVENTING_NAMESPACE}" knative-kafka --patch-file "${SCRIPT_DIR}/knative-kafka-default-consumergroup-template.yaml"
}

function run_e2e_encryption_auth_tests(){
Expand Down
4 changes: 4 additions & 0 deletions openshift/knative-kafka-default-consumergroup-template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
spec:
config:
config-kafka-features:
triggers.consumergroup.template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
4 changes: 4 additions & 0 deletions openshift/knative-kafka-update-consumergroup-template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
spec:
config:
config-kafka-features:
triggers.consumergroup.template: "test-trigger-{{ .Name }}-{{ .Namespace }}"
26 changes: 26 additions & 0 deletions test/config-kafka-features/new-cg-id.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2024 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka-features
namespace: knative-eventing
data:
dispatcher-rate-limiter: "disabled"
dispatcher-ordered-executor-metrics: "disabled"
controller-autoscaler-keda: "disabled"
triggers-consumergroup-template: "test-{{ .Namespace }}-{{ .Name }}-trigger"
brokers-topic-template: "knative-broker-{{ .Namespace }}-{{ .Name }}"
channels-topic-template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}"
26 changes: 26 additions & 0 deletions test/config-kafka-features/restore-cg-id.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2024 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka-features
namespace: knative-eventing
data:
dispatcher-rate-limiter: "disabled"
dispatcher-ordered-executor-metrics: "disabled"
controller-autoscaler-keda: "disabled"
triggers-consumergroup-template: "knative-trigger-{{ .Namespace }}-{{ .Name }}"
brokers-topic-template: "knative-broker-{{ .Namespace }}-{{ .Name }}"
channels-topic-template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}"
12 changes: 12 additions & 0 deletions test/e2e_new/broker_sasl_ssl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ func TestRestrictedBrokerAuthSslSaslScram512(t *testing.T) {
env.Test(ctx, t, features.SetupBrokerAuthRestrictedSslSaslScram512(ctx))
}

func TestTriggerUsesConsumerGroupIDFromTemplate(t *testing.T) {
ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

env.Test(ctx, t, features.TriggerUsesConsumerGroupIDTemplate())
}

func TestBrokerNotReadyWithoutAuthSecret(t *testing.T) {
t.Parallel()

Expand Down
8 changes: 8 additions & 0 deletions test/reconciler-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ go_test_e2e -tags=e2e,cloudevents -timeout=1h ./test/e2e_new_channel/... || fail

go_test_e2e -tags=deletecm ./test/e2e_new/... || fail_test "E2E (new deletecm) suite failed"

echo "Running E2E Reconciler tests with consumergroup id template changed"

kubectl apply -f "$(dirname "$0")/config-kafka-features/new-cg-id.yaml"

go_test_e2e -tags=e2e -timeout=15m ./test/e2e_new -run TestTriggerUsesConsumerGroupIDFromTemplate

kubectl apply -f "$(dirname "$0")/config-kafka-features/restore-cg-id.yaml"

echo "Running E2E Reconciler Tests with strict transport encryption"

kubectl apply -Rf "$(dirname "$0")/config-transport-encryption"
Expand Down
82 changes: 81 additions & 1 deletion test/rekt/features/broker_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,34 @@
package features

import (
"bytes"
"context"
"text/template"
"time"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/system"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/resources/service"

"github.com/cloudevents/sdk-go/v2/test"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/google/uuid"
testpkg "knative.dev/eventing-kafka-broker/test/pkg"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkaauthsecret"

triggersclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/eventing/test/rekt/resources/trigger"

"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/eventshub/assert"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/manifest"

"knative.dev/reconciler-test/resources/svc"

brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker"
Expand Down Expand Up @@ -142,3 +153,72 @@ func BrokerNotReadyWithoutAuthSecret() *feature.Feature {

return f
}

func TriggerUsesConsumerGroupIDTemplate() *feature.Feature {
f := feature.NewFeature()

brokerName := feature.MakeRandomK8sName("broker")
triggerName := feature.MakeRandomK8sName("trigger")
sinkName := feature.MakeRandomK8sName("sink")

f.Setup("install broker", broker.Install(brokerName))
f.Setup("install sink", eventshub.Install(sinkName, eventshub.StartReceiver))

f.Setup("broker is ready", broker.IsReady(brokerName))
f.Setup("broker is addressable", broker.IsAddressable(brokerName))

f.Requirement("install trigger", trigger.Install(triggerName, brokerName, trigger.WithSubscriber(service.AsKReference(sinkName), "")))
f.Requirement("trigger is ready", trigger.IsReady(triggerName))

// check that the trigger has the correct annotation
f.Assert("trigger has correct consumergroup template", checkTriggerConsumerGroupIDAnnotation(triggerName))

return f
}

func checkTriggerConsumerGroupIDAnnotation(triggerName string) feature.StepFn {
return func(ctx context.Context, t feature.T) {
ns := environment.FromContext(ctx).Namespace()

cm, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, "config-kafka-features", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

expectedTemplateStr, ok := cm.Data["triggers-consumergroup-template"]
if !ok {
// there are two keys the value could be in
expectedTemplateStr, ok = cm.Data["triggers.consumergroup.template"]
if !ok {
t.Fatal("no consumergroup template in config-kafka-features")
}
}

expectedTemplate, err := template.New("consumergroup-id").Parse(expectedTemplateStr)
if err != nil {
t.Fatal(err)
}

trig, err := triggersclient.Get(ctx).EventingV1().Triggers(ns).Get(ctx, triggerName, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

var expectedBytes bytes.Buffer
err = expectedTemplate.Execute(&expectedBytes, trig.ObjectMeta)
if err != nil {
t.Fatal(err)
}

expectedAnnotation := expectedBytes.String()

cgAnnotation, ok := trig.Status.Annotations[kafka.GroupIdAnnotation]
if !ok {
t.Fatal("no consumer group annotation present on the trigger")
}

if cgAnnotation != expectedAnnotation {
t.Fatalf("consumer group id annotation was not equal to expected value. expected %s, got %s", expectedAnnotation, cgAnnotation)
}
}
}

0 comments on commit 208c7c1

Please sign in to comment.