Skip to content

Commit

Permalink
Support valkey-go tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
keisku committed Jan 12, 2025
1 parent 27624f6 commit d543f6f
Show file tree
Hide file tree
Showing 7 changed files with 391 additions and 5 deletions.
66 changes: 66 additions & 0 deletions contrib/valkey-go/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

// Package redis provides tracing functions for tracing the go-redis/redis package (https://github.com/go-redis/redis).
// This package supports versions up to go-redis 6.15.
package valkey

import (
"math"

"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
)

const defaultServiceName = "valkey.client"

type clientConfig struct {
serviceName string
spanName string
analyticsRate float64
}

// ClientOption represents an option that can be used to create or wrap a client.
type ClientOption func(*clientConfig)

func defaults(cfg *clientConfig) {
cfg.serviceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName)
cfg.spanName = namingschema.OpName(namingschema.ValkeyOutbound)
if internal.BoolEnv("DD_TRACE_VALKEY_ANALYTICS_ENABLED", false) {
cfg.analyticsRate = 1.0
} else {
cfg.analyticsRate = math.NaN()
}
}

// WithServiceName sets the given service name for the client.
func WithServiceName(name string) ClientOption {
return func(cfg *clientConfig) {
cfg.serviceName = name
}
}

// WithAnalytics enables Trace Analytics for all started spans.
func WithAnalytics(on bool) ClientOption {
return func(cfg *clientConfig) {
if on {
cfg.analyticsRate = 1.0
} else {
cfg.analyticsRate = math.NaN()
}
}
}

// WithAnalyticsRate sets the sampling rate for Trace Analytics events
// correlated to started spans.
func WithAnalyticsRate(rate float64) ClientOption {
return func(cfg *clientConfig) {
if rate >= 0.0 && rate <= 1.0 {
cfg.analyticsRate = rate
} else {
cfg.analyticsRate = math.NaN()
}
}
}
292 changes: 292 additions & 0 deletions contrib/valkey-go/valkey.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

// Package redis provides tracing functions for tracing the go-redis/redis package (https://github.com/go-redis/redis).
// This package supports versions up to go-redis 6.15.
package valkey

import (
"bytes"
"context"
"math"
"net"
"strconv"
"time"

"github.com/valkey-io/valkey-go"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"
)

const componentName = "valkey-go/valkey"

func init() {
telemetry.LoadIntegration(componentName)
tracer.MarkIntegrationImported("github.com/valkey/valkey-go")
}

var (
_ valkey.CoreClient = (*coreClient)(nil)
_ valkey.Client = (*client)(nil)
_ valkey.DedicatedClient = (*dedicatedClient)(nil)
)

type coreClient struct {
valkey.Client
option valkey.ClientOption
clientConfig clientConfig
host string
port int
}

type client struct {
coreClient
}

type dedicatedClient struct {
coreClient
dedicatedClient valkey.DedicatedClient
}

func NewClient(option valkey.ClientOption, opts ...ClientOption) (valkey.Client, error) {
valkeyClient, err := valkey.NewClient(option)
if err != nil {
return nil, err
}
var cfg clientConfig
defaults(&cfg)
for _, fn := range opts {
fn(&cfg)
}
var host, portStr string
var port int
if len(option.InitAddress) == 1 {
host, portStr, err = net.SplitHostPort(option.InitAddress[0])
if err != nil {
log.Debug("failed to split host and port: %s", err)
}
port, _ = strconv.Atoi(portStr)
}
core := coreClient{
Client: valkeyClient,
option: option,
clientConfig: cfg,
host: host,
port: port,
}
return &client{
coreClient: core,
}, nil
}

type commander interface {
Commands() []string
}

func processCmd(commander commander) (command, statement string, size int) {
var b bytes.Buffer
for i, cmd := range commander.Commands() {
if i == 0 {
command = cmd
}
b.WriteString(cmd)
b.WriteString("\n")
}
return command, b.String(), b.Len()
}

func processMultiCmds(multi []commander) (command, statement string, size int) {
var commandBuilder bytes.Buffer
var statementBuilder bytes.Buffer
for i, cmd := range multi {
cmdStr, statement, cmdSize := processCmd(cmd)
size += cmdSize
if i > 0 {
commandBuilder.WriteString(" ")
statementBuilder.WriteString(" ")
}
commandBuilder.WriteString(cmdStr)
statementBuilder.WriteString(statement)
}
return commandBuilder.String(), statementBuilder.String(), size
}

func processMultiCompleted(multi ...valkey.Completed) (command, statement string, size int) {
cmds := make([]commander, len(multi))
for i, cmd := range multi {
cmds[i] = &cmd
}
return processMultiCmds(cmds)
}

func processMultiCacheableTTL(multi ...valkey.CacheableTTL) (command, statement string, size int) {
cmds := make([]commander, len(multi))
for i, cmd := range multi {
cmds[i] = &cmd.Cmd
}
return processMultiCmds(cmds)
}

func firstError(s []valkey.ValkeyResult) error {
for _, result := range s {
if err := result.Error(); err != nil && !valkey.IsValkeyNil(err) {
return err
}
}
return nil
}

func setClientCacheTags(s tracer.Span, result valkey.ValkeyResult) {
s.SetTag(ext.ValkeyClientCacheHit, result.IsCacheHit())
s.SetTag(ext.ValkeyClientCacheTTL, result.CacheTTL())
s.SetTag(ext.ValkeyClientCachePTTL, result.CachePTTL())
s.SetTag(ext.ValkeyClientCachePXAT, result.CachePXAT())
}

// TODO: Consider addiing skipRaw option.
func (c *coreClient) buildStartSpanOptions(command, statement string, size int) []tracer.StartSpanOption {
opts := []tracer.StartSpanOption{
tracer.SpanType(ext.SpanTypeValkey),
tracer.ServiceName(c.clientConfig.serviceName),
tracer.ResourceName(statement),
tracer.Tag(ext.TargetHost, c.host),
tracer.Tag(ext.TargetPort, c.port),
tracer.Tag(ext.ValkeyClientVersion, valkey.LibVer),
tracer.Tag(ext.ValkeyClientName, valkey.LibName),
tracer.Tag(ext.Component, componentName),
tracer.Tag(ext.SpanKind, ext.SpanKindClient),
tracer.Tag(ext.DBType, ext.DBSystemValkey),
tracer.Tag(ext.DBSystem, ext.DBSystemValkey),
tracer.Tag(ext.ValkeyDatabaseIndex, c.option.SelectDB),
}
if command != "" {
opts = append(opts, []tracer.StartSpanOption{
tracer.Tag(ext.DBStatement, statement),
// valkeyotel tags
tracer.Tag("db.stmt_size", size),
tracer.Tag("db.operation", command),
}...)
}
if c.option.ClientName != "" {
opts = append(opts, tracer.Tag(ext.DBApplication, c.option.ClientName))
}
if c.option.Username != "" {
opts = append(opts, tracer.Tag(ext.DBUser, c.option.Username))
}
if !math.IsNaN(c.clientConfig.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, c.clientConfig.analyticsRate))
}
return opts
}

func (c *coreClient) Do(ctx context.Context, cmd valkey.Completed) (resp valkey.ValkeyResult) {
command, statement, size := processCmd(&cmd)
opts := append(c.buildStartSpanOptions(command, statement, size), []tracer.StartSpanOption{
tracer.Tag(ext.ValkeyClientCommandWrite, cmd.IsWrite()),
tracer.Tag(ext.ValkeyClientCommandBlock, cmd.IsBlock()),
}...)
span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, opts...)
resp = c.Client.Do(ctx, cmd)
setClientCacheTags(span, resp)
defer span.Finish(tracer.WithError(resp.Error()))
return resp
}

func (c *coreClient) DoMulti(ctx context.Context, multi ...valkey.Completed) (resp []valkey.ValkeyResult) {
command, statement, size := processMultiCompleted(multi...)
span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, c.buildStartSpanOptions(command, statement, size)...)
resp = c.Client.DoMulti(ctx, multi...)
defer span.Finish(tracer.WithError(firstError(resp)))
return resp
}

func (c *coreClient) Receive(ctx context.Context, subscribe valkey.Completed, fn func(msg valkey.PubSubMessage)) (err error) {
command, statement, size := processCmd(&subscribe)
span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, c.buildStartSpanOptions(command, statement, size)...)
err = c.Client.Receive(ctx, subscribe, fn)
defer span.Finish(tracer.WithError(err))
return err
}

func (c *client) DoCache(ctx context.Context, cmd valkey.Cacheable, ttl time.Duration) (resp valkey.ValkeyResult) {
command, statement, size := processCmd(&cmd)
span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, c.buildStartSpanOptions(command, statement, size)...)
resp = c.Client.DoCache(ctx, cmd, ttl)
setClientCacheTags(span, resp)
defer span.Finish(tracer.WithError(resp.Error()))
return resp
}

func (c *client) DoMultiCache(ctx context.Context, multi ...valkey.CacheableTTL) (resp []valkey.ValkeyResult) {
command, statement, size := processMultiCacheableTTL(multi...)
span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, c.buildStartSpanOptions(command, statement, size)...)
resp = c.Client.DoMultiCache(ctx, multi...)
defer span.Finish(tracer.WithError(firstError(resp)))
return resp
}

func (c *client) DoStream(ctx context.Context, cmd valkey.Completed) (resp valkey.ValkeyResultStream) {
command, statement, size := processCmd(&cmd)
opts := append(c.buildStartSpanOptions(command, statement, size), []tracer.StartSpanOption{
tracer.Tag(ext.ValkeyClientCommandWrite, cmd.IsWrite()),
tracer.Tag(ext.ValkeyClientCommandBlock, cmd.IsBlock()),
}...)
span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, opts...)
resp = c.Client.DoStream(ctx, cmd)
defer span.Finish(tracer.WithError(resp.Error()))
return resp
}

func (c *client) DoMultiStream(ctx context.Context, multi ...valkey.Completed) (resp valkey.MultiValkeyResultStream) {
command, statement, size := processMultiCompleted(multi...)
span, ctx := tracer.StartSpanFromContext(ctx, c.clientConfig.spanName, c.buildStartSpanOptions(command, statement, size)...)
resp = c.Client.DoMultiStream(ctx, multi...)
defer span.Finish(tracer.WithError(resp.Error()))
return resp
}

func (c *client) Dedicated(fn func(valkey.DedicatedClient) error) (err error) {
return c.Client.Dedicated(func(dc valkey.DedicatedClient) error {
return fn(&dedicatedClient{
coreClient: c.coreClient,
dedicatedClient: dc,
})
})
}

func (c *client) Dedicate() (client valkey.DedicatedClient, cancel func()) {
dedicated, cancel := c.coreClient.Client.Dedicate()
return &dedicatedClient{
coreClient: c.coreClient,
dedicatedClient: dedicated,
}, cancel
}

func (c *client) Nodes() map[string]valkey.Client {
nodes := c.Client.Nodes()
for addr, valkeyClient := range nodes {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
log.Debug("failed to split host and port: %s", err)
}
port, _ := strconv.Atoi(portStr)
nodes[addr] = &client{
coreClient: coreClient{
Client: valkeyClient,
option: c.option,
clientConfig: c.clientConfig,
host: host,
port: port,
},
}
}
return nodes
}

func (c *dedicatedClient) SetPubSubHooks(hooks valkey.PubSubHooks) <-chan error {
return c.dedicatedClient.SetPubSubHooks(hooks)
}
3 changes: 3 additions & 0 deletions ddtrace/ext/app_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const (
// also have a "redis.raw_command" tag.
SpanTypeRedis = "redis"

// SpanTypeRedis marks a span as a Valkey operation.
SpanTypeValkey = "valkey"

// SpanTypeMemcached marks a span as a memcached operation.
SpanTypeMemcached = "memcached"

Expand Down
Loading

0 comments on commit d543f6f

Please sign in to comment.