diff --git a/.travis.yml b/.travis.yml index 6be20070b..3d1a64ac6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,7 @@ matrix: include: - go: tip - go: 1.8.x - env: EXCLUDE_DIRS="./instrumentation/instagrpc" + env: EXCLUDE_DIRS="./instrumentation/instagrpc ./instrumentation/instasarama" - go: 1.9.x - go: 1.10.x - go: 1.11.x diff --git a/README.md b/README.md index a47cb92b1..6b670c306 100644 --- a/README.md +++ b/README.md @@ -162,6 +162,10 @@ The provided `parentSpan` is the incoming request from the request handler (see [`github.com/instana/go-sensor/instrumentation/instagrpc`](./instrumentation/instagrpc) provides both unary and stream interceptors to instrument GRPC servers and clients that use `google.golang.org/grpc`. +### Kafka producers and consumers + +[`github.com/instana/go-sensor/instrumentation/instasarama`](./instrumentation/instasarama) provides both unary and stream interceptors to instrument Kafka producers and consumers built on top of `github.com/Shopify/sarama`. + ## Sensor To use sensor only without tracing ability, import the `instana` package and run diff --git a/instrumentation/instasarama/LICENSE.md b/instrumentation/instasarama/LICENSE.md new file mode 100644 index 000000000..751aac778 --- /dev/null +++ b/instrumentation/instasarama/LICENSE.md @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 Instana + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/instrumentation/instasarama/README.md b/instrumentation/instasarama/README.md new file mode 100644 index 000000000..b797fbf9e --- /dev/null +++ b/instrumentation/instasarama/README.md @@ -0,0 +1,129 @@ +Instana instrumentation for github.com/Shopify/sarama +===================================================== + +This module contains instrumentation code for Kafka producers and consumers that use `github.com/Shopify/sarama` library starting +from v1.19.0 and above. + +[![GoDoc](https://img.shields.io/static/v1?label=godoc&message=reference&color=blue)][godoc] + +Installation +------------ + +Unlike the Instana Go sensor, this instrumentation module requires Go v1.9+. + +```bash +$ go get github.com/instana/go-sensor/instrumentation/instasarama +``` + +Usage +----- + +For detailed usage example see [the documentation][godoc] or [`example_test.go`](./example_test.go). + +This instrumentation requires an instance of `instana.Sensor` to initialize spans and handle the trace context propagation. +You can create a new instance of Instana sensor using `instana.NewSensor()`. + +`instasarama` provides a set of convenience wrappers for constructor functions exported by `github.com/Shopify/sarama`. These +wrappers are named the same way as their origins and use the same set of arguments. In most cases it's enough to replace +`sarama` with `instasarama` in the constructor call and append an instance of `*instana.Sensor` to the argument list. + +**Note**: Kafka supports record headers starting from v0.11.0. In order to enable trace context propagation, you need to make sure +that your `(sarama.Config).Version` is set to at least `sarama.V0_11_0_0`. + +### Instrumenting `sarama.SyncProducer` + +For more detailed example code please consult the [package documentation][godoc] or [example_sync_producer_test.go](./example_sync_producer_test.go). + +To create an instrumented instance of `sarama.SyncProducer` from a list of broker addresses use [instasarama.NewSyncProducer()][NewSyncProducer]: + +```go +producer := instasarama.NewSyncProducer(brokers, config, sensor) +``` + +[instasarama.NewSyncProducerFromClient()][NewSyncProducerFromClient] does the same, but from an existing `sarama.Client`: + +```go +producer := instasarama.NewSyncProducerFromClient(client, sensor) +``` + +The wrapped producer takes care of trace context propagation by creating an exit span and injecting the trace context into each Kafka +message headers. Since `github.com/Shopify/sarama` does not use `context.Context`, which is a conventional way of passing the parent +span in Instana Go sensor, the caller needs to inject the parent span context using [`instasarama.ProducerMessageWithSpan()`][ProducerMessageWithSpan] +before passing it to the wrapped producer. + +### Instrumenting `sarama.AsyncProducer` + +Similarly to `sarama.SyncProducer`, `instasarama` provides wrappers for constructor methods of `sarama.AsyncProducer` and expects +the parent span context to be injected into message headers using using `instasarama.ProducerMessageWithSpan()`. + +For more detailed example code please consult the [package documentation][godoc] or [example_async_producer_test.go](./example_async_producer_test.go). + +To create an instrumented instance of `sarama.AsyncProducer` from a list of broker addresses use [instasarama.NewAsyncProducer()][NewAsyncProducer]: + +```go +producer := instasarama.NewAsyncProducer(brokers, config, sensor) +``` + +[instasarama.NewAsyncProducerFromClient()][NewAsyncProducerFromClient] does the same, but from an existing `sarama.Client`: + +```go +producer := instasarama.NewAsyncProducerFromClient(client, sensor) +``` + +The wrapped producer takes care of trace context propagation by creating an exit span and injecting the trace context into each Kafka +message headers. Since `github.com/Shopify/sarama` does not use `context.Context`, which is a conventional way of passing the parent +span in Instana Go sensor, the caller needs to inject the parent span context using [`instasarama.ProducerMessageWithSpan()`][ProducerMessageWithSpan] +before passing it to the wrapped producer. + +### Instrumenting `sarama.Consumer` + +For more detailed example code please consult the [package documentation][godoc] or [example_consumer_test.go](./example_consumer_test.go). + +To create an instrumented instance of `sarama.Consumer` from a list of broker addresses use [instasarama.NewConsumer()][NewConsumer]: + +```go +consumer := instasarama.NewConsumer(brokers, config, sensor) +``` + +[instasarama.NewConsumerFromClient()][NewConsumerFromClient] does the same, but from an existing `sarama.Client`: + +```go +consumer := instasarama.NewConsumerFromClient(client, sensor) +``` + +The wrapped consumer will pick up the existing trace context if found in message headers, start a new entry span and inject its context +into each message. This context can be retrieved with [`instasarama.SpanContextFromConsumerMessage()`][SpanContextFromConsumerMessage] +and used in the message handler to continue the trace. + +### Instrumenting `sarama.ConsumerGroup` + +For more detailed example code please consult the [package documentation][godoc] or [example_consumer_group_test.go](./example_consumer_group_test.go). + +`instasarama` provides [`instasarama.WrapConsumerGroupHandler()`][WrapConsumerGroupHandler] to wrap your `sarama.ConsumerGroupHandler` +into a wrapper that takes care of trace context extraction, creating an entry span and injecting its context into each received `sarama.ConsumerMessage`: + +```go +var client sarama.ConsumerGroup + +consumer := instasarama.WrapConsumerGroupHandler(&Consumer{}, sensor) + +// use the wrapped consumer in the Consume() call +for { + client.Consume(ctx, consumer) +} +``` + +The wrapped consumer will pick up the existing trace context if found in message headers, start a new entry span and inject its context +into each message. This context can be retrieved with [`instasarama.SpanContextFromConsumerMessage()`][SpanContextFromConsumerMessage] and used +in the message handler to continue the trace. + +[godoc]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama +[NewSyncProducer]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#NewSyncProducer +[NewSyncProducerFromClient]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#NewSyncProducerFromClient +[NewAsyncProducer]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#NewAsyncProducer +[NewAsyncProducerFromClient]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#NewAsyncProducerFromClient +[NewConsumer]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#NewConsumer +[NewConsumerFromClient]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#NewConsumerFromClient +[WrapConsumerGroupHandler]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#WrapConsumerGroupHandler +[ProducerMessageWithSpan]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#ProducerMessageWithSpan +[SpanContextFromConsumerMessage]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#SpanContextFromConsumerMessage diff --git a/instrumentation/instasarama/async_producer.go b/instrumentation/instasarama/async_producer.go new file mode 100644 index 000000000..564e85610 --- /dev/null +++ b/instrumentation/instasarama/async_producer.go @@ -0,0 +1,138 @@ +// +build go1.9 + +package instasarama + +import ( + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + ot "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" +) + +// AsyncProducer is a wrapper for sarama.AsyncProducer that instruments its calls using +// provided instana.Sensor +type AsyncProducer struct { + sarama.AsyncProducer + sensor *instana.Sensor + + awaitResult bool + propageContext bool + + input chan *sarama.ProducerMessage + successes chan *sarama.ProducerMessage + errors chan *sarama.ProducerError + + channelStates uint8 // bit fields describing the open/closed state of the response channels + activeSpans *spanRegistry +} + +const ( + apSuccessesChanReady = uint8(1) << iota + apErrorsChanReady + + apAllChansReady = apSuccessesChanReady | apErrorsChanReady +) + +// NewAsyncProducer creates a new sarama.AsyncProducer using the given broker addresses and configuration, and +// instruments its calls +func NewAsyncProducer(addrs []string, conf *sarama.Config, sensor *instana.Sensor) (sarama.AsyncProducer, error) { + ap, err := sarama.NewAsyncProducer(addrs, conf) + if err != nil { + return ap, err + } + + return WrapAsyncProducer(ap, conf, sensor), nil +} + +// NewAsyncProducerFromClient creates a new sarama.AsyncProducer using the given client, and +// instruments its calls +func NewAsyncProducerFromClient(client sarama.Client, sensor *instana.Sensor) (sarama.AsyncProducer, error) { + ap, err := sarama.NewAsyncProducerFromClient(client) + if err != nil { + return ap, err + } + + return WrapAsyncProducer(ap, client.Config(), sensor), nil +} + +// WrapAsyncProducer wraps an existing sarama.AsyncProducer and instruments its calls. It requires the same +// config that was used to create this producer to detect the Kafka version and whether it's supposed to return +// successes/errors. To initialize a new sync producer instance use instasarama.NewAsyncProducer() and +// instasarama.NewAsyncProducerFromClient() convenience methods instead +func WrapAsyncProducer(p sarama.AsyncProducer, conf *sarama.Config, sensor *instana.Sensor) *AsyncProducer { + ap := &AsyncProducer{ + AsyncProducer: p, + sensor: sensor, + input: make(chan *sarama.ProducerMessage), + successes: make(chan *sarama.ProducerMessage), + errors: make(chan *sarama.ProducerError), + channelStates: apAllChansReady, + } + + if conf != nil { + ap.propageContext = contextPropagationSupported(conf.Version) + ap.awaitResult = conf.Producer.Return.Successes && conf.Producer.Return.Errors + ap.activeSpans = newSpanRegistry() + } + + go ap.consume() + + return ap +} + +// Input is the input channel for the user to write messages to that they wish to send. The async producer +// will than create a new exit span for each message that has trace context added with instasarama.ProducerMessageWithSpan() +func (p *AsyncProducer) Input() chan<- *sarama.ProducerMessage { return p.input } + +// Successes is the success output channel back to the user +func (p *AsyncProducer) Successes() <-chan *sarama.ProducerMessage { return p.successes } + +// Errors is the error output channel back to the user +func (p *AsyncProducer) Errors() <-chan *sarama.ProducerError { return p.errors } + +func (p *AsyncProducer) consume() { + for p.channelStates&apAllChansReady != 0 { + select { + case msg := <-p.input: + sp := startProducerSpan(p.sensor, msg) + if sp != nil { + if p.awaitResult { // postpone span finish until the result is received + p.activeSpans.Add(producerSpanKey(msg), sp) + } else { + sp.Finish() + } + + carrier := ProducerMessageCarrier{msg} + if p.propageContext { + p.sensor.Tracer().Inject(sp.Context(), ot.TextMap, carrier) + } else { + carrier.RemoveAll() + } + } + + p.AsyncProducer.Input() <- msg + case msg, ok := <-p.AsyncProducer.Successes(): + if !ok { + p.channelStates &= ^apSuccessesChanReady + continue + } + p.successes <- msg + + if sp, ok := p.activeSpans.Remove(producerSpanKey(msg)); ok { + sp.Finish() + } + case msg, ok := <-p.AsyncProducer.Errors(): + if !ok { + p.channelStates &= ^apErrorsChanReady + continue + } + p.errors <- msg + + if sp, ok := p.activeSpans.Remove(producerSpanKey(msg.Msg)); ok { + sp.SetTag("kafka.error", msg.Err) + sp.LogFields(otlog.Error(msg.Err)) + sp.Finish() + } + } + } +} diff --git a/instrumentation/instasarama/async_producer_test.go b/instrumentation/instasarama/async_producer_test.go new file mode 100644 index 000000000..248351c65 --- /dev/null +++ b/instrumentation/instasarama/async_producer_test.go @@ -0,0 +1,343 @@ +// +build go1.9 + +package instasarama_test + +import ( + "errors" + "testing" + "time" + + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/instrumentation/instasarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAsyncProducer_Input(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + parent := sensor.Tracer().StartSpan("test-span") + msg := instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{Topic: "test-topic"}, parent) + + ap := newTestAsyncProducer(nil) + defer ap.Teardown() + + conf := sarama.NewConfig() + conf.Version = sarama.V0_11_0_0 + + wrapped := instasarama.WrapAsyncProducer(ap, conf, sensor) + wrapped.Input() <- msg + + var published *sarama.ProducerMessage + select { + case published = <-ap.input: + break + case <-time.After(1 * time.Second): + t.Fatalf("publishing via async producer timed out after 1s") + } + + parent.Finish() + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + pSpan, err := extractAgentSpan(spans[1]) + require.NoError(t, err) + + cSpan, err := extractAgentSpan(spans[0]) + require.NoError(t, err) + + assert.Equal(t, 0, cSpan.Ec) + assert.EqualValues(t, instana.ExitSpanKind, cSpan.Kind) + + assert.Equal(t, agentKafkaSpanData{ + Service: "test-topic", + Access: "send", + }, cSpan.Data.Kafka) + + assert.Contains(t, published.Headers, sarama.RecordHeader{ + Key: []byte("X_INSTANA_C"), + Value: instasarama.PackTraceContextHeader(instana.FormatID(cSpan.TraceID), instana.FormatID(cSpan.SpanID)), + }) + assert.Contains(t, published.Headers, sarama.RecordHeader{ + Key: []byte("X_INSTANA_L"), + Value: instasarama.PackTraceLevelHeader("1"), + }) + + assert.Equal(t, pSpan.TraceID, cSpan.TraceID) + assert.Equal(t, pSpan.SpanID, cSpan.ParentID) + assert.NotEqual(t, pSpan.SpanID, cSpan.SpanID) +} + +func TestAsyncProducer_Input_WithAwaitResult_Success(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + parent := sensor.Tracer().StartSpan("test-span") + msg := instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{Topic: "test-topic"}, parent) + + ap := newTestAsyncProducer(nil) + defer ap.Teardown() + + conf := sarama.NewConfig() + conf.Version = sarama.V0_11_0_0 + conf.Producer.Return.Successes = true + conf.Producer.Return.Errors = true + + wrapped := instasarama.WrapAsyncProducer(ap, conf, sensor) + wrapped.Input() <- msg + + var published *sarama.ProducerMessage + select { + case published = <-ap.input: + break + case <-time.After(1 * time.Second): + t.Fatalf("publishing via async producer timed out after 1s") + } + + parent.Finish() + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 1) + + pSpan, err := extractAgentSpan(spans[0]) + require.NoError(t, err) + + // send error for another message + ap.errors <- &sarama.ProducerError{ + Msg: &sarama.ProducerMessage{Topic: "another-topic"}, + Err: errors.New("something went wrong"), + } + <-wrapped.Errors() + require.Empty(t, recorder.GetQueuedSpans()) + + // send success for another message + ap.successes <- &sarama.ProducerMessage{Topic: "another-topic"} + <-wrapped.Successes() + require.Empty(t, recorder.GetQueuedSpans()) + + // send expected success + ap.successes <- msg + <-wrapped.Successes() + + spans = recorder.GetQueuedSpans() + require.Len(t, spans, 1) + + cSpan, err := extractAgentSpan(spans[0]) + require.NoError(t, err) + + assert.Equal(t, 0, cSpan.Ec) + assert.EqualValues(t, instana.ExitSpanKind, cSpan.Kind) + + assert.Equal(t, agentKafkaSpanData{ + Service: "test-topic", + Access: "send", + }, cSpan.Data.Kafka) + + assert.Contains(t, published.Headers, sarama.RecordHeader{ + Key: []byte("X_INSTANA_C"), + Value: instasarama.PackTraceContextHeader(instana.FormatID(cSpan.TraceID), instana.FormatID(cSpan.SpanID)), + }) + assert.Contains(t, published.Headers, sarama.RecordHeader{ + Key: []byte("X_INSTANA_L"), + Value: instasarama.PackTraceLevelHeader("1"), + }) + + assert.Equal(t, pSpan.TraceID, cSpan.TraceID) + assert.Equal(t, pSpan.SpanID, cSpan.ParentID) + assert.NotEqual(t, pSpan.SpanID, cSpan.SpanID) +} + +func TestAsyncProducer_Input_WithAwaitResult_Error(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + parent := sensor.Tracer().StartSpan("test-span") + msg := instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{Topic: "test-topic"}, parent) + + ap := newTestAsyncProducer(nil) + defer ap.Teardown() + + conf := sarama.NewConfig() + conf.Version = sarama.V0_11_0_0 + conf.Producer.Return.Successes = true + conf.Producer.Return.Errors = true + + wrapped := instasarama.WrapAsyncProducer(ap, conf, sensor) + wrapped.Input() <- msg + + var published *sarama.ProducerMessage + select { + case published = <-ap.input: + break + case <-time.After(1 * time.Second): + t.Fatalf("publishing via async producer timed out after 1s") + } + + parent.Finish() + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 1) + + pSpan, err := extractAgentSpan(spans[0]) + require.NoError(t, err) + + // send error for another message + ap.errors <- &sarama.ProducerError{ + Msg: &sarama.ProducerMessage{Topic: "another-topic"}, + Err: errors.New("something went wrong"), + } + <-wrapped.Errors() + require.Empty(t, recorder.GetQueuedSpans()) + + // send success for another message + ap.successes <- &sarama.ProducerMessage{Topic: "another-topic"} + <-wrapped.Successes() + require.Empty(t, recorder.GetQueuedSpans()) + + // send expected error + ap.errors <- &sarama.ProducerError{ + Msg: msg, + Err: errors.New("something went wrong"), + } + <-wrapped.Errors() + + spans = recorder.GetQueuedSpans() + require.Len(t, spans, 1) + + cSpan, err := extractAgentSpan(spans[0]) + require.NoError(t, err) + + assert.Equal(t, 1, cSpan.Ec) + assert.EqualValues(t, instana.ExitSpanKind, cSpan.Kind) + + assert.Equal(t, agentKafkaSpanData{ + Service: "test-topic", + Access: "send", + }, cSpan.Data.Kafka) + + assert.Contains(t, published.Headers, sarama.RecordHeader{ + Key: []byte("X_INSTANA_C"), + Value: instasarama.PackTraceContextHeader(instana.FormatID(cSpan.TraceID), instana.FormatID(cSpan.SpanID)), + }) + assert.Contains(t, published.Headers, sarama.RecordHeader{ + Key: []byte("X_INSTANA_L"), + Value: instasarama.PackTraceLevelHeader("1"), + }) + + assert.Equal(t, pSpan.TraceID, cSpan.TraceID) + assert.Equal(t, pSpan.SpanID, cSpan.ParentID) + assert.NotEqual(t, pSpan.SpanID, cSpan.SpanID) +} + +func TestAsyncProducer_Input_NoTraceContext(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + msg := &sarama.ProducerMessage{ + Topic: "topic-1", + } + + ap := newTestAsyncProducer(nil) + defer ap.Teardown() + + wrapped := instasarama.WrapAsyncProducer(ap, sarama.NewConfig(), sensor) + wrapped.Input() <- msg + + select { + case published := <-ap.input: + assert.Equal(t, msg, published) + case <-time.After(1 * time.Second): + t.Fatalf("publishing via async producer timed out after 1s") + } + + assert.Empty(t, recorder.GetQueuedSpans()) +} + +func TestAsyncProducer_Successes(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + msg := &sarama.ProducerMessage{ + Topic: "topic-1", + } + + ap := newTestAsyncProducer(nil) + defer ap.Teardown() + + ap.successes <- msg + + wrapped := instasarama.WrapAsyncProducer(ap, sarama.NewConfig(), sensor) + + select { + case received := <-wrapped.Successes(): + assert.Equal(t, msg, received) + case <-time.After(1 * time.Second): + t.Fatalf("reading a success message from async producer timed out after 1s") + } +} + +func TestAsyncProducer_Errors(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + msg := &sarama.ProducerError{ + Err: errors.New("something went wrong"), + Msg: &sarama.ProducerMessage{Topic: "topic-1"}, + } + + ap := newTestAsyncProducer(nil) + defer ap.Teardown() + + ap.errors <- msg + + wrapped := instasarama.WrapAsyncProducer(ap, sarama.NewConfig(), sensor) + + select { + case received := <-wrapped.Errors(): + assert.Equal(t, msg, received) + case <-time.After(1 * time.Second): + t.Fatalf("reading an error message from async producer timed out after 1s") + } +} + +type testAsyncProducer struct { + Error error + Closed bool + Async bool + + input chan *sarama.ProducerMessage + successes chan *sarama.ProducerMessage + errors chan *sarama.ProducerError +} + +func newTestAsyncProducer(returnedErr error) *testAsyncProducer { + return &testAsyncProducer{ + Error: returnedErr, + input: make(chan *sarama.ProducerMessage, 1), + successes: make(chan *sarama.ProducerMessage, 1), + errors: make(chan *sarama.ProducerError, 1), + } +} + +func (p *testAsyncProducer) AsyncClose() { + p.Closed = true + p.Async = true +} + +func (p *testAsyncProducer) Close() error { + p.Closed = true + return p.Error +} + +func (p *testAsyncProducer) Input() chan<- *sarama.ProducerMessage { return p.input } +func (p *testAsyncProducer) Successes() <-chan *sarama.ProducerMessage { return p.successes } +func (p *testAsyncProducer) Errors() <-chan *sarama.ProducerError { return p.errors } + +func (p *testAsyncProducer) Teardown() { + close(p.input) + close(p.successes) + close(p.errors) +} diff --git a/instrumentation/instasarama/consumer.go b/instrumentation/instasarama/consumer.go new file mode 100644 index 000000000..2c54b329b --- /dev/null +++ b/instrumentation/instasarama/consumer.go @@ -0,0 +1,56 @@ +// +build go1.9 + +package instasarama + +import ( + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" +) + +// Consumer is a wrapper for sarama.Consumer that wraps and returns instrumented +// partition consumers +type Consumer struct { + sarama.Consumer + sensor *instana.Sensor +} + +// NewConsumer creates a new consumer using the given broker addresses and configuration, and +// instruments its calls +func NewConsumer(addrs []string, config *sarama.Config, sensor *instana.Sensor) (sarama.Consumer, error) { + c, err := sarama.NewConsumer(addrs, config) + if err != nil { + return c, err + } + + return WrapConsumer(c, sensor), nil +} + +// NewConsumerFromClient creates a new consumer using the given client and intruments its calls +func NewConsumerFromClient(client sarama.Client, sensor *instana.Sensor) (sarama.Consumer, error) { + c, err := sarama.NewConsumerFromClient(client) + if err != nil { + return c, err + } + + return WrapConsumer(c, sensor), nil +} + +// WrapConsumer wraps an existing sarama.Consumer instance and instruments its calls. To initialize +// a new instance of sarama.Consumer use instasarama.NewConsumer() and instasarama.NewConsumerFromclient() +// convenience methods instead +func WrapConsumer(c sarama.Consumer, sensor *instana.Sensor) *Consumer { + return &Consumer{ + Consumer: c, + sensor: sensor, + } +} + +// ConsumePartition instruments and returns the partition consumer returned by undelying sarama.Consumer +func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) { + pc, err := c.Consumer.ConsumePartition(topic, partition, offset) + if err != nil { + return nil, err + } + + return WrapPartitionConsumer(pc, c.sensor), nil +} diff --git a/instrumentation/instasarama/consumer_group_handler.go b/instrumentation/instasarama/consumer_group_handler.go new file mode 100644 index 000000000..f3114d867 --- /dev/null +++ b/instrumentation/instasarama/consumer_group_handler.go @@ -0,0 +1,114 @@ +// +build go1.9 + +package instasarama + +import ( + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + ot "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" +) + +// ConsumerGroupHandler is a wrapper for sarama.ConsumerGroupHandler that creates an entry span for each +// incoming Kafka message, ensuring the extraction and continuation of the existing trace context if provided +type ConsumerGroupHandler struct { + handler sarama.ConsumerGroupHandler + sensor *instana.Sensor +} + +// WrapConsumerGroupHandler wraps the existing group handler and intruments its calls +func WrapConsumerGroupHandler(h sarama.ConsumerGroupHandler, sensor *instana.Sensor) *ConsumerGroupHandler { + return &ConsumerGroupHandler{ + handler: h, + sensor: sensor, + } +} + +// Setup calls the underlying handler's Setup() method +func (h *ConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error { + return h.handler.Setup(sess) +} + +// Cleanup calls the underlying handler's Cleanup() method +func (h *ConsumerGroupHandler) Cleanup(sess sarama.ConsumerGroupSession) error { + return h.handler.Cleanup(sess) +} + +// ConsumeClaim injects the trace context into incoming message headers and delegates further processing to +// the underlying handler +func (h *ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + wrappedSess := newConsumerGroupSession(sess, h.sensor) + wrappedClaim := newConsumerGroupClaim(claim) + + go func() { + for msg := range claim.Messages() { + sp := wrappedSess.startSpan(msg) + sp.Tracer().Inject(sp.Context(), ot.TextMap, ConsumerMessageCarrier{msg}) + wrappedClaim.messages <- msg + } + close(wrappedClaim.messages) + }() + + return h.handler.ConsumeClaim(wrappedSess, wrappedClaim) +} + +// consumerGroupSession is a wrapper for sarama.ConsumerGroupSession that keeps track of active spans associated +// with messages consumed during this session. The span is initiated by (*instasarama.ConsumerGroupHandler).ConsumeClaim() +// and finished when the message is marked as consumed by MarkMessage(). +type consumerGroupSession struct { + sarama.ConsumerGroupSession + sensor *instana.Sensor + activeSpans *spanRegistry +} + +func newConsumerGroupSession(sess sarama.ConsumerGroupSession, sensor *instana.Sensor) *consumerGroupSession { + return &consumerGroupSession{ + ConsumerGroupSession: sess, + sensor: sensor, + activeSpans: newSpanRegistry(), + } +} + +func (sess *consumerGroupSession) startSpan(msg *sarama.ConsumerMessage) ot.Span { + opts := []ot.StartSpanOption{ + ext.SpanKindConsumer, + ot.Tags{ + "kafka.service": msg.Topic, + "kafka.access": "consume", + }, + } + if spanContext, ok := SpanContextFromConsumerMessage(msg, sess.sensor); ok { + opts = append(opts, ot.ChildOf(spanContext)) + } + + sp := sess.sensor.Tracer().StartSpan("kafka", opts...) + sess.activeSpans.Add(consumerSpanKey(msg), sp) + + return sp +} + +func (sess *consumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string) { + if sp, ok := sess.activeSpans.Remove(consumerSpanKey(msg)); ok { + defer sp.Finish() + } + + sess.ConsumerGroupSession.MarkMessage(msg, metadata) +} + +// consumerGroupClaim is a wrapper for sarama.ConsumerGroupClaim that keeps messages after +// the trace header have been added until they are consumed by the original handler +type consumerGroupClaim struct { + sarama.ConsumerGroupClaim + messages chan *sarama.ConsumerMessage +} + +func newConsumerGroupClaim(claim sarama.ConsumerGroupClaim) *consumerGroupClaim { + return &consumerGroupClaim{ + ConsumerGroupClaim: claim, + messages: make(chan *sarama.ConsumerMessage), + } +} + +func (c *consumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage { + return c.messages +} diff --git a/instrumentation/instasarama/consumer_group_handler_test.go b/instrumentation/instasarama/consumer_group_handler_test.go new file mode 100644 index 000000000..d6a14bd94 --- /dev/null +++ b/instrumentation/instasarama/consumer_group_handler_test.go @@ -0,0 +1,219 @@ +// +build go1.9 + +package instasarama_test + +import ( + "context" + "errors" + "testing" + + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/instrumentation/instasarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + messages := []*sarama.ConsumerMessage{ + { + Topic: "topic-1", + Headers: []*sarama.RecordHeader{ + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x0a, 0xbc, 0xde, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("x_instana_l"), Value: []byte{0x01}}, + }, + }, + {Topic: "topic-2"}, + } + + claim := &testConsumerGroupClaim{ + messages: make(chan *sarama.ConsumerMessage, len(messages)), + } + for _, msg := range messages { + claim.messages <- msg + } + close(claim.messages) + + sess := &testConsumerGroupSession{} + + h := &testConsumerGroupHandler{} + wrapped := instasarama.WrapConsumerGroupHandler(h, sensor) + + require.NoError(t, wrapped.ConsumeClaim(sess, claim)) + + assert.Equal(t, messages, h.Messages) // all messages were processed + assert.Equal(t, h.Messages, sess.Messages) // all messages are marked + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + t.Run("span for message with trace headers", func(t *testing.T) { + span, err := extractAgentSpan(spans[0]) + require.NoError(t, err) + + assert.EqualValues(t, 0x0abcde12, span.TraceID) + assert.EqualValues(t, 0xdeadbeef, span.ParentID) + assert.NotEqual(t, span.ParentID, span.SpanID) + + assert.Equal(t, span.Name, "kafka") + assert.EqualValues(t, span.Kind, instana.EntrySpanKind) + + assert.Equal(t, agentKafkaSpanData{ + Service: "topic-1", + Access: "consume", + }, span.Data.Kafka) + + assert.Contains(t, h.Messages[0].Headers, &sarama.RecordHeader{ + Key: []byte("x_instana_c"), + Value: instasarama.PackTraceContextHeader(instana.FormatID(span.TraceID), instana.FormatID(span.SpanID)), + }) + assert.Contains(t, h.Messages[0].Headers, &sarama.RecordHeader{ + Key: []byte("x_instana_l"), + Value: instasarama.PackTraceLevelHeader("1"), + }) + }) + + t.Run("span for message without trace headers", func(t *testing.T) { + span, err := extractAgentSpan(spans[1]) + require.NoError(t, err) + + assert.NotEmpty(t, span.TraceID) + assert.Empty(t, span.ParentID) + assert.EqualValues(t, span.TraceID, span.SpanID) + + assert.Equal(t, span.Name, "kafka") + assert.EqualValues(t, span.Kind, instana.EntrySpanKind) + + assert.Equal(t, agentKafkaSpanData{ + Service: "topic-2", + Access: "consume", + }, span.Data.Kafka) + + assert.Contains(t, h.Messages[1].Headers, &sarama.RecordHeader{ + Key: []byte("X_INSTANA_C"), + Value: instasarama.PackTraceContextHeader(instana.FormatID(span.TraceID), instana.FormatID(span.SpanID)), + }) + assert.Contains(t, h.Messages[1].Headers, &sarama.RecordHeader{ + Key: []byte("X_INSTANA_L"), + Value: instasarama.PackTraceLevelHeader("1"), + }) + }) +} + +func TestConsumerGroupHandler_Setup(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + h := &testConsumerGroupHandler{} + wrapped := instasarama.WrapConsumerGroupHandler(h, sensor) + + require.NoError(t, wrapped.Setup(&testConsumerGroupSession{})) + assert.True(t, h.SetupCalled) + + assert.Empty(t, recorder.GetQueuedSpans()) +} + +func TestConsumerGroupHandler_Setup_Error(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + h := &testConsumerGroupHandler{ + Error: errors.New("something went wrong"), + } + wrapped := instasarama.WrapConsumerGroupHandler(h, sensor) + + assert.Error(t, wrapped.Setup(&testConsumerGroupSession{})) + + assert.Empty(t, recorder.GetQueuedSpans()) +} + +func TestConsumerGroupHandler_Cleanup(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + h := &testConsumerGroupHandler{} + wrapped := instasarama.WrapConsumerGroupHandler(h, sensor) + + require.NoError(t, wrapped.Cleanup(&testConsumerGroupSession{})) + assert.True(t, h.CleanupCalled) + + assert.Empty(t, recorder.GetQueuedSpans()) +} + +func TestConsumerGroupHandler_Cleanup_Error(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + h := &testConsumerGroupHandler{ + Error: errors.New("something went wrong"), + } + wrapped := instasarama.WrapConsumerGroupHandler(h, sensor) + + assert.Error(t, wrapped.Cleanup(&testConsumerGroupSession{})) + + assert.Empty(t, recorder.GetQueuedSpans()) +} + +type testConsumerGroupHandler struct { + Error error + + SetupCalled, CleanupCalled bool + Messages []*sarama.ConsumerMessage +} + +func (h *testConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { + h.SetupCalled = true + return h.Error +} + +func (h *testConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { + h.CleanupCalled = true + return h.Error +} + +func (h *testConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + h.Messages = append(h.Messages, msg) + sess.MarkMessage(msg, "") + } + + return h.Error +} + +type testConsumerGroupClaim struct { + messages chan *sarama.ConsumerMessage +} + +func (c *testConsumerGroupClaim) Topic() string { return "test-topic" } +func (c *testConsumerGroupClaim) Partition() int32 { return 0 } +func (c *testConsumerGroupClaim) InitialOffset() int64 { return 0 } +func (c *testConsumerGroupClaim) HighWaterMarkOffset() int64 { return 100 } +func (c *testConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage { return c.messages } + +type testConsumerGroupSession struct { + Messages []*sarama.ConsumerMessage +} + +func (s *testConsumerGroupSession) Claims() map[string][]int32 { return nil } +func (s *testConsumerGroupSession) MemberID() string { return "" } +func (s *testConsumerGroupSession) GenerationID() int32 { return 0 } +func (s *testConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) { +} +func (s *testConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) { +} +func (s *testConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string) { + s.Messages = append(s.Messages, msg) +} +func (s *testConsumerGroupSession) Context() context.Context { return context.Background() } diff --git a/instrumentation/instasarama/consumer_test.go b/instrumentation/instasarama/consumer_test.go new file mode 100644 index 000000000..972259290 --- /dev/null +++ b/instrumentation/instasarama/consumer_test.go @@ -0,0 +1,234 @@ +// +build go1.9 + +package instasarama_test + +import ( + "errors" + "testing" + "time" + + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/instrumentation/instasarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConsumer_ConsumePartition(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + messages := make(chan *sarama.ConsumerMessage, 1) + c := &testConsumer{ + consumers: map[string]*testPartitionConsumer{ + "topic-1": &testPartitionConsumer{ + messages: messages, + }, + }, + } + + messages <- &sarama.ConsumerMessage{ + Topic: "topic-1", + Headers: []*sarama.RecordHeader{ + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x0a, 0xbc, 0xde, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("x_instana_l"), Value: []byte{0x01}}, + }, + } + + wrapped := instasarama.WrapConsumer(c, sensor) + pc, err := wrapped.ConsumePartition("topic-1", 1, 2) + require.NoError(t, err) + + _, ok := pc.(*instasarama.PartitionConsumer) + require.True(t, ok) + + require.Empty(t, recorder.GetQueuedSpans()) + + select { + case <-pc.Messages(): + break + case <-time.After(1 * time.Second): + t.Fatalf("partition consumer timed out after 1s") + } + + assert.NotEmpty(t, recorder.GetQueuedSpans()) +} + +func TestConsumer_ConsumePartition_Error(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + c := &testConsumer{ + Error: errors.New("something went wrong"), + consumers: map[string]*testPartitionConsumer{ + "topic-1": &testPartitionConsumer{}, + }, + } + + wrapped := instasarama.WrapConsumer(c, sensor) + _, err := wrapped.ConsumePartition("topic-1", 1, 2) + assert.Error(t, err) +} + +func TestConsumer_Topics(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + c := &testConsumer{ + topics: []string{"topic-1", "topic-2"}, + } + + wrapped := instasarama.WrapConsumer(c, sensor) + + topics, err := wrapped.Topics() + require.NoError(t, err) + + assert.Equal(t, c.topics, topics) +} + +func TestConsumer_Topics_Error(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + c := &testConsumer{ + Error: errors.New("something went wrong"), + topics: []string{"topic-1", "topic-2"}, + } + + wrapped := instasarama.WrapConsumer(c, sensor) + _, err := wrapped.Topics() + assert.Error(t, err) +} + +func TestConsumer_Partitions(t *testing.T) { + c := &testConsumer{ + partitions: map[string][]int32{ + "topic-1": []int32{1, 2, 3}, + }, + } + + t.Run("existing", func(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + wrapped := instasarama.WrapConsumer(c, sensor) + partitions, err := wrapped.Partitions("topic-1") + require.NoError(t, err) + + assert.Equal(t, []int32{1, 2, 3}, partitions) + + assert.Empty(t, recorder.GetQueuedSpans()) + }) + + t.Run("non-existing", func(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + wrapped := instasarama.WrapConsumer(c, sensor) + partitions, err := wrapped.Partitions("topic-2") + require.NoError(t, err) + + assert.Empty(t, partitions) + + assert.Empty(t, recorder.GetQueuedSpans()) + }) +} + +func TestConsumer_Partitions_Error(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + c := &testConsumer{ + Error: errors.New("something went wrong"), + partitions: map[string][]int32{ + "topic-1": []int32{1, 2, 3}, + }, + } + + wrapped := instasarama.WrapConsumer(c, sensor) + _, err := wrapped.Partitions("topic-1") + assert.Error(t, err) +} + +func TestConsumer_HighWaterMarks(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + c := &testConsumer{ + offsets: map[string]map[int32]int64{ + "topic-1": { + 1: 42, + }, + }, + } + + wrapped := instasarama.WrapConsumer(c, sensor) + assert.Equal(t, c.offsets, wrapped.HighWaterMarks()) + + assert.Empty(t, recorder.GetQueuedSpans()) +} + +func TestConsumer_Close(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + c := &testConsumer{} + + wrapped := instasarama.WrapConsumer(c, sensor) + require.NoError(t, wrapped.Close()) + + assert.True(t, c.Closed) + assert.Empty(t, recorder.GetQueuedSpans()) +} + +func TestConsumer_Close_Error(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + c := &testConsumer{ + Error: errors.New("something went wrong"), + } + + wrapped := instasarama.WrapConsumer(c, sensor) + assert.Error(t, wrapped.Close()) +} + +type testConsumer struct { + Closed bool + Error error + + topics []string + partitions map[string][]int32 + offsets map[string]map[int32]int64 + consumers map[string]*testPartitionConsumer +} + +func (c *testConsumer) Topics() ([]string, error) { + return c.topics, c.Error +} + +func (c *testConsumer) Partitions(topic string) ([]int32, error) { + return c.partitions[topic], c.Error +} + +func (c *testConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) { + return c.consumers[topic], c.Error +} + +func (c *testConsumer) HighWaterMarks() map[string]map[int32]int64 { + return c.offsets +} + +func (c *testConsumer) Close() error { + c.Closed = true + return c.Error +} diff --git a/instrumentation/instasarama/example_async_producer_test.go b/instrumentation/instasarama/example_async_producer_test.go new file mode 100644 index 000000000..cf404e033 --- /dev/null +++ b/instrumentation/instasarama/example_async_producer_test.go @@ -0,0 +1,36 @@ +// +build go1.9 + +package instasarama_test + +import ( + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/instrumentation/instasarama" + "github.com/opentracing/opentracing-go/ext" +) + +// This example demonstrates how to instrument an async Kafka producer using instasarama. +// Error handling is omitted for brevity. +func Example_asyncProducer() { + sensor := instana.NewSensor("my-service") + brokers := []string{"localhost:9092"} + + config := sarama.NewConfig() + // enable the use record headers added in kafka v0.11.0 and used to propagate + // trace context + config.Version = sarama.V0_11_0_0 + + // create a new instrumented instance of sarama.SyncProducer + producer, _ := instasarama.NewAsyncProducer(brokers, config, sensor) + + // start a new entry span + sp := sensor.Tracer().StartSpan("my-producing-method") + ext.SpanKind.Set(sp, "entry") + + msg := &sarama.ProducerMessage{ + // ... + } + + // inject the span before passing the message to producer + producer.Input() <- instasarama.ProducerMessageWithSpan(msg, sp) +} diff --git a/instrumentation/instasarama/example_consumer_group_test.go b/instrumentation/instasarama/example_consumer_group_test.go new file mode 100644 index 000000000..4c1175c89 --- /dev/null +++ b/instrumentation/instasarama/example_consumer_group_test.go @@ -0,0 +1,69 @@ +// +build go1.9 + +package instasarama_test + +import ( + "context" + + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/instrumentation/instasarama" + "github.com/opentracing/opentracing-go" +) + +// This example demonstrates how to instrument a Kafka consumer group using instasarama +// and extract the trace context to ensure continuation. Error handling is omitted for brevity. +func Example_consumerGroup() { + sensor := instana.NewSensor("my-service") + brokers := []string{"localhost:9092"} + topics := []string{"records", "more-records"} + + conf := sarama.NewConfig() + conf.Version = sarama.V0_11_0_0 + + client, _ := sarama.NewConsumerGroup(brokers, "my-service-consumers", conf) + defer client.Close() + + ctx := context.Background() + consumer := instasarama.WrapConsumerGroupHandler(&Consumer{sensor: sensor}, sensor) + + // start consuming + for { + _ = client.Consume(ctx, topics, consumer) + + // ... + } +} + +type Consumer struct { + sensor *instana.Sensor +} + +func (*Consumer) Setup(sarama.ConsumerGroupSession) error { + // setup consumer + return nil +} + +func (*Consumer) Cleanup(sarama.ConsumerGroupSession) error { + // cleanup consumer + return nil +} + +func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + c.processMessage(msg) + session.MarkMessage(msg, "") + } + + return nil +} + +func (c *Consumer) processMessage(msg *sarama.ConsumerMessage) { + // extract trace context and start a new span + parentCtx, _ := instasarama.SpanContextFromConsumerMessage(msg, c.sensor) + + sp := c.sensor.Tracer().StartSpan("process-message", opentracing.ChildOf(parentCtx)) + defer sp.Finish() + + // process message +} diff --git a/instrumentation/instasarama/example_consumer_test.go b/instrumentation/instasarama/example_consumer_test.go new file mode 100644 index 000000000..c8a34cf25 --- /dev/null +++ b/instrumentation/instasarama/example_consumer_test.go @@ -0,0 +1,40 @@ +// +build go1.9 + +package instasarama_test + +import ( + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/instrumentation/instasarama" + "github.com/opentracing/opentracing-go" +) + +// This example demonstrates how to instrument a Kafka consumer using instasarama +// and extract the trace context to ensure continuation. Error handling is omitted for brevity. +func Example_consumer() { + sensor := instana.NewSensor("my-service") + brokers := []string{"localhost:9092"} + + conf := sarama.NewConfig() + conf.Version = sarama.V0_11_0_0 + + // create a new instrumented instance of sarama.Consumer + consumer, _ := instasarama.NewConsumer(brokers, conf, sensor) + + c, _ := consumer.ConsumePartition("records", 0, sarama.OffsetNewest) + defer c.Close() + + for msg := range c.Messages() { + processMessage(msg, sensor) + } +} + +func processMessage(msg *sarama.ConsumerMessage, sensor *instana.Sensor) { + // extract trace context and start a new span + parentCtx, _ := instasarama.SpanContextFromConsumerMessage(msg, sensor) + + sp := sensor.Tracer().StartSpan("process-message", opentracing.ChildOf(parentCtx)) + defer sp.Finish() + + // process message +} diff --git a/instrumentation/instasarama/example_sync_producer_test.go b/instrumentation/instasarama/example_sync_producer_test.go new file mode 100644 index 000000000..daa526e48 --- /dev/null +++ b/instrumentation/instasarama/example_sync_producer_test.go @@ -0,0 +1,41 @@ +// +build go1.9 + +package instasarama_test + +import ( + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/instrumentation/instasarama" + "github.com/opentracing/opentracing-go/ext" +) + +// This example demonstrates how to instrument a sync Kafka producer using instasarama. +// Error handling is omitted for brevity. +func Example_syncProducer() { + sensor := instana.NewSensor("my-service") + brokers := []string{"localhost:9092"} + + config := sarama.NewConfig() + // sarama requires Producer.Return.Successes to be set for sync producers + config.Producer.Return.Successes = true + // enable the use record headers added in kafka v0.11.0 and used to propagate + // trace context + config.Version = sarama.V0_11_0_0 + + // create a new instrumented instance of sarama.SyncProducer + producer, _ := instasarama.NewSyncProducer(brokers, config, sensor) + + // start a new entry span + sp := sensor.Tracer().StartSpan("my-producing-method") + ext.SpanKind.Set(sp, "entry") + + msg := &sarama.ProducerMessage{ + // ... + } + + // inject the span before passing the message to producer + msg = instasarama.ProducerMessageWithSpan(msg, sp) + + // pass it to the producer + producer.SendMessage(msg) +} diff --git a/instrumentation/instasarama/go.mod b/instrumentation/instasarama/go.mod new file mode 100644 index 000000000..64ca5b84a --- /dev/null +++ b/instrumentation/instasarama/go.mod @@ -0,0 +1,18 @@ +module github.com/instana/go-sensor/instrumentation/instasarama + +go 1.9 + +require ( + github.com/Shopify/sarama v1.19.0 + github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect + github.com/eapache/go-resiliency v1.2.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/frankban/quicktest v1.8.1 // indirect + github.com/golang/snappy v0.0.1 // indirect + github.com/instana/go-sensor v1.10.0 + github.com/opentracing/opentracing-go v1.1.0 + github.com/pierrec/lz4 v2.4.1+incompatible // indirect + github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect + github.com/stretchr/testify v1.4.0 +) diff --git a/instrumentation/instasarama/go.sum b/instrumentation/instasarama/go.sum new file mode 100644 index 000000000..c4e4520c9 --- /dev/null +++ b/instrumentation/instasarama/go.sum @@ -0,0 +1,55 @@ +github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s= +github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= +github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/frankban/quicktest v1.8.1 h1:PvpJR0Uq8SdX+zagCMsarBMlhz6ysGTf1+pRmCsRXqY= +github.com/frankban/quicktest v1.8.1/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/instana/go-sensor v1.10.0 h1:w8tnrsO5/M7Tt3hYO/pECA3kkk0Zqmsd4U4MuZboPJ0= +github.com/instana/go-sensor v1.10.0/go.mod h1:lDfZvfAyo5DWJ2AvOHINRTUTG5TMdZNnwXXcFRtfZBE= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/looplab/fsm v0.1.0 h1:Qte7Zdn/5hBNbXzP7yxVU4OIFHWXBovyTT2LaBTyC20= +github.com/looplab/fsm v0.1.0/go.mod h1:m2VaOfDHxqXBBMgc26m6yUOwkFn8H2AlJDE+jd/uafI= +github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo= +github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= +github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= +github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg= +github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= +github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/instrumentation/instasarama/instasarama.go b/instrumentation/instasarama/instasarama.go new file mode 100644 index 000000000..551cd6777 --- /dev/null +++ b/instrumentation/instasarama/instasarama.go @@ -0,0 +1,3 @@ +// Package instasarama provides Instana tracing instrumentation for +// Kafka producers and consumers build on top of github.com/Shopify/sarama. +package instasarama diff --git a/instrumentation/instasarama/instasarama_test.go b/instrumentation/instasarama/instasarama_test.go new file mode 100644 index 000000000..7b4f760c3 --- /dev/null +++ b/instrumentation/instasarama/instasarama_test.go @@ -0,0 +1,51 @@ +// +build go1.9 + +package instasarama_test + +import ( + "encoding/json" + "fmt" +) + +type agentSpan struct { + TraceID int64 `json:"t"` + ParentID int64 `json:"p,omitempty"` + SpanID int64 `json:"s"` + Timestamp uint64 `json:"ts"` + Duration uint64 `json:"d"` + Name string `json:"n"` + From struct { + PID string `json:"e"` + HostID string `json:"h"` + } `json:"f"` + Batch struct { + Size int `json:"s"` + } `json:"b"` + Kind int `json:"k"` + Ec int `json:"ec,omitempty"` + Data struct { + Service string `json:"service"` + Kafka agentKafkaSpanData `json:"kafka"` + } `json:"data"` +} + +type agentKafkaSpanData struct { + Service string `json:"service"` + Access string `json:"access"` +} + +// unmarshalAgentSpan is a helper function that copies span data values +// into an agentSpan to not to depend on the implementation of instana.Recorder +func extractAgentSpan(span interface{}) (agentSpan, error) { + d, err := json.Marshal(span) + if err != nil { + return agentSpan{}, fmt.Errorf("failed to marshal agent span data: %s", err) + } + + var data agentSpan + if err := json.Unmarshal(d, &data); err != nil { + return agentSpan{}, fmt.Errorf("failed to unmarshal agent span data: %s", err) + } + + return data, nil +} diff --git a/instrumentation/instasarama/partition_consumer.go b/instrumentation/instasarama/partition_consumer.go new file mode 100644 index 000000000..ac8de17aa --- /dev/null +++ b/instrumentation/instasarama/partition_consumer.go @@ -0,0 +1,64 @@ +// +build go1.9 + +package instasarama + +import ( + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + ot "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" +) + +// PartitionConsumer is a wrapper for sarama.PartitionConsumer that instruments its calls using +// provided instana.Sensor +type PartitionConsumer struct { + sarama.PartitionConsumer + sensor *instana.Sensor + messages chan *sarama.ConsumerMessage +} + +// WrapPartitionConsumer wraps sarama.PartitionConsumer instance and instruments its calls +func WrapPartitionConsumer(c sarama.PartitionConsumer, sensor *instana.Sensor) *PartitionConsumer { + pc := &PartitionConsumer{ + PartitionConsumer: c, + sensor: sensor, + messages: make(chan *sarama.ConsumerMessage), + } + + go pc.consumeMessages() + + return pc +} + +// Messages returns a channel of consumer messages of the underlying partition consumer +func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage { + return pc.messages +} + +func (pc *PartitionConsumer) consumeMessages() { + for msg := range pc.PartitionConsumer.Messages() { + pc.consumeMessage(msg) + } + close(pc.messages) +} + +func (pc *PartitionConsumer) consumeMessage(msg *sarama.ConsumerMessage) { + opts := []ot.StartSpanOption{ + ext.SpanKindConsumer, + ot.Tags{ + "kafka.service": msg.Topic, + "kafka.access": "consume", + }, + } + if spanContext, ok := SpanContextFromConsumerMessage(msg, pc.sensor); ok { + opts = append(opts, ot.ChildOf(spanContext)) + } + + sp := pc.sensor.Tracer().StartSpan("kafka", opts...) + defer sp.Finish() + + // inject consumer span context, so that it becomes a parent for subcalls + pc.sensor.Tracer().Inject(sp.Context(), ot.TextMap, ConsumerMessageCarrier{msg}) + + pc.messages <- msg +} diff --git a/instrumentation/instasarama/partition_consumer_test.go b/instrumentation/instasarama/partition_consumer_test.go new file mode 100644 index 000000000..222e41ae7 --- /dev/null +++ b/instrumentation/instasarama/partition_consumer_test.go @@ -0,0 +1,205 @@ +// +build go1.9 + +package instasarama_test + +import ( + "errors" + "testing" + "time" + + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/instrumentation/instasarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPartitionConsumer_Messages(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + messages := []*sarama.ConsumerMessage{ + { + Topic: "instrumented-producer", + Headers: []*sarama.RecordHeader{ + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x0a, 0xbc, 0xde, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("x_instana_l"), Value: []byte{0x01}}, + }, + }, + {Topic: "not-instrumented-producer"}, + } + + pc := &testPartitionConsumer{ + messages: make(chan *sarama.ConsumerMessage, len(messages)), + } + for _, msg := range messages { + pc.messages <- msg + } + close(pc.messages) + + wrapped := instasarama.WrapPartitionConsumer(pc, sensor) + + var collected []*sarama.ConsumerMessage + timeout := time.After(1 * time.Second) + +CONSUMER_LOOP: + for { + select { + case msg, ok := <-wrapped.Messages(): + if !ok { + break CONSUMER_LOOP + } + collected = append(collected, msg) + case <-timeout: + t.Fatalf("consuming (*instasarama.PartitionConsumer).Messages() timed out") + } + } + + _, open := <-wrapped.Messages() + assert.False(t, open) + require.Len(t, collected, len(messages)) + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, len(collected)) + + t.Run("message with trace context", func(t *testing.T) { + msg := collected[0] + assert.Equal(t, "instrumented-producer", msg.Topic) + + span, err := extractAgentSpan(spans[0]) + require.NoError(t, err) + + assert.EqualValues(t, 0x0abcde12, span.TraceID) + assert.EqualValues(t, 0xdeadbeef, span.ParentID) + + assert.Contains(t, msg.Headers, &sarama.RecordHeader{ + Key: []byte("x_instana_c"), + Value: instasarama.PackTraceContextHeader(instana.FormatID(span.TraceID), instana.FormatID(span.SpanID)), + }) + assert.Contains(t, msg.Headers, &sarama.RecordHeader{ + Key: []byte("x_instana_l"), + Value: []byte{0x01}, + }) + }) + + t.Run("message without trace context", func(t *testing.T) { + msg := collected[1] + assert.Equal(t, "not-instrumented-producer", msg.Topic) + + span, err := extractAgentSpan(spans[1]) + require.NoError(t, err) + + assert.NotEmpty(t, span.TraceID) + assert.Empty(t, span.ParentID) + assert.EqualValues(t, span.TraceID, span.SpanID) + + assert.ElementsMatch(t, msg.Headers, []*sarama.RecordHeader{ + { + Key: []byte("X_INSTANA_C"), + Value: instasarama.PackTraceContextHeader(instana.FormatID(span.TraceID), instana.FormatID(span.SpanID)), + }, + { + Key: []byte("X_INSTANA_L"), + Value: []byte{0x01}, + }, + }) + }) +} + +func TestPartitionConsumer_AsyncClose(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + pc := &testPartitionConsumer{} + + wrapped := instasarama.WrapPartitionConsumer(pc, sensor) + wrapped.AsyncClose() + + assert.True(t, pc.Closed) + assert.True(t, pc.Async) +} + +func TestPartitionConsumer_Close(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + pc := &testPartitionConsumer{} + + wrapped := instasarama.WrapPartitionConsumer(pc, sensor) + require.NoError(t, wrapped.Close()) + + assert.True(t, pc.Closed) + assert.False(t, pc.Async) +} + +func TestPartitionConsumer_Close_Error(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + pc := &testPartitionConsumer{ + Error: errors.New("something went wrong"), + } + + wrapped := instasarama.WrapPartitionConsumer(pc, sensor) + assert.Error(t, wrapped.Close()) +} + +func TestPartitionConsumer_HighWaterMarkOffset(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + pc := &testPartitionConsumer{ + Offset: 42, + } + + wrapped := instasarama.WrapPartitionConsumer(pc, sensor) + assert.Equal(t, pc.Offset, wrapped.HighWaterMarkOffset()) +} + +type testPartitionConsumer struct { + messages chan *sarama.ConsumerMessage + errors chan *sarama.ConsumerError + + Offset int64 + Error error + Closed bool + Async bool +} + +// AsyncClose closes the underlying partition consumer asynchronously +func (pc *testPartitionConsumer) AsyncClose() { + pc.Closed = true + pc.Async = true +} + +// Close closes the underlying partition consumer +func (pc *testPartitionConsumer) Close() error { + pc.Closed = true + pc.Async = false + + return pc.Error +} + +// Messages returns a channel of consumer messages of the underlying partition consumer +func (pc *testPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage { + return pc.messages +} + +// Errors returns a channel of consumer errors of the underlying partition consumer +func (pc *testPartitionConsumer) Errors() <-chan *sarama.ConsumerError { + return pc.errors +} + +// HighWaterMarkOffset returns the high water mark offset of the underlying partition consumer +func (pc *testPartitionConsumer) HighWaterMarkOffset() int64 { + return pc.Offset +} diff --git a/instrumentation/instasarama/propagation.go b/instrumentation/instasarama/propagation.go new file mode 100644 index 000000000..a217fd68c --- /dev/null +++ b/instrumentation/instasarama/propagation.go @@ -0,0 +1,288 @@ +// +build go1.9 + +package instasarama + +import ( + "bytes" + "fmt" + "strings" + + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + ot "github.com/opentracing/opentracing-go" +) + +const ( + // The trace context header key + FieldC = "X_INSTANA_C" + // The trace level header key + FieldL = "X_INSTANA_L" +) + +var ( + fieldCKey = []byte(FieldC) + fieldLKey = []byte(FieldL) +) + +// ProducerMessageWithSpan injects the tracing context into producer message headers to propagate +// them through the Kafka requests made with instasarama producers. +func ProducerMessageWithSpan(pm *sarama.ProducerMessage, sp ot.Span) *sarama.ProducerMessage { + sp.Tracer().Inject(sp.Context(), ot.TextMap, ProducerMessageCarrier{Message: pm}) + return pm +} + +// ProducerMessageCarrier is a trace context carrier that propagates Instana OpenTracing +// headers throughout Kafka producer messages +type ProducerMessageCarrier struct { + Message *sarama.ProducerMessage +} + +// Set implements opentracing.TextMapWriter for ProducerMessageCarrier +func (c ProducerMessageCarrier) Set(key, val string) { + switch strings.ToLower(key) { + case instana.FieldT: + if len(val) > 32 { + return // ignore hex-encoded trace IDs longer than 128 bit + } + + traceContext := PackTraceContextHeader(val, "") + if i, ok := c.indexOf(fieldCKey); ok { + // preserve the trace ID if the trace context header already present + existingC := c.Message.Headers[i].Value + if len(existingC) >= 16 { + copy(traceContext[16:], existingC[16:]) + } + } + + c.addOrReplaceHeader(fieldCKey, traceContext) + case instana.FieldS: + if len(val) > 16 { + return // ignore hex-encoded span IDs longer than 64 bit + } + + traceContext := PackTraceContextHeader("", val) + if i, ok := c.indexOf(fieldCKey); ok { + // preserve the span ID if the trace context header already present + existingC := c.Message.Headers[i].Value + if len(existingC) >= 16 { + copy(traceContext[:16], existingC[:16]) + } + } + + c.addOrReplaceHeader(fieldCKey, traceContext) + case instana.FieldL: + c.addOrReplaceHeader(fieldLKey, PackTraceLevelHeader(val)) + } +} + +// RemoveAll removes all tracing headers previously set by Set() +func (c ProducerMessageCarrier) RemoveAll() { + var ln int + for _, header := range c.Message.Headers { + if bytes.EqualFold(header.Key, fieldCKey) || bytes.EqualFold(header.Key, fieldLKey) { + continue + } + + c.Message.Headers[ln] = header + ln++ + } + + c.Message.Headers = c.Message.Headers[:ln] +} + +// ForeachKey implements opentracing.TextMapReader for ProducerMessageCarrier +func (c ProducerMessageCarrier) ForeachKey(handler func(key, val string) error) error { + for _, header := range c.Message.Headers { + switch { + case bytes.EqualFold(header.Key, fieldCKey): + traceID, spanID, err := UnpackTraceContextHeader(header.Value) + if err != nil { + return fmt.Errorf("malformed %q header: %s", header.Key, err) + } + + if err := handler(instana.FieldT, string(traceID)); err != nil { + return err + } + + if err := handler(instana.FieldS, string(spanID)); err != nil { + return err + } + case bytes.EqualFold(header.Key, fieldLKey): + val, err := UnpackTraceLevelHeader(header.Value) + if err != nil { + return fmt.Errorf("malformed %q header: %s", header.Key, err) + } + + if err := handler(instana.FieldL, val); err != nil { + return err + } + } + } + return nil +} + +func (c ProducerMessageCarrier) addOrReplaceHeader(key, val []byte) { + if i, ok := c.indexOf(key); ok { + c.Message.Headers[i].Value = val + return + } + + c.Message.Headers = append(c.Message.Headers, sarama.RecordHeader{Key: key, Value: val}) +} + +func (c ProducerMessageCarrier) indexOf(key []byte) (int, bool) { + for i, header := range c.Message.Headers { + if bytes.EqualFold(key, header.Key) { + return i, true + } + } + + return -1, false +} + +// SpanContextFromConsumerMessage extracts the tracing context from consumer message +func SpanContextFromConsumerMessage(cm *sarama.ConsumerMessage, sensor *instana.Sensor) (ot.SpanContext, bool) { + spanContext, err := sensor.Tracer().Extract(ot.TextMap, ConsumerMessageCarrier{Message: cm}) + if err != nil { + return nil, false + } + + return spanContext, true +} + +// ConsumerMessageCarrier is a trace context carrier that extracts Instana OpenTracing +// headers from Kafka consumer messages +type ConsumerMessageCarrier struct { + Message *sarama.ConsumerMessage +} + +// Set implements opentracing.TextMapWriter for ConsumerMessageCarrier +func (c ConsumerMessageCarrier) Set(key, val string) { + switch strings.ToLower(key) { + case instana.FieldT: + if len(val) > 32 { + return // ignore hex-encoded trace IDs longer than 128 bit + } + + traceContext := PackTraceContextHeader(val, "") + if i, ok := c.indexOf(fieldCKey); ok { + // preserve the trace ID if the trace context header already present + existingC := c.Message.Headers[i].Value + if len(existingC) >= 16 { + copy(traceContext[16:], existingC[16:]) + } + } + + c.addOrReplaceHeader(fieldCKey, traceContext) + case instana.FieldS: + if len(val) > 16 { + return // ignore hex-encoded span IDs longer than 64 bit + } + + traceContext := PackTraceContextHeader("", val) + if i, ok := c.indexOf(fieldCKey); ok { + // preserve the span ID if the trace context header already present + existingC := c.Message.Headers[i].Value + if len(existingC) >= 16 { + copy(traceContext[:16], existingC[:16]) + } + } + + c.addOrReplaceHeader(fieldCKey, traceContext) + case instana.FieldL: + c.addOrReplaceHeader(fieldLKey, PackTraceLevelHeader(val)) + } +} + +// RemoveAll removes all tracing headers previously set by Set() +func (c ConsumerMessageCarrier) RemoveAll() { + var ln int + for _, header := range c.Message.Headers { + if header != nil && (bytes.EqualFold(header.Key, fieldCKey) || bytes.EqualFold(header.Key, fieldLKey)) { + continue + } + + c.Message.Headers[ln] = header + ln++ + } + + c.Message.Headers = c.Message.Headers[:ln] +} + +// ForeachKey implements opentracing.TextMapReader for ConsumerMessageCarrier +func (c ConsumerMessageCarrier) ForeachKey(handler func(key, val string) error) error { + for _, header := range c.Message.Headers { + if header == nil { + continue + } + + switch { + case bytes.EqualFold(header.Key, fieldCKey): + traceID, spanID, err := UnpackTraceContextHeader(header.Value) + if err != nil { + return fmt.Errorf("malformed %q header: %s", header.Key, err) + } + + if err := handler(instana.FieldT, string(traceID)); err != nil { + return err + } + + if err := handler(instana.FieldS, string(spanID)); err != nil { + return err + } + case bytes.EqualFold(header.Key, fieldLKey): + val, err := UnpackTraceLevelHeader(header.Value) + if err != nil { + return fmt.Errorf("malformed %q header: %s", header.Key, err) + } + + if err := handler(instana.FieldL, val); err != nil { + return err + } + } + } + return nil +} + +func (c ConsumerMessageCarrier) addOrReplaceHeader(key, val []byte) { + if i, ok := c.indexOf(key); ok { + c.Message.Headers[i].Value = val + return + } + + c.Message.Headers = append(c.Message.Headers, &sarama.RecordHeader{Key: key, Value: val}) +} + +func (c ConsumerMessageCarrier) indexOf(key []byte) (int, bool) { + for i, header := range c.Message.Headers { + if header == nil { + continue + } + + if bytes.EqualFold(key, header.Key) { + return i, true + } + } + + return -1, false +} + +func contextPropagationSupported(ver sarama.KafkaVersion) bool { + return ver.IsAtLeast(sarama.V0_11_0_0) +} + +func extractTraceSpanID(msg *sarama.ProducerMessage) (string, string, error) { + var traceID, spanID string + err := ProducerMessageCarrier{msg}.ForeachKey(func(k, v string) error { + switch k { + case instana.FieldT: + traceID = v + case instana.FieldS: + spanID = v + } + + return nil + }) + + return traceID, spanID, err +} diff --git a/instrumentation/instasarama/propagation_test.go b/instrumentation/instasarama/propagation_test.go new file mode 100644 index 000000000..a7e2a7ff4 --- /dev/null +++ b/instrumentation/instasarama/propagation_test.go @@ -0,0 +1,929 @@ +// +build go1.9 + +package instasarama_test + +import ( + "errors" + "testing" + + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/instrumentation/instasarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProducerMessageWithSpan(t *testing.T) { + recorder := instana.NewTestRecorder() + tracer := instana.NewTracerWithEverything(&instana.Options{}, recorder) + + sp := tracer.StartSpan("test-span") + pm := instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{ + Topic: "test-topic", + Key: sarama.StringEncoder("key1"), + Value: sarama.StringEncoder("value1"), + Headers: []sarama.RecordHeader{ + {Key: []byte("headerKey1"), Value: []byte("headerValue1")}, + }, + }, sp) + sp.Finish() + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 1) + + expected := []sarama.RecordHeader{ + {Key: []byte("headerKey1"), Value: []byte("headerValue1")}, + {Key: []byte(instasarama.FieldL), Value: []byte{0x01}}, + { + Key: []byte(instasarama.FieldC), + Value: instasarama.PackTraceContextHeader( + instana.FormatID(spans[0].TraceID), + instana.FormatID(spans[0].SpanID), + ), + }, + } + + assert.ElementsMatch(t, expected, pm.Headers) +} + +func TestProducerMessageCarrier_Set_FieldT(t *testing.T) { + var msg sarama.ProducerMessage + c := instasarama.ProducerMessageCarrier{&msg} + + c.Set(instana.FieldT, "deadbeefdeadbeef") + assert.Equal(t, []sarama.RecordHeader{ + { + Key: []byte(instasarama.FieldC), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, + // spanid + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + }, + }, msg.Headers) +} + +func TestProducerMessageCarrier_Update_FieldT(t *testing.T) { + examples := map[string]struct { + Value string + Headers []sarama.RecordHeader + Expected []sarama.RecordHeader + }{ + "existing has trace id only": { + Value: "deadbeef", + Headers: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xab, 0xcd, 0xef, 0x12, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + Expected: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + // span id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + }, + "existing has span id only": { + Value: "deadbeef", + Headers: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // span id + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + Expected: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + // span id + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + }, + "existing has trace and span id": { + Value: "deadbeef", + Headers: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xab, 0xcd, 0xef, 0x12, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, 0x34, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + Expected: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + // span id + 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, 0x34, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + }, + } + + for name, example := range examples { + t.Run(name, func(t *testing.T) { + msg := sarama.ProducerMessage{Headers: example.Headers} + c := instasarama.ProducerMessageCarrier{&msg} + + c.Set(instana.FieldT, example.Value) + assert.ElementsMatch(t, example.Expected, msg.Headers) + }) + } +} + +func TestProducerMessageCarrier_Set_FieldS(t *testing.T) { + var msg sarama.ProducerMessage + c := instasarama.ProducerMessageCarrier{&msg} + + c.Set(instana.FieldS, "deadbeef") + assert.Equal(t, []sarama.RecordHeader{ + { + Key: []byte(instasarama.FieldC), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + }, msg.Headers) +} + +func TestProducerMessageCarrier_Update_FieldS(t *testing.T) { + examples := map[string]struct { + Value string + Headers []sarama.RecordHeader + Expected []sarama.RecordHeader + }{ + "existing has trace id only": { + Value: "deadbeef", + Headers: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + Expected: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + }, + "existing has span id only": { + Value: "deadbeef", + Headers: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // span id + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + Expected: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + }, + "existing has trace and span id": { + Value: "deadbeef", + Headers: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + Expected: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + }, + } + + for name, example := range examples { + t.Run(name, func(t *testing.T) { + msg := sarama.ProducerMessage{Headers: example.Headers} + c := instasarama.ProducerMessageCarrier{&msg} + + c.Set(instana.FieldS, example.Value) + assert.ElementsMatch(t, example.Expected, msg.Headers) + }) + } +} + +func TestProducerMessageCarrier_Set_FieldL(t *testing.T) { + examples := map[string][]sarama.RecordHeader{ + "0": []sarama.RecordHeader{ + {Key: []byte(instasarama.FieldL), Value: []byte{0x00}}, + }, + "1": []sarama.RecordHeader{ + {Key: []byte(instasarama.FieldL), Value: []byte{0x01}}, + }, + } + + for value, expected := range examples { + t.Run(value, func(t *testing.T) { + msg := sarama.ProducerMessage{Headers: expected} + c := instasarama.ProducerMessageCarrier{&msg} + + c.Set(instana.FieldL, value) + assert.Equal(t, expected, msg.Headers) + }) + } +} + +func TestProducerMessageCarrier_Update_FieldL(t *testing.T) { + msg := sarama.ProducerMessage{ + Headers: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + {Key: []byte("x_instana_l"), Value: []byte{0x00}}, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + } + c := instasarama.ProducerMessageCarrier{&msg} + + c.Set(instana.FieldL, "1") + assert.ElementsMatch(t, []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + {Key: []byte("x_instana_l"), Value: []byte{0x01}}, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, msg.Headers) +} + +func TestProducerMessageCarrier_RemoveAll(t *testing.T) { + msg := sarama.ProducerMessage{ + Headers: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + {Key: []byte("x_INSTANA_L"), Value: []byte{0x01}}, + }, + } + + c := instasarama.ProducerMessageCarrier{&msg} + c.RemoveAll() + + assert.ElementsMatch(t, []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, msg.Headers) +} + +func TestProducerMessageCarrier_ForeachKey(t *testing.T) { + msg := sarama.ProducerMessage{ + Headers: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + {Key: []byte("x_INSTANA_L"), Value: []byte{0x01}}, + }, + } + c := instasarama.ProducerMessageCarrier{&msg} + + var collected []struct{ Key, Value string } + require.NoError(t, c.ForeachKey(func(k, v string) error { + collected = append(collected, struct{ Key, Value string }{k, v}) + return nil + })) + + assert.ElementsMatch(t, []struct{ Key, Value string }{ + {Key: instana.FieldT, Value: "abcdef12"}, + {Key: instana.FieldS, Value: "deadbeef"}, + {Key: instana.FieldL, Value: "1"}, + }, collected) +} + +func TestProducerMessageCarrier_ForeachKey_NoTracingHeaders(t *testing.T) { + msg := sarama.ProducerMessage{ + Headers: []sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + }, + } + c := instasarama.ProducerMessageCarrier{&msg} + + var collected []struct{ Key, Value string } + require.NoError(t, c.ForeachKey(func(k, v string) error { + collected = append(collected, struct{ Key, Value string }{k, v}) + return nil + })) + + assert.Empty(t, collected) +} + +func TestProducerMessageCarrier_ForeachKey_Error(t *testing.T) { + msg := sarama.ProducerMessage{ + Headers: []sarama.RecordHeader{ + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("x_INSTANA_L"), Value: []byte{0x01}}, + }, + } + c := instasarama.ProducerMessageCarrier{&msg} + + assert.Error(t, c.ForeachKey(func(k, v string) error { + return errors.New("something went wrong") + })) +} + +func TestSpanContextFromConsumerMessage(t *testing.T) { + sensor := instana.NewSensorWithTracer( + instana.NewTracerWithEverything(&instana.Options{}, instana.NewTestRecorder()), + ) + + msg := &sarama.ConsumerMessage{ + Headers: []*sarama.RecordHeader{ + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("x_instana_l"), Value: []byte{0x01}}, + }, + } + + spanContext, ok := instasarama.SpanContextFromConsumerMessage(msg, sensor) + require.True(t, ok) + assert.Equal(t, instana.SpanContext{ + TraceID: 0xabcdef12, + SpanID: 0xdeadbeef, + Baggage: make(map[string]string), + }, spanContext) +} + +func TestSpanContextFromConsumerMessage_NoContext(t *testing.T) { + examples := map[string][]*sarama.RecordHeader{ + "no tracing headers": { + {Key: []byte("key1"), Value: []byte("value1")}, + nil, + }, + "malformed tracing headers": { + {Key: []byte("x_instana_c"), Value: []byte("malformed")}, + {Key: []byte("x_instana_l"), Value: []byte{0x00}}, + }, + "incomplete trace headers": { + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // empty span id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + }, + {Key: []byte("x_instana_l"), Value: []byte{0x01}}, + }, + } + + for name, headers := range examples { + t.Run(name, func(t *testing.T) { + sensor := instana.NewSensorWithTracer( + instana.NewTracerWithEverything(&instana.Options{}, instana.NewTestRecorder()), + ) + + msg := &sarama.ConsumerMessage{Headers: headers} + + _, ok := instasarama.SpanContextFromConsumerMessage(msg, sensor) + assert.False(t, ok) + }) + } +} + +func TestConsumerMessageCarrier_Set_FieldT(t *testing.T) { + var msg sarama.ConsumerMessage + c := instasarama.ConsumerMessageCarrier{&msg} + + c.Set(instana.FieldT, "deadbeefdeadbeef") + assert.Equal(t, []*sarama.RecordHeader{ + { + Key: []byte(instasarama.FieldC), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, + // spanid + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + }, + }, msg.Headers) +} + +func TestConsumerMessageCarrier_Update_FieldT(t *testing.T) { + examples := map[string]struct { + Value string + Headers []*sarama.RecordHeader + Expected []*sarama.RecordHeader + }{ + "existing has trace id only": { + Value: "deadbeef", + Headers: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xab, 0xcd, 0xef, 0x12, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + }, + nil, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + Expected: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + // span id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + }, + nil, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + }, + "existing has span id only": { + Value: "deadbeef", + Headers: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // span id + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + }, + }, + nil, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + Expected: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + // span id + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + }, + }, + nil, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + }, + "existing has trace and span id": { + Value: "deadbeef", + Headers: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xab, 0xcd, 0xef, 0x12, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, 0x34, + }, + }, + nil, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + Expected: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + // span id + 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, 0x34, + }, + }, + nil, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + }, + } + + for name, example := range examples { + t.Run(name, func(t *testing.T) { + msg := sarama.ConsumerMessage{Headers: example.Headers} + c := instasarama.ConsumerMessageCarrier{&msg} + + c.Set(instana.FieldT, example.Value) + assert.ElementsMatch(t, example.Expected, msg.Headers) + }) + } +} + +func TestConsumerMessageCarrier_Set_FieldS(t *testing.T) { + var msg sarama.ConsumerMessage + c := instasarama.ConsumerMessageCarrier{&msg} + + c.Set(instana.FieldS, "deadbeef") + assert.Equal(t, []*sarama.RecordHeader{ + { + Key: []byte(instasarama.FieldC), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + }, msg.Headers) +} + +func TestConsumerMessageCarrier_Update_FieldS(t *testing.T) { + examples := map[string]struct { + Value string + Headers []*sarama.RecordHeader + Expected []*sarama.RecordHeader + }{ + "existing has trace id only": { + Value: "deadbeef", + Headers: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + Expected: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + }, + "existing has span id only": { + Value: "deadbeef", + Headers: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // span id + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + Expected: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + }, + "existing has trace and span id": { + Value: "deadbeef", + Headers: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + Expected: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + }, + } + + for name, example := range examples { + t.Run(name, func(t *testing.T) { + msg := sarama.ConsumerMessage{Headers: example.Headers} + c := instasarama.ConsumerMessageCarrier{&msg} + + c.Set(instana.FieldS, example.Value) + assert.ElementsMatch(t, example.Expected, msg.Headers) + }) + } +} + +func TestConsumerMessageCarrier_Set_FieldL(t *testing.T) { + examples := map[string][]*sarama.RecordHeader{ + "0": []*sarama.RecordHeader{ + {Key: []byte(instasarama.FieldL), Value: []byte{0x00}}, + }, + "1": []*sarama.RecordHeader{ + {Key: []byte(instasarama.FieldL), Value: []byte{0x01}}, + }, + } + + for value, expected := range examples { + t.Run(value, func(t *testing.T) { + msg := sarama.ConsumerMessage{Headers: expected} + c := instasarama.ConsumerMessageCarrier{&msg} + + c.Set(instana.FieldL, value) + assert.Equal(t, expected, msg.Headers) + }) + } +} + +func TestConsumerMessageCarrier_Update_FieldL(t *testing.T) { + msg := sarama.ConsumerMessage{ + Headers: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + {Key: []byte("x_instana_l"), Value: []byte{0x00}}, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, + } + c := instasarama.ConsumerMessageCarrier{&msg} + + c.Set(instana.FieldL, "1") + assert.ElementsMatch(t, []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + {Key: []byte("x_instana_l"), Value: []byte{0x01}}, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, msg.Headers) +} + +func TestConsumerMessageCarrier_RemoveAll(t *testing.T) { + msg := sarama.ConsumerMessage{ + Headers: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + nil, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + {Key: []byte("x_INSTANA_L"), Value: []byte{0x01}}, + }, + } + + c := instasarama.ConsumerMessageCarrier{&msg} + c.RemoveAll() + + assert.ElementsMatch(t, []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + nil, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + }, msg.Headers) +} + +func TestConsumerMessageCarrier_ForeachKey(t *testing.T) { + msg := sarama.ConsumerMessage{ + Headers: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + nil, + {Key: []byte("X_CUSTOM_2"), Value: []byte("value2")}, + {Key: []byte("x_INSTANA_L"), Value: []byte{0x01}}, + }, + } + c := instasarama.ConsumerMessageCarrier{&msg} + + var collected []struct{ Key, Value string } + require.NoError(t, c.ForeachKey(func(k, v string) error { + collected = append(collected, struct{ Key, Value string }{k, v}) + return nil + })) + + assert.ElementsMatch(t, []struct{ Key, Value string }{ + {Key: instana.FieldT, Value: "abcdef12"}, + {Key: instana.FieldS, Value: "deadbeef"}, + {Key: instana.FieldL, Value: "1"}, + }, collected) +} + +func TestConsumerMessageCarrier_ForeachKey_NoTracingHeaders(t *testing.T) { + msg := sarama.ConsumerMessage{ + Headers: []*sarama.RecordHeader{ + {Key: []byte("X_CUSTOM_1"), Value: []byte("value1")}, + }, + } + c := instasarama.ConsumerMessageCarrier{&msg} + + var collected []struct{ Key, Value string } + require.NoError(t, c.ForeachKey(func(k, v string) error { + collected = append(collected, struct{ Key, Value string }{k, v}) + return nil + })) + + assert.Empty(t, collected) +} + +func TestConsumerMessageCarrier_ForeachKey_Error(t *testing.T) { + msg := sarama.ConsumerMessage{ + Headers: []*sarama.RecordHeader{ + { + Key: []byte("x_instana_c"), + Value: []byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, 0xef, 0x12, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + {Key: []byte("x_INSTANA_L"), Value: []byte{0x01}}, + }, + } + c := instasarama.ConsumerMessageCarrier{&msg} + + assert.Error(t, c.ForeachKey(func(k, v string) error { + return errors.New("something went wrong") + })) +} diff --git a/instrumentation/instasarama/record_header.go b/instrumentation/instasarama/record_header.go new file mode 100644 index 000000000..1299abdf5 --- /dev/null +++ b/instrumentation/instasarama/record_header.go @@ -0,0 +1,77 @@ +// +build go1.9 + +package instasarama + +import ( + "bytes" + "encoding/hex" + "fmt" + "strings" +) + +// The following functions perform the packing and unpacking of the trace context +// according to https://github.com/instana/technical-documentation/tree/master/tracing/specification#kafka + +// PackTraceContextHeader packs the trace and span ID into a byte slice to be used as (sarama.RecordHeader).Value. +// The returned slice is always 24 bytes long. +func PackTraceContextHeader(traceID, spanID string) []byte { + buf := make([]byte, 24) + + // hex representation uses 2 bytes to encode one byte of information, which means that + // the length of both trace and span IDs must be even. instana.FormatID() truncates leading + // zeroes, which may lead to data corruption as hex.Decode() will ignore the incomplete byte + // representation at the end + traceID = strings.Repeat("0", len(traceID)%2) + traceID + spanID = strings.Repeat("0", len(spanID)%2) + spanID + + // write the trace ID into the first 16 bytes with zero padding at the beginning + if traceID != "" { + hex.Decode(buf[16-hex.DecodedLen(len(traceID)):16], []byte(traceID)) + } + + // write the span ID into the last 8 bytes + if spanID != "" { + hex.Decode(buf[24-hex.DecodedLen(len(spanID)):], []byte(spanID)) + } + + return buf +} + +// UnpackTraceContextHeader unpacks and returns the trace and span ID, removing any leading zeroes. +// It expects the provided buffer to have exactly 24 bytes. +func UnpackTraceContextHeader(val []byte) (string, string, error) { + if len(val) != 24 { + return "", "", fmt.Errorf("unexpected value length: want 24, got %d", len(val)) + } + + traceID := hex.EncodeToString(bytes.TrimLeft(val[:16], "\000")) + spanID := hex.EncodeToString(bytes.TrimLeft(val[16:], "\000")) + + return strings.TrimPrefix(traceID, "0"), strings.TrimPrefix(spanID, "0"), nil +} + +// PackTraceLevelHeader packs the X-INSTANA-L value into a byte slice to be used as (sarama.RecordHeader).Value. +// It returns a 1-byte slice containing 0x00 if the passed value is "0", and 0x01 otherwise. +func PackTraceLevelHeader(val string) []byte { + switch val { + case "0": + return []byte{0x00} + default: + return []byte{0x01} + } +} + +// UnpackTraceLevelHeader returns "1" if the value contains a non-zero byte, and "0" otherwise. +// It expects the provided buffer to have exactly 1 byte. +func UnpackTraceLevelHeader(val []byte) (string, error) { + if len(val) != 1 { + return "", fmt.Errorf("unexpected value length: want 1, got %d", len(val)) + } + + switch val[0] { + case 0x00: + return "0", nil + default: + return "1", nil + } +} diff --git a/instrumentation/instasarama/record_header_test.go b/instrumentation/instasarama/record_header_test.go new file mode 100644 index 000000000..d54b1cbd8 --- /dev/null +++ b/instrumentation/instasarama/record_header_test.go @@ -0,0 +1,138 @@ +// +build go1.9 + +package instasarama_test + +import ( + "testing" + + "github.com/instana/go-sensor/instrumentation/instasarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPackUnpackTraceContextHeader(t *testing.T) { + examples := map[string]struct { + TraceID, SpanID string + Expected [24]byte // using fixed len array here to avoid typos in examples + }{ + "empty values": { + Expected: [24]byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // span id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + }, + "with short 64-bit trace id, no span id": { + TraceID: "deadbeef1", + Expected: [24]byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x0d, 0xea, 0xdb, 0xee, 0xf1, + // span id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + }, + "with 64-bit trace id, no span id": { + TraceID: "deadbeefdeadbeef", + Expected: [24]byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, + // span id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + }, + "no trace id, with short 64-bit span id": { + SpanID: "deadbeef", + Expected: [24]byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // span id + 0x00, 0x00, 0x00, 0x00, 0xde, 0xad, 0xbe, 0xef, + }, + }, + "no trace id, with 64-bit span id": { + SpanID: "deadbeefdeadbeef", + Expected: [24]byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // span id + 0xde, 0xad, 0xbe, 0xef, 0xde, 0xad, 0xbe, 0xef, + }, + }, + "with 64-bit trace id and 64-bit span id": { + TraceID: "abcd", + SpanID: "deadbeef1", + Expected: [24]byte{ + // trace id + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xab, 0xcd, + // span id + 0x00, 0x00, 0x00, 0x0d, 0xea, 0xdb, 0xee, 0xf1, + }, + }, + } + + for name, example := range examples { + t.Run(name, func(t *testing.T) { + assert.Equal(t, example.Expected[:], instasarama.PackTraceContextHeader(example.TraceID, example.SpanID)) + + traceID, spanID, err := instasarama.UnpackTraceContextHeader(example.Expected[:]) + require.NoError(t, err) + assert.Equal(t, example.TraceID[:], traceID) + assert.Equal(t, example.SpanID[:], spanID) + }) + } +} + +func TestUnpackTraceContextHeader_WrongBufferSize(t *testing.T) { + examples := map[string][]byte{ + "nil": nil, + "too long": make([]byte, 23), + "too short": make([]byte, 25), + } + + for name, example := range examples { + t.Run(name, func(t *testing.T) { + _, _, err := instasarama.UnpackTraceContextHeader(example) + assert.Error(t, err) + }) + } +} + +func TestPackUnpackTraceLevelHeader(t *testing.T) { + // using fixed len arrays here to avoid typos in examples + examples := map[string][1]byte{ + "0": [1]byte{0x00}, + "1": [1]byte{0x01}, + } + + for level, expected := range examples { + t.Run("X-INSTANA-L="+level, func(t *testing.T) { + assert.Equal(t, expected[:], instasarama.PackTraceLevelHeader(level)) + + val, err := instasarama.UnpackTraceLevelHeader(expected[:]) + require.NoError(t, err) + assert.Equal(t, level, val) + }) + } +} + +func TestUnpackTraceLevelHeader_WrongBufferSize(t *testing.T) { + examples := map[string][]byte{ + "nil": nil, + "too long": make([]byte, 2), + "too short": make([]byte, 0), + } + + for name, example := range examples { + t.Run(name, func(t *testing.T) { + _, err := instasarama.UnpackTraceLevelHeader(example) + assert.Error(t, err) + }) + } +} diff --git a/instrumentation/instasarama/span_registry.go b/instrumentation/instasarama/span_registry.go new file mode 100644 index 000000000..be1d96528 --- /dev/null +++ b/instrumentation/instasarama/span_registry.go @@ -0,0 +1,74 @@ +// +build go1.9 + +package instasarama + +import ( + "sync" + + "github.com/Shopify/sarama" + ot "github.com/opentracing/opentracing-go" +) + +type spanKeyType uint8 + +const ( + producerSpanKeyType spanKeyType = iota + 1 + consumerSpanKeyType +) + +type spanKey struct { + Type spanKeyType + Topic string + Partition int32 + Offset int64 +} + +func producerSpanKey(msg *sarama.ProducerMessage) spanKey { + return spanKey{ + Type: producerSpanKeyType, + Topic: msg.Topic, + Partition: msg.Partition, + Offset: msg.Offset, + } +} + +func consumerSpanKey(msg *sarama.ConsumerMessage) spanKey { + return spanKey{ + Type: consumerSpanKeyType, + Topic: msg.Topic, + Partition: msg.Partition, + Offset: msg.Offset, + } +} + +// spanRegistry is a thread-safe storage for spans associated with Kafka messages +type spanRegistry struct { + mu sync.Mutex + spans map[spanKey]ot.Span +} + +func newSpanRegistry() *spanRegistry { + return &spanRegistry{spans: make(map[spanKey]ot.Span)} +} + +// Add puts an active span to the registry +func (r *spanRegistry) Add(key spanKey, sp ot.Span) { + r.mu.Lock() + defer r.mu.Unlock() + + r.spans[key] = sp +} + +// Remove retrieves and removes an active span from registry +func (r *spanRegistry) Remove(key spanKey) (ot.Span, bool) { + r.mu.Lock() + defer r.mu.Unlock() + + sp, ok := r.spans[key] + if !ok { + return nil, false + } + delete(r.spans, key) + + return sp, true +} diff --git a/instrumentation/instasarama/sync_producer.go b/instrumentation/instasarama/sync_producer.go new file mode 100644 index 000000000..fcd658728 --- /dev/null +++ b/instrumentation/instasarama/sync_producer.go @@ -0,0 +1,207 @@ +// +build go1.9 + +package instasarama + +import ( + "bytes" + + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + ot "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + otlog "github.com/opentracing/opentracing-go/log" +) + +// SyncProducer is a wrapper for sarama.SyncProducer that instruments its calls using +// provided instana.Sensor +type SyncProducer struct { + sarama.SyncProducer + sensor *instana.Sensor + propageContext bool +} + +// NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration, and +// instruments its calls +func NewSyncProducer(addrs []string, config *sarama.Config, sensor *instana.Sensor) (sarama.SyncProducer, error) { + sp, err := sarama.NewSyncProducer(addrs, config) + if err != nil { + return sp, err + } + + return WrapSyncProducer(sp, config, sensor), nil +} + +// NewSyncProducerFromClient creates a new SyncProducer using the given client, and instruments its calls +func NewSyncProducerFromClient(client sarama.Client, sensor *instana.Sensor) (sarama.SyncProducer, error) { + sp, err := sarama.NewSyncProducerFromClient(client) + if err != nil { + return sp, err + } + + return WrapSyncProducer(sp, client.Config(), sensor), nil +} + +// WrapSyncProducer wraps an existing sarama.SyncProducer instance and instruments its calls. It requires the same +// config that was used to create this producer to detect the Kafka version and whether it's supposed to return +// successes/errors. To initialize a new sync producer instance use instasarama.NewSyncProducer() and +// instasarama.NewSyncProducerFromClient() convenience methods instead +func WrapSyncProducer(sp sarama.SyncProducer, config *sarama.Config, sensor *instana.Sensor) *SyncProducer { + return &SyncProducer{ + SyncProducer: sp, + sensor: sensor, + propageContext: contextPropagationSupported(config.Version), + } +} + +// SendMessage picks up the trace context previously added to the message with +// instasarama.ProducerMessageWithSpan(), starts a new child span and injects its +// context into the message headers before sending it to the underlying producer. +// The call will not be traced if there the message does not contain trace context +func (p *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (int32, int64, error) { + sp := startProducerSpan(p.sensor, msg) + if sp != nil { + defer sp.Finish() + + carrier := ProducerMessageCarrier{msg} + if p.propageContext { + // forward the trace context, updating the span ids + sp.Tracer().Inject(sp.Context(), ot.TextMap, carrier) + } else { + // remove previously added trace headers + carrier.RemoveAll() + } + } + + partition, offset, err := p.SyncProducer.SendMessage(msg) + if err != nil && sp != nil { + sp.SetTag("kafka.error", err) + sp.LogFields(otlog.Error(err)) + } + + return partition, offset, err +} + +// SendMessages starts a new span and injects its context into messages headers before +// sending them with the underlying producer. +// +// This method attempts to use the existing trace context found in message headers. +// There will be NO SPAN CREATED for this call if messages originate from different trace contexts. +// A possible use case that result in such behavior would be if messages resulted from different +// HTTP requests are buffered and later being sent in one batch asynchronously. +// In case you want your batch publish operation to be a part of a specific trace, make sure that +// you inject the parent span of this trace explicitly before calling `SendMessages()`, i.e. +// +// type MessageCollector struct { +// CollectedMessages []*sarama.ProducerMessage +// producer *instasarama.SyncProducer +// // ... +// } +// +// func (c MessageCollector) Flush(ctx context.Context) error { +// // extract the parent span from context and use it to continue the trace +// if parentSpan, ok := instana.SpanFromContext(ctx); ok { +// // start a new span for the batch send job +// sp := parentSpan.Tracer().StartSpan("batch-send", ot.ChilfOf(parentSpan.Context())) +// defer sp.Finish() +// +// // inject the trace context into every collected message, overriding the existing one +// for i, msg := range c.CollectedMessages { +// c.CollectedMessages = instasarama.ProducerMessageWithSpan(msg, sp) +// } +// } +// +// return c.producer.SendMessages(c.CollectedMessages) +// } +func (p *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { + if len(msgs) == 0 { + return nil + } + + var sp ot.Span + if producerMessagesFromSameContext(msgs) { + sp = startProducerSpan(p.sensor, msgs[0]) + } + + if sp != nil { + defer sp.Finish() + + instana.BatchSize(len(msgs)).Set(sp) + + // collect unique topics from the rest of messages and inject trace context in one go + topics := make(map[string]struct{}) + for _, msg := range msgs { + if _, ok := topics[msg.Topic]; !ok { + topics[msg.Topic] = struct{}{} + } + + // forward the trace context, updating the span id + sp.Tracer().Inject(sp.Context(), ot.TextMap, ProducerMessageCarrier{msg}) + } + + // send topics as a comma-separated string + buf := bytes.NewBuffer(nil) + for topic := range topics { + buf.WriteString(topic) + buf.WriteByte(',') + } + buf.Truncate(buf.Len() - 1) // truncate trailing comma + sp.SetTag("kafka.service", buf.String()) + } + + err := p.SyncProducer.SendMessages(msgs) + if err != nil && sp != nil { + sp.SetTag("kafka.error", err) + sp.LogFields(otlog.Error(err)) + } + + return err +} + +// startSpan picks up the existing trace context provided in the message and returns a new child +// span. It returns nil if there is no valid context provided in the message +func startProducerSpan(sensor *instana.Sensor, msg *sarama.ProducerMessage) ot.Span { + switch sc, err := sensor.Tracer().Extract(ot.TextMap, ProducerMessageCarrier{msg}); err { + case nil: + return sensor.Tracer().StartSpan( + "kafka", + ext.SpanKindProducer, + ot.ChildOf(sc), + ot.Tags{ + "kafka.service": msg.Topic, + "kafka.access": "send", + }, + ) + case ot.ErrSpanContextNotFound: + sensor.Logger().Debug("no span context provided in message to ", msg.Topic, ", skipping the call", msg.Topic) + case ot.ErrUnsupportedFormat: + sensor.Logger().Info("unsupported span context format provided in message to ", msg.Topic, ", skipping the call") + default: + sensor.Logger().Warn("failed to extract span context from producer message headers: ", err) + } + + return nil +} + +func producerMessagesFromSameContext(msgs []*sarama.ProducerMessage) bool { + if len(msgs) == 0 { + return true + } + + firstTraceID, firstSpanID, err := extractTraceSpanID(msgs[0]) + if err != nil { + return false + } + + for _, msg := range msgs[1:] { + traceID, spanID, err := extractTraceSpanID(msg) + if err != nil { + return false + } + + if traceID != firstTraceID || spanID != firstSpanID { + return false + } + } + + return true +} diff --git a/instrumentation/instasarama/sync_producer_test.go b/instrumentation/instasarama/sync_producer_test.go new file mode 100644 index 000000000..a55c1502a --- /dev/null +++ b/instrumentation/instasarama/sync_producer_test.go @@ -0,0 +1,326 @@ +// +build go1.9 + +package instasarama_test + +import ( + "errors" + "sort" + "strings" + "testing" + + "github.com/Shopify/sarama" + instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/instrumentation/instasarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSyncProducer_SendMessage(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + parent := sensor.Tracer().StartSpan("test-span") + + config := sarama.NewConfig() + config.Version = sarama.V0_11_0_0 + config.Producer.Return.Successes = true + + p := &testSyncProducer{} + wrapped := instasarama.WrapSyncProducer(p, config, sensor) + + _, _, err := wrapped.SendMessage( + instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{Topic: "test-topic"}, parent), + ) + require.NoError(t, err) + + parent.Finish() + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + pSpan, err := extractAgentSpan(spans[1]) + require.NoError(t, err) + + cSpan, err := extractAgentSpan(spans[0]) + require.NoError(t, err) + + assert.Equal(t, 0, cSpan.Ec) + assert.EqualValues(t, instana.ExitSpanKind, cSpan.Kind) + + assert.Equal(t, agentKafkaSpanData{ + Service: "test-topic", + Access: "send", + }, cSpan.Data.Kafka) + + require.Len(t, p.Messages, 1) + assert.Contains(t, p.Messages[0].Headers, sarama.RecordHeader{ + Key: []byte("X_INSTANA_C"), + Value: instasarama.PackTraceContextHeader(instana.FormatID(cSpan.TraceID), instana.FormatID(cSpan.SpanID)), + }) + assert.Contains(t, p.Messages[0].Headers, sarama.RecordHeader{ + Key: []byte("X_INSTANA_L"), + Value: instasarama.PackTraceLevelHeader("1"), + }) + + assert.Equal(t, pSpan.TraceID, cSpan.TraceID) + assert.Equal(t, pSpan.SpanID, cSpan.ParentID) + assert.NotEqual(t, pSpan.SpanID, cSpan.SpanID) +} + +func TestSyncProducer_SendMessage_NoTraceContext(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + config := sarama.NewConfig() + config.Version = sarama.V0_11_0_0 + config.Producer.Return.Successes = true + + p := &testSyncProducer{} + wrapped := instasarama.WrapSyncProducer(p, config, sensor) + + _, _, err := wrapped.SendMessage(&sarama.ProducerMessage{ + Topic: "test-topic", + }) + require.NoError(t, err) + + spans := recorder.GetQueuedSpans() + assert.Empty(t, spans) + + require.Len(t, p.Messages, 1) + assert.Empty(t, p.Messages[0].Headers) +} + +func TestSyncProducer_SendMessage_Error(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + config := sarama.NewConfig() + config.Version = sarama.V0_11_0_0 + config.Producer.Return.Successes = true + + p := &testSyncProducer{ + Error: errors.New("something went wrong"), + } + wrapped := instasarama.WrapSyncProducer(p, config, sensor) + + parent := sensor.Tracer().StartSpan("test-span") + _, _, err := wrapped.SendMessage( + instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{Topic: "test-topic"}, parent), + ) + parent.Finish() + + assert.Error(t, err) + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + span, err := extractAgentSpan(spans[0]) + require.NoError(t, err) + + assert.Equal(t, 1, span.Ec) + assert.EqualValues(t, instana.ExitSpanKind, span.Kind) + + assert.Equal(t, agentKafkaSpanData{ + Service: "test-topic", + Access: "send", + }, span.Data.Kafka) +} + +func TestSyncProducer_SendMessages_SameTraceContext(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + config := sarama.NewConfig() + config.Version = sarama.V0_11_0_0 + config.Producer.Return.Successes = true + + p := &testSyncProducer{} + wrapped := instasarama.WrapSyncProducer(p, config, sensor) + + parent := sensor.Tracer().StartSpan("test-span") + require.NoError(t, wrapped.SendMessages([]*sarama.ProducerMessage{ + instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{Topic: "test-topic-1"}, parent), + instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{Topic: "test-topic-2"}, parent), + })) + parent.Finish() + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + pSpan, err := extractAgentSpan(spans[1]) + require.NoError(t, err) + + cSpan, err := extractAgentSpan(spans[0]) + require.NoError(t, err) + + assert.Equal(t, 0, cSpan.Ec) + assert.EqualValues(t, instana.ExitSpanKind, cSpan.Kind) + assert.Equal(t, 2, cSpan.Batch.Size) + + // sort comma-separated list of topics for comparison + topics := strings.Split(cSpan.Data.Kafka.Service, ",") + sort.Strings(topics) + cSpan.Data.Kafka.Service = strings.Join(topics, ",") + + assert.Equal(t, agentKafkaSpanData{ + Service: "test-topic-1,test-topic-2", + Access: "send", + }, cSpan.Data.Kafka) + + require.Len(t, p.Messages, 2) + for _, msg := range p.Messages { + assert.Contains(t, msg.Headers, sarama.RecordHeader{ + Key: []byte("X_INSTANA_C"), + Value: instasarama.PackTraceContextHeader(instana.FormatID(cSpan.TraceID), instana.FormatID(cSpan.SpanID)), + }) + assert.Contains(t, msg.Headers, sarama.RecordHeader{ + Key: []byte("X_INSTANA_L"), + Value: instasarama.PackTraceLevelHeader("1"), + }) + } + + assert.Equal(t, pSpan.TraceID, cSpan.TraceID) + assert.Equal(t, pSpan.SpanID, cSpan.ParentID) + assert.NotEqual(t, pSpan.SpanID, cSpan.SpanID) +} + +func TestSyncProducer_SendMessages_DifferentTraceContext(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + parentOne := sensor.Tracer().StartSpan("test-span") + defer parentOne.Finish() + + parentTwo := sensor.Tracer().StartSpan("test-span") + defer parentTwo.Finish() + + examples := map[string][]*sarama.ProducerMessage{ + "different parent spans": { + instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{Topic: "test-topic-1"}, parentOne), + instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{Topic: "test-topic-2"}, parentTwo), + }, + "with message without trace context": { + instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{Topic: "test-topic-1"}, parentOne), + &sarama.ProducerMessage{Topic: "test-topic-3"}, + }, + } + + for name, messages := range examples { + t.Run(name, func(t *testing.T) { + config := sarama.NewConfig() + config.Version = sarama.V0_11_0_0 + config.Producer.Return.Successes = true + + p := &testSyncProducer{} + wrapped := instasarama.WrapSyncProducer(p, config, sensor) + + require.NoError(t, wrapped.SendMessages(messages)) + + assert.Empty(t, recorder.GetQueuedSpans()) + assert.ElementsMatch(t, messages, p.Messages) + }) + } +} + +func TestSyncProducer_SendMessages_NoTraceContext(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + config := sarama.NewConfig() + config.Version = sarama.V0_11_0_0 + config.Producer.Return.Successes = true + + p := &testSyncProducer{} + wrapped := instasarama.WrapSyncProducer(p, config, sensor) + + require.NoError(t, wrapped.SendMessages([]*sarama.ProducerMessage{ + {Topic: "test-topic-1"}, + {Topic: "test-topic-2"}, + })) + + spans := recorder.GetQueuedSpans() + assert.Empty(t, spans) + + require.Len(t, p.Messages, 2) + assert.Empty(t, p.Messages[0].Headers) + assert.Empty(t, p.Messages[1].Headers) +} + +func TestSyncProducer_SendMessages_Error(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + config := sarama.NewConfig() + config.Version = sarama.V0_11_0_0 + config.Producer.Return.Successes = true + + p := &testSyncProducer{ + Error: errors.New("something went wrong"), + } + wrapped := instasarama.WrapSyncProducer(p, config, sensor) + + parent := sensor.Tracer().StartSpan("test-span") + assert.Error(t, wrapped.SendMessages([]*sarama.ProducerMessage{ + instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{Topic: "test-topic-1"}, parent), + instasarama.ProducerMessageWithSpan(&sarama.ProducerMessage{Topic: "test-topic-2"}, parent), + })) + parent.Finish() + + spans := recorder.GetQueuedSpans() + require.Len(t, spans, 2) + + span, err := extractAgentSpan(spans[0]) + require.NoError(t, err) + + assert.Equal(t, 1, span.Ec) + assert.EqualValues(t, instana.ExitSpanKind, span.Kind) + assert.Equal(t, 2, span.Batch.Size) + + // sort comma-separated list of topics for comparison + topics := strings.Split(span.Data.Kafka.Service, ",") + sort.Strings(topics) + span.Data.Kafka.Service = strings.Join(topics, ",") + + assert.Equal(t, agentKafkaSpanData{ + Service: "test-topic-1,test-topic-2", + Access: "send", + }, span.Data.Kafka) +} + +func TestSyncProducer_Close(t *testing.T) { + recorder := instana.NewTestRecorder() + sensor := instana.NewSensorWithTracer(instana.NewTracerWithEverything(&instana.Options{}, recorder)) + + config := sarama.NewConfig() + config.Version = sarama.V0_11_0_0 + config.Producer.Return.Successes = true + + p := &testSyncProducer{} + wrapped := instasarama.WrapSyncProducer(p, config, sensor) + wrapped.Close() + + assert.True(t, p.Closed) +} + +type testSyncProducer struct { + Error error + Messages []*sarama.ProducerMessage + Closed bool +} + +func (p *testSyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { + p.Messages = append(p.Messages, msg) + + return 0, 0, p.Error +} + +func (p *testSyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { + p.Messages = append(p.Messages, msgs...) + + return p.Error +} + +func (p *testSyncProducer) Close() error { + p.Closed = true + return p.Error +}