Skip to content

Commit

Permalink
Optionally add attributes to traces (#629)
Browse files Browse the repository at this point in the history
feat(ARCO-276): Refactor start and end tracing functions & optionally add attributes to traces
  • Loading branch information
boecklim authored Nov 4, 2024
1 parent da42f1d commit dd704e1
Show file tree
Hide file tree
Showing 38 changed files with 332 additions and 336 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ run_e2e_tests:
run_e2e_tests_with_tracing:
docker compose -f test/docker-compose.yaml down --remove-orphans
docker compose -f test/docker-compose.yaml up --abort-on-container-exit migrate-blocktx migrate-metamorph migrate-callbacker
ARC_TRACING_DIALADDR=http://arc-jaeger:4317 docker compose -f test/docker-compose.yaml up --build --exit-code-from tests tests arc-blocktx arc-callbacker arc-metamorph arc arc-jaeger --scale arc-blocktx=4 --scale arc-metamorph=2
ARC_TRACING_ENABLED=TRUE docker compose -f test/docker-compose.yaml up --build --exit-code-from tests tests arc-blocktx arc-callbacker arc-metamorph arc arc-jaeger --scale arc-blocktx=4 --scale arc-metamorph=2
docker compose -f test/docker-compose.yaml down

.PHONY: test
Expand Down
4 changes: 2 additions & 2 deletions cmd/arc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

cmd "github.com/bitcoin-sv/arc/cmd/arc/services"
"github.com/bitcoin-sv/arc/config"
"github.com/bitcoin-sv/arc/internal/logger"
arcLogger "github.com/bitcoin-sv/arc/internal/logger"
"github.com/bitcoin-sv/arc/internal/version"
)

Expand All @@ -40,7 +40,7 @@ func run() error {
return config.DumpConfig(dumpConfigFile)
}

logger, err := logger.NewLogger(arcConfig.LogLevel, arcConfig.LogFormat)
logger, err := arcLogger.NewLogger(arcConfig.LogLevel, arcConfig.LogFormat)
if err != nil {
return fmt.Errorf("failed to create logger: %v", err)
}
Expand Down
13 changes: 5 additions & 8 deletions cmd/arc/services/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,19 @@ func StartAPIServer(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), e

shutdownFns := make([]func(), 0)

tracingEnabled := false
if arcConfig.Tracing != nil && arcConfig.Tracing.DialAddr != "" {
if arcConfig.IsTracingEnabled() {
cleanup, err := tracing.Enable(logger, "api", arcConfig.Tracing.DialAddr)
if err != nil {
logger.Error("failed to enable tracing", slog.String("err", err.Error()))
} else {
shutdownFns = append(shutdownFns, cleanup)
}

tracingEnabled = true

mtmOpts = append(mtmOpts, metamorph.WithTracer())
apiOpts = append(apiOpts, handler.WithTracer())
mtmOpts = append(mtmOpts, metamorph.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
apiOpts = append(apiOpts, handler.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}

conn, err := metamorph.DialGRPC(arcConfig.Metamorph.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, tracingEnabled)
conn, err := metamorph.DialGRPC(arcConfig.Metamorph.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, arcConfig.Tracing)
if err != nil {
return nil, fmt.Errorf("failed to connect to metamorph server: %v", err)
}
Expand All @@ -83,7 +80,7 @@ func StartAPIServer(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), e
mtmOpts...,
)

btcConn, err := blocktx.DialGRPC(arcConfig.Blocktx.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, tracingEnabled)
btcConn, err := blocktx.DialGRPC(arcConfig.Blocktx.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, arcConfig.Tracing)
if err != nil {
return nil, fmt.Errorf("failed to connect to blocktx server: %v", err)
}
Expand Down
14 changes: 7 additions & 7 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
shutdownFns := make([]func(), 0)

tracingEnabled := false
if arcConfig.Tracing != nil && arcConfig.Tracing.DialAddr != "" {
if arcConfig.IsTracingEnabled() {
cleanup, err := tracing.Enable(logger, "blocktx", arcConfig.Tracing.DialAddr)
if err != nil {
logger.Error("failed to enable tracing", slog.String("err", err.Error()))
Expand All @@ -68,7 +68,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, err
}

blockStore, err = NewBlocktxStore(logger, btxConfig.Db, tracingEnabled)
blockStore, err = NewBlocktxStore(logger, btxConfig.Db, arcConfig.Tracing)
if err != nil {
return nil, fmt.Errorf("failed to create blocktx store: %v", err)
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
blocktx.WithMessageQueueClient(mqClient),
}
if tracingEnabled {
processorOpts = append(processorOpts, blocktx.WithTracer())
processorOpts = append(processorOpts, blocktx.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}

blockRequestCh := make(chan blocktx.BlockRequest, blockProcessingBuffer)
Expand Down Expand Up @@ -171,7 +171,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
workers.StartFillGaps(peers, btxConfig.FillGapsInterval, btxConfig.RecordRetentionDays, blockRequestCh)

server, err = blocktx.NewServer(arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, logger,
blockStore, pm, btxConfig.MaxAllowedBlockHeightMismatch, tracingEnabled)
blockStore, pm, btxConfig.MaxAllowedBlockHeightMismatch, arcConfig.Tracing)
if err != nil {
stopFn()
return nil, fmt.Errorf("create GRPCServer failed: %v", err)
Expand All @@ -192,7 +192,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return stopFn, nil
}

func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingEnabled bool) (s store.BlocktxStore, err error) {
func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConfig *config.TracingConfig) (s store.BlocktxStore, err error) {
switch dbConfig.Mode {
case DbModePostgres:
postgres := dbConfig.Postgres
Expand All @@ -208,8 +208,8 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingEnab
)

var postgresOpts []func(handler *postgresql.PostgreSQL)
if tracingEnabled {
postgresOpts = append(postgresOpts, postgresql.WithTracer())
if tracingConfig != nil && tracingConfig.IsEnabled() {
postgresOpts = append(postgresOpts, postgresql.WithTracer(tracingConfig.KeyValueAttributes...))
}

s, err = postgresql.New(dbInfo, postgres.MaxIdleConns, postgres.MaxOpenConns, postgresOpts...)
Expand Down
2 changes: 1 addition & 1 deletion cmd/arc/services/callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),
workers.StartCallbackStoreCleanup(config.PruneInterval, config.PruneOlderThan)
workers.StartQuarantineCallbacksDispatch(config.QuarantineCheckInterval)

server, err = callbacker.NewServer(appConfig.PrometheusEndpoint, appConfig.GrpcMessageSize, logger, dispatcher, false)
server, err = callbacker.NewServer(appConfig.PrometheusEndpoint, appConfig.GrpcMessageSize, logger, dispatcher, nil)
if err != nil {
stopFn()
return nil, fmt.Errorf("create GRPCServer failed: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions cmd/arc/services/k8s_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func StartK8sWatcher(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), error) {
logger.With(slog.String("service", "k8s-watcher"))

mtmConn, err := metamorph.DialGRPC(arcConfig.Metamorph.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, false)
mtmConn, err := metamorph.DialGRPC(arcConfig.Metamorph.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, nil)
if err != nil {
return nil, fmt.Errorf("failed to connect to metamorph server: %v", err)
}
Expand All @@ -28,7 +28,7 @@ func StartK8sWatcher(logger *slog.Logger, arcConfig *config.ArcConfig) (func(),
return nil, fmt.Errorf("failed to get k8s-client: %v", err)
}

blocktxConn, err := blocktx.DialGRPC(arcConfig.Blocktx.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, false)
blocktxConn, err := blocktx.DialGRPC(arcConfig.Blocktx.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, nil)
if err != nil {
return nil, fmt.Errorf("failed to connect to block-tx server: %v", err)
}
Expand Down
24 changes: 10 additions & 14 deletions cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,15 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
optsServer := make([]metamorph.ServerOption, 0)
processorOpts := make([]metamorph.Option, 0)

tracingEnabled := false
if arcConfig.Tracing != nil && arcConfig.Tracing.DialAddr != "" {
if arcConfig.IsTracingEnabled() {
cleanup, err := tracing.Enable(logger, "metamorph", arcConfig.Tracing.DialAddr)
if err != nil {
logger.Error("failed to enable tracing", slog.String("err", err.Error()))
} else {
shutdownFns = append(shutdownFns, cleanup)
}

tracingEnabled = true

optsServer = append(optsServer, metamorph.WithTracer())
processorOpts = append(processorOpts, metamorph.WithProcessorTracer())
optsServer = append(optsServer, metamorph.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}

stopFn := func() {
Expand All @@ -81,7 +77,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
logger.Info("Shutdown complete")
}

metamorphStore, err = NewMetamorphStore(mtmConfig.Db, tracingEnabled)
metamorphStore, err = NewMetamorphStore(mtmConfig.Db, arcConfig.Tracing)
if err != nil {
return nil, fmt.Errorf("failed to create metamorph store: %v", err)
}
Expand Down Expand Up @@ -122,7 +118,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore

procLogger := logger.With(slog.String("module", "mtm-proc"))

callbackerConn, err := initGrpcCallbackerConn(arcConfig.Callbacker.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, tracingEnabled)
callbackerConn, err := initGrpcCallbackerConn(arcConfig.Callbacker.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, arcConfig.Tracing)
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to create callbacker client: %v", err)
Expand Down Expand Up @@ -179,7 +175,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
}

server, err = metamorph.NewServer(arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, logger,
metamorphStore, processor, optsServer...)
metamorphStore, processor, arcConfig.Tracing, optsServer...)

if err != nil {
stopFn()
Expand Down Expand Up @@ -230,7 +226,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
return stopFn, nil
}

func NewMetamorphStore(dbConfig *config.DbConfig, tracingEnabled bool) (s store.MetamorphStore, err error) {
func NewMetamorphStore(dbConfig *config.DbConfig, tracingConfig *config.TracingConfig) (s store.MetamorphStore, err error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
Expand All @@ -246,8 +242,8 @@ func NewMetamorphStore(dbConfig *config.DbConfig, tracingEnabled bool) (s store.
)

opts := make([]func(postgreSQL *postgresql.PostgreSQL), 0)
if tracingEnabled {
opts = append(opts, postgresql.WithTracing())
if tracingConfig != nil && tracingConfig.IsEnabled() {
opts = append(opts, postgresql.WithTracing(tracingConfig.KeyValueAttributes))
}

s, err = postgresql.New(dbInfo, hostname, postgres.MaxIdleConns, postgres.MaxOpenConns, opts...)
Expand Down Expand Up @@ -307,8 +303,8 @@ func initPeerManager(logger *slog.Logger, s store.MetamorphStore, arcConfig *con
return pm, peerHandler, messageCh, nil
}

func initGrpcCallbackerConn(address, prometheusEndpoint string, grpcMsgSize int, tracingEnabled bool) (callbacker_api.CallbackerAPIClient, error) {
dialOpts, err := grpc_opts.GetGRPCClientOpts(prometheusEndpoint, grpcMsgSize, tracingEnabled)
func initGrpcCallbackerConn(address, prometheusEndpoint string, grpcMsgSize int, tracingConfig *config.TracingConfig) (callbacker_api.CallbackerAPIClient, error) {
dialOpts, err := grpc_opts.GetGRPCClientOpts(prometheusEndpoint, grpcMsgSize, tracingConfig)
if err != nil {
return nil, err
}
Expand Down
14 changes: 13 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/ordishs/go-bitcoin"
"go.opentelemetry.io/otel/attribute"
)

const (
Expand Down Expand Up @@ -42,7 +43,18 @@ type MessageQueueStreaming struct {
}

type TracingConfig struct {
DialAddr string `mapstructure:"dialAddr"`
Enabled bool `mapstructure:"enabled"`
DialAddr string `mapstructure:"dialAddr"`
Attributes map[string]string `mapstructure:"attributes"`
KeyValueAttributes []attribute.KeyValue
}

func (a *ArcConfig) IsTracingEnabled() bool {
return a.Tracing != nil && a.Tracing.IsEnabled()
}

func (t *TracingConfig) IsEnabled() bool {
return t.Enabled && t.DialAddr != ""
}

type PeerRPCConfig struct {
Expand Down
1 change: 1 addition & 0 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,5 +202,6 @@ func getCacheConfig() *CacheConfig {
func getDefaultTracingConfig() *TracingConfig {
return &TracingConfig{
DialAddr: "", // optional
Enabled: false,
}
}
4 changes: 4 additions & 0 deletions config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ messageQueue:
fileStorage: false # if enabled messages are persisted on disk instead of in memory
URL: nats://nats:4222 # URL for message queue
tracing: # (optional)
enabled: true # if true, then tracing is enabled
dialAddr: http://localhost:4317 # address where traces are exported to
attributes:
key1: value1
key2: 100

peerRpc: # rpc configuration for bitcoin node
password: bitcoin
Expand Down
21 changes: 18 additions & 3 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package config
import (
"errors"
"fmt"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
"os"
"strings"

"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
"go.opentelemetry.io/otel/attribute"
)

var (
Expand All @@ -33,7 +35,20 @@ func Load(configFileDirs ...string) (*ArcConfig, error) {

err = viper.Unmarshal(arcConfig)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to unmarshal config: %v", err)
}

if arcConfig.Tracing != nil {
tracingAttributes := make([]attribute.KeyValue, len(arcConfig.Tracing.Attributes))
index := 0
for key, value := range arcConfig.Tracing.Attributes {
tracingAttributes[index] = attribute.String(key, value)
index++
}

if len(tracingAttributes) > 0 {
arcConfig.Tracing.KeyValueAttributes = tracingAttributes
}
}

return arcConfig, nil
Expand Down
4 changes: 2 additions & 2 deletions examples/custom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ func main() {
}

// add a single metamorph, with the BlockTx client we want to use
conn, err := metamorph.DialGRPC("localhost:8011", "", arcConfig.GrpcMessageSize, false)
conn, err := metamorph.DialGRPC("localhost:8011", "", arcConfig.GrpcMessageSize, nil)
if err != nil {
panic(err)
}

metamorphClient := metamorph.NewClient(metamorph_api.NewMetaMorphAPIClient(conn))

// add blocktx as MerkleRootsVerifier
btcConn, err := blocktx.DialGRPC("localhost:8011", "", arcConfig.GrpcMessageSize, false)
btcConn, err := blocktx.DialGRPC("localhost:8011", "", arcConfig.GrpcMessageSize, nil)
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/blocktx/health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestCheck(t *testing.T) {
}}
}}

sut, err := blocktx.NewServer("", 0, logger, storeMock, pm, 0, false)
sut, err := blocktx.NewServer("", 0, logger, storeMock, pm, 0, nil)
require.NoError(t, err)
defer sut.GracefulStop()

Expand Down Expand Up @@ -151,7 +151,7 @@ func TestWatch(t *testing.T) {
},
}

sut, err := blocktx.NewServer("", 0, logger, storeMock, pm, 0, false)
sut, err := blocktx.NewServer("", 0, logger, storeMock, pm, 0, nil)
require.NoError(t, err)
defer sut.GracefulStop()

Expand Down
Loading

0 comments on commit dd704e1

Please sign in to comment.