Skip to content

Commit

Permalink
feat: re-broadcast stale txs
Browse files Browse the repository at this point in the history
  • Loading branch information
kuba-4chain committed Dec 23, 2024
1 parent 1477947 commit 8e01df1
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@
txid: 1000005
merkle_path: merkle-path-5
inserted_at: 2023-12-10 14:00:00
- blockid: 1007 # in ORPHANED block
txid: 1000007
merkle_path: merkle-path-7
inserted_at: 2023-12-10 14:00:00
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
status: 10
is_longest: true
chainwork: '12301577519373468' # Higher chainwork
# Competing
- inserted_at: 2023-12-15 14:50:00
id: 1005
hash: 0x76404890880cb36ce68100abb05b3a958e17c0ed274d5c0a0000000000000000
Expand All @@ -46,6 +47,7 @@
status: 20 # STALE
is_longest: false
chainwork: '62209952899966'
# Orphans
- inserted_at: 2023-12-15 14:50:00
id: 1006
hash: 0x000000000000000003b15d668b54c4b91ae81a86298ee209d9f39fd7a769bcde
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@
hash: 0xece2b7e40d98749c03c551b783420d6e3fdc3c958244bbf275437839585829a6
inserted_at: 2023-12-10 14:00:00
is_registered: true
- id: 1000007
hash: 0x71fbb8fb5c0f978e3c221bc6ac235587f3c26fa10e231b54fce972d4a5c30e5e
inserted_at: 2023-12-10 14:00:00
is_registered: true
29 changes: 24 additions & 5 deletions internal/blocktx/integration_test/reorg_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ package integrationtest
// 1. Blocks at heights 822014-822017 (LONGEST), 822018-822020 (ORPHANED) and 822022-822023 (ORPHANED) are added to db from fixtures
// 2. A hardcoded msg with competing block at height 822015 is being sent through the mocked PeerHandler
// 3. This block has a chainwork lower than the current tip of chain - becomes STALE
// 4. Registered transactions from this block are ignored
// 4. Registered transactions from this block that are not in the longest chain are published with blockstatus = STALE
// 5. Next competing block, at height 822016 is being sent through the mocked PeerHandler
// 6. This block has a greater chainwork than the current tip of longest chain - it becomes LONGEST despite not being the highest
// 7. Verification of reorg - checking if statuses are correctly switched
Expand Down Expand Up @@ -179,10 +179,18 @@ func TestReorg(t *testing.T) {
verifyBlock(t, store, blockHash822015Fork, 822015, blocktx_api.Status_STALE)
verifyBlock(t, store, blockHash822015, 822015, blocktx_api.Status_LONGEST)

expectedTxs := []*blocktx_api.TransactionBlock{
{
BlockHash: blockHash[:],
BlockHeight: 822015,
TransactionHash: testutils.RevChainhash(t, txhash822015)[:],
BlockStatus: blocktx_api.Status_STALE,
},
}

publishedTxs := getPublishedTxs(publishedTxsCh)

// verify the no transaction was published to metamorph
require.Len(t, publishedTxs, 0)
verifyTxs(t, expectedTxs, publishedTxs)
})

t.Run("reorg", func(t *testing.T) {
Expand Down Expand Up @@ -295,6 +303,8 @@ func TestReorg(t *testing.T) {
blockHash822021 = "d46bf0a189927b62c8ff785d393a545093ca01af159aed771a8d94749f06c060"
blockHash822022Orphan = "0000000000000000059d6add76e3ddb8ec4f5ffd6efecd4c8b8c577bd32aed6c"
blockHash822023Orphan = "0000000000000000082131979a4e25a5101912a5f8461e18f306d23e158161cd"

txhash822019 = "71fbb8fb5c0f978e3c221bc6ac235587f3c26fa10e231b54fce972d4a5c30e5e"
)

//blockHash := testutils.RevChainhash(t, blockHash822021)
Expand Down Expand Up @@ -336,10 +346,19 @@ func TestReorg(t *testing.T) {
verifyBlock(t, store, blockHash822022Orphan, 822022, blocktx_api.Status_ORPHANED)
verifyBlock(t, store, blockHash822023Orphan, 822023, blocktx_api.Status_ORPHANED)

bh := testutils.RevChainhash(t, blockHash822019Orphan)
expectedTxs := []*blocktx_api.TransactionBlock{
{
BlockHash: bh[:],
BlockHeight: 822019,
TransactionHash: testutils.RevChainhash(t, txhash822019)[:],
BlockStatus: blocktx_api.Status_STALE,
},
}

publishedTxs := getPublishedTxs(publishedTxsCh)

// verify no transaction was published
require.Len(t, publishedTxs, 0)
verifyTxs(t, expectedTxs, publishedTxs)
})

t.Run("reorg orphans", func(t *testing.T) {
Expand Down
26 changes: 20 additions & 6 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func (p *Processor) longestTipExists(ctx context.Context) (bool, error) {
return true, nil
}

func (p *Processor) getRegisteredTransactions(ctx context.Context, blocks []*blocktx_api.Block) (txsToPublish []store.TransactionBlock, ok bool) {
func (p *Processor) getRegisteredTransactions(ctx context.Context, blocks []*blocktx_api.Block) (txs []store.TransactionBlock, ok bool) {
var err error
ctx, span := tracing.StartTracing(ctx, "getRegisteredTransactions", p.tracingEnabled, p.tracingAttributes...)
defer func() {
Expand All @@ -637,14 +637,14 @@ func (p *Processor) getRegisteredTransactions(ctx context.Context, blocks []*blo
blockHashes[i] = b.Hash
}

txsToPublish, err = p.store.GetRegisteredTxsByBlockHashes(ctx, blockHashes)
txs, err = p.store.GetRegisteredTxsByBlockHashes(ctx, blockHashes)
if err != nil {
block := blocks[len(blocks)-1]
p.logger.Error("unable to get registered transactions", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error()))
return nil, false
}

return txsToPublish, true
return txs, true
}

func (p *Processor) insertBlockAndStoreTransactions(ctx context.Context, incomingBlock *blocktx_api.Block, txHashes []*chainhash.Hash, merkleRoot chainhash.Hash) (err error) {
Expand Down Expand Up @@ -749,7 +749,7 @@ func (p *Processor) storeTransactions(ctx context.Context, blockID uint64, block
return nil
}

func (p *Processor) handleStaleBlock(ctx context.Context, block *blocktx_api.Block) (longestTxs, staleTxs []store.TransactionBlock, ok bool) {
func (p *Processor) handleStaleBlock(ctx context.Context, block *blocktx_api.Block, orphans ...*blocktx_api.Block) (longestTxs, staleTxs []store.TransactionBlock, ok bool) {
var err error
ctx, span := tracing.StartTracing(ctx, "handleStaleBlock", p.tracingEnabled, p.tracingAttributes...)
defer func() {
Expand Down Expand Up @@ -787,7 +787,21 @@ func (p *Processor) handleStaleBlock(ctx context.Context, block *blocktx_api.Blo
return longestTxs, staleTxs, true
}

return nil, nil, true
if len(orphans) > 0 {
staleTxs, ok = p.getRegisteredTransactions(ctx, orphans)
} else {
staleTxs, ok = p.getRegisteredTransactions(ctx, staleBlocks)
}
if !ok {
return nil, nil, false
}

longestTxs, ok = p.getRegisteredTransactions(ctx, longestBlocks)
if !ok {
return nil, nil, false
}

return nil, exclusiveRightTxs(longestTxs, staleTxs), true
}

func (p *Processor) performReorg(ctx context.Context, staleBlocks []*blocktx_api.Block, longestBlocks []*blocktx_api.Block) (longestTxs, staleTxs []store.TransactionBlock, err error) {
Expand Down Expand Up @@ -869,7 +883,7 @@ func (p *Processor) handleOrphans(ctx context.Context, block *blocktx_api.Block)
}

block.Status = blocktx_api.Status_STALE
return p.handleStaleBlock(ctx, block)
return p.handleStaleBlock(ctx, block, orphans...)
}

if ancestor.Status == blocktx_api.Status_LONGEST {
Expand Down
24 changes: 24 additions & 0 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,30 @@ func (p *Processor) updateMined(ctx context.Context, txsBlocks []*blocktx_api.Tr

p.delTxFromCache(data.Hash)
}

p.rebroadcastStaleTxs(ctx, txsBlocks)
}

func (p *Processor) rebroadcastStaleTxs(ctx context.Context, txsBlocks []*blocktx_api.TransactionBlock) {
ctx, span := tracing.StartTracing(ctx, "rebroadcastStaleTxs", p.tracingEnabled, p.tracingAttributes...)
defer tracing.EndTracing(span, nil)

for _, tx := range txsBlocks {
if tx.BlockStatus == blocktx_api.Status_STALE {
txHash, err := chainhash.NewHash(tx.TransactionHash)
if err != nil {
p.logger.Warn("error parsing transaction hash")
continue
}

p.logger.Debug("Re-announcing stale tx", slog.String("hash", txHash.String()))

peers := p.pm.AnnounceTransaction(txHash, nil)
if len(peers) == 0 {
p.logger.Warn("transaction was not announced to any peer during rebroadcast", slog.String("hash", txHash.String()))
}
}
}
}

func (p *Processor) StartProcessSubmittedTxs() {
Expand Down
11 changes: 4 additions & 7 deletions internal/metamorph/processor_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package metamorph
import (
"encoding/json"
"errors"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"log/slog"

"github.com/libsv/go-p2p/chaincfg/chainhash"

"github.com/bitcoin-sv/arc/internal/cache"
"github.com/bitcoin-sv/arc/internal/callbacker/callbacker_api"
"github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api"
Expand Down Expand Up @@ -116,15 +117,11 @@ func (p *Processor) getStatusUpdateCount() (int, error) {
}

func shouldUpdateCompetingTxs(new, found store.UpdateStatus) bool {
if new.Status >= found.Status && !unorderedEqual(new.CompetingTxs, found.CompetingTxs) {
return true
}

return false
return new.Status >= found.Status && !unorderedEqual(new.CompetingTxs, found.CompetingTxs)
}

func shouldUpdateStatus(new, found store.UpdateStatus) bool {
return new.Status > found.Status
return new.Status > found.Status || found.Status == metamorph_api.Status_MINED_IN_STALE_BLOCK
}

// unorderedEqual checks if two string slices contain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,8 @@
status: 70
stored_at: 2023-10-01 14:00:00
last_submitted_at: 2023-10-01 14:00:00
- hash: 0xfe3ae78226a8a1c78039a7d10590a42dc4b691acaa8cbc6831b464da49e8ba08
locked_by: metamorph-1
status: 115
stored_at: 2023-10-01 14:00:00
last_submitted_at: 2023-10-01 14:00:00
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,8 @@
status: 70
stored_at: 2023-10-01 14:00:00
last_submitted_at: 2023-10-01 14:00:00
- hash: 0xfe3ae78226a8a1c78039a7d10590a42dc4b691acaa8cbc6831b464da49e8ba08
locked_by: metamorph-1
status: 115
stored_at: 2023-10-01 14:00:00
last_submitted_at: 2023-10-01 14:00:00
25 changes: 20 additions & 5 deletions internal/metamorph/store/postgresql/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,8 +662,8 @@ func (p *PostgreSQL) UpdateStatusBulk(ctx context.Context, updates []store.Updat
SELECT t.hash, t.status, t.reject_reason, t.history_update, t.timestamp
FROM UNNEST($2::BYTEA[], $3::INT[], $4::TEXT[], $5::JSONB[], $6::TIMESTAMP WITH TIME ZONE[]) AS t(hash, status, reject_reason, history_update, timestamp)
) AS bulk_query
WHERE metamorph.transactions.hash = bulk_query.hash
AND metamorph.transactions.status < bulk_query.status
WHERE metamorph.transactions.hash=bulk_query.hash
AND (metamorph.transactions.status < bulk_query.status OR metamorph.transactions.status=$7)
RETURNING metamorph.transactions.stored_at
,metamorph.transactions.hash
,metamorph.transactions.status
Expand Down Expand Up @@ -695,7 +695,15 @@ func (p *PostgreSQL) UpdateStatusBulk(ctx context.Context, updates []store.Updat
return nil, err
}

rows, err := tx.QueryContext(ctx, qBulk, p.now(), pq.Array(txHashes), pq.Array(statuses), pq.Array(rejectReasons), pq.Array(statusHistories), pq.Array(timestamps))
rows, err := tx.QueryContext(ctx, qBulk,
p.now(),
pq.Array(txHashes),
pq.Array(statuses),
pq.Array(rejectReasons),
pq.Array(statusHistories),
pq.Array(timestamps),
metamorph_api.Status_MINED_IN_STALE_BLOCK,
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -853,7 +861,7 @@ func (p *PostgreSQL) UpdateDoubleSpend(ctx context.Context, updates []store.Upda
AS t(hash, status, reject_reason, competing_txs)
) AS bulk_query
WHERE metamorph.transactions.hash=bulk_query.hash
AND metamorph.transactions.status <= bulk_query.status
AND (metamorph.transactions.status <= bulk_query.status OR metamorph.transactions.status=$6)
AND (metamorph.transactions.competing_txs IS NULL
OR LENGTH(metamorph.transactions.competing_txs) < LENGTH(bulk_query.competing_txs))
RETURNING metamorph.transactions.stored_at
Expand Down Expand Up @@ -916,7 +924,14 @@ func (p *PostgreSQL) UpdateDoubleSpend(ctx context.Context, updates []store.Upda
}
}

rows, err = tx.QueryContext(ctx, qBulk, p.now(), pq.Array(txHashes), pq.Array(statuses), pq.Array(rejectReasons), pq.Array(competingTxs))
rows, err = tx.QueryContext(ctx, qBulk,
p.now(),
pq.Array(txHashes),
pq.Array(statuses),
pq.Array(rejectReasons),
pq.Array(competingTxs),
metamorph_api.Status_MINED_IN_STALE_BLOCK,
)
if err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
return nil, errors.Join(err, fmt.Errorf("failed to rollback: %v", rollbackErr))
Expand Down
20 changes: 18 additions & 2 deletions internal/metamorph/store/postgresql/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ func TestPostgresDB(t *testing.T) {
Hash: *testutils.RevChainhash(t, "7809b730cbe7bb723f299a4e481fb5165f31175876392a54cde85569a18cc75f"), // update not expected - old status > new status
Status: metamorph_api.Status_SENT_TO_NETWORK,
},
{
Hash: *testutils.RevChainhash(t, "fe3ae78226a8a1c78039a7d10590a42dc4b691acaa8cbc6831b464da49e8ba08"), // update expected - old status = MINED_IN_STALE_BLOCK
Status: metamorph_api.Status_SEEN_ON_NETWORK,
},
{
Hash: *testutils.RevChainhash(t, "3ce1e0c6cbbbe2118c3f80d2e6899d2d487f319ef0923feb61f3d26335b2225c"), // update not expected - hash non-existent in db
Status: metamorph_api.Status_ANNOUNCED_TO_NETWORK,
Expand All @@ -367,7 +371,7 @@ func TestPostgresDB(t *testing.T) {
Status: metamorph_api.Status_ANNOUNCED_TO_NETWORK,
},
}
updatedStatuses := 3
updatedStatuses := 4

statusUpdates, err := postgresDB.UpdateStatusBulk(ctx, updates)
require.NoError(t, err)
Expand All @@ -383,6 +387,9 @@ func TestPostgresDB(t *testing.T) {
require.Equal(t, metamorph_api.Status_SEEN_ON_NETWORK, statusUpdates[2].Status)
require.Equal(t, *testutils.RevChainhash(t, "ee76f5b746893d3e6ae6a14a15e464704f4ebd601537820933789740acdcf6aa"), *statusUpdates[2].Hash)

require.Equal(t, metamorph_api.Status_SEEN_ON_NETWORK, statusUpdates[3].Status)
require.Equal(t, *testutils.RevChainhash(t, "fe3ae78226a8a1c78039a7d10590a42dc4b691acaa8cbc6831b464da49e8ba08"), *statusUpdates[3].Hash)

returnedDataRequested, err := postgresDB.Get(ctx, testutils.RevChainhash(t, "7809b730cbe7bb723f299a4e481fb5165f31175876392a54cde85569a18cc75f")[:])
require.NoError(t, err)
require.Equal(t, metamorph_api.Status_ACCEPTED_BY_NETWORK, returnedDataRequested.Status)
Expand Down Expand Up @@ -423,6 +430,11 @@ func TestPostgresDB(t *testing.T) {
CompetingTxs: []string{"1234"},
Error: errors.New("double spend attempted"),
},
{
Hash: *testutils.RevChainhash(t, "fe3ae78226a8a1c78039a7d10590a42dc4b691acaa8cbc6831b464da49e8ba08"), // update expected - old status = MINED_IN_STALE_BLOCK
Status: metamorph_api.Status_DOUBLE_SPEND_ATTEMPTED,
CompetingTxs: []string{"1234"},
},
{
Hash: *testutils.RevChainhash(t, "3ce1e0c6cbbbe2118c3f80d2e6899d2d487f319ef0923feb61f3d26335b2225c"), // update not expected - hash non-existent in db
Status: metamorph_api.Status_DOUBLE_SPEND_ATTEMPTED,
Expand All @@ -434,7 +446,7 @@ func TestPostgresDB(t *testing.T) {
CompetingTxs: []string{"1234"},
},
}
updatedStatuses := 4
updatedStatuses := 5

statusUpdates, err := postgresDB.UpdateDoubleSpend(ctx, updates)
require.NoError(t, err)
Expand All @@ -457,6 +469,10 @@ func TestPostgresDB(t *testing.T) {
require.Equal(t, []string{"1234"}, statusUpdates[3].CompetingTxs)
require.Equal(t, "double spend attempted", statusUpdates[3].RejectReason)

require.Equal(t, metamorph_api.Status_DOUBLE_SPEND_ATTEMPTED, statusUpdates[4].Status)
require.Equal(t, *testutils.RevChainhash(t, "fe3ae78226a8a1c78039a7d10590a42dc4b691acaa8cbc6831b464da49e8ba08"), *statusUpdates[4].Hash)
require.Equal(t, []string{"1234"}, statusUpdates[4].CompetingTxs)

statusUpdates, err = postgresDB.UpdateDoubleSpend(ctx, updates)
require.NoError(t, err)
require.Len(t, statusUpdates, 0)
Expand Down

0 comments on commit 8e01df1

Please sign in to comment.