Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/metamorph client #655

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion cmd/arc/services/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,20 @@ func natsMqClient(logger *slog.Logger, arcConfig *config.ArcConfig) (metamorph.M
opts = append(opts, nats_jetstream.WithFileStorage())
}

if arcConfig.Tracing.Enabled {
opts = append(opts, nats_jetstream.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}

mqClient, err = nats_jetstream.New(conn, logger, []string{metamorph.SubmitTxTopic}, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create nats client: %v", err)
}
} else {
mqClient = nats_core.New(conn, nats_core.WithLogger(logger))
opts := []nats_core.Option{nats_core.WithLogger(logger)}
if arcConfig.Tracing.Enabled {
opts = append(opts, nats_core.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}
mqClient = nats_core.New(conn, opts...)
}

return mqClient, nil
Expand Down
10 changes: 9 additions & 1 deletion cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
opts = append(opts, nats_jetstream.WithFileStorage())
}

if arcConfig.Tracing.Enabled {
opts = append(opts, nats_jetstream.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}

mqClient, err = nats_jetstream.New(natsConnection, logger,
[]string{blocktx.MinedTxsTopic, blocktx.RegisterTxTopic, blocktx.RequestTxTopic},
opts...,
Expand All @@ -97,7 +101,11 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("failed to create nats client: %v", err)
}
} else {
mqClient = nats_core.New(natsConnection, nats_core.WithLogger(logger))
opts := []nats_core.Option{nats_core.WithLogger(logger)}
if arcConfig.Tracing.Enabled {
opts = append(opts, nats_core.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}
mqClient = nats_core.New(natsConnection, opts...)
}

processorOpts := []func(handler *blocktx.Processor){
Expand Down
10 changes: 9 additions & 1 deletion cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
opts = append(opts, nats_jetstream.WithFileStorage())
}

if arcConfig.Tracing.Enabled {
opts = append(opts, nats_jetstream.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}

mqClient, err = nats_jetstream.New(natsClient, logger,
[]string{metamorph.MinedTxsTopic, metamorph.SubmitTxTopic, metamorph.RegisterTxTopic, metamorph.RequestTxTopic},
opts...,
Expand All @@ -115,7 +119,11 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
return nil, fmt.Errorf("failed to create nats client: %v", err)
}
} else {
mqClient = nats_core.New(natsClient, nats_core.WithLogger(logger))
opts := []nats_core.Option{nats_core.WithLogger(logger)}
if arcConfig.Tracing.Enabled {
opts = append(opts, nats_core.WithTracer(arcConfig.Tracing.KeyValueAttributes...))
}
mqClient = nats_core.New(natsClient, opts...)
}

procLogger := logger.With(slog.String("module", "mtm-proc"))
Expand Down
15 changes: 11 additions & 4 deletions internal/blocktx/mocks/mq_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion internal/blocktx/mq_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package blocktx

import (
"context"

"google.golang.org/protobuf/proto"
)

Expand All @@ -11,7 +13,7 @@ const (
)

type MessageQueueClient interface {
PublishMarshal(topic string, m proto.Message) error
PublishMarshal(ctx context.Context, topic string, m proto.Message) error
Subscribe(topic string, msgFunc func([]byte) error) error
Shutdown()
}
16 changes: 9 additions & 7 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (p *Processor) publishMinedTxs(txHashes []*chainhash.Hash) error {
BlockHeight: minedTx.BlockHeight,
MerklePath: minedTx.MerklePath,
}
err = p.mqClient.PublishMarshal(MinedTxsTopic, txBlock)
err = p.mqClient.PublishMarshal(p.ctx, MinedTxsTopic, txBlock)
}

if err != nil {
Expand All @@ -413,16 +413,17 @@ func (p *Processor) registerTransactions(txHashes [][]byte) {

func (p *Processor) buildMerkleTreeStoreChainHash(ctx context.Context, txids []*chainhash.Hash) []*chainhash.Hash {
_, span := tracing.StartTracing(ctx, "buildMerkleTreeStoreChainHash", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer tracing.EndTracing(span, nil)

return bc.BuildMerkleTreeStoreChainHash(txids)
}

func (p *Processor) processBlock(msg *p2p.BlockMessage) error {
ctx := p.ctx
var err error

ctx, span := tracing.StartTracing(ctx, "processBlock", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer tracing.EndTracing(span, err)

timeStart := time.Now()

Expand Down Expand Up @@ -616,8 +617,9 @@ func (p *Processor) performReorg(ctx context.Context, incomingBlock *blocktx_api
}

func (p *Processor) markTransactionsAsMined(ctx context.Context, blockID uint64, merkleTree []*chainhash.Hash, blockHeight uint64, blockhash *chainhash.Hash) error {
var err error
ctx, span := tracing.StartTracing(ctx, "markTransactionsAsMined", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer tracing.EndTracing(span, err)

txs := make([]store.TxWithMerklePath, 0, p.transactionStorageBatchSize)
leaves := merkleTree[:(len(merkleTree)+1)/2]
Expand Down Expand Up @@ -672,7 +674,7 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockID uint64,
BlockHeight: blockHeight,
MerklePath: updResp.MerklePath,
}
err = p.mqClient.PublishMarshal(MinedTxsTopic, txBlock)
err = p.mqClient.PublishMarshal(ctx, MinedTxsTopic, txBlock)
if err != nil {
p.logger.Error("failed to publish mined txs", slog.String("hash", blockhash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error()))
}
Expand All @@ -686,7 +688,7 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockID uint64,
}
}

tracing.EndTracing(iterateMerkleTree)
tracing.EndTracing(iterateMerkleTree, nil)

// update all remaining transactions
updateResp, err := p.store.UpsertBlockTransactions(ctx, blockID, txs)
Expand All @@ -701,7 +703,7 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockID uint64,
BlockHeight: blockHeight,
MerklePath: updResp.MerklePath,
}
err = p.mqClient.PublishMarshal(MinedTxsTopic, txBlock)
err = p.mqClient.PublishMarshal(ctx, MinedTxsTopic, txBlock)
if err != nil {
p.logger.Error("failed to publish mined txs", slog.String("hash", blockhash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error()))
}
Expand Down
17 changes: 9 additions & 8 deletions internal/blocktx/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ import (
"testing"
"time"

"github.com/bitcoin-sv/arc/internal/blocktx"
"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/internal/blocktx/mocks"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks"
"github.com/bitcoin-sv/arc/internal/testdata"
"github.com/libsv/go-bc"
"github.com/libsv/go-p2p"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/reflect/protoreflect"

"github.com/bitcoin-sv/arc/internal/blocktx"
"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/internal/blocktx/mocks"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks"
"github.com/bitcoin-sv/arc/internal/testdata"
)

func TestHandleBlock(t *testing.T) {
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestHandleBlock(t *testing.T) {
}

mq := &mocks.MessageQueueClientMock{
PublishMarshalFunc: func(_ string, _ protoreflect.ProtoMessage) error {
PublishMarshalFunc: func(_ context.Context, _ string, _ protoreflect.ProtoMessage) error {
return nil
},
}
Expand Down Expand Up @@ -699,7 +700,7 @@ func TestStartProcessRequestTxs(t *testing.T) {
}

mq := &mocks.MessageQueueClientMock{
PublishMarshalFunc: func(_ string, _ protoreflect.ProtoMessage) error {
PublishMarshalFunc: func(_ context.Context, _ string, _ protoreflect.ProtoMessage) error {
return tc.publishMinedErr
},
}
Expand Down
3 changes: 2 additions & 1 deletion internal/blocktx/store/postgresql/get_mined_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
)

func (p *PostgreSQL) GetMinedTransactions(ctx context.Context, hashes []*chainhash.Hash) ([]store.GetMinedTransactionResult, error) {
var err error
ctx, span := tracing.StartTracing(ctx, "GetMinedTransactions", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer tracing.EndTracing(span, err)

var hashSlice [][]byte
for _, hash := range hashes {
Expand Down
5 changes: 3 additions & 2 deletions internal/blocktx/store/postgresql/insert_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
)

func (p *PostgreSQL) UpsertBlock(ctx context.Context, block *blocktx_api.Block) (uint64, error) {
var err error
ctx, span := tracing.StartTracing(ctx, "UpsertBlock", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer tracing.EndTracing(span, err)

qInsert := `
INSERT INTO blocktx.blocks (hash, prevhash, merkleroot, height, status, chainwork)
Expand All @@ -31,7 +32,7 @@ func (p *PostgreSQL) UpsertBlock(ctx context.Context, block *blocktx_api.Block)
block.GetChainwork(),
)

err := row.Scan(&blockID)
err = row.Scan(&blockID)
if err != nil {
return 0, errors.Join(store.ErrFailedToInsertBlock, err)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/blocktx/store/postgresql/mark_block_as_done.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
)

func (p *PostgreSQL) MarkBlockAsDone(ctx context.Context, hash *chainhash.Hash, size uint64, txCount uint64) error {
var err error
ctx, span := tracing.StartTracing(ctx, "MarkBlockAsDone", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer tracing.EndTracing(span, err)

q := `
UPDATE blocktx.blocks
Expand All @@ -20,7 +21,7 @@ func (p *PostgreSQL) MarkBlockAsDone(ctx context.Context, hash *chainhash.Hash,
WHERE hash = $3
`

if _, err := p.db.ExecContext(ctx, q, size, txCount, hash[:], p.now()); err != nil {
if _, err = p.db.ExecContext(ctx, q, size, txCount, hash[:], p.now()); err != nil {
return err
}

Expand Down
3 changes: 2 additions & 1 deletion internal/blocktx/store/postgresql/set_block_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func (p *PostgreSQL) SetBlockProcessing(ctx context.Context, hash *chainhash.Has
}

func (p *PostgreSQL) DelBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (int64, error) {
var err error
ctx, span := tracing.StartTracing(ctx, "DelBlockProcessing", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer tracing.EndTracing(span, err)

q := `
DELETE FROM blocktx.block_processing WHERE block_hash = $1 AND processed_by = $2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (

// UpsertBlockTransactions upserts the transaction hashes for a given block hash and returns updated registered transactions hashes.
func (p *PostgreSQL) UpsertBlockTransactions(ctx context.Context, blockID uint64, txsWithMerklePaths []store.TxWithMerklePath) ([]store.TxWithMerklePath, error) {
var err error
ctx, span := tracing.StartTracing(ctx, "UpdateBlockTransactions", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer tracing.EndTracing(span, err)

txHashesBytes := make([][]byte, len(txsWithMerklePaths))
merklePaths := make([]string, len(txsWithMerklePaths))
Expand Down Expand Up @@ -50,7 +51,7 @@ func (p *PostgreSQL) UpsertBlockTransactions(ctx context.Context, blockID uint64
WHERE m.blockid = $1 AND t.is_registered = TRUE AND t.hash = ANY($2)
`

_, err := p.db.ExecContext(ctx, qUpsertTransactions, blockID, pq.Array(txHashesBytes), pq.Array(merklePaths))
_, err = p.db.ExecContext(ctx, qUpsertTransactions, blockID, pq.Array(txHashesBytes), pq.Array(merklePaths))
if err != nil {
return nil, errors.Join(store.ErrFailedToExecuteTxUpdateQuery, err)
}
Expand Down
Loading
Loading