From 88103dbf860c811690b680dcdfc53b3feb51183c Mon Sep 17 00:00:00 2001 From: Kuba <127198012+kuba-4chain@users.noreply.github.com> Date: Wed, 20 Nov 2024 16:41:18 +0100 Subject: [PATCH] feat(reorg): simplification (#651) --- internal/blocktx/chain.go | 29 - .../fixtures/blocktx.blocks.yaml | 24 +- .../reorg_integration_test.go | 45 +- internal/blocktx/processor.go | 471 +++--- internal/blocktx/processor_helpers.go | 69 +- internal/blocktx/processor_helpers_test.go | 43 +- internal/blocktx/processor_test.go | 204 +-- .../blocktx/store/mocks/blocktx_db_tx_mock.go | 1269 ----------------- .../blocktx/store/mocks/blocktx_store_mock.go | 186 +-- .../get_orphaned_chain/blocktx.blocks.yaml | 41 +- .../blocktx/store/postgresql/get_block.go | 2 +- .../store/postgresql/get_orphaned_chain.go | 38 +- .../store/postgresql/get_stale_chain.go | 1 + internal/blocktx/store/postgresql/postgres.go | 71 +- .../blocktx/store/postgresql/postgres_test.go | 55 +- .../store/postgresql/update_block_statuses.go | 27 +- internal/blocktx/store/store.go | 13 +- internal/blocktx/store/store_mocks.go | 1 - 18 files changed, 534 insertions(+), 2055 deletions(-) delete mode 100644 internal/blocktx/chain.go delete mode 100644 internal/blocktx/store/mocks/blocktx_db_tx_mock.go diff --git a/internal/blocktx/chain.go b/internal/blocktx/chain.go deleted file mode 100644 index 3bce4e301..000000000 --- a/internal/blocktx/chain.go +++ /dev/null @@ -1,29 +0,0 @@ -package blocktx - -import ( - "errors" - - "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" -) - -var ErrEmptyChain = errors.New("empty chain of blocks") - -type chain []*blocktx_api.Block - -func (c chain) getTip() (*blocktx_api.Block, error) { - if len(c) == 0 { - return nil, ErrEmptyChain - } - - return c[len(c)-1], nil -} - -func (c chain) getHashes() [][]byte { - hashes := make([][]byte, len(c)) - - for i, b := range c { - hashes[i] = b.Hash - } - - return hashes -} diff --git a/internal/blocktx/integration_test/fixtures/blocktx.blocks.yaml b/internal/blocktx/integration_test/fixtures/blocktx.blocks.yaml index 920da9cbc..1e913f285 100644 --- a/internal/blocktx/integration_test/fixtures/blocktx.blocks.yaml +++ b/internal/blocktx/integration_test/fixtures/blocktx.blocks.yaml @@ -41,20 +41,31 @@ merkleroot: 0xc458aa382364e216c9c0533175ec8579a544c750ca181b18296e784d1dc53085 height: 822017 size: 8630000 + processed_at: 2023-12-15 14:40:00 tx_count: 36724 status: 10 is_longest: true chainwork: '62209952899966' - -# gap - +- inserted_at: 2023-12-15 14:50:00 + id: 10052 + hash: 0x000000000000000003b15d668b54c4b91ae81a86298ee209d9f39fd7a769bcde + prevhash: 0x76404890880cb36ce68100abb05b3a958e17c0ed274d5c0a0000000000000000 + merkleroot: 0xde0753d9ce6f92e340843cbfdd11e58beff8c578956ecdec4c461b018a26b8a9 + height: 822018 + size: 8630000 + processed_at: 2023-12-15 14:40:00 + tx_count: 36724 + status: 30 # ORPHANED + is_longest: false + chainwork: '62209952899966' - inserted_at: 2023-12-15 14:50:00 id: 1005 hash: 0x00000000000000000364332e1bbd61dc928141b9469c5daea26a4b506efc9656 - prevhash: 0x212a7598a62295f1a520ef525a34f657bc636d9da9bda74acdf6f051cd84c353 + prevhash: 0x000000000000000003b15d668b54c4b91ae81a86298ee209d9f39fd7a769bcde merkleroot: 0x51f33784f6d54f1d6414fa4d8b8d6904215cb16a3fa0a8b1fe02e456a90544d4 height: 822019 size: 8630000 + processed_at: 2023-12-15 14:40:00 tx_count: 36724 status: 30 # ORPHANED is_longest: false @@ -66,6 +77,7 @@ merkleroot: 0xc458aa382364e216c9c0533175ec8579a544c750ca181b18296e784d1dc53085 height: 822020 size: 8630000 + processed_at: 2023-12-15 14:40:00 tx_count: 36724 status: 30 # ORPHANED is_longest: false @@ -76,10 +88,11 @@ - inserted_at: 2023-12-15 14:50:00 id: 1007 hash: 0x0000000000000000059d6add76e3ddb8ec4f5ffd6efecd4c8b8c577bd32aed6c - prevhash: 0x743c7dc491ae5fddd37ebf63058f9574b4db9f6a89f483a4baec31820e5df61d + prevhash: 0xd46bf0a189927b62c8ff785d393a545093ca01af159aed771a8d94749f06c060 merkleroot: 0xda71199f8ed9203d8a765595e6c030a22e5ed8330b1abb467a82c97d7d21d512 height: 822022 size: 8630000 + processed_at: 2023-12-15 14:40:00 tx_count: 36724 status: 30 # ORPHANED is_longest: false @@ -91,6 +104,7 @@ merkleroot: 0x8e3177a33d6a87785b7104f20ca345e1713ae11ec2723a41028efddabebb861b height: 822023 size: 8630000 + processed_at: 2023-12-15 14:40:00 tx_count: 36724 status: 30 # ORPHANED is_longest: false diff --git a/internal/blocktx/integration_test/reorg_integration_test.go b/internal/blocktx/integration_test/reorg_integration_test.go index e8a695fba..aa0241aa4 100644 --- a/internal/blocktx/integration_test/reorg_integration_test.go +++ b/internal/blocktx/integration_test/reorg_integration_test.go @@ -7,10 +7,10 @@ package integrationtest // Message queue sending txs to metamorph - mocked // // Flow of this test: -// 1. Blocks at heights 822014-822017, 822019-822020 and 822022-822023 are added to db from fixtures +// 1. Blocks at heights 822014-822017 (LONGEST), 822018-822020 (ORPHANED) and 822022-822023 (ORPHANED) are added to db from fixtures // 2. A hardcoded msg with competing block at height 822015 is being sent through the mocked PeerHandler // 3. This block has a chainwork lower than the current tip of chain - becomes STALE -// 4. Registered transactions from this block that are not in the longest chain are published to metamorph message queue with blockstatus = STALE +// 4. Registered transactions from this block are ignored // 5. Next competing block, at height 822016 is being send through the mocked PeerHandler // 6. This block has a greater chainwork than the current tip of longest chain - it becomes LONGEST despite not being the highest // 7. Verification of reorg - checking if statuses are correctly switched @@ -18,13 +18,13 @@ package integrationtest // - transactions from the stale chain becoming the longest are published // - transactions that were previously in the longest chain are published with udpated block data // - transactions that were previously in the longest chain, but are not in the stale chain are published with blockstatus = STALE -// 9. A new block at height 822018 is being sent through the mocked PeerHandler -// 10. This block is extending the previously LONGEST but now STALE chain and finds orphaned chain at heights 822019, 822020 -// 11. The tip of the orphaned chain does not have a greater chainwork than the current longest chain - entire orphaned chain becomes STALE -// 12. A new block at height 822021 is being sent through the mocked PeerHandler -// 13. This block extends the STALE chain and finds orphaned chain at height 822022, 822023 -// 14. The tip of the orphaned chain has a greater chainwork than the current tip of longest chain -// - entire STALE chain at heights 822015 - 822023 becomes LONGEST +// 9. A new block at height 822021 is being sent through the mocked PeerHandler +// 10. This block is extending the orphaned chain and finds that it's connected to the stale chain - orphans get updated to STALE +// 11. The new stale chain does not have a greater chainwork than the current longest chain - entire orphaned chain becomes STALE +// 12. A new block at height 822024 is being sent through the mocked PeerHandler +// 13. This block extends the orphaned chain and finds that it's connected to the stale chain - orphans get updated to STALE +// 14. The new stale chain has a greater chainwork than the current longest chain +// - entire STALE chain at heights 822015 - 822024 becomes LONGEST // - entire LONGEST chain at height 822015 - 822016 becomes STALE // 15. Verification of reorg - checking if statuses are correctly switched (for blocks and for transactions) @@ -113,12 +113,13 @@ const ( blockHash822015Fork = "82471bbf045ab13825a245b37de71d77ec12513b37e2524ec11551d18c19f7c3" blockHash822016Fork = "032c3688bc7536b2d787f3a196b1145a09bf33183cd1448ff6b1a9dfbb022db8" - blockHash822018 = "212a7598a62295f1a520ef525a34f657bc636d9da9bda74acdf6f051cd84c353" + blockHash822018Orphan = "000000000000000003b15d668b54c4b91ae81a86298ee209d9f39fd7a769bcde" blockHash822019Orphan = "00000000000000000364332e1bbd61dc928141b9469c5daea26a4b506efc9656" blockHash822020Orphan = "00000000000000000a5c4d27edc0178e953a5bb0ab0081e66cb30c8890484076" - blockHash822021 = "743c7dc491ae5fddd37ebf63058f9574b4db9f6a89f483a4baec31820e5df61d" + blockHash822021 = "d46bf0a189927b62c8ff785d393a545093ca01af159aed771a8d94749f06c060" blockHash822022Orphan = "0000000000000000059d6add76e3ddb8ec4f5ffd6efecd4c8b8c577bd32aed6c" blockHash822023Orphan = "0000000000000000082131979a4e25a5101912a5f8461e18f306d23e158161cd" + blockHash822024 = "5d60cfea9a7ef96554768150716788e9643eaafd5a1979636777a6a5835b07c6" txhash822015 = "cd3d2f97dfc0cdb6a07ec4b72df5e1794c9553ff2f62d90ed4add047e8088853" txhash822015Competing = "b16cea53fc823e146fbb9ae4ad3124f7c273f30562585ad6e4831495d609f430" @@ -314,17 +315,17 @@ func testHandleReorg(t *testing.T, peerHandler *blocktx.PeerHandler, store *post func testHandleStaleOrphans(t *testing.T, peerHandler *blocktx.PeerHandler, store *postgresql.PostgreSQL) { txHash := testutils.RevChainhash(t, "de0753d9ce6f92e340843cbfdd11e58beff8c578956ecdec4c461b018a26b8a9") merkleRoot := testutils.RevChainhash(t, "de0753d9ce6f92e340843cbfdd11e58beff8c578956ecdec4c461b018a26b8a9") - prevhash := testutils.RevChainhash(t, blockHash822017) + prevhash := testutils.RevChainhash(t, blockHash822020Orphan) // should become STALE blockMessage := &p2p.BlockMessage{ Header: &wire.BlockHeader{ Version: 541065216, - PrevBlock: *prevhash, // block with status STALE at height 822017 + PrevBlock: *prevhash, // block with status ORPHANED at height 822020 - connected to STALE chain MerkleRoot: *merkleRoot, Bits: 0x1d00ffff, // chainwork: "4295032833" lower than the competing chain }, - Height: uint64(822018), + Height: uint64(822021), TransactionHashes: []*chainhash.Hash{txHash}, } @@ -334,9 +335,10 @@ func testHandleStaleOrphans(t *testing.T, peerHandler *blocktx.PeerHandler, stor time.Sleep(1 * time.Second) // verify that the block and orphans have STALE status - verifyBlock(t, store, blockHash822018, 822018, blocktx_api.Status_STALE) + verifyBlock(t, store, blockHash822018Orphan, 822018, blocktx_api.Status_STALE) verifyBlock(t, store, blockHash822019Orphan, 822019, blocktx_api.Status_STALE) verifyBlock(t, store, blockHash822020Orphan, 822020, blocktx_api.Status_STALE) + verifyBlock(t, store, blockHash822021, 822021, blocktx_api.Status_STALE) // verify that the blocks after the next gap are still orphans verifyBlock(t, store, blockHash822022Orphan, 822022, blocktx_api.Status_ORPHANED) @@ -346,19 +348,19 @@ func testHandleStaleOrphans(t *testing.T, peerHandler *blocktx.PeerHandler, stor func testHandleOrphansReorg(t *testing.T, peerHandler *blocktx.PeerHandler, store *postgresql.PostgreSQL) []*blocktx_api.TransactionBlock { txHash := testutils.RevChainhash(t, "3e15f823a7de25c26ce9001d4814a6f0ebc915a1ca4f1ba9cfac720bd941c39c") merkleRoot := testutils.RevChainhash(t, "3e15f823a7de25c26ce9001d4814a6f0ebc915a1ca4f1ba9cfac720bd941c39c") - prevhash := testutils.RevChainhash(t, blockHash822020Orphan) + prevhash := testutils.RevChainhash(t, blockHash822023Orphan) // should become LONGEST // reorg should happen blockMessage := &p2p.BlockMessage{ Header: &wire.BlockHeader{ Version: 541065216, - PrevBlock: *prevhash, // block with status STALE at height 822020 + PrevBlock: *prevhash, // block with status ORPHANED at height 822023 - connected to STALE chain MerkleRoot: *merkleRoot, - Bits: 0x1d00ffff, // chainwork: "4295032833" lower than the competing chain - // but the sum of orphan chain has a higher chainwork and should cause a reorg + Bits: 0x1d00ffff, // chainwork: "4295032833" + // the sum of orphan chain has a higher chainwork and should cause a reorg }, - Height: uint64(822021), + Height: uint64(822024), TransactionHashes: []*chainhash.Hash{txHash}, } @@ -372,12 +374,13 @@ func testHandleOrphansReorg(t *testing.T, peerHandler *blocktx.PeerHandler, stor verifyBlock(t, store, blockHash822015, 822015, blocktx_api.Status_LONGEST) verifyBlock(t, store, blockHash822016, 822016, blocktx_api.Status_LONGEST) verifyBlock(t, store, blockHash822017, 822017, blocktx_api.Status_LONGEST) - verifyBlock(t, store, blockHash822018, 822018, blocktx_api.Status_LONGEST) + verifyBlock(t, store, blockHash822018Orphan, 822018, blocktx_api.Status_LONGEST) verifyBlock(t, store, blockHash822019Orphan, 822019, blocktx_api.Status_LONGEST) verifyBlock(t, store, blockHash822020Orphan, 822020, blocktx_api.Status_LONGEST) verifyBlock(t, store, blockHash822021, 822021, blocktx_api.Status_LONGEST) verifyBlock(t, store, blockHash822022Orphan, 822022, blocktx_api.Status_LONGEST) verifyBlock(t, store, blockHash822023Orphan, 822023, blocktx_api.Status_LONGEST) + verifyBlock(t, store, blockHash822024, 822024, blocktx_api.Status_LONGEST) verifyBlock(t, store, blockHash822015Fork, 822015, blocktx_api.Status_STALE) verifyBlock(t, store, blockHash822016Fork, 822016, blocktx_api.Status_STALE) diff --git a/internal/blocktx/processor.go b/internal/blocktx/processor.go index fbab9d373..6e8de6732 100644 --- a/internal/blocktx/processor.go +++ b/internal/blocktx/processor.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "log/slog" - "math/big" "os" "sync" "time" @@ -30,6 +29,8 @@ var ( ErrFailedToGetStringFromBUMPHex = errors.New("failed to get string from bump for tx hash") ErrFailedToParseBlockHash = errors.New("failed to parse block hash") ErrFailedToInsertBlockTransactions = errors.New("failed to insert block transactions") + ErrBlockAlreadyExists = errors.New("block already exists in the database") + ErrUnexpectedBlockStatus = errors.New("unexpected block status") ) const ( @@ -451,71 +452,37 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) (err error) { blockHash := msg.Header.BlockHash() blockHeight := msg.Height - p.logger.Info("processing incoming block", slog.String("hash", blockHash.String())) - - var chain chain - var competing bool - var err error + p.logger.Info("processing incoming block", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight)) // check if we've already processed that block existingBlock, _ := p.store.GetBlock(ctx, &blockHash) if existingBlock != nil && existingBlock.Processed { - // if the block was already processed, check and update - // possible orphan children of that block - chain, competing, err = p.updateOrphans(ctx, existingBlock, competing) - if err != nil { - p.logger.Error("unable to check and update possible orphaned child blocks", slog.String("hash", blockHash.String()), slog.String("err", err.Error())) - return err - } - - if len(chain) == 1 { // this means that no orphans were found - p.logger.Warn("ignoring already existing block", slog.String("hash", blockHash.String())) - return nil - } - } else { - // if the block was not yet processed, proceed normally - chain, competing, err = p.verifyAndInsertBlock(ctx, msg) - if err != nil { - p.logger.Error("unable to verify the longest tip existence in db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("err", err.Error())) - return err - } + p.logger.Warn("ignoring already existing block", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight)) + return nil } - chainTip, err := chain.getTip() + block, err := p.verifyAndInsertBlock(ctx, msg) if err != nil { - p.logger.Error("unable to get chain tip", slog.String("hash", blockHash.String()), slog.String("err", err.Error())) return err } - shouldPerformReorg := false - if competing { - hasGreatestChainwork, err := p.hasGreatestChainwork(ctx, chainTip) - if err != nil { - p.logger.Error("unable to get the chain tip to verify chainwork", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error())) - return err - } + var txsToPublish []store.TransactionBlock - if hasGreatestChainwork { - p.logger.Info("chain reorg detected", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight)) - shouldPerformReorg = true - } + switch block.Status { + case blocktx_api.Status_LONGEST: + txsToPublish, err = p.getRegisteredTransactions(ctx, []*blocktx_api.Block{block}) + case blocktx_api.Status_STALE: + txsToPublish, err = p.handleStaleBlock(ctx, block) + case blocktx_api.Status_ORPHANED: + txsToPublish, err = p.handleOrphans(ctx, block) + default: + return ErrUnexpectedBlockStatus } - txsToPublish := make([]store.TransactionBlock, 0) - - if shouldPerformReorg { - txsToPublish, err = p.performReorg(ctx, chainTip) - if err != nil { - p.logger.Error("unable to perform reorg", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error())) - return err - } - } else if chainTip.Status == blocktx_api.Status_LONGEST { - txsToPublish, err = p.store.GetRegisteredTxsByBlockHashes(ctx, chain.getHashes()) - if err != nil { - p.logger.Error("unable to get registered transactions", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error())) - return err - } + if err != nil { + // error is already logged in each method above + return err } for _, tx := range txsToPublish { @@ -538,69 +505,95 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) (err error) { return nil } -func (p *Processor) verifyAndInsertBlock(ctx context.Context, msg *p2p.BlockMessage) (chain, bool, error) { +func (p *Processor) verifyAndInsertBlock(ctx context.Context, msg *p2p.BlockMessage) (*blocktx_api.Block, error) { blockHash := msg.Header.BlockHash() previousBlockHash := msg.Header.PrevBlock + merkleRoot := msg.Header.MerkleRoot + + incomingBlock := &blocktx_api.Block{ + Hash: blockHash[:], + PreviousHash: previousBlockHash[:], + MerkleRoot: merkleRoot[:], + Height: msg.Height, + Chainwork: calculateChainwork(msg.Header.Bits).String(), + } + + err := p.assignBlockStatus(ctx, incomingBlock, previousBlockHash) + if err != nil { + p.logger.Error("unable to assign block status", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error())) + return nil, err + } + + p.logger.Info("Inserting block", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("status", incomingBlock.Status.String())) - prevBlock, err := p.getPrevBlock(ctx, &previousBlockHash) + err = p.insertBlockAndStoreTransactions(ctx, incomingBlock, msg.TransactionHashes, msg.Header.MerkleRoot) if err != nil { - p.logger.Error("unable to get previous block from db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("prevHash", previousBlockHash.String()), slog.String("err", err.Error())) - return nil, false, err + p.logger.Error("unable to insert block and store its transactions", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error())) + return nil, err } - longestTipExists := true + return incomingBlock, nil +} + +func (p *Processor) assignBlockStatus(ctx context.Context, block *blocktx_api.Block, prevBlockHash chainhash.Hash) error { + prevBlock, _ := p.store.GetBlock(ctx, &prevBlockHash) + if prevBlock == nil { // This check is only in case there's a fresh, empty database // with no blocks, to mark the first block as the LONGEST chain - longestTipExists, err = p.longestTipExists(ctx) + longestTipExists, err := p.longestTipExists(ctx) if err != nil { - p.logger.Error("unable to verify the longest tip existance in db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("err", err.Error())) - return nil, false, err + p.logger.Error("unable to verify the longest tip existance in db", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error())) + return err } - } - - incomingBlock := createBlock(msg, prevBlock, longestTipExists) - competing, err := p.competingChainsExist(ctx, incomingBlock) - if err != nil { - p.logger.Error("unable to check for competing chains", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("err", err.Error())) - return nil, false, err + // if there's no longest block in the + // database - mark this block as LONGEST + // otherwise - it's an orphan + if !longestTipExists { + block.Status = blocktx_api.Status_LONGEST + } else { + block.Status = blocktx_api.Status_ORPHANED + } + return nil } - if competing { - p.logger.Info("Competing blocks found", slog.String("incoming block hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height)) - incomingBlock.Status = blocktx_api.Status_STALE + // if the previous block exists in the db but is currently being + // processed by another instance, we don't know what the final + // status of that parent block will be, so mark the incoming block + // as ORPHANED and wait for the next block to confirm the status + if !prevBlock.Processed { + block.Status = blocktx_api.Status_ORPHANED + return nil } - p.logger.Info("Inserting block", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("status", incomingBlock.Status.String())) + if prevBlock.Status == blocktx_api.Status_LONGEST { + competingBlock, err := p.store.GetLongestBlockByHeight(ctx, block.Height) + if err != nil && !errors.Is(err, store.ErrBlockNotFound) { + p.logger.Error("unable to get the competing block from db", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error())) + return err + } - err = p.insertBlockAndStoreTransactions(ctx, incomingBlock, msg.TransactionHashes, msg.Header.MerkleRoot) - if err != nil { - p.logger.Error("unable to insert block and store its transactions", slog.String("hash", blockHash.String()), slog.String("err", err.Error())) - return nil, false, err - } + if competingBlock == nil { + block.Status = blocktx_api.Status_LONGEST + return nil + } - // if the block is ORPHANED, there's no need to process it any further - if incomingBlock.Status == blocktx_api.Status_ORPHANED { - return chain{incomingBlock}, false, nil - } + if bytes.Equal(block.Hash, competingBlock.Hash) { + // this means that another instance is already processing + // or have processed this block that we're processing here + // so we can throw an error and finish processing + return ErrBlockAlreadyExists + } - chain, competing, err := p.updateOrphans(ctx, incomingBlock, competing) - if err != nil { - p.logger.Error("unable to check and update possible orphaned child blocks", slog.String("hash", blockHash.String()), slog.String("err", err.Error())) - return nil, false, err + block.Status = blocktx_api.Status_STALE + return nil } - return chain, competing, nil -} + // ORPHANED or STALE + block.Status = prevBlock.Status -func (p *Processor) getPrevBlock(ctx context.Context, prevHash *chainhash.Hash) (*blocktx_api.Block, error) { - prevBlock, err := p.store.GetBlock(ctx, prevHash) - if err != nil && !errors.Is(err, store.ErrBlockNotFound) { - return nil, err - } - - return prevBlock, nil + return nil } func (p *Processor) longestTipExists(ctx context.Context) (bool, error) { @@ -616,60 +609,20 @@ func (p *Processor) longestTipExists(ctx context.Context) (bool, error) { return true, nil } -func (p *Processor) competingChainsExist(ctx context.Context, block *blocktx_api.Block) (bool, error) { - if block.Status == blocktx_api.Status_ORPHANED { - return false, nil +func (p *Processor) getRegisteredTransactions(ctx context.Context, blocks []*blocktx_api.Block) ([]store.TransactionBlock, error) { + blockHashes := make([][]byte, len(blocks)) + for i, b := range blocks { + blockHashes[i] = b.Hash } - if block.Status == blocktx_api.Status_LONGEST { - competingBlock, err := p.store.GetBlockByHeight(ctx, block.Height) - if err != nil && !errors.Is(err, store.ErrBlockNotFound) { - return false, err - } - - if competingBlock != nil && !bytes.Equal(competingBlock.Hash, block.Hash) { - return true, nil - } - - return false, nil - } - - // If STALE status - return true, nil -} - -func (p *Processor) hasGreatestChainwork(ctx context.Context, competingChainTip *blocktx_api.Block) (bool, error) { - staleBlocks, err := p.store.GetStaleChainBackFromHash(ctx, competingChainTip.Hash) - if err != nil { - return false, err - } - - lowestHeight := competingChainTip.Height - if len(staleBlocks) > 0 { - lowestHeight = getLowestHeight(staleBlocks) - } - - longestBlocks, err := p.store.GetLongestChainFromHeight(ctx, lowestHeight) + txsToPublish, err := p.store.GetRegisteredTxsByBlockHashes(ctx, blockHashes) if err != nil { - return false, err - } - - sumStaleChainwork := big.NewInt(0) - sumLongChainwork := big.NewInt(0) - - for _, b := range staleBlocks { - chainwork := new(big.Int) - chainwork.SetString(b.Chainwork, 10) - sumStaleChainwork = sumStaleChainwork.Add(sumStaleChainwork, chainwork) - } - - for _, b := range longestBlocks { - chainwork := new(big.Int) - chainwork.SetString(b.Chainwork, 10) - sumLongChainwork = sumLongChainwork.Add(sumLongChainwork, chainwork) + block := blocks[len(blocks)-1] + p.logger.Error("unable to get registered transactions", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error())) + return nil, err } - return sumLongChainwork.Cmp(sumStaleChainwork) < 0, nil + return txsToPublish, nil } func (p *Processor) insertBlockAndStoreTransactions(ctx context.Context, incomingBlock *blocktx_api.Block, txHashes []*chainhash.Hash, merkleRoot chainhash.Hash) error { @@ -770,179 +723,175 @@ func (p *Processor) storeTransactions(ctx context.Context, blockID uint64, block return nil } -func (p *Processor) updateOrphans(ctx context.Context, incomingBlock *blocktx_api.Block, competing bool) (chain, bool, error) { - chain := []*blocktx_api.Block{incomingBlock} - - uow, err := p.store.StartUnitOfWork(ctx) +func (p *Processor) handleStaleBlock(ctx context.Context, block *blocktx_api.Block) ([]store.TransactionBlock, error) { + staleBlocks, err := p.store.GetStaleChainBackFromHash(ctx, block.Hash) if err != nil { - return nil, false, err + p.logger.Error("unable to get STALE blocks to verify chainwork", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error())) + return nil, err } - defer func() { - _ = uow.Rollback() - }() - // Very important step, this will lock blocks - // table for writing but still allow reading. - err = uow.WriteLockBlocksTable(ctx) - if err != nil { - return nil, false, err + lowestHeight := block.Height + if len(staleBlocks) > 0 { + lowestHeight = staleBlocks[0].Height } - orphanedBlocks, err := uow.GetOrphanedChainUpFromHash(ctx, incomingBlock.Hash) + longestBlocks, err := p.store.GetLongestChainFromHeight(ctx, lowestHeight) if err != nil { - return nil, false, err - } - if len(orphanedBlocks) == 0 { - return chain, competing, nil + p.logger.Error("unable to get LONGEST blocks to verify chainwork", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error())) + return nil, err } - blockStatusUpdates := make([]store.BlockStatusUpdate, len(orphanedBlocks)) - for i := range orphanedBlocks { - // We want to mark all orphaned blocks as STALE - // in case there already exists a block at any - // of their height with status LONGEST, which - // would cause constraint validation (height, is_longest). - // - // If they are part of the LONGEST chain, the reorg - // will happen and update their statuses accordingly. - orphanedBlocks[i].Status = blocktx_api.Status_STALE + staleChainwork := sumChainwork(staleBlocks) + longestChainwork := sumChainwork(longestBlocks) - blockStatusUpdates[i] = store.BlockStatusUpdate{ - Hash: orphanedBlocks[i].Hash, - Status: blocktx_api.Status_STALE, - } - } + if longestChainwork.Cmp(staleChainwork) < 0 { + p.logger.Info("chain reorg detected", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height)) - err = uow.UpdateBlocksStatuses(ctx, blockStatusUpdates) - if err != nil { - return nil, false, err + txsToPublish, err := p.performReorg(ctx, staleBlocks, longestBlocks) + if err != nil { + p.logger.Error("unable to perform reorg", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error())) + return nil, err + } + return txsToPublish, nil } - err = uow.Commit() - if err != nil { - return nil, false, err - } + return nil, nil +} - p.logger.Info("orphans were found and updated", slog.Int("len", len(orphanedBlocks))) +func (p *Processor) performReorg(ctx context.Context, staleBlocks []*blocktx_api.Block, longestBlocks []*blocktx_api.Block) ([]store.TransactionBlock, error) { + staleHashes := make([][]byte, len(staleBlocks)) + longestHashes := make([][]byte, len(longestBlocks)) - chain = append(chain, orphanedBlocks...) + blockStatusUpdates := make([]store.BlockStatusUpdate, len(longestBlocks)+len(staleBlocks)) - // if we found any orphans and marked them as STALE - // we need to find out if they are part of the longest - // or stale chain, so competing is returned as true - return chain, true, nil -} + for i, b := range longestBlocks { + longestHashes[i] = b.Hash -func (p *Processor) performReorg(ctx context.Context, staleChainTip *blocktx_api.Block) ([]store.TransactionBlock, error) { - uow, err := p.store.StartUnitOfWork(ctx) - if err != nil { - return nil, err + b.Status = blocktx_api.Status_STALE + update := store.BlockStatusUpdate{Hash: b.Hash, Status: b.Status} + blockStatusUpdates[i] = update } - defer func() { - _ = uow.Rollback() - }() - // Very important step, this will lock blocks - // table for writing but still allow reading. - err = uow.WriteLockBlocksTable(ctx) - if err != nil { - return nil, err + for i, b := range staleBlocks { + staleHashes[i] = b.Hash + + b.Status = blocktx_api.Status_LONGEST + update := store.BlockStatusUpdate{Hash: b.Hash, Status: b.Status} + blockStatusUpdates[i+len(longestBlocks)] = update } - staleBlocks, err := uow.GetStaleChainBackFromHash(ctx, staleChainTip.Hash) + err := p.store.UpdateBlocksStatuses(ctx, blockStatusUpdates) if err != nil { return nil, err } - lowestHeight := staleChainTip.Height - if len(staleBlocks) > 0 { - lowestHeight = getLowestHeight(staleBlocks) - } + p.logger.Info("reorg performed successfully") - longestBlocks, err := uow.GetLongestChainFromHeight(ctx, lowestHeight) + registeredTxs, err := p.store.GetRegisteredTxsByBlockHashes(ctx, append(staleHashes, longestHashes...)) if err != nil { return nil, err } - staleHashes := make([][]byte, len(staleBlocks)) - longestHashes := make([][]byte, len(longestBlocks)) + longestTxs := make([]store.TransactionBlock, 0) + staleTxs := make([]store.TransactionBlock, 0) - for i, b := range longestBlocks { - longestHashes[i] = b.Hash + for _, tx := range registeredTxs { + switch tx.BlockStatus { + case blocktx_api.Status_LONGEST: + longestTxs = append(longestTxs, tx) + case blocktx_api.Status_STALE: + staleTxs = append(staleTxs, tx) + default: + // do nothing - ignore txs from ORPHANED or UNKNOWN blocks + } } - for i, b := range staleBlocks { - staleHashes[i] = b.Hash - } + staleTxs = findDistinctStaleTxs(longestTxs, staleTxs) + + return append(longestTxs, staleTxs...), nil +} - registeredTxs, err := uow.GetRegisteredTxsByBlockHashes(ctx, append(staleHashes, longestHashes...)) +func (p *Processor) handleOrphans(ctx context.Context, block *blocktx_api.Block) ([]store.TransactionBlock, error) { + orphans, ancestor, err := p.store.GetOrphansBackToNonOrphanAncestor(ctx, block.Hash) if err != nil { + p.logger.Error("unable to get ORPHANED blocks", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error())) return nil, err } - // Order of inserting into blockStatusUpdates is important here, we need to do: - // 1. LONGEST -> STALE - // 2. STALE -> LONGEST - // otherwise, a unique constraint on (height, is_longest) might be violated. - - // 1. LONGEST -> STALE - blockStatusUpdates := make([]store.BlockStatusUpdate, len(longestBlocks)) - for i, b := range longestBlocks { - update := store.BlockStatusUpdate{Hash: b.Hash, Status: blocktx_api.Status_STALE} - blockStatusUpdates[i] = update + if ancestor == nil || !ancestor.Processed || len(orphans) == 0 { + return nil, nil } - err = uow.UpdateBlocksStatuses(ctx, blockStatusUpdates) - if err != nil { - return nil, err - } + p.logger.Info("orphaned chain found", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("status", block.Status.String())) - // 2. STALE -> LONGEST - blockStatusUpdates = make([]store.BlockStatusUpdate, len(staleBlocks)) - for _, b := range staleBlocks { - update := store.BlockStatusUpdate{Hash: b.Hash, Status: blocktx_api.Status_LONGEST} - blockStatusUpdates = append(blockStatusUpdates, update) - } + if ancestor.Status == blocktx_api.Status_STALE { + err = p.acceptIntoChain(ctx, orphans, ancestor.Status) + if err != nil { + return nil, err + } - err = uow.UpdateBlocksStatuses(ctx, blockStatusUpdates) - if err != nil { - return nil, err + block.Status = blocktx_api.Status_STALE + return p.handleStaleBlock(ctx, block) } - err = uow.Commit() - if err != nil { - return nil, err - } + if ancestor.Status == blocktx_api.Status_LONGEST { + // If there is competing block at the height of + // the first orphan, then we need to mark them + // all as stale and recheck for reorg. + // + // If there's no competing block at the height + // of the first orphan, then we can assume that + // there's no competing chain at all. - p.logger.Info("reorg performed successfully") + competingBlock, err := p.store.GetLongestBlockByHeight(ctx, orphans[0].Height) + if err != nil && !errors.Is(err, store.ErrBlockNotFound) { + p.logger.Error("unable to get competing block when handling orphans", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height), slog.String("err", err.Error())) + return nil, err + } - prevLongestTxs := make([]store.TransactionBlock, 0) - prevStaleTxs := make([]store.TransactionBlock, 0) + if competingBlock != nil && !bytes.Equal(competingBlock.Hash, orphans[0].Hash) { + err = p.acceptIntoChain(ctx, orphans, blocktx_api.Status_STALE) + if err != nil { + return nil, err + } - for _, tx := range registeredTxs { - switch tx.BlockStatus { - case blocktx_api.Status_LONGEST: - prevLongestTxs = append(prevLongestTxs, tx) - case blocktx_api.Status_STALE: - prevStaleTxs = append(prevStaleTxs, tx) - default: - // do nothing - ignore ORPHANED and UNKNOWN blocks + block.Status = blocktx_api.Status_STALE + return p.handleStaleBlock(ctx, block) } - } - nowMinedTxs, nowStaleTxs := findMinedAndStaleTxs(prevStaleTxs, prevLongestTxs) + err = p.acceptIntoChain(ctx, orphans, ancestor.Status) // LONGEST + if err != nil { + return nil, err + } - for i := range nowMinedTxs { - nowMinedTxs[i].BlockStatus = blocktx_api.Status_LONGEST + p.logger.Info("orphaned chain accepted into LONGEST chain", slog.String("hash", getHashStringNoErr(block.Hash)), slog.Uint64("height", block.Height)) + return p.getRegisteredTransactions(ctx, orphans) } - for i := range nowStaleTxs { - nowStaleTxs[i].BlockStatus = blocktx_api.Status_STALE + return nil, nil +} + +func (p *Processor) acceptIntoChain(ctx context.Context, blocks []*blocktx_api.Block, chain blocktx_api.Status) error { + blockStatusUpdates := make([]store.BlockStatusUpdate, len(blocks)) + + for i, b := range blocks { + b.Status = chain + blockStatusUpdates[i] = store.BlockStatusUpdate{ + Hash: b.Hash, + Status: b.Status, + } } - txsToPublish := append(nowMinedTxs, nowStaleTxs...) + tip := blocks[len(blocks)-1] - return txsToPublish, nil + err := p.store.UpdateBlocksStatuses(ctx, blockStatusUpdates) + if err != nil { + p.logger.Error("unable to accept blocks into chain", slog.String("hash", getHashStringNoErr(tip.Hash)), slog.Uint64("height", tip.Height), slog.String("chain", chain.String()), slog.String("err", err.Error())) + return err + } + + p.logger.Info("blocks successfully accepted into chain", slog.String("hash", getHashStringNoErr(tip.Hash)), slog.Uint64("height", tip.Height), slog.String("chain", chain.String())) + return nil } func (p *Processor) Shutdown() { diff --git a/internal/blocktx/processor_helpers.go b/internal/blocktx/processor_helpers.go index 4ba6ce945..a5fbcd53a 100644 --- a/internal/blocktx/processor_helpers.go +++ b/internal/blocktx/processor_helpers.go @@ -6,7 +6,6 @@ import ( "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/store" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" ) @@ -18,69 +17,33 @@ func getHashStringNoErr(hash []byte) string { return chash.String() } -func createBlock(msg *p2p.BlockMessage, prevBlock *blocktx_api.Block, longestTipExists bool) *blocktx_api.Block { - hash := msg.Header.BlockHash() - prevHash := msg.Header.PrevBlock - merkleRoot := msg.Header.MerkleRoot - chainwork := calculateChainwork(msg.Header.Bits) - - var status blocktx_api.Status - if prevBlock == nil { - if longestTipExists { - status = blocktx_api.Status_ORPHANED - } else { - status = blocktx_api.Status_LONGEST - } - } else { - status = prevBlock.Status - } - - return &blocktx_api.Block{ - Hash: hash[:], - PreviousHash: prevHash[:], - MerkleRoot: merkleRoot[:], - Height: msg.Height, - Status: status, - Chainwork: chainwork.String(), - } -} - -func getLowestHeight(blocks []*blocktx_api.Block) uint64 { - if len(blocks) == 0 { - return 0 - } - - lowest := blocks[0].Height +func sumChainwork(blocks []*blocktx_api.Block) *big.Int { + sum := big.NewInt(0) for _, b := range blocks { - if b.Height < lowest { - lowest = b.Height - } + chainwork := new(big.Int) + chainwork.SetString(b.Chainwork, 10) + + sum = sum.Add(sum, chainwork) } - return lowest + return sum } -func findMinedAndStaleTxs(prevStaleTxs, prevLongestTxs []store.TransactionBlock) (nowMinedTxs, nowStaleTxs []store.TransactionBlock) { - prevStaleMap := make(map[string]store.TransactionBlock) +func findDistinctStaleTxs(longestTxs, staleTxs []store.TransactionBlock) []store.TransactionBlock { + longestTxsMap := make(map[string]struct{}) - for _, tx := range prevStaleTxs { - prevStaleMap[string(tx.TxHash)] = tx - // every tx that was in previously stale blocks is to - // be mined regardless of whether it was also in the - // previously longest chain (update block info) - // or previously stale chain (new mined) - nowMinedTxs = append(nowMinedTxs, tx) + for _, tx := range longestTxs { + longestTxsMap[string(tx.TxHash)] = struct{}{} } - for _, longestTx := range prevLongestTxs { - if _, found := prevStaleMap[string(longestTx.TxHash)]; !found { - // if a transaction that was previously in a longest chain is - // not found in the previously stale blocks - it is now stale - nowStaleTxs = append(nowStaleTxs, longestTx) + distinctStaleTxs := make([]store.TransactionBlock, 0) + for _, tx := range staleTxs { + if _, found := longestTxsMap[string(tx.TxHash)]; !found { + distinctStaleTxs = append(distinctStaleTxs, tx) } } - return + return distinctStaleTxs } // calculateChainwork calculates chainwork from the given difficulty bits diff --git a/internal/blocktx/processor_helpers_test.go b/internal/blocktx/processor_helpers_test.go index 2aa6c0e54..b7ad49fd3 100644 --- a/internal/blocktx/processor_helpers_test.go +++ b/internal/blocktx/processor_helpers_test.go @@ -4,41 +4,13 @@ import ( "fmt" "testing" - "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/stretchr/testify/require" ) -func TestGetLowestHeight(t *testing.T) { - // given - blocks := []*blocktx_api.Block{ - { - Height: 123, - }, - { - Height: 250, - }, - { - Height: 83340, - }, - { - Height: 4, - }, - { - Height: 40, - }, - } - - // when - lowestHeight := getLowestHeight(blocks) - - // then - require.Equal(t, uint64(4), lowestHeight) -} - func TestFindMinedAndStaleTxs(t *testing.T) { // given - prevStaleTxs := []store.TransactionBlock{ + longestTxs := []store.TransactionBlock{ { TxHash: []byte("1"), }, @@ -46,7 +18,7 @@ func TestFindMinedAndStaleTxs(t *testing.T) { TxHash: []byte("2"), }, } - prevLongestTxs := []store.TransactionBlock{ + staleTxs := []store.TransactionBlock{ { TxHash: []byte("A"), }, @@ -58,14 +30,6 @@ func TestFindMinedAndStaleTxs(t *testing.T) { }, } - expectedMinedTxs := []store.TransactionBlock{ - { - TxHash: []byte("1"), - }, - { - TxHash: []byte("2"), - }, - } expectedStaleTxs := []store.TransactionBlock{ { TxHash: []byte("A"), @@ -76,10 +40,9 @@ func TestFindMinedAndStaleTxs(t *testing.T) { } // when - actualMinedTxs, actualStaleTxs := findMinedAndStaleTxs(prevStaleTxs, prevLongestTxs) + actualStaleTxs := findDistinctStaleTxs(longestTxs, staleTxs) // then - require.Equal(t, expectedMinedTxs, actualMinedTxs) require.Equal(t, expectedStaleTxs, actualStaleTxs) } diff --git a/internal/blocktx/processor_test.go b/internal/blocktx/processor_test.go index e289bc4ac..2b466541f 100644 --- a/internal/blocktx/processor_test.go +++ b/internal/blocktx/processor_test.go @@ -14,7 +14,6 @@ import ( "github.com/bitcoin-sv/arc/internal/blocktx/mocks" "github.com/bitcoin-sv/arc/internal/blocktx/store" storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks" - testutils "github.com/bitcoin-sv/arc/internal/test_utils" "github.com/bitcoin-sv/arc/internal/testdata" "github.com/libsv/go-bc" "github.com/libsv/go-p2p" @@ -145,31 +144,14 @@ func TestHandleBlock(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // given batchSize := 4 - uowMock := &storeMocks.UnitOfWorkMock{ - GetOrphanedChainUpFromHashFunc: func(_ context.Context, _ []byte) ([]*blocktx_api.Block, error) { - return nil, nil - }, - CommitFunc: func() error { - return nil - }, - RollbackFunc: func() error { - return nil - }, - WriteLockBlocksTableFunc: func(_ context.Context) error { - return nil - }, - } storeMock := &storeMocks.BlocktxStoreMock{ - StartUnitOfWorkFunc: func(_ context.Context) (store.UnitOfWork, error) { - return uowMock, nil - }, GetBlockFunc: func(_ context.Context, _ *chainhash.Hash) (*blocktx_api.Block, error) { if tc.blockAlreadyProcessed { return &blocktx_api.Block{Processed: true}, nil } return nil, store.ErrBlockNotFound }, - GetBlockByHeightFunc: func(_ context.Context, _ uint64) (*blocktx_api.Block, error) { + GetLongestBlockByHeightFunc: func(_ context.Context, _ uint64) (*blocktx_api.Block, error) { return nil, store.ErrBlockNotFound }, GetChainTipFunc: func(_ context.Context) (*blocktx_api.Block, error) { @@ -275,25 +257,19 @@ func TestHandleBlock(t *testing.T) { func TestHandleBlockReorgAndOrphans(t *testing.T) { testCases := []struct { - name string - blockAlreadyExists bool - prevBlockStatus blocktx_api.Status - hasCompetingBlock bool - hasGreaterChainwork bool - expectedStatus blocktx_api.Status - shouldFindOrphanChain bool + name string + blockAlreadyExists bool + prevBlockStatus blocktx_api.Status + hasCompetingBlock bool + hasGreaterChainwork bool + shouldFindOrphanAncestor bool + ancestorStatus blocktx_api.Status + expectedStatus blocktx_api.Status }{ { - name: "block already exists - no orphans - should be ingored", - blockAlreadyExists: true, - shouldFindOrphanChain: false, - expectedStatus: blocktx_api.Status_UNKNOWN, - }, - { - name: "block already exists - orphans found - reorg", - blockAlreadyExists: true, - shouldFindOrphanChain: true, - expectedStatus: blocktx_api.Status_LONGEST, + name: "block already exists - should be ingored", + blockAlreadyExists: true, + expectedStatus: blocktx_api.Status_UNKNOWN, }, { name: "previous block longest - no competing - no reorg", @@ -316,56 +292,55 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { expectedStatus: blocktx_api.Status_LONGEST, }, { - name: "previous block stale - competing - no reorg", - prevBlockStatus: blocktx_api.Status_STALE, - hasCompetingBlock: true, - hasGreaterChainwork: false, - expectedStatus: blocktx_api.Status_STALE, - }, - { - name: "previous block stale - no competing - no reorg", + name: "previous block stale - no reorg", prevBlockStatus: blocktx_api.Status_STALE, - hasCompetingBlock: false, hasGreaterChainwork: false, expectedStatus: blocktx_api.Status_STALE, }, { - name: "previous block stale - no competing - reorg", + name: "previous block stale - reorg", prevBlockStatus: blocktx_api.Status_STALE, - hasCompetingBlock: false, hasGreaterChainwork: true, expectedStatus: blocktx_api.Status_LONGEST, }, { - name: "previous block orphaned - no competing - no reorg", - prevBlockStatus: blocktx_api.Status_ORPHANED, - hasCompetingBlock: false, - hasGreaterChainwork: false, - expectedStatus: blocktx_api.Status_ORPHANED, + name: "previous block orphaned - no ancestor", + prevBlockStatus: blocktx_api.Status_ORPHANED, + shouldFindOrphanAncestor: false, + expectedStatus: blocktx_api.Status_ORPHANED, }, { - name: "previous block longest - orphaned chain - no competing - no reorg", - prevBlockStatus: blocktx_api.Status_LONGEST, - hasCompetingBlock: false, - hasGreaterChainwork: false, - expectedStatus: blocktx_api.Status_LONGEST, - shouldFindOrphanChain: true, + name: "previous block orphaned - stale ancestor", + prevBlockStatus: blocktx_api.Status_ORPHANED, + shouldFindOrphanAncestor: true, + ancestorStatus: blocktx_api.Status_STALE, + expectedStatus: blocktx_api.Status_STALE, }, { - name: "previous block longest - orphaned chain - competing - reorg", - prevBlockStatus: blocktx_api.Status_LONGEST, - hasCompetingBlock: true, - hasGreaterChainwork: false, // tip of orphan chain has greater chainwork - expectedStatus: blocktx_api.Status_LONGEST, - shouldFindOrphanChain: true, + name: "previous block orphaned - longest ancestor - no competing", + prevBlockStatus: blocktx_api.Status_ORPHANED, + shouldFindOrphanAncestor: true, + ancestorStatus: blocktx_api.Status_LONGEST, + hasCompetingBlock: false, + expectedStatus: blocktx_api.Status_LONGEST, }, { - name: "previous block stale - orphaned chain - competing - reorg", - prevBlockStatus: blocktx_api.Status_STALE, - hasCompetingBlock: true, - hasGreaterChainwork: false, // tip of orphan chain has greater chainwork - expectedStatus: blocktx_api.Status_LONGEST, - shouldFindOrphanChain: true, + name: "previous block orphaned - longest ancestor - competing - no reorg", + prevBlockStatus: blocktx_api.Status_ORPHANED, + shouldFindOrphanAncestor: true, + ancestorStatus: blocktx_api.Status_LONGEST, + hasCompetingBlock: true, + hasGreaterChainwork: false, + expectedStatus: blocktx_api.Status_STALE, + }, + { + name: "previous block orphaned - longest ancestor - competing - reorg", + prevBlockStatus: blocktx_api.Status_ORPHANED, + shouldFindOrphanAncestor: true, + ancestorStatus: blocktx_api.Status_LONGEST, + hasCompetingBlock: true, + hasGreaterChainwork: true, + expectedStatus: blocktx_api.Status_LONGEST, }, } @@ -374,72 +349,9 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { // given var mtx sync.Mutex insertedBlockStatus := blocktx_api.Status_UNKNOWN - orphanedChainTip := &blocktx_api.Block{ - Hash: testutils.RevChainhash(t, "0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067")[:], - Status: blocktx_api.Status_ORPHANED, - Chainwork: "34364008516618225545", // greatest chainwork - should cause reorg if found - } - shouldReturnNoBlock := !tc.blockAlreadyExists - shouldCheckUpdateStatuses := true - uowMock := &storeMocks.UnitOfWorkMock{ - GetOrphanedChainUpFromHashFunc: func(_ context.Context, _ []byte) ([]*blocktx_api.Block, error) { - if tc.shouldFindOrphanChain { - return []*blocktx_api.Block{ - { - Hash: []byte("123"), - Status: blocktx_api.Status_ORPHANED, - Chainwork: "123", - }, - orphanedChainTip, - }, nil - } - - return nil, nil - }, - UpdateBlocksStatusesFunc: func(_ context.Context, blockStatusUpdates []store.BlockStatusUpdate) error { - if shouldCheckUpdateStatuses && tc.shouldFindOrphanChain { - mtx.Lock() - shouldCheckUpdateStatuses = false - tipStatusUpdate := blockStatusUpdates[len(blockStatusUpdates)-1] - require.Equal(t, orphanedChainTip.Hash, tipStatusUpdate.Hash) - require.Equal(t, blocktx_api.Status_STALE, tipStatusUpdate.Status) - mtx.Unlock() - } - return nil - }, - GetStaleChainBackFromHashFunc: func(_ context.Context, hash []byte) ([]*blocktx_api.Block, error) { - // if this method is called from UnitOfwork, it means that reorg is happening - mtx.Lock() - insertedBlockStatus = blocktx_api.Status_LONGEST - if tc.shouldFindOrphanChain { - require.Equal(t, orphanedChainTip.Hash[:], hash) - orphanedChainTip.Status = blocktx_api.Status_LONGEST - } - mtx.Unlock() - return nil, nil - }, - GetLongestChainFromHeightFunc: func(_ context.Context, _ uint64) ([]*blocktx_api.Block, error) { - return nil, nil - }, - GetRegisteredTxsByBlockHashesFunc: func(_ context.Context, _ [][]byte) ([]store.TransactionBlock, error) { - return nil, nil - }, - CommitFunc: func() error { - return nil - }, - RollbackFunc: func() error { - return nil - }, - WriteLockBlocksTableFunc: func(_ context.Context) error { - return nil - }, - } storeMock := &storeMocks.BlocktxStoreMock{ - StartUnitOfWorkFunc: func(_ context.Context) (store.UnitOfWork, error) { - return uowMock, nil - }, GetBlockFunc: func(_ context.Context, _ *chainhash.Hash) (*blocktx_api.Block, error) { if shouldReturnNoBlock { shouldReturnNoBlock = false @@ -451,7 +363,7 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { Processed: true, }, nil }, - GetBlockByHeightFunc: func(_ context.Context, _ uint64) (*blocktx_api.Block, error) { + GetLongestBlockByHeightFunc: func(_ context.Context, _ uint64) (*blocktx_api.Block, error) { if tc.hasCompetingBlock { blockHash, err := chainhash.NewHashFromStr("0000000000000000087590e1ad6360c0c491556c9af75c0d22ce9324cb5713cf") require.NoError(t, err) @@ -471,11 +383,7 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { mtx.Unlock() return 1, nil }, - GetStaleChainBackFromHashFunc: func(_ context.Context, hash []byte) ([]*blocktx_api.Block, error) { - if tc.shouldFindOrphanChain { - require.Equal(t, orphanedChainTip.Hash, hash) - return []*blocktx_api.Block{orphanedChainTip}, nil - } + GetStaleChainBackFromHashFunc: func(_ context.Context, _ []byte) ([]*blocktx_api.Block, error) { if tc.hasGreaterChainwork { return []*blocktx_api.Block{ { @@ -505,6 +413,21 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { }, }, nil }, + UpdateBlocksStatusesFunc: func(_ context.Context, blockStatusUpdates []store.BlockStatusUpdate) error { + mtx.Lock() + tipStatusUpdate := blockStatusUpdates[len(blockStatusUpdates)-1] + insertedBlockStatus = tipStatusUpdate.Status + mtx.Unlock() + return nil + }, + GetOrphansBackToNonOrphanAncestorFunc: func(_ context.Context, hash []byte) ([]*blocktx_api.Block, *blocktx_api.Block, error) { + if tc.shouldFindOrphanAncestor { + orphans := []*blocktx_api.Block{{Hash: hash}} + ancestor := &blocktx_api.Block{Hash: []byte("123"), Status: tc.ancestorStatus, Processed: true} + return orphans, ancestor, nil + } + return nil, nil, nil + }, UpsertBlockTransactionsFunc: func(_ context.Context, _ uint64, _ []store.TxWithMerklePath) error { return nil }, @@ -557,9 +480,6 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { time.Sleep(20 * time.Millisecond) mtx.Lock() require.Equal(t, tc.expectedStatus, insertedBlockStatus) - if tc.shouldFindOrphanChain { - require.Equal(t, tc.expectedStatus, orphanedChainTip.Status) - } mtx.Unlock() }) } diff --git a/internal/blocktx/store/mocks/blocktx_db_tx_mock.go b/internal/blocktx/store/mocks/blocktx_db_tx_mock.go deleted file mode 100644 index f93c655c7..000000000 --- a/internal/blocktx/store/mocks/blocktx_db_tx_mock.go +++ /dev/null @@ -1,1269 +0,0 @@ -// Code generated by moq; DO NOT EDIT. -// github.com/matryer/moq - -package mocks - -import ( - "context" - "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" - "github.com/bitcoin-sv/arc/internal/blocktx/store" - "github.com/libsv/go-p2p/chaincfg/chainhash" - "sync" -) - -// Ensure, that UnitOfWorkMock does implement store.UnitOfWork. -// If this is not the case, regenerate this file with moq. -var _ store.UnitOfWork = &UnitOfWorkMock{} - -// UnitOfWorkMock is a mock implementation of store.UnitOfWork. -// -// func TestSomethingThatUsesUnitOfWork(t *testing.T) { -// -// // make and configure a mocked store.UnitOfWork -// mockedUnitOfWork := &UnitOfWorkMock{ -// ClearBlocktxTableFunc: func(ctx context.Context, retentionDays int32, table string) (*blocktx_api.RowsAffectedResponse, error) { -// panic("mock out the ClearBlocktxTable method") -// }, -// CloseFunc: func() error { -// panic("mock out the Close method") -// }, -// CommitFunc: func() error { -// panic("mock out the Commit method") -// }, -// DelBlockProcessingFunc: func(ctx context.Context, hash *chainhash.Hash, processedBy string) (int64, error) { -// panic("mock out the DelBlockProcessing method") -// }, -// GetBlockFunc: func(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) { -// panic("mock out the GetBlock method") -// }, -// GetBlockByHeightFunc: func(ctx context.Context, height uint64) (*blocktx_api.Block, error) { -// panic("mock out the GetBlockByHeight method") -// }, -// GetBlockGapsFunc: func(ctx context.Context, heightRange int) ([]*store.BlockGap, error) { -// panic("mock out the GetBlockGaps method") -// }, -// GetBlockHashesProcessingInProgressFunc: func(ctx context.Context, processedBy string) ([]*chainhash.Hash, error) { -// panic("mock out the GetBlockHashesProcessingInProgress method") -// }, -// GetChainTipFunc: func(ctx context.Context) (*blocktx_api.Block, error) { -// panic("mock out the GetChainTip method") -// }, -// GetLongestChainFromHeightFunc: func(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) { -// panic("mock out the GetLongestChainFromHeight method") -// }, -// GetMinedTransactionsFunc: func(ctx context.Context, hashes [][]byte, onlyLongestChain bool) ([]store.TransactionBlock, error) { -// panic("mock out the GetMinedTransactions method") -// }, -// GetOrphanedChainUpFromHashFunc: func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { -// panic("mock out the GetOrphanedChainUpFromHash method") -// }, -// GetRegisteredTxsByBlockHashesFunc: func(ctx context.Context, blockHashes [][]byte) ([]store.TransactionBlock, error) { -// panic("mock out the GetRegisteredTxsByBlockHashes method") -// }, -// GetStaleChainBackFromHashFunc: func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { -// panic("mock out the GetStaleChainBackFromHash method") -// }, -// MarkBlockAsDoneFunc: func(ctx context.Context, hash *chainhash.Hash, size uint64, txCount uint64) error { -// panic("mock out the MarkBlockAsDone method") -// }, -// PingFunc: func(ctx context.Context) error { -// panic("mock out the Ping method") -// }, -// RegisterTransactionsFunc: func(ctx context.Context, txHashes [][]byte) ([]*chainhash.Hash, error) { -// panic("mock out the RegisterTransactions method") -// }, -// RollbackFunc: func() error { -// panic("mock out the Rollback method") -// }, -// SetBlockProcessingFunc: func(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) { -// panic("mock out the SetBlockProcessing method") -// }, -// StartUnitOfWorkFunc: func(ctx context.Context) (store.UnitOfWork, error) { -// panic("mock out the StartUnitOfWork method") -// }, -// UpdateBlocksStatusesFunc: func(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error { -// panic("mock out the UpdateBlocksStatuses method") -// }, -// UpsertBlockFunc: func(ctx context.Context, block *blocktx_api.Block) (uint64, error) { -// panic("mock out the UpsertBlock method") -// }, -// UpsertBlockTransactionsFunc: func(ctx context.Context, blockID uint64, txsWithMerklePaths []store.TxWithMerklePath) error { -// panic("mock out the UpsertBlockTransactions method") -// }, -// VerifyMerkleRootsFunc: func(ctx context.Context, merkleRoots []*blocktx_api.MerkleRootVerificationRequest, maxAllowedBlockHeightMismatch int) (*blocktx_api.MerkleRootVerificationResponse, error) { -// panic("mock out the VerifyMerkleRoots method") -// }, -// WriteLockBlocksTableFunc: func(ctx context.Context) error { -// panic("mock out the WriteLockBlocksTable method") -// }, -// } -// -// // use mockedUnitOfWork in code that requires store.UnitOfWork -// // and then make assertions. -// -// } -type UnitOfWorkMock struct { - // ClearBlocktxTableFunc mocks the ClearBlocktxTable method. - ClearBlocktxTableFunc func(ctx context.Context, retentionDays int32, table string) (*blocktx_api.RowsAffectedResponse, error) - - // CloseFunc mocks the Close method. - CloseFunc func() error - - // CommitFunc mocks the Commit method. - CommitFunc func() error - - // DelBlockProcessingFunc mocks the DelBlockProcessing method. - DelBlockProcessingFunc func(ctx context.Context, hash *chainhash.Hash, processedBy string) (int64, error) - - // GetBlockFunc mocks the GetBlock method. - GetBlockFunc func(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) - - // GetBlockByHeightFunc mocks the GetBlockByHeight method. - GetBlockByHeightFunc func(ctx context.Context, height uint64) (*blocktx_api.Block, error) - - // GetBlockGapsFunc mocks the GetBlockGaps method. - GetBlockGapsFunc func(ctx context.Context, heightRange int) ([]*store.BlockGap, error) - - // GetBlockHashesProcessingInProgressFunc mocks the GetBlockHashesProcessingInProgress method. - GetBlockHashesProcessingInProgressFunc func(ctx context.Context, processedBy string) ([]*chainhash.Hash, error) - - // GetChainTipFunc mocks the GetChainTip method. - GetChainTipFunc func(ctx context.Context) (*blocktx_api.Block, error) - - // GetLongestChainFromHeightFunc mocks the GetLongestChainFromHeight method. - GetLongestChainFromHeightFunc func(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) - - // GetMinedTransactionsFunc mocks the GetMinedTransactions method. - GetMinedTransactionsFunc func(ctx context.Context, hashes [][]byte, onlyLongestChain bool) ([]store.TransactionBlock, error) - - // GetOrphanedChainUpFromHashFunc mocks the GetOrphanedChainUpFromHash method. - GetOrphanedChainUpFromHashFunc func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) - - // GetRegisteredTxsByBlockHashesFunc mocks the GetRegisteredTxsByBlockHashes method. - GetRegisteredTxsByBlockHashesFunc func(ctx context.Context, blockHashes [][]byte) ([]store.TransactionBlock, error) - - // GetStaleChainBackFromHashFunc mocks the GetStaleChainBackFromHash method. - GetStaleChainBackFromHashFunc func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) - - // MarkBlockAsDoneFunc mocks the MarkBlockAsDone method. - MarkBlockAsDoneFunc func(ctx context.Context, hash *chainhash.Hash, size uint64, txCount uint64) error - - // PingFunc mocks the Ping method. - PingFunc func(ctx context.Context) error - - // RegisterTransactionsFunc mocks the RegisterTransactions method. - RegisterTransactionsFunc func(ctx context.Context, txHashes [][]byte) ([]*chainhash.Hash, error) - - // RollbackFunc mocks the Rollback method. - RollbackFunc func() error - - // SetBlockProcessingFunc mocks the SetBlockProcessing method. - SetBlockProcessingFunc func(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) - - // StartUnitOfWorkFunc mocks the StartUnitOfWork method. - StartUnitOfWorkFunc func(ctx context.Context) (store.UnitOfWork, error) - - // UpdateBlocksStatusesFunc mocks the UpdateBlocksStatuses method. - UpdateBlocksStatusesFunc func(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error - - // UpsertBlockFunc mocks the UpsertBlock method. - UpsertBlockFunc func(ctx context.Context, block *blocktx_api.Block) (uint64, error) - - // UpsertBlockTransactionsFunc mocks the UpsertBlockTransactions method. - UpsertBlockTransactionsFunc func(ctx context.Context, blockID uint64, txsWithMerklePaths []store.TxWithMerklePath) error - - // VerifyMerkleRootsFunc mocks the VerifyMerkleRoots method. - VerifyMerkleRootsFunc func(ctx context.Context, merkleRoots []*blocktx_api.MerkleRootVerificationRequest, maxAllowedBlockHeightMismatch int) (*blocktx_api.MerkleRootVerificationResponse, error) - - // WriteLockBlocksTableFunc mocks the WriteLockBlocksTable method. - WriteLockBlocksTableFunc func(ctx context.Context) error - - // calls tracks calls to the methods. - calls struct { - // ClearBlocktxTable holds details about calls to the ClearBlocktxTable method. - ClearBlocktxTable []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // RetentionDays is the retentionDays argument value. - RetentionDays int32 - // Table is the table argument value. - Table string - } - // Close holds details about calls to the Close method. - Close []struct { - } - // Commit holds details about calls to the Commit method. - Commit []struct { - } - // DelBlockProcessing holds details about calls to the DelBlockProcessing method. - DelBlockProcessing []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Hash is the hash argument value. - Hash *chainhash.Hash - // ProcessedBy is the processedBy argument value. - ProcessedBy string - } - // GetBlock holds details about calls to the GetBlock method. - GetBlock []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Hash is the hash argument value. - Hash *chainhash.Hash - } - // GetBlockByHeight holds details about calls to the GetBlockByHeight method. - GetBlockByHeight []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Height is the height argument value. - Height uint64 - } - // GetBlockGaps holds details about calls to the GetBlockGaps method. - GetBlockGaps []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // HeightRange is the heightRange argument value. - HeightRange int - } - // GetBlockHashesProcessingInProgress holds details about calls to the GetBlockHashesProcessingInProgress method. - GetBlockHashesProcessingInProgress []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // ProcessedBy is the processedBy argument value. - ProcessedBy string - } - // GetChainTip holds details about calls to the GetChainTip method. - GetChainTip []struct { - // Ctx is the ctx argument value. - Ctx context.Context - } - // GetLongestChainFromHeight holds details about calls to the GetLongestChainFromHeight method. - GetLongestChainFromHeight []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Height is the height argument value. - Height uint64 - } - // GetMinedTransactions holds details about calls to the GetMinedTransactions method. - GetMinedTransactions []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Hashes is the hashes argument value. - Hashes [][]byte - // OnlyLongestChain is the onlyLongestChain argument value. - OnlyLongestChain bool - } - // GetOrphanedChainUpFromHash holds details about calls to the GetOrphanedChainUpFromHash method. - GetOrphanedChainUpFromHash []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Hash is the hash argument value. - Hash []byte - } - // GetRegisteredTxsByBlockHashes holds details about calls to the GetRegisteredTxsByBlockHashes method. - GetRegisteredTxsByBlockHashes []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // BlockHashes is the blockHashes argument value. - BlockHashes [][]byte - } - // GetStaleChainBackFromHash holds details about calls to the GetStaleChainBackFromHash method. - GetStaleChainBackFromHash []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Hash is the hash argument value. - Hash []byte - } - // MarkBlockAsDone holds details about calls to the MarkBlockAsDone method. - MarkBlockAsDone []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Hash is the hash argument value. - Hash *chainhash.Hash - // Size is the size argument value. - Size uint64 - // TxCount is the txCount argument value. - TxCount uint64 - } - // Ping holds details about calls to the Ping method. - Ping []struct { - // Ctx is the ctx argument value. - Ctx context.Context - } - // RegisterTransactions holds details about calls to the RegisterTransactions method. - RegisterTransactions []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // TxHashes is the txHashes argument value. - TxHashes [][]byte - } - // Rollback holds details about calls to the Rollback method. - Rollback []struct { - } - // SetBlockProcessing holds details about calls to the SetBlockProcessing method. - SetBlockProcessing []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Hash is the hash argument value. - Hash *chainhash.Hash - // ProcessedBy is the processedBy argument value. - ProcessedBy string - } - // StartUnitOfWork holds details about calls to the StartUnitOfWork method. - StartUnitOfWork []struct { - // Ctx is the ctx argument value. - Ctx context.Context - } - // UpdateBlocksStatuses holds details about calls to the UpdateBlocksStatuses method. - UpdateBlocksStatuses []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // BlockStatusUpdates is the blockStatusUpdates argument value. - BlockStatusUpdates []store.BlockStatusUpdate - } - // UpsertBlock holds details about calls to the UpsertBlock method. - UpsertBlock []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Block is the block argument value. - Block *blocktx_api.Block - } - // UpsertBlockTransactions holds details about calls to the UpsertBlockTransactions method. - UpsertBlockTransactions []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // BlockID is the blockID argument value. - BlockID uint64 - // TxsWithMerklePaths is the txsWithMerklePaths argument value. - TxsWithMerklePaths []store.TxWithMerklePath - } - // VerifyMerkleRoots holds details about calls to the VerifyMerkleRoots method. - VerifyMerkleRoots []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // MerkleRoots is the merkleRoots argument value. - MerkleRoots []*blocktx_api.MerkleRootVerificationRequest - // MaxAllowedBlockHeightMismatch is the maxAllowedBlockHeightMismatch argument value. - MaxAllowedBlockHeightMismatch int - } - // WriteLockBlocksTable holds details about calls to the WriteLockBlocksTable method. - WriteLockBlocksTable []struct { - // Ctx is the ctx argument value. - Ctx context.Context - } - } - lockClearBlocktxTable sync.RWMutex - lockClose sync.RWMutex - lockCommit sync.RWMutex - lockDelBlockProcessing sync.RWMutex - lockGetBlock sync.RWMutex - lockGetBlockByHeight sync.RWMutex - lockGetBlockGaps sync.RWMutex - lockGetBlockHashesProcessingInProgress sync.RWMutex - lockGetChainTip sync.RWMutex - lockGetLongestChainFromHeight sync.RWMutex - lockGetMinedTransactions sync.RWMutex - lockGetOrphanedChainUpFromHash sync.RWMutex - lockGetRegisteredTxsByBlockHashes sync.RWMutex - lockGetStaleChainBackFromHash sync.RWMutex - lockMarkBlockAsDone sync.RWMutex - lockPing sync.RWMutex - lockRegisterTransactions sync.RWMutex - lockRollback sync.RWMutex - lockSetBlockProcessing sync.RWMutex - lockStartUnitOfWork sync.RWMutex - lockUpdateBlocksStatuses sync.RWMutex - lockUpsertBlock sync.RWMutex - lockUpsertBlockTransactions sync.RWMutex - lockVerifyMerkleRoots sync.RWMutex - lockWriteLockBlocksTable sync.RWMutex -} - -// ClearBlocktxTable calls ClearBlocktxTableFunc. -func (mock *UnitOfWorkMock) ClearBlocktxTable(ctx context.Context, retentionDays int32, table string) (*blocktx_api.RowsAffectedResponse, error) { - if mock.ClearBlocktxTableFunc == nil { - panic("UnitOfWorkMock.ClearBlocktxTableFunc: method is nil but UnitOfWork.ClearBlocktxTable was just called") - } - callInfo := struct { - Ctx context.Context - RetentionDays int32 - Table string - }{ - Ctx: ctx, - RetentionDays: retentionDays, - Table: table, - } - mock.lockClearBlocktxTable.Lock() - mock.calls.ClearBlocktxTable = append(mock.calls.ClearBlocktxTable, callInfo) - mock.lockClearBlocktxTable.Unlock() - return mock.ClearBlocktxTableFunc(ctx, retentionDays, table) -} - -// ClearBlocktxTableCalls gets all the calls that were made to ClearBlocktxTable. -// Check the length with: -// -// len(mockedUnitOfWork.ClearBlocktxTableCalls()) -func (mock *UnitOfWorkMock) ClearBlocktxTableCalls() []struct { - Ctx context.Context - RetentionDays int32 - Table string -} { - var calls []struct { - Ctx context.Context - RetentionDays int32 - Table string - } - mock.lockClearBlocktxTable.RLock() - calls = mock.calls.ClearBlocktxTable - mock.lockClearBlocktxTable.RUnlock() - return calls -} - -// Close calls CloseFunc. -func (mock *UnitOfWorkMock) Close() error { - if mock.CloseFunc == nil { - panic("UnitOfWorkMock.CloseFunc: method is nil but UnitOfWork.Close was just called") - } - callInfo := struct { - }{} - mock.lockClose.Lock() - mock.calls.Close = append(mock.calls.Close, callInfo) - mock.lockClose.Unlock() - return mock.CloseFunc() -} - -// CloseCalls gets all the calls that were made to Close. -// Check the length with: -// -// len(mockedUnitOfWork.CloseCalls()) -func (mock *UnitOfWorkMock) CloseCalls() []struct { -} { - var calls []struct { - } - mock.lockClose.RLock() - calls = mock.calls.Close - mock.lockClose.RUnlock() - return calls -} - -// Commit calls CommitFunc. -func (mock *UnitOfWorkMock) Commit() error { - if mock.CommitFunc == nil { - panic("UnitOfWorkMock.CommitFunc: method is nil but UnitOfWork.Commit was just called") - } - callInfo := struct { - }{} - mock.lockCommit.Lock() - mock.calls.Commit = append(mock.calls.Commit, callInfo) - mock.lockCommit.Unlock() - return mock.CommitFunc() -} - -// CommitCalls gets all the calls that were made to Commit. -// Check the length with: -// -// len(mockedUnitOfWork.CommitCalls()) -func (mock *UnitOfWorkMock) CommitCalls() []struct { -} { - var calls []struct { - } - mock.lockCommit.RLock() - calls = mock.calls.Commit - mock.lockCommit.RUnlock() - return calls -} - -// DelBlockProcessing calls DelBlockProcessingFunc. -func (mock *UnitOfWorkMock) DelBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (int64, error) { - if mock.DelBlockProcessingFunc == nil { - panic("UnitOfWorkMock.DelBlockProcessingFunc: method is nil but UnitOfWork.DelBlockProcessing was just called") - } - callInfo := struct { - Ctx context.Context - Hash *chainhash.Hash - ProcessedBy string - }{ - Ctx: ctx, - Hash: hash, - ProcessedBy: processedBy, - } - mock.lockDelBlockProcessing.Lock() - mock.calls.DelBlockProcessing = append(mock.calls.DelBlockProcessing, callInfo) - mock.lockDelBlockProcessing.Unlock() - return mock.DelBlockProcessingFunc(ctx, hash, processedBy) -} - -// DelBlockProcessingCalls gets all the calls that were made to DelBlockProcessing. -// Check the length with: -// -// len(mockedUnitOfWork.DelBlockProcessingCalls()) -func (mock *UnitOfWorkMock) DelBlockProcessingCalls() []struct { - Ctx context.Context - Hash *chainhash.Hash - ProcessedBy string -} { - var calls []struct { - Ctx context.Context - Hash *chainhash.Hash - ProcessedBy string - } - mock.lockDelBlockProcessing.RLock() - calls = mock.calls.DelBlockProcessing - mock.lockDelBlockProcessing.RUnlock() - return calls -} - -// GetBlock calls GetBlockFunc. -func (mock *UnitOfWorkMock) GetBlock(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) { - if mock.GetBlockFunc == nil { - panic("UnitOfWorkMock.GetBlockFunc: method is nil but UnitOfWork.GetBlock was just called") - } - callInfo := struct { - Ctx context.Context - Hash *chainhash.Hash - }{ - Ctx: ctx, - Hash: hash, - } - mock.lockGetBlock.Lock() - mock.calls.GetBlock = append(mock.calls.GetBlock, callInfo) - mock.lockGetBlock.Unlock() - return mock.GetBlockFunc(ctx, hash) -} - -// GetBlockCalls gets all the calls that were made to GetBlock. -// Check the length with: -// -// len(mockedUnitOfWork.GetBlockCalls()) -func (mock *UnitOfWorkMock) GetBlockCalls() []struct { - Ctx context.Context - Hash *chainhash.Hash -} { - var calls []struct { - Ctx context.Context - Hash *chainhash.Hash - } - mock.lockGetBlock.RLock() - calls = mock.calls.GetBlock - mock.lockGetBlock.RUnlock() - return calls -} - -// GetBlockByHeight calls GetBlockByHeightFunc. -func (mock *UnitOfWorkMock) GetBlockByHeight(ctx context.Context, height uint64) (*blocktx_api.Block, error) { - if mock.GetBlockByHeightFunc == nil { - panic("UnitOfWorkMock.GetBlockByHeightFunc: method is nil but UnitOfWork.GetBlockByHeight was just called") - } - callInfo := struct { - Ctx context.Context - Height uint64 - }{ - Ctx: ctx, - Height: height, - } - mock.lockGetBlockByHeight.Lock() - mock.calls.GetBlockByHeight = append(mock.calls.GetBlockByHeight, callInfo) - mock.lockGetBlockByHeight.Unlock() - return mock.GetBlockByHeightFunc(ctx, height) -} - -// GetBlockByHeightCalls gets all the calls that were made to GetBlockByHeight. -// Check the length with: -// -// len(mockedUnitOfWork.GetBlockByHeightCalls()) -func (mock *UnitOfWorkMock) GetBlockByHeightCalls() []struct { - Ctx context.Context - Height uint64 -} { - var calls []struct { - Ctx context.Context - Height uint64 - } - mock.lockGetBlockByHeight.RLock() - calls = mock.calls.GetBlockByHeight - mock.lockGetBlockByHeight.RUnlock() - return calls -} - -// GetBlockGaps calls GetBlockGapsFunc. -func (mock *UnitOfWorkMock) GetBlockGaps(ctx context.Context, heightRange int) ([]*store.BlockGap, error) { - if mock.GetBlockGapsFunc == nil { - panic("UnitOfWorkMock.GetBlockGapsFunc: method is nil but UnitOfWork.GetBlockGaps was just called") - } - callInfo := struct { - Ctx context.Context - HeightRange int - }{ - Ctx: ctx, - HeightRange: heightRange, - } - mock.lockGetBlockGaps.Lock() - mock.calls.GetBlockGaps = append(mock.calls.GetBlockGaps, callInfo) - mock.lockGetBlockGaps.Unlock() - return mock.GetBlockGapsFunc(ctx, heightRange) -} - -// GetBlockGapsCalls gets all the calls that were made to GetBlockGaps. -// Check the length with: -// -// len(mockedUnitOfWork.GetBlockGapsCalls()) -func (mock *UnitOfWorkMock) GetBlockGapsCalls() []struct { - Ctx context.Context - HeightRange int -} { - var calls []struct { - Ctx context.Context - HeightRange int - } - mock.lockGetBlockGaps.RLock() - calls = mock.calls.GetBlockGaps - mock.lockGetBlockGaps.RUnlock() - return calls -} - -// GetBlockHashesProcessingInProgress calls GetBlockHashesProcessingInProgressFunc. -func (mock *UnitOfWorkMock) GetBlockHashesProcessingInProgress(ctx context.Context, processedBy string) ([]*chainhash.Hash, error) { - if mock.GetBlockHashesProcessingInProgressFunc == nil { - panic("UnitOfWorkMock.GetBlockHashesProcessingInProgressFunc: method is nil but UnitOfWork.GetBlockHashesProcessingInProgress was just called") - } - callInfo := struct { - Ctx context.Context - ProcessedBy string - }{ - Ctx: ctx, - ProcessedBy: processedBy, - } - mock.lockGetBlockHashesProcessingInProgress.Lock() - mock.calls.GetBlockHashesProcessingInProgress = append(mock.calls.GetBlockHashesProcessingInProgress, callInfo) - mock.lockGetBlockHashesProcessingInProgress.Unlock() - return mock.GetBlockHashesProcessingInProgressFunc(ctx, processedBy) -} - -// GetBlockHashesProcessingInProgressCalls gets all the calls that were made to GetBlockHashesProcessingInProgress. -// Check the length with: -// -// len(mockedUnitOfWork.GetBlockHashesProcessingInProgressCalls()) -func (mock *UnitOfWorkMock) GetBlockHashesProcessingInProgressCalls() []struct { - Ctx context.Context - ProcessedBy string -} { - var calls []struct { - Ctx context.Context - ProcessedBy string - } - mock.lockGetBlockHashesProcessingInProgress.RLock() - calls = mock.calls.GetBlockHashesProcessingInProgress - mock.lockGetBlockHashesProcessingInProgress.RUnlock() - return calls -} - -// GetChainTip calls GetChainTipFunc. -func (mock *UnitOfWorkMock) GetChainTip(ctx context.Context) (*blocktx_api.Block, error) { - if mock.GetChainTipFunc == nil { - panic("UnitOfWorkMock.GetChainTipFunc: method is nil but UnitOfWork.GetChainTip was just called") - } - callInfo := struct { - Ctx context.Context - }{ - Ctx: ctx, - } - mock.lockGetChainTip.Lock() - mock.calls.GetChainTip = append(mock.calls.GetChainTip, callInfo) - mock.lockGetChainTip.Unlock() - return mock.GetChainTipFunc(ctx) -} - -// GetChainTipCalls gets all the calls that were made to GetChainTip. -// Check the length with: -// -// len(mockedUnitOfWork.GetChainTipCalls()) -func (mock *UnitOfWorkMock) GetChainTipCalls() []struct { - Ctx context.Context -} { - var calls []struct { - Ctx context.Context - } - mock.lockGetChainTip.RLock() - calls = mock.calls.GetChainTip - mock.lockGetChainTip.RUnlock() - return calls -} - -// GetLongestChainFromHeight calls GetLongestChainFromHeightFunc. -func (mock *UnitOfWorkMock) GetLongestChainFromHeight(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) { - if mock.GetLongestChainFromHeightFunc == nil { - panic("UnitOfWorkMock.GetLongestChainFromHeightFunc: method is nil but UnitOfWork.GetLongestChainFromHeight was just called") - } - callInfo := struct { - Ctx context.Context - Height uint64 - }{ - Ctx: ctx, - Height: height, - } - mock.lockGetLongestChainFromHeight.Lock() - mock.calls.GetLongestChainFromHeight = append(mock.calls.GetLongestChainFromHeight, callInfo) - mock.lockGetLongestChainFromHeight.Unlock() - return mock.GetLongestChainFromHeightFunc(ctx, height) -} - -// GetLongestChainFromHeightCalls gets all the calls that were made to GetLongestChainFromHeight. -// Check the length with: -// -// len(mockedUnitOfWork.GetLongestChainFromHeightCalls()) -func (mock *UnitOfWorkMock) GetLongestChainFromHeightCalls() []struct { - Ctx context.Context - Height uint64 -} { - var calls []struct { - Ctx context.Context - Height uint64 - } - mock.lockGetLongestChainFromHeight.RLock() - calls = mock.calls.GetLongestChainFromHeight - mock.lockGetLongestChainFromHeight.RUnlock() - return calls -} - -// GetMinedTransactions calls GetMinedTransactionsFunc. -func (mock *UnitOfWorkMock) GetMinedTransactions(ctx context.Context, hashes [][]byte, onlyLongestChain bool) ([]store.TransactionBlock, error) { - if mock.GetMinedTransactionsFunc == nil { - panic("UnitOfWorkMock.GetMinedTransactionsFunc: method is nil but UnitOfWork.GetMinedTransactions was just called") - } - callInfo := struct { - Ctx context.Context - Hashes [][]byte - OnlyLongestChain bool - }{ - Ctx: ctx, - Hashes: hashes, - OnlyLongestChain: onlyLongestChain, - } - mock.lockGetMinedTransactions.Lock() - mock.calls.GetMinedTransactions = append(mock.calls.GetMinedTransactions, callInfo) - mock.lockGetMinedTransactions.Unlock() - return mock.GetMinedTransactionsFunc(ctx, hashes, onlyLongestChain) -} - -// GetMinedTransactionsCalls gets all the calls that were made to GetMinedTransactions. -// Check the length with: -// -// len(mockedUnitOfWork.GetMinedTransactionsCalls()) -func (mock *UnitOfWorkMock) GetMinedTransactionsCalls() []struct { - Ctx context.Context - Hashes [][]byte - OnlyLongestChain bool -} { - var calls []struct { - Ctx context.Context - Hashes [][]byte - OnlyLongestChain bool - } - mock.lockGetMinedTransactions.RLock() - calls = mock.calls.GetMinedTransactions - mock.lockGetMinedTransactions.RUnlock() - return calls -} - -// GetOrphanedChainUpFromHash calls GetOrphanedChainUpFromHashFunc. -func (mock *UnitOfWorkMock) GetOrphanedChainUpFromHash(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { - if mock.GetOrphanedChainUpFromHashFunc == nil { - panic("UnitOfWorkMock.GetOrphanedChainUpFromHashFunc: method is nil but UnitOfWork.GetOrphanedChainUpFromHash was just called") - } - callInfo := struct { - Ctx context.Context - Hash []byte - }{ - Ctx: ctx, - Hash: hash, - } - mock.lockGetOrphanedChainUpFromHash.Lock() - mock.calls.GetOrphanedChainUpFromHash = append(mock.calls.GetOrphanedChainUpFromHash, callInfo) - mock.lockGetOrphanedChainUpFromHash.Unlock() - return mock.GetOrphanedChainUpFromHashFunc(ctx, hash) -} - -// GetOrphanedChainUpFromHashCalls gets all the calls that were made to GetOrphanedChainUpFromHash. -// Check the length with: -// -// len(mockedUnitOfWork.GetOrphanedChainUpFromHashCalls()) -func (mock *UnitOfWorkMock) GetOrphanedChainUpFromHashCalls() []struct { - Ctx context.Context - Hash []byte -} { - var calls []struct { - Ctx context.Context - Hash []byte - } - mock.lockGetOrphanedChainUpFromHash.RLock() - calls = mock.calls.GetOrphanedChainUpFromHash - mock.lockGetOrphanedChainUpFromHash.RUnlock() - return calls -} - -// GetRegisteredTxsByBlockHashes calls GetRegisteredTxsByBlockHashesFunc. -func (mock *UnitOfWorkMock) GetRegisteredTxsByBlockHashes(ctx context.Context, blockHashes [][]byte) ([]store.TransactionBlock, error) { - if mock.GetRegisteredTxsByBlockHashesFunc == nil { - panic("UnitOfWorkMock.GetRegisteredTxsByBlockHashesFunc: method is nil but UnitOfWork.GetRegisteredTxsByBlockHashes was just called") - } - callInfo := struct { - Ctx context.Context - BlockHashes [][]byte - }{ - Ctx: ctx, - BlockHashes: blockHashes, - } - mock.lockGetRegisteredTxsByBlockHashes.Lock() - mock.calls.GetRegisteredTxsByBlockHashes = append(mock.calls.GetRegisteredTxsByBlockHashes, callInfo) - mock.lockGetRegisteredTxsByBlockHashes.Unlock() - return mock.GetRegisteredTxsByBlockHashesFunc(ctx, blockHashes) -} - -// GetRegisteredTxsByBlockHashesCalls gets all the calls that were made to GetRegisteredTxsByBlockHashes. -// Check the length with: -// -// len(mockedUnitOfWork.GetRegisteredTxsByBlockHashesCalls()) -func (mock *UnitOfWorkMock) GetRegisteredTxsByBlockHashesCalls() []struct { - Ctx context.Context - BlockHashes [][]byte -} { - var calls []struct { - Ctx context.Context - BlockHashes [][]byte - } - mock.lockGetRegisteredTxsByBlockHashes.RLock() - calls = mock.calls.GetRegisteredTxsByBlockHashes - mock.lockGetRegisteredTxsByBlockHashes.RUnlock() - return calls -} - -// GetStaleChainBackFromHash calls GetStaleChainBackFromHashFunc. -func (mock *UnitOfWorkMock) GetStaleChainBackFromHash(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { - if mock.GetStaleChainBackFromHashFunc == nil { - panic("UnitOfWorkMock.GetStaleChainBackFromHashFunc: method is nil but UnitOfWork.GetStaleChainBackFromHash was just called") - } - callInfo := struct { - Ctx context.Context - Hash []byte - }{ - Ctx: ctx, - Hash: hash, - } - mock.lockGetStaleChainBackFromHash.Lock() - mock.calls.GetStaleChainBackFromHash = append(mock.calls.GetStaleChainBackFromHash, callInfo) - mock.lockGetStaleChainBackFromHash.Unlock() - return mock.GetStaleChainBackFromHashFunc(ctx, hash) -} - -// GetStaleChainBackFromHashCalls gets all the calls that were made to GetStaleChainBackFromHash. -// Check the length with: -// -// len(mockedUnitOfWork.GetStaleChainBackFromHashCalls()) -func (mock *UnitOfWorkMock) GetStaleChainBackFromHashCalls() []struct { - Ctx context.Context - Hash []byte -} { - var calls []struct { - Ctx context.Context - Hash []byte - } - mock.lockGetStaleChainBackFromHash.RLock() - calls = mock.calls.GetStaleChainBackFromHash - mock.lockGetStaleChainBackFromHash.RUnlock() - return calls -} - -// MarkBlockAsDone calls MarkBlockAsDoneFunc. -func (mock *UnitOfWorkMock) MarkBlockAsDone(ctx context.Context, hash *chainhash.Hash, size uint64, txCount uint64) error { - if mock.MarkBlockAsDoneFunc == nil { - panic("UnitOfWorkMock.MarkBlockAsDoneFunc: method is nil but UnitOfWork.MarkBlockAsDone was just called") - } - callInfo := struct { - Ctx context.Context - Hash *chainhash.Hash - Size uint64 - TxCount uint64 - }{ - Ctx: ctx, - Hash: hash, - Size: size, - TxCount: txCount, - } - mock.lockMarkBlockAsDone.Lock() - mock.calls.MarkBlockAsDone = append(mock.calls.MarkBlockAsDone, callInfo) - mock.lockMarkBlockAsDone.Unlock() - return mock.MarkBlockAsDoneFunc(ctx, hash, size, txCount) -} - -// MarkBlockAsDoneCalls gets all the calls that were made to MarkBlockAsDone. -// Check the length with: -// -// len(mockedUnitOfWork.MarkBlockAsDoneCalls()) -func (mock *UnitOfWorkMock) MarkBlockAsDoneCalls() []struct { - Ctx context.Context - Hash *chainhash.Hash - Size uint64 - TxCount uint64 -} { - var calls []struct { - Ctx context.Context - Hash *chainhash.Hash - Size uint64 - TxCount uint64 - } - mock.lockMarkBlockAsDone.RLock() - calls = mock.calls.MarkBlockAsDone - mock.lockMarkBlockAsDone.RUnlock() - return calls -} - -// Ping calls PingFunc. -func (mock *UnitOfWorkMock) Ping(ctx context.Context) error { - if mock.PingFunc == nil { - panic("UnitOfWorkMock.PingFunc: method is nil but UnitOfWork.Ping was just called") - } - callInfo := struct { - Ctx context.Context - }{ - Ctx: ctx, - } - mock.lockPing.Lock() - mock.calls.Ping = append(mock.calls.Ping, callInfo) - mock.lockPing.Unlock() - return mock.PingFunc(ctx) -} - -// PingCalls gets all the calls that were made to Ping. -// Check the length with: -// -// len(mockedUnitOfWork.PingCalls()) -func (mock *UnitOfWorkMock) PingCalls() []struct { - Ctx context.Context -} { - var calls []struct { - Ctx context.Context - } - mock.lockPing.RLock() - calls = mock.calls.Ping - mock.lockPing.RUnlock() - return calls -} - -// RegisterTransactions calls RegisterTransactionsFunc. -func (mock *UnitOfWorkMock) RegisterTransactions(ctx context.Context, txHashes [][]byte) ([]*chainhash.Hash, error) { - if mock.RegisterTransactionsFunc == nil { - panic("UnitOfWorkMock.RegisterTransactionsFunc: method is nil but UnitOfWork.RegisterTransactions was just called") - } - callInfo := struct { - Ctx context.Context - TxHashes [][]byte - }{ - Ctx: ctx, - TxHashes: txHashes, - } - mock.lockRegisterTransactions.Lock() - mock.calls.RegisterTransactions = append(mock.calls.RegisterTransactions, callInfo) - mock.lockRegisterTransactions.Unlock() - return mock.RegisterTransactionsFunc(ctx, txHashes) -} - -// RegisterTransactionsCalls gets all the calls that were made to RegisterTransactions. -// Check the length with: -// -// len(mockedUnitOfWork.RegisterTransactionsCalls()) -func (mock *UnitOfWorkMock) RegisterTransactionsCalls() []struct { - Ctx context.Context - TxHashes [][]byte -} { - var calls []struct { - Ctx context.Context - TxHashes [][]byte - } - mock.lockRegisterTransactions.RLock() - calls = mock.calls.RegisterTransactions - mock.lockRegisterTransactions.RUnlock() - return calls -} - -// Rollback calls RollbackFunc. -func (mock *UnitOfWorkMock) Rollback() error { - if mock.RollbackFunc == nil { - panic("UnitOfWorkMock.RollbackFunc: method is nil but UnitOfWork.Rollback was just called") - } - callInfo := struct { - }{} - mock.lockRollback.Lock() - mock.calls.Rollback = append(mock.calls.Rollback, callInfo) - mock.lockRollback.Unlock() - return mock.RollbackFunc() -} - -// RollbackCalls gets all the calls that were made to Rollback. -// Check the length with: -// -// len(mockedUnitOfWork.RollbackCalls()) -func (mock *UnitOfWorkMock) RollbackCalls() []struct { -} { - var calls []struct { - } - mock.lockRollback.RLock() - calls = mock.calls.Rollback - mock.lockRollback.RUnlock() - return calls -} - -// SetBlockProcessing calls SetBlockProcessingFunc. -func (mock *UnitOfWorkMock) SetBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) { - if mock.SetBlockProcessingFunc == nil { - panic("UnitOfWorkMock.SetBlockProcessingFunc: method is nil but UnitOfWork.SetBlockProcessing was just called") - } - callInfo := struct { - Ctx context.Context - Hash *chainhash.Hash - ProcessedBy string - }{ - Ctx: ctx, - Hash: hash, - ProcessedBy: processedBy, - } - mock.lockSetBlockProcessing.Lock() - mock.calls.SetBlockProcessing = append(mock.calls.SetBlockProcessing, callInfo) - mock.lockSetBlockProcessing.Unlock() - return mock.SetBlockProcessingFunc(ctx, hash, processedBy) -} - -// SetBlockProcessingCalls gets all the calls that were made to SetBlockProcessing. -// Check the length with: -// -// len(mockedUnitOfWork.SetBlockProcessingCalls()) -func (mock *UnitOfWorkMock) SetBlockProcessingCalls() []struct { - Ctx context.Context - Hash *chainhash.Hash - ProcessedBy string -} { - var calls []struct { - Ctx context.Context - Hash *chainhash.Hash - ProcessedBy string - } - mock.lockSetBlockProcessing.RLock() - calls = mock.calls.SetBlockProcessing - mock.lockSetBlockProcessing.RUnlock() - return calls -} - -// StartUnitOfWork calls StartUnitOfWorkFunc. -func (mock *UnitOfWorkMock) StartUnitOfWork(ctx context.Context) (store.UnitOfWork, error) { - if mock.StartUnitOfWorkFunc == nil { - panic("UnitOfWorkMock.StartUnitOfWorkFunc: method is nil but UnitOfWork.StartUnitOfWork was just called") - } - callInfo := struct { - Ctx context.Context - }{ - Ctx: ctx, - } - mock.lockStartUnitOfWork.Lock() - mock.calls.StartUnitOfWork = append(mock.calls.StartUnitOfWork, callInfo) - mock.lockStartUnitOfWork.Unlock() - return mock.StartUnitOfWorkFunc(ctx) -} - -// StartUnitOfWorkCalls gets all the calls that were made to StartUnitOfWork. -// Check the length with: -// -// len(mockedUnitOfWork.StartUnitOfWorkCalls()) -func (mock *UnitOfWorkMock) StartUnitOfWorkCalls() []struct { - Ctx context.Context -} { - var calls []struct { - Ctx context.Context - } - mock.lockStartUnitOfWork.RLock() - calls = mock.calls.StartUnitOfWork - mock.lockStartUnitOfWork.RUnlock() - return calls -} - -// UpdateBlocksStatuses calls UpdateBlocksStatusesFunc. -func (mock *UnitOfWorkMock) UpdateBlocksStatuses(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error { - if mock.UpdateBlocksStatusesFunc == nil { - panic("UnitOfWorkMock.UpdateBlocksStatusesFunc: method is nil but UnitOfWork.UpdateBlocksStatuses was just called") - } - callInfo := struct { - Ctx context.Context - BlockStatusUpdates []store.BlockStatusUpdate - }{ - Ctx: ctx, - BlockStatusUpdates: blockStatusUpdates, - } - mock.lockUpdateBlocksStatuses.Lock() - mock.calls.UpdateBlocksStatuses = append(mock.calls.UpdateBlocksStatuses, callInfo) - mock.lockUpdateBlocksStatuses.Unlock() - return mock.UpdateBlocksStatusesFunc(ctx, blockStatusUpdates) -} - -// UpdateBlocksStatusesCalls gets all the calls that were made to UpdateBlocksStatuses. -// Check the length with: -// -// len(mockedUnitOfWork.UpdateBlocksStatusesCalls()) -func (mock *UnitOfWorkMock) UpdateBlocksStatusesCalls() []struct { - Ctx context.Context - BlockStatusUpdates []store.BlockStatusUpdate -} { - var calls []struct { - Ctx context.Context - BlockStatusUpdates []store.BlockStatusUpdate - } - mock.lockUpdateBlocksStatuses.RLock() - calls = mock.calls.UpdateBlocksStatuses - mock.lockUpdateBlocksStatuses.RUnlock() - return calls -} - -// UpsertBlock calls UpsertBlockFunc. -func (mock *UnitOfWorkMock) UpsertBlock(ctx context.Context, block *blocktx_api.Block) (uint64, error) { - if mock.UpsertBlockFunc == nil { - panic("UnitOfWorkMock.UpsertBlockFunc: method is nil but UnitOfWork.UpsertBlock was just called") - } - callInfo := struct { - Ctx context.Context - Block *blocktx_api.Block - }{ - Ctx: ctx, - Block: block, - } - mock.lockUpsertBlock.Lock() - mock.calls.UpsertBlock = append(mock.calls.UpsertBlock, callInfo) - mock.lockUpsertBlock.Unlock() - return mock.UpsertBlockFunc(ctx, block) -} - -// UpsertBlockCalls gets all the calls that were made to UpsertBlock. -// Check the length with: -// -// len(mockedUnitOfWork.UpsertBlockCalls()) -func (mock *UnitOfWorkMock) UpsertBlockCalls() []struct { - Ctx context.Context - Block *blocktx_api.Block -} { - var calls []struct { - Ctx context.Context - Block *blocktx_api.Block - } - mock.lockUpsertBlock.RLock() - calls = mock.calls.UpsertBlock - mock.lockUpsertBlock.RUnlock() - return calls -} - -// UpsertBlockTransactions calls UpsertBlockTransactionsFunc. -func (mock *UnitOfWorkMock) UpsertBlockTransactions(ctx context.Context, blockID uint64, txsWithMerklePaths []store.TxWithMerklePath) error { - if mock.UpsertBlockTransactionsFunc == nil { - panic("UnitOfWorkMock.UpsertBlockTransactionsFunc: method is nil but UnitOfWork.UpsertBlockTransactions was just called") - } - callInfo := struct { - Ctx context.Context - BlockID uint64 - TxsWithMerklePaths []store.TxWithMerklePath - }{ - Ctx: ctx, - BlockID: blockID, - TxsWithMerklePaths: txsWithMerklePaths, - } - mock.lockUpsertBlockTransactions.Lock() - mock.calls.UpsertBlockTransactions = append(mock.calls.UpsertBlockTransactions, callInfo) - mock.lockUpsertBlockTransactions.Unlock() - return mock.UpsertBlockTransactionsFunc(ctx, blockID, txsWithMerklePaths) -} - -// UpsertBlockTransactionsCalls gets all the calls that were made to UpsertBlockTransactions. -// Check the length with: -// -// len(mockedUnitOfWork.UpsertBlockTransactionsCalls()) -func (mock *UnitOfWorkMock) UpsertBlockTransactionsCalls() []struct { - Ctx context.Context - BlockID uint64 - TxsWithMerklePaths []store.TxWithMerklePath -} { - var calls []struct { - Ctx context.Context - BlockID uint64 - TxsWithMerklePaths []store.TxWithMerklePath - } - mock.lockUpsertBlockTransactions.RLock() - calls = mock.calls.UpsertBlockTransactions - mock.lockUpsertBlockTransactions.RUnlock() - return calls -} - -// VerifyMerkleRoots calls VerifyMerkleRootsFunc. -func (mock *UnitOfWorkMock) VerifyMerkleRoots(ctx context.Context, merkleRoots []*blocktx_api.MerkleRootVerificationRequest, maxAllowedBlockHeightMismatch int) (*blocktx_api.MerkleRootVerificationResponse, error) { - if mock.VerifyMerkleRootsFunc == nil { - panic("UnitOfWorkMock.VerifyMerkleRootsFunc: method is nil but UnitOfWork.VerifyMerkleRoots was just called") - } - callInfo := struct { - Ctx context.Context - MerkleRoots []*blocktx_api.MerkleRootVerificationRequest - MaxAllowedBlockHeightMismatch int - }{ - Ctx: ctx, - MerkleRoots: merkleRoots, - MaxAllowedBlockHeightMismatch: maxAllowedBlockHeightMismatch, - } - mock.lockVerifyMerkleRoots.Lock() - mock.calls.VerifyMerkleRoots = append(mock.calls.VerifyMerkleRoots, callInfo) - mock.lockVerifyMerkleRoots.Unlock() - return mock.VerifyMerkleRootsFunc(ctx, merkleRoots, maxAllowedBlockHeightMismatch) -} - -// VerifyMerkleRootsCalls gets all the calls that were made to VerifyMerkleRoots. -// Check the length with: -// -// len(mockedUnitOfWork.VerifyMerkleRootsCalls()) -func (mock *UnitOfWorkMock) VerifyMerkleRootsCalls() []struct { - Ctx context.Context - MerkleRoots []*blocktx_api.MerkleRootVerificationRequest - MaxAllowedBlockHeightMismatch int -} { - var calls []struct { - Ctx context.Context - MerkleRoots []*blocktx_api.MerkleRootVerificationRequest - MaxAllowedBlockHeightMismatch int - } - mock.lockVerifyMerkleRoots.RLock() - calls = mock.calls.VerifyMerkleRoots - mock.lockVerifyMerkleRoots.RUnlock() - return calls -} - -// WriteLockBlocksTable calls WriteLockBlocksTableFunc. -func (mock *UnitOfWorkMock) WriteLockBlocksTable(ctx context.Context) error { - if mock.WriteLockBlocksTableFunc == nil { - panic("UnitOfWorkMock.WriteLockBlocksTableFunc: method is nil but UnitOfWork.WriteLockBlocksTable was just called") - } - callInfo := struct { - Ctx context.Context - }{ - Ctx: ctx, - } - mock.lockWriteLockBlocksTable.Lock() - mock.calls.WriteLockBlocksTable = append(mock.calls.WriteLockBlocksTable, callInfo) - mock.lockWriteLockBlocksTable.Unlock() - return mock.WriteLockBlocksTableFunc(ctx) -} - -// WriteLockBlocksTableCalls gets all the calls that were made to WriteLockBlocksTable. -// Check the length with: -// -// len(mockedUnitOfWork.WriteLockBlocksTableCalls()) -func (mock *UnitOfWorkMock) WriteLockBlocksTableCalls() []struct { - Ctx context.Context -} { - var calls []struct { - Ctx context.Context - } - mock.lockWriteLockBlocksTable.RLock() - calls = mock.calls.WriteLockBlocksTable - mock.lockWriteLockBlocksTable.RUnlock() - return calls -} diff --git a/internal/blocktx/store/mocks/blocktx_store_mock.go b/internal/blocktx/store/mocks/blocktx_store_mock.go index 3e946db27..3cd0fa0bc 100644 --- a/internal/blocktx/store/mocks/blocktx_store_mock.go +++ b/internal/blocktx/store/mocks/blocktx_store_mock.go @@ -33,9 +33,6 @@ var _ store.BlocktxStore = &BlocktxStoreMock{} // GetBlockFunc: func(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) { // panic("mock out the GetBlock method") // }, -// GetBlockByHeightFunc: func(ctx context.Context, height uint64) (*blocktx_api.Block, error) { -// panic("mock out the GetBlockByHeight method") -// }, // GetBlockGapsFunc: func(ctx context.Context, heightRange int) ([]*store.BlockGap, error) { // panic("mock out the GetBlockGaps method") // }, @@ -45,14 +42,17 @@ var _ store.BlocktxStore = &BlocktxStoreMock{} // GetChainTipFunc: func(ctx context.Context) (*blocktx_api.Block, error) { // panic("mock out the GetChainTip method") // }, +// GetLongestBlockByHeightFunc: func(ctx context.Context, height uint64) (*blocktx_api.Block, error) { +// panic("mock out the GetLongestBlockByHeight method") +// }, // GetLongestChainFromHeightFunc: func(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) { // panic("mock out the GetLongestChainFromHeight method") // }, // GetMinedTransactionsFunc: func(ctx context.Context, hashes [][]byte, onlyLongestChain bool) ([]store.TransactionBlock, error) { // panic("mock out the GetMinedTransactions method") // }, -// GetOrphanedChainUpFromHashFunc: func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { -// panic("mock out the GetOrphanedChainUpFromHash method") +// GetOrphansBackToNonOrphanAncestorFunc: func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, *blocktx_api.Block, error) { +// panic("mock out the GetOrphansBackToNonOrphanAncestor method") // }, // GetRegisteredTxsByBlockHashesFunc: func(ctx context.Context, blockHashes [][]byte) ([]store.TransactionBlock, error) { // panic("mock out the GetRegisteredTxsByBlockHashes method") @@ -72,9 +72,6 @@ var _ store.BlocktxStore = &BlocktxStoreMock{} // SetBlockProcessingFunc: func(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) { // panic("mock out the SetBlockProcessing method") // }, -// StartUnitOfWorkFunc: func(ctx context.Context) (store.UnitOfWork, error) { -// panic("mock out the StartUnitOfWork method") -// }, // UpdateBlocksStatusesFunc: func(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error { // panic("mock out the UpdateBlocksStatuses method") // }, @@ -106,9 +103,6 @@ type BlocktxStoreMock struct { // GetBlockFunc mocks the GetBlock method. GetBlockFunc func(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) - // GetBlockByHeightFunc mocks the GetBlockByHeight method. - GetBlockByHeightFunc func(ctx context.Context, height uint64) (*blocktx_api.Block, error) - // GetBlockGapsFunc mocks the GetBlockGaps method. GetBlockGapsFunc func(ctx context.Context, heightRange int) ([]*store.BlockGap, error) @@ -118,14 +112,17 @@ type BlocktxStoreMock struct { // GetChainTipFunc mocks the GetChainTip method. GetChainTipFunc func(ctx context.Context) (*blocktx_api.Block, error) + // GetLongestBlockByHeightFunc mocks the GetLongestBlockByHeight method. + GetLongestBlockByHeightFunc func(ctx context.Context, height uint64) (*blocktx_api.Block, error) + // GetLongestChainFromHeightFunc mocks the GetLongestChainFromHeight method. GetLongestChainFromHeightFunc func(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) // GetMinedTransactionsFunc mocks the GetMinedTransactions method. GetMinedTransactionsFunc func(ctx context.Context, hashes [][]byte, onlyLongestChain bool) ([]store.TransactionBlock, error) - // GetOrphanedChainUpFromHashFunc mocks the GetOrphanedChainUpFromHash method. - GetOrphanedChainUpFromHashFunc func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) + // GetOrphansBackToNonOrphanAncestorFunc mocks the GetOrphansBackToNonOrphanAncestor method. + GetOrphansBackToNonOrphanAncestorFunc func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, *blocktx_api.Block, error) // GetRegisteredTxsByBlockHashesFunc mocks the GetRegisteredTxsByBlockHashes method. GetRegisteredTxsByBlockHashesFunc func(ctx context.Context, blockHashes [][]byte) ([]store.TransactionBlock, error) @@ -145,9 +142,6 @@ type BlocktxStoreMock struct { // SetBlockProcessingFunc mocks the SetBlockProcessing method. SetBlockProcessingFunc func(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) - // StartUnitOfWorkFunc mocks the StartUnitOfWork method. - StartUnitOfWorkFunc func(ctx context.Context) (store.UnitOfWork, error) - // UpdateBlocksStatusesFunc mocks the UpdateBlocksStatuses method. UpdateBlocksStatusesFunc func(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error @@ -190,13 +184,6 @@ type BlocktxStoreMock struct { // Hash is the hash argument value. Hash *chainhash.Hash } - // GetBlockByHeight holds details about calls to the GetBlockByHeight method. - GetBlockByHeight []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Height is the height argument value. - Height uint64 - } // GetBlockGaps holds details about calls to the GetBlockGaps method. GetBlockGaps []struct { // Ctx is the ctx argument value. @@ -216,6 +203,13 @@ type BlocktxStoreMock struct { // Ctx is the ctx argument value. Ctx context.Context } + // GetLongestBlockByHeight holds details about calls to the GetLongestBlockByHeight method. + GetLongestBlockByHeight []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Height is the height argument value. + Height uint64 + } // GetLongestChainFromHeight holds details about calls to the GetLongestChainFromHeight method. GetLongestChainFromHeight []struct { // Ctx is the ctx argument value. @@ -232,8 +226,8 @@ type BlocktxStoreMock struct { // OnlyLongestChain is the onlyLongestChain argument value. OnlyLongestChain bool } - // GetOrphanedChainUpFromHash holds details about calls to the GetOrphanedChainUpFromHash method. - GetOrphanedChainUpFromHash []struct { + // GetOrphansBackToNonOrphanAncestor holds details about calls to the GetOrphansBackToNonOrphanAncestor method. + GetOrphansBackToNonOrphanAncestor []struct { // Ctx is the ctx argument value. Ctx context.Context // Hash is the hash argument value. @@ -285,11 +279,6 @@ type BlocktxStoreMock struct { // ProcessedBy is the processedBy argument value. ProcessedBy string } - // StartUnitOfWork holds details about calls to the StartUnitOfWork method. - StartUnitOfWork []struct { - // Ctx is the ctx argument value. - Ctx context.Context - } // UpdateBlocksStatuses holds details about calls to the UpdateBlocksStatuses method. UpdateBlocksStatuses []struct { // Ctx is the ctx argument value. @@ -327,20 +316,19 @@ type BlocktxStoreMock struct { lockClose sync.RWMutex lockDelBlockProcessing sync.RWMutex lockGetBlock sync.RWMutex - lockGetBlockByHeight sync.RWMutex lockGetBlockGaps sync.RWMutex lockGetBlockHashesProcessingInProgress sync.RWMutex lockGetChainTip sync.RWMutex + lockGetLongestBlockByHeight sync.RWMutex lockGetLongestChainFromHeight sync.RWMutex lockGetMinedTransactions sync.RWMutex - lockGetOrphanedChainUpFromHash sync.RWMutex + lockGetOrphansBackToNonOrphanAncestor sync.RWMutex lockGetRegisteredTxsByBlockHashes sync.RWMutex lockGetStaleChainBackFromHash sync.RWMutex lockMarkBlockAsDone sync.RWMutex lockPing sync.RWMutex lockRegisterTransactions sync.RWMutex lockSetBlockProcessing sync.RWMutex - lockStartUnitOfWork sync.RWMutex lockUpdateBlocksStatuses sync.RWMutex lockUpsertBlock sync.RWMutex lockUpsertBlockTransactions sync.RWMutex @@ -490,42 +478,6 @@ func (mock *BlocktxStoreMock) GetBlockCalls() []struct { return calls } -// GetBlockByHeight calls GetBlockByHeightFunc. -func (mock *BlocktxStoreMock) GetBlockByHeight(ctx context.Context, height uint64) (*blocktx_api.Block, error) { - if mock.GetBlockByHeightFunc == nil { - panic("BlocktxStoreMock.GetBlockByHeightFunc: method is nil but BlocktxStore.GetBlockByHeight was just called") - } - callInfo := struct { - Ctx context.Context - Height uint64 - }{ - Ctx: ctx, - Height: height, - } - mock.lockGetBlockByHeight.Lock() - mock.calls.GetBlockByHeight = append(mock.calls.GetBlockByHeight, callInfo) - mock.lockGetBlockByHeight.Unlock() - return mock.GetBlockByHeightFunc(ctx, height) -} - -// GetBlockByHeightCalls gets all the calls that were made to GetBlockByHeight. -// Check the length with: -// -// len(mockedBlocktxStore.GetBlockByHeightCalls()) -func (mock *BlocktxStoreMock) GetBlockByHeightCalls() []struct { - Ctx context.Context - Height uint64 -} { - var calls []struct { - Ctx context.Context - Height uint64 - } - mock.lockGetBlockByHeight.RLock() - calls = mock.calls.GetBlockByHeight - mock.lockGetBlockByHeight.RUnlock() - return calls -} - // GetBlockGaps calls GetBlockGapsFunc. func (mock *BlocktxStoreMock) GetBlockGaps(ctx context.Context, heightRange int) ([]*store.BlockGap, error) { if mock.GetBlockGapsFunc == nil { @@ -630,6 +582,42 @@ func (mock *BlocktxStoreMock) GetChainTipCalls() []struct { return calls } +// GetLongestBlockByHeight calls GetLongestBlockByHeightFunc. +func (mock *BlocktxStoreMock) GetLongestBlockByHeight(ctx context.Context, height uint64) (*blocktx_api.Block, error) { + if mock.GetLongestBlockByHeightFunc == nil { + panic("BlocktxStoreMock.GetLongestBlockByHeightFunc: method is nil but BlocktxStore.GetLongestBlockByHeight was just called") + } + callInfo := struct { + Ctx context.Context + Height uint64 + }{ + Ctx: ctx, + Height: height, + } + mock.lockGetLongestBlockByHeight.Lock() + mock.calls.GetLongestBlockByHeight = append(mock.calls.GetLongestBlockByHeight, callInfo) + mock.lockGetLongestBlockByHeight.Unlock() + return mock.GetLongestBlockByHeightFunc(ctx, height) +} + +// GetLongestBlockByHeightCalls gets all the calls that were made to GetLongestBlockByHeight. +// Check the length with: +// +// len(mockedBlocktxStore.GetLongestBlockByHeightCalls()) +func (mock *BlocktxStoreMock) GetLongestBlockByHeightCalls() []struct { + Ctx context.Context + Height uint64 +} { + var calls []struct { + Ctx context.Context + Height uint64 + } + mock.lockGetLongestBlockByHeight.RLock() + calls = mock.calls.GetLongestBlockByHeight + mock.lockGetLongestBlockByHeight.RUnlock() + return calls +} + // GetLongestChainFromHeight calls GetLongestChainFromHeightFunc. func (mock *BlocktxStoreMock) GetLongestChainFromHeight(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) { if mock.GetLongestChainFromHeightFunc == nil { @@ -706,10 +694,10 @@ func (mock *BlocktxStoreMock) GetMinedTransactionsCalls() []struct { return calls } -// GetOrphanedChainUpFromHash calls GetOrphanedChainUpFromHashFunc. -func (mock *BlocktxStoreMock) GetOrphanedChainUpFromHash(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { - if mock.GetOrphanedChainUpFromHashFunc == nil { - panic("BlocktxStoreMock.GetOrphanedChainUpFromHashFunc: method is nil but BlocktxStore.GetOrphanedChainUpFromHash was just called") +// GetOrphansBackToNonOrphanAncestor calls GetOrphansBackToNonOrphanAncestorFunc. +func (mock *BlocktxStoreMock) GetOrphansBackToNonOrphanAncestor(ctx context.Context, hash []byte) ([]*blocktx_api.Block, *blocktx_api.Block, error) { + if mock.GetOrphansBackToNonOrphanAncestorFunc == nil { + panic("BlocktxStoreMock.GetOrphansBackToNonOrphanAncestorFunc: method is nil but BlocktxStore.GetOrphansBackToNonOrphanAncestor was just called") } callInfo := struct { Ctx context.Context @@ -718,17 +706,17 @@ func (mock *BlocktxStoreMock) GetOrphanedChainUpFromHash(ctx context.Context, ha Ctx: ctx, Hash: hash, } - mock.lockGetOrphanedChainUpFromHash.Lock() - mock.calls.GetOrphanedChainUpFromHash = append(mock.calls.GetOrphanedChainUpFromHash, callInfo) - mock.lockGetOrphanedChainUpFromHash.Unlock() - return mock.GetOrphanedChainUpFromHashFunc(ctx, hash) + mock.lockGetOrphansBackToNonOrphanAncestor.Lock() + mock.calls.GetOrphansBackToNonOrphanAncestor = append(mock.calls.GetOrphansBackToNonOrphanAncestor, callInfo) + mock.lockGetOrphansBackToNonOrphanAncestor.Unlock() + return mock.GetOrphansBackToNonOrphanAncestorFunc(ctx, hash) } -// GetOrphanedChainUpFromHashCalls gets all the calls that were made to GetOrphanedChainUpFromHash. +// GetOrphansBackToNonOrphanAncestorCalls gets all the calls that were made to GetOrphansBackToNonOrphanAncestor. // Check the length with: // -// len(mockedBlocktxStore.GetOrphanedChainUpFromHashCalls()) -func (mock *BlocktxStoreMock) GetOrphanedChainUpFromHashCalls() []struct { +// len(mockedBlocktxStore.GetOrphansBackToNonOrphanAncestorCalls()) +func (mock *BlocktxStoreMock) GetOrphansBackToNonOrphanAncestorCalls() []struct { Ctx context.Context Hash []byte } { @@ -736,9 +724,9 @@ func (mock *BlocktxStoreMock) GetOrphanedChainUpFromHashCalls() []struct { Ctx context.Context Hash []byte } - mock.lockGetOrphanedChainUpFromHash.RLock() - calls = mock.calls.GetOrphanedChainUpFromHash - mock.lockGetOrphanedChainUpFromHash.RUnlock() + mock.lockGetOrphansBackToNonOrphanAncestor.RLock() + calls = mock.calls.GetOrphansBackToNonOrphanAncestor + mock.lockGetOrphansBackToNonOrphanAncestor.RUnlock() return calls } @@ -966,38 +954,6 @@ func (mock *BlocktxStoreMock) SetBlockProcessingCalls() []struct { return calls } -// StartUnitOfWork calls StartUnitOfWorkFunc. -func (mock *BlocktxStoreMock) StartUnitOfWork(ctx context.Context) (store.UnitOfWork, error) { - if mock.StartUnitOfWorkFunc == nil { - panic("BlocktxStoreMock.StartUnitOfWorkFunc: method is nil but BlocktxStore.StartUnitOfWork was just called") - } - callInfo := struct { - Ctx context.Context - }{ - Ctx: ctx, - } - mock.lockStartUnitOfWork.Lock() - mock.calls.StartUnitOfWork = append(mock.calls.StartUnitOfWork, callInfo) - mock.lockStartUnitOfWork.Unlock() - return mock.StartUnitOfWorkFunc(ctx) -} - -// StartUnitOfWorkCalls gets all the calls that were made to StartUnitOfWork. -// Check the length with: -// -// len(mockedBlocktxStore.StartUnitOfWorkCalls()) -func (mock *BlocktxStoreMock) StartUnitOfWorkCalls() []struct { - Ctx context.Context -} { - var calls []struct { - Ctx context.Context - } - mock.lockStartUnitOfWork.RLock() - calls = mock.calls.StartUnitOfWork - mock.lockStartUnitOfWork.RUnlock() - return calls -} - // UpdateBlocksStatuses calls UpdateBlocksStatusesFunc. func (mock *BlocktxStoreMock) UpdateBlocksStatuses(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error { if mock.UpdateBlocksStatusesFunc == nil { diff --git a/internal/blocktx/store/postgresql/fixtures/get_orphaned_chain/blocktx.blocks.yaml b/internal/blocktx/store/postgresql/fixtures/get_orphaned_chain/blocktx.blocks.yaml index b8f7b9e32..dd193b81d 100644 --- a/internal/blocktx/store/postgresql/fixtures/get_orphaned_chain/blocktx.blocks.yaml +++ b/internal/blocktx/store/postgresql/fixtures/get_orphaned_chain/blocktx.blocks.yaml @@ -46,12 +46,49 @@ status: 30 # ORPHANED chainwork: '123456' is_longest: false -- inserted_at: 2023-12-15 14:50:00 +- inserted_at: 2023-12-15 14:40:00 id: 4 + hash: 0x00000000000000000364332e1bbd61dc928141b9469c5daea26a4b506efc9656 + prevhash: 0x0000000000000000082ec88d757ddaeb0aa87a5d5408b5960f27e7e67312dfe1 + merkleroot: 0x4b58b0402a84012269b124f78c91a78a814eb3c9caa03f1df1d33172b23082d1 + height: 822019 + processed_at: 2023-12-15 14:40:00 + size: 299650000 + tx_count: 62162 + status: 30 # ORPHANED + chainwork: '123456' + is_longest: false +- inserted_at: 2023-12-15 14:50:00 + id: 5 hash: 0x000000000000000004bf3e68405b31650559ff28d38a42b5e4f1440a865611ca prevhash: 0x00000000000000000364332e1bbd61dc928141b9469c5daea26a4b506efc9656 merkleroot: 0xc458aa382364e216c9c0533175ec8579a544c750ca181b18296e784d1dc53085 - height: 822020 # Another gap + height: 822020 + size: 8630000 + tx_count: 36724 + status: 30 # ORPHANED + chainwork: '123456' + is_longest: false + +# GAP + +- inserted_at: 2023-12-15 14:50:00 + id: 6 + hash: 0x0000000000000000059d6add76e3ddb8ec4f5ffd6efecd4c8b8c577bd32aed6c + prevhash: 0x0000000000000000094510c50011a891b74ef054d6cac0a5ae8bd60f02c85f1d + merkleroot: 0xda71199f8ed9203d8a765595e6c030a22e5ed8330b1abb467a82c97d7d21d512 + height: 822022 + size: 8630000 + tx_count: 36724 + status: 30 # ORPHANED + chainwork: '123456' + is_longest: false +- inserted_at: 2023-12-15 14:50:00 + id: 7 + hash: 0x0000000000000000082131979a4e25a5101912a5f8461e18f306d23e158161cd + prevhash: 0x0000000000000000059d6add76e3ddb8ec4f5ffd6efecd4c8b8c577bd32aed6c + merkleroot: 0xda71199f8ed9203d8a765595e6c030a22e5ed8330b1abb467a82c97d7d21d512 + height: 822023 size: 8630000 tx_count: 36724 status: 30 # ORPHANED diff --git a/internal/blocktx/store/postgresql/get_block.go b/internal/blocktx/store/postgresql/get_block.go index b8852d460..55b9646f3 100644 --- a/internal/blocktx/store/postgresql/get_block.go +++ b/internal/blocktx/store/postgresql/get_block.go @@ -16,7 +16,7 @@ func (p *PostgreSQL) GetBlock(ctx context.Context, hash *chainhash.Hash) (*block return p.queryBlockByPredicate(ctx, predicate, hash[:]) } -func (p *PostgreSQL) GetBlockByHeight(ctx context.Context, height uint64) (*blocktx_api.Block, error) { +func (p *PostgreSQL) GetLongestBlockByHeight(ctx context.Context, height uint64) (*blocktx_api.Block, error) { predicate := "WHERE height = $1 AND is_longest = true" return p.queryBlockByPredicate(ctx, predicate, height) diff --git a/internal/blocktx/store/postgresql/get_orphaned_chain.go b/internal/blocktx/store/postgresql/get_orphaned_chain.go index 14ce171b3..441034a43 100644 --- a/internal/blocktx/store/postgresql/get_orphaned_chain.go +++ b/internal/blocktx/store/postgresql/get_orphaned_chain.go @@ -4,15 +4,17 @@ import ( "context" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" + "github.com/libsv/go-p2p/chaincfg/chainhash" ) -// GetOrphanedChainUpFromHash is a function that recursively searches for blocks marked -// as ORPHANED from the given hash - up to the tip of orphaned chain of blocks. +// GetOrphansBackToNonOrphanAncestor recursively searches for blocks marked +// as ORPHANED from the given hash - back to the first ORPHANED block. Then, it +// tries to get the first non-orphaned ancestor of that orphan chain. // -// It searches for the block whose prevhash matches the hash of the given block, +// It searches for the block whose hash matches the prevhash of the given block, // and then repeats that recursively for each newly found orphaned block until // it has the entire orphaned chain. -func (p *PostgreSQL) GetOrphanedChainUpFromHash(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { +func (p *PostgreSQL) GetOrphansBackToNonOrphanAncestor(ctx context.Context, hash []byte) (orphans []*blocktx_api.Block, nonOrphanAncestor *blocktx_api.Block, err error) { // The way this query works, is that the result from the first SELECT // will be stored in the `orphans` variable, which is later used // for recursion in the second SELECT. @@ -31,7 +33,7 @@ func (p *PostgreSQL) GetOrphanedChainUpFromHash(ctx context.Context, hash []byte ,processed_at ,status ,chainwork - FROM blocktx.blocks WHERE prevhash = $1 AND status = $2 + FROM blocktx.blocks WHERE hash = $1 AND status = $2 UNION ALL SELECT b.hash @@ -41,7 +43,7 @@ func (p *PostgreSQL) GetOrphanedChainUpFromHash(ctx context.Context, hash []byte ,b.processed_at ,b.status ,b.chainwork - FROM blocktx.blocks b JOIN orphans o ON b.prevhash = o.hash AND b.status = $2 + FROM blocktx.blocks b JOIN orphans o ON o.prevhash = b.hash AND b.status = $2 ) SELECT hash @@ -52,13 +54,33 @@ func (p *PostgreSQL) GetOrphanedChainUpFromHash(ctx context.Context, hash []byte ,status ,chainwork FROM orphans + ORDER BY height ` rows, err := p.db.QueryContext(ctx, q, hash, blocktx_api.Status_ORPHANED) if err != nil { - return nil, err + return } defer rows.Close() - return p.parseBlocks(rows) + orphans, err = p.parseBlocks(rows) + if err != nil { + return + } + + // first element in orphans + // will be the given block + if len(orphans) < 2 { + return + } + + // try to get first non-orphan ancestor + nonOrphanHash, err := chainhash.NewHash(orphans[0].PreviousHash) + if err != nil { + return + } + + nonOrphanAncestor, _ = p.GetBlock(ctx, nonOrphanHash) + + return } diff --git a/internal/blocktx/store/postgresql/get_stale_chain.go b/internal/blocktx/store/postgresql/get_stale_chain.go index 71c96aa63..66aadb75b 100644 --- a/internal/blocktx/store/postgresql/get_stale_chain.go +++ b/internal/blocktx/store/postgresql/get_stale_chain.go @@ -58,6 +58,7 @@ func (p *PostgreSQL) GetStaleChainBackFromHash(ctx context.Context, hash []byte) ,status ,chainwork FROM prevBlocks + ORDER BY height ` rows, err := p.db.QueryContext(ctx, q, hash, blocktx_api.Status_STALE) diff --git a/internal/blocktx/store/postgresql/postgres.go b/internal/blocktx/store/postgresql/postgres.go index e73d94b2a..f3b6ec321 100644 --- a/internal/blocktx/store/postgresql/postgres.go +++ b/internal/blocktx/store/postgresql/postgres.go @@ -20,24 +20,12 @@ const ( maxPostgresBulkInsertRows = 8192 ) -type QueryAble interface { - ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) - Prepare(query string) (*sql.Stmt, error) - Query(query string, args ...interface{}) (*sql.Rows, error) - QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) - QueryRow(query string, args ...interface{}) *sql.Row - QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row -} - type PostgreSQL struct { - _db *sql.DB - _tx *sql.Tx - db QueryAble // this would be pointing either to _db or _tx + db *sql.DB now func() time.Time maxPostgresBulkInsertRows int tracingEnabled bool tracingAttributes []attribute.KeyValue - dbInfo string } func WithNow(nowFunc func() time.Time) func(*PostgreSQL) { @@ -72,11 +60,9 @@ func New(dbInfo string, idleConns int, maxOpenConns int, opts ...func(postgreSQL db.SetMaxOpenConns(maxOpenConns) p := &PostgreSQL{ - _db: db, db: db, now: time.Now, maxPostgresBulkInsertRows: maxPostgresBulkInsertRows, - dbInfo: dbInfo, } for _, opt := range opts { @@ -87,65 +73,14 @@ func New(dbInfo string, idleConns int, maxOpenConns int, opts ...func(postgreSQL } func (p *PostgreSQL) Close() error { - return p._db.Close() + return p.db.Close() } func (p *PostgreSQL) Ping(ctx context.Context) error { - r, err := p._db.QueryContext(ctx, "SELECT 1;") + r, err := p.db.QueryContext(ctx, "SELECT 1;") if err != nil { return err } return r.Close() } - -func (p *PostgreSQL) StartUnitOfWork(ctx context.Context) (store.UnitOfWork, error) { - tx, err := p._db.BeginTx(ctx, nil) - if err != nil { - return nil, err - } - - // This will create a clone of the store and use the transaction created - // above to avoid messing with the state of the main singleton store - cloneStore := &PostgreSQL{ - _tx: tx, - db: tx, - now: time.Now, - maxPostgresBulkInsertRows: maxPostgresBulkInsertRows, - tracingEnabled: p.tracingEnabled, - tracingAttributes: p.tracingAttributes, - } - - return cloneStore, nil -} - -// UnitOfWork methods below -func (p *PostgreSQL) Commit() error { - if p._tx == nil { - return ErrNoTransaction - } - return p._tx.Commit() -} - -func (p *PostgreSQL) Rollback() error { - if p._tx == nil { - return ErrNoTransaction - } - return p._tx.Rollback() -} - -func (p *PostgreSQL) WriteLockBlocksTable(ctx context.Context) error { - if p._tx == nil { - return ErrNoTransaction - } - - // This will lock `blocks` table for writing, when performing reorg. - // Any INSERT or UPDATE to the table will wait until the lock is released. - // Another instance wanting to acquire this lock at the same time will have - // to wait until the transaction holding the lock is completed and the lock - // is released. - // - // Reading from the table is still allowed. - _, err := p._tx.ExecContext(ctx, "LOCK TABLE blocktx.blocks IN EXCLUSIVE MODE") - return err -} diff --git a/internal/blocktx/store/postgresql/postgres_test.go b/internal/blocktx/store/postgresql/postgres_test.go index d8bd66ff2..80751cb96 100644 --- a/internal/blocktx/store/postgresql/postgres_test.go +++ b/internal/blocktx/store/postgresql/postgres_test.go @@ -92,14 +92,14 @@ func testmain(m *testing.M) int { func prepareDb(t *testing.T, postgres *PostgreSQL, fixture string) { t.Helper() - testutils.PruneTables(t, postgres._db, + testutils.PruneTables(t, postgres.db, "blocktx.blocks", "blocktx.transactions", "blocktx.block_transactions_map", ) if fixture != "" { - testutils.LoadFixtures(t, postgres._db, fixture) + testutils.LoadFixtures(t, postgres.db, fixture) } } @@ -200,11 +200,11 @@ func TestPostgresDB(t *testing.T) { hashAtTip := testutils.RevChainhash(t, "76404890880cb36ce68100abb05b3a958e17c0ed274d5c0a0000000000000000") // when -> then - actualBlock, err := postgresDB.GetBlockByHeight(context.Background(), height) + actualBlock, err := postgresDB.GetLongestBlockByHeight(context.Background(), height) require.NoError(t, err) require.Equal(t, expectedHashAtHeightLongest[:], actualBlock.Hash) - actualBlock, err = postgresDB.GetBlockByHeight(context.Background(), heightNotFound) + actualBlock, err = postgresDB.GetLongestBlockByHeight(context.Background(), heightNotFound) require.Nil(t, actualBlock) require.Equal(t, store.ErrBlockNotFound, err) @@ -290,9 +290,9 @@ func TestPostgresDB(t *testing.T) { hash4Stale := testutils.RevChainhash(t, "000000000000000004bf3e68405b31650559ff28d38a42b5e4f1440a865611ca") expectedStaleHashes := [][]byte{ - hash4Stale[:], - hash3Stale[:], hash2Stale[:], + hash3Stale[:], + hash4Stale[:], } // when @@ -306,23 +306,33 @@ func TestPostgresDB(t *testing.T) { } }) - t.Run("get orphaned chain up from hash", func(t *testing.T) { + t.Run("get orphans back to non-orphaned ancestor", func(t *testing.T) { // given prepareDb(t, postgresDB, "fixtures/get_orphaned_chain") - hashGapFiller := testutils.RevChainhash(t, "0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067") + newHash := testutils.RevChainhash(t, "00000000000000000364332e1bbd61dc928141b9469c5daea26a4b506efc9656") hash2Orphaned := testutils.RevChainhash(t, "000000000000000003b15d668b54c4b91ae81a86298ee209d9f39fd7a769bcde") hash3Orphaned := testutils.RevChainhash(t, "00000000000000000659df0d3cf98ebe46931b67117502168418f9dce4e1b4c9") hash4Orphaned := testutils.RevChainhash(t, "0000000000000000082ec88d757ddaeb0aa87a5d5408b5960f27e7e67312dfe1") + noAncestorHash := testutils.RevChainhash(t, "0000000000000000082131979a4e25a5101912a5f8461e18f306d23e158161cd") + hash6Orphaned := testutils.RevChainhash(t, "0000000000000000059d6add76e3ddb8ec4f5ffd6efecd4c8b8c577bd32aed6c") + expectedOrphanedHashes := [][]byte{ hash2Orphaned[:], hash3Orphaned[:], hash4Orphaned[:], + newHash[:], + } + expectedAncestorHash := testutils.RevChainhash(t, "0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067") + + expectedNoAncestorOrphanedHashes := [][]byte{ + noAncestorHash[:], + hash6Orphaned[:], } // when - actualOrphanedBlocks, err := postgresDB.GetOrphanedChainUpFromHash(ctx, hashGapFiller[:]) + actualOrphanedBlocks, actualAncestor, err := postgresDB.GetOrphansBackToNonOrphanAncestor(ctx, newHash[:]) require.NoError(t, err) // then @@ -330,6 +340,15 @@ func TestPostgresDB(t *testing.T) { for i, b := range actualOrphanedBlocks { require.Equal(t, expectedOrphanedHashes[i], b.Hash) } + require.Equal(t, expectedAncestorHash[:], actualAncestor.Hash) + + // when + actualOrphanedBlocks, actualAncestor, err = postgresDB.GetOrphansBackToNonOrphanAncestor(ctx, noAncestorHash[:]) + require.NoError(t, err) + + // then + require.Equal(t, len(expectedNoAncestorOrphanedHashes), len(actualOrphanedBlocks)) + require.Nil(t, actualAncestor) }) t.Run("update blocks statuses", func(t *testing.T) { @@ -651,24 +670,6 @@ func TestPostgresDB(t *testing.T) { // then assert.Equal(t, expectedUnverifiedBlockHeights, res.UnverifiedBlockHeights) }) - - t.Run("lock blocks table", func(t *testing.T) { - err := postgresDB.WriteLockBlocksTable(context.Background()) - require.Error(t, err) - require.Equal(t, ErrNoTransaction, err) - - uow, err := postgresDB.StartUnitOfWork(context.Background()) - require.NoError(t, err) - - err = uow.WriteLockBlocksTable(context.Background()) - require.NoError(t, err) - - err = uow.Rollback() - require.NoError(t, err) - - err = uow.Commit() - require.Equal(t, ErrNoTransaction, err) - }) } func TestPostgresStore_UpsertBlockTransactions(t *testing.T) { diff --git a/internal/blocktx/store/postgresql/update_block_statuses.go b/internal/blocktx/store/postgresql/update_block_statuses.go index 3f5a56e7c..249fe4295 100644 --- a/internal/blocktx/store/postgresql/update_block_statuses.go +++ b/internal/blocktx/store/postgresql/update_block_statuses.go @@ -13,7 +13,10 @@ func (p *PostgreSQL) UpdateBlocksStatuses(ctx context.Context, blockStatusUpdate q := ` UPDATE blocktx.blocks b SET status = updates.status, is_longest = updates.is_longest - FROM (SELECT * FROM UNNEST($1::BYTEA[], $2::INTEGER[], $3::BOOLEAN[]) AS u(hash, status, is_longest)) AS updates + FROM ( + SELECT * FROM UNNEST($1::BYTEA[], $2::INTEGER[], $3::BOOLEAN[]) AS u(hash, status, is_longest) + WHERE is_longest = $4 + ) AS updates WHERE b.hash = updates.hash ` @@ -27,7 +30,27 @@ func (p *PostgreSQL) UpdateBlocksStatuses(ctx context.Context, blockStatusUpdate isLongest[i] = update.Status == blocktx_api.Status_LONGEST } - _, err := p.db.ExecContext(ctx, q, pq.Array(blockHashes), pq.Array(statuses), pq.Array(isLongest)) + tx, err := p.db.Begin() + if err != nil { + return errors.Join(store.ErrFailedToUpdateBlockStatuses, err) + } + defer func() { + _ = tx.Rollback() + }() + + // first update blocks that are changing statuses to non-LONGEST + _, err = tx.ExecContext(ctx, q, pq.Array(blockHashes), pq.Array(statuses), pq.Array(isLongest), false) + if err != nil { + return errors.Join(store.ErrFailedToUpdateBlockStatuses, err) + } + + // then update blocks that are changing statuses to LONGEST + _, err = tx.ExecContext(ctx, q, pq.Array(blockHashes), pq.Array(statuses), pq.Array(isLongest), true) + if err != nil { + return errors.Join(store.ErrFailedToUpdateBlockStatuses, err) + } + + err = tx.Commit() if err != nil { return errors.Join(store.ErrFailedToUpdateBlockStatuses, err) } diff --git a/internal/blocktx/store/store.go b/internal/blocktx/store/store.go index cefc3424e..116b6b043 100644 --- a/internal/blocktx/store/store.go +++ b/internal/blocktx/store/store.go @@ -27,7 +27,7 @@ var ( type BlocktxStore interface { RegisterTransactions(ctx context.Context, txHashes [][]byte) (updatedTxs []*chainhash.Hash, err error) GetBlock(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) - GetBlockByHeight(ctx context.Context, height uint64) (*blocktx_api.Block, error) + GetLongestBlockByHeight(ctx context.Context, height uint64) (*blocktx_api.Block, error) GetChainTip(ctx context.Context) (*blocktx_api.Block, error) UpsertBlock(ctx context.Context, block *blocktx_api.Block) (uint64, error) UpsertBlockTransactions(ctx context.Context, blockID uint64, txsWithMerklePaths []TxWithMerklePath) error @@ -37,7 +37,7 @@ type BlocktxStore interface { GetMinedTransactions(ctx context.Context, hashes [][]byte, onlyLongestChain bool) ([]TransactionBlock, error) GetLongestChainFromHeight(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) GetStaleChainBackFromHash(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) - GetOrphanedChainUpFromHash(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) + GetOrphansBackToNonOrphanAncestor(ctx context.Context, hash []byte) (orphans []*blocktx_api.Block, nonOrphanAncestor *blocktx_api.Block, err error) GetRegisteredTxsByBlockHashes(ctx context.Context, blockHashes [][]byte) ([]TransactionBlock, error) UpdateBlocksStatuses(ctx context.Context, blockStatusUpdates []BlockStatusUpdate) error @@ -48,13 +48,4 @@ type BlocktxStore interface { Ping(ctx context.Context) error Close() error - - StartUnitOfWork(ctx context.Context) (UnitOfWork, error) -} - -type UnitOfWork interface { - BlocktxStore - Commit() error - Rollback() error - WriteLockBlocksTable(ctx context.Context) error } diff --git a/internal/blocktx/store/store_mocks.go b/internal/blocktx/store/store_mocks.go index 746a61325..a82b9ba66 100644 --- a/internal/blocktx/store/store_mocks.go +++ b/internal/blocktx/store/store_mocks.go @@ -1,4 +1,3 @@ package store //go:generate moq -pkg mocks -out ./mocks/blocktx_store_mock.go . BlocktxStore -//go:generate moq -pkg mocks -out ./mocks/blocktx_db_tx_mock.go . UnitOfWork