Skip to content

Commit

Permalink
ARCO-196: unify tests and refactor errors (#590)
Browse files Browse the repository at this point in the history
* ARCO-196: refactor blockTx

* ARCO-196: refactor broadcaster

* ARCO-196: refactor callbacker

* ARCO-196: refactor message queue

* ARCO-196: refactor validator

* ARCO-196: refactor metamorph

* ARCO-196: reactor new errors

* ARCO-196: fix linter errors

* ARCO-196: fix e2e test

* ARCO-196: fix expected error in e2e test

* ARCO-196: fix PR comments

* ARCO-196: remove unused variables

* ARCO-196: fix PR comments

* ARCO-196: import fmt

* ARCO-196: fix unit test
  • Loading branch information
pawellewandowski98 authored Sep 23, 2024
1 parent 977d5ae commit 6e07a5c
Show file tree
Hide file tree
Showing 59 changed files with 1,013 additions and 572 deletions.
55 changes: 27 additions & 28 deletions internal/blocktx/health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,11 @@ func TestCheck(t *testing.T) {

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
const batchSize = 4

// given
storeMock := &storeMocks.BlocktxStoreMock{
GetBlockGapsFunc: func(ctx context.Context, heightRange int) ([]*store.BlockGap, error) {
return nil, nil
},

PingFunc: func(ctx context.Context) error {
return tc.pingErr
},
Expand All @@ -67,8 +65,6 @@ func TestCheck(t *testing.T) {
}

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
processor, err := blocktx.NewProcessor(logger, storeMock, nil, nil, blocktx.WithTransactionBatchSize(batchSize))
require.NoError(t, err)
pm := &mocks.PeerManagerMock{GetPeersFunc: func() []p2p.PeerI {
return []p2p.PeerI{&mocks.PeerMock{
IsHealthyFunc: func() bool {
Expand All @@ -79,14 +75,16 @@ func TestCheck(t *testing.T) {
},
}}
}}
server := blocktx.NewServer(storeMock, logger, pm, 0)
resp, err := server.Check(context.Background(), req)
require.NoError(t, err)
sut := blocktx.NewServer(storeMock, logger, pm, 0)

require.Equal(t, tc.expectedStatus, resp.Status)
// when
resp, err := sut.Check(context.Background(), req)

processor.Shutdown()
// then
require.NoError(t, err)
require.Equal(t, tc.expectedStatus, resp.Status)
})

}
}

Expand Down Expand Up @@ -123,13 +121,11 @@ func TestWatch(t *testing.T) {

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
const batchSize = 4

// given
storeMock := &storeMocks.BlocktxStoreMock{
GetBlockGapsFunc: func(ctx context.Context, heightRange int) ([]*store.BlockGap, error) {
return nil, nil
},

PingFunc: func(ctx context.Context) error {
return tc.pingErr
},
Expand All @@ -140,19 +136,19 @@ func TestWatch(t *testing.T) {
}

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
processor, err := blocktx.NewProcessor(logger, storeMock, nil, nil, blocktx.WithTransactionBatchSize(batchSize))
require.NoError(t, err)
pm := &mocks.PeerManagerMock{GetPeersFunc: func() []p2p.PeerI {
return []p2p.PeerI{&mocks.PeerMock{
IsHealthyFunc: func() bool {
return false
},
ConnectedFunc: func() bool {
return false
},
}}
}}
server := blocktx.NewServer(storeMock, logger, pm, 0)

pm := &mocks.PeerManagerMock{
GetPeersFunc: func() []p2p.PeerI {
return []p2p.PeerI{
&mocks.PeerMock{
IsHealthyFunc: func() bool { return false },
ConnectedFunc: func() bool { return false },
},
}
},
}

sut := blocktx.NewServer(storeMock, logger, pm, 0)

watchServer := &mocks.HealthWatchServerMock{
SendFunc: func(healthCheckResponse *grpc_health_v1.HealthCheckResponse) error {
Expand All @@ -161,9 +157,12 @@ func TestWatch(t *testing.T) {
},
}

err = server.Watch(req, watchServer)
// when
err := sut.Watch(req, watchServer)

// then
require.NoError(t, err)
processor.Shutdown()
})

}
}
7 changes: 4 additions & 3 deletions internal/blocktx/peer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/libsv/go-p2p/wire"
)

var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to p2p.BlockMessage")

func init() {
// override the default wire block handler with our own that streams and stores only the transaction ids
wire.SetExternalHandler(wire.CmdBlock, func(reader io.Reader, length uint64, bytesRead int) (int, wire.Message, []byte, error) {
Expand Down Expand Up @@ -112,9 +114,8 @@ func (ph *PeerHandler) HandleBlockAnnouncement(msg *wire.InvVect, peer p2p.PeerI
func (ph *PeerHandler) HandleBlock(wireMsg wire.Message, _ p2p.PeerI) error {
msg, ok := wireMsg.(*p2p.BlockMessage)
if !ok {
errMsg := "unable to cast wire.Message to p2p.BlockMessage"
ph.logger.Debug(errMsg)
return errors.New(errMsg)
ph.logger.Debug(ErrUnableToCastWireMessage.Error())
return ErrUnableToCastWireMessage
}

ph.blockProcessCh <- msg
Expand Down
19 changes: 13 additions & 6 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ import (

var tracer trace.Tracer

var (
ErrFailedToSubscribeToTopic = errors.New("failed to subscribe to register topic")
ErrFailedToCreateBUMP = errors.New("failed to create new bump for tx hash from merkle tree and index")
ErrFailedToGetStringFromBUMPHex = errors.New("failed to get string from bump for tx hash")
ErrFailedToInsertBlockTransactions = errors.New("failed to insert block transactions")
)

const (
transactionStoringBatchsizeDefault = 8192 // power of 2 for easier memory allocation
maxRequestBlocks = 1
Expand Down Expand Up @@ -99,15 +106,15 @@ func (p *Processor) Start() error {
return nil
})
if err != nil {
return fmt.Errorf("failed to subscribe to %s topic: %w", RegisterTxTopic, err)
return errors.Join(ErrFailedToSubscribeToTopic, fmt.Errorf("topic: %s", RegisterTxTopic), err)
}

err = p.mqClient.Subscribe(RequestTxTopic, func(msg []byte) error {
p.requestTxChannel <- msg
return nil
})
if err != nil {
return fmt.Errorf("failed to subscribe to %s topic: %w", RequestTxTopic, err)
return errors.Join(ErrFailedToSubscribeToTopic, fmt.Errorf("topic: %s", RequestTxTopic), err)
}

p.StartBlockRequesting()
Expand Down Expand Up @@ -626,12 +633,12 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,

bump, err := bc.NewBUMPFromMerkleTreeAndIndex(blockHeight, merkleTree, uint64(txIndex))
if err != nil {
return fmt.Errorf("failed to create new bump for tx hash %s from merkle tree and index at block height %d: %v", hash.String(), blockHeight, err)
return errors.Join(ErrFailedToCreateBUMP, err)
}

bumpHex, err := bump.String()
if err != nil {
return fmt.Errorf("failed to get string from bump for tx hash %s at block height %d: %v", hash.String(), blockHeight, err)
return errors.Join(ErrFailedToGetStringFromBUMPHex, err)
}

txs = append(txs, store.TxWithMerklePath{
Expand All @@ -642,7 +649,7 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,
if (txIndex+1)%p.transactionStorageBatchSize == 0 {
updateResp, err := p.store.UpsertBlockTransactions(ctx, blockId, txs)
if err != nil {
return fmt.Errorf("failed to insert block transactions at block height %d: %v", blockHeight, err)
return errors.Join(ErrFailedToInsertBlockTransactions, err)
}
// free up memory
txs = txs[:0]
Expand Down Expand Up @@ -675,7 +682,7 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,
// update all remaining transactions
updateResp, err := p.store.UpsertBlockTransactions(ctx, blockId, txs)
if err != nil {
return fmt.Errorf("failed to insert block transactions at block height %d: %v", blockHeight, err)
return errors.Join(ErrFailedToInsertBlockTransactions, fmt.Errorf("block height: %d", blockHeight), err)
}

for _, updResp := range updateResp {
Expand Down
11 changes: 11 additions & 0 deletions internal/blocktx/processor_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,31 @@ import (
)

func TestExtractHeight(t *testing.T) {
// given
tx, err := sdkTx.NewTransactionFromHex("01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff570350cc0b041547b5630cfabe6d6d0000000000000000000000000000000000000000000000000000000000000000010000000000000047ed20542096bd0000000000143362663865373833636662643732306431383436000000000140be4025000000001976a914c9b0abe09b7dd8e9d1e8c1e3502d32ab0d7119e488ac00000000")
require.NoError(t, err)

// when
height := extractHeightFromCoinbaseTx(tx)

// then
assert.Equalf(t, uint64(773200), height, "height should be 773200, got %d", height)
}

func TestExtractHeightForRegtest(t *testing.T) {
// given
tx, err := sdkTx.NewTransactionFromHex("02000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0502dc070101ffffffff012f500900000000002321032efe256e14fd77eea05d0453374f8920e0a7a4a573bb3937ef3f567f3937129cac00000000")
require.NoError(t, err)

// when
height := extractHeightFromCoinbaseTx(tx)

// then
assert.Equalf(t, uint64(2012), height, "height should be 2012, got %d", height)
}

func TestGetLowestHeight(t *testing.T) {
// given
blocks := []*blocktx_api.Block{
{
Height: 123,
Expand All @@ -47,8 +54,10 @@ func TestGetLowestHeight(t *testing.T) {
},
}

// when
lowestHeight := getLowestHeight(blocks)

// then
require.Equal(t, uint64(4), lowestHeight)
}

Expand Down Expand Up @@ -118,8 +127,10 @@ func TestChainWork(t *testing.T) {
name := fmt.Sprintf("should evaluate bits %d from block %d as chainwork %s",
params.bits, params.height, params.expectedChainWork)
t.Run(name, func(t *testing.T) {
// when
cw := calculateChainwork(params.bits)

// then
require.Equal(t, cw.String(), params.expectedChainWork)
})
}
Expand Down
Loading

0 comments on commit 6e07a5c

Please sign in to comment.