Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
eko committed Oct 22, 2021
2 parents 98b35fa + 1e1d179 commit c0f5f1c
Show file tree
Hide file tree
Showing 25 changed files with 799 additions and 21 deletions.
7 changes: 0 additions & 7 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@ jobs:
name: Build
runs-on: ubuntu-latest
steps:

- name: Set up Go 1.15
uses: actions/setup-go@v1
with:
go-version: 1.15
id: go

- name: Check out code into the Go module directory
uses: actions/checkout@v2

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
GOLANG_VERSION?=1.15
GOLANG_VERSION?=1.17

.PHONY: dev.up
dev.up:
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,12 @@ for development:
* to run tests `make tests`
* to stop local environment, run `make dev.down`

# OpenTelemetry support

An [OpenTelemetry](https://opentelemetry.io/) instrumentation is available under `instrumentation/otel/` directory.

It is a separated Go module you can import in your project:

```bash
$ go get github.com/etf1/kafka-transformer/instrumentation/otel
```
6 changes: 5 additions & 1 deletion examples/custom_collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"syscall"

confluent "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/etf1/kafka-transformer/pkg/instrument"
"github.com/etf1/kafka-transformer/pkg/transformer/kafka"

"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -30,7 +31,10 @@ func main() {
ProducerConfig: &confluent.ConfigMap{
"bootstrap.servers": broker,
},
Collector: NewCollector("custom_collector"),
Collector: instrument.NewMultiCollector(
NewCollector("custom_collector"),
// Add your other collectors here if you have multiple ones...
),
}

transformer, err := kafka.NewKafkaTransformer(config)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module github.com/etf1/kafka-transformer

go 1.15
go 1.17

require github.com/confluentinc/confluent-kafka-go v1.5.2
3 changes: 3 additions & 0 deletions instrumentation/otel/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# OpenTelemetry Instrumentation

This package provides an instrumentation of [OpenTelemetry](https://github.com/open-telemetry).
140 changes: 140 additions & 0 deletions instrumentation/otel/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package otel

import (
"context"
"sync"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/etf1/kafka-transformer/instrumentation/otel/internal"
"github.com/etf1/kafka-transformer/pkg/instrument"

"go.opentelemetry.io/contrib"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
oteltrace "go.opentelemetry.io/otel/trace"
)

// Collector is the OpenTelemetry instrumentation collector.
type Collector struct {
ctx context.Context
tracer oteltrace.Tracer
propagator propagation.TextMapPropagator
consumerGroupID string
spans *sync.Map
}

// NewCollector instanciates a new Collector.
func NewCollector(opts ...Option) *Collector {
cfg := &config{
tracerProvider: otel.GetTracerProvider(),
propagator: otel.GetTextMapPropagator(),
tracerName: tracerName,
}

for _, o := range opts {
o.apply(cfg)
}

return &Collector{
ctx: context.Background(),
tracer: cfg.tracerProvider.Tracer(
cfg.tracerName,
oteltrace.WithInstrumentationVersion(contrib.SemVersion()),
),
propagator: cfg.propagator,
consumerGroupID: cfg.consumerGroupID,
spans: &sync.Map{},
}
}

func (c *Collector) attrsByOperationAndMessage(operation internal.Operation, msg *kafka.Message) []attribute.KeyValue {
attributes := []attribute.KeyValue{
internal.KafkaSystemKey(),
internal.KafkaOperation(operation),
semconv.MessagingDestinationKindTopic,
}

switch operation {
case internal.OperationConsume:
attributes = append(
attributes,
internal.KafkaConsumerGroupID(c.consumerGroupID),
)
}

if msg != nil {
attributes = append(
attributes,
internal.KafkaMessageKey(string(msg.Key)),
semconv.MessagingKafkaPartitionKey.Int(int(msg.TopicPartition.Partition)),
)
attributes = append(attributes, internal.KafkaMessageHeaders(msg.Headers)...)

if topic := msg.TopicPartition.Topic; topic != nil {
attributes = append(attributes, internal.KafkaDestinationTopic(*topic))
}
}

return attributes
}

func (c *Collector) startSpan(operationName internal.Operation, msg *kafka.Message) oteltrace.Span {
opts := []oteltrace.SpanStartOption{
oteltrace.WithSpanKind(oteltrace.SpanKindConsumer),
}

carrier := NewMessageCarrier(msg)
ctx := c.propagator.Extract(c.ctx, carrier)

ctx, span := c.tracer.Start(ctx, string(operationName), opts...)

c.propagator.Inject(ctx, carrier)

span.SetAttributes(c.attrsByOperationAndMessage(operationName, msg)...)

return span
}

// Before is triggered before an event occurred.
func (c *Collector) Before(message *kafka.Message, action instrument.Action, _ time.Time) {
if message == nil {
return
}

var operation internal.Operation
switch action {
case instrument.KafkaProducerProduce:
operation = internal.OperationProduce

case instrument.KafkaConsumerConsume:
operation = internal.OperationConsume

case instrument.TransformerTransform:
operation = internal.OperationTransform

case instrument.ProjectorProject:
operation = internal.OperationProject

default:
return
}

span := c.startSpan(operation, message)

c.spans.Store(message, span)
}

// After is triggered before an event occurred.
func (c *Collector) After(message *kafka.Message, action instrument.Action, err error, _ time.Time) {
if message == nil {
return
}

if value, ok := c.spans.LoadAndDelete(message); ok {
span := value.(oteltrace.Span)
endSpan(span, err)
}
}
154 changes: 154 additions & 0 deletions instrumentation/otel/collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package otel

import (
"context"
"sync"
"testing"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/etf1/kafka-transformer/pkg/instrument"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

func TestNewCollector(t *testing.T) {
// When
collector := NewCollector()

// Then
assert.IsType(t, new(Collector), collector)

assert.Equal(t, context.Background(), collector.ctx)
assert.Equal(t, otel.GetTextMapPropagator(), collector.propagator)
assert.Equal(t, new(sync.Map), collector.spans)
}

func TestNewCollector_WithTracerProvider(t *testing.T) {
// Given
tracerProvider := otel.GetTracerProvider()

// When
collector := NewCollector(WithTracerProvider(tracerProvider))

// Then
assert.IsType(t, new(Collector), collector)

assert.Equal(t, context.Background(), collector.ctx)
assert.Equal(t, otel.GetTextMapPropagator(), collector.propagator)
assert.Equal(t, new(sync.Map), collector.spans)
}

func TestCollector_Before_Produce(t *testing.T) {
// Given
tracerProvider := otel.GetTracerProvider()

collector := NewCollector(WithTracerProvider(tracerProvider))

message := &kafka.Message{
Key: []byte("key"),
Value: []byte("value"),
}

// When
collector.Before(message, instrument.KafkaProducerProduce, time.Now())

// Then
value, ok := collector.spans.Load(message)
span := value.(trace.Span)

assert.True(t, ok)
assert.Equal(t, "00000000000000000000000000000000", span.SpanContext().TraceID().String())
assert.Equal(t, "0000000000000000", span.SpanContext().SpanID().String())
}

func TestCollector_Before_Consume(t *testing.T) {
// Given
tracerProvider := otel.GetTracerProvider()

collector := NewCollector(WithTracerProvider(tracerProvider))

message := &kafka.Message{
Key: []byte("key"),
Value: []byte("value"),
}

// When
collector.Before(message, instrument.KafkaConsumerConsume, time.Now())

// Then
value, ok := collector.spans.Load(message)
span := value.(trace.Span)

assert.True(t, ok)
assert.Equal(t, "00000000000000000000000000000000", span.SpanContext().TraceID().String())
assert.Equal(t, "0000000000000000", span.SpanContext().SpanID().String())
}

func TestCollector_Before_Transform(t *testing.T) {
// Given
tracerProvider := otel.GetTracerProvider()

collector := NewCollector(WithTracerProvider(tracerProvider))

message := &kafka.Message{
Key: []byte("key"),
Value: []byte("value"),
}

// When
collector.Before(message, instrument.TransformerTransform, time.Now())

// Then
value, ok := collector.spans.Load(message)
span := value.(trace.Span)

assert.True(t, ok)
assert.Equal(t, "00000000000000000000000000000000", span.SpanContext().TraceID().String())
assert.Equal(t, "0000000000000000", span.SpanContext().SpanID().String())
}

func TestCollector_Before_Project(t *testing.T) {
// Given
tracerProvider := otel.GetTracerProvider()

collector := NewCollector(WithTracerProvider(tracerProvider))

message := &kafka.Message{
Key: []byte("key"),
Value: []byte("value"),
}

// When
collector.Before(message, instrument.ProjectorProject, time.Now())

// Then
value, ok := collector.spans.Load(message)
span := value.(trace.Span)

assert.True(t, ok)
assert.Equal(t, "00000000000000000000000000000000", span.SpanContext().TraceID().String())
assert.Equal(t, "0000000000000000", span.SpanContext().SpanID().String())
}

func TestCollector_Before_AnotherAction(t *testing.T) {
// Given
tracerProvider := otel.GetTracerProvider()

collector := NewCollector(WithTracerProvider(tracerProvider))

message := &kafka.Message{
Key: []byte("key"),
Value: []byte("value"),
}

// When
collector.Before(message, instrument.OverallTime, time.Now())

// Then
value, ok := collector.spans.Load(message)

assert.Nil(t, value)
assert.False(t, ok)
}
18 changes: 18 additions & 0 deletions instrumentation/otel/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package otel

import (
"go.opentelemetry.io/otel/codes"
oteltrace "go.opentelemetry.io/otel/trace"
)

const (
// tracerName is the technical name of the tracer.
tracerName = "github.com/etf1/kafka-transformer/instrumentation/otel"
)

func endSpan(s oteltrace.Span, err error) {
if err != nil {
s.SetStatus(codes.Error, err.Error())
}
s.End()
}
Loading

0 comments on commit c0f5f1c

Please sign in to comment.