Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Text Map Propagator #6

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pkg/opentelemetry/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package opentelemetry

import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
)

// config represents the configuration options available for subscriber
// middlewares and publisher decorators.
type config struct {
spanAttributes []attribute.KeyValue
spanAttributes []attribute.KeyValue
textMapPropagator propagation.TextMapPropagator
}

// Option provides a convenience wrapper for simple options that can be
Expand All @@ -20,3 +22,13 @@ func WithSpanAttributes(attributes ...attribute.KeyValue) Option {
c.spanAttributes = attributes
}
}

// WithTextMapPropagator sets the TextMapPropagator in order to propagate context data across process boundaries.
func WithTextMapPropagator() Option {
return func(c *config) {
c.textMapPropagator = propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)
}
}
8 changes: 8 additions & 0 deletions pkg/opentelemetry/publishers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -66,6 +67,13 @@ func (p *PublisherDecorator) Publish(topic string, messages ...*message.Message)
spanAttributes = append(spanAttributes, spanAttributes...)
span.SetAttributes(spanAttributes...)

if p.config.textMapPropagator != nil {
for i, msg := range messages {
p.config.textMapPropagator.Inject(ctx, propagation.MapCarrier(msg.Metadata))
messages[i] = msg
}
}

err := p.pub.Publish(topic, messages...)
if err != nil {
span.RecordError(err)
Expand Down
27 changes: 24 additions & 3 deletions pkg/opentelemetry/publishers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,29 @@ package opentelemetry

import (
"bytes"
"fmt"
"testing"

"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func TestNewPublisherDecorator(t *testing.T) {
var (
uuid = "0d5427ea-7ab4-4ef1-b80d-0a22bd54a98f"
payload = message.Payload("test payload")

traceParentKey = "traceparent"
traceID, _ = trace.TraceIDFromHex("093615e8ce177910353c5a09782ba62a")
spanID, _ = trace.SpanIDFromHex("98c5fa0e132dd10d")
traceParentID = fmt.Sprintf("00-%s-%s-00", traceID.String(), spanID.String())
sc = trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
Remote: true,
})

pub = &mockMessagePublisher{
PublishFunc: func(topic string, messages ...*message.Message) error {
if got, want := topic, "test.topic"; got != want {
Expand All @@ -29,6 +41,10 @@ func TestNewPublisherDecorator(t *testing.T) {
t.Fatalf("message.UUID = %q, want %q", got, want)
}

if got, want := message.Metadata.Get(traceParentKey), traceParentID; got != want {
t.Fatalf("message.Metadata.Get(%q) = %q, want %q", traceParentKey, got, want)
}

if !bytes.Equal(payload, message.Payload) {
t.Fatalf("unexpected payload")
}
Expand All @@ -37,9 +53,12 @@ func TestNewPublisherDecorator(t *testing.T) {
},
}

dec = NewPublisherDecorator(pub, WithSpanAttributes(
attribute.Bool("test", true),
))
dec = NewPublisherDecorator(pub,
WithSpanAttributes(
attribute.Bool("test", true),
),
WithTextMapPropagator(),
)
)

pd, ok := dec.(*PublisherDecorator)
Expand All @@ -53,6 +72,8 @@ func TestNewPublisherDecorator(t *testing.T) {

msg := message.NewMessage(uuid, payload)

msg.SetContext(trace.ContextWithSpanContext(msg.Context(), sc))

if err := dec.Publish("test.topic", msg); err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/opentelemetry/subscribers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package opentelemetry
import (
"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -31,8 +32,15 @@ func TraceHandler(h message.HandlerFunc, options ...Option) message.HandlerFunc
}

return func(msg *message.Message) ([]*message.Message, error) {
spanName := message.HandlerNameFromCtx(msg.Context())
ctx, span := tracer.Start(msg.Context(), spanName, spanOptions...)
ctx := msg.Context()

if config.textMapPropagator != nil {
carrier := propagation.MapCarrier(msg.Metadata)
ctx = config.textMapPropagator.Extract(ctx, carrier)
}

spanName := message.HandlerNameFromCtx(ctx)
ctx, span := tracer.Start(ctx, spanName, spanOptions...)
span.SetAttributes(
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationKey.String(message.SubscribeTopicFromCtx(ctx)),
Expand Down
43 changes: 43 additions & 0 deletions pkg/opentelemetry/subscribers_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package opentelemetry

import (
"fmt"
"testing"

"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func TestTrace(t *testing.T) {
Expand All @@ -31,6 +33,47 @@ func TestTrace(t *testing.T) {
}
}

func TestTraceWithTextMapPropagator(t *testing.T) {
middleware := Trace(WithTextMapPropagator())

var (
traceParentKey = "traceparent"
traceID, _ = trace.TraceIDFromHex("093615e8ce177910353c5a09782ba62a")
spanID, _ = trace.SpanIDFromHex("98c5fa0e132dd10d")
sc = trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
Remote: true,
})
traceParentID = fmt.Sprintf("00-%s-%s-00", traceID.String(), spanID.String())
uuid = "52219531-0cd8-4b64-be31-ba6b4ef01472"
payload = message.Payload("test payload for Trace")
msg = message.NewMessage(uuid, payload)
)

msg.Metadata.Set(traceParentKey, traceParentID)

h := func(m *message.Message) ([]*message.Message, error) {
if got, want := m.UUID, uuid; got != want {
t.Fatalf("m.UUID = %q, want %q", got, want)
}

msgContext := m.Context()

if !sc.Equal(trace.SpanContextFromContext(msgContext)) {
scJSON, _ := sc.MarshalJSON()
extractedScJSON, _ := trace.SpanContextFromContext(msgContext).MarshalJSON()
t.Fatalf("span context = %v, want %v", string(extractedScJSON), string(scJSON))
}

return nil, nil
}

if _, err := middleware(h)(msg); err != nil {
t.Fatalf("unexpected error: %v", err)
}
}

func TestTraceNoPublishHandler(t *testing.T) {
var (
uuid = "88b433e5-12fa-4eb7-9229-6bfd67de5c4f"
Expand Down