Skip to content

Commit

Permalink
feat: re-activate reorg support (#737)
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim authored Jan 28, 2025
1 parent 38b431a commit ac44317
Show file tree
Hide file tree
Showing 23 changed files with 119 additions and 95 deletions.
1 change: 1 addition & 0 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
blocktx.WithRegisterTxsInterval(btxConfig.RegisterTxsInterval),
blocktx.WithMessageQueueClient(mqClient),
blocktx.WithMaxBlockProcessingDuration(btxConfig.MaxBlockProcessingDuration),
blocktx.WithIncomingIsLongest(btxConfig.IncomingIsLongest),
)

blockRequestCh := make(chan blocktx.BlockRequest, blockProcessingBuffer)
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ type BlocktxConfig struct {
MaxAllowedBlockHeightMismatch int `mapstructure:"maxAllowedBlockHeightMismatch"`
MessageQueue *MessageQueueConfig `mapstructure:"mq"`
P2pReadBufferSize int `mapstructure:"p2pReadBufferSize"`
IncomingIsLongest bool `mapstructure:"incomingIsLongest"`
}

type DbConfig struct {
Expand Down
1 change: 1 addition & 0 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func getBlocktxConfig() *BlocktxConfig {
MaxBlockProcessingDuration: 5 * time.Minute,
MessageQueue: &MessageQueueConfig{},
P2pReadBufferSize: 8 * 1024 * 1024,
IncomingIsLongest: false,
}
}

Expand Down
1 change: 1 addition & 0 deletions config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ blocktx:
registerTxsInterval: 10s # time interval to read from the channel registered transactions
maxBlockProcessingDuration: 5m # maximum time a blocktx can spend on processing a block before unlocking it to be requested again
monitorPeers: false # if enabled, peers which do not receive alive signal from nodes will be restarted
incomingIsLongest: false # whether each new block received is considered to be from the longest blockchain. If there are a lot of block gaps in the blocktx database, it is advisable to set this to true
fillGaps:
enabled: true
interval: 15m # time interval to check and fill gaps in processed blocks
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
- block_id: 1002
hash: 0xb16cea53fc823e146fbb9ae4ad3124f7c273f30562585ad6e4831495d609f430
merkle_tree_index: 3
merkle_tree_index: 0
- block_id: 1999 # the same tx also in stale block
hash: 0xb16cea53fc823e146fbb9ae4ad3124f7c273f30562585ad6e4831495d609f430
merkle_tree_index: 999
merkle_tree_index: 0
- block_id: 1999 # the same tx also in stale block
hash: 0xcd3d2f97dfc0cdb6a07ec4b72df5e1794c9553ff2f62d90ed4add047e8088853
merkle_tree_index: 999
merkle_tree_index: 1
- block_id: 1004
hash: 0xece2b7e40d98749c03c551b783420d6e3fdc3c958244bbf275437839585829a6
merkle_tree_index: 5
merkle_tree_index: 0
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
- inserted_at: 2023-12-15 14:00:00
id: 1001
hash: 0xf97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000
hash: 0x67708796ef57464ed9eaf2a663d3da32372e4c2fb65558020000000000000000
prevhash: 0xb71ab063c5f96cad71cdc59dcc94182a20a69cbd7eed2d070000000000000000
merkleroot: 0x7f4019eb006f5333cce752df387fa8443035c22291eb771ee5b16a02b81c8483
height: 822014
Expand All @@ -22,18 +22,6 @@
status: 10
is_longest: true
chainwork: '62209952899966'
- inserted_at: 2023-12-15 14:30:00
id: 1999
hash: 0x82471bbf045ab13825a245b37de71d77ec12513b37e2524ec11551d18c19f7c3
prevhash: 0x67708796ef57464ed9eaf2a663d3da32372e4c2fb65558020000000000000000
merkleroot: 0x7382df1b717287ab87e5e3e25759697c4c45eea428f701cdd0c77ad3fc707257
height: 822015
processed_at: 2023-12-15 14:30:00
size: 20160000
tx_count: 6523
status: 20 # STALE - competing block
is_longest: false
chainwork: '62209952899966'
- inserted_at: 2023-12-15 14:40:00
id: 1003
hash: 0xe1df1273e6e7270f96b508545d7aa80aebda7d758dc82e080000000000000000
Expand All @@ -58,6 +46,23 @@
status: 10
is_longest: true
chainwork: '62209952899966'

# Stale
- inserted_at: 2023-12-15 14:30:00
id: 1999
hash: 0x82471bbf045ab13825a245b37de71d77ec12513b37e2524ec11551d18c19f7c3
prevhash: 0x67708796ef57464ed9eaf2a663d3da32372e4c2fb65558020000000000000000
merkleroot: 0x7382df1b717287ab87e5e3e25759697c4c45eea428f701cdd0c77ad3fc707257
height: 822015
processed_at: 2023-12-15 14:30:00
size: 20160000
tx_count: 6523
status: 20 # STALE - competing block
is_longest: false
chainwork: '62209952899966'



- inserted_at: 2023-12-15 14:50:00
id: 10052
hash: 0x000000000000000003b15d668b54c4b91ae81a86298ee209d9f39fd7a769bcde
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
- hash: 0xcd3d2f97dfc0cdb6a07ec4b72df5e1794c9553ff2f62d90ed4add047e8088853
inserted_at: 2023-12-15 14:00:00
- hash: 0xb16cea53fc823e146fbb9ae4ad3124f7c273f30562585ad6e4831495d609f430
inserted_at: 2023-12-15 14:00:00
- hash: 0x2ff4430eb883c6f6c0640a5d716b2d107bbc0efa5aeaa237aec796d4686b0a8f
inserted_at: 2023-12-15 14:00:00
- hash: 0xece2b7e40d98749c03c551b783420d6e3fdc3c958244bbf275437839585829a6
inserted_at: 2023-12-15 14:00:00
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
- block_id: 1002
hash: 0xcd3d2f97dfc0cdb6a07ec4b72df5e1794c9553ff2f62d90ed4add047e8088853
merkle_tree_index: 1
merkle_tree_index: 0
- block_id: 1002
hash: 0xb16cea53fc823e146fbb9ae4ad3124f7c273f30562585ad6e4831495d609f430
merkle_tree_index: 3
merkle_tree_index: 1
- block_id: 1004
hash: 0xb16cea53fc823e146fbb9ae4ad3124f7c273f30562585ad6e4831495d609f430
merkle_tree_index: 3
merkle_tree_index: 0
- block_id: 1003
hash: 0x2ff4430eb883c6f6c0640a5d716b2d107bbc0efa5aeaa237aec796d4686b0a8f
merkle_tree_index: 4
merkle_tree_index: 0
- block_id: 1006
hash: 0xece2b7e40d98749c03c551b783420d6e3fdc3c958244bbf275437839585829a6
merkle_tree_index: 5
merkle_tree_index: 0
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
- hash: 0xcd3d2f97dfc0cdb6a07ec4b72df5e1794c9553ff2f62d90ed4add047e8088853
inserted_at: 2023-12-15 14:00:00
- hash: 0xb16cea53fc823e146fbb9ae4ad3124f7c273f30562585ad6e4831495d609f430
inserted_at: 2023-12-15 14:00:00
- hash: 0x2ff4430eb883c6f6c0640a5d716b2d107bbc0efa5aeaa237aec796d4686b0a8f
inserted_at: 2023-12-15 14:00:00
- hash: 0xece2b7e40d98749c03c551b783420d6e3fdc3c958244bbf275437839585829a6
inserted_at: 2023-12-15 14:00:00

This file was deleted.

5 changes: 1 addition & 4 deletions internal/blocktx/integration_test/reorg_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ import (
)

func TestReorg(t *testing.T) {
// TODO: remove the skip when gaps are filling quickly again
t.Skip("Skipping until gaps are being processed quickly again")

if testing.Short() {
t.Skip("skipping integration test")
}
Expand Down Expand Up @@ -150,7 +147,7 @@ func TestReorg(t *testing.T) {
blockHash822015Fork = "82471bbf045ab13825a245b37de71d77ec12513b37e2524ec11551d18c19f7c3"
blockHash822016Fork = "032c3688bc7536b2d787f3a196b1145a09bf33183cd1448ff6b1a9dfbb022db8"

blockHash822014StartOfChain = "f97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000"
blockHash822014StartOfChain = "67708796ef57464ed9eaf2a663d3da32372e4c2fb65558020000000000000000"
blockHash822015 = "c9b4e1e4dcf9188416027511671b9346be8ef93c0ddf59060000000000000000"
blockHash822016 = "e1df1273e6e7270f96b508545d7aa80aebda7d758dc82e080000000000000000"
blockHash822017 = "76404890880cb36ce68100abb05b3a958e17c0ed274d5c0a0000000000000000"
Expand Down
22 changes: 11 additions & 11 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ const (
registerTxsBatchSizeDefault = 100
registerRequestTxBatchSizeDefault = 100
waitForBlockProcessing = 5 * time.Minute
lockTime = 5 * time.Minute
parallellism = 5
)

Expand All @@ -71,6 +70,7 @@ type Processor struct {
tracingAttributes []attribute.KeyValue
stats *processorStats
statCollectionInterval time.Duration
incomingIsLongest bool

now func() time.Time
maxBlockProcessingDuration time.Duration
Expand Down Expand Up @@ -166,7 +166,7 @@ func (p *Processor) StartBlockRequesting() {
peer := req.Peer

// lock block for the current instance to process
processedBy, err := p.store.SetBlockProcessing(p.ctx, hash, p.hostname, lockTime, maxBlocksInProgress)
processedBy, err := p.store.SetBlockProcessing(p.ctx, hash, p.hostname, p.maxBlockProcessingDuration, maxBlocksInProgress)
if err != nil {
if errors.Is(err, store.ErrBlockProcessingMaximumReached) {
p.logger.Debug("block processing maximum reached", slog.String("hash", hash.String()), slog.String("processed_by", processedBy))
Expand Down Expand Up @@ -456,15 +456,17 @@ func (p *Processor) verifyAndInsertBlock(ctx context.Context, blockMsg *p2p.Bloc
MerkleRoot: merkleRoot[:],
Height: blockMsg.Height,
Chainwork: calculateChainwork(blockMsg.Header.Bits).String(),
Status: blocktx_api.Status_LONGEST, // temporary fix (!), TODO: remove this when gaps are filling quickly again
}

// TODO: uncomment when gaps are filling quickly again
// err = p.assignBlockStatus(ctx, incomingBlock, previousBlockHash)
// if err != nil {
// p.logger.Error("unable to assign block status", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error()))
// return nil, err
// }
if p.incomingIsLongest {
incomingBlock.Status = blocktx_api.Status_LONGEST
} else {
err = p.assignBlockStatus(ctx, incomingBlock, previousBlockHash)
if err != nil {
p.logger.Error("unable to assign block status", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error()))
return nil, err
}
}

p.logger.Info("Inserting block", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("status", incomingBlock.Status.String()))

Expand All @@ -477,7 +479,6 @@ func (p *Processor) verifyAndInsertBlock(ctx context.Context, blockMsg *p2p.Bloc
return incomingBlock, nil
}

//lint:ignore U1000 Ignored until gaps are filling quickly again TODO: remove this ignore
func (p *Processor) assignBlockStatus(ctx context.Context, block *blocktx_api.Block, prevBlockHash chainhash.Hash) (err error) {
ctx, span := tracing.StartTracing(ctx, "assignBlockStatus", p.tracingEnabled, p.tracingAttributes...)
defer func() {
Expand Down Expand Up @@ -538,7 +539,6 @@ func (p *Processor) assignBlockStatus(ctx context.Context, block *blocktx_api.Bl
return nil
}

//lint:ignore U1000 Ignored until gaps are filling quickly again TODO: remove this ignore
func (p *Processor) longestTipExists(ctx context.Context) (bool, error) {
_, err := p.store.GetChainTip(ctx)
if err != nil && !errors.Is(err, store.ErrBlockNotFound) {
Expand Down
6 changes: 6 additions & 0 deletions internal/blocktx/processor_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,9 @@ func WithMaxBlockProcessingDuration(d time.Duration) func(*Processor) {
processor.maxBlockProcessingDuration = d
}
}

func WithIncomingIsLongest(enabled bool) func(*Processor) {
return func(processor *Processor) {
processor.incomingIsLongest = enabled
}
}
3 changes: 0 additions & 3 deletions internal/blocktx/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,6 @@ func TestHandleBlock(t *testing.T) {
}

func TestHandleBlockReorgAndOrphans(t *testing.T) {
// TODO: remove the skip when gaps are filling quickly again
t.Skip("Skipping until gaps are being processed quickly again")

testCases := []struct {
name string
blockAlreadyExists bool
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
#- hash: 0x76732b80598326a18d3bf0a86518adbdf95d0ddc6ff6693004440f4776168c3b
# inserted_at: 2023-12-15 14:00:00
#- hash: 0x164e85a5d5bc2b2372e8feaa266e5e4b7d0808f8d2b784fb1f7349c4726392b0
# inserted_at: 2023-12-15 14:00:00
#- hash: 0xb4201cc6fc5768abff14adf75042ace6061da9176ee5bb943291b9ba7d7f5743
# inserted_at: 2023-12-15 14:00:00
#- hash: 0x37bd6c87927e75faeb3b3c939f64721cda48e1bb98742676eebe83aceee1a669
# inserted_at: 2023-12-15 14:00:00
#- hash: 0x952f80e20a0330f3b9c2dfd1586960064e797218b5c5df665cada221452c17eb
# inserted_at: 2023-12-15 14:00:00
#- hash: 0x861a281b27de016e50887288de87eab5ca56a1bb172cdff6dba965474ce0f608
# inserted_at: 2023-12-15 14:00:00
#- hash: 0x9421cc760c5405af950a76dc3e4345eaefd4e7322f172a3aee5e0ddc7b4f8313
# inserted_at: 2023-12-15 14:00:00
- hash: 0x76732b80598326a18d3bf0a86518adbdf95d0ddc6ff6693004440f4776168c3b
inserted_at: 2023-12-15 14:00:00
- hash: 0x164e85a5d5bc2b2372e8feaa266e5e4b7d0808f8d2b784fb1f7349c4726392b0
inserted_at: 2023-12-15 14:00:00
- hash: 0xb4201cc6fc5768abff14adf75042ace6061da9176ee5bb943291b9ba7d7f5743
inserted_at: 2023-12-15 14:00:00
- hash: 0x37bd6c87927e75faeb3b3c939f64721cda48e1bb98742676eebe83aceee1a669
inserted_at: 2023-12-15 14:00:00
- hash: 0x952f80e20a0330f3b9c2dfd1586960064e797218b5c5df665cada221452c17eb
inserted_at: 2023-12-15 14:00:00
- hash: 0x861a281b27de016e50887288de87eab5ca56a1bb172cdff6dba965474ce0f608
inserted_at: 2023-12-15 14:00:00
- hash: 0x9421cc760c5405af950a76dc3e4345eaefd4e7322f172a3aee5e0ddc7b4f8313
inserted_at: 2023-12-15 14:00:00
- hash: 0x8b7d038db4518ac4c665abfc5aeaacbd2124ad8ca70daa8465ed2c4427c41b9b
inserted_at: 2023-12-15 14:00:00
3 changes: 2 additions & 1 deletion internal/blocktx/store/postgresql/get_orphaned_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package postgresql
import (
"context"

"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
"github.com/libsv/go-p2p/chaincfg/chainhash"

"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
)

// GetOrphansBackToNonOrphanAncestor recursively searches for blocks marked
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP INDEX blocktx.pux_height_is_longest;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This will make sure that there can only be ONE block at any
-- given height that is considered part of the LONGEST chain.
CREATE UNIQUE INDEX pux_height_is_longest ON blocktx.blocks(height)
WHERE is_longest;
37 changes: 16 additions & 21 deletions internal/blocktx/store/postgresql/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ func TestPostgresDB(t *testing.T) {

blockHash1 := testutils.RevChainhash(t, "000000000000000001b8adefc1eb98896c80e30e517b9e2655f1f929d9958a48")
blockHash2 := testutils.RevChainhash(t, "00000000000000000a081a539601645abe977946f8f6466a3c9e0c34d50be4a8")
// TODO: uncomment when all block gaps are filled
// blockHashViolating := testutils.RevChainhash(t, "00000000b69bd8e4dc60580117617a466d5c76ada85fb7b87e9baea01f9d9984")
blockHashViolating := testutils.RevChainhash(t, "00000000b69bd8e4dc60580117617a466d5c76ada85fb7b87e9baea01f9d9984")
merkleRoot := testutils.RevChainhash(t, "31e25c5ac7c143687f55fc49caf0f552ba6a16d4f785e4c9a9a842179a085f0c")
expectedBlock := &blocktx_api.Block{
Hash: blockHash2[:],
Expand All @@ -148,14 +147,13 @@ func TestPostgresDB(t *testing.T) {
Status: blocktx_api.Status_LONGEST,
Processed: true,
}
// TODO: uncomment when all block gaps are filled
// expectedBlockViolatingUniqueIndex := &blocktx_api.Block{
// Hash: blockHashViolating[:],
// PreviousHash: blockHash1[:],
// MerkleRoot: merkleRoot[:],
// Height: 100,
// Status: blocktx_api.Status_LONGEST,
// }
expectedBlockViolatingUniqueIndex := &blocktx_api.Block{
Hash: blockHashViolating[:],
PreviousHash: blockHash1[:],
MerkleRoot: merkleRoot[:],
Height: 100,
Status: blocktx_api.Status_LONGEST,
}
expectedBlockOverrideStatus := &blocktx_api.Block{
Hash: blockHash2[:],
PreviousHash: blockHash1[:],
Expand All @@ -178,11 +176,10 @@ func TestPostgresDB(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expectedBlock, actualBlockResp)

// TODO: uncomment when all block gaps are filled
// when
// _, err = postgresDB.UpsertBlock(ctx, expectedBlockViolatingUniqueIndex)
_, err = postgresDB.UpsertBlock(ctx, expectedBlockViolatingUniqueIndex)
// then
// require.ErrorIs(t, err, store.ErrFailedToInsertBlock)
require.ErrorIs(t, err, store.ErrFailedToInsertBlock)

// when
id, err = postgresDB.UpsertBlock(ctx, expectedBlockOverrideStatus)
Expand Down Expand Up @@ -394,11 +391,10 @@ func TestPostgresDB(t *testing.T) {
{Hash: hash4Stale[:], Status: blocktx_api.Status_LONGEST},
}

// TODO: uncomment when all block gaps are filled
// blockStatusUpdatesViolating := []store.BlockStatusUpdate{
// // there is already a LONGEST block at that height
// {Hash: hash1Longest[:], Status: blocktx_api.Status_LONGEST},
// }
blockStatusUpdatesViolating := []store.BlockStatusUpdate{
// there is already a LONGEST block at that height
{Hash: hash1Longest[:], Status: blocktx_api.Status_LONGEST},
}

// when
err := postgresDB.UpdateBlocksStatuses(ctx, blockStatusUpdates)
Expand All @@ -421,10 +417,9 @@ func TestPostgresDB(t *testing.T) {
require.NoError(t, err)
require.Equal(t, blocktx_api.Status_LONGEST, stale4.Status)

// TODO: uncomment when all block gaps are filled
// when
// err = postgresDB.UpdateBlocksStatuses(ctx, blockStatusUpdatesViolating)
// require.ErrorIs(t, err, store.ErrFailedToUpdateBlockStatuses)
err = postgresDB.UpdateBlocksStatuses(ctx, blockStatusUpdatesViolating)
require.ErrorIs(t, err, store.ErrFailedToUpdateBlockStatuses)
})

t.Run("get mined txs", func(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions internal/blocktx/store/postgresql/upsert_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ func (p *PostgreSQL) UpsertBlock(ctx context.Context, block *blocktx_api.Block)
}()

// This query will insert a block ONLY if one of the 3 conditions is met:
// 1. Block being inserted is `ORPHANED` and there's no previous block in the database
// 1. Block being inserted is `ORPHANED` or `LONGEST` and there's no previous block in the database
// 2. The block being inserted has the same status as its previous block
// 3. The block being inserted has status `STALE` but the previous block was `LONGEST`
// Any other situation would mean an error in block processing
// (probably because of another block being inserted by other blocktx instance at the same time)
// (probably because of another block which is being inserted by another blocktx instance at the same time)
// and requires the block to be received and processed again.
qInsert := `
INSERT INTO blocktx.blocks (hash, prevhash, merkleroot, height, status, chainwork, is_longest)
Expand Down
Loading

0 comments on commit ac44317

Please sign in to comment.