Skip to content

Commit

Permalink
refactor(blocktx): move merkle_path to block_transactions_map table (#…
Browse files Browse the repository at this point in the history
…582)

* feat: blocktx migrations and udpate the UpsertBlockTransaction function

* feat: update all methods to the new schema

* feat: add db transaction

* feat: revert back to id in transactions table

* feat: add tx rollback

* chore: unchecked error fix

* feat: remove transaction from Upsert query

* chore: fix formatting

* feat: limit returned transactions to only those send in bulk
  • Loading branch information
kuba-4chain authored Sep 17, 2024
1 parent a669b24 commit 255aee8
Show file tree
Hide file tree
Showing 27 changed files with 422 additions and 521 deletions.
256 changes: 92 additions & 164 deletions internal/blocktx/blocktx_api/blocktx_api.pb.go

Large diffs are not rendered by default.

8 changes: 0 additions & 8 deletions internal/blocktx/blocktx_api/blocktx_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,10 @@ message RowsAffectedResponse {
int64 rows = 1;
}


message TransactionAndSource {
bytes hash = 1;
string source = 2;
}


message DelUnfinishedBlockProcessingRequest {
string processed_by = 1;
}


message MerkleRootVerificationRequest {
string merkle_root = 1;
uint64 block_height = 2;
Expand Down
2 changes: 1 addition & 1 deletion internal/blocktx/blocktx_api/blocktx_api_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 26 additions & 40 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (p *Processor) StartFillGaps(peers []p2p.PeerI) {

func (p *Processor) StartProcessRegisterTxs() {
p.waitGroup.Add(1)
txHashes := make([]*blocktx_api.TransactionAndSource, 0, p.registerTxsBatchSize)
txHashes := make([][]byte, 0, p.registerTxsBatchSize)

ticker := time.NewTicker(p.registerTxsInterval)
go func() {
Expand All @@ -234,16 +234,14 @@ func (p *Processor) StartProcessRegisterTxs() {
case <-p.ctx.Done():
return
case txHash := <-p.registerTxsChan:
txHashes = append(txHashes, &blocktx_api.TransactionAndSource{
Hash: txHash,
})
txHashes = append(txHashes, txHash)

if len(txHashes) < p.registerTxsBatchSize {
continue
}

p.registerTransactions(txHashes[:])
txHashes = make([]*blocktx_api.TransactionAndSource, 0, p.registerTxsBatchSize)
txHashes = txHashes[:0]
ticker.Reset(p.registerTxsInterval)

case <-ticker.C:
Expand All @@ -252,7 +250,7 @@ func (p *Processor) StartProcessRegisterTxs() {
}

p.registerTransactions(txHashes[:])
txHashes = make([]*blocktx_api.TransactionAndSource, 0, p.registerTxsBatchSize)
txHashes = txHashes[:0]
ticker.Reset(p.registerTxsInterval)
}
}
Expand Down Expand Up @@ -337,7 +335,7 @@ func (p *Processor) publishMinedTxs(txHashes []*chainhash.Hash) error {
return nil
}

func (p *Processor) registerTransactions(txHashes []*blocktx_api.TransactionAndSource) {
func (p *Processor) registerTransactions(txHashes [][]byte) {
updatedTxs, err := p.store.RegisterTransactions(p.ctx, txHashes)
if err != nil {
p.logger.Error("failed to register transactions", slog.String("err", err.Error()))
Expand Down Expand Up @@ -410,6 +408,12 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error {
previousBlockHash := msg.Header.PrevBlock
merkleRoot := msg.Header.MerkleRoot

// don't process block that was already processed
existingBlock, _ := p.store.GetBlock(ctx, &blockHash)
if existingBlock != nil {
return nil
}

prevBlock, err := p.getPrevBlock(ctx, &previousBlockHash)
if err != nil {
p.logger.Error("unable to get previous block from db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("prevHash", previousBlockHash.String()), slog.String("err", err.Error()))
Expand Down Expand Up @@ -444,6 +448,9 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error {
return err
}

// find competing chains back to the common ancestor
// get all registered transactions
// prepare msg with competing blocks
incomingBlock.Status = blocktx_api.Status_STALE

if hasGreatestChainwork {
Expand Down Expand Up @@ -479,16 +486,7 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error {
return err
}

block := &p2p.Block{
Hash: &blockHash,
MerkleRoot: &merkleRoot,
PreviousHash: &previousBlockHash,
Height: msg.Height,
Size: msg.Size,
TxCount: uint64(len(msg.TransactionHashes)),
}

if err = p.markBlockAsProcessed(ctx, block); err != nil {
if err = p.store.MarkBlockAsDone(ctx, &blockHash, msg.Size, uint64(len(msg.TransactionHashes))); err != nil {
p.logger.Error("unable to mark block as processed", slog.String("hash", blockHash.String()), slog.String("err", err.Error()))
return err
}
Expand Down Expand Up @@ -601,8 +599,7 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,
ctx, span = tracer.Start(ctx, "markTransactionsAsMined")
defer span.End()
}
txs := make([]*blocktx_api.TransactionAndSource, 0, p.transactionStorageBatchSize)
merklePaths := make([]string, 0, p.transactionStorageBatchSize)
txs := make([]store.TxWithMerklePath, 0, p.transactionStorageBatchSize)
leaves := merkleTree[:(len(merkleTree)+1)/2]

var totalSize int
Expand All @@ -627,11 +624,6 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,
break
}

// Otherwise they're txids, which should have merkle paths calculated.
txs = append(txs, &blocktx_api.TransactionAndSource{
Hash: hash[:],
})

bump, err := bc.NewBUMPFromMerkleTreeAndIndex(blockHeight, merkleTree, uint64(txIndex))
if err != nil {
return fmt.Errorf("failed to create new bump for tx hash %s from merkle tree and index at block height %d: %v", hash.String(), blockHeight, err)
Expand All @@ -642,19 +634,22 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,
return fmt.Errorf("failed to get string from bump for tx hash %s at block height %d: %v", hash.String(), blockHeight, err)
}

merklePaths = append(merklePaths, bumpHex)
txs = append(txs, store.TxWithMerklePath{
Hash: hash[:],
MerklePath: bumpHex,
})

if (txIndex+1)%p.transactionStorageBatchSize == 0 {
updateResp, err := p.store.UpsertBlockTransactions(ctx, blockId, txs, merklePaths)
updateResp, err := p.store.UpsertBlockTransactions(ctx, blockId, txs)
if err != nil {
return fmt.Errorf("failed to insert block transactions at block height %d: %v", blockHeight, err)
}
// free up memory
txs = make([]*blocktx_api.TransactionAndSource, 0, p.transactionStorageBatchSize)
merklePaths = make([]string, 0, p.transactionStorageBatchSize)
txs = txs[:0]

for _, updResp := range updateResp {
txBlock := &blocktx_api.TransactionBlock{
TransactionHash: updResp.TxHash[:],
TransactionHash: updResp.Hash[:],
BlockHash: blockhash[:],
BlockHeight: blockHeight,
MerklePath: updResp.MerklePath,
Expand All @@ -678,14 +673,14 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,
}

// update all remaining transactions
updateResp, err := p.store.UpsertBlockTransactions(ctx, blockId, txs, merklePaths)
updateResp, err := p.store.UpsertBlockTransactions(ctx, blockId, txs)
if err != nil {
return fmt.Errorf("failed to insert block transactions at block height %d: %v", blockHeight, err)
}

for _, updResp := range updateResp {
txBlock := &blocktx_api.TransactionBlock{
TransactionHash: updResp.TxHash[:],
TransactionHash: updResp.Hash[:],
BlockHash: blockhash[:],
BlockHeight: blockHeight,
MerklePath: updResp.MerklePath,
Expand All @@ -699,15 +694,6 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,
return nil
}

func (p *Processor) markBlockAsProcessed(ctx context.Context, block *p2p.Block) error {
err := p.store.MarkBlockAsDone(ctx, block.Hash, block.Size, block.TxCount)
if err != nil {
return err
}

return nil
}

func (p *Processor) Shutdown() {
p.cancelAll()
p.waitGroup.Wait()
Expand Down
69 changes: 41 additions & 28 deletions internal/blocktx/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,25 @@ func TestHandleBlock(t *testing.T) {
merkleRootHash1585018, _ := chainhash.NewHashFromStr("9c1fe95a7ac4502e281f4f2eaa2902e12b0f486cf610977c73afb3cd060bebde")

tt := []struct {
name string
prevBlockHash chainhash.Hash
merkleRoot chainhash.Hash
height uint64
txHashes []string
size uint64
nonce uint32
setBlockProcessingErr error
bhsProcInProg []*chainhash.Hash
name string
prevBlockHash chainhash.Hash
merkleRoot chainhash.Hash
height uint64
txHashes []string
size uint64
nonce uint32
blockAlreadyExists bool
}{
{
name: "block height 1573650",
txHashes: []string{}, // expect this block to not be processed
prevBlockHash: *prevBlockHash1573650,
merkleRoot: *merkleRootHash1573650,
height: 1573650,
nonce: 3694498168,
size: 216,
blockAlreadyExists: true,
},
{
name: "block height 1573650",
txHashes: []string{"3d64b2bb6bd4e85aacb6d1965a2407fa21846c08dd9a8616866ad2f5c80fda7f"},
Expand Down Expand Up @@ -136,6 +145,9 @@ func TestHandleBlock(t *testing.T) {
batchSize := 4
storeMock := &storeMocks.BlocktxStoreMock{
GetBlockFunc: func(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) {
if tc.blockAlreadyExists {
return &blocktx_api.Block{}, nil
}
return nil, store.ErrBlockNotFound
},
GetBlockByHeightFunc: func(ctx context.Context, height uint64, status blocktx_api.Status) (*blocktx_api.Block, error) {
Expand Down Expand Up @@ -170,41 +182,35 @@ func TestHandleBlock(t *testing.T) {

processor.StartBlockProcessing()

var expectedInsertedTransactions []*blocktx_api.TransactionAndSource
var expectedInsertedTransactions [][]byte
transactionHashes := make([]*chainhash.Hash, len(tc.txHashes))
for i, hash := range tc.txHashes {
txHash, err := chainhash.NewHashFromStr(hash)
require.NoError(t, err)
transactionHashes[i] = txHash

expectedInsertedTransactions = append(expectedInsertedTransactions, &blocktx_api.TransactionAndSource{Hash: txHash[:]})
expectedInsertedTransactions = append(expectedInsertedTransactions, txHash[:])
}

var insertedBlockTransactions []*blocktx_api.TransactionAndSource
var insertedBlockTransactions [][]byte

storeMock.UpsertBlockTransactionsFunc = func(ctx context.Context, blockId uint64, transactions []*blocktx_api.TransactionAndSource, merklePaths []string) ([]store.UpsertBlockTransactionsResult, error) {
require.True(t, len(merklePaths) <= batchSize)
require.True(t, len(transactions) <= batchSize)
storeMock.UpsertBlockTransactionsFunc = func(ctx context.Context, blockId uint64, txsWithMerklePaths []store.TxWithMerklePath) ([]store.TxWithMerklePath, error) {
require.True(t, len(txsWithMerklePaths) <= batchSize)

for i, path := range merklePaths {
bump, err := bc.NewBUMPFromStr(path)
for _, tx := range txsWithMerklePaths {
bump, err := bc.NewBUMPFromStr(tx.MerklePath)
require.NoError(t, err)
tx, err := chainhash.NewHash(transactions[i].GetHash())
tx, err := chainhash.NewHash(tx.Hash)
require.NoError(t, err)
root, err := bump.CalculateRootGivenTxid(tx.String())
require.NoError(t, err)

require.Equal(t, root, tc.merkleRoot.String())
}

insertedBlockTransactions = append(insertedBlockTransactions, transactions...)

result := make([]store.UpsertBlockTransactionsResult, len(transactions))
for i, tx := range transactions {
result[i] = store.UpsertBlockTransactionsResult{TxHash: tx.Hash}
insertedBlockTransactions = append(insertedBlockTransactions, tx[:])
}

return result, nil
return txsWithMerklePaths, nil
}

peer := &mocks.PeerMock{
Expand Down Expand Up @@ -301,8 +307,15 @@ func TestHandleBlockReorg(t *testing.T) {
var mtx sync.Mutex
var insertedBlock *blocktx_api.Block

shouldReturnNoBlock := true

storeMock := &storeMocks.BlocktxStoreMock{
GetBlockFunc: func(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) {
if shouldReturnNoBlock {
shouldReturnNoBlock = false
return nil, nil
}

return &blocktx_api.Block{
Status: tc.prevBlockStatus,
}, nil
Expand Down Expand Up @@ -348,8 +361,8 @@ func TestHandleBlockReorg(t *testing.T) {
MarkBlockAsDoneFunc: func(ctx context.Context, hash *chainhash.Hash, size uint64, txCount uint64) error {
return nil
},
UpsertBlockTransactionsFunc: func(ctx context.Context, blockId uint64, transactions []*blocktx_api.TransactionAndSource, merklePaths []string) ([]store.UpsertBlockTransactionsResult, error) {
return []store.UpsertBlockTransactionsResult{}, nil
UpsertBlockTransactionsFunc: func(ctx context.Context, blockId uint64, txsWithMerklePaths []store.TxWithMerklePath) ([]store.TxWithMerklePath, error) {
return []store.TxWithMerklePath{}, nil
},
}

Expand Down Expand Up @@ -510,7 +523,7 @@ func TestStartProcessRegisterTxs(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
registerErrTest := tc.registerErr
storeMock := &storeMocks.BlocktxStoreMock{
RegisterTransactionsFunc: func(ctx context.Context, transaction []*blocktx_api.TransactionAndSource) ([]*chainhash.Hash, error) {
RegisterTransactionsFunc: func(ctx context.Context, transaction [][]byte) ([]*chainhash.Hash, error) {
return nil, registerErrTest
},
}
Expand Down
Loading

0 comments on commit 255aee8

Please sign in to comment.