Skip to content

Commit

Permalink
feat: improve tests, add a ticker
Browse files Browse the repository at this point in the history
  • Loading branch information
kuba-4chain committed Aug 22, 2024
1 parent ca40840 commit a93fe48
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 16 deletions.
9 changes: 4 additions & 5 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (ph *Processor) StartProcessRequestTxs() {
err = ph.publishMinedTxs(txHashes)
if err != nil {
ph.logger.Error("failed to publish mined txs", slog.String("err", err.Error()))
continue
continue // retry, don't clear the txHashes slice
}

txHashes = make([]*chainhash.Hash, 0, ph.registerRequestTxsBatchSize)
Expand All @@ -364,7 +364,8 @@ func (ph *Processor) StartProcessRequestTxs() {
err := ph.publishMinedTxs(txHashes)
if err != nil {
ph.logger.Error("failed to publish mined txs", slog.String("err", err.Error()))
continue
ticker.Reset(ph.registerRequestTxsInterval)
continue // retry, don't clear the txHashes slice
}

txHashes = make([]*chainhash.Hash, 0, ph.registerRequestTxsBatchSize)
Expand Down Expand Up @@ -674,9 +675,7 @@ func ExtractHeightFromCoinbaseTx(tx *bt.Tx) uint64 {
}

func (ph *Processor) Shutdown() {
if ph.cancelAll != nil {
ph.cancelAll()
}
ph.cancelAll()
ph.waitGroup.Wait()
}

Expand Down
19 changes: 8 additions & 11 deletions internal/blocktx/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ func TestHandleBlock(t *testing.T) {
peerHandler := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh)
processor, err := blocktx.NewProcessor(logger, storeMock, blockRequestCh, blockProcessCh, blocktx.WithTransactionBatchSize(batchSize), blocktx.WithMessageQueueClient(mq))
require.NoError(t, err)
defer processor.Shutdown()

processor.StartBlockProcessing()

Expand Down Expand Up @@ -258,6 +257,8 @@ func TestHandleBlock(t *testing.T) {
err = peerHandler.HandleBlock(blockMessage, peer)
require.NoError(t, err)
time.Sleep(20 * time.Millisecond)
processor.Shutdown()

require.ElementsMatch(t, expectedInsertedTransactions, insertedBlockTransactions)
})
}
Expand Down Expand Up @@ -547,7 +548,7 @@ func TestStartProcessRequestTxs(t *testing.T) {
getMinedErr: errors.New("get mined error"),
requestedTx: testdata.TX1Hash[:],

expectedGetMinedCalls: 4,
expectedGetMinedCalls: 4, // 3 times on the channel message, 1 time on ticker
expectedPublishMinedCalls: 0,
},
{
Expand All @@ -556,7 +557,7 @@ func TestStartProcessRequestTxs(t *testing.T) {
publishMinedErr: errors.New("publish mined error"),
requestedTx: testdata.TX1Hash[:],

expectedGetMinedCalls: 4,
expectedGetMinedCalls: 4, // 3 times on the channel message, 1 time on ticker
expectedPublishMinedCalls: 4,
},
{
Expand Down Expand Up @@ -587,8 +588,6 @@ func TestStartProcessRequestTxs(t *testing.T) {

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
publishMinedErrTest := tc.publishMinedErr
getMinedErrTest := tc.getMinedErr
storeMock := &storeMocks.BlocktxStoreMock{
GetMinedTransactionsFunc: func(ctx context.Context, hashes []*chainhash.Hash) ([]store.GetMinedTransactionResult, error) {
for _, hash := range hashes {
Expand All @@ -599,13 +598,13 @@ func TestStartProcessRequestTxs(t *testing.T) {
TxHash: testdata.TX1Hash[:],
BlockHash: testdata.Block1Hash[:],
BlockHeight: 1,
}}, getMinedErrTest
}}, tc.getMinedErr
},
}

mq := &mocks.MessageQueueClientMock{
PublishMarshalFunc: func(topic string, m protoreflect.ProtoMessage) error {
return publishMinedErrTest
return tc.publishMinedErr
},
}

Expand All @@ -614,7 +613,7 @@ func TestStartProcessRequestTxs(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
processor, err := blocktx.NewProcessor(logger, storeMock,
nil, nil,
blocktx.WithRegisterRequestTxsInterval(20*time.Millisecond),
blocktx.WithRegisterRequestTxsInterval(15*time.Millisecond),
blocktx.WithRegisterRequestTxsBatchSize(3),
blocktx.WithRequestTxChan(requestTxChannel),
blocktx.WithMessageQueueClient(mq))
Expand All @@ -624,10 +623,8 @@ func TestStartProcessRequestTxs(t *testing.T) {
requestTxChannel <- tc.requestedTx
}

processor.StartProcessRequestTxs()

// call tested function
require.NoError(t, err)
processor.StartProcessRequestTxs()
time.Sleep(20 * time.Millisecond)
processor.Shutdown()

Expand Down

0 comments on commit a93fe48

Please sign in to comment.