contrib/segmentio/kafka.go.v0: add DSM support
Signed-off-by: Adrien Fillon <[email protected]>
adrien-f committed Mar 21, 2024
1 parent 485e60e commit a2600a1
Showing 4 changed files with 177 additions and 11 deletions.
81 changes: 81 additions & 0 deletions contrib/segmentio/kafka.go.v0/kafka.go
@@ -10,6 +10,8 @@ import (
"math"
"strings"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
@@ -46,13 +48,17 @@ func WrapReader(c *kafka.Reader, opts ...Option) *Reader {
if c.Config().Brokers != nil {
wrapped.bootstrapServers = strings.Join(c.Config().Brokers, ",")
}

wrapped.groupId = c.Config().GroupID

log.Debug("contrib/segmentio/kafka-go.v0/kafka: Wrapping Reader: %#v", wrapped.cfg)
return wrapped
}

// A kafkaConfig struct holds information from the kafka config for span tags
type kafkaConfig struct {
bootstrapServers string
groupId string
}

// A Reader wraps a kafka.Reader.
@@ -115,6 +121,7 @@ func (r *Reader) ReadMessage(ctx context.Context) (kafka.Message, error) {
return kafka.Message{}, err
}
r.prev = r.startSpan(ctx, &msg)
setConsumeCheckpoint(r.cfg.dataStreamsEnabled, r.groupId, &msg)
return msg, nil
}

@@ -129,9 +136,35 @@ func (r *Reader) FetchMessage(ctx context.Context) (kafka.Message, error) {
return msg, err
}
r.prev = r.startSpan(ctx, &msg)
setConsumeCheckpoint(r.cfg.dataStreamsEnabled, r.groupId, &msg)
return msg, nil
}

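// setConsumeCheckpoint sets a Data Streams Monitoring consume checkpoint on the
// message, re-injects the updated pathway into the message headers and, when a
// consumer group is configured, reports the consumed offset as committed.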
func setConsumeCheckpoint(enabled bool, groupID string, msg *kafka.Message) {
if !enabled || msg == nil {
return
}
edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"}
if groupID != "" {
edges = append(edges, "group:"+groupID)
}
carrier := messageCarrier{msg}
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)},
edges...,
)
if !ok {
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
if groupID != "" {
// Only track Kafka lag if a consumer group is set.
// Since there is no ack mechanism, we consider messages read to be committed right away.
tracer.TrackKafkaCommitOffset(groupID, msg.Topic, int32(msg.Partition), msg.Offset)
}
}

// WrapWriter wraps a kafka.Writer so requests are traced.
func WrapWriter(w *kafka.Writer, opts ...Option) *Writer {
writer := &Writer{
@@ -191,10 +224,58 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...kafka.Message) error
spans := make([]ddtrace.Span, len(msgs))
for i := range msgs {
spans[i] = w.startSpan(ctx, &msgs[i])
setProduceCheckpoint(w.cfg.dataStreamsEnabled, &msgs[i], w.Writer)
}
err := w.Writer.WriteMessages(ctx, msgs...)
for i, span := range spans {
finishSpan(span, msgs[i].Partition, msgs[i].Offset, err)
}
return err
}

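// setProduceCheckpoint sets a Data Streams Monitoring produce checkpoint on the
// message and injects the resulting pathway into the message headers.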
func setProduceCheckpoint(enabled bool, msg *kafka.Message, writer *kafka.Writer) {
if !enabled || msg == nil {
return
}

var topic string
if writer.Topic != "" {
topic = writer.Topic
} else {
topic = msg.Topic
}

edges := []string{"direction:out", "topic:" + topic, "type:kafka"}
carrier := messageCarrier{msg}
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)},
edges...,
)
if !ok {
return
}

// Headers will be dropped if the current protocol does not support them
datastreams.InjectToBase64Carrier(ctx, carrier)
}

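// getProducerMsgSize returns the payload size (headers, key and value) reported
// with the produce checkpoint.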
func getProducerMsgSize(msg *kafka.Message) (size int64) {
for _, header := range msg.Headers {
size += int64(len(header.Key) + len(header.Value))
}
if msg.Value != nil {
size += int64(len(msg.Value))
}
if msg.Key != nil {
size += int64(len(msg.Key))
}
return size
}

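// getConsumerMsgSize returns the payload size (headers, key and value) reported
// with the consume checkpoint.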
func getConsumerMsgSize(msg *kafka.Message) (size int64) {
for _, header := range msg.Headers {
size += int64(len(header.Key) + len(header.Value))
}
return size + int64(len(msg.Value)+len(msg.Key))
}
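
For reference, a minimal usage sketch of the new option (not part of this commit): it assumes a broker at localhost:9092 and uses hypothetical topic and group names, and only shows where the produce and consume checkpoints above come into play.

package main

import (
	"context"
	"log"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	// Produce: setProduceCheckpoint runs for every message because WithDataStreams is set.
	w := kafkatrace.WrapWriter(&kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"), // assumed broker address
		Topic: "orders",                    // hypothetical topic
	}, kafkatrace.WithDataStreams())
	if err := w.WriteMessages(context.Background(), kafka.Message{Value: []byte("hello")}); err != nil {
		log.Fatal(err)
	}

	// Consume: setConsumeCheckpoint runs for every message and, because a consumer
	// group is set, the consumed offset is also reported via TrackKafkaCommitOffset.
	r := kafkatrace.WrapReader(kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "orders",
		GroupID: "orders-consumer", // hypothetical consumer group
	}), kafkatrace.WithDataStreams())
	msg, err := r.ReadMessage(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("read: %s", msg.Value)
}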
84 changes: 73 additions & 11 deletions contrib/segmentio/kafka.go.v0/kafka_test.go
@@ -12,8 +12,10 @@ import (
"time"

"gopkg.in/DataDog/dd-trace-go.v1/contrib/internal/namingschematest"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

kafka "github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
@@ -48,16 +50,19 @@ to setup the integration test locally run:

type readerOpFn func(t *testing.T, r *Reader)

func genIntegrationTestSpans(t *testing.T, writerOp func(t *testing.T, w *Writer), readerOp readerOpFn, writerOpts []Option, readerOpts []Option) []mocktracer.Span {
func genIntegrationTestSpans(t *testing.T, mt mocktracer.Tracer, writerOp func(t *testing.T, w *Writer), readerOp readerOpFn, writerOpts []Option, readerOpts []Option) ([]mocktracer.Span, []kafka.Message) {
skipIntegrationTest(t)
mt := mocktracer.Start()
defer mt.Stop()

writtenMessages := []kafka.Message{}

// add some dummy values to broker/addr to test bootstrap servers.
kw := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: testTopic,
RequiredAcks: kafka.RequireOne,
Completion: func(messages []kafka.Message, err error) {
writtenMessages = append(writtenMessages, messages...)
},
}
w := WrapWriter(kw, writerOpts...)
writerOp(t, w)
@@ -78,12 +83,18 @@ func genIntegrationTestSpans(t *testing.T, writerOp func(t *testing.T, w *Writer
require.Len(t, spans, 2)
// they should be linked via headers
assert.Equal(t, spans[0].TraceID(), spans[1].TraceID(), "Trace IDs should match")
return spans
return spans, writtenMessages
}

func TestReadMessageFunctional(t *testing.T) {
spans := genIntegrationTestSpans(
mt := mocktracer.Start()
defer mt.Stop()

readMessages := []kafka.Message{}

spans, writtenMessages := genIntegrationTestSpans(
t,
mt,
func(t *testing.T, w *Writer) {
err := w.WriteMessages(context.Background(), testMessages...)
require.NoError(t, err, "Expected to write message to topic")
@@ -95,13 +106,17 @@ func TestReadMessageFunctional(t *testing.T) {
require.NoError(t, err, "Expected to consume message")
assert.Equal(t, testMessages[0].Value, readMsg.Value, "Values should be equal")

readMessages = append(readMessages, readMsg)
err = r.CommitMessages(context.Background(), readMsg)
assert.NoError(t, err, "Expected CommitMessages to not return an error")
},
[]Option{WithAnalyticsRate(0.1)},
[]Option{},
[]Option{WithAnalyticsRate(0.1), WithDataStreams()},
[]Option{WithDataStreams()},
)

assert.Len(t, writtenMessages, len(testMessages))
assert.Len(t, readMessages, len(testMessages))

// producer span
s0 := spans[0]
assert.Equal(t, "kafka.produce", s0.OperationName())
@@ -115,6 +130,13 @@ func TestReadMessageFunctional(t *testing.T) {
assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem))
assert.Equal(t, "localhost:9092,localhost:9093,localhost:9094", s0.Tag(ext.KafkaBootstrapServers))

p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&writtenMessages[0]}))
assert.True(t, ok)
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:"+testTopic, "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
assert.Equal(t, expected.GetHash(), p.GetHash())

// consumer span
s1 := spans[1]
assert.Equal(t, "kafka.consume", s1.OperationName())
@@ -127,11 +149,27 @@ func TestReadMessageFunctional(t *testing.T) {
assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem))
assert.Equal(t, "localhost:9092,localhost:9093,localhost:9094", s1.Tag(ext.KafkaBootstrapServers))

p, ok = datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&readMessages[0]}))
assert.True(t, ok)
expectedCtx, _ = tracer.SetDataStreamsCheckpoint(
datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&writtenMessages[0]}),
"direction:in", "topic:"+testTopic, "type:kafka", "group:"+testGroupID,
)
expected, _ = datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
assert.Equal(t, expected.GetHash(), p.GetHash())
}

func TestFetchMessageFunctional(t *testing.T) {
spans := genIntegrationTestSpans(
mt := mocktracer.Start()
defer mt.Stop()

readMessages := []kafka.Message{}

spans, writtenMessages := genIntegrationTestSpans(
t,
mt,
func(t *testing.T, w *Writer) {
err := w.WriteMessages(context.Background(), testMessages...)
require.NoError(t, err, "Expected to write message to topic")
@@ -143,11 +181,12 @@ func TestFetchMessageFunctional(t *testing.T) {
require.NoError(t, err, "Expected to consume message")
assert.Equal(t, testMessages[0].Value, readMsg.Value, "Values should be equal")

readMessages = append(readMessages, readMsg)
err = r.CommitMessages(context.Background(), readMsg)
assert.NoError(t, err, "Expected CommitMessages to not return an error")
},
[]Option{WithAnalyticsRate(0.1)},
[]Option{},
[]Option{WithAnalyticsRate(0.1), WithDataStreams()},
[]Option{WithDataStreams()},
)

// producer span
@@ -163,6 +202,13 @@ func TestFetchMessageFunctional(t *testing.T) {
assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem))
assert.Equal(t, "localhost:9092,localhost:9093,localhost:9094", s0.Tag(ext.KafkaBootstrapServers))

p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&writtenMessages[0]}))
assert.True(t, ok)
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:"+testTopic, "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
assert.Equal(t, expected.GetHash(), p.GetHash())

// consumer span
s1 := spans[1]
assert.Equal(t, "kafka.consume", s1.OperationName())
@@ -175,6 +221,16 @@ func TestFetchMessageFunctional(t *testing.T) {
assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem))
assert.Equal(t, "localhost:9092,localhost:9093,localhost:9094", s1.Tag(ext.KafkaBootstrapServers))

p, ok = datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&readMessages[0]}))
assert.True(t, ok)
expectedCtx, _ = tracer.SetDataStreamsCheckpoint(
datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&writtenMessages[0]}),
"direction:in", "topic:"+testTopic, "type:kafka", "group:"+testGroupID,
)
expected, _ = datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
assert.Equal(t, expected.GetHash(), p.GetHash())
}

func TestNamingSchema(t *testing.T) {
@@ -183,8 +239,13 @@ func TestNamingSchema(t *testing.T) {
if serviceOverride != "" {
opts = append(opts, WithServiceName(serviceOverride))
}
return genIntegrationTestSpans(

mt := mocktracer.Start()
defer mt.Stop()

spans, _ := genIntegrationTestSpans(
t,
mt,
func(t *testing.T, w *Writer) {
err := w.WriteMessages(context.Background(), testMessages...)
require.NoError(t, err, "Expected to write message to topic")
@@ -202,6 +263,7 @@ func TestNamingSchema(t *testing.T) {
opts,
opts,
)
return spans
}
namingschematest.NewKafkaTest(genSpans)(t)
}
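
The Data Streams assertions added in these tests all follow the same pattern, restated below with comments for clarity (got/want are illustrative names; the calls are the same ones used above):

// 1. Pathway that the produce checkpoint injected into the written message's headers.
got, ok := datastreams.PathwayFromContext(
	datastreams.ExtractFromBase64Carrier(context.Background(), messageCarrier{&writtenMessages[0]}))
assert.True(t, ok)

// 2. Pathway that a checkpoint with the same edge tags would produce.
wantCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:"+testTopic, "type:kafka")
want, _ := datastreams.PathwayFromContext(wantCtx)

// 3. Matching, non-zero hashes mean the instrumented writer recorded the expected checkpoint.
// The consumer-side check is identical except that the expected checkpoint is seeded with
// the producer's pathway and carries the extra "group:"+testGroupID edge tag.
assert.NotEqual(t, want.GetHash(), 0)
assert.Equal(t, want.GetHash(), got.GetHash())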
10 changes: 10 additions & 0 deletions contrib/segmentio/kafka.go.v0/option.go
@@ -20,6 +20,7 @@ type config struct {
consumerSpanName string
producerSpanName string
analyticsRate float64
dataStreamsEnabled bool
}

// An Option customizes the config.
@@ -34,6 +35,8 @@ func newConfig(opts ...Option) *config {
cfg.analyticsRate = 1.0
}

cfg.dataStreamsEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false)

cfg.consumerServiceName = namingschema.ServiceName(defaultServiceName)
cfg.producerServiceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName)
cfg.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound)
@@ -75,3 +78,10 @@ func WithAnalyticsRate(rate float64) Option {
}
}
}

// WithDataStreams enables the Data Streams Monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithDataStreams() Option {
return func(cfg *config) {
cfg.dataStreamsEnabled = true
}
}
13 changes: 13 additions & 0 deletions contrib/segmentio/kafka.go.v0/option_test.go
@@ -43,4 +43,17 @@ func TestAnalyticsSettings(t *testing.T) {
cfg := newConfig(WithAnalyticsRate(0.2))
assert.Equal(t, 0.2, cfg.analyticsRate)
})

t.Run("withEnv", func(t *testing.T) {
t.Setenv("DD_DATA_STREAMS_ENABLED", "true")
cfg := newConfig()
assert.True(t, cfg.dataStreamsEnabled)
})

t.Run("optionOverridesEnv", func(t *testing.T) {
t.Setenv("DD_DATA_STREAMS_ENABLED", "false")
cfg := newConfig()
WithDataStreams()(cfg)
assert.True(t, cfg.dataStreamsEnabled)
})
}
