Skip to content

Commit

Permalink
Merge branch 'main' into add-inferred-proxy-spans
Browse files Browse the repository at this point in the history
  • Loading branch information
zarirhamza authored Jan 15, 2025
2 parents a9aeec2 + 471d723 commit 43ff23f
Show file tree
Hide file tree
Showing 36 changed files with 1,588 additions and 800 deletions.
100 changes: 100 additions & 0 deletions contrib/IBM/sarama.v1/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package sarama

import (
"context"
"github.com/IBM/sarama"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

type partitionConsumer struct {
sarama.PartitionConsumer
dispatcher dispatcher
}

// Messages returns the read channel for the messages that are returned by
// the broker.
func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
return pc.dispatcher.Messages()
}

// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received
// message to be traced.
func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
cfg := new(config)
defaults(cfg)
for _, opt := range opts {
opt(cfg)
}
log.Debug("contrib/IBM/sarama: Wrapping Partition Consumer: %#v", cfg)

d := wrapDispatcher(pc, cfg)
go d.Run()

wrapped := &partitionConsumer{
PartitionConsumer: pc,
dispatcher: d,
}
return wrapped
}

type consumer struct {
sarama.Consumer
opts []Option
}

// ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting
// PartitionConsumer.
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
pc, err := c.Consumer.ConsumePartition(topic, partition, offset)
if err != nil {
return pc, err
}
return WrapPartitionConsumer(pc, c.opts...), nil
}

// WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created
// via Consumer.ConsumePartition.
func WrapConsumer(c sarama.Consumer, opts ...Option) sarama.Consumer {
return &consumer{
Consumer: c,
opts: opts,
}
}

func setConsumeCheckpoint(enabled bool, groupID string, msg *sarama.ConsumerMessage) {
if !enabled || msg == nil {
return
}
edges := []string{"direction:in", "topic:" + msg.Topic, "type:kafka"}
if groupID != "" {
edges = append(edges, "group:"+groupID)
}
carrier := NewConsumerMessageCarrier(msg)

ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getConsumerMsgSize(msg)}, edges...)
if !ok {
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
if groupID != "" {
// only track Kafka lag if a consumer group is set.
// since there is no ack mechanism, we consider that messages read are committed right away.
tracer.TrackKafkaCommitOffset(groupID, msg.Topic, msg.Partition, msg.Offset)
}
}

func getConsumerMsgSize(msg *sarama.ConsumerMessage) (size int64) {
for _, header := range msg.Headers {
size += int64(len(header.Key) + len(header.Value))
}
return size + int64(len(msg.Value)+len(msg.Key))
}
54 changes: 54 additions & 0 deletions contrib/IBM/sarama.v1/consumer_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package sarama

import (
"github.com/IBM/sarama"

"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

type consumerGroupHandler struct {
sarama.ConsumerGroupHandler
cfg *config
}

func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// Wrap claim
wd := wrapDispatcher(claim, h.cfg)
go wd.Run()
claim = &consumerGroupClaim{
ConsumerGroupClaim: claim,
dispatcher: wd,
}

return h.ConsumerGroupHandler.ConsumeClaim(session, claim)
}

// WrapConsumerGroupHandler wraps a sarama.ConsumerGroupHandler causing each received
// message to be traced.
func WrapConsumerGroupHandler(handler sarama.ConsumerGroupHandler, opts ...Option) sarama.ConsumerGroupHandler {
cfg := new(config)
defaults(cfg)
for _, opt := range opts {
opt(cfg)
}
log.Debug("contrib/IBM/sarama: Wrapping Consumer Group Handler: %#v", cfg)

return &consumerGroupHandler{
ConsumerGroupHandler: handler,
cfg: cfg,
}
}

type consumerGroupClaim struct {
sarama.ConsumerGroupClaim
dispatcher dispatcher
}

func (c *consumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage {
return c.dispatcher.Messages()
}
155 changes: 155 additions & 0 deletions contrib/IBM/sarama.v1/consumer_group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package sarama

import (
"context"
"log"
"sync"
"testing"

"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
)

func TestWrapConsumerGroupHandler(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0 // first version that supports headers
cfg.Producer.Return.Successes = true
cfg.Producer.Flush.Messages = 1

cg, err := sarama.NewConsumerGroup(kafkaBrokers, testGroupID, cfg)
require.NoError(t, err)
defer func() {
assert.NoError(t, cg.Close())
}()

handler := &testConsumerGroupHandler{
T: t,
ready: make(chan bool),
rcvMessages: make(chan *sarama.ConsumerMessage, 1),
}
tracedHandler := WrapConsumerGroupHandler(handler, WithDataStreams(), WithGroupID(testGroupID))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := cg.Consume(ctx, []string{testTopic}, tracedHandler); err != nil {
assert.ErrorIs(t, err, sarama.ErrClosedConsumerGroup)
return
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
}
}()

<-handler.ready // Await till the consumer has been set up
log.Println("Sarama consumer up and running!...")

p, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
require.NoError(t, err)

require.NoError(t, err)
p = WrapSyncProducer(cfg, p, WithDataStreams())

produceMsg := &sarama.ProducerMessage{
Topic: testTopic,
Value: sarama.StringEncoder("test 1"),
Metadata: "test",
}
_, _, err = p.SendMessage(produceMsg)
require.NoError(t, err)

waitForSpans(mt, 2)
cancel()
wg.Wait()

spans := mt.FinishedSpans()
require.Len(t, spans, 2)
consumeMsg := <-handler.rcvMessages

s0 := spans[0]
assert.Equal(t, "kafka", s0.Tag(ext.ServiceName))
assert.Equal(t, "queue", s0.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic gotest_ibm_sarama", s0.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s0.OperationName())
assert.Equal(t, int32(0), s0.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s0.Tag("offset"))
assert.Equal(t, "IBM/sarama", s0.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem))

assertDSMProducerPathway(t, testTopic, produceMsg)

s1 := spans[1]
assert.Equal(t, "kafka", s1.Tag(ext.ServiceName))
assert.Equal(t, "queue", s1.Tag(ext.SpanType))
assert.Equal(t, "Consume Topic gotest_ibm_sarama", s1.Tag(ext.ResourceName))
assert.Equal(t, "kafka.consume", s1.OperationName())
assert.Equal(t, int32(0), s1.Tag(ext.MessagingKafkaPartition))
assert.NotNil(t, s1.Tag("offset"))
assert.Equal(t, "IBM/sarama", s1.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem))

assertDSMConsumerPathway(t, testTopic, testGroupID, consumeMsg, true)

assert.Equal(t, s0.SpanID(), s1.ParentID(), "spans are not parent-child")
}

type testConsumerGroupHandler struct {
*testing.T
ready chan bool
rcvMessages chan *sarama.ConsumerMessage
}

func (t *testConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(t.ready)
return nil
}

func (t *testConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
return nil
}

func (t *testConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case msg, ok := <-claim.Messages():
if !ok {
t.T.Log("message channel was closed")
return nil
}
t.T.Logf("Message claimed: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic)
session.MarkMessage(msg, "")
t.rcvMessages <- msg

// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
case <-session.Context().Done():
return nil
}
}
}
Loading

0 comments on commit 43ff23f

Please sign in to comment.