Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otelmongo: Add command.duration metric #4805

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,16 @@ test-mongo-driver:
set -e; \
docker run --name mongo-integ --rm -p 27017:27017 -d mongo; \
CMD=mongo IMG_NAME=mongo-integ ./tools/wait.sh; \
(cd instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo/test && \
$(GO) test \
-covermode=$(COVERAGE_MODE) \
-coverprofile=$(COVERAGE_PROFILE) \
-coverpkg=go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo/... \
./... \
&& $(GO) tool cover -html=$(COVERAGE_PROFILE) -o coverage.html); \
trap 'docker stop mongo-integ' EXIT; \
pushd instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo/test; \
$(GO) test \
-covermode=$(COVERAGE_MODE) \
-coverprofile=$(COVERAGE_PROFILE) \
-coverpkg=go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo/... \
./...; \
$(GO) tool cover -html=$(COVERAGE_PROFILE) -o coverage.html; \
popd; \
cp ./instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo/test/coverage.out ./; \
docker stop mongo-integ; \
fi

# Releasing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package otelmongo // import "go.opentelemetry.io/contrib/instrumentation/go.mong

import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -25,8 +26,10 @@ const ScopeName = "go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mo
// config is used to configure the mongo tracer.
type config struct {
TracerProvider trace.TracerProvider
MeterProvider metric.MeterProvider

Tracer trace.Tracer
Meter metric.Meter

CommandAttributeDisabled bool
}
Expand All @@ -35,6 +38,7 @@ type config struct {
func newConfig(opts ...Option) config {
cfg := config{
TracerProvider: otel.GetTracerProvider(),
MeterProvider: otel.GetMeterProvider(),
CommandAttributeDisabled: true,
}
for _, opt := range opts {
Expand All @@ -45,6 +49,10 @@ func newConfig(opts ...Option) config {
ScopeName,
trace.WithInstrumentationVersion(Version()),
)
cfg.Meter = cfg.MeterProvider.Meter(
ScopeName,
metric.WithInstrumentationVersion(Version()),
)
return cfg
}

Expand All @@ -69,6 +77,16 @@ func WithTracerProvider(provider trace.TracerProvider) Option {
})
}

// WithMeterProvider specifies a meter provider to use for recording metrics.
// If none is specified, the global provider is used.
func WithMeterProvider(provider metric.MeterProvider) Option {
return optionFunc(func(cfg *config) {
if provider != nil {
cfg.MeterProvider = provider
}
})
}

// WithCommandAttributeDisabled specifies if the MongoDB command is added as an attribute to Spans or not.
// This is disabled by default and the MongoDB command will not be added as an attribute
// to Spans if this option is not provided.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.20
require (
go.mongodb.org/mongo-driver v1.13.1
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/metric v1.21.0
go.opentelemetry.io/otel/trace v1.21.0
)

Expand All @@ -18,7 +19,6 @@ require (
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,89 +21,103 @@ import (
"strings"
"sync"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/event"
)

type spanKey struct {
type requestKey struct {
ConnectionID string
RequestID int64
}

type request struct {
span trace.Span
attrs []attribute.KeyValue
}

type monitor struct {
sync.Mutex
spans map[spanKey]trace.Span
cfg config
requests map[requestKey]request
cfg config
durations metric.Float64Histogram
}

func (m *monitor) Started(ctx context.Context, evt *event.CommandStartedEvent) {
var spanName string

hostname, port := peerInfo(evt)

attrs := []attribute.KeyValue{
attrs := make([]attribute.KeyValue, 0, 8)
attrs = append(attrs,
semconv.DBSystemMongoDB,
semconv.DBOperation(evt.CommandName),
semconv.DBName(evt.DatabaseName),
semconv.NetPeerName(hostname),
semconv.NetPeerPort(port),
semconv.NetTransportTCP,
}
if !m.cfg.CommandAttributeDisabled {
attrs = append(attrs, semconv.DBStatement(sanitizeCommand(evt.Command)))
}
)
if collection, err := extractCollection(evt); err == nil && collection != "" {
spanName = collection + "."
attrs = append(attrs, semconv.DBMongoDBCollection(collection))
}
spanAttrs := attrs
if !m.cfg.CommandAttributeDisabled {
spanAttrs = append(spanAttrs, semconv.DBStatement(sanitizeCommand(evt.Command)))
}
spanName += evt.CommandName
opts := []trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attrs...),
trace.WithAttributes(spanAttrs...),
}
_, span := m.cfg.Tracer.Start(ctx, spanName, opts...)
key := spanKey{
key := requestKey{
ConnectionID: evt.ConnectionID,
RequestID: evt.RequestID,
}
m.Lock()
m.spans[key] = span
m.requests[key] = request{span, attrs}
m.Unlock()
}

func (m *monitor) Succeeded(ctx context.Context, evt *event.CommandSucceededEvent) {
m.Finished(&evt.CommandFinishedEvent, nil)
m.Finished(ctx, &evt.CommandFinishedEvent, nil)
}

func (m *monitor) Failed(ctx context.Context, evt *event.CommandFailedEvent) {
m.Finished(&evt.CommandFinishedEvent, fmt.Errorf("%s", evt.Failure))
m.Finished(ctx, &evt.CommandFinishedEvent, fmt.Errorf("%s", evt.Failure))
}

func (m *monitor) Finished(evt *event.CommandFinishedEvent, err error) {
key := spanKey{
func (m *monitor) Finished(ctx context.Context, evt *event.CommandFinishedEvent, err error) {
key := requestKey{
ConnectionID: evt.ConnectionID,
RequestID: evt.RequestID,
}
m.Lock()
span, ok := m.spans[key]
request, ok := m.requests[key]
if ok {
delete(m.spans, key)
delete(m.requests, key)
}
m.Unlock()
if !ok {
return
}

if err != nil {
span.SetStatus(codes.Error, err.Error())
request.span.SetStatus(codes.Error, err.Error())
request.attrs = append(request.attrs, semconv.OtelStatusCodeError)
} else {
request.attrs = append(request.attrs, semconv.OtelStatusCodeOk)
}
m.durations.Record(ctx, evt.Duration.Seconds()/1000, metric.WithAttributeSet(attribute.NewSet(request.attrs...)))

span.End()
request.span.End()
}

// TODO sanitize values where possible, then reenable `db.statement` span attributes default.
Expand Down Expand Up @@ -135,9 +149,14 @@ func extractCollection(evt *event.CommandStartedEvent) (string, error) {
// NewMonitor creates a new mongodb event CommandMonitor.
func NewMonitor(opts ...Option) *event.CommandMonitor {
cfg := newConfig(opts...)
histogram, err := cfg.Meter.Float64Histogram("command.duration", metric.WithUnit("ms"), metric.WithDescription("Duration of finished commands"))
if err != nil {
otel.Handle(err)
}
m := &monitor{
spans: make(map[spanKey]trace.Span),
cfg: cfg,
requests: make(map[requestKey]request),
cfg: cfg,
durations: histogram,
}
return &event.CommandMonitor{
Started: m.Started,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.46.1
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/sdk v1.21.0
go.opentelemetry.io/otel/sdk/metric v1.21.0
go.opentelemetry.io/otel/trace v1.21.0
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ3
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0=
go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"go.opentelemetry.io/contrib/internal/util"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -104,6 +107,7 @@ func TestDBCrudOperation(t *testing.T) {
t.Run(title, func(t *testing.T) {
sr := tracetest.NewSpanRecorder()
provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
mr := sdkmetric.NewManualReader()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
Expand All @@ -115,6 +119,7 @@ func TestDBCrudOperation(t *testing.T) {
opts.Monitor = otelmongo.NewMonitor(
otelmongo.WithTracerProvider(provider),
otelmongo.WithCommandAttributeDisabled(tc.excludeCommand),
otelmongo.WithMeterProvider(sdkmetric.NewMeterProvider(sdkmetric.WithReader(mr))),
)
opts.ApplyURI(addr)
client, err := mongo.Connect(ctx, opts)
Expand Down Expand Up @@ -149,6 +154,37 @@ func TestDBCrudOperation(t *testing.T) {
for _, v := range tc.validators {
assert.True(t, v(s))
}

var md metricdata.ResourceMetrics
err = mr.Collect(context.Background(), &md)
if !assert.NoError(t, err) {
t.FailNow()
}
assert.Len(t, md.ScopeMetrics, 1)
scopedMetrics := md.ScopeMetrics[0]
assert.Equal(t, instrumentation.Scope{
Name: otelmongo.ScopeName,
Version: otelmongo.Version(),
SchemaURL: "",
}, scopedMetrics.Scope)
assert.Len(t, scopedMetrics.Metrics, 1)
metrics := scopedMetrics.Metrics[0]
assert.Equal(t, "command.duration", metrics.Name)
assert.Equal(t, "Duration of finished commands", metrics.Description)
assert.Equal(t, "ms", metrics.Unit)
assert.Len(t, metrics.Data.(metricdata.Histogram[float64]).DataPoints, 1)
dp := metrics.Data.(metricdata.Histogram[float64]).DataPoints[0]
attrs = dp.Attributes.ToSlice()
assert.Contains(t, attrs, attribute.String("db.system", "mongodb"))
assert.Contains(t, attrs, attribute.String("db.operation", "insert"))
assert.Contains(t, attrs, attribute.String("db.name", "test-database"))
assert.Contains(t, attrs, attribute.String("net.peer.name", "localhost"))
assert.Contains(t, attrs, attribute.Int64("net.peer.port", int64(27017)))
assert.Contains(t, attrs, attribute.String("net.transport", "ip_tcp"))
assert.Contains(t, attrs, attribute.String("db.mongodb.collection", "test-collection"))
assert.Contains(t, attrs, attribute.String("otel.status_code", "OK"))
assert.EqualValues(t, 1, dp.Count)
assert.Greater(t, dp.Sum, 0.0)
})
}
}
Expand Down Expand Up @@ -212,6 +248,7 @@ func TestDBCollectionAttribute(t *testing.T) {
opts.Monitor = otelmongo.NewMonitor(
otelmongo.WithTracerProvider(provider),
otelmongo.WithCommandAttributeDisabled(true),
otelmongo.WithMeterProvider(sdkmetric.NewMeterProvider()),
)
opts.ApplyURI(addr)
client, err := mongo.Connect(ctx, opts)
Expand Down