Skip to content

Commit

Permalink
feat(ARCO-276): Add traces & attributes (#633)
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim authored Nov 7, 2024
1 parent d6b6d0f commit 753164f
Show file tree
Hide file tree
Showing 21 changed files with 279 additions and 145 deletions.
3 changes: 2 additions & 1 deletion cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
}

optsServer = append(optsServer, metamorph.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
callbackerOpts = append(callbackerOpts, metamorph.WithCallbackerTracer(arcConfig.Tracing.KeyValueAttributes...))
callbackerOpts = append(callbackerOpts, metamorph.WithTracerCallbacker(arcConfig.Tracing.KeyValueAttributes...))
processorOpts = append(processorOpts, metamorph.WithTracerProcessor(arcConfig.Tracing.KeyValueAttributes...))
}

stopFn := func() {
Expand Down
5 changes: 5 additions & 0 deletions internal/blocktx/processor_opts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blocktx

import (
"runtime"
"time"

"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -66,5 +67,9 @@ func WithTracer(attr ...attribute.KeyValue) func(s *Processor) {
if len(attr) > 0 {
p.tracingAttributes = append(p.tracingAttributes, attr...)
}
_, file, _, ok := runtime.Caller(1)
if ok {
p.tracingAttributes = append(p.tracingAttributes, attribute.String("file", file))
}
}
}
11 changes: 8 additions & 3 deletions internal/blocktx/store/postgresql/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"errors"
"runtime"
"time"

_ "github.com/lib/pq" // nolint: revive // required for postgres driver
Expand Down Expand Up @@ -32,10 +33,14 @@ func WithNow(nowFunc func() time.Time) func(*PostgreSQL) {
}

func WithTracer(attr ...attribute.KeyValue) func(s *PostgreSQL) {
return func(m *PostgreSQL) {
m.tracingEnabled = true
return func(p *PostgreSQL) {
p.tracingEnabled = true
if len(attr) > 0 {
m.tracingAttributes = append(m.tracingAttributes, attr...)
p.tracingAttributes = append(p.tracingAttributes, attr...)
}
_, file, _, ok := runtime.Caller(1)
if ok {
p.tracingAttributes = append(p.tracingAttributes, attribute.String("file", file))
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion internal/metamorph/grpc_callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metamorph
import (
"context"
"log/slog"
"runtime"

"go.opentelemetry.io/otel/attribute"

Expand All @@ -19,12 +20,16 @@ type GrpcCallbacker struct {
tracingAttributes []attribute.KeyValue
}

func WithCallbackerTracer(attr ...attribute.KeyValue) func(*GrpcCallbacker) {
func WithTracerCallbacker(attr ...attribute.KeyValue) func(*GrpcCallbacker) {
return func(p *GrpcCallbacker) {
p.tracingEnabled = true
if len(attr) > 0 {
p.tracingAttributes = append(p.tracingAttributes, attr...)
}
_, file, _, ok := runtime.Caller(1)
if ok {
p.tracingAttributes = append(p.tracingAttributes, attribute.String("file", file))
}
}
}

Expand Down
12 changes: 11 additions & 1 deletion internal/metamorph/processor_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package metamorph

import (
"log/slog"
"runtime"
"time"

"go.opentelemetry.io/otel/attribute"

"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api"
)
Expand Down Expand Up @@ -134,8 +137,15 @@ func WithMinimumHealthyConnections(minimumHealthyConnections int) func(*Processo
}
}

func WithProcessorTracer() func(*Processor) {
func WithTracerProcessor(attr ...attribute.KeyValue) func(*Processor) {
return func(p *Processor) {
p.tracingEnabled = true
if len(attr) > 0 {
p.tracingAttributes = append(p.tracingAttributes, attr...)
}
_, file, _, ok := runtime.Caller(1)
if ok {
p.tracingAttributes = append(p.tracingAttributes, attribute.String("file", file))
}
}
}
5 changes: 5 additions & 0 deletions internal/metamorph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/hex"
"errors"
"log/slog"
"runtime"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -81,6 +82,10 @@ func WithTracer(attr ...attribute.KeyValue) func(s *Server) {
if len(attr) > 0 {
s.tracingAttributes = append(s.tracingAttributes, attr...)
}
_, file, _, ok := runtime.Caller(1)
if ok {
s.tracingAttributes = append(s.tracingAttributes, attribute.String("file", file))
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions internal/metamorph/store/postgresql/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ func (p *PostgreSQL) GetRawTxs(ctx context.Context, hashes [][]byte) ([][]byte,
}

func (p *PostgreSQL) GetMany(ctx context.Context, keys [][]byte) ([]*store.Data, error) {
ctx, span := tracing.StartTracing(ctx, "GetMany", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)

const q = `
SELECT
stored_at
Expand Down
25 changes: 13 additions & 12 deletions internal/tracing/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,23 @@ import (
)

func StartTracing(ctx context.Context, spanName string, tracingEnabled bool, attributes ...attribute.KeyValue) (context.Context, trace.Span) {
if tracingEnabled {
var span trace.Span
tracer := otel.Tracer("")
if tracer == nil {
return ctx, nil
}
if !tracingEnabled {
return ctx, nil
}

if len(attributes) > 0 {
ctx, span = tracer.Start(ctx, spanName, trace.WithAttributes(attributes...))
return ctx, span
}
var span trace.Span
tracer := otel.Tracer("")
if tracer == nil {
return ctx, nil
}

ctx, span = tracer.Start(ctx, spanName)
if len(attributes) > 0 {
ctx, span = tracer.Start(ctx, spanName, trace.WithAttributes(attributes...))
return ctx, span
}
return ctx, nil

ctx, span = tracer.Start(ctx, spanName)
return ctx, span
}

func EndTracing(span trace.Span) {
Expand Down
22 changes: 15 additions & 7 deletions internal/validator/default/default_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"errors"
"fmt"

sdkTx "github.com/bitcoin-sv/go-sdk/transaction"
"github.com/ordishs/go-bitcoin"
"go.opentelemetry.io/otel/attribute"

"github.com/bitcoin-sv/arc/internal/fees"
"github.com/bitcoin-sv/arc/internal/tracing"
"github.com/bitcoin-sv/arc/internal/validator"
"github.com/bitcoin-sv/arc/pkg/api"
sdkTx "github.com/bitcoin-sv/go-sdk/transaction"
"github.com/ordishs/go-bitcoin"
)

var (
Expand All @@ -28,11 +31,14 @@ func New(policy *bitcoin.Settings, finder validator.TxFinderI) *DefaultValidator
}
}

func (v *DefaultValidator) ValidateTransaction(ctx context.Context, tx *sdkTx.Transaction, feeValidation validator.FeeValidation, scriptValidation validator.ScriptValidation) error { //nolint:funlen - mostly comments
func (v *DefaultValidator) ValidateTransaction(ctx context.Context, tx *sdkTx.Transaction, feeValidation validator.FeeValidation, scriptValidation validator.ScriptValidation, tracingEnabled bool, tracingAttributes ...attribute.KeyValue) error { //nolint:funlen - mostly comments
ctx, span := tracing.StartTracing(ctx, "ValidateTransaction", tracingEnabled, tracingAttributes...)
defer tracing.EndTracing(span)

// 0) Check whether we have a complete transaction in extended format, with all input information
// we cannot check the satoshi input, OP_RETURN is allowed 0 satoshis
if needsExtension(tx, feeValidation, scriptValidation) {
err := extendTx(ctx, v.txFinder, tx)
err := extendTx(ctx, v.txFinder, tx, tracingEnabled, tracingAttributes...)
if err != nil {
return validator.NewError(err, api.ErrStatusTxFormat)
}
Expand All @@ -52,7 +58,7 @@ func (v *DefaultValidator) ValidateTransaction(ctx context.Context, tx *sdkTx.Tr
return err
}
case validator.CumulativeFeeValidation:
if err := cumulativeCheckFees(ctx, v.txFinder, tx, api.FeesToFeeModel(v.policy.MinMiningTxFee)); err != nil {
if err := cumulativeCheckFees(ctx, v.txFinder, tx, api.FeesToFeeModel(v.policy.MinMiningTxFee), tracingEnabled, tracingAttributes...); err != nil {
return err
}
case validator.NoneFeeValidation:
Expand Down Expand Up @@ -108,8 +114,10 @@ func standardCheckFees(tx *sdkTx.Transaction, feeModel sdkTx.FeeModel) *validato
return nil
}

func cumulativeCheckFees(ctx context.Context, txFinder validator.TxFinderI, tx *sdkTx.Transaction, feeModel *fees.SatoshisPerKilobyte) *validator.Error {
txSet, err := getUnminedAncestors(ctx, txFinder, tx)
func cumulativeCheckFees(ctx context.Context, txFinder validator.TxFinderI, tx *sdkTx.Transaction, feeModel *fees.SatoshisPerKilobyte, tracingEnabled bool, tracingAttributes ...attribute.KeyValue) *validator.Error {
ctx, span := tracing.StartTracing(ctx, "cumulativeCheckFees", tracingEnabled, tracingAttributes...)
defer tracing.EndTracing(span)
txSet, err := getUnminedAncestors(ctx, txFinder, tx, tracingEnabled, tracingAttributes...)
if err != nil {
e := fmt.Errorf("getting all unmined ancestors for CFV failed. reason: %w. found: %d", err, len(txSet))
return validator.NewError(e, api.ErrStatusCumulativeFees)
Expand Down
31 changes: 16 additions & 15 deletions internal/validator/default/default_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ import (
"os"
"testing"

"github.com/bitcoin-sv/go-sdk/script"
sdkTx "github.com/bitcoin-sv/go-sdk/transaction"
"github.com/ordishs/go-bitcoin"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/bitcoin-sv/arc/internal/fees"
"github.com/bitcoin-sv/arc/internal/testdata"
validation "github.com/bitcoin-sv/arc/internal/validator"
fixture "github.com/bitcoin-sv/arc/internal/validator/default/testdata"
"github.com/bitcoin-sv/arc/internal/validator/mocks"
"github.com/bitcoin-sv/arc/pkg/api"
"github.com/bitcoin-sv/go-sdk/script"
sdkTx "github.com/bitcoin-sv/go-sdk/transaction"
"github.com/ordishs/go-bitcoin"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var validLockingScript = &script.Script{
Expand All @@ -40,7 +41,7 @@ func TestValidator(t *testing.T) {
sut := New(policy, nil)

// when
actualError := sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation)
actualError := sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation, false)

// then
require.NoError(t, actualError)
Expand All @@ -54,7 +55,7 @@ func TestValidator(t *testing.T) {
sut := New(policy, nil)

// when
actualError := sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation)
actualError := sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation, false)

// then
require.Error(t, actualError, "Validation should have returned an error")
Expand All @@ -71,7 +72,7 @@ func TestValidator(t *testing.T) {
sut := New(policy, nil)

// when
actualError := sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation)
actualError := sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation, false)

// then
require.Error(t, actualError)
Expand All @@ -85,7 +86,7 @@ func TestValidator(t *testing.T) {
sut := New(policy, nil)

// when
actualError := sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation)
actualError := sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation, false)

// then
require.NoError(t, actualError)
Expand All @@ -107,7 +108,7 @@ func TestValidator(t *testing.T) {
sut := New(policy, nil)

// when
actualError := sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation)
actualError := sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation, false)

// then
require.NoError(t, actualError, "Failed to validate tx %d", txIndex)
Expand Down Expand Up @@ -141,7 +142,7 @@ func TestValidator(t *testing.T) {
sut := New(policy, nil)

// when
actualError := sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation)
actualError := sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation, false)

// then
require.NoError(t, actualError, "Failed to validate tx")
Expand All @@ -161,7 +162,7 @@ func TestValidator(t *testing.T) {
sut := New(getPolicy(5), &txFinder)

// when
actualError := sut.ValidateTransaction(context.TODO(), rawTx, validation.StandardFeeValidation, validation.StandardScriptValidation)
actualError := sut.ValidateTransaction(context.TODO(), rawTx, validation.StandardFeeValidation, validation.StandardScriptValidation, false)

// then
require.NoError(t, actualError)
Expand Down Expand Up @@ -287,7 +288,7 @@ func BenchmarkValidator(b *testing.B) {
sut := New(policy, nil)

for i := 0; i < b.N; i++ {
_ = sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation)
_ = sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation, false)
}
}

Expand All @@ -299,7 +300,7 @@ func TestFeeCalculation(t *testing.T) {
sut := New(policy, nil)

// when
err = sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation)
err = sut.ValidateTransaction(context.TODO(), tx, validation.StandardFeeValidation, validation.StandardScriptValidation, false)

// then
t.Log(err)
Expand Down Expand Up @@ -483,7 +484,7 @@ func TestCumulativeCheckFees(t *testing.T) {
tx, _ := sdkTx.NewTransactionFromHex(tc.hex)

// when
actualError := cumulativeCheckFees(context.TODO(), &txFinder, tx, tc.feeModel)
actualError := cumulativeCheckFees(context.TODO(), &txFinder, tx, tc.feeModel, false)

// then
if tc.expectedErr == nil {
Expand Down
16 changes: 12 additions & 4 deletions internal/validator/default/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,22 @@ import (
"errors"
"fmt"

"github.com/bitcoin-sv/arc/internal/validator"
sdkTx "github.com/bitcoin-sv/go-sdk/transaction"
"go.opentelemetry.io/otel/attribute"

"github.com/bitcoin-sv/arc/internal/tracing"
"github.com/bitcoin-sv/arc/internal/validator"
)

var (
ErrParentNotFound = errors.New("parent transaction not found")
ErrFailedToGetRawTxs = errors.New("failed to get raw transactions for parent")
)

func extendTx(ctx context.Context, f validator.TxFinderI, rawTx *sdkTx.Transaction) error {
func extendTx(ctx context.Context, f validator.TxFinderI, rawTx *sdkTx.Transaction, tracingEnabled bool, tracingAttributes ...attribute.KeyValue) error {
ctx, span := tracing.StartTracing(ctx, "extendTx", tracingEnabled, tracingAttributes...)
defer tracing.EndTracing(span)

// potential improvement: implement version for the rawTx with only one input

// get distinct parents
Expand Down Expand Up @@ -69,7 +75,9 @@ func extendTx(ctx context.Context, f validator.TxFinderI, rawTx *sdkTx.Transacti
}

// getUnminedAncestors returns unmined ancestors with data necessary to perform Deep Fee validation
func getUnminedAncestors(ctx context.Context, w validator.TxFinderI, tx *sdkTx.Transaction) (map[string]*sdkTx.Transaction, error) {
func getUnminedAncestors(ctx context.Context, w validator.TxFinderI, tx *sdkTx.Transaction, tracingEnabled bool, tracingAttributes ...attribute.KeyValue) (map[string]*sdkTx.Transaction, error) {
ctx, span := tracing.StartTracing(ctx, "getUnminedAncestors", tracingEnabled, tracingAttributes...)
defer tracing.EndTracing(span)
unmindedAncestorsSet := make(map[string]*sdkTx.Transaction)

// get distinct parents
Expand Down Expand Up @@ -130,7 +138,7 @@ func getUnminedAncestors(ctx context.Context, w validator.TxFinderI, tx *sdkTx.T
unmindedAncestorsSet[p.TxID] = bTx

// get parent ancestors
parentAncestorsSet, err := getUnminedAncestors(ctx, w, bTx)
parentAncestorsSet, err := getUnminedAncestors(ctx, w, bTx, tracingEnabled, tracingAttributes...)
for aID, aTx := range parentAncestorsSet {
unmindedAncestorsSet[aID] = aTx
}
Expand Down
Loading

0 comments on commit 753164f

Please sign in to comment.