Skip to content

Commit

Permalink
refactor: Re-register instead of re-request
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Jan 28, 2025
1 parent d1ce272 commit 5079e85
Show file tree
Hide file tree
Showing 10 changed files with 14 additions and 227 deletions.
4 changes: 1 addition & 3 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
}

registerTxsChan := make(chan []byte, chanBufferSize)
requestTxChannel := make(chan []byte, chanBufferSize)

natsConnection, err := nats_connection.New(arcConfig.MessageQueue.URL, logger)
if err != nil {
Expand All @@ -93,7 +92,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err

if arcConfig.MessageQueue.Streaming.Enabled {
opts := []nats_jetstream.Option{
nats_jetstream.WithSubscribedWorkQueuePolicy(blocktx.RegisterTxTopic, blocktx.RequestTxTopic),
nats_jetstream.WithSubscribedWorkQueuePolicy(blocktx.RegisterTxTopic),
nats_jetstream.WithWorkQueuePolicy(blocktx.MinedTxsTopic),
}
if arcConfig.MessageQueue.Streaming.FileStorage {
Expand Down Expand Up @@ -122,7 +121,6 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
processorOpts = append(processorOpts,
blocktx.WithRetentionDays(btxConfig.RecordRetentionDays),
blocktx.WithRegisterTxsChan(registerTxsChan),
blocktx.WithRequestTxChan(requestTxChannel),
blocktx.WithRegisterTxsInterval(btxConfig.RegisterTxsInterval),
blocktx.WithMessageQueueClient(mqClient),
blocktx.WithMaxBlockProcessingDuration(btxConfig.MaxBlockProcessingDuration),
Expand Down
2 changes: 1 addition & 1 deletion cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
if arcConfig.MessageQueue.Streaming.Enabled {
opts := []nats_jetstream.Option{
nats_jetstream.WithSubscribedWorkQueuePolicy(metamorph.MinedTxsTopic, metamorph.SubmitTxTopic),
nats_jetstream.WithWorkQueuePolicy(metamorph.RegisterTxTopic, metamorph.RequestTxTopic),
nats_jetstream.WithWorkQueuePolicy(metamorph.RegisterTxTopic),
nats_jetstream.WithInterestPolicy(metamorph.CallbackTopic),
}
if arcConfig.MessageQueue.Streaming.FileStorage {
Expand Down
8 changes: 4 additions & 4 deletions internal/blocktx/integration_test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func setupSut(t *testing.T, dbInfo string) (*blocktx.Processor, *blocktx.PeerHan

blockProcessCh := make(chan *p2p.BlockMessage, 10)

requestTxChannel := make(chan []byte, 10)
registerTxChannel := make(chan []byte, 10)
publishedTxsCh := make(chan *blocktx_api.TransactionBlock, 10)

store, err := postgresql.New(dbInfo, 10, 80)
Expand All @@ -51,12 +51,12 @@ func setupSut(t *testing.T, dbInfo string) (*blocktx.Processor, *blocktx.PeerHan
nil,
blockProcessCh,
blocktx.WithMessageQueueClient(mqClient),
blocktx.WithRequestTxChan(requestTxChannel),
blocktx.WithRegisterRequestTxsBatchSize(1), // process transaction immediately
blocktx.WithRegisterTxsChan(registerTxChannel),
blocktx.WithRegisterTxsBatchSize(1), // process transaction immediately
)
require.NoError(t, err)

return processor, p2pMsgHandler, store, requestTxChannel, publishedTxsCh
return processor, p2pMsgHandler, store, registerTxChannel, publishedTxsCh
}

func getPublishedTxs(publishedTxsCh chan *blocktx_api.TransactionBlock) []*blocktx_api.TransactionBlock {
Expand Down
9 changes: 5 additions & 4 deletions internal/blocktx/integration_test/merkle_paths_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"testing"
"time"

testutils "github.com/bitcoin-sv/arc/internal/test_utils"
"github.com/stretchr/testify/require"

testutils "github.com/bitcoin-sv/arc/internal/test_utils"
)

func TestMerklePaths(t *testing.T) {
Expand All @@ -18,14 +19,14 @@ func TestMerklePaths(t *testing.T) {
defer pruneTables(t, dbConn)
testutils.LoadFixtures(t, dbConn, "fixtures/merkle_paths")

processor, _, _, requestTxChannel, publishedTxsCh := setupSut(t, dbInfo)
processor, _, _, registerTxChannel, publishedTxsCh := setupSut(t, dbInfo)

txWithoutMerklePath := testutils.RevChainhash(t, "cd3d2f97dfc0cdb6a07ec4b72df5e1794c9553ff2f62d90ed4add047e8088853")
expectedMerklePath := "fefe8a0c0003020002cd3d2f97dfc0cdb6a07ec4b72df5e1794c9553ff2f62d90ed4add047e8088853010021132d32cb5411c058bb4391f24f6a36ed9b810df851d0e36cac514fd03d6b4e010100f883cc2d3bb5d4485accaa3502cf834934420616d8556b204da5658456b48b21010100e2277e52528e1a5e6117e45300e3f5f169b1712292399d065bc5167c54b8e0b5"

// when
requestTxChannel <- txWithoutMerklePath[:]
processor.StartProcessRequestTxs()
registerTxChannel <- txWithoutMerklePath[:]
processor.StartProcessRegisterTxs()

// give blocktx time to pull all transactions from block and calculate the merkle path
time.Sleep(200 * time.Millisecond)
Expand Down
1 change: 0 additions & 1 deletion internal/blocktx/mq_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
const (
MinedTxsTopic = "mined-txs"
RegisterTxTopic = "register-tx"
RequestTxTopic = "request-tx"
)

type MessageQueueClient interface {
Expand Down
68 changes: 0 additions & 68 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ const (
registerTxsIntervalDefault = time.Second * 10
registerRequestTxsIntervalDefault = time.Second * 5
registerTxsBatchSizeDefault = 100
registerRequestTxBatchSizeDefault = 100
waitForBlockProcessing = 5 * time.Minute
parallellism = 5
)
Expand All @@ -61,11 +60,9 @@ type Processor struct {
dataRetentionDays int
mqClient MessageQueueClient
registerTxsChan chan []byte
requestTxChannel chan []byte
registerTxsInterval time.Duration
registerRequestTxsInterval time.Duration
registerTxsBatchSize int
registerRequestTxsBatchSize int
tracingEnabled bool
tracingAttributes []attribute.KeyValue
stats *processorStats
Expand Down Expand Up @@ -101,7 +98,6 @@ func NewProcessor(
registerTxsInterval: registerTxsIntervalDefault,
registerRequestTxsInterval: registerRequestTxsIntervalDefault,
registerTxsBatchSize: registerTxsBatchSizeDefault,
registerRequestTxsBatchSize: registerRequestTxBatchSizeDefault,
maxBlockProcessingDuration: waitForBlockProcessing,
hostname: hostname,
stats: newProcessorStats(),
Expand Down Expand Up @@ -130,14 +126,6 @@ func (p *Processor) Start(statsEnabled bool) error {
return errors.Join(ErrFailedToSubscribeToTopic, fmt.Errorf("topic: %s", RegisterTxTopic), err)
}

err = p.mqClient.Subscribe(RequestTxTopic, func(msg []byte) error {
p.requestTxChannel <- msg
return nil
})
if err != nil {
return errors.Join(ErrFailedToSubscribeToTopic, fmt.Errorf("topic: %s", RequestTxTopic), err)
}

if statsEnabled {
err = p.StartCollectStats()
if err != nil {
Expand All @@ -147,7 +135,6 @@ func (p *Processor) Start(statsEnabled bool) error {
p.StartBlockRequesting()
p.StartBlockProcessing()
p.StartProcessRegisterTxs()
p.StartProcessRequestTxs()

return nil
}
Expand Down Expand Up @@ -271,61 +258,6 @@ func (p *Processor) StartProcessRegisterTxs() {
}()
}

func (p *Processor) StartProcessRequestTxs() {
p.waitGroup.Add(1)

txHashes := make([][]byte, 0, p.registerRequestTxsBatchSize)

ticker := time.NewTicker(p.registerRequestTxsInterval)

go func() {
defer p.waitGroup.Done()

for {
select {
case <-p.ctx.Done():
return
case txHash := <-p.requestTxChannel:
_, err := chainhash.NewHash(txHash)
if err != nil {
p.logger.Error("Failed to create hash from byte array", slog.String("err", err.Error()))
continue
}

txHashes = append(txHashes, txHash)

if len(txHashes) < p.registerRequestTxsBatchSize || len(txHashes) == 0 {
continue
}

err = p.publishMinedTxs(txHashes)
if err != nil {
p.logger.Error("failed to publish mined txs", slog.String("err", err.Error()))
continue // retry, don't clear the txHashes slice
}

txHashes = make([][]byte, 0, p.registerRequestTxsBatchSize)
ticker.Reset(p.registerRequestTxsInterval)

case <-ticker.C:
if len(txHashes) == 0 {
continue
}

err := p.publishMinedTxs(txHashes)
if err != nil {
p.logger.Error("failed to publish mined txs", slog.String("err", err.Error()))
ticker.Reset(p.registerRequestTxsInterval)
continue // retry, don't clear the txHashes slice
}

txHashes = make([][]byte, 0, p.registerRequestTxsBatchSize)
ticker.Reset(p.registerRequestTxsInterval)
}
}
}()
}

func (p *Processor) publishMinedTxs(txHashes [][]byte) error {
minedTxs, err := p.store.GetMinedTransactions(p.ctx, txHashes, false)
if err != nil {
Expand Down
18 changes: 0 additions & 18 deletions internal/blocktx/processor_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,36 +31,18 @@ func WithRegisterTxsInterval(d time.Duration) func(*Processor) {
}
}

func WithRegisterRequestTxsInterval(d time.Duration) func(*Processor) {
return func(p *Processor) {
p.registerRequestTxsInterval = d
}
}

func WithRegisterTxsChan(registerTxsChan chan []byte) func(*Processor) {
return func(processor *Processor) {
processor.registerTxsChan = registerTxsChan
}
}

func WithRequestTxChan(requestTxChannel chan []byte) func(*Processor) {
return func(processor *Processor) {
processor.requestTxChannel = requestTxChannel
}
}

func WithRegisterTxsBatchSize(size int) func(*Processor) {
return func(processor *Processor) {
processor.registerTxsBatchSize = size
}
}

func WithRegisterRequestTxsBatchSize(size int) func(*Processor) {
return func(processor *Processor) {
processor.registerRequestTxsBatchSize = size
}
}

func WithTracer(attr ...attribute.KeyValue) func(*Processor) {
return func(p *Processor) {
p.tracingEnabled = true
Expand Down
124 changes: 0 additions & 124 deletions internal/blocktx/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,124 +633,6 @@ func TestStartBlockRequesting(t *testing.T) {
}
}

func TestStartProcessRequestTxs(t *testing.T) {
tt := []struct {
name string
requests int
getMinedErr error
publishMinedErr error
requestedTx []byte

expectedGetMinedCalls int
expectedPublishMinedCalls int
}{
{
name: "success - 5 requests",
requests: 5,
requestedTx: testdata.TX1Hash[:],

expectedGetMinedCalls: 2,
expectedPublishMinedCalls: 2,
},
{
name: "5 requests, error - get mined",
requests: 5,
getMinedErr: errors.New("get mined error"),
requestedTx: testdata.TX1Hash[:],

expectedGetMinedCalls: 4, // 3 times on the channel message, 1 time on ticker
expectedPublishMinedCalls: 0,
},
{
name: "5 requests, error - publish mined",
requests: 5,
publishMinedErr: errors.New("publish mined error"),
requestedTx: testdata.TX1Hash[:],

expectedGetMinedCalls: 4, // 3 times on the channel message, 1 time on ticker
expectedPublishMinedCalls: 4,
},
{
name: "success - 2 requests",
requests: 2,
requestedTx: testdata.TX1Hash[:],

expectedGetMinedCalls: 1,
expectedPublishMinedCalls: 1,
},
{
name: "success - 0 requests",
requests: 0,
requestedTx: testdata.TX1Hash[:],

expectedGetMinedCalls: 0,
expectedPublishMinedCalls: 0,
},
{
name: "error - not a tx",
requests: 1,
requestedTx: []byte("not a tx"),

expectedGetMinedCalls: 0,
expectedPublishMinedCalls: 0,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
// given
storeMock := &storeMocks.BlocktxStoreMock{
GetMinedTransactionsFunc: func(_ context.Context, hashes [][]byte, _ bool) ([]store.BlockTransaction, error) {
for _, hash := range hashes {
require.Equal(t, testdata.TX1Hash[:], hash)
}

return []store.BlockTransaction{{
TxHash: testdata.TX1Hash[:],
BlockHash: testdata.Block1Hash[:],
BlockHeight: 1,
}}, tc.getMinedErr
},
GetBlockTransactionsHashesFunc: func(_ context.Context, _ []byte) ([]*chainhash.Hash, error) {
return []*chainhash.Hash{testdata.TX1Hash}, nil
},
}

mq := &mocks.MessageQueueClientMock{
PublishMarshalFunc: func(_ context.Context, _ string, _ protoreflect.ProtoMessage) error {
return tc.publishMinedErr
},
}

requestTxChannel := make(chan []byte, 5)

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))

// when
sut, err := blocktx.NewProcessor(logger, storeMock,
nil, nil,
blocktx.WithRegisterRequestTxsInterval(15*time.Millisecond),
blocktx.WithRegisterRequestTxsBatchSize(3),
blocktx.WithRequestTxChan(requestTxChannel),
blocktx.WithMessageQueueClient(mq))
require.NoError(t, err)

for i := 0; i < tc.requests; i++ {
requestTxChannel <- tc.requestedTx
}

// call tested function
sut.StartProcessRequestTxs()
time.Sleep(20 * time.Millisecond)
sut.Shutdown()

// then
require.Equal(t, tc.expectedGetMinedCalls, len(storeMock.GetMinedTransactionsCalls()))
require.Equal(t, tc.expectedPublishMinedCalls, len(mq.PublishMarshalCalls()))
})
}
}

func TestStart(t *testing.T) {
tt := []struct {
name string
Expand All @@ -765,12 +647,6 @@ func TestStart(t *testing.T) {
name: "error - subscribe mined txs",
topicErr: map[string]error{blocktx.RegisterTxTopic: errors.New("failed to subscribe")},

expectedError: blocktx.ErrFailedToSubscribeToTopic,
},
{
name: "error - subscribe submit txs",
topicErr: map[string]error{blocktx.RequestTxTopic: errors.New("failed to subscribe")},

expectedError: blocktx.ErrFailedToSubscribeToTopic,
},
}
Expand Down
1 change: 0 additions & 1 deletion internal/metamorph/mq_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const (
SubmitTxTopic = "submit-tx"
MinedTxsTopic = "mined-txs"
RegisterTxTopic = "register-tx"
RequestTxTopic = "request-tx"
CallbackTopic = "callback"
)

Expand Down
Loading

0 comments on commit 5079e85

Please sign in to comment.