Skip to content

Commit

Permalink
feat: (Reorg Support) Perform Reorg internally with blocktx statuses (#…
Browse files Browse the repository at this point in the history
…580)

* feat: reorg template

* feat: query for stale and longest blocks

* feat: perform reorg in database and update integration test

* feat: update tests to new utils

* feat: add one test

* feat: comment get_stale_blocks function and fix cammelCase variables

* feat: fix indentation

* fix: int64 to uint64 in logs

* feat: comment explaining the recursive query

* feat: change update status to perform one query instead of two

* feat: rename index
  • Loading branch information
kuba-4chain authored Sep 5, 2024
1 parent 2b723dd commit b51638e
Show file tree
Hide file tree
Showing 19 changed files with 768 additions and 145 deletions.
43 changes: 3 additions & 40 deletions internal/blocktx/integration_test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,11 @@ package integrationtest

import (
"database/sql"
"encoding/hex"
"fmt"
"log"
"testing"

"github.com/go-testfixtures/testfixtures/v3"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/stretchr/testify/require"
testutils "github.com/bitcoin-sv/arc/internal/test_utils"
)

func revChainhash(t *testing.T, hashString string) *chainhash.Hash {
hash, err := hex.DecodeString(hashString)
require.NoError(t, err)
txHash, err := chainhash.NewHash(hash)
require.NoError(t, err)

return txHash
}

func loadFixtures(db *sql.DB, path string) error {
fixtures, err := testfixtures.New(
testfixtures.Database(db),
testfixtures.Dialect("postgresql"),
testfixtures.Directory(path), // The directory containing the YAML files
)
if err != nil {
log.Fatalf("failed to create fixtures: %v", err)
}

err = fixtures.Load()
if err != nil {
return fmt.Errorf("failed to load fixtures: %v", err)
}

return nil
}

func pruneTables(db *sql.DB) error {
_, err := db.Exec("TRUNCATE TABLE blocktx.blocks;")
if err != nil {
return err
}

return nil
func pruneTables(t *testing.T, db *sql.DB) {
testutils.PruneTables(t, db, "blocktx.blocks")
}
141 changes: 63 additions & 78 deletions internal/blocktx/integration_test/reorg_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package integrationtest
import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"log/slog"
"os"
Expand All @@ -30,15 +28,13 @@ import (
"github.com/bitcoin-sv/arc/internal/blocktx"
"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql"
"github.com/golang-migrate/migrate/v4"
migratepostgres "github.com/golang-migrate/migrate/v4/database/postgres"
testutils "github.com/bitcoin-sv/arc/internal/test_utils"
_ "github.com/golang-migrate/migrate/v4/source/file"
_ "github.com/lib/pq"
"github.com/libsv/go-p2p"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/stretchr/testify/require"
)

Expand All @@ -56,88 +52,46 @@ var (
)

func TestMain(m *testing.M) {
os.Exit(testmain(m))
}

func testmain(m *testing.M) int {
pool, err := dockertest.NewPool("")
if err != nil {
log.Fatalf("failed to create pool: %v", err)
return 1
}

port := "5436"
opts := dockertest.RunOptions{
Repository: "postgres",
Tag: "15.4",
Env: []string{
fmt.Sprintf("POSTGRES_PASSWORD=%s", dbPassword),
fmt.Sprintf("POSTGRES_USER=%s", dbUsername),
fmt.Sprintf("POSTGRES_DB=%s", dbName),
"listen_addresses = '*'",
},
ExposedPorts: []string{"5432"},
PortBindings: map[docker.Port][]docker.PortBinding{
postgresPort: {
{HostIP: "0.0.0.0", HostPort: port},
},
},
}

resource, err := pool.RunWithOptions(&opts, func(config *docker.HostConfig) {
config.AutoRemove = true
config.RestartPolicy = docker.RestartPolicy{
Name: "no",
}
config.Tmpfs = map[string]string{
"/var/lib/postgresql/data": "",
}
})
port := "5437"
resource, connStr, err := testutils.RunAndMigratePostgresql(pool, port, "blocktx", migrationsPath)
if err != nil {
log.Fatalf("failed to create resource: %v", err)
log.Print(err)
return 1
}
defer func() {
err = pool.Purge(resource)
if err != nil {
log.Fatalf("failed to purge pool: %v", err)
}
}()

hostPort := resource.GetPort("5432/tcp")
dbInfo = connStr

dbInfo = fmt.Sprintf("host=localhost port=%s user=%s password=%s dbname=%s sslmode=disable", hostPort, dbUsername, dbPassword, dbName)
dbConn, err = sql.Open("postgres", dbInfo)
if err != nil {
log.Fatalf("failed to create db connection: %v", err)
}
err = pool.Retry(func() error {
return dbConn.Ping()
})
if err != nil {
log.Fatalf("failed to connect to docker: %s", err)
}

driver, err := migratepostgres.WithInstance(dbConn, &migratepostgres.Config{
MigrationsTable: "metamorph",
})
if err != nil {
log.Fatalf("failed to create driver: %v", err)
}

migrations, err := migrate.NewWithDatabaseInstance(migrationsPath, "postgres", driver)
if err != nil {
log.Fatalf("failed to initialize migrate instance: %v", err)
}
err = migrations.Up()
if err != nil && !errors.Is(err, migrate.ErrNoChange) {
log.Fatalf("failed to initialize migrate instance: %v", err)
}

code := m.Run()

err = pool.Purge(resource)
if err != nil {
log.Fatalf("failed to purge pool: %v", err)
return 1
}

os.Exit(code)
return m.Run()
}

func TestBlockStatus(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

defer require.NoError(t, pruneTables(dbConn))
defer pruneTables(t, dbConn)

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))

Expand All @@ -154,7 +108,7 @@ func TestBlockStatus(t *testing.T) {
processor.StartBlockProcessing()

// test for empty database edge case before inserting fixtures
prevBlockHash := revChainhash(t, "00000000000000000a00c377b260a3219b0c314763f486bc363df7aa7e22ad72")
prevBlockHash := testutils.RevChainhash(t, "00000000000000000a00c377b260a3219b0c314763f486bc363df7aa7e22ad72")
txHash, err := chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b")
require.NoError(t, err)
merkleRoot, err := chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b")
Expand All @@ -177,17 +131,17 @@ func TestBlockStatus(t *testing.T) {
// Allow DB to process the block
time.Sleep(200 * time.Millisecond)

blockHash := blockMessage.Header.BlockHash()
blockHashZero := blockMessage.Header.BlockHash()

block, err := blocktxStore.GetBlock(context.Background(), &blockHash)
block, err := blocktxStore.GetBlock(context.Background(), &blockHashZero)
require.NoError(t, err)
require.Equal(t, uint64(822011), block.Height)
require.Equal(t, blocktx_api.Status_LONGEST, block.Status)

// only load fixtures at this point
require.NoError(t, loadFixtures(dbConn, "fixtures"))
testutils.LoadFixtures(t, dbConn, "fixtures")

prevBlockHash = revChainhash(t, "f97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000")
prevBlockHash = testutils.RevChainhash(t, "f97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000")
txHash, err = chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b")
require.NoError(t, err)
merkleRoot, err = chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b")
Expand All @@ -210,18 +164,19 @@ func TestBlockStatus(t *testing.T) {
// Allow DB to process the block
time.Sleep(200 * time.Millisecond)

blockHash = blockMessage.Header.BlockHash()
blockHashStale := blockMessage.Header.BlockHash()

block, err = blocktxStore.GetBlock(context.Background(), &blockHash)
block, err = blocktxStore.GetBlock(context.Background(), &blockHashStale)
require.NoError(t, err)
require.Equal(t, uint64(822015), block.Height)
require.Equal(t, blocktx_api.Status_STALE, block.Status)

// should become LONGEST
// reorg should happen
blockMessage = &p2p.BlockMessage{
Header: &wire.BlockHeader{
Version: 541065216,
PrevBlock: blockHash, // block with status STALE at height 822015
PrevBlock: blockHashStale, // block with status STALE at height 822015
MerkleRoot: *merkleRoot,
Bits: 0x1a05db8b, // chainwork: "12301577519373468" higher than the competing block
},
Expand All @@ -231,13 +186,43 @@ func TestBlockStatus(t *testing.T) {

err = peerHandler.HandleBlock(blockMessage, nil)
require.NoError(t, err)
// Allow DB to process the block
time.Sleep(200 * time.Millisecond)
// Allow DB to process the block and perform reorg
time.Sleep(1 * time.Second)

blockHash = blockMessage.Header.BlockHash()
// verify that reorg happened
blockHashLongest := blockMessage.Header.BlockHash()

block, err = blocktxStore.GetBlock(context.Background(), &blockHash)
block, err = blocktxStore.GetBlock(context.Background(), &blockHashLongest)
require.NoError(t, err)
require.Equal(t, uint64(822016), block.Height)
require.Equal(t, blocktx_api.Status_LONGEST, block.Status)

block, err = blocktxStore.GetBlock(context.Background(), &blockHashStale)
require.NoError(t, err)
require.Equal(t, uint64(822015), block.Height)
require.Equal(t, blocktx_api.Status_LONGEST, block.Status)

previouslyLongestBlockHash := testutils.RevChainhash(t, "c9b4e1e4dcf9188416027511671b9346be8ef93c0ddf59060000000000000000")
block, err = blocktxStore.GetBlock(context.Background(), previouslyLongestBlockHash)
require.NoError(t, err)
require.Equal(t, uint64(822015), block.Height)
require.Equal(t, blocktx_api.Status_STALE, block.Status)

previouslyLongestBlockHash = testutils.RevChainhash(t, "e1df1273e6e7270f96b508545d7aa80aebda7d758dc82e080000000000000000")
block, err = blocktxStore.GetBlock(context.Background(), previouslyLongestBlockHash)
require.NoError(t, err)
require.Equal(t, uint64(822016), block.Height)
require.Equal(t, blocktx_api.Status_STALE, block.Status)

previouslyLongestBlockHash = testutils.RevChainhash(t, "76404890880cb36ce68100abb05b3a958e17c0ed274d5c0a0000000000000000")
block, err = blocktxStore.GetBlock(context.Background(), previouslyLongestBlockHash)
require.NoError(t, err)
require.Equal(t, uint64(822017), block.Height)
require.Equal(t, blocktx_api.Status_STALE, block.Status)

beginningOfChain := testutils.RevChainhash(t, "f97e20396f02ab990ed31b9aec70c240f48b7e5ea239aa050000000000000000")
block, err = blocktxStore.GetBlock(context.Background(), beginningOfChain)
require.NoError(t, err)
require.Equal(t, uint64(822014), block.Height)
require.Equal(t, blocktx_api.Status_LONGEST, block.Status)
}
45 changes: 42 additions & 3 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (p *Processor) fillGaps(peer p2p.PeerI) error {
break
}

p.logger.Info("Requesting missing block", slog.String("hash", gaps.Hash.String()), slog.Int64("height", int64(gaps.Height)), slog.String("peer", peer.String()))
p.logger.Info("Requesting missing block", slog.String("hash", gaps.Hash.String()), slog.Uint64("height", gaps.Height), slog.String("peer", peer.String()))

pair := BlockRequest{
Hash: gaps.Hash,
Expand Down Expand Up @@ -447,8 +447,15 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error {
incomingBlock.Status = blocktx_api.Status_STALE

if hasGreatestChainwork {
// TODO: perform reorg - next ticket
p.logger.Info("reorg detected - updating blocks", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height))

incomingBlock.Status = blocktx_api.Status_LONGEST

err := p.performReorg(ctx, incomingBlock)
if err != nil {
p.logger.Error("unable to perform reorg", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error()))
return err
}
}
}

Expand Down Expand Up @@ -556,6 +563,38 @@ func (p *Processor) hasGreatestChainwork(ctx context.Context, incomingBlock *blo
return tipChainWork.Cmp(incomingBlockChainwork) < 0, nil
}

func (p *Processor) performReorg(ctx context.Context, incomingBlock *blocktx_api.Block) error {
staleBlocks, err := p.store.GetStaleChainBackFromHash(ctx, incomingBlock.PreviousHash)
if err != nil {
return err
}

lowestHeight := incomingBlock.Height
if len(staleBlocks) > 0 {
lowestHeight = getLowestHeight(staleBlocks)
}

longestBlocks, err := p.store.GetLongestChainFromHeight(ctx, lowestHeight)
if err != nil {
return err
}

blockStatusUpdates := make([]store.BlockStatusUpdate, 0)

for _, b := range staleBlocks {
update := store.BlockStatusUpdate{Hash: b.Hash, Status: blocktx_api.Status_LONGEST}
blockStatusUpdates = append(blockStatusUpdates, update)
}

for _, b := range longestBlocks {
update := store.BlockStatusUpdate{Hash: b.Hash, Status: blocktx_api.Status_STALE}
blockStatusUpdates = append(blockStatusUpdates, update)
}

err = p.store.UpdateBlocksStatuses(ctx, blockStatusUpdates)
return err
}

func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64, merkleTree []*chainhash.Hash, blockHeight uint64, blockhash *chainhash.Hash) error {
if tracer != nil {
var span trace.Span
Expand Down Expand Up @@ -653,7 +692,7 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,
}
err = p.mqClient.PublishMarshal(MinedTxsTopic, txBlock)
if err != nil {
p.logger.Error("failed to publish mined txs", slog.String("hash", blockhash.String()), slog.Int64("height", int64(blockHeight)), slog.String("err", err.Error()))
p.logger.Error("failed to publish mined txs", slog.String("hash", blockhash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error()))
}
}

Expand Down
15 changes: 15 additions & 0 deletions internal/blocktx/processor_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ func createBlock(msg *p2p.BlockMessage, prevBlock *blocktx_api.Block, longestTip
}
}

func getLowestHeight(blocks []*blocktx_api.Block) uint64 {
if len(blocks) == 0 {
return 0
}

lowest := blocks[0].Height
for _, b := range blocks {
if b.Height < lowest {
lowest = b.Height
}
}

return lowest
}

// calculateChainwork calculates chainwork from the given difficulty bits
//
// This function comes from block-header-service:
Expand Down
Loading

0 comments on commit b51638e

Please sign in to comment.