From c01af3e14b7dd89eef63610bcdebd95239836745 Mon Sep 17 00:00:00 2001 From: kuba-4chain Date: Mon, 2 Dec 2024 14:50:32 +0100 Subject: [PATCH] feat(ARCO-299): merge register and request transactions in blocktx --- cmd/arc/services/blocktx.go | 8 +- cmd/arc/services/metamorph.go | 3 +- config/config.go | 2 +- config/defaults.go | 2 +- config/example_config.yaml | 2 +- internal/blocktx/mq_client.go | 5 +- internal/blocktx/processor.go | 117 ++++---------- internal/blocktx/processor_opts.go | 26 +-- internal/blocktx/processor_test.go | 83 +--------- .../blocktx/store/mocks/blocktx_store_mock.go | 150 ++++++------------ .../register_transactions/blocktx.blocks.yaml | 11 -- .../blocktx.transactions.yaml | 32 ---- .../blocktx/store/postgresql/postgres_test.go | 90 +---------- .../store/postgresql/register_transactions.go | 47 ------ ...o => upsert_and_get_mined_transactions.go} | 33 ++-- internal/blocktx/store/store.go | 3 +- internal/metamorph/mq_client.go | 7 +- internal/metamorph/processor.go | 4 +- test/config/config.yaml | 2 +- 19 files changed, 126 insertions(+), 501 deletions(-) delete mode 100644 internal/blocktx/store/postgresql/fixtures/register_transactions/blocktx.blocks.yaml delete mode 100644 internal/blocktx/store/postgresql/fixtures/register_transactions/blocktx.transactions.yaml delete mode 100644 internal/blocktx/store/postgresql/register_transactions.go rename internal/blocktx/store/postgresql/{get_mined_transactions.go => upsert_and_get_mined_transactions.go} (53%) diff --git a/cmd/arc/services/blocktx.go b/cmd/arc/services/blocktx.go index 1c9f6a9da..aded0c9f7 100644 --- a/cmd/arc/services/blocktx.go +++ b/cmd/arc/services/blocktx.go @@ -84,7 +84,6 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err return nil, fmt.Errorf("failed to create blocktx store: %v", err) } - registerTxsChan := make(chan []byte, chanBufferSize) requestTxChannel := make(chan []byte, chanBufferSize) natsConnection, err := nats_connection.New(arcConfig.MessageQueue.URL, logger) @@ -94,7 +93,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err } if arcConfig.MessageQueue.Streaming.Enabled { - opts := []nats_jetstream.Option{nats_jetstream.WithSubscribedTopics(blocktx.RegisterTxTopic, blocktx.RequestTxTopic)} + opts := []nats_jetstream.Option{nats_jetstream.WithSubscribedTopics(blocktx.RequestTxTopic)} if arcConfig.MessageQueue.Streaming.FileStorage { opts = append(opts, nats_jetstream.WithFileStorage()) } @@ -104,7 +103,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err } mqClient, err = nats_jetstream.New(natsConnection, logger, - []string{blocktx.MinedTxsTopic, blocktx.RegisterTxTopic, blocktx.RequestTxTopic}, + []string{blocktx.MinedTxsTopic, blocktx.RequestTxTopic}, opts..., ) if err != nil { @@ -121,9 +120,8 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err processorOpts = append(processorOpts, blocktx.WithRetentionDays(btxConfig.RecordRetentionDays), - blocktx.WithRegisterTxsChan(registerTxsChan), blocktx.WithRequestTxChan(requestTxChannel), - blocktx.WithRegisterTxsInterval(btxConfig.RegisterTxsInterval), + blocktx.WithRequestTxsInterval(btxConfig.RequestTxsInterval), blocktx.WithMessageQueueClient(mqClient), ) diff --git a/cmd/arc/services/metamorph.go b/cmd/arc/services/metamorph.go index 1e7831e23..f82aa6f6a 100644 --- a/cmd/arc/services/metamorph.go +++ b/cmd/arc/services/metamorph.go @@ -120,7 +120,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore } mqClient, err = nats_jetstream.New(natsClient, logger, - []string{metamorph.MinedTxsTopic, metamorph.SubmitTxTopic, metamorph.RegisterTxTopic, metamorph.RequestTxTopic}, + []string{metamorph.MinedTxsTopic, metamorph.SubmitTxTopic, metamorph.RequestTxTopic}, opts..., ) if err != nil { @@ -196,7 +196,6 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore server, err = metamorph.NewServer(arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, logger, metamorphStore, processor, arcConfig.Tracing, optsServer...) - if err != nil { stopFn() return nil, fmt.Errorf("create GRPCServer failed: %v", err) diff --git a/config/config.go b/config/config.go index 171ddb8e5..13eb5cf47 100644 --- a/config/config.go +++ b/config/config.go @@ -113,7 +113,7 @@ type BlocktxConfig struct { HealthServerDialAddr string `mapstructure:"healthServerDialAddr"` Db *DbConfig `mapstructure:"db"` RecordRetentionDays int `mapstructure:"recordRetentionDays"` - RegisterTxsInterval time.Duration `mapstructure:"registerTxsInterval"` + RequestTxsInterval time.Duration `mapstructure:"requestTxsInterval"` MonitorPeers bool `mapstructure:"monitorPeers"` FillGapsInterval time.Duration `mapstructure:"fillGapsInterval"` MaxAllowedBlockHeightMismatch int `mapstructure:"maxAllowedBlockHeightMismatch"` diff --git a/config/defaults.go b/config/defaults.go index ca902819d..048c40dcf 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -113,7 +113,7 @@ func getBlocktxConfig() *BlocktxConfig { HealthServerDialAddr: "localhost:8006", Db: getDbConfig("blocktx"), RecordRetentionDays: 28, - RegisterTxsInterval: 10 * time.Second, + RequestTxsInterval: 10 * time.Second, MonitorPeers: false, FillGapsInterval: 15 * time.Minute, MaxAllowedBlockHeightMismatch: 3, diff --git a/config/example_config.yaml b/config/example_config.yaml index 281a953f9..9f1b82de1 100644 --- a/config/example_config.yaml +++ b/config/example_config.yaml @@ -102,7 +102,7 @@ blocktx: maxOpenConns: 80 # maximum open connections sslMode: disable recordRetentionDays: 28 # number of days for which data integrity is ensured - registerTxsInterval: 10s # time interval to read from the channel registered transactions + requestTxsInterval: 10s # time interval to register and request mined transactions monitorPeers: false # if enabled, peers which do not receive alive signal from nodes will be restarted fillGapsInterval: 15m # time interval to check and fill gaps in processed blocks maxAllowedBlockHeightMismatch: 3 # maximum number of blocks that can be ahead of current highest block in blocktx, used for merkle roots verification diff --git a/internal/blocktx/mq_client.go b/internal/blocktx/mq_client.go index 865befd4b..73005f8bf 100644 --- a/internal/blocktx/mq_client.go +++ b/internal/blocktx/mq_client.go @@ -7,9 +7,8 @@ import ( ) const ( - MinedTxsTopic = "mined-txs" - RegisterTxTopic = "register-tx" - RequestTxTopic = "request-tx" + MinedTxsTopic = "mined-txs" + RequestTxTopic = "request-tx" ) type MessageQueueClient interface { diff --git a/internal/blocktx/processor.go b/internal/blocktx/processor.go index 217a825dd..2fb9a0676 100644 --- a/internal/blocktx/processor.go +++ b/internal/blocktx/processor.go @@ -26,20 +26,19 @@ import ( ) var ( - ErrFailedToSubscribeToTopic = errors.New("failed to subscribe to register topic") + ErrFailedToSubscribeToTopic = errors.New("failed to subscribe to request-tx topic") ErrFailedToCreateBUMP = errors.New("failed to create new bump for tx hash from merkle tree and index") ErrFailedToGetStringFromBUMPHex = errors.New("failed to get string from bump for tx hash") ErrFailedToInsertBlockTransactions = errors.New("failed to insert block transactions") + ErrFailedToPublishMinedTransaction = errors.New("failed to publish mined transactions") ) const ( transactionStoringBatchsizeDefault = 8192 // power of 2 for easier memory allocation maxRequestBlocks = 10 maxBlocksInProgress = 1 - registerTxsIntervalDefault = time.Second * 10 - registerRequestTxsIntervalDefault = time.Second * 5 - registerTxsBatchSizeDefault = 100 - registerRequestTxBatchSizeDefault = 100 + requestTxsIntervalDefault = time.Second * 5 + requestTxBatchSizeDefault = 100 waitForBlockProcessing = 5 * time.Minute ) @@ -52,12 +51,9 @@ type Processor struct { transactionStorageBatchSize int dataRetentionDays int mqClient MessageQueueClient - registerTxsChan chan []byte requestTxChannel chan []byte - registerTxsInterval time.Duration - registerRequestTxsInterval time.Duration - registerTxsBatchSize int - registerRequestTxsBatchSize int + requestTxsInterval time.Duration + requestTxsBatchSize int tracingEnabled bool tracingAttributes []attribute.KeyValue processGuardsMap sync.Map @@ -89,10 +85,8 @@ func NewProcessor( blockRequestCh: blockRequestCh, blockProcessCh: blockProcessCh, transactionStorageBatchSize: transactionStoringBatchsizeDefault, - registerTxsInterval: registerTxsIntervalDefault, - registerRequestTxsInterval: registerRequestTxsIntervalDefault, - registerTxsBatchSize: registerTxsBatchSizeDefault, - registerRequestTxsBatchSize: registerRequestTxBatchSizeDefault, + requestTxsInterval: requestTxsIntervalDefault, + requestTxsBatchSize: requestTxBatchSizeDefault, hostname: hostname, stats: newProcessorStats(), statCollectionInterval: statCollectionIntervalDefault, @@ -112,15 +106,7 @@ func NewProcessor( } func (p *Processor) Start() error { - err := p.mqClient.Subscribe(RegisterTxTopic, func(msg []byte) error { - p.registerTxsChan <- msg - return nil - }) - if err != nil { - return errors.Join(ErrFailedToSubscribeToTopic, fmt.Errorf("topic: %s", RegisterTxTopic), err) - } - - err = p.mqClient.Subscribe(RequestTxTopic, func(msg []byte) error { + err := p.mqClient.Subscribe(RequestTxTopic, func(msg []byte) error { p.requestTxChannel <- msg return nil }) @@ -130,7 +116,6 @@ func (p *Processor) Start() error { p.StartBlockRequesting() p.StartBlockProcessing() - p.StartProcessRegisterTxs() p.StartProcessRequestTxs() return nil @@ -287,47 +272,12 @@ func (p *Processor) unlockBlock(ctx context.Context, hash *chainhash.Hash) { } } -func (p *Processor) StartProcessRegisterTxs() { - p.waitGroup.Add(1) - txHashes := make([][]byte, 0, p.registerTxsBatchSize) - - ticker := time.NewTicker(p.registerTxsInterval) - go func() { - defer p.waitGroup.Done() - for { - select { - case <-p.ctx.Done(): - return - case txHash := <-p.registerTxsChan: - txHashes = append(txHashes, txHash) - - if len(txHashes) < p.registerTxsBatchSize { - continue - } - - p.registerTransactions(txHashes[:]) - txHashes = txHashes[:0] - ticker.Reset(p.registerTxsInterval) - - case <-ticker.C: - if len(txHashes) == 0 { - continue - } - - p.registerTransactions(txHashes[:]) - txHashes = txHashes[:0] - ticker.Reset(p.registerTxsInterval) - } - } - }() -} - func (p *Processor) StartProcessRequestTxs() { p.waitGroup.Add(1) - txHashes := make([]*chainhash.Hash, 0, p.registerRequestTxsBatchSize) + txHashes := make([][]byte, 0, p.requestTxsBatchSize) - ticker := time.NewTicker(p.registerRequestTxsInterval) + ticker := time.NewTicker(p.requestTxsInterval) go func() { defer p.waitGroup.Done() @@ -337,15 +287,15 @@ func (p *Processor) StartProcessRequestTxs() { case <-p.ctx.Done(): return case txHash := <-p.requestTxChannel: - tx, err := chainhash.NewHash(txHash) + _, err := chainhash.NewHash(txHash) if err != nil { p.logger.Error("Failed to create hash from byte array", slog.String("err", err.Error())) continue } - txHashes = append(txHashes, tx) + txHashes = append(txHashes, txHash) - if len(txHashes) < p.registerRequestTxsBatchSize || len(txHashes) == 0 { + if len(txHashes) < p.requestTxsBatchSize { continue } @@ -355,8 +305,8 @@ func (p *Processor) StartProcessRequestTxs() { continue // retry, don't clear the txHashes slice } - txHashes = make([]*chainhash.Hash, 0, p.registerRequestTxsBatchSize) - ticker.Reset(p.registerRequestTxsInterval) + txHashes = make([][]byte, 0, p.requestTxsBatchSize) + ticker.Reset(p.requestTxsInterval) case <-ticker.C: if len(txHashes) == 0 { @@ -366,23 +316,25 @@ func (p *Processor) StartProcessRequestTxs() { err := p.publishMinedTxs(txHashes) if err != nil { p.logger.Error("failed to publish mined txs", slog.String("err", err.Error())) - ticker.Reset(p.registerRequestTxsInterval) + ticker.Reset(p.requestTxsInterval) continue // retry, don't clear the txHashes slice } - txHashes = make([]*chainhash.Hash, 0, p.registerRequestTxsBatchSize) - ticker.Reset(p.registerRequestTxsInterval) + txHashes = make([][]byte, 0, p.requestTxsBatchSize) + ticker.Reset(p.requestTxsInterval) } } }() } -func (p *Processor) publishMinedTxs(txHashes []*chainhash.Hash) error { - minedTxs, err := p.store.GetMinedTransactions(p.ctx, txHashes) +func (p *Processor) publishMinedTxs(txHashes [][]byte) error { + minedTxs, err := p.store.UpsertAndGetMinedTransactions(p.ctx, txHashes) if err != nil { return fmt.Errorf("failed to get mined transactions: %v", err) } + var publishErr error + for _, minedTx := range minedTxs { txBlock := &blocktx_api.TransactionBlock{ TransactionHash: minedTx.TxHash, @@ -390,30 +342,21 @@ func (p *Processor) publishMinedTxs(txHashes []*chainhash.Hash) error { BlockHeight: minedTx.BlockHeight, MerklePath: minedTx.MerklePath, } + err = p.mqClient.PublishMarshal(p.ctx, MinedTxsTopic, txBlock) + if err != nil { + p.logger.Error("failed to publish mined transaction", slog.String("err", err.Error())) + publishErr = err + } } - if err != nil { - return fmt.Errorf("failed to publish mined transactions: %v", err) + if publishErr != nil { + return ErrFailedToPublishMinedTransaction } return nil } -func (p *Processor) registerTransactions(txHashes [][]byte) { - updatedTxs, err := p.store.RegisterTransactions(p.ctx, txHashes) - if err != nil { - p.logger.Error("failed to register transactions", slog.String("err", err.Error())) - } - - if len(updatedTxs) > 0 { - err = p.publishMinedTxs(updatedTxs) - if err != nil { - p.logger.Error("failed to publish mined txs", slog.String("err", err.Error())) - } - } -} - func (p *Processor) buildMerkleTreeStoreChainHash(ctx context.Context, txids []*chainhash.Hash) []*chainhash.Hash { _, span := tracing.StartTracing(ctx, "buildMerkleTreeStoreChainHash", p.tracingEnabled, p.tracingAttributes...) defer tracing.EndTracing(span, nil) diff --git a/internal/blocktx/processor_opts.go b/internal/blocktx/processor_opts.go index ed1141893..db01758bd 100644 --- a/internal/blocktx/processor_opts.go +++ b/internal/blocktx/processor_opts.go @@ -25,21 +25,9 @@ func WithRetentionDays(dataRetentionDays int) func(handler *Processor) { } } -func WithRegisterTxsInterval(d time.Duration) func(handler *Processor) { +func WithRequestTxsInterval(d time.Duration) func(handler *Processor) { return func(p *Processor) { - p.registerTxsInterval = d - } -} - -func WithRegisterRequestTxsInterval(d time.Duration) func(handler *Processor) { - return func(p *Processor) { - p.registerRequestTxsInterval = d - } -} - -func WithRegisterTxsChan(registerTxsChan chan []byte) func(handler *Processor) { - return func(handler *Processor) { - handler.registerTxsChan = registerTxsChan + p.requestTxsInterval = d } } @@ -49,15 +37,9 @@ func WithRequestTxChan(requestTxChannel chan []byte) func(handler *Processor) { } } -func WithRegisterTxsBatchSize(size int) func(handler *Processor) { - return func(handler *Processor) { - handler.registerTxsBatchSize = size - } -} - -func WithRegisterRequestTxsBatchSize(size int) func(handler *Processor) { +func WithRequestTxsBatchSize(size int) func(handler *Processor) { return func(handler *Processor) { - handler.registerRequestTxsBatchSize = size + handler.requestTxsBatchSize = size } } diff --git a/internal/blocktx/processor_test.go b/internal/blocktx/processor_test.go index 9c931d683..ededa2672 100644 --- a/internal/blocktx/processor_test.go +++ b/internal/blocktx/processor_test.go @@ -390,71 +390,6 @@ func TestHandleBlockReorg(t *testing.T) { } } -func TestStartProcessRegisterTxs(t *testing.T) { - tt := []struct { - name string - registerErr error - - expectedRegisterTxsCalls int - }{ - { - name: "success", - - expectedRegisterTxsCalls: 2, - }, - { - name: "error", - registerErr: errors.New("failed to register"), - - expectedRegisterTxsCalls: 2, - }, - } - - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - // given - registerErrTest := tc.registerErr - storeMock := &storeMocks.BlocktxStoreMock{ - RegisterTransactionsFunc: func(_ context.Context, _ [][]byte) ([]*chainhash.Hash, error) { - return nil, registerErrTest - }, - GetBlockHashesProcessingInProgressFunc: func(_ context.Context, _ string) ([]*chainhash.Hash, error) { - return nil, nil - }, - } - - txChan := make(chan []byte, 10) - - txChan <- testdata.TX1Hash[:] - txChan <- testdata.TX2Hash[:] - txChan <- testdata.TX3Hash[:] - txChan <- testdata.TX4Hash[:] - - logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) - - // when - sut, err := blocktx.NewProcessor( - logger, - storeMock, - nil, - nil, - blocktx.WithRegisterTxsInterval(time.Millisecond*20), - blocktx.WithRegisterTxsChan(txChan), - blocktx.WithRegisterTxsBatchSize(3), - ) - require.NoError(t, err) - - sut.StartProcessRegisterTxs() - - time.Sleep(120 * time.Millisecond) - sut.Shutdown() - - // then - require.Equal(t, tc.expectedRegisterTxsCalls, len(storeMock.RegisterTransactionsCalls())) - }) - } -} - func TestStartBlockRequesting(t *testing.T) { blockHash, err := chainhash.NewHashFromStr("00000000000007b1f872a8abe664223d65acd22a500b1b8eb5db3fe09a9837ff") require.NoError(t, err) @@ -635,13 +570,13 @@ func TestStartProcessRequestTxs(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // given storeMock := &storeMocks.BlocktxStoreMock{ - GetMinedTransactionsFunc: func(_ context.Context, hashes []*chainhash.Hash) ([]store.GetMinedTransactionResult, error) { + UpsertAndGetMinedTransactionsFunc: func(_ context.Context, hashes [][]byte) ([]store.GetMinedTransactionResult, error) { for _, hash := range hashes { - require.Equal(t, testdata.TX1Hash, hash) + require.Equal(t, tc.requestedTx, hash) } return []store.GetMinedTransactionResult{{ - TxHash: testdata.TX1Hash[:], + TxHash: tc.requestedTx, BlockHash: testdata.Block1Hash[:], BlockHeight: 1, }}, tc.getMinedErr @@ -664,8 +599,8 @@ func TestStartProcessRequestTxs(t *testing.T) { // when sut, err := blocktx.NewProcessor(logger, storeMock, nil, nil, - blocktx.WithRegisterRequestTxsInterval(15*time.Millisecond), - blocktx.WithRegisterRequestTxsBatchSize(3), + blocktx.WithRequestTxsInterval(15*time.Millisecond), + blocktx.WithRequestTxsBatchSize(3), blocktx.WithRequestTxChan(requestTxChannel), blocktx.WithMessageQueueClient(mq)) require.NoError(t, err) @@ -680,7 +615,7 @@ func TestStartProcessRequestTxs(t *testing.T) { sut.Shutdown() // then - require.Equal(t, tc.expectedGetMinedCalls, len(storeMock.GetMinedTransactionsCalls())) + require.Equal(t, tc.expectedGetMinedCalls, len(storeMock.UpsertAndGetMinedTransactionsCalls())) require.Equal(t, tc.expectedPublishMinedCalls, len(mq.PublishMarshalCalls())) }) } @@ -696,12 +631,6 @@ func TestStart(t *testing.T) { { name: "success", }, - { - name: "error - subscribe mined txs", - topicErr: map[string]error{blocktx.RegisterTxTopic: errors.New("failed to subscribe")}, - - expectedError: blocktx.ErrFailedToSubscribeToTopic, - }, { name: "error - subscribe submit txs", topicErr: map[string]error{blocktx.RequestTxTopic: errors.New("failed to subscribe")}, diff --git a/internal/blocktx/store/mocks/blocktx_store_mock.go b/internal/blocktx/store/mocks/blocktx_store_mock.go index 8a9451d33..6c3373f86 100644 --- a/internal/blocktx/store/mocks/blocktx_store_mock.go +++ b/internal/blocktx/store/mocks/blocktx_store_mock.go @@ -48,9 +48,6 @@ var _ store.BlocktxStore = &BlocktxStoreMock{} // GetLongestChainFromHeightFunc: func(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) { // panic("mock out the GetLongestChainFromHeight method") // }, -// GetMinedTransactionsFunc: func(ctx context.Context, hashes []*chainhash.Hash) ([]store.GetMinedTransactionResult, error) { -// panic("mock out the GetMinedTransactions method") -// }, // GetStaleChainBackFromHashFunc: func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { // panic("mock out the GetStaleChainBackFromHash method") // }, @@ -63,15 +60,15 @@ var _ store.BlocktxStore = &BlocktxStoreMock{} // PingFunc: func(ctx context.Context) error { // panic("mock out the Ping method") // }, -// RegisterTransactionsFunc: func(ctx context.Context, txHashes [][]byte) ([]*chainhash.Hash, error) { -// panic("mock out the RegisterTransactions method") -// }, // SetBlockProcessingFunc: func(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) { // panic("mock out the SetBlockProcessing method") // }, // UpdateBlocksStatusesFunc: func(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error { // panic("mock out the UpdateBlocksStatuses method") // }, +// UpsertAndGetMinedTransactionsFunc: func(ctx context.Context, txHashes [][]byte) ([]store.GetMinedTransactionResult, error) { +// panic("mock out the UpsertAndGetMinedTransactions method") +// }, // UpsertBlockFunc: func(ctx context.Context, block *blocktx_api.Block) (uint64, error) { // panic("mock out the UpsertBlock method") // }, @@ -115,9 +112,6 @@ type BlocktxStoreMock struct { // GetLongestChainFromHeightFunc mocks the GetLongestChainFromHeight method. GetLongestChainFromHeightFunc func(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) - // GetMinedTransactionsFunc mocks the GetMinedTransactions method. - GetMinedTransactionsFunc func(ctx context.Context, hashes []*chainhash.Hash) ([]store.GetMinedTransactionResult, error) - // GetStaleChainBackFromHashFunc mocks the GetStaleChainBackFromHash method. GetStaleChainBackFromHashFunc func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) @@ -130,15 +124,15 @@ type BlocktxStoreMock struct { // PingFunc mocks the Ping method. PingFunc func(ctx context.Context) error - // RegisterTransactionsFunc mocks the RegisterTransactions method. - RegisterTransactionsFunc func(ctx context.Context, txHashes [][]byte) ([]*chainhash.Hash, error) - // SetBlockProcessingFunc mocks the SetBlockProcessing method. SetBlockProcessingFunc func(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) // UpdateBlocksStatusesFunc mocks the UpdateBlocksStatuses method. UpdateBlocksStatusesFunc func(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error + // UpsertAndGetMinedTransactionsFunc mocks the UpsertAndGetMinedTransactions method. + UpsertAndGetMinedTransactionsFunc func(ctx context.Context, txHashes [][]byte) ([]store.GetMinedTransactionResult, error) + // UpsertBlockFunc mocks the UpsertBlock method. UpsertBlockFunc func(ctx context.Context, block *blocktx_api.Block) (uint64, error) @@ -213,13 +207,6 @@ type BlocktxStoreMock struct { // Height is the height argument value. Height uint64 } - // GetMinedTransactions holds details about calls to the GetMinedTransactions method. - GetMinedTransactions []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Hashes is the hashes argument value. - Hashes []*chainhash.Hash - } // GetStaleChainBackFromHash holds details about calls to the GetStaleChainBackFromHash method. GetStaleChainBackFromHash []struct { // Ctx is the ctx argument value. @@ -248,13 +235,6 @@ type BlocktxStoreMock struct { // Ctx is the ctx argument value. Ctx context.Context } - // RegisterTransactions holds details about calls to the RegisterTransactions method. - RegisterTransactions []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // TxHashes is the txHashes argument value. - TxHashes [][]byte - } // SetBlockProcessing holds details about calls to the SetBlockProcessing method. SetBlockProcessing []struct { // Ctx is the ctx argument value. @@ -271,6 +251,13 @@ type BlocktxStoreMock struct { // BlockStatusUpdates is the blockStatusUpdates argument value. BlockStatusUpdates []store.BlockStatusUpdate } + // UpsertAndGetMinedTransactions holds details about calls to the UpsertAndGetMinedTransactions method. + UpsertAndGetMinedTransactions []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // TxHashes is the txHashes argument value. + TxHashes [][]byte + } // UpsertBlock holds details about calls to the UpsertBlock method. UpsertBlock []struct { // Ctx is the ctx argument value. @@ -306,14 +293,13 @@ type BlocktxStoreMock struct { lockGetBlockHashesProcessingInProgress sync.RWMutex lockGetChainTip sync.RWMutex lockGetLongestChainFromHeight sync.RWMutex - lockGetMinedTransactions sync.RWMutex lockGetStaleChainBackFromHash sync.RWMutex lockGetStats sync.RWMutex lockMarkBlockAsDone sync.RWMutex lockPing sync.RWMutex - lockRegisterTransactions sync.RWMutex lockSetBlockProcessing sync.RWMutex lockUpdateBlocksStatuses sync.RWMutex + lockUpsertAndGetMinedTransactions sync.RWMutex lockUpsertBlock sync.RWMutex lockUpsertBlockTransactions sync.RWMutex lockVerifyMerkleRoots sync.RWMutex @@ -642,42 +628,6 @@ func (mock *BlocktxStoreMock) GetLongestChainFromHeightCalls() []struct { return calls } -// GetMinedTransactions calls GetMinedTransactionsFunc. -func (mock *BlocktxStoreMock) GetMinedTransactions(ctx context.Context, hashes []*chainhash.Hash) ([]store.GetMinedTransactionResult, error) { - if mock.GetMinedTransactionsFunc == nil { - panic("BlocktxStoreMock.GetMinedTransactionsFunc: method is nil but BlocktxStore.GetMinedTransactions was just called") - } - callInfo := struct { - Ctx context.Context - Hashes []*chainhash.Hash - }{ - Ctx: ctx, - Hashes: hashes, - } - mock.lockGetMinedTransactions.Lock() - mock.calls.GetMinedTransactions = append(mock.calls.GetMinedTransactions, callInfo) - mock.lockGetMinedTransactions.Unlock() - return mock.GetMinedTransactionsFunc(ctx, hashes) -} - -// GetMinedTransactionsCalls gets all the calls that were made to GetMinedTransactions. -// Check the length with: -// -// len(mockedBlocktxStore.GetMinedTransactionsCalls()) -func (mock *BlocktxStoreMock) GetMinedTransactionsCalls() []struct { - Ctx context.Context - Hashes []*chainhash.Hash -} { - var calls []struct { - Ctx context.Context - Hashes []*chainhash.Hash - } - mock.lockGetMinedTransactions.RLock() - calls = mock.calls.GetMinedTransactions - mock.lockGetMinedTransactions.RUnlock() - return calls -} - // GetStaleChainBackFromHash calls GetStaleChainBackFromHashFunc. func (mock *BlocktxStoreMock) GetStaleChainBackFromHash(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { if mock.GetStaleChainBackFromHashFunc == nil { @@ -822,42 +772,6 @@ func (mock *BlocktxStoreMock) PingCalls() []struct { return calls } -// RegisterTransactions calls RegisterTransactionsFunc. -func (mock *BlocktxStoreMock) RegisterTransactions(ctx context.Context, txHashes [][]byte) ([]*chainhash.Hash, error) { - if mock.RegisterTransactionsFunc == nil { - panic("BlocktxStoreMock.RegisterTransactionsFunc: method is nil but BlocktxStore.RegisterTransactions was just called") - } - callInfo := struct { - Ctx context.Context - TxHashes [][]byte - }{ - Ctx: ctx, - TxHashes: txHashes, - } - mock.lockRegisterTransactions.Lock() - mock.calls.RegisterTransactions = append(mock.calls.RegisterTransactions, callInfo) - mock.lockRegisterTransactions.Unlock() - return mock.RegisterTransactionsFunc(ctx, txHashes) -} - -// RegisterTransactionsCalls gets all the calls that were made to RegisterTransactions. -// Check the length with: -// -// len(mockedBlocktxStore.RegisterTransactionsCalls()) -func (mock *BlocktxStoreMock) RegisterTransactionsCalls() []struct { - Ctx context.Context - TxHashes [][]byte -} { - var calls []struct { - Ctx context.Context - TxHashes [][]byte - } - mock.lockRegisterTransactions.RLock() - calls = mock.calls.RegisterTransactions - mock.lockRegisterTransactions.RUnlock() - return calls -} - // SetBlockProcessing calls SetBlockProcessingFunc. func (mock *BlocktxStoreMock) SetBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) { if mock.SetBlockProcessingFunc == nil { @@ -934,6 +848,42 @@ func (mock *BlocktxStoreMock) UpdateBlocksStatusesCalls() []struct { return calls } +// UpsertAndGetMinedTransactions calls UpsertAndGetMinedTransactionsFunc. +func (mock *BlocktxStoreMock) UpsertAndGetMinedTransactions(ctx context.Context, txHashes [][]byte) ([]store.GetMinedTransactionResult, error) { + if mock.UpsertAndGetMinedTransactionsFunc == nil { + panic("BlocktxStoreMock.UpsertAndGetMinedTransactionsFunc: method is nil but BlocktxStore.UpsertAndGetMinedTransactions was just called") + } + callInfo := struct { + Ctx context.Context + TxHashes [][]byte + }{ + Ctx: ctx, + TxHashes: txHashes, + } + mock.lockUpsertAndGetMinedTransactions.Lock() + mock.calls.UpsertAndGetMinedTransactions = append(mock.calls.UpsertAndGetMinedTransactions, callInfo) + mock.lockUpsertAndGetMinedTransactions.Unlock() + return mock.UpsertAndGetMinedTransactionsFunc(ctx, txHashes) +} + +// UpsertAndGetMinedTransactionsCalls gets all the calls that were made to UpsertAndGetMinedTransactions. +// Check the length with: +// +// len(mockedBlocktxStore.UpsertAndGetMinedTransactionsCalls()) +func (mock *BlocktxStoreMock) UpsertAndGetMinedTransactionsCalls() []struct { + Ctx context.Context + TxHashes [][]byte +} { + var calls []struct { + Ctx context.Context + TxHashes [][]byte + } + mock.lockUpsertAndGetMinedTransactions.RLock() + calls = mock.calls.UpsertAndGetMinedTransactions + mock.lockUpsertAndGetMinedTransactions.RUnlock() + return calls +} + // UpsertBlock calls UpsertBlockFunc. func (mock *BlocktxStoreMock) UpsertBlock(ctx context.Context, block *blocktx_api.Block) (uint64, error) { if mock.UpsertBlockFunc == nil { diff --git a/internal/blocktx/store/postgresql/fixtures/register_transactions/blocktx.blocks.yaml b/internal/blocktx/store/postgresql/fixtures/register_transactions/blocktx.blocks.yaml deleted file mode 100644 index 2cf6008e8..000000000 --- a/internal/blocktx/store/postgresql/fixtures/register_transactions/blocktx.blocks.yaml +++ /dev/null @@ -1,11 +0,0 @@ -- inserted_at: 2024-01-10 13:06:03.375 - id: 9736 - hash: 0x6258b02da70a3e367e4c993b049fa9b76ef8f090ef9fd2010000000000000000 - prevhash: 0x000000000000000001a7aa3999410ca53fb645851531ec0a7a5cb9ce2d4ae313 - merkleroot: 0x0d72bf92e7862df18d1935c171ca4dbb70d268b0f025e46716e913bc7e4f2bdb - height: 826481 - processed_at: 2024-01-10 13:06:06.122 - size: 108689370 - tx_count: 799 - orphanedyn: FALSE - merkle_path: "" diff --git a/internal/blocktx/store/postgresql/fixtures/register_transactions/blocktx.transactions.yaml b/internal/blocktx/store/postgresql/fixtures/register_transactions/blocktx.transactions.yaml deleted file mode 100644 index de3289b32..000000000 --- a/internal/blocktx/store/postgresql/fixtures/register_transactions/blocktx.transactions.yaml +++ /dev/null @@ -1,32 +0,0 @@ -- id: 110383995 - hash: 0x76732b80598326a18d3bf0a86518adbdf95d0ddc6ff6693004440f4776168c3b - inserted_at: 2024-01-09 13:00:00 - is_registered: FALSE -- id: 115361489 - hash: 0x164e85a5d5bc2b2372e8feaa266e5e4b7d0808f8d2b784fb1f7349c4726392b0 - inserted_at: 2024-01-10 12:00:00 - is_registered: FALSE -- id: 115361490 - hash: 0xb4201cc6fc5768abff14adf75042ace6061da9176ee5bb943291b9ba7d7f5743 - inserted_at: 2024-01-10 12:00:00 - is_registered: TRUE -- id: 115361491 - hash: 0x37bd6c87927e75faeb3b3c939f64721cda48e1bb98742676eebe83aceee1a669 - inserted_at: 2024-01-10 12:00:00 - is_registered: TRUE -- id: 115361492 - hash: 0x952f80e20a0330f3b9c2dfd1586960064e797218b5c5df665cada221452c17eb - inserted_at: 2024-01-10 12:00:00 - is_registered: TRUE -- id: 115361493 - hash: 0x861a281b27de016e50887288de87eab5ca56a1bb172cdff6dba965474ce0f608 - inserted_at: 2024-01-10 12:00:00 - is_registered: TRUE -- id: 115361494 - hash: 0x9421cc760c5405af950a76dc3e4345eaefd4e7322f172a3aee5e0ddc7b4f8313 - inserted_at: 2024-01-10 12:00:00 - is_registered: FALSE -- id: 115361495 - hash: 0x8b7d038db4518ac4c665abfc5aeaacbd2124ad8ca70daa8465ed2c4427c41b9b - inserted_at: 2024-01-10 12:00:00 - is_registered: FALSE diff --git a/internal/blocktx/store/postgresql/postgres_test.go b/internal/blocktx/store/postgresql/postgres_test.go index 18bfd6cbd..8ff4f954c 100644 --- a/internal/blocktx/store/postgresql/postgres_test.go +++ b/internal/blocktx/store/postgresql/postgres_test.go @@ -9,8 +9,6 @@ import ( "testing" "time" - "github.com/bitcoin-sv/arc/internal/testdata" - "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/store" _ "github.com/golang-migrate/migrate/v4/source/file" @@ -337,7 +335,7 @@ func TestPostgresDB(t *testing.T) { // when // get mined transaction and corresponding block - minedTxs, err := postgresDB.GetMinedTransactions(ctx, []*chainhash.Hash{txHash1, txHash2, txHash3, txHash4}) + minedTxs, err := postgresDB.UpsertAndGetMinedTransactions(ctx, [][]byte{txHash1[:], txHash2[:], txHash3[:], txHash4[:]}) require.NoError(t, err) // then @@ -681,92 +679,8 @@ func TestPostgresStore_UpsertBlockTransactions_CompetingBlocks(t *testing.T) { require.NoError(t, err) // then - actual, err := sut.GetMinedTransactions(ctx, []*chainhash.Hash{txHash}) + actual, err := sut.UpsertAndGetMinedTransactions(ctx, [][]byte{txHash[:]}) require.NoError(t, err) require.ElementsMatch(t, expected, actual) } - -func TestPostgresStore_RegisterTransactions(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test") - } - - tcs := []struct { - name string - txs [][]byte - }{ - { - name: "register new transactions", - txs: [][]byte{ - testdata.TX1Hash[:], - testdata.TX2Hash[:], - testdata.TX3Hash[:], - testdata.TX4Hash[:], - }, - }, - { - name: "register already known, not registered transactions", - txs: [][]byte{ - testutils.RevChainhash(t, "76732b80598326a18d3bf0a86518adbdf95d0ddc6ff6693004440f4776168c3b")[:], - testutils.RevChainhash(t, "164e85a5d5bc2b2372e8feaa266e5e4b7d0808f8d2b784fb1f7349c4726392b0")[:], - testutils.RevChainhash(t, "8b7d038db4518ac4c665abfc5aeaacbd2124ad8ca70daa8465ed2c4427c41b9b")[:], - testutils.RevChainhash(t, "9421cc760c5405af950a76dc3e4345eaefd4e7322f172a3aee5e0ddc7b4f8313")[:], - }, - }, - { - name: "register already registered transactions", - txs: [][]byte{ - testutils.RevChainhash(t, "b4201cc6fc5768abff14adf75042ace6061da9176ee5bb943291b9ba7d7f5743")[:], - testutils.RevChainhash(t, "37bd6c87927e75faeb3b3c939f64721cda48e1bb98742676eebe83aceee1a669")[:], - testutils.RevChainhash(t, "952f80e20a0330f3b9c2dfd1586960064e797218b5c5df665cada221452c17eb")[:], - testutils.RevChainhash(t, "861a281b27de016e50887288de87eab5ca56a1bb172cdff6dba965474ce0f608")[:], - }, - }, - } - - // common setup for test cases - ctx, now, sut := setupPostgresTest(t) - defer sut.Close() - - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - // given - prepareDb(t, sut.db, "fixtures/register_transactions") - - // when - result, err := sut.RegisterTransactions(ctx, tc.txs) - require.NoError(t, err) - require.NotNil(t, result) - - resultmap := make(map[chainhash.Hash]bool) - for _, h := range result { - resultmap[*h] = false - } - - // then - // assert data are correctly saved in the store - d, err := sqlx.Open("postgres", dbInfo) - require.NoError(t, err) - - updatedCounter := 0 - for _, hash := range tc.txs { - var storedtx Transaction - err = d.Get(&storedtx, "SELECT hash, is_registered from blocktx.transactions WHERE hash=$1", hash) - require.NoError(t, err) - - require.NotNil(t, storedtx) - require.True(t, storedtx.IsRegistered) - - if _, found := resultmap[chainhash.Hash(storedtx.Hash)]; found { - require.Greater(t, storedtx.InsertedAt, now) - updatedCounter++ - } else { - require.Less(t, storedtx.InsertedAt, now) - } - } - - require.Equal(t, len(result), updatedCounter) - }) - } -} diff --git a/internal/blocktx/store/postgresql/register_transactions.go b/internal/blocktx/store/postgresql/register_transactions.go deleted file mode 100644 index 772827d43..000000000 --- a/internal/blocktx/store/postgresql/register_transactions.go +++ /dev/null @@ -1,47 +0,0 @@ -package postgresql - -import ( - "context" - "errors" - "github.com/bitcoin-sv/arc/internal/blocktx/store" - "time" - - "github.com/lib/pq" - "github.com/libsv/go-p2p/chaincfg/chainhash" -) - -func (p *PostgreSQL) RegisterTransactions(ctx context.Context, txHashes [][]byte) ([]*chainhash.Hash, error) { - const q = ` - INSERT INTO blocktx.transactions (hash, is_registered) - SELECT hash, TRUE - FROM UNNEST ($1::BYTEA[]) as hash - ON CONFLICT (hash) DO UPDATE - SET is_registered = TRUE - RETURNING hash, inserted_at - ` - - now := p.now() - rows, err := p.db.QueryContext(ctx, q, pq.Array(txHashes)) - if err != nil { - return nil, errors.Join(store.ErrFailedToInsertTransactions, err) - } - defer rows.Close() - - updatedTxs := make([]*chainhash.Hash, 0) - for rows.Next() { - var hash []byte - var insertedAt time.Time - - err = rows.Scan(&hash, &insertedAt) - if err != nil { - return nil, errors.Join(store.ErrFailedToGetRows, err) - } - - if insertedAt.Before(now) { - ch, _ := chainhash.NewHash(hash) - updatedTxs = append(updatedTxs, ch) - } - } - - return updatedTxs, nil -} diff --git a/internal/blocktx/store/postgresql/get_mined_transactions.go b/internal/blocktx/store/postgresql/upsert_and_get_mined_transactions.go similarity index 53% rename from internal/blocktx/store/postgresql/get_mined_transactions.go rename to internal/blocktx/store/postgresql/upsert_and_get_mined_transactions.go index 67032c79a..58ce0ac61 100644 --- a/internal/blocktx/store/postgresql/get_mined_transactions.go +++ b/internal/blocktx/store/postgresql/upsert_and_get_mined_transactions.go @@ -2,28 +2,30 @@ package postgresql import ( "context" - - "github.com/lib/pq" - "github.com/libsv/go-p2p/chaincfg/chainhash" + "errors" "github.com/bitcoin-sv/arc/internal/blocktx/store" + "github.com/bitcoin-sv/arc/internal/tracing" + "github.com/lib/pq" ) -func (p *PostgreSQL) GetMinedTransactions(ctx context.Context, hashes []*chainhash.Hash) (result []store.GetMinedTransactionResult, err error) { - ctx, span := tracing.StartTracing(ctx, "GetMinedTransactions", p.tracingEnabled, p.tracingAttributes...) +func (p *PostgreSQL) UpsertAndGetMinedTransactions(ctx context.Context, txHashes [][]byte) (result []store.GetMinedTransactionResult, err error) { + ctx, span := tracing.StartTracing(ctx, "UpsertAndGetMinedTransactions", p.tracingEnabled, p.tracingAttributes...) defer func() { tracing.EndTracing(span, err) }() - var hashSlice [][]byte - for _, hash := range hashes { - hashSlice = append(hashSlice, hash[:]) - } - - result = make([]store.GetMinedTransactionResult, 0, len(hashSlice)) + const q = ` + WITH inserted_transactions AS ( + INSERT INTO blocktx.transactions (hash, is_registered) + SELECT hash, TRUE + FROM UNNEST ($1::BYTEA[]) as hash + ON CONFLICT (hash) DO UPDATE + SET is_registered = TRUE + RETURNING hash + ) - q := ` SELECT t.hash, b.hash, @@ -32,15 +34,16 @@ func (p *PostgreSQL) GetMinedTransactions(ctx context.Context, hashes []*chainha FROM blocktx.transactions AS t JOIN blocktx.block_transactions_map AS m ON t.id = m.txid JOIN blocktx.blocks AS b ON m.blockid = b.id - WHERE t.hash = ANY($1) + WHERE t.hash = ANY(SELECT hash FROM inserted_transactions) ` - rows, err := p.db.QueryContext(ctx, q, pq.Array(hashSlice)) + rows, err := p.db.QueryContext(ctx, q, pq.Array(txHashes)) if err != nil { - return nil, err + return nil, errors.Join(store.ErrFailedToInsertTransactions, err) } defer rows.Close() + result = make([]store.GetMinedTransactionResult, 0, len(txHashes)) for rows.Next() { var txHash []byte var blockHash []byte diff --git a/internal/blocktx/store/store.go b/internal/blocktx/store/store.go index 97d45b7dd..0ca0dea86 100644 --- a/internal/blocktx/store/store.go +++ b/internal/blocktx/store/store.go @@ -28,7 +28,7 @@ type Stats struct { } type BlocktxStore interface { - RegisterTransactions(ctx context.Context, txHashes [][]byte) (updatedTxs []*chainhash.Hash, err error) + UpsertAndGetMinedTransactions(ctx context.Context, txHashes [][]byte) ([]GetMinedTransactionResult, error) GetBlock(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) GetBlockByHeight(ctx context.Context, height uint64, status blocktx_api.Status) (*blocktx_api.Block, error) GetChainTip(ctx context.Context) (*blocktx_api.Block, error) @@ -37,7 +37,6 @@ type BlocktxStore interface { MarkBlockAsDone(ctx context.Context, hash *chainhash.Hash, size uint64, txCount uint64) error GetBlockGaps(ctx context.Context, heightRange int) ([]*BlockGap, error) ClearBlocktxTable(ctx context.Context, retentionDays int32, table string) (*blocktx_api.RowsAffectedResponse, error) - GetMinedTransactions(ctx context.Context, hashes []*chainhash.Hash) ([]GetMinedTransactionResult, error) GetLongestChainFromHeight(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) GetStaleChainBackFromHash(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) UpdateBlocksStatuses(ctx context.Context, blockStatusUpdates []BlockStatusUpdate) error diff --git a/internal/metamorph/mq_client.go b/internal/metamorph/mq_client.go index c960ba8f6..15c6eed8f 100644 --- a/internal/metamorph/mq_client.go +++ b/internal/metamorph/mq_client.go @@ -3,10 +3,9 @@ package metamorph import "context" const ( - SubmitTxTopic = "submit-tx" - MinedTxsTopic = "mined-txs" - RegisterTxTopic = "register-tx" - RequestTxTopic = "request-tx" + SubmitTxTopic = "submit-tx" + MinedTxsTopic = "mined-txs" + RequestTxTopic = "request-tx" ) type MessageQueueClient interface { diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 08fb89272..a2c4be7a6 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -713,7 +713,7 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques } // register transaction in blocktx using message queue - if err = p.mqClient.Publish(ctx, RegisterTxTopic, req.Data.Hash[:]); err != nil { + if err = p.mqClient.Publish(ctx, RequestTxTopic, req.Data.Hash[:]); err != nil { p.logger.Error("failed to register tx in blocktx", slog.String("hash", req.Data.Hash.String()), slog.String("err", err.Error())) } @@ -770,7 +770,7 @@ func (p *Processor) ProcessTransactions(ctx context.Context, sReq []*store.Data) for _, data := range sReq { // register transaction in blocktx using message queue - err = p.mqClient.Publish(ctx, RegisterTxTopic, data.Hash[:]) + err = p.mqClient.Publish(ctx, RequestTxTopic, data.Hash[:]) if err != nil { p.logger.Error("Failed to register tx in blocktx", slog.String("hash", data.Hash.String()), slog.String("err", err.Error())) } diff --git a/test/config/config.yaml b/test/config/config.yaml index cab172f84..40ad5acb2 100644 --- a/test/config/config.yaml +++ b/test/config/config.yaml @@ -93,7 +93,7 @@ blocktx: sslMode: disable recordRetentionDays: 28 profilerAddr: localhost:9993 - registerTxsInterval: 200ms + requestTxsInterval: 200ms fillGapsInterval: 15m maxAllowedBlockHeightMismatch: 3