From 1caa2449ed7c222eee335f9b51e9c6818ff3595f Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Fri, 1 Sep 2023 19:28:04 +0200 Subject: [PATCH] Add traced datastore (#209) * Add trace datastore * Add error recording and status code setting --- go.mod | 5 + go.sum | 15 +- trace/trace.go | 356 ++++++++++++++++++++++++++++++++++++++++++++ trace/trace_test.go | 14 ++ 4 files changed, 389 insertions(+), 1 deletion(-) create mode 100644 trace/trace.go create mode 100644 trace/trace_test.go diff --git a/go.mod b/go.mod index 2fdc6e0..9b6baf4 100644 --- a/go.mod +++ b/go.mod @@ -5,14 +5,19 @@ require ( github.com/ipfs/go-detect-race v0.0.1 github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8 github.com/jbenet/goprocess v0.1.4 + go.opentelemetry.io/otel v1.16.0 + go.opentelemetry.io/otel/trace v1.16.0 go.uber.org/multierr v1.5.0 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 ) require ( + github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/kr/pretty v0.2.0 // indirect github.com/kr/text v0.1.0 // indirect + go.opentelemetry.io/otel/metric v1.16.0 // indirect go.uber.org/atomic v1.6.0 // indirect ) diff --git a/go.sum b/go.sum index a25a102..7db547d 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,12 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -24,8 +30,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= +go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= +go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= +go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= +go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= +go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= +go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= @@ -55,5 +67,6 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/trace/trace.go b/trace/trace.go new file mode 100644 index 0000000..b892f7e --- /dev/null +++ b/trace/trace.go @@ -0,0 +1,356 @@ +// Package trace wraps a datastore where all datastore interactions are traced +// with open telemetry. +package trace + +import ( + "context" + "fmt" + "io" + + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + otel "go.opentelemetry.io/otel/trace" +) + +// New returns a new traced datastore. All datastore interactions are traced. +func New(ds ds.Datastore, tracer otel.Tracer) *Datastore { + return &Datastore{ds: ds, tracer: tracer} +} + +// Datastore is an adapter that traces inner datastore interactions. +type Datastore struct { + ds ds.Datastore + tracer otel.Tracer +} + +var ( + _ ds.Datastore = (*Datastore)(nil) + _ ds.Batching = (*Datastore)(nil) + _ ds.PersistentDatastore = (*Datastore)(nil) + _ ds.TxnDatastore = (*Datastore)(nil) + _ ds.CheckedDatastore = (*Datastore)(nil) + _ ds.ScrubbedDatastore = (*Datastore)(nil) + _ ds.GCDatastore = (*Datastore)(nil) + _ io.Closer = (*Datastore)(nil) +) + +// Put implements the ds.Datastore interface. +func (t *Datastore) Put(ctx context.Context, key ds.Key, value []byte) error { + ctx, span := t.tracer.Start(ctx, "Put", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + + err := t.ds.Put(ctx, key, value) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err +} + +// Sync implements Datastore.Sync +func (t *Datastore) Sync(ctx context.Context, key ds.Key) error { + ctx, span := t.tracer.Start(ctx, "Sync", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + + err := t.ds.Sync(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err +} + +// Get implements the ds.Datastore interface. +func (t *Datastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) { + ctx, span := t.tracer.Start(ctx, "Get", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + + val, err := t.ds.Get(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return val, err +} + +// Has implements the ds.Datastore interface. +func (t *Datastore) Has(ctx context.Context, key ds.Key) (bool, error) { + ctx, span := t.tracer.Start(ctx, "Has", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + + exists, err := t.ds.Has(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return exists, err +} + +// GetSize implements the ds.Datastore interface. +func (t *Datastore) GetSize(ctx context.Context, key ds.Key) (int, error) { + ctx, span := t.tracer.Start(ctx, "GetSize", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + + size, err := t.ds.GetSize(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return size, err +} + +// Delete implements the ds.Datastore interface. +func (t *Datastore) Delete(ctx context.Context, key ds.Key) error { + ctx, span := t.tracer.Start(ctx, "Delete", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + + err := t.ds.Delete(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err +} + +// Query implements the ds.Datastore interface. +func (t *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { + ctx, span := t.tracer.Start(ctx, "Query", otel.WithAttributes(attribute.String("query", q.String()))) + defer span.End() + + res, err := t.ds.Query(ctx, q) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return res, err +} + +// Batch implements the ds.Batching interface. +func (t *Datastore) Batch(ctx context.Context) (ds.Batch, error) { + ctx, span := t.tracer.Start(ctx, "Batch") + defer span.End() + + if dstore, ok := t.ds.(ds.Batching); ok { + batch, err := dstore.Batch(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return batch, err + } + + return ds.NewBasicBatch(t), nil +} + +// DiskUsage implements the ds.PersistentDatastore interface. +func (t *Datastore) DiskUsage(ctx context.Context) (uint64, error) { + ctx, span := t.tracer.Start(ctx, "DiskUsage") + defer span.End() + + usage, err := ds.DiskUsage(ctx, t.ds) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return usage, err +} + +// Scrub implements the ds.ScrubbedDatastore interface. +func (t *Datastore) Scrub(ctx context.Context) error { + ctx, span := t.tracer.Start(ctx, "Scrub") + defer span.End() + + if dstore, ok := t.tracer.(ds.ScrubbedDatastore); ok { + err := dstore.Scrub(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return err + } + + return nil +} + +// CollectGarbage implements the ds.GCDatastore interface. +func (t *Datastore) CollectGarbage(ctx context.Context) error { + ctx, span := t.tracer.Start(ctx, "CollectGarbage") + defer span.End() + + if dstore, ok := t.tracer.(ds.GCDatastore); ok { + err := dstore.CollectGarbage(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return err + } + + return nil +} + +// Check implements the ds.CheckedDatastore interface. +func (t *Datastore) Check(ctx context.Context) error { + ctx, span := t.tracer.Start(ctx, "Check") + defer span.End() + + if dstore, ok := t.tracer.(ds.CheckedDatastore); ok { + err := dstore.Check(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return err + } + + return nil +} + +// NewTransaction implements the ds.TxnDatastore interface. +func (t *Datastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn, error) { + ctx, span := t.tracer.Start(ctx, "NewTransaction", otel.WithAttributes(attribute.Bool("readOnly", readOnly))) + defer span.End() + + if txnDs, ok := t.ds.(ds.TxnDatastore); ok { + txn, err := txnDs.NewTransaction(ctx, readOnly) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + return &Txn{txn: txn, tracer: t.tracer}, nil + } + + return nil, fmt.Errorf("transactions are unsupported by traced datastore") +} + +// Close closes the inner datastore (if it implements the io.Closer interface). +func (t *Datastore) Close() error { + if closer, ok := t.ds.(io.Closer); ok { + return closer.Close() + } + return nil +} + +// Txn is an adapter that traces datastore transactions +type Txn struct { + txn ds.Txn + tracer otel.Tracer +} + +var _ ds.Txn = (*Txn)(nil) + +// Put implements the ds.Txn interface. +func (t *Txn) Put(ctx context.Context, key ds.Key, value []byte) error { + ctx, span := t.tracer.Start(ctx, "Put", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + + err := t.txn.Put(ctx, key, value) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err +} + +// Get implements the ds.Txn interface. +func (t *Txn) Get(ctx context.Context, key ds.Key) (value []byte, err error) { + ctx, span := t.tracer.Start(ctx, "Get", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + + val, err := t.txn.Get(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return val, err +} + +// Has implements the ds.Txn interface. +func (t *Txn) Has(ctx context.Context, key ds.Key) (bool, error) { + ctx, span := t.tracer.Start(ctx, "Has", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + + exists, err := t.txn.Has(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return exists, err +} + +// GetSize implements the ds.Txn interface. +func (t *Txn) GetSize(ctx context.Context, key ds.Key) (int, error) { + ctx, span := t.tracer.Start(ctx, "GetSize", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + + size, err := t.txn.GetSize(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return size, err +} + +// Delete implements the ds.Txn interface. +func (t *Txn) Delete(ctx context.Context, key ds.Key) error { + ctx, span := t.tracer.Start(ctx, "Delete", otel.WithAttributes(attribute.String("key", key.String()))) + defer span.End() + + err := t.txn.Delete(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err +} + +// Query implements the ds.Txn interface. +func (t *Txn) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { + ctx, span := t.tracer.Start(ctx, "Query", otel.WithAttributes(attribute.String("query", q.String()))) + defer span.End() + + res, err := t.txn.Query(ctx, q) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return res, err +} + +// Commit implements the ds.Txn interface. +func (t *Txn) Commit(ctx context.Context) error { + ctx, span := t.tracer.Start(ctx, "Commit") + defer span.End() + + err := t.txn.Commit(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + + return err +} + +// Discard implements the ds.Txn interface. +func (t *Txn) Discard(ctx context.Context) { + ctx, span := t.tracer.Start(ctx, "Discard") + defer span.End() + t.txn.Discard(ctx) +} diff --git a/trace/trace_test.go b/trace/trace_test.go new file mode 100644 index 0000000..3ff9619 --- /dev/null +++ b/trace/trace_test.go @@ -0,0 +1,14 @@ +package trace + +import ( + "testing" + + "github.com/ipfs/go-datastore" + dstest "github.com/ipfs/go-datastore/test" + "go.opentelemetry.io/otel" +) + +func TestTraceAll(t *testing.T) { + tracer := otel.Tracer("tracer") + dstest.SubtestAll(t, New(datastore.NewMapDatastore(), tracer)) +}