Skip to content

Commit

Permalink
Additional logging
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi committed Nov 28, 2024
1 parent a72139d commit 4fc33b4
Showing 1 changed file with 28 additions and 13 deletions.
41 changes: 28 additions & 13 deletions test/e2e/sacura_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/client-go/dynamic"
"k8s.io/utils/pointer"
Expand All @@ -43,6 +44,7 @@ import (
eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned"
testlib "knative.dev/eventing/test/lib"

kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internalskafkaeventing/v1alpha1"
sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"

Expand Down Expand Up @@ -106,20 +108,14 @@ func runSacuraTest(t *testing.T, config SacuraTestConfig) {

ctx := context.Background()

w, err := c.Dynamic.Resource(config.ConsumerResourceGVR).
Namespace(config.Namespace).
Watch(ctx, metav1.ListOptions{})
if err != nil {
t.Fatal("Failed to watch resource", config.ConsumerResourceGVR, err)
}
defer w.Stop()
watchUserFacingResource := watchResource(t, ctx, c.Dynamic, config.Namespace, config.ConsumerResourceGVR)
t.Cleanup(watchUserFacingResource.Stop)

go func() {
for e := range w.ResultChan() {
bytes, _ := json.MarshalIndent(e, "", " ")
t.Logf("Consumer resource %q changed:\n%s\n\n", config.ConsumerResourceGVR.String(), string(bytes))
}
}()
watchConsumerGroups := watchResource(t, ctx, c.Dynamic, config.Namespace, kafkainternals.SchemeGroupVersion.WithResource("consumergroups"))
t.Cleanup(watchConsumerGroups.Stop)

watchConsumer := watchResource(t, ctx, c.Dynamic, config.Namespace, kafkainternals.SchemeGroupVersion.WithResource("consumers"))
t.Cleanup(watchConsumer.Stop)

jobPollError := wait.Poll(pollInterval, pollTimeout, func() (done bool, err error) {
job, err := c.Kube.BatchV1().Jobs(config.Namespace).Get(ctx, app, metav1.GetOptions{})
Expand Down Expand Up @@ -218,3 +214,22 @@ func getKafkaSubscriptionConsumerGroup(ctx context.Context, c dynamic.Interface,
return fmt.Sprintf("kafka.%s.%s.%s", c, sacuraChannelName, string(sub.UID))
}
}

func watchResource(t *testing.T, ctx context.Context, dynamic dynamic.Interface, ns string, gvr schema.GroupVersionResource) watch.Interface {

w, err := dynamic.Resource(gvr).
Namespace(ns).
Watch(ctx, metav1.ListOptions{})
if err != nil {
t.Fatal("Failed to watch resource", gvr, err)
}

go func() {
for e := range w.ResultChan() {
bytes, _ := json.MarshalIndent(e, "", " ")
t.Logf("Resource %q changed:\n%s\n\n", gvr.String(), string(bytes))
}
}()

return w
}

0 comments on commit 4fc33b4

Please sign in to comment.