Skip to content

Commit

Permalink
Support dispatcher format for triggers (#8151)
Browse files Browse the repository at this point in the history
* feat: added delivery format to trigger crd

Signed-off-by: Calum Murray <[email protected]>

* feat: support delivery format in filter handler

Signed-off-by: Calum Murray <[email protected]>

* test: add e2e test to verify trigger delivers events with the correct format

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>
  • Loading branch information
Cali0707 authored Aug 22, 2024
1 parent 477588c commit da10b71
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 2 deletions.
3 changes: 3 additions & 0 deletions config/core/resources/trigger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ spec:
description: Retry is the minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink.
type: integer
format: int32
format:
description: Format is the format used to serialize the event into a http request when delivering the event. It can be json (for structured events), binary (for binary events), or unset.
type: string
filter:
description: 'Filter is the filter to apply against all events from the Broker. Only events that pass this filter will be sent to the Subscriber. If not specified, will default to allowing all events.'
type: object
Expand Down
13 changes: 11 additions & 2 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,15 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
Audience: trigger.Status.SubscriberAudience,
}

h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl)
sendOptions := []kncloudevents.SendOption{}
if trigger.Spec.Delivery != nil && trigger.Spec.Delivery.Format != nil {
sendOptions = append(sendOptions, kncloudevents.WithEventFormat(trigger.Spec.Delivery.Format))
}

h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl, sendOptions...)
}

func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) {
func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32, sendOpts ...kncloudevents.SendOption) {
additionalHeaders := headers.Clone()
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

Expand Down Expand Up @@ -454,6 +459,10 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
}))
}

if len(sendOpts) > 0 {
opts = append(opts, sendOpts...)
}

dispatchInfo, err := h.eventDispatcher.SendEvent(ctx, *event, target, opts...)
if err != nil {
h.logger.Error("failed to send event", zap.Error(err))
Expand Down
34 changes: 34 additions & 0 deletions test/rekt/features/trigger/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package trigger

import (
"context"
"fmt"

"github.com/cloudevents/sdk-go/v2/test"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -96,6 +97,39 @@ func TriggerDependencyAnnotation() *feature.Feature {
return f
}

func TriggerSupportsDeliveryFormat() *feature.FeatureSet {
return &feature.FeatureSet{
Name: "Trigger supports delivery format",
Features: []*feature.Feature{triggerWithDispatcherFormat("json"), triggerWithDispatcherFormat("binary")},
}
}

func triggerWithDispatcherFormat(format string) *feature.Feature {
f := feature.NewFeatureNamed(fmt.Sprintf("Trigger supports sending with %s delivery format", format))

brokerName := feature.MakeRandomK8sName("broker")
sourceName := feature.MakeRandomK8sName("source")
sinkName := feature.MakeRandomK8sName("sink")
triggerName := feature.MakeRandomK8sName("trigger")
eventToSend := test.FullEvent()

f.Setup("Install Broker", broker.Install(brokerName, broker.WithEnvConfig()...))
f.Setup("Broker is ready", broker.IsReady(brokerName))
f.Setup("Broker is addressable", broker.IsAddressable(brokerName))

f.Setup("Install Sink", eventshub.Install(sinkName, eventshub.VerifyEventFormat(format), eventshub.StartReceiver))

f.Setup("Install trigger", trigger.Install(triggerName, trigger.WithBrokerName(brokerName), trigger.WithFormat(format), trigger.WithSubscriber(service.AsKReference(sinkName), "")))
f.Setup("Trigger is ready", trigger.IsReady(triggerName))

f.Requirement("Install source", eventshub.Install(sourceName, eventshub.InputEvent(eventToSend), eventshub.StartSenderToResource(broker.GVR(), brokerName)))

f.Alpha("trigger").
Must("dispatch event with correct format", assert.OnStore(sinkName).MatchReceivedEvent(test.HasId(eventToSend.ID())).AtLeast(1))

return f
}

func TriggerWithTLSSubscriber() *feature.Feature {
f := feature.NewFeatureNamed("Trigger with TLS subscriber")

Expand Down
11 changes: 11 additions & 0 deletions test/rekt/resources/delivery/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ func WithRetry(count int32, backoffPolicy *eventingv1.BackoffPolicyType, backoff
}
}

func WithFormat(format string) manifest.CfgFn {
return func(cfg map[string]interface{}) {
if _, set := cfg["delivery"]; !set {
cfg["delivery"] = map[string]interface{}{}
}
delivery := cfg["delivery"].(map[string]interface{})

delivery["format"] = format
}
}

// WithTimeout adds the timeout related config to the config.
func WithTimeout(timeout string) manifest.CfgFn {
return func(cfg map[string]interface{}) {
Expand Down
3 changes: 3 additions & 0 deletions test/rekt/resources/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ var WithRetry = delivery.WithRetry
// WithTimeout adds the timeout related config to the config.
var WithTimeout = delivery.WithTimeout

// WithFormat adds the format related config to a Trigger spec
var WithFormat = delivery.WithFormat

// Install will create a Trigger resource, augmented with the config fn options.
func Install(name string, opts ...manifest.CfgFn) feature.StepFn {
cfg := map[string]interface{}{
Expand Down
3 changes: 3 additions & 0 deletions test/rekt/resources/trigger/trigger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,7 @@ spec:
{{ if .delivery.backoffDelay }}
backoffDelay: "{{ .delivery.backoffDelay}}"
{{ end }}
{{ if .delivery.format }}
format: {{ .delivery.format }}
{{ end }}
{{ end }}
12 changes: 12 additions & 0 deletions test/rekt/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ func TestTriggerDependencyAnnotation(t *testing.T) {
env.Test(ctx, t, trigger.TriggerDependencyAnnotation())
}

func TestTriggerDeliveryFormat(t *testing.T) {
ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

env.TestSet(ctx, t, trigger.TriggerSupportsDeliveryFormat())
}

func TestTriggerTLSSubscriber(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit da10b71

Please sign in to comment.