Skip to content

Commit

Permalink
feat: adapt to new error traces
Browse files Browse the repository at this point in the history
  • Loading branch information
kuba-4chain committed Nov 27, 2024
1 parent 02cc8d8 commit 0dcabf9
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 30 deletions.
74 changes: 47 additions & 27 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) (err error) {

p.logger.Info("publishing tx", slog.String("txHash", getHashStringNoErr(tx.TxHash)))

err = p.mqClient.PublishMarshal(MinedTxsTopic, txBlock)
err = p.mqClient.PublishMarshal(ctx, MinedTxsTopic, txBlock)
if err != nil {
p.logger.Error("failed to publish mined txs", slog.String("blockHash", getHashStringNoErr(tx.BlockHash)), slog.Uint64("height", tx.BlockHeight), slog.String("txHash", getHashStringNoErr(tx.TxHash)), slog.String("err", err.Error()))
}
Expand All @@ -505,23 +505,25 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) (err error) {
return nil
}

func (p *Processor) verifyAndInsertBlock(ctx context.Context, msg *p2p.BlockMessage) (*blocktx_api.Block, error) {
func (p *Processor) verifyAndInsertBlock(ctx context.Context, msg *p2p.BlockMessage) (incomingBlock *blocktx_api.Block, err error) {
ctx, span := tracing.StartTracing(ctx, "verifyAndInsertBlock", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

blockHash := msg.Header.BlockHash()
previousBlockHash := msg.Header.PrevBlock
merkleRoot := msg.Header.MerkleRoot

incomingBlock := &blocktx_api.Block{
incomingBlock = &blocktx_api.Block{
Hash: blockHash[:],
PreviousHash: previousBlockHash[:],
MerkleRoot: merkleRoot[:],
Height: msg.Height,
Chainwork: calculateChainwork(msg.Header.Bits).String(),
}

err := p.assignBlockStatus(ctx, incomingBlock, previousBlockHash)
err = p.assignBlockStatus(ctx, incomingBlock, previousBlockHash)
if err != nil {
p.logger.Error("unable to assign block status", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error()))
return nil, err
Expand All @@ -538,16 +540,19 @@ func (p *Processor) verifyAndInsertBlock(ctx context.Context, msg *p2p.BlockMess
return incomingBlock, nil
}

func (p *Processor) assignBlockStatus(ctx context.Context, block *blocktx_api.Block, prevBlockHash chainhash.Hash) error {
func (p *Processor) assignBlockStatus(ctx context.Context, block *blocktx_api.Block, prevBlockHash chainhash.Hash) (err error) {
ctx, span := tracing.StartTracing(ctx, "assignBlockStatus", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

prevBlock, _ := p.store.GetBlock(ctx, &prevBlockHash)

if prevBlock == nil {
// This check is only in case there's a fresh, empty database
// with no blocks, to mark the first block as the LONGEST chain
longestTipExists, err := p.longestTipExists(ctx)
var longestTipExists bool
longestTipExists, err = p.longestTipExists(ctx)
if err != nil {
p.logger.Error("unable to verify the longest tip existance in db", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error()))
return err
Expand All @@ -574,7 +579,8 @@ func (p *Processor) assignBlockStatus(ctx context.Context, block *blocktx_api.Bl
}

if prevBlock.Status == blocktx_api.Status_LONGEST {
competingBlock, err := p.store.GetLongestBlockByHeight(ctx, block.Height)
var competingBlock *blocktx_api.Block
competingBlock, err = p.store.GetLongestBlockByHeight(ctx, block.Height)
if err != nil && !errors.Is(err, store.ErrBlockNotFound) {
p.logger.Error("unable to get the competing block from db", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error()))
return err
Expand All @@ -589,7 +595,8 @@ func (p *Processor) assignBlockStatus(ctx context.Context, block *blocktx_api.Bl
// this means that another instance is already processing
// or have processed this block that we're processing here
// so we can throw an error and finish processing
return ErrBlockAlreadyExists
err = ErrBlockAlreadyExists
return err
}

block.Status = blocktx_api.Status_STALE
Expand All @@ -615,16 +622,18 @@ func (p *Processor) longestTipExists(ctx context.Context) (bool, error) {
return true, nil
}

func (p *Processor) getRegisteredTransactions(ctx context.Context, blocks []*blocktx_api.Block) ([]store.TransactionBlock, error) {
func (p *Processor) getRegisteredTransactions(ctx context.Context, blocks []*blocktx_api.Block) (txsToPublish []store.TransactionBlock, err error) {
ctx, span := tracing.StartTracing(ctx, "getRegisteredTransactions", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

blockHashes := make([][]byte, len(blocks))
for i, b := range blocks {
blockHashes[i] = b.Hash
}

txsToPublish, err := p.store.GetRegisteredTxsByBlockHashes(ctx, blockHashes)
txsToPublish, err = p.store.GetRegisteredTxsByBlockHashes(ctx, blockHashes)
if err != nil {
block := blocks[len(blocks)-1]
p.logger.Error("unable to get registered transactions", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error()))
Expand All @@ -634,9 +643,11 @@ func (p *Processor) getRegisteredTransactions(ctx context.Context, blocks []*blo
return txsToPublish, nil
}

func (p *Processor) insertBlockAndStoreTransactions(ctx context.Context, incomingBlock *blocktx_api.Block, txHashes []*chainhash.Hash, merkleRoot chainhash.Hash) error {
func (p *Processor) insertBlockAndStoreTransactions(ctx context.Context, incomingBlock *blocktx_api.Block, txHashes []*chainhash.Hash, merkleRoot chainhash.Hash) (err error) {
ctx, span := tracing.StartTracing(ctx, "insertBlockAndStoreTransactions", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

blockID, err := p.store.UpsertBlock(ctx, incomingBlock)
if err != nil {
Expand Down Expand Up @@ -735,9 +746,11 @@ func (p *Processor) storeTransactions(ctx context.Context, blockID uint64, block
return nil
}

func (p *Processor) handleStaleBlock(ctx context.Context, block *blocktx_api.Block) ([]store.TransactionBlock, error) {
func (p *Processor) handleStaleBlock(ctx context.Context, block *blocktx_api.Block) (txsToPublish []store.TransactionBlock, err error) {
ctx, span := tracing.StartTracing(ctx, "handleStaleBlock", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

staleBlocks, err := p.store.GetStaleChainBackFromHash(ctx, block.Hash)
if err != nil {
Expand All @@ -762,7 +775,7 @@ func (p *Processor) handleStaleBlock(ctx context.Context, block *blocktx_api.Blo
if longestChainwork.Cmp(staleChainwork) < 0 {
p.logger.Info("chain reorg detected", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height))

txsToPublish, err := p.performReorg(ctx, staleBlocks, longestBlocks)
txsToPublish, err = p.performReorg(ctx, staleBlocks, longestBlocks)
if err != nil {
p.logger.Error("unable to perform reorg", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error()))
return nil, err
Expand All @@ -773,9 +786,11 @@ func (p *Processor) handleStaleBlock(ctx context.Context, block *blocktx_api.Blo
return nil, nil
}

func (p *Processor) performReorg(ctx context.Context, staleBlocks []*blocktx_api.Block, longestBlocks []*blocktx_api.Block) ([]store.TransactionBlock, error) {
func (p *Processor) performReorg(ctx context.Context, staleBlocks []*blocktx_api.Block, longestBlocks []*blocktx_api.Block) (txsToPublish []store.TransactionBlock, err error) {
ctx, span := tracing.StartTracing(ctx, "performReorg", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

staleHashes := make([][]byte, len(staleBlocks))
longestHashes := make([][]byte, len(longestBlocks))
Expand All @@ -798,7 +813,7 @@ func (p *Processor) performReorg(ctx context.Context, staleBlocks []*blocktx_api
blockStatusUpdates[i+len(longestBlocks)] = update
}

err := p.store.UpdateBlocksStatuses(ctx, blockStatusUpdates)
err = p.store.UpdateBlocksStatuses(ctx, blockStatusUpdates)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -829,9 +844,11 @@ func (p *Processor) performReorg(ctx context.Context, staleBlocks []*blocktx_api
return append(longestTxs, staleTxs...), nil
}

func (p *Processor) handleOrphans(ctx context.Context, block *blocktx_api.Block) ([]store.TransactionBlock, error) {
func (p *Processor) handleOrphans(ctx context.Context, block *blocktx_api.Block) (txsToPublis []store.TransactionBlock, err error) {
ctx, span := tracing.StartTracing(ctx, "handleOrphans", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

orphans, ancestor, err := p.store.GetOrphansBackToNonOrphanAncestor(ctx, block.Hash)
if err != nil {
Expand Down Expand Up @@ -864,7 +881,8 @@ func (p *Processor) handleOrphans(ctx context.Context, block *blocktx_api.Block)
// of the first orphan, then we can assume that
// there's no competing chain at all.

competingBlock, err := p.store.GetLongestBlockByHeight(ctx, orphans[0].Height)
var competingBlock *blocktx_api.Block
competingBlock, err = p.store.GetLongestBlockByHeight(ctx, orphans[0].Height)
if err != nil && !errors.Is(err, store.ErrBlockNotFound) {
p.logger.Error("unable to get competing block when handling orphans", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error()))
return nil, err
Expand Down Expand Up @@ -892,9 +910,11 @@ func (p *Processor) handleOrphans(ctx context.Context, block *blocktx_api.Block)
return nil, nil
}

func (p *Processor) acceptIntoChain(ctx context.Context, blocks []*blocktx_api.Block, chain blocktx_api.Status) error {
func (p *Processor) acceptIntoChain(ctx context.Context, blocks []*blocktx_api.Block, chain blocktx_api.Status) (err error) {
ctx, span := tracing.StartTracing(ctx, "acceptIntoChain", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

blockStatusUpdates := make([]store.BlockStatusUpdate, len(blocks))

Expand All @@ -908,7 +928,7 @@ func (p *Processor) acceptIntoChain(ctx context.Context, blocks []*blocktx_api.B

tip := blocks[len(blocks)-1]

err := p.store.UpdateBlocksStatuses(ctx, blockStatusUpdates)
err = p.store.UpdateBlocksStatuses(ctx, blockStatusUpdates)
if err != nil {
p.logger.Error("unable to accept blocks into chain", slog.String("hash", getHashStringNoErr(tip.Hash)), slog.Uint64("height", tip.Height), slog.String("chain", chain.String()), slog.String("err", err.Error()))
return err
Expand Down
13 changes: 10 additions & 3 deletions internal/blocktx/store/postgresql/get_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"github.com/lib/pq"
)

func (p *PostgreSQL) GetMinedTransactions(ctx context.Context, hashes [][]byte, onlyLongestChain bool) ([]store.TransactionBlock, error) {
func (p *PostgreSQL) GetMinedTransactions(ctx context.Context, hashes [][]byte, onlyLongestChain bool) (minedTransactions []store.TransactionBlock, err error) {
ctx, span := tracing.StartTracing(ctx, "GetMinedTransactions", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span)
defer func() {
tracing.EndTracing(span, err)
}()

if onlyLongestChain {
predicate := "WHERE t.hash = ANY($1) AND b.is_longest = true"
Expand All @@ -27,7 +29,12 @@ func (p *PostgreSQL) GetMinedTransactions(ctx context.Context, hashes [][]byte,
)
}

func (p *PostgreSQL) GetRegisteredTxsByBlockHashes(ctx context.Context, blockHashes [][]byte) ([]store.TransactionBlock, error) {
func (p *PostgreSQL) GetRegisteredTxsByBlockHashes(ctx context.Context, blockHashes [][]byte) (registeredTxs []store.TransactionBlock, err error) {
ctx, span := tracing.StartTracing(ctx, "GetMinedTransactions", p.tracingEnabled, p.tracingAttributes...)
defer func() {
tracing.EndTracing(span, err)
}()

predicate := "WHERE b.hash = ANY($1) AND t.is_registered = TRUE"

return p.getTransactionBlocksByPredicate(ctx, predicate, pq.Array(blockHashes))
Expand Down

0 comments on commit 0dcabf9

Please sign in to comment.