Skip to content

Commit

Permalink
Merge pull request #106 from instana/instrument_sarama
Browse files Browse the repository at this point in the history
Instrument github.com/Shopify/sarama
  • Loading branch information
Andrew Slotin authored Apr 20, 2020
2 parents ee25376 + 84e06ec commit d966816
Show file tree
Hide file tree
Showing 27 changed files with 3,880 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ matrix:
include:
- go: tip
- go: 1.8.x
env: EXCLUDE_DIRS="./instrumentation/instagrpc"
env: EXCLUDE_DIRS="./instrumentation/instagrpc ./instrumentation/instasarama"
- go: 1.9.x
- go: 1.10.x
- go: 1.11.x
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ The provided `parentSpan` is the incoming request from the request handler (see

[`github.com/instana/go-sensor/instrumentation/instagrpc`](./instrumentation/instagrpc) provides both unary and stream interceptors to instrument GRPC servers and clients that use `google.golang.org/grpc`.

### Kafka producers and consumers

[`github.com/instana/go-sensor/instrumentation/instasarama`](./instrumentation/instasarama) provides both unary and stream interceptors to instrument Kafka producers and consumers built on top of `github.com/Shopify/sarama`.

## Sensor

To use sensor only without tracing ability, import the `instana` package and run
Expand Down
21 changes: 21 additions & 0 deletions instrumentation/instasarama/LICENSE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2020 Instana

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
129 changes: 129 additions & 0 deletions instrumentation/instasarama/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
Instana instrumentation for github.com/Shopify/sarama
=====================================================

This module contains instrumentation code for Kafka producers and consumers that use `github.com/Shopify/sarama` library starting
from v1.19.0 and above.

[![GoDoc](https://img.shields.io/static/v1?label=godoc&message=reference&color=blue)][godoc]

Installation
------------

Unlike the Instana Go sensor, this instrumentation module requires Go v1.9+.

```bash
$ go get github.com/instana/go-sensor/instrumentation/instasarama
```

Usage
-----

For detailed usage example see [the documentation][godoc] or [`example_test.go`](./example_test.go).

This instrumentation requires an instance of `instana.Sensor` to initialize spans and handle the trace context propagation.
You can create a new instance of Instana sensor using `instana.NewSensor()`.

`instasarama` provides a set of convenience wrappers for constructor functions exported by `github.com/Shopify/sarama`. These
wrappers are named the same way as their origins and use the same set of arguments. In most cases it's enough to replace
`sarama` with `instasarama` in the constructor call and append an instance of `*instana.Sensor` to the argument list.

**Note**: Kafka supports record headers starting from v0.11.0. In order to enable trace context propagation, you need to make sure
that your `(sarama.Config).Version` is set to at least `sarama.V0_11_0_0`.

### Instrumenting `sarama.SyncProducer`

For more detailed example code please consult the [package documentation][godoc] or [example_sync_producer_test.go](./example_sync_producer_test.go).

To create an instrumented instance of `sarama.SyncProducer` from a list of broker addresses use [instasarama.NewSyncProducer()][NewSyncProducer]:

```go
producer := instasarama.NewSyncProducer(brokers, config, sensor)
```

[instasarama.NewSyncProducerFromClient()][NewSyncProducerFromClient] does the same, but from an existing `sarama.Client`:

```go
producer := instasarama.NewSyncProducerFromClient(client, sensor)
```

The wrapped producer takes care of trace context propagation by creating an exit span and injecting the trace context into each Kafka
message headers. Since `github.com/Shopify/sarama` does not use `context.Context`, which is a conventional way of passing the parent
span in Instana Go sensor, the caller needs to inject the parent span context using [`instasarama.ProducerMessageWithSpan()`][ProducerMessageWithSpan]
before passing it to the wrapped producer.

### Instrumenting `sarama.AsyncProducer`

Similarly to `sarama.SyncProducer`, `instasarama` provides wrappers for constructor methods of `sarama.AsyncProducer` and expects
the parent span context to be injected into message headers using using `instasarama.ProducerMessageWithSpan()`.

For more detailed example code please consult the [package documentation][godoc] or [example_async_producer_test.go](./example_async_producer_test.go).

To create an instrumented instance of `sarama.AsyncProducer` from a list of broker addresses use [instasarama.NewAsyncProducer()][NewAsyncProducer]:

```go
producer := instasarama.NewAsyncProducer(brokers, config, sensor)
```

[instasarama.NewAsyncProducerFromClient()][NewAsyncProducerFromClient] does the same, but from an existing `sarama.Client`:

```go
producer := instasarama.NewAsyncProducerFromClient(client, sensor)
```

The wrapped producer takes care of trace context propagation by creating an exit span and injecting the trace context into each Kafka
message headers. Since `github.com/Shopify/sarama` does not use `context.Context`, which is a conventional way of passing the parent
span in Instana Go sensor, the caller needs to inject the parent span context using [`instasarama.ProducerMessageWithSpan()`][ProducerMessageWithSpan]
before passing it to the wrapped producer.

### Instrumenting `sarama.Consumer`

For more detailed example code please consult the [package documentation][godoc] or [example_consumer_test.go](./example_consumer_test.go).

To create an instrumented instance of `sarama.Consumer` from a list of broker addresses use [instasarama.NewConsumer()][NewConsumer]:

```go
consumer := instasarama.NewConsumer(brokers, config, sensor)
```

[instasarama.NewConsumerFromClient()][NewConsumerFromClient] does the same, but from an existing `sarama.Client`:

```go
consumer := instasarama.NewConsumerFromClient(client, sensor)
```

The wrapped consumer will pick up the existing trace context if found in message headers, start a new entry span and inject its context
into each message. This context can be retrieved with [`instasarama.SpanContextFromConsumerMessage()`][SpanContextFromConsumerMessage]
and used in the message handler to continue the trace.

### Instrumenting `sarama.ConsumerGroup`

For more detailed example code please consult the [package documentation][godoc] or [example_consumer_group_test.go](./example_consumer_group_test.go).

`instasarama` provides [`instasarama.WrapConsumerGroupHandler()`][WrapConsumerGroupHandler] to wrap your `sarama.ConsumerGroupHandler`
into a wrapper that takes care of trace context extraction, creating an entry span and injecting its context into each received `sarama.ConsumerMessage`:

```go
var client sarama.ConsumerGroup

consumer := instasarama.WrapConsumerGroupHandler(&Consumer{}, sensor)

// use the wrapped consumer in the Consume() call
for {
client.Consume(ctx, consumer)
}
```

The wrapped consumer will pick up the existing trace context if found in message headers, start a new entry span and inject its context
into each message. This context can be retrieved with [`instasarama.SpanContextFromConsumerMessage()`][SpanContextFromConsumerMessage] and used
in the message handler to continue the trace.

[godoc]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama
[NewSyncProducer]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#NewSyncProducer
[NewSyncProducerFromClient]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#NewSyncProducerFromClient
[NewAsyncProducer]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#NewAsyncProducer
[NewAsyncProducerFromClient]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#NewAsyncProducerFromClient
[NewConsumer]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#NewConsumer
[NewConsumerFromClient]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#NewConsumerFromClient
[WrapConsumerGroupHandler]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#WrapConsumerGroupHandler
[ProducerMessageWithSpan]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#ProducerMessageWithSpan
[SpanContextFromConsumerMessage]: https://pkg.go.dev/github.com/instana/go-sensor/instrumentation/instasarama?tab=doc#SpanContextFromConsumerMessage
138 changes: 138 additions & 0 deletions instrumentation/instasarama/async_producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// +build go1.9

package instasarama

import (
"github.com/Shopify/sarama"
instana "github.com/instana/go-sensor"
ot "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
)

// AsyncProducer is a wrapper for sarama.AsyncProducer that instruments its calls using
// provided instana.Sensor
type AsyncProducer struct {
sarama.AsyncProducer
sensor *instana.Sensor

awaitResult bool
propageContext bool

input chan *sarama.ProducerMessage
successes chan *sarama.ProducerMessage
errors chan *sarama.ProducerError

channelStates uint8 // bit fields describing the open/closed state of the response channels
activeSpans *spanRegistry
}

const (
apSuccessesChanReady = uint8(1) << iota
apErrorsChanReady

apAllChansReady = apSuccessesChanReady | apErrorsChanReady
)

// NewAsyncProducer creates a new sarama.AsyncProducer using the given broker addresses and configuration, and
// instruments its calls
func NewAsyncProducer(addrs []string, conf *sarama.Config, sensor *instana.Sensor) (sarama.AsyncProducer, error) {
ap, err := sarama.NewAsyncProducer(addrs, conf)
if err != nil {
return ap, err
}

return WrapAsyncProducer(ap, conf, sensor), nil
}

// NewAsyncProducerFromClient creates a new sarama.AsyncProducer using the given client, and
// instruments its calls
func NewAsyncProducerFromClient(client sarama.Client, sensor *instana.Sensor) (sarama.AsyncProducer, error) {
ap, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
return ap, err
}

return WrapAsyncProducer(ap, client.Config(), sensor), nil
}

// WrapAsyncProducer wraps an existing sarama.AsyncProducer and instruments its calls. It requires the same
// config that was used to create this producer to detect the Kafka version and whether it's supposed to return
// successes/errors. To initialize a new sync producer instance use instasarama.NewAsyncProducer() and
// instasarama.NewAsyncProducerFromClient() convenience methods instead
func WrapAsyncProducer(p sarama.AsyncProducer, conf *sarama.Config, sensor *instana.Sensor) *AsyncProducer {
ap := &AsyncProducer{
AsyncProducer: p,
sensor: sensor,
input: make(chan *sarama.ProducerMessage),
successes: make(chan *sarama.ProducerMessage),
errors: make(chan *sarama.ProducerError),
channelStates: apAllChansReady,
}

if conf != nil {
ap.propageContext = contextPropagationSupported(conf.Version)
ap.awaitResult = conf.Producer.Return.Successes && conf.Producer.Return.Errors
ap.activeSpans = newSpanRegistry()
}

go ap.consume()

return ap
}

// Input is the input channel for the user to write messages to that they wish to send. The async producer
// will than create a new exit span for each message that has trace context added with instasarama.ProducerMessageWithSpan()
func (p *AsyncProducer) Input() chan<- *sarama.ProducerMessage { return p.input }

// Successes is the success output channel back to the user
func (p *AsyncProducer) Successes() <-chan *sarama.ProducerMessage { return p.successes }

// Errors is the error output channel back to the user
func (p *AsyncProducer) Errors() <-chan *sarama.ProducerError { return p.errors }

func (p *AsyncProducer) consume() {
for p.channelStates&apAllChansReady != 0 {
select {
case msg := <-p.input:
sp := startProducerSpan(p.sensor, msg)
if sp != nil {
if p.awaitResult { // postpone span finish until the result is received
p.activeSpans.Add(producerSpanKey(msg), sp)
} else {
sp.Finish()
}

carrier := ProducerMessageCarrier{msg}
if p.propageContext {
p.sensor.Tracer().Inject(sp.Context(), ot.TextMap, carrier)
} else {
carrier.RemoveAll()
}
}

p.AsyncProducer.Input() <- msg
case msg, ok := <-p.AsyncProducer.Successes():
if !ok {
p.channelStates &= ^apSuccessesChanReady
continue
}
p.successes <- msg

if sp, ok := p.activeSpans.Remove(producerSpanKey(msg)); ok {
sp.Finish()
}
case msg, ok := <-p.AsyncProducer.Errors():
if !ok {
p.channelStates &= ^apErrorsChanReady
continue
}
p.errors <- msg

if sp, ok := p.activeSpans.Remove(producerSpanKey(msg.Msg)); ok {
sp.SetTag("kafka.error", msg.Err)
sp.LogFields(otlog.Error(msg.Err))
sp.Finish()
}
}
}
}
Loading

0 comments on commit d966816

Please sign in to comment.