diff --git a/changelog.md b/changelog.md index aad82a4..de7e84a 100644 --- a/changelog.md +++ b/changelog.md @@ -4,7 +4,11 @@ All notable changes to this project will be documented in this file. This project adheres to Semantic Versioning. -## 2.0.5 (Jan 14, 2024) +## 2.1.0 (Jan 23, 2025) + +1. Include `DisableTracePropagation` as a WriterOption + +## 2.0.5 (Jan 14, 2025) 1. Updated confluent-kafka-go-2.8.0 diff --git a/test/integration_test.go b/test/integration_test.go index 081c13f..5a05096 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -18,12 +18,18 @@ import ( "github.com/stretchr/testify/require" "github.com/zillow/zfmt" "github.com/zillow/zkafka/v2" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace/noop" "golang.org/x/sync/errgroup" ) // TestKafkaClientsCanReadOwnWritesAndBehaveProperlyAfterRestart will test that a kafka consumer can properly read messages // written by the kafka producer. It will additionally, confirm that when a group is restarted that it starts off where -// it left off (addressing an off by 1 bug seen with an earlier version) +// it left off (addressing an off by 1 bug seen with an earlier version). +// Finally it'll confirm that otel propagation can be used to manipulate written contents of kafka.Message +// And that options exist to create a zkafka.Writer which disables that propagation. This is done by including a propagator +// which always adds the header "kobe"="bryant". Writer1 uses this propagator, writer2 disables it. Assertions are made +// on the presence of the header. // // The following steps are followed // 1. Create a new consumer group that is reading from the topic @@ -43,14 +49,34 @@ func TestKafkaClientsCanReadOwnWritesAndBehaveProperlyAfterRestart(t *testing.T) groupID := uuid.NewString() - client := zkafka.NewClient(zkafka.Config{BootstrapServers: []string{bootstrapServer}}, zkafka.LoggerOption(stdLogger{})) + client := zkafka.NewClient( + zkafka.Config{BootstrapServers: []string{bootstrapServer}}, + zkafka.LoggerOption(stdLogger{}), + zkafka.WithClientTextMapPropagator(fakeTextMapPropagator{ + inject: func(ctx context.Context, carrier propagation.TextMapCarrier) { + carrier.Set("kobe", "bryant") + }, + }), + zkafka.WithClientTracerProviderOption(noop.NewTracerProvider()), + ) defer func() { require.NoError(t, client.Close()) }() - writer, err := client.Writer(ctx, zkafka.ProducerTopicConfig{ - ClientID: fmt.Sprintf("writer-%s-%s", t.Name(), uuid.NewString()), + writer1, err := client.Writer(ctx, zkafka.ProducerTopicConfig{ + ClientID: fmt.Sprintf("writer1-%s-%s", t.Name(), uuid.NewString()), + Topic: topic, + Formatter: zfmt.JSONFmt, + }) + require.NoError(t, err) + + writer2, err := client.Writer(ctx, zkafka.ProducerTopicConfig{ + ClientID: fmt.Sprintf("writer2-%s-%s", t.Name(), uuid.NewString()), Topic: topic, Formatter: zfmt.JSONFmt, + }, func(settings *zkafka.WriterSettings) { + settings.DisableTracePropagation = true }) + require.NoError(t, err) + consumerTopicConfig := zkafka.ConsumerTopicConfig{ ClientID: fmt.Sprintf("reader-%s-%s", t.Name(), uuid.NewString()), Topic: topic, @@ -111,11 +137,11 @@ func TestKafkaClientsCanReadOwnWritesAndBehaveProperlyAfterRestart(t *testing.T) }, readResponses) // write msg1, and msg2 - resWrite1, err := writer.Write(ctx, msg1) + resWrite1, err := writer1.Write(ctx, msg1) require.NoError(t, err) msg2 := Msg{Val: "2"} - resWrite2, err := writer.Write(ctx, msg2) + resWrite2, err := writer2.Write(ctx, msg2) require.NoError(t, err) // reader will send on channel the messages it has read (should just be msg1) @@ -125,6 +151,7 @@ func TestKafkaClientsCanReadOwnWritesAndBehaveProperlyAfterRestart(t *testing.T) require.NoError(t, resp.err) require.NotNil(t, rmsg1, "expected written message to be read") require.Equal(t, int(rmsg1.Offset), int(resWrite1.Offset), "expected read offset to match written") + require.Equal(t, "bryant", string(rmsg1.Headers["kobe"])) gotMsg1 := Msg{} err = resp.msg.Decode(&gotMsg1) @@ -151,6 +178,7 @@ func TestKafkaClientsCanReadOwnWritesAndBehaveProperlyAfterRestart(t *testing.T) // assert offset is for second message written (no replay of old message) require.Equal(t, int(rmsg2.Offset), int(resWrite2.Offset), "expected read offset to match written") + require.Empty(t, rmsg2.Headers["kobe"]) gotMsg2 := Msg{} err = rmsg2.Decode(&gotMsg2) @@ -1285,7 +1313,7 @@ func Test_DeadletterClientDoesntCollideWithProducer(t *testing.T) { if msgCount.Load()%2 == 0 { return errors.New("random error occurred") } - _, err := processorWriter.WriteRaw(ctx, nil, msg.Value()) + _, err := processorWriter.WriteRaw(ctx, nil, msg.Value(), nil) return err }) @@ -1458,3 +1486,23 @@ type partition struct { offset int64 topic string } + +type fakeTextMapPropagator struct { + inject func(ctx context.Context, carrier propagation.TextMapCarrier) +} + +func (f fakeTextMapPropagator) Inject(ctx context.Context, carrier propagation.TextMapCarrier) { + if f.inject != nil { + f.inject(ctx, carrier) + } +} + +func (f fakeTextMapPropagator) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context { + return ctx +} + +func (f fakeTextMapPropagator) Fields() []string { + return nil +} + +var _ propagation.TextMapPropagator = (*fakeTextMapPropagator)(nil) diff --git a/writer.go b/writer.go index 6880ae8..a5cf9c2 100644 --- a/writer.go +++ b/writer.go @@ -100,6 +100,9 @@ func newWriter(args writerArgs) (*KWriter, error) { for _, opt := range args.opts { opt(&s) } + if s.DisableTracePropagation { + w.p = nil + } if s.f != nil { w.formatter = s.f } @@ -130,6 +133,9 @@ func (w *KWriter) WriteKey(ctx context.Context, key string, value any, opts ...W func (w *KWriter) WriteRaw(ctx context.Context, key *string, value []byte, opts ...WriteOption) (Response, error) { kafkaMessage := makeProducerMessageRaw(ctx, w.topicConfig.Topic, key, value) for _, opt := range opts { + if opt == nil { + continue + } opt.apply(&kafkaMessage) } if w.lifecycle.PreWrite != nil { @@ -248,7 +254,8 @@ func (w *KWriter) Close() { } type WriterSettings struct { - f kFormatter + f kFormatter + DisableTracePropagation bool } // WriterOption is a function that modify the writer configurations