From 5b84d10447243284357ce8586f45d7e762c4a2a3 Mon Sep 17 00:00:00 2001 From: farizap Date: Sun, 24 Nov 2024 01:08:22 +0000 Subject: [PATCH] add option to enable text map propagator --- pkg/opentelemetry/options.go | 14 ++++++++- pkg/opentelemetry/publishers.go | 8 +++++ pkg/opentelemetry/publishers_test.go | 27 +++++++++++++++-- pkg/opentelemetry/subscribers.go | 12 ++++++-- pkg/opentelemetry/subscribers_test.go | 43 +++++++++++++++++++++++++++ 5 files changed, 98 insertions(+), 6 deletions(-) diff --git a/pkg/opentelemetry/options.go b/pkg/opentelemetry/options.go index 61ee277..40f771a 100644 --- a/pkg/opentelemetry/options.go +++ b/pkg/opentelemetry/options.go @@ -2,12 +2,14 @@ package opentelemetry import ( "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" ) // config represents the configuration options available for subscriber // middlewares and publisher decorators. type config struct { - spanAttributes []attribute.KeyValue + spanAttributes []attribute.KeyValue + textMapPropagator propagation.TextMapPropagator } // Option provides a convenience wrapper for simple options that can be @@ -20,3 +22,13 @@ func WithSpanAttributes(attributes ...attribute.KeyValue) Option { c.spanAttributes = attributes } } + +// WithTextMapPropagator sets the TextMapPropagator in order to propagate context data across process boundaries. +func WithTextMapPropagator() Option { + return func(c *config) { + c.textMapPropagator = propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ) + } +} diff --git a/pkg/opentelemetry/publishers.go b/pkg/opentelemetry/publishers.go index 8de07f1..b9f0622 100644 --- a/pkg/opentelemetry/publishers.go +++ b/pkg/opentelemetry/publishers.go @@ -7,6 +7,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" semconv "go.opentelemetry.io/otel/semconv/v1.10.0" "go.opentelemetry.io/otel/trace" ) @@ -66,6 +67,13 @@ func (p *PublisherDecorator) Publish(topic string, messages ...*message.Message) spanAttributes = append(spanAttributes, spanAttributes...) span.SetAttributes(spanAttributes...) + if p.config.textMapPropagator != nil { + for i, msg := range messages { + p.config.textMapPropagator.Inject(ctx, propagation.MapCarrier(msg.Metadata)) + messages[i] = msg + } + } + err := p.pub.Publish(topic, messages...) if err != nil { span.RecordError(err) diff --git a/pkg/opentelemetry/publishers_test.go b/pkg/opentelemetry/publishers_test.go index 8f611c1..ce46e80 100644 --- a/pkg/opentelemetry/publishers_test.go +++ b/pkg/opentelemetry/publishers_test.go @@ -2,10 +2,12 @@ package opentelemetry import ( "bytes" + "fmt" "testing" "github.com/ThreeDotsLabs/watermill/message" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) func TestNewPublisherDecorator(t *testing.T) { @@ -13,6 +15,16 @@ func TestNewPublisherDecorator(t *testing.T) { uuid = "0d5427ea-7ab4-4ef1-b80d-0a22bd54a98f" payload = message.Payload("test payload") + traceParentKey = "traceparent" + traceID, _ = trace.TraceIDFromHex("093615e8ce177910353c5a09782ba62a") + spanID, _ = trace.SpanIDFromHex("98c5fa0e132dd10d") + traceParentID = fmt.Sprintf("00-%s-%s-00", traceID.String(), spanID.String()) + sc = trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + Remote: true, + }) + pub = &mockMessagePublisher{ PublishFunc: func(topic string, messages ...*message.Message) error { if got, want := topic, "test.topic"; got != want { @@ -29,6 +41,10 @@ func TestNewPublisherDecorator(t *testing.T) { t.Fatalf("message.UUID = %q, want %q", got, want) } + if got, want := message.Metadata.Get(traceParentKey), traceParentID; got != want { + t.Fatalf("message.Metadata.Get(%q) = %q, want %q", traceParentKey, got, want) + } + if !bytes.Equal(payload, message.Payload) { t.Fatalf("unexpected payload") } @@ -37,9 +53,12 @@ func TestNewPublisherDecorator(t *testing.T) { }, } - dec = NewPublisherDecorator(pub, WithSpanAttributes( - attribute.Bool("test", true), - )) + dec = NewPublisherDecorator(pub, + WithSpanAttributes( + attribute.Bool("test", true), + ), + WithTextMapPropagator(), + ) ) pd, ok := dec.(*PublisherDecorator) @@ -53,6 +72,8 @@ func TestNewPublisherDecorator(t *testing.T) { msg := message.NewMessage(uuid, payload) + msg.SetContext(trace.ContextWithSpanContext(msg.Context(), sc)) + if err := dec.Publish("test.topic", msg); err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/opentelemetry/subscribers.go b/pkg/opentelemetry/subscribers.go index 69b15ee..e805949 100644 --- a/pkg/opentelemetry/subscribers.go +++ b/pkg/opentelemetry/subscribers.go @@ -3,6 +3,7 @@ package opentelemetry import ( "github.com/ThreeDotsLabs/watermill/message" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" semconv "go.opentelemetry.io/otel/semconv/v1.10.0" "go.opentelemetry.io/otel/trace" ) @@ -31,8 +32,15 @@ func TraceHandler(h message.HandlerFunc, options ...Option) message.HandlerFunc } return func(msg *message.Message) ([]*message.Message, error) { - spanName := message.HandlerNameFromCtx(msg.Context()) - ctx, span := tracer.Start(msg.Context(), spanName, spanOptions...) + ctx := msg.Context() + + if config.textMapPropagator != nil { + carrier := propagation.MapCarrier(msg.Metadata) + ctx = config.textMapPropagator.Extract(ctx, carrier) + } + + spanName := message.HandlerNameFromCtx(ctx) + ctx, span := tracer.Start(ctx, spanName, spanOptions...) span.SetAttributes( semconv.MessagingDestinationKindTopic, semconv.MessagingDestinationKey.String(message.SubscribeTopicFromCtx(ctx)), diff --git a/pkg/opentelemetry/subscribers_test.go b/pkg/opentelemetry/subscribers_test.go index 03e8480..3f2816d 100644 --- a/pkg/opentelemetry/subscribers_test.go +++ b/pkg/opentelemetry/subscribers_test.go @@ -1,10 +1,12 @@ package opentelemetry import ( + "fmt" "testing" "github.com/ThreeDotsLabs/watermill/message" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) func TestTrace(t *testing.T) { @@ -31,6 +33,47 @@ func TestTrace(t *testing.T) { } } +func TestTraceWithTextMapPropagator(t *testing.T) { + middleware := Trace(WithTextMapPropagator()) + + var ( + traceParentKey = "traceparent" + traceID, _ = trace.TraceIDFromHex("093615e8ce177910353c5a09782ba62a") + spanID, _ = trace.SpanIDFromHex("98c5fa0e132dd10d") + sc = trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + Remote: true, + }) + traceParentID = fmt.Sprintf("00-%s-%s-00", traceID.String(), spanID.String()) + uuid = "52219531-0cd8-4b64-be31-ba6b4ef01472" + payload = message.Payload("test payload for Trace") + msg = message.NewMessage(uuid, payload) + ) + + msg.Metadata.Set(traceParentKey, traceParentID) + + h := func(m *message.Message) ([]*message.Message, error) { + if got, want := m.UUID, uuid; got != want { + t.Fatalf("m.UUID = %q, want %q", got, want) + } + + msgContext := m.Context() + + if !sc.Equal(trace.SpanContextFromContext(msgContext)) { + scJSON, _ := sc.MarshalJSON() + extractedScJSON, _ := trace.SpanContextFromContext(msgContext).MarshalJSON() + t.Fatalf("span context = %v, want %v", string(extractedScJSON), string(scJSON)) + } + + return nil, nil + } + + if _, err := middleware(h)(msg); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + func TestTraceNoPublishHandler(t *testing.T) { var ( uuid = "88b433e5-12fa-4eb7-9229-6bfd67de5c4f"