contrib/segmentio/kafka.go.v0: add DSM support
Signed-off-by: Adrien Fillon <[email protected]>
adrien-f committed Mar 22, 2024
1 parent 485e60e commit 7a9c514
Showing 4 changed files with 218 additions and 28 deletions.
81 changes: 81 additions & 0 deletions contrib/segmentio/kafka.go.v0/kafka.go
@@ -10,6 +10,8 @@ import (
"math"
"strings"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
@@ -46,13 +48,17 @@ func WrapReader(c *kafka.Reader, opts ...Option) *Reader {
if c.Config().Brokers != nil {
wrapped.bootstrapServers = strings.Join(c.Config().Brokers, ",")
}

wrapped.groupID = c.Config().GroupID

log.Debug("contrib/segmentio/kafka-go.v0/kafka: Wrapping Reader: %#v", wrapped.cfg)
return wrapped
}

// A kafkaConfig struct holds information from the kafka config for span tags
type kafkaConfig struct {
bootstrapServers string
groupID string
}

// A Reader wraps a kafka.Reader.
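
For orientation, here is a minimal consumer-side sketch of how the wrapper might be used so that the new groupID field is populated. The WithDataStreams() option is assumed to be the switch behind cfg.dataStreamsEnabled; it is not part of the hunks shown here, and the broker, group, and topic values are placeholders.

```go
package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	// Wrapping a reader that has a GroupID set means consume checkpoints
	// carry a "group:..." edge tag and offsets are tracked (see
	// setConsumeCheckpoint below).
	r := kafkatrace.WrapReader(kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},  // placeholder broker
		GroupID: "my-consumer-group",         // placeholder group
		Topic:   "my-topic",                  // placeholder topic
	}), kafkatrace.WithDataStreams()) // assumed option name enabling DSM
	defer r.Close()

	msg, err := r.ReadMessage(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("offset %d: %s", msg.Offset, msg.Value)
}
```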
@@ -115,6 +121,7 @@ func (r *Reader) ReadMessage(ctx context.Context) (kafka.Message, error) {
return kafka.Message{}, err
}
r.prev = r.startSpan(ctx, &msg)
setConsumeCheckpoint(r.cfg.dataStreamsEnabled, r.groupID, &msg)
return msg, nil
}

@@ -129,9 +136,35 @@ func (r *Reader) FetchMessage(ctx context.Context) (kafka.Message, error) {
return msg, err
}
r.prev = r.startSpan(ctx, &msg)
setConsumeCheckpoint(r.cfg.dataStreamsEnabled, r.groupID, &msg)
return msg, nil
}

func setConsumeCheckpoint(enabled bool, groupID string, msg *kafka.Message) {
if !enabled || msg == nil {
return
}
edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"}
if groupID != "" {
edges = append(edges, "group:"+groupID)
}
carrier := messageCarrier{msg}
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)},
edges...,
)
if !ok {
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
if groupID != "" {
// only track Kafka lag if a consumer group is set.
// since there is no ack mechanism, we consider that messages read are committed right away.
tracer.TrackKafkaCommitOffset(groupID, msg.Topic, int32(msg.Partition), msg.Offset)
}
}
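
The checkpoint set above re-injects the updated pathway context into the message headers in base64 form, so the next hop in the pipeline can continue the same DSM pathway. A small sketch of reading that header back, assuming the propagation key is "dd-pathway-ctx-base64" (the base64 key used by the datastreams package; an assumption, as the constant is not shown in this diff):

```go
package main

import (
	"fmt"

	kafka "github.com/segmentio/kafka-go"
)

// pathwayHeader returns the base64-encoded DSM pathway context carried on a
// consumed message, if present.
func pathwayHeader(msg kafka.Message) ([]byte, bool) {
	for _, h := range msg.Headers {
		if h.Key == "dd-pathway-ctx-base64" { // assumed propagation key
			return h.Value, true
		}
	}
	return nil, false
}

func main() {
	msg := kafka.Message{Headers: []kafka.Header{
		{Key: "dd-pathway-ctx-base64", Value: []byte("...")},
	}}
	if v, ok := pathwayHeader(msg); ok {
		fmt.Printf("pathway context present (%d bytes)\n", len(v))
	}
}
```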

// WrapWriter wraps a kafka.Writer so requests are traced.
func WrapWriter(w *kafka.Writer, opts ...Option) *Writer {
writer := &Writer{
@@ -191,10 +224,58 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...kafka.Message) error
spans := make([]ddtrace.Span, len(msgs))
for i := range msgs {
spans[i] = w.startSpan(ctx, &msgs[i])
setProduceCheckpoint(w.cfg.dataStreamsEnabled, &msgs[i], w.Writer)
}
err := w.Writer.WriteMessages(ctx, msgs...)
for i, span := range spans {
finishSpan(span, msgs[i].Partition, msgs[i].Offset, err)
}
return err
}

func setProduceCheckpoint(enabled bool, msg *kafka.Message, writer *kafka.Writer) {
if !enabled || msg == nil {
return
}

var topic string
if writer.Topic != "" {
topic = writer.Topic
} else {
topic = msg.Topic
}

edges := []string{"direction:out", "topic:" + topic, "type:kafka"}
carrier := messageCarrier{msg}
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
options.CheckpointParams{PayloadSize: getProducerMsgSize(msg)},
edges...,
)
if !ok {
return
}

// Headers will be dropped if the current protocol does not support them
datastreams.InjectToBase64Carrier(ctx, carrier)
}
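
A producer-side sketch to match. Note the topic precedence handled above: kafka-go requires the per-message Topic to be empty when the writer itself has a Topic set, which is why setProduceCheckpoint prefers writer.Topic. WithDataStreams() is again an assumed option name, and the broker and topic are placeholders.

```go
package main

import (
	"context"
	"log"

	kafka "github.com/segmentio/kafka-go"
	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	// The writer-level Topic is what ends up in the "topic:..." edge tag,
	// since the messages below leave their own Topic empty.
	w := kafkatrace.WrapWriter(&kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"), // placeholder broker
		Topic: "my-topic",                  // placeholder topic
	}, kafkatrace.WithDataStreams()) // assumed option name enabling DSM
	defer w.Close()

	err := w.WriteMessages(context.Background(),
		kafka.Message{Key: []byte("k"), Value: []byte("hello")},
	)
	if err != nil {
		log.Fatal(err)
	}
}
```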

func getProducerMsgSize(msg *kafka.Message) (size int64) {
for _, header := range msg.Headers {
size += int64(len(header.Key) + len(header.Value))
}
if msg.Value != nil {
size += int64(len(msg.Value))
}
if msg.Key != nil {
size += int64(len(msg.Key))
}
return size
}

func getConsumerMsgSize(msg *kafka.Message) (size int64) {
for _, header := range msg.Headers {
size += int64(len(header.Key) + len(header.Value))
}
return size + int64(len(msg.Value)+len(msg.Key))
}
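
The two size helpers count key, value, and header bytes; they differ only in the producer version's explicit nil checks, which are redundant since len of a nil slice is 0. A quick worked illustration of the accounting:

```go
package main

import (
	"fmt"

	kafka "github.com/segmentio/kafka-go"
)

// For this message the reported payload size is
// len("k") + len("hello") + len("traceparent") + len("00") = 1 + 5 + 11 + 2 = 19.
func main() {
	msg := kafka.Message{
		Key:     []byte("k"),
		Value:   []byte("hello"),
		Headers: []kafka.Header{{Key: "traceparent", Value: []byte("00")}},
	}
	var size int64
	for _, h := range msg.Headers {
		size += int64(len(h.Key) + len(h.Value))
	}
	size += int64(len(msg.Key) + len(msg.Value))
	fmt.Println(size) // 19
}
```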
