Skip to content

Commit

Permalink
Add catch-up sync for EVM space (#274)
Browse files Browse the repository at this point in the history
Also update benchmark command to support eSpace catchup
  • Loading branch information
wanliqun authored Feb 7, 2025
1 parent 8e1cc1d commit 4ac2684
Show file tree
Hide file tree
Showing 10 changed files with 532 additions and 228 deletions.
41 changes: 30 additions & 11 deletions cmd/benchmark/catchup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ const (
)

// CatchUpCmdConfig holds the command-line options of the catch-up benchmark command.
type CatchUpCmdConfig struct {
	Network string // network space ("cfx" or "eth")
	Mode    string // catch-up mode ("classic" or "boost")
	Start   uint64 // start epoch/block number
	Count   uint64 // number of epochs/blocks to sync
}

var (
Expand All @@ -38,6 +39,9 @@ var (
Short: "Start catch-up benchmark testing",
Run: runCatchUpBenchmark,
PreRunE: func(cmd *cobra.Command, args []string) error {
if network := catchUpConfig.Network; !isValidNetwork(network) {
return fmt.Errorf("invalid network '%s', allowed values are 'cfx' or 'eth'", network)
}
if mode := catchUpConfig.Mode; !isValidCatchUpMode(mode) {
return fmt.Errorf("invalid mode '%s', allowed values are 'classic' or 'boost'", mode)
}
Expand All @@ -56,6 +60,11 @@ func init() {

// hookCatchUpCmdFlags configures the command-line flags for the catch-up command.
func hookCatchUpCmdFlags(cmd *cobra.Command) {
cmd.Flags().StringVarP(
&catchUpConfig.Network, "network", "n", "cfx", "network space ('cfx' or 'eth')",
)
cmd.MarkFlagRequired("network")

cmd.Flags().StringVarP(
&catchUpConfig.Mode, "mode", "m", "", "catch-up mode ('classic' or 'boost')",
)
Expand Down Expand Up @@ -102,7 +111,7 @@ func runCatchUpBenchmark(cmd *cobra.Command, args []string) {
// initializeContexts sets up and returns the required store and sync contexts.
func initializeContexts() (*util.StoreContext, *util.SyncContext, error) {
storeCtx := util.MustInitStoreContext()
if storeCtx.CfxDB == nil {
if storeCtx.CfxDB == nil || storeCtx.EthDB == nil {
return nil, nil, errors.New("database is not provided")
}

Expand All @@ -112,7 +121,18 @@ func initializeContexts() (*util.StoreContext, *util.SyncContext, error) {

// createCatchUpSyncer initializes the catch-up syncer with the necessary dependencies.
func createCatchUpSyncer(syncCtx *util.SyncContext, storeCtx *util.StoreContext) *catchup.Syncer {
return catchup.MustNewSyncer(
if catchUpConfig.Network == "eth" {
return catchup.MustNewEthSyncer(
syncCtx.SyncEths,
storeCtx.EthDB,
election.NewNoopLeaderManager(),
&monitor.Monitor{},
catchUpConfig.Start,
catchup.WithBenchmark(true),
)
}

return catchup.MustNewCfxSyncer(
syncCtx.SyncCfxs,
storeCtx.CfxDB,
election.NewNoopLeaderManager(),
Expand All @@ -124,12 +144,11 @@ func createCatchUpSyncer(syncCtx *util.SyncContext, storeCtx *util.StoreContext)

// isValidCatchUpMode reports whether mode names one of the supported catch-up modes,
// i.e. it equals the string form of either ModeClassic or ModeBoost.
func isValidCatchUpMode(mode string) bool {
	return mode == string(ModeClassic) || mode == string(ModeBoost)
}

// isValidNetwork reports whether network names a supported network space,
// which is either "cfx" or "eth" (exact, case-sensitive match).
func isValidNetwork(network string) bool {
	switch network {
	case "cfx", "eth":
		return true
	default:
		return false
	}
}

// reportRpcMetrics outputs RPC-related metrics based on the sync mode.
Expand Down
46 changes: 24 additions & 22 deletions config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,38 @@ eth:
# # Blockchain sync configurations
# sync:
# # Core space sync configurations
# #
# # Pub/Sub configurations
# sub:
# # Channel size to buffer notified epoch response
# buffer: 1000
# # Whether to use `epoch_getEpochReceipts` to batch get receipts
# useBatch: false
# # The epoch number from which to sync core space
# fromEpoch: 0
# # Maximum number of epochs to batch sync once
# maxEpochs: 10
# cfx:
# # Whether to use `epoch_getEpochReceipts` to batch get receipts
# useBatch: false
# # The epoch number from which to sync core space
# fromEpoch: 0
# # Maximum number of epochs to batch sync once
# maxEpochs: 10

# # EVM space sync configurations
# eth:
# # The block number from which to sync evm space, better use the evm space hardfork point:
# # for mainnet it is 36935000, for testnet it is 61465000
# fromBlock: 61465000
# # Maximum number of blocks to batch sync ETH data once
# maxBlocks: 10

# # Blacklisted contract address(es) whose event logs will be ignored until some specific
# # epoch height; 0 means always.
# blackListAddrs: >
# [
# {"address": "cfx:acav5v98np8t3m66uw7x61yer1ja1jm0dpzj1zyzxv", "epoch": 0}
# ]

# # Fast catch-up sync configuration
# catchup:
# # Pool of fullnodes for catching up. There will be 1 goroutine per fullnode or
# # the catch up will be disabled if none fullnode provided.
# cfxPool: [http://test.confluxrpc.com]
# # Node pool for catching up. Each full node will run in a separate goroutine.
# # Catch-up will be disabled if no full node is provided.
# nodePool:
# # For core space node pool
# cfx: [http://test.confluxrpc.com]
# # For Ethereum-compatible node pool
# eth: [http://evmtestnet.confluxrpc.com]
# # Threshold for number of db rows per batch persistence
# dbRowsThreshold: 2500
# # Max number of db rows collected before persistence to restrict memory usage
Expand Down Expand Up @@ -155,14 +165,6 @@ eth:
# # Force persistence interval
# forcePersistenceInterval: 45s

# # EVM space sync configurations
# eth:
# # The block number from which to sync evm space, better use the evm space hardfork point:
# # for mainnet it is 36935000, for testnet it is 61465000
# fromBlock: 61465000
# # Maximum number of blocks to batch sync ETH data once
# maxBlocks: 10

# # HA leader/follower election.
# election:
# # Enable/disable leader election
Expand Down
41 changes: 41 additions & 0 deletions rpc/cfxbridge/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cfxbridge
import (
"math/big"

"github.com/Conflux-Chain/confura/store"
"github.com/Conflux-Chain/go-conflux-sdk/types"
"github.com/Conflux-Chain/go-conflux-sdk/types/cfxaddress"
"github.com/Conflux-Chain/go-conflux-sdk/types/cmptutil"
Expand Down Expand Up @@ -334,3 +335,43 @@ func ConvertLogFilter(fq *ethTypes.FilterQuery, ethNetworkId uint32) *types.LogF

return lf
}

// ConvertToEpochData bridges an EVM space block into the core space epoch representation so
// that epoch-oriented logic (e.g. the db store logic) can be reused for eth block data.
//
// The returned epoch carries the number of `ethData`, the converted block as its only block
// (together with its block extra), and one converted receipt plus receipt extra for every
// entry of `ethData.Receipts`, keyed by the converted transaction hash.
func ConvertToEpochData(ethData *store.EthData, chainId uint32) *store.EpochData {
	pivotBlock := ConvertBlock(ethData.Block, chainId)
	blockExt := store.ExtractEthBlockExt(ethData.Block)

	receipts := make(map[types.Hash]*types.TransactionReceipt)
	receiptExts := make(map[types.Hash]*store.ReceiptExtra)

	for ethTxHash, ethRcpt := range ethData.Receipts {
		cfxRcpt := ConvertReceipt(ethRcpt, chainId)
		cfxTxHash := ConvertHash(ethTxHash)

		receipts[cfxTxHash] = cfxRcpt
		receiptExts[cfxTxHash] = store.ExtractEthReceiptExt(ethRcpt)
	}

	// The transaction `status` field is not standard on evm-compatible chains, so any
	// transaction still missing it gets the outcome status of its receipt, when one exists.
	for i := range pivotBlock.Transactions {
		if pivotBlock.Transactions[i].Status == nil {
			// A missing map entry yields a nil receipt, so a single nil check covers both cases.
			if rcpt := receipts[pivotBlock.Transactions[i].Hash]; rcpt != nil {
				status := rcpt.OutcomeStatus
				pivotBlock.Transactions[i].Status = &status
			}
		}
	}

	return &store.EpochData{
		Number:      ethData.Number,
		Blocks:      []*types.Block{pivotBlock},
		BlockExts:   []*store.BlockExtra{blockExt},
		Receipts:    receipts,
		ReceiptExts: receiptExts,
	}
}
114 changes: 5 additions & 109 deletions sync/catchup/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
"github.com/Conflux-Chain/confura/store"
"github.com/Conflux-Chain/confura/types"
"github.com/Conflux-Chain/confura/util/metrics"
cfxTypes "github.com/Conflux-Chain/go-conflux-sdk/types"
"github.com/Conflux-Chain/go-conflux-util/health"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -572,118 +570,16 @@ type boostWorker struct {
// queryEpochData fetches the epoch data for the inclusive range [fromEpoch, toEpoch] by
// delegating to the worker client's `BoostQueryEpochData`, which keeps the query logic
// independent of the network space.
//
// It records the query latency and availability under the client's space label and, when
// the query succeeds, the size of the queried range.
func (w *boostWorker) queryEpochData(fromEpoch, toEpoch uint64) (res []*store.EpochData, err error) {
	space := w.client.Space()
	startTime := time.Now()

	// The deferred closure reads the named result `err`, so the metrics reflect the final
	// outcome of the query rather than an intermediate state.
	defer func() {
		metrics.Registry.Sync.BoostQueryEpochData(space).UpdateSince(startTime)
		metrics.Registry.Sync.BoostQueryEpochDataAvailability(space).Mark(err == nil)
		if err == nil {
			metrics.Registry.Sync.BoostQueryEpochRange().Update(int64(toEpoch - fromEpoch + 1))
		}
	}()

	return w.client.BoostQueryEpochData(context.Background(), fromEpoch, toEpoch)
}
7 changes: 5 additions & 2 deletions sync/catchup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package catchup
import "time"

type config struct {
// list of Conflux fullnodes to accelerate catching up until the latest stable epoch
CfxPool []string
// Pool of full nodes to accelerate catching up until the latest stable epoch
NodePool struct {
Cfx []string // for core space
Eth []string // for evm space
}
// threshold for num of db rows per batch persistence
DbRowsThreshold int `default:"2500"`
// max number of db rows collected before persistence
Expand Down
Loading

0 comments on commit 4ac2684

Please sign in to comment.