Skip to content

Commit

Permalink
feat(triggers/kafka): produce optional Kafka Message Headers
Browse files Browse the repository at this point in the history
Signed-off-by: Pepe Barbe <[email protected]>
  • Loading branch information
elventear committed Nov 20, 2024
1 parent 7f0a127 commit 36c8d76
Show file tree
Hide file tree
Showing 10 changed files with 1,246 additions and 812 deletions.
14 changes: 14 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2519,6 +2519,13 @@
"format": "int32",
"type": "integer"
},
"headers": {
"additionalProperties": {
"type": "string"
},
"description": "Headers for the Kafka Messages.",
"type": "object"
},
"parameters": {
"description": "Parameters is the list of parameters that is applied to resolved Kafka trigger object.",
"items": {
Expand Down Expand Up @@ -2555,6 +2562,13 @@
"$ref": "#/definitions/io.argoproj.events.v1alpha1.SchemaRegistryConfig",
"description": "Schema Registry configuration to producer message with avro format"
},
"secureHeaders": {
"description": "Secure Headers stored in Kubernetes Secrets for the Kafka messages.",
"items": {
"$ref": "#/definitions/io.argoproj.events.v1alpha1.SecureHeader"
},
"type": "array"
},
"tls": {
"$ref": "#/definitions/io.argoproj.events.v1alpha1.TLSConfig",
"description": "TLS configuration for the Kafka producer."
Expand Down
14 changes: 14 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -12369,6 +12369,47 @@ Schema Registry configuration to producer message with avro format

</tr>

<tr>

<td>

<code>headers</code></br> <em> map\[string\]string </em>
</td>

<td>

<em>(Optional)</em>
<p>

Headers for the Kafka Messages.
</p>

</td>

</tr>

<tr>

<td>

<code>secureHeaders</code></br> <em>
<a href="#argoproj.io/v1alpha1.*github.com/argoproj/argo-events/pkg/apis/events/v1alpha1.SecureHeader">
\[\]\*github.com/argoproj/argo-events/pkg/apis/events/v1alpha1.SecureHeader
</a> </em>
</td>

<td>

<em>(Optional)</em>
<p>

Secure Headers stored in Kubernetes Secrets for the Kafka messages.
</p>

</td>

</tr>

</tbody>

</table>
Expand Down
1,851 changes: 1,042 additions & 809 deletions pkg/apis/events/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions pkg/apis/events/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 30 additions & 1 deletion pkg/apis/events/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/apis/events/v1alpha1/sensor_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,12 @@ type KafkaTrigger struct {
// Schema Registry configuration to producer message with avro format
// +optional
SchemaRegistry *SchemaRegistryConfig `json:"schemaRegistry,omitempty" protobuf:"bytes,13,opt,name=schemaRegistry"`
// Headers for the Kafka Messages.
// +optional
Headers map[string]string `json:"headers,omitempty" protobuf:"bytes,14,rep,name=headers"`
// Secure Headers stored in Kubernetes Secrets for the Kafka messages.
// +optional
SecureHeaders []*SecureHeader `json:"secureHeaders,omitempty" protobuf:"bytes,15,rep,name=secureHeaders"`
}

// SchemaRegistryConfig refers to configuration for a client
Expand Down
18 changes: 18 additions & 0 deletions pkg/apis/events/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions pkg/sensors/triggers/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,50 @@ func (t *KafkaTrigger) Execute(ctx context.Context, events map[string]*v1alpha1.
}
}

numHeaders := 0
if trigger.Headers != nil {
numHeaders += len(trigger.Headers)
}
if trigger.SecureHeaders != nil {
numHeaders += len(trigger.SecureHeaders)
}

msg := &sarama.ProducerMessage{
Topic: trigger.Topic,
Value: sarama.ByteEncoder(payload),
Timestamp: time.Now().UTC(),
Headers: make([]sarama.RecordHeader, numHeaders),
}

headerIndex := 0
if trigger.Headers != nil {
for k, v := range trigger.Headers {
msg.Headers[headerIndex] = sarama.RecordHeader{
Key: []byte(k),
Value: []byte(v),
}
headerIndex++
}
}

if trigger.SecureHeaders != nil {
for _, secure := range trigger.SecureHeaders {
var value string
var err error
if secure.ValueFrom.SecretKeyRef != nil {
value, err = sharedutil.GetSecretFromVolume(secure.ValueFrom.SecretKeyRef)
} else {
value, err = sharedutil.GetConfigMapFromVolume(secure.ValueFrom.ConfigMapKeyRef)
}
if err != nil {
return nil, fmt.Errorf("failed to retrieve the value for secureHeader, %w", err)
}
msg.Headers[headerIndex] = sarama.RecordHeader{
Key: []byte(secure.Name),
Value: []byte(value),
}
headerIndex++
}
}

if trigger.PartitioningKey != nil {
Expand Down
35 changes: 33 additions & 2 deletions pkg/sensors/triggers/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ package kafka
import (
"context"
"testing"
"time"

corev1 "k8s.io/api/core/v1"

"github.com/IBM/sarama"
"github.com/IBM/sarama/mocks"
Expand Down Expand Up @@ -111,7 +114,18 @@ func TestKafkaTrigger_ApplyResourceParameters(t *testing.T) {
}

defaultValue := "http://default.com"
secureHeader := &v1alpha1.SecureHeader{Name: "test", ValueFrom: &v1alpha1.ValueFromSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "tokens",
},
Key: "serviceToken"}},
}

secureHeaders := []*v1alpha1.SecureHeader{}
secureHeaders = append(secureHeaders, secureHeader)
trigger.Trigger.Template.Kafka.Headers = map[string]string{"key": "value"}
trigger.Trigger.Template.Kafka.SecureHeaders = secureHeaders
trigger.Trigger.Template.Kafka.Parameters = []v1alpha1.TriggerParameter{
{
Src: &v1alpha1.TriggerParameterSource{
Expand All @@ -128,13 +142,18 @@ func TestKafkaTrigger_ApplyResourceParameters(t *testing.T) {
assert.NotNil(t, resource)

updatedTrigger, ok := resource.(*v1alpha1.KafkaTrigger)
assert.Equal(t, "value", updatedTrigger.Headers["key"])
assert.Equal(t, "serviceToken", updatedTrigger.SecureHeaders[0].ValueFrom.SecretKeyRef.Key)
assert.Nil(t, err)
assert.Equal(t, true, ok)
assert.Equal(t, "another-fake-kafka-url", updatedTrigger.URL)
}

func TestKafkaTrigger_Execute(t *testing.T) {
producer := mocks.NewAsyncProducer(t, nil)
config := sarama.NewConfig()
config.Producer.Return.Successes = true

producer := mocks.NewAsyncProducer(t, config)
producers := sharedutil.NewStringKeyedMap[sarama.AsyncProducer]()
producers.Store("fake-trigger", producer)
trigger, err := getFakeKafkaTrigger(producers)
Expand All @@ -154,7 +173,7 @@ func TestKafkaTrigger_Execute(t *testing.T) {
}

defaultValue := "hello"

trigger.Trigger.Template.Kafka.Headers = map[string]string{"key1": "value1", "key2": "value2"}
trigger.Trigger.Template.Kafka.Payload = []v1alpha1.TriggerParameter{
{
Src: &v1alpha1.TriggerParameterSource{
Expand All @@ -171,4 +190,16 @@ func TestKafkaTrigger_Execute(t *testing.T) {
result, err := trigger.Execute(context.TODO(), testEvents, trigger.Trigger.Template.Kafka)
assert.Nil(t, err)
assert.Nil(t, result)

select {
case kafkaMessage := <-producer.Successes():
assert.NotNil(t, kafkaMessage)
assert.Equal(t, 2, len(kafkaMessage.Headers))
assert.Equal(t, "key1", string(kafkaMessage.Headers[0].Key))
assert.Equal(t, "value1", string(kafkaMessage.Headers[0].Value))
assert.Equal(t, "key2", string(kafkaMessage.Headers[1].Key))
assert.Equal(t, "value2", string(kafkaMessage.Headers[1].Value))
case <-time.After(1 * time.Second):
assert.Fail(t, "timed out waiting for message to contain headers")
}
}

0 comments on commit 36c8d76

Please sign in to comment.