diff --git a/internal/blocktx/integration_test/helpers.go b/internal/blocktx/integration_test/helpers.go index 7792d7d14..c151f09af 100644 --- a/internal/blocktx/integration_test/helpers.go +++ b/internal/blocktx/integration_test/helpers.go @@ -2,48 +2,11 @@ package integrationtest import ( "database/sql" - "encoding/hex" - "fmt" - "log" "testing" - "github.com/go-testfixtures/testfixtures/v3" - "github.com/libsv/go-p2p/chaincfg/chainhash" - "github.com/stretchr/testify/require" + testutils "github.com/bitcoin-sv/arc/internal/test_utils" ) -func revChainhash(t *testing.T, hashString string) *chainhash.Hash { - hash, err := hex.DecodeString(hashString) - require.NoError(t, err) - txHash, err := chainhash.NewHash(hash) - require.NoError(t, err) - - return txHash -} - -func loadFixtures(db *sql.DB, path string) error { - fixtures, err := testfixtures.New( - testfixtures.Database(db), - testfixtures.Dialect("postgresql"), - testfixtures.Directory(path), // The directory containing the YAML files - ) - if err != nil { - log.Fatalf("failed to create fixtures: %v", err) - } - - err = fixtures.Load() - if err != nil { - return fmt.Errorf("failed to load fixtures: %v", err) - } - - return nil -} - -func pruneTables(db *sql.DB) error { - _, err := db.Exec("TRUNCATE TABLE blocktx.blocks;") - if err != nil { - return err - } - - return nil +func pruneTables(t *testing.T, db *sql.DB) { + testutils.PruneTables(t, db, "blocktx.blocks") } diff --git a/internal/blocktx/integration_test/reorg_integration_test.go b/internal/blocktx/integration_test/reorg_integration_test.go index ed2cd22ea..03c8030aa 100644 --- a/internal/blocktx/integration_test/reorg_integration_test.go +++ b/internal/blocktx/integration_test/reorg_integration_test.go @@ -19,8 +19,6 @@ package integrationtest import ( "context" "database/sql" - "errors" - "fmt" "log" "log/slog" "os" @@ -30,15 +28,13 @@ import ( "github.com/bitcoin-sv/arc/internal/blocktx" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql" - "github.com/golang-migrate/migrate/v4" - migratepostgres "github.com/golang-migrate/migrate/v4/database/postgres" + testutils "github.com/bitcoin-sv/arc/internal/test_utils" _ "github.com/golang-migrate/migrate/v4/source/file" _ "github.com/lib/pq" "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" "github.com/ory/dockertest/v3" - "github.com/ory/dockertest/v3/docker" "github.com/stretchr/testify/require" ) @@ -56,80 +52,38 @@ var ( ) func TestMain(m *testing.M) { + os.Exit(testmain(m)) +} + +func testmain(m *testing.M) int { pool, err := dockertest.NewPool("") if err != nil { log.Fatalf("failed to create pool: %v", err) + return 1 } - port := "5436" - opts := dockertest.RunOptions{ - Repository: "postgres", - Tag: "15.4", - Env: []string{ - fmt.Sprintf("POSTGRES_PASSWORD=%s", dbPassword), - fmt.Sprintf("POSTGRES_USER=%s", dbUsername), - fmt.Sprintf("POSTGRES_DB=%s", dbName), - "listen_addresses = '*'", - }, - ExposedPorts: []string{"5432"}, - PortBindings: map[docker.Port][]docker.PortBinding{ - postgresPort: { - {HostIP: "0.0.0.0", HostPort: port}, - }, - }, - } - - resource, err := pool.RunWithOptions(&opts, func(config *docker.HostConfig) { - config.AutoRemove = true - config.RestartPolicy = docker.RestartPolicy{ - Name: "no", - } - config.Tmpfs = map[string]string{ - "/var/lib/postgresql/data": "", - } - }) + port := "5437" + resource, connStr, err := testutils.RunAndMigratePostgresql(pool, port, "blocktx", migrationsPath) if err != nil { - log.Fatalf("failed to create resource: %v", err) + log.Print(err) + return 1 } + defer func() { + err = pool.Purge(resource) + if err != nil { + log.Fatalf("failed to purge pool: %v", err) + } + }() - hostPort := resource.GetPort("5432/tcp") + dbInfo = connStr - dbInfo = fmt.Sprintf("host=localhost port=%s user=%s password=%s dbname=%s sslmode=disable", hostPort, dbUsername, dbPassword, dbName) dbConn, err = sql.Open("postgres", dbInfo) if err != nil { log.Fatalf("failed to create db connection: %v", err) - } - err = pool.Retry(func() error { - return dbConn.Ping() - }) - if err != nil { - log.Fatalf("failed to connect to docker: %s", err) - } - - driver, err := migratepostgres.WithInstance(dbConn, &migratepostgres.Config{ - MigrationsTable: "metamorph", - }) - if err != nil { - log.Fatalf("failed to create driver: %v", err) - } - - migrations, err := migrate.NewWithDatabaseInstance(migrationsPath, "postgres", driver) - if err != nil { - log.Fatalf("failed to initialize migrate instance: %v", err) - } - err = migrations.Up() - if err != nil && !errors.Is(err, migrate.ErrNoChange) { - log.Fatalf("failed to initialize migrate instance: %v", err) - } - - code := m.Run() - - err = pool.Purge(resource) - if err != nil { - log.Fatalf("failed to purge pool: %v", err) + return 1 } - os.Exit(code) + return m.Run() } func TestBlockStatus(t *testing.T) { @@ -137,7 +91,7 @@ func TestBlockStatus(t *testing.T) { t.Skip("skipping integration test") } - defer require.NoError(t, pruneTables(dbConn)) + defer pruneTables(t, dbConn) logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) @@ -154,7 +108,7 @@ func TestBlockStatus(t *testing.T) { processor.StartBlockProcessing() // test for empty database edge case before inserting fixtures - prevBlockHash := revChainhash(t, "00000000000000000a00c377b260a3219b0c314763f486bc363df7aa7e22ad72") + prevBlockHash := testutils.RevChainhash(t, "00000000000000000a00c377b260a3219b0c314763f486bc363df7aa7e22ad72") txHash, err := chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b") require.NoError(t, err) merkleRoot, err := chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b") @@ -177,17 +131,17 @@ func TestBlockStatus(t *testing.T) { // Allow DB to process the block time.Sleep(200 * time.Millisecond) - blockHash := blockMessage.Header.BlockHash() + blockHashZero := blockMessage.Header.BlockHash() - block, err := blocktxStore.GetBlock(context.Background(), &blockHash) + block, err := blocktxStore.GetBlock(context.Background(), &blockHashZero) require.NoError(t, err) require.Equal(t, uint64(822011), block.Height) require.Equal(t, blocktx_api.Status_LONGEST, block.Status) // only load fixtures at this point - require.NoError(t, loadFixtures(dbConn, "fixtures")) + testutils.LoadFixtures(t, dbConn, "fixtures") - prevBlockHash = revChainhash(t, "f97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000") + prevBlockHash = testutils.RevChainhash(t, "f97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000") txHash, err = chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b") require.NoError(t, err) merkleRoot, err = chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b") @@ -210,18 +164,19 @@ func TestBlockStatus(t *testing.T) { // Allow DB to process the block time.Sleep(200 * time.Millisecond) - blockHash = blockMessage.Header.BlockHash() + blockHashStale := blockMessage.Header.BlockHash() - block, err = blocktxStore.GetBlock(context.Background(), &blockHash) + block, err = blocktxStore.GetBlock(context.Background(), &blockHashStale) require.NoError(t, err) require.Equal(t, uint64(822015), block.Height) require.Equal(t, blocktx_api.Status_STALE, block.Status) // should become LONGEST + // reorg should happen blockMessage = &p2p.BlockMessage{ Header: &wire.BlockHeader{ Version: 541065216, - PrevBlock: blockHash, // block with status STALE at height 822015 + PrevBlock: blockHashStale, // block with status STALE at height 822015 MerkleRoot: *merkleRoot, Bits: 0x1a05db8b, // chainwork: "12301577519373468" higher than the competing block }, @@ -231,13 +186,43 @@ func TestBlockStatus(t *testing.T) { err = peerHandler.HandleBlock(blockMessage, nil) require.NoError(t, err) - // Allow DB to process the block - time.Sleep(200 * time.Millisecond) + // Allow DB to process the block and perform reorg + time.Sleep(1 * time.Second) - blockHash = blockMessage.Header.BlockHash() + // verify that reorg happened + blockHashLongest := blockMessage.Header.BlockHash() - block, err = blocktxStore.GetBlock(context.Background(), &blockHash) + block, err = blocktxStore.GetBlock(context.Background(), &blockHashLongest) require.NoError(t, err) require.Equal(t, uint64(822016), block.Height) require.Equal(t, blocktx_api.Status_LONGEST, block.Status) + + block, err = blocktxStore.GetBlock(context.Background(), &blockHashStale) + require.NoError(t, err) + require.Equal(t, uint64(822015), block.Height) + require.Equal(t, blocktx_api.Status_LONGEST, block.Status) + + previouslyLongestBlockHash := testutils.RevChainhash(t, "c9b4e1e4dcf9188416027511671b9346be8ef93c0ddf59060000000000000000") + block, err = blocktxStore.GetBlock(context.Background(), previouslyLongestBlockHash) + require.NoError(t, err) + require.Equal(t, uint64(822015), block.Height) + require.Equal(t, blocktx_api.Status_STALE, block.Status) + + previouslyLongestBlockHash = testutils.RevChainhash(t, "e1df1273e6e7270f96b508545d7aa80aebda7d758dc82e080000000000000000") + block, err = blocktxStore.GetBlock(context.Background(), previouslyLongestBlockHash) + require.NoError(t, err) + require.Equal(t, uint64(822016), block.Height) + require.Equal(t, blocktx_api.Status_STALE, block.Status) + + previouslyLongestBlockHash = testutils.RevChainhash(t, "76404890880cb36ce68100abb05b3a958e17c0ed274d5c0a0000000000000000") + block, err = blocktxStore.GetBlock(context.Background(), previouslyLongestBlockHash) + require.NoError(t, err) + require.Equal(t, uint64(822017), block.Height) + require.Equal(t, blocktx_api.Status_STALE, block.Status) + + beginningOfChain := testutils.RevChainhash(t, "f97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000") + block, err = blocktxStore.GetBlock(context.Background(), beginningOfChain) + require.NoError(t, err) + require.Equal(t, uint64(822014), block.Height) + require.Equal(t, blocktx_api.Status_LONGEST, block.Status) } diff --git a/internal/blocktx/processor.go b/internal/blocktx/processor.go index f1c018481..1de2bf2b7 100644 --- a/internal/blocktx/processor.go +++ b/internal/blocktx/processor.go @@ -373,7 +373,7 @@ func (p *Processor) fillGaps(peer p2p.PeerI) error { break } - p.logger.Info("Requesting missing block", slog.String("hash", gaps.Hash.String()), slog.Int64("height", int64(gaps.Height)), slog.String("peer", peer.String())) + p.logger.Info("Requesting missing block", slog.String("hash", gaps.Hash.String()), slog.Uint64("height", gaps.Height), slog.String("peer", peer.String())) pair := BlockRequest{ Hash: gaps.Hash, @@ -447,8 +447,15 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error { incomingBlock.Status = blocktx_api.Status_STALE if hasGreatestChainwork { - // TODO: perform reorg - next ticket + p.logger.Info("reorg detected - updating blocks", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height)) + incomingBlock.Status = blocktx_api.Status_LONGEST + + err := p.performReorg(ctx, incomingBlock) + if err != nil { + p.logger.Error("unable to perform reorg", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error())) + return err + } } } @@ -556,6 +563,38 @@ func (p *Processor) hasGreatestChainwork(ctx context.Context, incomingBlock *blo return tipChainWork.Cmp(incomingBlockChainwork) < 0, nil } +func (p *Processor) performReorg(ctx context.Context, incomingBlock *blocktx_api.Block) error { + staleBlocks, err := p.store.GetStaleChainBackFromHash(ctx, incomingBlock.PreviousHash) + if err != nil { + return err + } + + lowestHeight := incomingBlock.Height + if len(staleBlocks) > 0 { + lowestHeight = getLowestHeight(staleBlocks) + } + + longestBlocks, err := p.store.GetLongestChainFromHeight(ctx, lowestHeight) + if err != nil { + return err + } + + blockStatusUpdates := make([]store.BlockStatusUpdate, 0) + + for _, b := range staleBlocks { + update := store.BlockStatusUpdate{Hash: b.Hash, Status: blocktx_api.Status_LONGEST} + blockStatusUpdates = append(blockStatusUpdates, update) + } + + for _, b := range longestBlocks { + update := store.BlockStatusUpdate{Hash: b.Hash, Status: blocktx_api.Status_STALE} + blockStatusUpdates = append(blockStatusUpdates, update) + } + + err = p.store.UpdateBlocksStatuses(ctx, blockStatusUpdates) + return err +} + func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64, merkleTree []*chainhash.Hash, blockHeight uint64, blockhash *chainhash.Hash) error { if tracer != nil { var span trace.Span @@ -653,7 +692,7 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64, } err = p.mqClient.PublishMarshal(MinedTxsTopic, txBlock) if err != nil { - p.logger.Error("failed to publish mined txs", slog.String("hash", blockhash.String()), slog.Int64("height", int64(blockHeight)), slog.String("err", err.Error())) + p.logger.Error("failed to publish mined txs", slog.String("hash", blockhash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error())) } } diff --git a/internal/blocktx/processor_helpers.go b/internal/blocktx/processor_helpers.go index 652963549..258535bd2 100644 --- a/internal/blocktx/processor_helpers.go +++ b/internal/blocktx/processor_helpers.go @@ -58,6 +58,21 @@ func createBlock(msg *p2p.BlockMessage, prevBlock *blocktx_api.Block, longestTip } } +func getLowestHeight(blocks []*blocktx_api.Block) uint64 { + if len(blocks) == 0 { + return 0 + } + + lowest := blocks[0].Height + for _, b := range blocks { + if b.Height < lowest { + lowest = b.Height + } + } + + return lowest +} + // calculateChainwork calculates chainwork from the given difficulty bits // // This function comes from block-header-service: diff --git a/internal/blocktx/processor_helpers_test.go b/internal/blocktx/processor_helpers_test.go index 56faf0a6a..6dff06be5 100644 --- a/internal/blocktx/processor_helpers_test.go +++ b/internal/blocktx/processor_helpers_test.go @@ -1,49 +1,57 @@ package blocktx import ( - "bytes" - "encoding/hex" "fmt" "testing" + "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" - "github.com/bitcoinsv/bsvutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestExtractHeight(t *testing.T) { - coinbase, _ := hex.DecodeString("01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff570350cc0b041547b5630cfabe6d6d0000000000000000000000000000000000000000000000000000000000000000010000000000000047ed20542096bd0000000000143362663865373833636662643732306431383436000000000140be4025000000001976a914c9b0abe09b7dd8e9d1e8c1e3502d32ab0d7119e488ac00000000") - tx, err := bsvutil.NewTxFromBytes(coinbase) + tx, err := sdkTx.NewTransactionFromHex("01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff570350cc0b041547b5630cfabe6d6d0000000000000000000000000000000000000000000000000000000000000000010000000000000047ed20542096bd0000000000143362663865373833636662643732306431383436000000000140be4025000000001976a914c9b0abe09b7dd8e9d1e8c1e3502d32ab0d7119e488ac00000000") require.NoError(t, err) - buff := bytes.NewBuffer(nil) - err = tx.MsgTx().Serialize(buff) - require.NoError(t, err) - btTx, err := sdkTx.NewTransactionFromBytes(buff.Bytes()) - require.NoError(t, err) - - height := extractHeightFromCoinbaseTx(btTx) + height := extractHeightFromCoinbaseTx(tx) assert.Equalf(t, uint64(773200), height, "height should be 773200, got %d", height) } func TestExtractHeightForRegtest(t *testing.T) { - coinbase, _ := hex.DecodeString("02000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0502dc070101ffffffff012f500900000000002321032efe256e14fd77eea05d0453374f8920e0a7a4a573bb3937ef3f567f3937129cac00000000") - tx, err := bsvutil.NewTxFromBytes(coinbase) - require.NoError(t, err) - - buff := bytes.NewBuffer(nil) - err = tx.MsgTx().Serialize(buff) - require.NoError(t, err) - btTx, err := sdkTx.NewTransactionFromBytes(buff.Bytes()) + tx, err := sdkTx.NewTransactionFromHex("02000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0502dc070101ffffffff012f500900000000002321032efe256e14fd77eea05d0453374f8920e0a7a4a573bb3937ef3f567f3937129cac00000000") require.NoError(t, err) - height := extractHeightFromCoinbaseTx(btTx) + height := extractHeightFromCoinbaseTx(tx) assert.Equalf(t, uint64(2012), height, "height should be 2012, got %d", height) } +func TestGetLowestHeight(t *testing.T) { + blocks := []*blocktx_api.Block{ + { + Height: 123, + }, + { + Height: 250, + }, + { + Height: 83340, + }, + { + Height: 4, + }, + { + Height: 40, + }, + } + + lowestHeight := getLowestHeight(blocks) + + require.Equal(t, uint64(4), lowestHeight) +} + func TestChainWork(t *testing.T) { testCases := []struct { height int diff --git a/internal/blocktx/processor_test.go b/internal/blocktx/processor_test.go index 810554f11..5d2fa2e31 100644 --- a/internal/blocktx/processor_test.go +++ b/internal/blocktx/processor_test.go @@ -330,6 +330,15 @@ func TestHandleBlockReorg(t *testing.T) { Chainwork: "62209952899966", }, nil }, + GetStaleChainBackFromHashFunc: func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { + return nil, nil + }, + GetLongestChainFromHeightFunc: func(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) { + return nil, nil + }, + UpdateBlocksStatusesFunc: func(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error { + return nil + }, InsertBlockFunc: func(ctx context.Context, block *blocktx_api.Block) (uint64, error) { mtx.Lock() insertedBlock = block diff --git a/internal/blocktx/store/mocks/blocktx_store_mock.go b/internal/blocktx/store/mocks/blocktx_store_mock.go index 2faa8f9ef..e4ac26b9c 100644 --- a/internal/blocktx/store/mocks/blocktx_store_mock.go +++ b/internal/blocktx/store/mocks/blocktx_store_mock.go @@ -45,9 +45,15 @@ var _ store.BlocktxStore = &BlocktxStoreMock{} // GetChainTipFunc: func(ctx context.Context) (*blocktx_api.Block, error) { // panic("mock out the GetChainTip method") // }, +// GetLongestChainFromHeightFunc: func(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) { +// panic("mock out the GetLongestChainFromHeight method") +// }, // GetMinedTransactionsFunc: func(ctx context.Context, hashes []*chainhash.Hash) ([]store.GetMinedTransactionResult, error) { // panic("mock out the GetMinedTransactions method") // }, +// GetStaleChainBackFromHashFunc: func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { +// panic("mock out the GetStaleChainBackFromHash method") +// }, // InsertBlockFunc: func(ctx context.Context, block *blocktx_api.Block) (uint64, error) { // panic("mock out the InsertBlock method") // }, @@ -63,6 +69,9 @@ var _ store.BlocktxStore = &BlocktxStoreMock{} // SetBlockProcessingFunc: func(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) { // panic("mock out the SetBlockProcessing method") // }, +// UpdateBlocksStatusesFunc: func(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error { +// panic("mock out the UpdateBlocksStatuses method") +// }, // UpsertBlockTransactionsFunc: func(ctx context.Context, blockId uint64, transactions []*blocktx_api.TransactionAndSource, merklePaths []string) ([]store.UpsertBlockTransactionsResult, error) { // panic("mock out the UpsertBlockTransactions method") // }, @@ -100,9 +109,15 @@ type BlocktxStoreMock struct { // GetChainTipFunc mocks the GetChainTip method. GetChainTipFunc func(ctx context.Context) (*blocktx_api.Block, error) + // GetLongestChainFromHeightFunc mocks the GetLongestChainFromHeight method. + GetLongestChainFromHeightFunc func(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) + // GetMinedTransactionsFunc mocks the GetMinedTransactions method. GetMinedTransactionsFunc func(ctx context.Context, hashes []*chainhash.Hash) ([]store.GetMinedTransactionResult, error) + // GetStaleChainBackFromHashFunc mocks the GetStaleChainBackFromHash method. + GetStaleChainBackFromHashFunc func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) + // InsertBlockFunc mocks the InsertBlock method. InsertBlockFunc func(ctx context.Context, block *blocktx_api.Block) (uint64, error) @@ -118,6 +133,9 @@ type BlocktxStoreMock struct { // SetBlockProcessingFunc mocks the SetBlockProcessing method. SetBlockProcessingFunc func(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) + // UpdateBlocksStatusesFunc mocks the UpdateBlocksStatuses method. + UpdateBlocksStatusesFunc func(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error + // UpsertBlockTransactionsFunc mocks the UpsertBlockTransactions method. UpsertBlockTransactionsFunc func(ctx context.Context, blockId uint64, transactions []*blocktx_api.TransactionAndSource, merklePaths []string) ([]store.UpsertBlockTransactionsResult, error) @@ -182,6 +200,13 @@ type BlocktxStoreMock struct { // Ctx is the ctx argument value. Ctx context.Context } + // GetLongestChainFromHeight holds details about calls to the GetLongestChainFromHeight method. + GetLongestChainFromHeight []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Height is the height argument value. + Height uint64 + } // GetMinedTransactions holds details about calls to the GetMinedTransactions method. GetMinedTransactions []struct { // Ctx is the ctx argument value. @@ -189,6 +214,13 @@ type BlocktxStoreMock struct { // Hashes is the hashes argument value. Hashes []*chainhash.Hash } + // GetStaleChainBackFromHash holds details about calls to the GetStaleChainBackFromHash method. + GetStaleChainBackFromHash []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Hash is the hash argument value. + Hash []byte + } // InsertBlock holds details about calls to the InsertBlock method. InsertBlock []struct { // Ctx is the ctx argument value. @@ -228,6 +260,13 @@ type BlocktxStoreMock struct { // ProcessedBy is the processedBy argument value. ProcessedBy string } + // UpdateBlocksStatuses holds details about calls to the UpdateBlocksStatuses method. + UpdateBlocksStatuses []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // BlockStatusUpdates is the blockStatusUpdates argument value. + BlockStatusUpdates []store.BlockStatusUpdate + } // UpsertBlockTransactions holds details about calls to the UpsertBlockTransactions method. UpsertBlockTransactions []struct { // Ctx is the ctx argument value. @@ -257,12 +296,15 @@ type BlocktxStoreMock struct { lockGetBlockGaps sync.RWMutex lockGetBlockHashesProcessingInProgress sync.RWMutex lockGetChainTip sync.RWMutex + lockGetLongestChainFromHeight sync.RWMutex lockGetMinedTransactions sync.RWMutex + lockGetStaleChainBackFromHash sync.RWMutex lockInsertBlock sync.RWMutex lockMarkBlockAsDone sync.RWMutex lockPing sync.RWMutex lockRegisterTransactions sync.RWMutex lockSetBlockProcessing sync.RWMutex + lockUpdateBlocksStatuses sync.RWMutex lockUpsertBlockTransactions sync.RWMutex lockVerifyMerkleRoots sync.RWMutex } @@ -554,6 +596,42 @@ func (mock *BlocktxStoreMock) GetChainTipCalls() []struct { return calls } +// GetLongestChainFromHeight calls GetLongestChainFromHeightFunc. +func (mock *BlocktxStoreMock) GetLongestChainFromHeight(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) { + if mock.GetLongestChainFromHeightFunc == nil { + panic("BlocktxStoreMock.GetLongestChainFromHeightFunc: method is nil but BlocktxStore.GetLongestChainFromHeight was just called") + } + callInfo := struct { + Ctx context.Context + Height uint64 + }{ + Ctx: ctx, + Height: height, + } + mock.lockGetLongestChainFromHeight.Lock() + mock.calls.GetLongestChainFromHeight = append(mock.calls.GetLongestChainFromHeight, callInfo) + mock.lockGetLongestChainFromHeight.Unlock() + return mock.GetLongestChainFromHeightFunc(ctx, height) +} + +// GetLongestChainFromHeightCalls gets all the calls that were made to GetLongestChainFromHeight. +// Check the length with: +// +// len(mockedBlocktxStore.GetLongestChainFromHeightCalls()) +func (mock *BlocktxStoreMock) GetLongestChainFromHeightCalls() []struct { + Ctx context.Context + Height uint64 +} { + var calls []struct { + Ctx context.Context + Height uint64 + } + mock.lockGetLongestChainFromHeight.RLock() + calls = mock.calls.GetLongestChainFromHeight + mock.lockGetLongestChainFromHeight.RUnlock() + return calls +} + // GetMinedTransactions calls GetMinedTransactionsFunc. func (mock *BlocktxStoreMock) GetMinedTransactions(ctx context.Context, hashes []*chainhash.Hash) ([]store.GetMinedTransactionResult, error) { if mock.GetMinedTransactionsFunc == nil { @@ -590,6 +668,42 @@ func (mock *BlocktxStoreMock) GetMinedTransactionsCalls() []struct { return calls } +// GetStaleChainBackFromHash calls GetStaleChainBackFromHashFunc. +func (mock *BlocktxStoreMock) GetStaleChainBackFromHash(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { + if mock.GetStaleChainBackFromHashFunc == nil { + panic("BlocktxStoreMock.GetStaleChainBackFromHashFunc: method is nil but BlocktxStore.GetStaleChainBackFromHash was just called") + } + callInfo := struct { + Ctx context.Context + Hash []byte + }{ + Ctx: ctx, + Hash: hash, + } + mock.lockGetStaleChainBackFromHash.Lock() + mock.calls.GetStaleChainBackFromHash = append(mock.calls.GetStaleChainBackFromHash, callInfo) + mock.lockGetStaleChainBackFromHash.Unlock() + return mock.GetStaleChainBackFromHashFunc(ctx, hash) +} + +// GetStaleChainBackFromHashCalls gets all the calls that were made to GetStaleChainBackFromHash. +// Check the length with: +// +// len(mockedBlocktxStore.GetStaleChainBackFromHashCalls()) +func (mock *BlocktxStoreMock) GetStaleChainBackFromHashCalls() []struct { + Ctx context.Context + Hash []byte +} { + var calls []struct { + Ctx context.Context + Hash []byte + } + mock.lockGetStaleChainBackFromHash.RLock() + calls = mock.calls.GetStaleChainBackFromHash + mock.lockGetStaleChainBackFromHash.RUnlock() + return calls +} + // InsertBlock calls InsertBlockFunc. func (mock *BlocktxStoreMock) InsertBlock(ctx context.Context, block *blocktx_api.Block) (uint64, error) { if mock.InsertBlockFunc == nil { @@ -778,6 +892,42 @@ func (mock *BlocktxStoreMock) SetBlockProcessingCalls() []struct { return calls } +// UpdateBlocksStatuses calls UpdateBlocksStatusesFunc. +func (mock *BlocktxStoreMock) UpdateBlocksStatuses(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error { + if mock.UpdateBlocksStatusesFunc == nil { + panic("BlocktxStoreMock.UpdateBlocksStatusesFunc: method is nil but BlocktxStore.UpdateBlocksStatuses was just called") + } + callInfo := struct { + Ctx context.Context + BlockStatusUpdates []store.BlockStatusUpdate + }{ + Ctx: ctx, + BlockStatusUpdates: blockStatusUpdates, + } + mock.lockUpdateBlocksStatuses.Lock() + mock.calls.UpdateBlocksStatuses = append(mock.calls.UpdateBlocksStatuses, callInfo) + mock.lockUpdateBlocksStatuses.Unlock() + return mock.UpdateBlocksStatusesFunc(ctx, blockStatusUpdates) +} + +// UpdateBlocksStatusesCalls gets all the calls that were made to UpdateBlocksStatuses. +// Check the length with: +// +// len(mockedBlocktxStore.UpdateBlocksStatusesCalls()) +func (mock *BlocktxStoreMock) UpdateBlocksStatusesCalls() []struct { + Ctx context.Context + BlockStatusUpdates []store.BlockStatusUpdate +} { + var calls []struct { + Ctx context.Context + BlockStatusUpdates []store.BlockStatusUpdate + } + mock.lockUpdateBlocksStatuses.RLock() + calls = mock.calls.UpdateBlocksStatuses + mock.lockUpdateBlocksStatuses.RUnlock() + return calls +} + // UpsertBlockTransactions calls UpsertBlockTransactionsFunc. func (mock *BlocktxStoreMock) UpsertBlockTransactions(ctx context.Context, blockId uint64, transactions []*blocktx_api.TransactionAndSource, merklePaths []string) ([]store.UpsertBlockTransactionsResult, error) { if mock.UpsertBlockTransactionsFunc == nil { diff --git a/internal/blocktx/store/model.go b/internal/blocktx/store/model.go index d97fad506..8fff83d5f 100644 --- a/internal/blocktx/store/model.go +++ b/internal/blocktx/store/model.go @@ -1,6 +1,7 @@ package store import ( + "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/libsv/go-p2p/chaincfg/chainhash" ) @@ -20,3 +21,8 @@ type GetMinedTransactionResult struct { BlockHeight uint64 MerklePath string } + +type BlockStatusUpdate struct { + Hash []byte + Status blocktx_api.Status +} diff --git a/internal/blocktx/store/postgresql/fixtures/get_longest_chain/blocktx.blocks.yaml b/internal/blocktx/store/postgresql/fixtures/get_longest_chain/blocktx.blocks.yaml new file mode 100644 index 000000000..ce21cb387 --- /dev/null +++ b/internal/blocktx/store/postgresql/fixtures/get_longest_chain/blocktx.blocks.yaml @@ -0,0 +1,59 @@ +- inserted_at: 2023-12-15 14:00:00 + id: 0 + hash: 0x0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067 + prevhash: 0xb71ab063c5f96cad71cdc59dcc94182a20a69cbd7eed2d070000000000000000 + merkleroot: 0x7f4019eb006f5333cce752df387fa8443035c22291eb771ee5b16a02b81c8483 + height: 822014 + processed_at: 2023-12-15 14:10:00 + size: 86840000 + tx_count: 23477 + orphanedyn: false + status: 10 # LONGEST + chainwork: '123456' +- inserted_at: 2023-12-15 14:30:00 + id: 1 + hash: 0x000000000000000003b15d668b54c4b91ae81a86298ee209d9f39fd7a769bcde + prevhash: 0x0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067 + merkleroot: 0x7382df1b717287ab87e5e3e25759697c4c45eea428f701cdd0c77ad3fc707257 + height: 822015 + processed_at: 2023-12-15 14:30:00 + size: 20160000 + tx_count: 6523 + orphanedyn: false + status: 10 # LONGEST + chainwork: '123456' +- inserted_at: 2023-12-15 14:30:00 + id: 2 + hash: 0x00000000000000000659df0d3cf98ebe46931b67117502168418f9dce4e1b4c9 + prevhash: 0x0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067 + merkleroot: 0x7382df1b717287ab87e5e3e25759697c4c45eea428f701cdd0c77ad3fc707257 + height: 822015 + processed_at: 2023-12-15 14:30:00 + size: 20160000 + tx_count: 6523 + orphanedyn: false + status: 20 # STALE - competing block + chainwork: '123456' +- inserted_at: 2023-12-15 14:40:00 + id: 3 + hash: 0x0000000000000000082ec88d757ddaeb0aa87a5d5408b5960f27e7e67312dfe1 + prevhash: 0x00000000000000000659df0d3cf98ebe46931b67117502168418f9dce4e1b4c9 + merkleroot: 0x4b58b0402a84012269b124f78c91a78a814eb3c9caa03f1df1d33172b23082d1 + height: 822016 + processed_at: 2023-12-15 14:40:00 + size: 299650000 + tx_count: 62162 + orphanedyn: false + status: 20 # STALE + chainwork: '123456' +- inserted_at: 2023-12-15 14:50:00 + id: 4 + hash: 0x000000000000000004bf3e68405b31650559ff28d38a42b5e4f1440a865611ca + prevhash: 0x0000000000000000082ec88d757ddaeb0aa87a5d5408b5960f27e7e67312dfe1 + merkleroot: 0xc458aa382364e216c9c0533175ec8579a544c750ca181b18296e784d1dc53085 + height: 822017 + size: 8630000 + tx_count: 36724 + orphanedyn: false + status: 20 # STALE + chainwork: '123456' diff --git a/internal/blocktx/store/postgresql/fixtures/get_stale_chain/blocktx.blocks.yaml b/internal/blocktx/store/postgresql/fixtures/get_stale_chain/blocktx.blocks.yaml new file mode 100644 index 000000000..ce21cb387 --- /dev/null +++ b/internal/blocktx/store/postgresql/fixtures/get_stale_chain/blocktx.blocks.yaml @@ -0,0 +1,59 @@ +- inserted_at: 2023-12-15 14:00:00 + id: 0 + hash: 0x0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067 + prevhash: 0xb71ab063c5f96cad71cdc59dcc94182a20a69cbd7eed2d070000000000000000 + merkleroot: 0x7f4019eb006f5333cce752df387fa8443035c22291eb771ee5b16a02b81c8483 + height: 822014 + processed_at: 2023-12-15 14:10:00 + size: 86840000 + tx_count: 23477 + orphanedyn: false + status: 10 # LONGEST + chainwork: '123456' +- inserted_at: 2023-12-15 14:30:00 + id: 1 + hash: 0x000000000000000003b15d668b54c4b91ae81a86298ee209d9f39fd7a769bcde + prevhash: 0x0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067 + merkleroot: 0x7382df1b717287ab87e5e3e25759697c4c45eea428f701cdd0c77ad3fc707257 + height: 822015 + processed_at: 2023-12-15 14:30:00 + size: 20160000 + tx_count: 6523 + orphanedyn: false + status: 10 # LONGEST + chainwork: '123456' +- inserted_at: 2023-12-15 14:30:00 + id: 2 + hash: 0x00000000000000000659df0d3cf98ebe46931b67117502168418f9dce4e1b4c9 + prevhash: 0x0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067 + merkleroot: 0x7382df1b717287ab87e5e3e25759697c4c45eea428f701cdd0c77ad3fc707257 + height: 822015 + processed_at: 2023-12-15 14:30:00 + size: 20160000 + tx_count: 6523 + orphanedyn: false + status: 20 # STALE - competing block + chainwork: '123456' +- inserted_at: 2023-12-15 14:40:00 + id: 3 + hash: 0x0000000000000000082ec88d757ddaeb0aa87a5d5408b5960f27e7e67312dfe1 + prevhash: 0x00000000000000000659df0d3cf98ebe46931b67117502168418f9dce4e1b4c9 + merkleroot: 0x4b58b0402a84012269b124f78c91a78a814eb3c9caa03f1df1d33172b23082d1 + height: 822016 + processed_at: 2023-12-15 14:40:00 + size: 299650000 + tx_count: 62162 + orphanedyn: false + status: 20 # STALE + chainwork: '123456' +- inserted_at: 2023-12-15 14:50:00 + id: 4 + hash: 0x000000000000000004bf3e68405b31650559ff28d38a42b5e4f1440a865611ca + prevhash: 0x0000000000000000082ec88d757ddaeb0aa87a5d5408b5960f27e7e67312dfe1 + merkleroot: 0xc458aa382364e216c9c0533175ec8579a544c750ca181b18296e784d1dc53085 + height: 822017 + size: 8630000 + tx_count: 36724 + orphanedyn: false + status: 20 # STALE + chainwork: '123456' diff --git a/internal/blocktx/store/postgresql/fixtures/update_blocks_statuses/blocktx.blocks.yaml b/internal/blocktx/store/postgresql/fixtures/update_blocks_statuses/blocktx.blocks.yaml new file mode 100644 index 000000000..ce21cb387 --- /dev/null +++ b/internal/blocktx/store/postgresql/fixtures/update_blocks_statuses/blocktx.blocks.yaml @@ -0,0 +1,59 @@ +- inserted_at: 2023-12-15 14:00:00 + id: 0 + hash: 0x0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067 + prevhash: 0xb71ab063c5f96cad71cdc59dcc94182a20a69cbd7eed2d070000000000000000 + merkleroot: 0x7f4019eb006f5333cce752df387fa8443035c22291eb771ee5b16a02b81c8483 + height: 822014 + processed_at: 2023-12-15 14:10:00 + size: 86840000 + tx_count: 23477 + orphanedyn: false + status: 10 # LONGEST + chainwork: '123456' +- inserted_at: 2023-12-15 14:30:00 + id: 1 + hash: 0x000000000000000003b15d668b54c4b91ae81a86298ee209d9f39fd7a769bcde + prevhash: 0x0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067 + merkleroot: 0x7382df1b717287ab87e5e3e25759697c4c45eea428f701cdd0c77ad3fc707257 + height: 822015 + processed_at: 2023-12-15 14:30:00 + size: 20160000 + tx_count: 6523 + orphanedyn: false + status: 10 # LONGEST + chainwork: '123456' +- inserted_at: 2023-12-15 14:30:00 + id: 2 + hash: 0x00000000000000000659df0d3cf98ebe46931b67117502168418f9dce4e1b4c9 + prevhash: 0x0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067 + merkleroot: 0x7382df1b717287ab87e5e3e25759697c4c45eea428f701cdd0c77ad3fc707257 + height: 822015 + processed_at: 2023-12-15 14:30:00 + size: 20160000 + tx_count: 6523 + orphanedyn: false + status: 20 # STALE - competing block + chainwork: '123456' +- inserted_at: 2023-12-15 14:40:00 + id: 3 + hash: 0x0000000000000000082ec88d757ddaeb0aa87a5d5408b5960f27e7e67312dfe1 + prevhash: 0x00000000000000000659df0d3cf98ebe46931b67117502168418f9dce4e1b4c9 + merkleroot: 0x4b58b0402a84012269b124f78c91a78a814eb3c9caa03f1df1d33172b23082d1 + height: 822016 + processed_at: 2023-12-15 14:40:00 + size: 299650000 + tx_count: 62162 + orphanedyn: false + status: 20 # STALE + chainwork: '123456' +- inserted_at: 2023-12-15 14:50:00 + id: 4 + hash: 0x000000000000000004bf3e68405b31650559ff28d38a42b5e4f1440a865611ca + prevhash: 0x0000000000000000082ec88d757ddaeb0aa87a5d5408b5960f27e7e67312dfe1 + merkleroot: 0xc458aa382364e216c9c0533175ec8579a544c750ca181b18296e784d1dc53085 + height: 822017 + size: 8630000 + tx_count: 36724 + orphanedyn: false + status: 20 # STALE + chainwork: '123456' diff --git a/internal/blocktx/store/postgresql/get_block.go b/internal/blocktx/store/postgresql/get_block.go index 7537a1b8c..4733ab91c 100644 --- a/internal/blocktx/store/postgresql/get_block.go +++ b/internal/blocktx/store/postgresql/get_block.go @@ -46,14 +46,14 @@ func (p *PostgreSQL) queryBlockByPredicate(ctx context.Context, predicate string var block blocktx_api.Block - var processed_at sql.NullString + var processedAt sql.NullString if err := p.db.QueryRowContext(ctx, q, predicateParams...).Scan( &block.Hash, &block.PreviousHash, &block.MerkleRoot, &block.Height, - &processed_at, + &processedAt, &block.Orphaned, &block.Status, &block.Chainwork, @@ -64,7 +64,7 @@ func (p *PostgreSQL) queryBlockByPredicate(ctx context.Context, predicate string return nil, err } - block.Processed = processed_at.Valid + block.Processed = processedAt.Valid return &block, nil } diff --git a/internal/blocktx/store/postgresql/get_longest_chain.go b/internal/blocktx/store/postgresql/get_longest_chain.go new file mode 100644 index 000000000..9e8da6bc3 --- /dev/null +++ b/internal/blocktx/store/postgresql/get_longest_chain.go @@ -0,0 +1,57 @@ +package postgresql + +import ( + "context" + "database/sql" + + "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" +) + +func (p *PostgreSQL) GetLongestChainFromHeight(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) { + q := ` + SELECT + hash + ,prevhash + ,merkleroot + ,height + ,processed_at + ,orphanedyn + ,status + ,chainwork + FROM blocktx.blocks + WHERE height >= $1 AND status = $2 + ` + + longestBlocks := make([]*blocktx_api.Block, 0) + + rows, err := p.db.QueryContext(ctx, q, height, blocktx_api.Status_LONGEST) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var block blocktx_api.Block + var processedAt sql.NullString + + err := rows.Scan( + &block.Hash, + &block.PreviousHash, + &block.MerkleRoot, + &block.Height, + &processedAt, + &block.Orphaned, + &block.Status, + &block.Chainwork, + ) + if err != nil { + return nil, err + } + + block.Processed = processedAt.Valid + + longestBlocks = append(longestBlocks, &block) + } + + return longestBlocks, nil +} diff --git a/internal/blocktx/store/postgresql/get_stale_chain.go b/internal/blocktx/store/postgresql/get_stale_chain.go new file mode 100644 index 000000000..31b000028 --- /dev/null +++ b/internal/blocktx/store/postgresql/get_stale_chain.go @@ -0,0 +1,98 @@ +package postgresql + +import ( + "context" + "database/sql" + + "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" +) + +// GetStaleChainBackFromHash is a function that recursively searches for blocks +// marked as STALE from the given hash - back to the block marked as LONGEST, +// which is the common ancestor for the STALE and LONGEST chains. +// +// It searches for the block by given hash and finds parent blocks recursively +// using the prevhash field from that found block. +// +// A In this scenario, the block A, B and D are marked as LONGEST +// |\ while blocks C and E are marked as STALE. +// B C +// | | Function GetStaleChainBackFromHash(ctx, E), given the hash E +// D E will return blocks C and E, which is the entire STALE chain. +func (p *PostgreSQL) GetStaleChainBackFromHash(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { + // The way this query works, is that the result from the first SELECT + // will be stored in the prevBlocks variable, which is later used + // for recursion in the second SELECT. + // + // Then entire recursion happens in the second SELECT, after UNION ALL, + // and the first SELECT is just to set up the prevBlocks variable with + // the first, initial value. Then, the prevBlocks variable is recursively + // updated with values returned from the second SELECT. + q := ` + WITH RECURSIVE prevBlocks AS ( + SELECT + hash + ,prevhash + ,merkleroot + ,height + ,processed_at + ,orphanedyn + ,status + ,chainwork + FROM blocktx.blocks WHERE hash = $1 + UNION ALL + SELECT + b.hash + ,b.prevhash + ,b.merkleroot + ,b.height + ,b.processed_at + ,b.orphanedyn + ,b.status + ,b.chainwork + FROM blocktx.blocks b JOIN prevBlocks p ON b.hash = p.prevhash AND b.status = $2 + ) + SELECT + hash + ,prevhash + ,merkleroot + ,height + ,processed_at + ,orphanedyn + ,status + ,chainwork + FROM prevBlocks + ` + staleBlocks := make([]*blocktx_api.Block, 0) + + rows, err := p.db.QueryContext(ctx, q, hash, blocktx_api.Status_STALE) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var block blocktx_api.Block + var processedAt sql.NullString + + err := rows.Scan( + &block.Hash, + &block.PreviousHash, + &block.MerkleRoot, + &block.Height, + &processedAt, + &block.Orphaned, + &block.Status, + &block.Chainwork, + ) + if err != nil { + return nil, err + } + + block.Processed = processedAt.Valid + + staleBlocks = append(staleBlocks, &block) + } + + return staleBlocks, nil +} diff --git a/internal/blocktx/store/postgresql/migrations/000015_height_index.down.sql b/internal/blocktx/store/postgresql/migrations/000015_height_index.down.sql new file mode 100644 index 000000000..5230e8ff6 --- /dev/null +++ b/internal/blocktx/store/postgresql/migrations/000015_height_index.down.sql @@ -0,0 +1 @@ +DROP INDEX ix_block_height; diff --git a/internal/blocktx/store/postgresql/migrations/000015_height_index.up.sql b/internal/blocktx/store/postgresql/migrations/000015_height_index.up.sql new file mode 100644 index 000000000..a4cbca578 --- /dev/null +++ b/internal/blocktx/store/postgresql/migrations/000015_height_index.up.sql @@ -0,0 +1 @@ +CREATE INDEX ix_block_height ON blocktx.blocks(height); diff --git a/internal/blocktx/store/postgresql/postgres_test.go b/internal/blocktx/store/postgresql/postgres_test.go index 645a275af..d71c20c5d 100644 --- a/internal/blocktx/store/postgresql/postgres_test.go +++ b/internal/blocktx/store/postgresql/postgres_test.go @@ -218,6 +218,84 @@ func TestPostgresDB(t *testing.T) { require.ElementsMatch(t, expectedBlockGaps, blockGaps) }) + t.Run("get longest chain from height", func(t *testing.T) { + prepareDb(t, postgresDB.db, "fixtures/get_longest_chain") + + starting_height := uint64(822014) + hash0Longest := testutils.RevChainhash(t, "0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067") + hash1Longest := testutils.RevChainhash(t, "000000000000000003b15d668b54c4b91ae81a86298ee209d9f39fd7a769bcde") + + expectedStaleHashes := []*chainhash.Hash{ + hash0Longest, + hash1Longest, + } + + longestBlocks, err := postgresDB.GetLongestChainFromHeight(ctx, starting_height) + require.NoError(t, err) + + require.Equal(t, len(expectedStaleHashes), len(longestBlocks)) + for i, b := range longestBlocks { + require.Equal(t, expectedStaleHashes[i][:], b.Hash) + } + }) + + t.Run("get stale chain back from hash", func(t *testing.T) { + prepareDb(t, postgresDB.db, "fixtures/get_stale_chain") + + hash2Stale := testutils.RevChainhash(t, "00000000000000000659df0d3cf98ebe46931b67117502168418f9dce4e1b4c9") + hash3Stale := testutils.RevChainhash(t, "0000000000000000082ec88d757ddaeb0aa87a5d5408b5960f27e7e67312dfe1") + hash4Stale := testutils.RevChainhash(t, "000000000000000004bf3e68405b31650559ff28d38a42b5e4f1440a865611ca") + + expectedStaleHashes := [][]byte{ + hash4Stale[:], + hash3Stale[:], + hash2Stale[:], + } + + staleBlocks, err := postgresDB.GetStaleChainBackFromHash(ctx, hash4Stale[:]) + require.NoError(t, err) + + require.Equal(t, len(expectedStaleHashes), len(staleBlocks)) + for i, b := range staleBlocks { + require.Equal(t, expectedStaleHashes[i], b.Hash) + } + }) + + t.Run("update blocks statuses", func(t *testing.T) { + prepareDb(t, postgresDB.db, "fixtures/update_blocks_statuses") + + hash1Longest := testutils.RevChainhash(t, "000000000000000003b15d668b54c4b91ae81a86298ee209d9f39fd7a769bcde") + hash2Stale := testutils.RevChainhash(t, "00000000000000000659df0d3cf98ebe46931b67117502168418f9dce4e1b4c9") + hash3Stale := testutils.RevChainhash(t, "0000000000000000082ec88d757ddaeb0aa87a5d5408b5960f27e7e67312dfe1") + hash4Stale := testutils.RevChainhash(t, "000000000000000004bf3e68405b31650559ff28d38a42b5e4f1440a865611ca") + + blockStatusUpdates := []store.BlockStatusUpdate{ + {Hash: hash1Longest[:], Status: blocktx_api.Status_STALE}, + {Hash: hash2Stale[:], Status: blocktx_api.Status_LONGEST}, + {Hash: hash3Stale[:], Status: blocktx_api.Status_LONGEST}, + {Hash: hash4Stale[:], Status: blocktx_api.Status_LONGEST}, + } + + err := postgresDB.UpdateBlocksStatuses(ctx, blockStatusUpdates) + require.NoError(t, err) + + longest1, err := postgresDB.GetBlock(ctx, hash1Longest) + require.NoError(t, err) + require.Equal(t, blocktx_api.Status_STALE, longest1.Status) + + stale2, err := postgresDB.GetBlock(ctx, hash2Stale) + require.NoError(t, err) + require.Equal(t, blocktx_api.Status_LONGEST, stale2.Status) + + stale3, err := postgresDB.GetBlock(ctx, hash3Stale) + require.NoError(t, err) + require.Equal(t, blocktx_api.Status_LONGEST, stale3.Status) + + stale4, err := postgresDB.GetBlock(ctx, hash4Stale) + require.NoError(t, err) + require.Equal(t, blocktx_api.Status_LONGEST, stale4.Status) + }) + t.Run("test getting mined txs", func(t *testing.T) { prepareDb(t, postgresDB.db, "fixtures/get_mined_transactions") diff --git a/internal/blocktx/store/postgresql/update_block_statuses.go b/internal/blocktx/store/postgresql/update_block_statuses.go new file mode 100644 index 000000000..dc5e19dd3 --- /dev/null +++ b/internal/blocktx/store/postgresql/update_block_statuses.go @@ -0,0 +1,33 @@ +package postgresql + +import ( + "context" + + "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" + "github.com/bitcoin-sv/arc/internal/blocktx/store" + "github.com/lib/pq" +) + +func (p *PostgreSQL) UpdateBlocksStatuses(ctx context.Context, blockStatusUpdates []store.BlockStatusUpdate) error { + q := ` + UPDATE blocktx.blocks b + SET status = updates.status + FROM (SELECT * FROM UNNEST($1::BYTEA[], $2::INTEGER[]) AS u(hash, status)) AS updates + WHERE b.hash = updates.hash + ` + + blockHashes := make([][]byte, len(blockStatusUpdates)) + statuses := make([]blocktx_api.Status, len(blockStatusUpdates)) + + for i, update := range blockStatusUpdates { + blockHashes[i] = update.Hash + statuses[i] = update.Status + } + + _, err := p.db.ExecContext(ctx, q, pq.Array(blockHashes), pq.Array(statuses)) + if err != nil { + return err + } + + return nil +} diff --git a/internal/blocktx/store/store.go b/internal/blocktx/store/store.go index 94ca1c002..709f890b5 100644 --- a/internal/blocktx/store/store.go +++ b/internal/blocktx/store/store.go @@ -25,6 +25,9 @@ type BlocktxStore interface { GetBlockGaps(ctx context.Context, heightRange int) ([]*BlockGap, error) ClearBlocktxTable(ctx context.Context, retentionDays int32, table string) (*blocktx_api.RowsAffectedResponse, error) GetMinedTransactions(ctx context.Context, hashes []*chainhash.Hash) ([]GetMinedTransactionResult, error) + GetLongestChainFromHeight(ctx context.Context, height uint64) ([]*blocktx_api.Block, error) + GetStaleChainBackFromHash(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) + UpdateBlocksStatuses(ctx context.Context, blockStatusUpdates []BlockStatusUpdate) error SetBlockProcessing(ctx context.Context, hash *chainhash.Hash, processedBy string) (string, error) GetBlockHashesProcessingInProgress(ctx context.Context, processedBy string) ([]*chainhash.Hash, error)