diff --git a/contrib/valkey-go/example_test.go b/contrib/valkey-go/example_test.go new file mode 100644 index 0000000000..f9105ea894 --- /dev/null +++ b/contrib/valkey-go/example_test.go @@ -0,0 +1,40 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package valkey_test + +import ( + "context" + "log/slog" + + "github.com/valkey-io/valkey-go" + valkeytrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/valkey-go" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +// To start tracing Valkey, simply create a new client using the library and continue +// using as you normally would. +func Example() { + vk, err := valkeytrace.NewClient(valkey.ClientOption{ + InitAddress: []string{"localhost:6379"}, + }) + if err != nil { + slog.Error(err.Error()) + return + } + + span, ctx := tracer.StartSpanFromContext(context.Background(), "parent.request", + tracer.SpanType(ext.SpanTypeValkey), + tracer.ServiceName("web"), + tracer.ResourceName("/home"), + ) + + if err := vk.Do(ctx, vk.B().Set().Key("key").Value("value").Build()).Error(); err != nil { + slog.ErrorContext(ctx, "set a value", slog.Any("error", err)) + } + + span.Finish() +} diff --git a/contrib/valkey-go/option.go b/contrib/valkey-go/option.go new file mode 100644 index 0000000000..21b16498ae --- /dev/null +++ b/contrib/valkey-go/option.go @@ -0,0 +1,83 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +// Package redis provides tracing functions for tracing the go-redis/redis package (https://github.com/go-redis/redis). +// This package supports versions up to go-redis 6.15. +package valkey + +import ( + "math" + "os" + + "gopkg.in/DataDog/dd-trace-go.v1/internal" + "gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema" +) + +const defaultServiceName = "valkey.client" + +type clientConfig struct { + serviceName string + spanName string + analyticsRate float64 + skipRaw bool +} + +// ClientOption represents an option that can be used to create or wrap a client. +type ClientOption func(*clientConfig) + +func defaults(cfg *clientConfig) { + cfg.serviceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName) + cfg.spanName = namingschema.OpName(namingschema.ValkeyOutbound) + if internal.BoolEnv("DD_TRACE_VALKEY_ANALYTICS_ENABLED", false) { + cfg.analyticsRate = 1.0 + } else { + cfg.analyticsRate = math.NaN() + } + if v := os.Getenv("DD_TRACE_VALKEY_SERVICE_NAME"); v == "" { + cfg.serviceName = defaultServiceName + } else { + cfg.serviceName = v + } + cfg.skipRaw = internal.BoolEnv("DD_TRACE_VALKEY_SKIP_RAW_COMMAND", false) +} + +// WithSkipRawCommand reports whether to skip setting the raw command value +// on instrumenation spans. This may be useful if the Datadog Agent is not +// set up to obfuscate this value and it could contain sensitive information. +func WithSkipRawCommand(skip bool) ClientOption { + return func(cfg *clientConfig) { + cfg.skipRaw = skip + } +} + +// WithServiceName sets the given service name for the client. +func WithServiceName(name string) ClientOption { + return func(cfg *clientConfig) { + cfg.serviceName = name + } +} + +// WithAnalytics enables Trace Analytics for all started spans. +func WithAnalytics(on bool) ClientOption { + return func(cfg *clientConfig) { + if on { + cfg.analyticsRate = 1.0 + } else { + cfg.analyticsRate = math.NaN() + } + } +} + +// WithAnalyticsRate sets the sampling rate for Trace Analytics events +// correlated to started spans. +func WithAnalyticsRate(rate float64) ClientOption { + return func(cfg *clientConfig) { + if rate >= 0.0 && rate <= 1.0 { + cfg.analyticsRate = rate + } else { + cfg.analyticsRate = math.NaN() + } + } +} diff --git a/contrib/valkey-go/valkey.go b/contrib/valkey-go/valkey.go new file mode 100644 index 0000000000..670856bb99 --- /dev/null +++ b/contrib/valkey-go/valkey.go @@ -0,0 +1,347 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +// Package redis provides tracing functions for tracing the go-redis/redis package (https://github.com/go-redis/redis). +// This package supports versions up to go-redis 6.15. +package valkey + +import ( + "context" + "math" + "net" + "strconv" + "strings" + "time" + + "github.com/valkey-io/valkey-go" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" +) + +const componentName = "valkey-go/valkey" + +func init() { + telemetry.LoadIntegration(componentName) + tracer.MarkIntegrationImported("github.com/valkey/valkey-go") +} + +var ( + _ valkey.CoreClient = (*coreClient)(nil) + _ valkey.Client = (*client)(nil) + _ valkey.DedicatedClient = (*dedicatedClient)(nil) +) + +type coreClient struct { + valkey.Client + option valkey.ClientOption + clientConfig clientConfig + host string + port int +} + +type client struct { + coreClient +} + +type dedicatedClient struct { + coreClient + dedicatedClient valkey.DedicatedClient +} + +func NewClient(option valkey.ClientOption, opts ...ClientOption) (valkey.Client, error) { + valkeyClient, err := valkey.NewClient(option) + if err != nil { + return nil, err + } + var cfg clientConfig + defaults(&cfg) + for _, fn := range opts { + fn(&cfg) + } + var host, portStr string + var port int + if len(option.InitAddress) == 1 { + host, portStr, err = net.SplitHostPort(option.InitAddress[0]) + if err != nil { + log.Error("valkey.ClientOption.InitAddress contains invalid address: %s", err) + } + port, _ = strconv.Atoi(portStr) + } + core := coreClient{ + Client: valkeyClient, + option: option, + clientConfig: cfg, + host: host, + port: port, + } + return &client{ + coreClient: core, + }, nil +} + +type commander interface { + Commands() []string +} + +func processCmd(commander commander) (command, statement string, size int) { + commands := commander.Commands() + if len(commands) == 0 { + return "", "", 0 + } + command = commands[0] + statement = strings.Join(commands, "\n") + return command, statement, len(statement) +} + +func processMultiCmds(multi []commander) (command, statement string, size int) { + var commands []string + var statements []string + for _, cmd := range multi { + cmdStr, stmt, cmdSize := processCmd(cmd) + size += cmdSize + commands = append(commands, cmdStr) + statements = append(statements, stmt) + } + command = strings.Join(commands, " ") + statement = strings.Join(statements, " ") + return command, statement, size +} + +func processMultiCompleted(multi ...valkey.Completed) (command, statement string, size int) { + cmds := make([]commander, len(multi)) + for i, cmd := range multi { + cmds[i] = &cmd + } + return processMultiCmds(cmds) +} + +func processMultiCacheableTTL(multi ...valkey.CacheableTTL) (command, statement string, size int) { + cmds := make([]commander, len(multi)) + for i, cmd := range multi { + cmds[i] = &cmd.Cmd + } + return processMultiCmds(cmds) +} + +func firstError(s []valkey.ValkeyResult) error { + for _, result := range s { + if err := result.Error(); err != nil && !valkey.IsValkeyNil(err) { + return err + } + } + return nil +} + +func setClientCacheTags(s tracer.Span, result valkey.ValkeyResult) { + s.SetTag(ext.ValkeyClientCacheHit, result.IsCacheHit()) + s.SetTag(ext.ValkeyClientCacheTTL, result.CacheTTL()) + s.SetTag(ext.ValkeyClientCachePTTL, result.CachePTTL()) + s.SetTag(ext.ValkeyClientCachePXAT, result.CachePXAT()) +} + +type buildStartSpanOptionsInput struct { + command string + statement string + size int + isWrite bool + isBlock bool + isMulti bool + isStream bool + skipRawCommand bool +} + +func (c *coreClient) buildStartSpanOptions(input buildStartSpanOptionsInput) []tracer.StartSpanOption { + opts := []tracer.StartSpanOption{ + tracer.SpanType(ext.SpanTypeValkey), + tracer.ServiceName(c.clientConfig.serviceName), + tracer.Tag(ext.TargetHost, c.host), + tracer.Tag(ext.TargetPort, c.port), + tracer.Tag(ext.ValkeyClientVersion, valkey.LibVer), + tracer.Tag(ext.ValkeyClientName, valkey.LibName), + tracer.Tag(ext.Component, componentName), + tracer.Tag(ext.SpanKind, ext.SpanKindClient), + tracer.Tag(ext.DBType, ext.DBSystemValkey), + tracer.Tag(ext.DBSystem, ext.DBSystemValkey), + tracer.Tag(ext.ValkeyDatabaseIndex, c.option.SelectDB), + tracer.Tag(ext.ValkeyClientCommandWrite, input.isWrite), + tracer.Tag(ext.ValkeyClientCommandBlock, input.isBlock), + tracer.Tag(ext.ValkeyClientCommandMulti, input.isMulti), + tracer.Tag(ext.ValkeyClientCommandStream, input.isStream), + tracer.Tag(ext.ValkeyClientCommandWithPassword, c.option.Password != ""), + } + if input.command != "" { + opts = append(opts, []tracer.StartSpanOption{ + // valkeyotel tags + tracer.Tag("db.stmt_size", input.size), + tracer.Tag("db.operation", input.command), + }...) + if input.skipRawCommand { + opts = append(opts, tracer.ResourceName(input.command)) + opts = append(opts, tracer.Tag(ext.DBStatement, input.command)) + } else { + opts = append(opts, tracer.ResourceName(input.statement)) + opts = append(opts, tracer.Tag(ext.DBStatement, input.statement)) + } + } + if c.option.ClientName != "" { + opts = append(opts, tracer.Tag(ext.DBApplication, c.option.ClientName)) + } + if c.option.Username != "" { + opts = append(opts, tracer.Tag(ext.DBUser, c.option.Username)) + } + if !math.IsNaN(c.clientConfig.analyticsRate) { + opts = append(opts, tracer.Tag(ext.EventSampleRate, c.clientConfig.analyticsRate)) + } + return opts +} + +func (c *coreClient) Do(ctx context.Context, cmd valkey.Completed) (resp valkey.ValkeyResult) { + command, statement, size := processCmd(&cmd) + span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, c.buildStartSpanOptions(buildStartSpanOptionsInput{ + command: command, + statement: statement, + size: size, + isWrite: cmd.IsWrite(), + isBlock: cmd.IsBlock(), + skipRawCommand: c.clientConfig.skipRaw, + })...) + resp = c.Client.Do(ctx, cmd) + setClientCacheTags(span, resp) + defer span.Finish(tracer.WithError(resp.Error())) + return resp +} + +func (c *coreClient) DoMulti(ctx context.Context, multi ...valkey.Completed) (resp []valkey.ValkeyResult) { + command, statement, size := processMultiCompleted(multi...) + span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, c.buildStartSpanOptions(buildStartSpanOptionsInput{ + command: command, + statement: statement, + size: size, + isMulti: true, + skipRawCommand: c.clientConfig.skipRaw, + })...) + resp = c.Client.DoMulti(ctx, multi...) + defer span.Finish(tracer.WithError(firstError(resp))) + return resp +} + +func (c *coreClient) Receive(ctx context.Context, subscribe valkey.Completed, fn func(msg valkey.PubSubMessage)) (err error) { + command, statement, size := processCmd(&subscribe) + span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, c.buildStartSpanOptions(buildStartSpanOptionsInput{ + command: command, + statement: statement, + size: size, + isWrite: subscribe.IsWrite(), + isBlock: subscribe.IsBlock(), + skipRawCommand: c.clientConfig.skipRaw, + })...) + err = c.Client.Receive(ctx, subscribe, fn) + defer span.Finish(tracer.WithError(err)) + return err +} + +func (c *client) DoCache(ctx context.Context, cmd valkey.Cacheable, ttl time.Duration) (resp valkey.ValkeyResult) { + command, statement, size := processCmd(&cmd) + span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, c.buildStartSpanOptions(buildStartSpanOptionsInput{ + command: command, + statement: statement, + size: size, + isMulti: cmd.IsMGet(), + skipRawCommand: c.clientConfig.skipRaw, + })...) + resp = c.Client.DoCache(ctx, cmd, ttl) + setClientCacheTags(span, resp) + defer span.Finish(tracer.WithError(resp.Error())) + return resp +} + +func (c *client) DoMultiCache(ctx context.Context, multi ...valkey.CacheableTTL) (resp []valkey.ValkeyResult) { + command, statement, size := processMultiCacheableTTL(multi...) + span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, c.buildStartSpanOptions(buildStartSpanOptionsInput{ + command: command, + statement: statement, + size: size, + isWrite: true, + skipRawCommand: c.clientConfig.skipRaw, + })...) + resp = c.Client.DoMultiCache(ctx, multi...) + defer span.Finish(tracer.WithError(firstError(resp))) + return resp +} + +func (c *client) DoStream(ctx context.Context, cmd valkey.Completed) (resp valkey.ValkeyResultStream) { + command, statement, size := processCmd(&cmd) + span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, c.buildStartSpanOptions(buildStartSpanOptionsInput{ + command: command, + statement: statement, + size: size, + isWrite: cmd.IsWrite(), + isBlock: cmd.IsBlock(), + isStream: true, + skipRawCommand: c.clientConfig.skipRaw, + })...) + resp = c.Client.DoStream(ctx, cmd) + defer span.Finish(tracer.WithError(resp.Error())) + return resp +} + +func (c *client) DoMultiStream(ctx context.Context, multi ...valkey.Completed) (resp valkey.MultiValkeyResultStream) { + command, statement, size := processMultiCompleted(multi...) + span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, c.buildStartSpanOptions(buildStartSpanOptionsInput{ + command: command, + statement: statement, + size: size, + isMulti: true, + isStream: true, + skipRawCommand: c.clientConfig.skipRaw, + })...) + resp = c.Client.DoMultiStream(ctx, multi...) + defer span.Finish(tracer.WithError(resp.Error())) + return resp +} + +func (c *client) Dedicated(fn func(valkey.DedicatedClient) error) (err error) { + return c.Client.Dedicated(func(dc valkey.DedicatedClient) error { + return fn(&dedicatedClient{ + coreClient: c.coreClient, + dedicatedClient: dc, + }) + }) +} + +func (c *client) Dedicate() (client valkey.DedicatedClient, cancel func()) { + dedicated, cancel := c.coreClient.Client.Dedicate() + return &dedicatedClient{ + coreClient: c.coreClient, + dedicatedClient: dedicated, + }, cancel +} + +func (c *client) Nodes() map[string]valkey.Client { + nodes := c.Client.Nodes() + for addr, valkeyClient := range nodes { + host, portStr, err := net.SplitHostPort(addr) + if err != nil { + log.Error("invalid address is set to valkey client: %s", err) + } + port, _ := strconv.Atoi(portStr) + nodes[addr] = &client{ + coreClient: coreClient{ + Client: valkeyClient, + option: c.option, + clientConfig: c.clientConfig, + host: host, + port: port, + }, + } + } + return nodes +} + +func (c *dedicatedClient) SetPubSubHooks(hooks valkey.PubSubHooks) <-chan error { + return c.dedicatedClient.SetPubSubHooks(hooks) +} diff --git a/contrib/valkey-go/valkey_test.go b/contrib/valkey-go/valkey_test.go new file mode 100644 index 0000000000..9306440edb --- /dev/null +++ b/contrib/valkey-go/valkey_test.go @@ -0,0 +1,302 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. +package valkey_test + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/valkey-io/valkey-go" + valkeytrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/valkey-go" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" +) + +var ( + // See docker-compose.yaml + valkeyPort = 6380 + valkeyUsername = "default" + valkeyPassword = "password-for-default" +) + +func TestMain(m *testing.M) { + _, ok := os.LookupEnv("INTEGRATION") + if !ok { + fmt.Println("--- SKIP: to enable integration test, set the INTEGRATION environment variable") + os.Exit(0) + } + os.Exit(m.Run()) +} + +func TestNewClient(t *testing.T) { + tests := []struct { + name string + valkeyClientOptions valkey.ClientOption + valkeytraceClientOptions []valkeytrace.ClientOption + createSpans func(*testing.T, context.Context, valkey.Client) + assertNewClientError func(*testing.T, error) + assertSpans []func(*testing.T, mocktracer.Span) + }{ + { + name: "Test invalid username", + valkeyClientOptions: valkey.ClientOption{ + InitAddress: []string{fmt.Sprintf("localhost:%d", valkeyPort)}, + Username: "invalid-username", + Password: valkeyPassword, + }, + assertNewClientError: func(t *testing.T, err error) { + assert.EqualError(t, err, "WRONGPASS invalid username-password pair or user is disabled.") + }, + }, + { + name: "Test invalid password", + valkeyClientOptions: valkey.ClientOption{ + InitAddress: []string{fmt.Sprintf("localhost:%d", valkeyPort)}, + Username: valkeyUsername, + Password: "invalid", + }, + assertNewClientError: func(t *testing.T, err error) { + assert.EqualError(t, err, "WRONGPASS invalid username-password pair or user is disabled.") + }, + }, + { + name: "Test SET command with custom options", + valkeyClientOptions: valkey.ClientOption{ + InitAddress: []string{fmt.Sprintf("localhost:%d", valkeyPort)}, + Username: valkeyUsername, + Password: valkeyPassword, + }, + valkeytraceClientOptions: []valkeytrace.ClientOption{ + valkeytrace.WithServiceName("my-valkey-client"), + valkeytrace.WithAnalytics(true), + valkeytrace.WithSkipRawCommand(true), + }, + createSpans: func(t *testing.T, ctx context.Context, client valkey.Client) { + assert.NoError(t, client.Do(ctx, client.B().Set().Key("test_key").Value("test_value").Build()).Error()) + }, + assertSpans: []func(t *testing.T, span mocktracer.Span){ + func(t *testing.T, span mocktracer.Span) { + assert.Equal(t, "my-valkey-client", span.Tag(ext.ServiceName)) + assert.Equal(t, "localhost", span.Tag(ext.TargetHost)) + assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort)) + assert.Equal(t, "SET", span.Tag(ext.DBStatement)) + assert.Equal(t, "SET", span.Tag(ext.ResourceName)) + assert.Greater(t, span.Tag("db.stmt_size"), 0) + assert.Equal(t, "SET", span.Tag("db.operation")) + assert.True(t, span.Tag(ext.ValkeyClientCommandWrite).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCommandStream).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCommandBlock).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCommandMulti).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCacheHit).(bool)) + assert.Less(t, span.Tag(ext.ValkeyClientCacheTTL), int64(0)) + assert.Less(t, span.Tag(ext.ValkeyClientCachePXAT), int64(0)) + assert.Less(t, span.Tag(ext.ValkeyClientCachePTTL), int64(0)) + assert.Nil(t, span.Tag(ext.DBApplication)) + assert.Equal(t, 1.0, span.Tag(ext.EventSampleRate)) + assert.Nil(t, span.Tag(ext.Error)) + }, + }, + }, + { + name: "Test SET/GET commands", + valkeyClientOptions: valkey.ClientOption{ + InitAddress: []string{fmt.Sprintf("localhost:%d", valkeyPort)}, + Username: valkeyUsername, + Password: valkeyPassword, + ClientName: "my-valkey-client", + }, + createSpans: func(t *testing.T, ctx context.Context, client valkey.Client) { + resp := client.DoMulti(ctx, client.B().Set().Key("test_key").Value("test_value").Build(), client.B().Get().Key("test_key").Build()) + assert.Len(t, resp, 2) + }, + assertSpans: []func(t *testing.T, span mocktracer.Span){ + func(t *testing.T, span mocktracer.Span) { + assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName)) + assert.Equal(t, "localhost", span.Tag(ext.TargetHost)) + assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort)) + assert.Equal(t, "SET\ntest_key\ntest_value GET\ntest_key", span.Tag(ext.DBStatement)) + assert.Equal(t, "SET\ntest_key\ntest_value GET\ntest_key", span.Tag(ext.ResourceName)) + assert.Greater(t, span.Tag("db.stmt_size"), 0) + assert.Equal(t, "SET GET", span.Tag("db.operation")) + assert.False(t, span.Tag(ext.ValkeyClientCommandWrite).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCommandStream).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCommandBlock).(bool)) + assert.True(t, span.Tag(ext.ValkeyClientCommandMulti).(bool)) + assert.Nil(t, span.Tag(ext.ValkeyClientCacheHit)) + assert.Nil(t, span.Tag(ext.ValkeyClientCacheTTL)) + assert.Nil(t, span.Tag(ext.ValkeyClientCachePXAT)) + assert.Nil(t, span.Tag(ext.ValkeyClientCachePTTL)) + assert.Equal(t, "my-valkey-client", span.Tag(ext.DBApplication)) + assert.Nil(t, span.Tag(ext.EventSampleRate)) + assert.Nil(t, span.Tag(ext.Error)) + }, + }, + }, + { + name: "Test HMGET command with cache", + valkeyClientOptions: valkey.ClientOption{ + InitAddress: []string{fmt.Sprintf("localhost:%d", valkeyPort)}, + Username: valkeyUsername, + Password: valkeyPassword, + }, + createSpans: func(t *testing.T, ctx context.Context, client valkey.Client) { + assert.NoError(t, client.DoCache(ctx, client.B().Hmget().Key("mk").Field("1", "2").Cache(), time.Minute).Error()) + resp, err := client.DoCache(ctx, client.B().Hmget().Key("mk").Field("1", "2").Cache(), time.Minute).ToArray() + assert.Len(t, resp, 2) + assert.NoError(t, err) + }, + assertSpans: []func(t *testing.T, span mocktracer.Span){ + func(t *testing.T, span mocktracer.Span) { + assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName)) + assert.Equal(t, "localhost", span.Tag(ext.TargetHost)) + assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort)) + assert.Greater(t, span.Tag("db.stmt_size"), 0) + assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.DBStatement)) + assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.ResourceName)) + assert.Equal(t, "HMGET", span.Tag("db.operation")) + assert.False(t, span.Tag(ext.ValkeyClientCommandWrite).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCacheHit).(bool)) + assert.Greater(t, span.Tag(ext.ValkeyClientCacheTTL), int64(0)) + assert.Greater(t, span.Tag(ext.ValkeyClientCachePXAT), int64(0)) + assert.Greater(t, span.Tag(ext.ValkeyClientCachePTTL), int64(0)) + assert.False(t, span.Tag(ext.ValkeyClientCommandStream).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCommandBlock).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCommandMulti).(bool)) + assert.Nil(t, span.Tag(ext.DBApplication)) + assert.Nil(t, span.Tag(ext.EventSampleRate)) + assert.Nil(t, span.Tag(ext.Error)) + }, + func(t *testing.T, span mocktracer.Span) { + assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName)) + assert.Equal(t, "localhost", span.Tag(ext.TargetHost)) + assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort)) + assert.Greater(t, span.Tag("db.stmt_size"), 0) + assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.DBStatement)) + assert.Equal(t, "HMGET\nmk\n1\n2", span.Tag(ext.ResourceName)) + assert.Equal(t, "HMGET", span.Tag("db.operation")) + assert.False(t, span.Tag(ext.ValkeyClientCommandWrite).(bool)) + assert.True(t, span.Tag(ext.ValkeyClientCacheHit).(bool)) + assert.Greater(t, span.Tag(ext.ValkeyClientCacheTTL), int64(0)) + assert.Greater(t, span.Tag(ext.ValkeyClientCachePXAT), int64(0)) + assert.Greater(t, span.Tag(ext.ValkeyClientCachePTTL), int64(0)) + assert.False(t, span.Tag(ext.ValkeyClientCommandStream).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCommandBlock).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCommandMulti).(bool)) + assert.Nil(t, span.Tag(ext.DBApplication)) + assert.Nil(t, span.Tag(ext.EventSampleRate)) + assert.Nil(t, span.Tag(ext.Error)) + }, + }, + }, + { + name: "Test GET command with stream", + valkeyClientOptions: valkey.ClientOption{ + InitAddress: []string{fmt.Sprintf("localhost:%d", valkeyPort)}, + Username: valkeyUsername, + Password: valkeyPassword, + }, + createSpans: func(t *testing.T, ctx context.Context, client valkey.Client) { + resp := client.DoStream(ctx, client.B().Get().Key("test_key").Build()) + assert.NoError(t, resp.Error()) + }, + assertSpans: []func(t *testing.T, span mocktracer.Span){ + func(t *testing.T, span mocktracer.Span) { + assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName)) + assert.Equal(t, "localhost", span.Tag(ext.TargetHost)) + assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort)) + assert.Equal(t, "GET\ntest_key", span.Tag(ext.DBStatement)) + assert.Equal(t, "GET\ntest_key", span.Tag(ext.ResourceName)) + assert.Greater(t, span.Tag("db.stmt_size"), 0) + assert.Equal(t, "GET", span.Tag("db.operation")) + assert.False(t, span.Tag(ext.ValkeyClientCommandWrite).(bool)) + assert.True(t, span.Tag(ext.ValkeyClientCommandStream).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCommandBlock).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCommandMulti).(bool)) + assert.Nil(t, span.Tag(ext.ValkeyClientCacheHit)) + assert.Nil(t, span.Tag(ext.ValkeyClientCacheTTL)) + assert.Nil(t, span.Tag(ext.ValkeyClientCachePXAT)) + assert.Nil(t, span.Tag(ext.ValkeyClientCachePTTL)) + assert.Nil(t, span.Tag(ext.DBApplication)) + assert.Nil(t, span.Tag(ext.EventSampleRate)) + assert.Nil(t, span.Tag(ext.Error)) + }, + }, + }, + { + name: "Test SUBSCRIBE command with timeout", + valkeyClientOptions: valkey.ClientOption{ + InitAddress: []string{fmt.Sprintf("localhost:%d", valkeyPort)}, + Username: valkeyUsername, + Password: valkeyPassword, + }, + createSpans: func(t *testing.T, ctx context.Context, client valkey.Client) { + ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Millisecond) + assert.Equal(t, + context.DeadlineExceeded, + client.Receive(ctxWithTimeout, client.B().Subscribe().Channel("test_channel").Build(), func(msg valkey.PubSubMessage) {}), + ) + cancel() + }, + assertSpans: []func(t *testing.T, span mocktracer.Span){ + func(t *testing.T, span mocktracer.Span) { + assert.Equal(t, "valkey.client", span.Tag(ext.ServiceName)) + assert.Equal(t, "localhost", span.Tag(ext.TargetHost)) + assert.Equal(t, valkeyPort, span.Tag(ext.TargetPort)) + assert.Greater(t, span.Tag("db.stmt_size"), 0) + assert.Equal(t, "SUBSCRIBE\ntest_channel", span.Tag(ext.DBStatement)) + assert.Equal(t, "SUBSCRIBE\ntest_channel", span.Tag(ext.ResourceName)) + assert.Equal(t, "SUBSCRIBE", span.Tag("db.operation")) + assert.False(t, span.Tag(ext.ValkeyClientCommandWrite).(bool)) + assert.Nil(t, span.Tag(ext.ValkeyClientCacheHit)) + assert.Nil(t, span.Tag(ext.ValkeyClientCacheTTL)) + assert.Nil(t, span.Tag(ext.ValkeyClientCachePXAT)) + assert.Nil(t, span.Tag(ext.ValkeyClientCachePTTL)) + assert.False(t, span.Tag(ext.ValkeyClientCommandStream).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCommandBlock).(bool)) + assert.False(t, span.Tag(ext.ValkeyClientCommandMulti).(bool)) + assert.Nil(t, span.Tag(ext.DBApplication)) + assert.Nil(t, span.Tag(ext.EventSampleRate)) + assert.Equal(t, context.DeadlineExceeded, span.Tag(ext.Error).(error)) + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + mt := mocktracer.Start() + defer mt.Stop() + client, err := valkeytrace.NewClient(tt.valkeyClientOptions, tt.valkeytraceClientOptions...) + if tt.assertNewClientError == nil { + require.NoErrorf(t, err, tt.name) + } else { + tt.assertNewClientError(t, err) + return + } + tt.createSpans(t, ctx, client) + spans := mt.FinishedSpans() + require.Len(t, spans, len(tt.assertSpans)) + for i, span := range spans { + tt.assertSpans[i](t, span) + // Following assertions are common to all spans + assert.NotNil(t, span) + assert.True(t, span.Tag(ext.ValkeyClientCommandWithPassword).(bool)) + assert.Equal(t, tt.valkeyClientOptions.Username, span.Tag(ext.DBUser)) + assert.Equal(t, "valkey.command", span.OperationName()) + assert.Equal(t, "client", span.Tag(ext.SpanKind)) + assert.Equal(t, ext.SpanTypeValkey, span.Tag(ext.SpanType)) + assert.Equal(t, "valkey-go/valkey", span.Tag(ext.Component)) + assert.Equal(t, "valkey", span.Tag(ext.DBType)) + assert.Equal(t, "valkey", span.Tag(ext.DBSystem)) + } + }) + } + +} diff --git a/ddtrace/ext/app_types.go b/ddtrace/ext/app_types.go index eb6ded8f60..1313c78a76 100644 --- a/ddtrace/ext/app_types.go +++ b/ddtrace/ext/app_types.go @@ -51,6 +51,9 @@ const ( // also have a "redis.raw_command" tag. SpanTypeRedis = "redis" + // SpanTypeRedis marks a span as a Valkey operation. + SpanTypeValkey = "valkey" + // SpanTypeMemcached marks a span as a memcached operation. SpanTypeMemcached = "memcached" diff --git a/ddtrace/ext/db.go b/ddtrace/ext/db.go index c9a046f86d..9ad7bd01c5 100644 --- a/ddtrace/ext/db.go +++ b/ddtrace/ext/db.go @@ -32,6 +32,7 @@ const ( DBSystemOtherSQL = "other_sql" DBSystemElasticsearch = "elasticsearch" DBSystemRedis = "redis" + DBSystemValkey = "valkey" DBSystemMongoDB = "mongodb" DBSystemCassandra = "cassandra" DBSystemConsulKV = "consul" @@ -57,6 +58,45 @@ const ( RedisDatabaseIndex = "db.redis.database_index" ) +// Valkey tags. +const ( + // ValkeyDatabaseIndex specifies the index of the database being connected to. + ValkeyDatabaseIndex = "db.valkey.database_index" + + // ValkeyClientVersion denotes the version of the Valkey client in use. + ValkeyClientVersion = "db.valkey.client.version" + + // ValkeyClientName indicates the name of the Valkey client being used. + ValkeyClientName = "db.valkey.client.name" + + // ValkeyClientCacheHit is the remaining TTL in seconds of client side cache. + ValkeyClientCacheHit = "db.valkey.client.cache.hit" + + // ValkeyClientCacheTTL captures the Time-To-Live (TTL) of a cached entry in the client. + ValkeyClientCacheTTL = "db.valkey.client.cache.ttl" + + // ValkeyClientCachePTTL is the remaining PTTL in seconds of client side cache. + ValkeyClientCachePTTL = "db.valkey.client.cache.pttl" + + // ValkeyClientCachePXAT is the remaining PXAT in seconds of client side cache. + ValkeyClientCachePXAT = "db.valkey.client.cache.pxat" + + // ValkeyClientCommandWrite indicates whether a command involves a write operation. + ValkeyClientCommandWrite = "db.valkey.client.command.write" + + // ValkeyClientCommandBlock indicates whether a command is blocking. + ValkeyClientCommandBlock = "db.valkey.client.command.block" + + // ValkeyClientCommandMulti specifies whether multiple Valkey commands are sent together. + ValkeyClientCommandMulti = "db.valkey.client.command.multi" + + // ValkeyClientCommandStream indicates whether a command uses a dedicated connection to stream responses directly. + ValkeyClientCommandStream = "db.valkey.client.command.stream" + + // ValkeyClientCommandWithPassword indicates whether a command was executed with password authentication. + ValkeyClientCommandWithPassword = "db.valkey.client.command.with_password" +) + // Cassandra tags. const ( // CassandraQuery is the tag name used for cassandra queries. diff --git a/docker-compose.yaml b/docker-compose.yaml index 76680be932..c397a91758 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -41,6 +41,12 @@ services: image: redis:3.2 ports: - "6379:6379" + valkey: + image: valkey/valkey:8 + ports: + - "6380:6380" + # https://valkey.io/topics/acl/ + command: [ "valkey-server", "--port", "6380", "--requirepass", "password-for-default" ] elasticsearch2: image: elasticsearch:2 environment: diff --git a/go.mod b/go.mod index f78b187447..c78d2c02dc 100644 --- a/go.mod +++ b/go.mod @@ -90,6 +90,7 @@ require ( github.com/uptrace/bun v1.1.17 github.com/uptrace/bun/dialect/sqlitedialect v1.1.17 github.com/urfave/negroni v1.0.0 + github.com/valkey-io/valkey-go v1.0.52 github.com/valyala/fasthttp v1.51.0 github.com/vektah/gqlparser/v2 v2.5.16 github.com/zenazn/goji v1.0.1 @@ -294,7 +295,7 @@ require ( go.uber.org/zap v1.27.0 // indirect golang.org/x/arch v0.4.0 // indirect golang.org/x/crypto v0.31.0 // indirect - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect + golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/net v0.33.0 // indirect golang.org/x/term v0.27.0 // indirect golang.org/x/text v0.21.0 // indirect diff --git a/go.sum b/go.sum index aed16d7b16..4165930177 100644 --- a/go.sum +++ b/go.sum @@ -1819,8 +1819,8 @@ github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+t github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= -github.com/onsi/gomega v1.20.1 h1:PA/3qinGoukvymdIDV8pii6tiZgC8kbmJO6Z5+b002Q= -github.com/onsi/gomega v1.20.1/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo= +github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k= +github.com/onsi/gomega v1.34.1/go.mod h1:kU1QgUvBDLXBJq618Xvm2LUX6rSAfRaFRTcdOeDLwwY= github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= @@ -2120,6 +2120,8 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/negroni v1.0.0 h1:kIimOitoypq34K7TG7DUaJ9kq/N4Ofuwi1sjz0KipXc= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= +github.com/valkey-io/valkey-go v1.0.52 h1:ojrR736satGucqpllYzal8fUrNNROc11V10zokAyIYg= +github.com/valkey-io/valkey-go v1.0.52/go.mod h1:BXlVAPIL9rFQinSFM+N32JfWzfCaUAqBpZkc4vPY6fM= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA= @@ -2329,8 +2331,8 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= -golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= -golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= diff --git a/internal/namingschema/op.go b/internal/namingschema/op.go index 01c90726c1..2d3059facf 100644 --- a/internal/namingschema/op.go +++ b/internal/namingschema/op.go @@ -31,6 +31,7 @@ const ( // cache MemcachedOutbound RedisOutbound + ValkeyOutbound // db ElasticSearchOutbound @@ -75,6 +76,8 @@ func opV1(t IntegrationType) string { return "memcached.command" case RedisOutbound: return "redis.command" + case ValkeyOutbound: + return "valkey.command" // Database case ElasticSearchOutbound: @@ -121,6 +124,8 @@ func opV0(t IntegrationType) string { return "memcached.query" case RedisOutbound: return "redis.command" + case ValkeyOutbound: + return "valkey.command" case ElasticSearchOutbound: return "elasticsearch.query" case MongoDBOutbound: