Skip to content

Commit

Permalink
feat: Error traces (#656)
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim authored Nov 22, 2024
1 parent 4b851ac commit 12748c0
Show file tree
Hide file tree
Showing 36 changed files with 506 additions and 243 deletions.
10 changes: 9 additions & 1 deletion cmd/arc/services/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,20 @@ func natsMqClient(logger *slog.Logger, arcConfig *config.ArcConfig) (metamorph.M
opts = append(opts, nats_jetstream.WithFileStorage())
}

if arcConfig.Tracing.Enabled {
opts = append(opts, nats_jetstream.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}

mqClient, err = nats_jetstream.New(conn, logger, []string{metamorph.SubmitTxTopic}, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create nats client: %v", err)
}
} else {
mqClient = nats_core.New(conn, nats_core.WithLogger(logger))
opts := []nats_core.Option{nats_core.WithLogger(logger)}
if arcConfig.Tracing.Enabled {
opts = append(opts, nats_core.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}
mqClient = nats_core.New(conn, opts...)
}

return mqClient, nil
Expand Down
10 changes: 9 additions & 1 deletion cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
opts = append(opts, nats_jetstream.WithFileStorage())
}

if arcConfig.Tracing.Enabled {
opts = append(opts, nats_jetstream.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}

mqClient, err = nats_jetstream.New(natsConnection, logger,
[]string{blocktx.MinedTxsTopic, blocktx.RegisterTxTopic, blocktx.RequestTxTopic},
opts...,
Expand All @@ -97,7 +101,11 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("failed to create nats client: %v", err)
}
} else {
mqClient = nats_core.New(natsConnection, nats_core.WithLogger(logger))
opts := []nats_core.Option{nats_core.WithLogger(logger)}
if arcConfig.Tracing.Enabled {
opts = append(opts, nats_core.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}
mqClient = nats_core.New(natsConnection, opts...)
}

processorOpts := []func(handler *blocktx.Processor){
Expand Down
10 changes: 9 additions & 1 deletion cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
opts = append(opts, nats_jetstream.WithFileStorage())
}

if arcConfig.Tracing.Enabled {
opts = append(opts, nats_jetstream.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}

mqClient, err = nats_jetstream.New(natsClient, logger,
[]string{metamorph.MinedTxsTopic, metamorph.SubmitTxTopic, metamorph.RegisterTxTopic, metamorph.RequestTxTopic},
opts...,
Expand All @@ -115,7 +119,11 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
return nil, fmt.Errorf("failed to create nats client: %v", err)
}
} else {
mqClient = nats_core.New(natsClient, nats_core.WithLogger(logger))
opts := []nats_core.Option{nats_core.WithLogger(logger)}
if arcConfig.Tracing.Enabled {
opts = append(opts, nats_core.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}
mqClient = nats_core.New(natsClient, opts...)
}

procLogger := logger.With(slog.String("module", "mtm-proc"))
Expand Down
15 changes: 11 additions & 4 deletions internal/blocktx/mocks/mq_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion internal/blocktx/mq_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package blocktx

import (
"context"

"google.golang.org/protobuf/proto"
)

Expand All @@ -11,7 +13,7 @@ const (
)

type MessageQueueClient interface {
PublishMarshal(topic string, m proto.Message) error
PublishMarshal(ctx context.Context, topic string, m proto.Message) error
Subscribe(topic string, msgFunc func([]byte) error) error
Shutdown()
}
24 changes: 14 additions & 10 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (p *Processor) publishMinedTxs(txHashes []*chainhash.Hash) error {
BlockHeight: minedTx.BlockHeight,
MerklePath: minedTx.MerklePath,
}
err = p.mqClient.PublishMarshal(MinedTxsTopic, txBlock)
err = p.mqClient.PublishMarshal(p.ctx, MinedTxsTopic, txBlock)
}

if err != nil {
Expand All @@ -413,16 +413,18 @@ func (p *Processor) registerTransactions(txHashes [][]byte) {

func (p *Processor) buildMerkleTreeStoreChainHash(ctx context.Context, txids []*chainhash.Hash) []*chainhash.Hash {
_, span := tracing.StartTracing(ctx, "buildMerkleTreeStoreChainHash", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer tracing.EndTracing(span, nil)

return bc.BuildMerkleTreeStoreChainHash(txids)
}

func (p *Processor) processBlock(msg *p2p.BlockMessage) error {
func (p *Processor) processBlock(msg *p2p.BlockMessage) (err error) {
ctx := p.ctx

ctx, span := tracing.StartTracing(ctx, "processBlock", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

timeStart := time.Now()

Expand Down Expand Up @@ -480,7 +482,7 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error {

incomingBlock.Status = blocktx_api.Status_LONGEST

err := p.performReorg(ctx, incomingBlock)
err = p.performReorg(ctx, incomingBlock)
if err != nil {
p.logger.Error("unable to perform reorg", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error()))
return err
Expand Down Expand Up @@ -615,9 +617,11 @@ func (p *Processor) performReorg(ctx context.Context, incomingBlock *blocktx_api
return err
}

func (p *Processor) markTransactionsAsMined(ctx context.Context, blockID uint64, merkleTree []*chainhash.Hash, blockHeight uint64, blockhash *chainhash.Hash) error {
func (p *Processor) markTransactionsAsMined(ctx context.Context, blockID uint64, merkleTree []*chainhash.Hash, blockHeight uint64, blockhash *chainhash.Hash) (err error) {
ctx, span := tracing.StartTracing(ctx, "markTransactionsAsMined", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

txs := make([]store.TxWithMerklePath, 0, p.transactionStorageBatchSize)
leaves := merkleTree[:(len(merkleTree)+1)/2]
Expand Down Expand Up @@ -672,7 +676,7 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockID uint64,
BlockHeight: blockHeight,
MerklePath: updResp.MerklePath,
}
err = p.mqClient.PublishMarshal(MinedTxsTopic, txBlock)
err = p.mqClient.PublishMarshal(ctx, MinedTxsTopic, txBlock)
if err != nil {
p.logger.Error("failed to publish mined txs", slog.String("hash", blockhash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error()))
}
Expand All @@ -686,7 +690,7 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockID uint64,
}
}

tracing.EndTracing(iterateMerkleTree)
tracing.EndTracing(iterateMerkleTree, nil)

// update all remaining transactions
updateResp, err := p.store.UpsertBlockTransactions(ctx, blockID, txs)
Expand All @@ -701,7 +705,7 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockID uint64,
BlockHeight: blockHeight,
MerklePath: updResp.MerklePath,
}
err = p.mqClient.PublishMarshal(MinedTxsTopic, txBlock)
err = p.mqClient.PublishMarshal(ctx, MinedTxsTopic, txBlock)
if err != nil {
p.logger.Error("failed to publish mined txs", slog.String("hash", blockhash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error()))
}
Expand Down
17 changes: 9 additions & 8 deletions internal/blocktx/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ import (
"testing"
"time"

"github.com/bitcoin-sv/arc/internal/blocktx"
"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/internal/blocktx/mocks"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks"
"github.com/bitcoin-sv/arc/internal/testdata"
"github.com/libsv/go-bc"
"github.com/libsv/go-p2p"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/reflect/protoreflect"

"github.com/bitcoin-sv/arc/internal/blocktx"
"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/internal/blocktx/mocks"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks"
"github.com/bitcoin-sv/arc/internal/testdata"
)

func TestHandleBlock(t *testing.T) {
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestHandleBlock(t *testing.T) {
}

mq := &mocks.MessageQueueClientMock{
PublishMarshalFunc: func(_ string, _ protoreflect.ProtoMessage) error {
PublishMarshalFunc: func(_ context.Context, _ string, _ protoreflect.ProtoMessage) error {
return nil
},
}
Expand Down Expand Up @@ -699,7 +700,7 @@ func TestStartProcessRequestTxs(t *testing.T) {
}

mq := &mocks.MessageQueueClientMock{
PublishMarshalFunc: func(_ string, _ protoreflect.ProtoMessage) error {
PublishMarshalFunc: func(_ context.Context, _ string, _ protoreflect.ProtoMessage) error {
return tc.publishMinedErr
},
}
Expand Down
8 changes: 5 additions & 3 deletions internal/blocktx/store/postgresql/get_mined_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ import (
"github.com/bitcoin-sv/arc/internal/tracing"
)

func (p *PostgreSQL) GetMinedTransactions(ctx context.Context, hashes []*chainhash.Hash) ([]store.GetMinedTransactionResult, error) {
func (p *PostgreSQL) GetMinedTransactions(ctx context.Context, hashes []*chainhash.Hash) (result []store.GetMinedTransactionResult, err error) {
ctx, span := tracing.StartTracing(ctx, "GetMinedTransactions", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

var hashSlice [][]byte
for _, hash := range hashes {
hashSlice = append(hashSlice, hash[:])
}

result := make([]store.GetMinedTransactionResult, 0, len(hashSlice))
result = make([]store.GetMinedTransactionResult, 0, len(hashSlice))

q := `
SELECT
Expand Down
10 changes: 5 additions & 5 deletions internal/blocktx/store/postgresql/insert_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"github.com/bitcoin-sv/arc/internal/tracing"
)

func (p *PostgreSQL) UpsertBlock(ctx context.Context, block *blocktx_api.Block) (uint64, error) {
func (p *PostgreSQL) UpsertBlock(ctx context.Context, block *blocktx_api.Block) (blockID uint64, err error) {
ctx, span := tracing.StartTracing(ctx, "UpsertBlock", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

qInsert := `
INSERT INTO blocktx.blocks (hash, prevhash, merkleroot, height, status, chainwork)
Expand All @@ -20,8 +22,6 @@ func (p *PostgreSQL) UpsertBlock(ctx context.Context, block *blocktx_api.Block)
RETURNING id
`

var blockID uint64

row := p.db.QueryRowContext(ctx, qInsert,
block.GetHash(),
block.GetPreviousHash(),
Expand All @@ -31,7 +31,7 @@ func (p *PostgreSQL) UpsertBlock(ctx context.Context, block *blocktx_api.Block)
block.GetChainwork(),
)

err := row.Scan(&blockID)
err = row.Scan(&blockID)
if err != nil {
return 0, errors.Join(store.ErrFailedToInsertBlock, err)
}
Expand Down
8 changes: 5 additions & 3 deletions internal/blocktx/store/postgresql/mark_block_as_done.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"github.com/bitcoin-sv/arc/internal/tracing"
)

func (p *PostgreSQL) MarkBlockAsDone(ctx context.Context, hash *chainhash.Hash, size uint64, txCount uint64) error {
func (p *PostgreSQL) MarkBlockAsDone(ctx context.Context, hash *chainhash.Hash, size uint64, txCount uint64) (err error) {
ctx, span := tracing.StartTracing(ctx, "MarkBlockAsDone", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

q := `
UPDATE blocktx.blocks
Expand All @@ -20,7 +22,7 @@ func (p *PostgreSQL) MarkBlockAsDone(ctx context.Context, hash *chainhash.Hash,
WHERE hash = $3
`

if _, err := p.db.ExecContext(ctx, q, size, txCount, hash[:], p.now()); err != nil {
if _, err = p.db.ExecContext(ctx, q, size, txCount, hash[:], p.now()); err != nil {
return err
}

Expand Down
8 changes: 5 additions & 3 deletions internal/blocktx/store/postgresql/set_block_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ func (p *PostgreSQL) SetBlockProcessing(ctx context.Context, hash *chainhash.Has
return processedBy, nil
}

func (p *PostgreSQL) DelBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (int64, error) {
func (p *PostgreSQL) DelBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (rowsAffected int64, err error) {
ctx, span := tracing.StartTracing(ctx, "DelBlockProcessing", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

q := `
DELETE FROM blocktx.block_processing WHERE block_hash = $1 AND processed_by = $2;
Expand All @@ -50,7 +52,7 @@ func (p *PostgreSQL) DelBlockProcessing(ctx context.Context, hash *chainhash.Has
if err != nil {
return 0, err
}
rowsAffected, _ := res.RowsAffected()
rowsAffected, _ = res.RowsAffected()
if rowsAffected != 1 {
return 0, store.ErrBlockNotFound
}
Expand Down
Loading

0 comments on commit 12748c0

Please sign in to comment.