Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(permissionless batches): recovery mode after permissionless batches #1073

Draft
wants to merge 8 commits into
base: syncUpstream/active
Choose a base branch
from
5 changes: 5 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ var (
utils.DABlockNativeAPIEndpointFlag,
utils.DABlobScanAPIEndpointFlag,
utils.DABeaconNodeAPIEndpointFlag,
utils.DARecoveryModeFlag,
utils.DARecoveryInitialL1BlockFlag,
utils.DARecoveryInitialBatchFlag,
utils.DARecoverySignBlocksFlag,
utils.DARecoveryL2EndBlockFlag,
}, utils.NetworkFlags, utils.DatabaseFlags)

rpcFlags = []cli.Flag{
Expand Down
35 changes: 35 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,26 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
Name: "da.blob.beaconnode",
Usage: "Beacon node API endpoint",
}
DARecoveryModeFlag = &cli.BoolFlag{
Name: "da.recovery",
Usage: "Enable recovery mode for DA syncing",
}
DARecoveryInitialL1BlockFlag = &cli.Uint64Flag{
Name: "da.recovery.initiall1block",
Usage: "Initial L1 block to start recovery from",
}
DARecoveryInitialBatchFlag = &cli.Uint64Flag{
Name: "da.recovery.initialbatch",
Usage: "Initial batch to start recovery from",
}
DARecoverySignBlocksFlag = &cli.BoolFlag{
Name: "da.recovery.signblocks",
Usage: "Sign blocks during recovery (requires correct Clique signer key and history of blocks with Clique signatures)",
}
DARecoveryL2EndBlockFlag = &cli.Uint64Flag{
Name: "da.recovery.l2endblock",
Usage: "End L2 block to recover to",
}
)

var (
Expand Down Expand Up @@ -1816,6 +1836,21 @@ func setDA(ctx *cli.Context, cfg *ethconfig.Config) {
if ctx.IsSet(DABeaconNodeAPIEndpointFlag.Name) {
cfg.DA.BeaconNodeAPIEndpoint = ctx.String(DABeaconNodeAPIEndpointFlag.Name)
}
if ctx.IsSet(DARecoveryModeFlag.Name) {
cfg.DA.RecoveryMode = ctx.Bool(DARecoveryModeFlag.Name)
}
if ctx.IsSet(DARecoveryInitialL1BlockFlag.Name) {
cfg.DA.InitialL1Block = ctx.Uint64(DARecoveryInitialL1BlockFlag.Name)
}
if ctx.IsSet(DARecoveryInitialBatchFlag.Name) {
cfg.DA.InitialBatch = ctx.Uint64(DARecoveryInitialBatchFlag.Name)
}
if ctx.IsSet(DARecoverySignBlocksFlag.Name) {
cfg.DA.SignBlocks = ctx.Bool(DARecoverySignBlocksFlag.Name)
}
if ctx.IsSet(DARecoveryL2EndBlockFlag.Name) {
cfg.DA.L2EndBlock = ctx.Uint64(DARecoveryL2EndBlockFlag.Name)
}
}
}

Expand Down
52 changes: 46 additions & 6 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2021,15 +2021,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
return it.index, err
}

func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions) (WriteStatus, error) {
func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions, sign bool) (*types.Block, WriteStatus, error) {
if !bc.chainmu.TryLock() {
return NonStatTy, errInsertionInterrupted
return nil, NonStatTy, errInsertionInterrupted
}
defer bc.chainmu.Unlock()

statedb, err := state.New(parentBlock.Root(), bc.stateCache, bc.snaps)
if err != nil {
return NonStatTy, err
return nil, NonStatTy, err
}

statedb.StartPrefetcher("l1sync")
Expand All @@ -2040,18 +2040,51 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types
tempBlock := types.NewBlockWithHeader(header).WithBody(txs, nil)
receipts, logs, gasUsed, err := bc.processor.Process(tempBlock, statedb, bc.vmConfig)
if err != nil {
return NonStatTy, fmt.Errorf("error processing block: %w", err)
return nil, NonStatTy, fmt.Errorf("error processing block %d: %w", header.Number.Uint64(), err)
}

// TODO: once we have the extra and difficulty we need to verify the signature of the block with Clique
// This should be done with https://github.com/scroll-tech/go-ethereum/pull/913.

// finalize and assemble block as fullBlock
if sign {
// remember the time as Clique will override it
originalTime := header.Time

err = bc.engine.Prepare(bc, header)
if err != nil {
return nil, NonStatTy, fmt.Errorf("error preparing block %d: %w", tempBlock.Number().Uint64(), err)
}

// we want to re-sign the block: set time to original value again.
header.Time = originalTime
}

// finalize and assemble block as fullBlock: replicates consensus.FinalizeAndAssemble()
header.GasUsed = gasUsed
header.Root = statedb.IntermediateRoot(bc.chainConfig.IsEIP158(header.Number))

fullBlock := types.NewBlock(header, txs, nil, receipts, trie.NewStackTrie(nil))

// Sign the block if requested
if sign {
resultCh, stopCh := make(chan *types.Block), make(chan struct{})
if err = bc.engine.Seal(bc, fullBlock, resultCh, stopCh); err != nil {
return nil, NonStatTy, fmt.Errorf("error sealing block %d: %w", fullBlock.Number().Uint64(), err)
}
// Clique.Seal() will only wait for a second before giving up on us. So make sure there is nothing computational heavy
// or a call that blocks between the call to Seal and the line below. Seal might introduce some delay, so we keep track of
// that artificially added delay and subtract it from overall runtime of commit().
fullBlock = <-resultCh
if fullBlock == nil {
return nil, NonStatTy, fmt.Errorf("sealing block failed %d: block is nil", header.Number.Uint64())
}

// verify the generated block with local consensus engine to make sure everything is as expected
if err = bc.engine.VerifyHeader(bc, fullBlock.Header()); err != nil {
return nil, NonStatTy, fmt.Errorf("error verifying signed block %d: %w", fullBlock.Number().Uint64(), err)
}
}

blockHash := fullBlock.Hash()
// manually replace the block hash in the receipts
for i, receipt := range receipts {
Expand All @@ -2068,7 +2101,14 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types
l.BlockHash = blockHash
}

return bc.writeBlockAndSetHead(fullBlock, receipts, logs, statedb, false)
// Double check: even though we just built the block, make sure it is valid.
if err = bc.validator.ValidateState(fullBlock, statedb, receipts, gasUsed); err != nil {
bc.reportBlock(fullBlock, receipts, err)
return nil, NonStatTy, fmt.Errorf("error validating block %d: %w", fullBlock.Number().Uint64(), err)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not that expensive to run but I can't see a reason that this would ever fail.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well we're creating/assembling the block just before, but during development and testing it actually happened that this process was faulty and this method caught it. That's why I left it in.

}

writeStatus, err := bc.writeBlockAndSetHead(fullBlock, receipts, logs, statedb, false)
colinlyguo marked this conversation as resolved.
Show resolved Hide resolved
return fullBlock, writeStatus, err
}

// insertSideChain is called when an import batch hits upon a pruned ancestor
Expand Down
3 changes: 3 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl
// simply let them run simultaneously. If messages are missing in DA syncing, it will be handled by the syncing pipeline
// by waiting and retrying.
if config.EnableDASyncing {
// Enable CCC if flag is set so that row consumption can be generated.
config.DA.CCCEnable = config.CheckCircuitCapacity
config.DA.CCCNumWorkers = config.CCCMaxWorkers
eth.syncingPipeline, err = da_syncer.NewSyncingPipeline(context.Background(), eth.blockchain, chainConfig, eth.chainDb, l1Client, stack.Config().L1DeploymentBlock, config.DA)
if err != nil {
return nil, fmt.Errorf("cannot initialize da syncer: %w", err)
Expand Down
29 changes: 20 additions & 9 deletions rollup/da_syncer/da_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ import (

// DAQueue is a pipeline stage that reads DA entries from a DataSource and provides them to the next stage.
type DAQueue struct {
l1height uint64
initialBatch uint64
l1height uint64

dataSourceFactory *DataSourceFactory
dataSource DataSource
da da.Entries
}

func NewDAQueue(l1height uint64, dataSourceFactory *DataSourceFactory) *DAQueue {
func NewDAQueue(l1height uint64, initialBatch uint64, dataSourceFactory *DataSourceFactory) *DAQueue {
return &DAQueue{
initialBatch: initialBatch,
l1height: l1height,
dataSourceFactory: dataSourceFactory,
dataSource: nil,
Expand All @@ -26,15 +29,23 @@ func NewDAQueue(l1height uint64, dataSourceFactory *DataSourceFactory) *DAQueue
}

func (dq *DAQueue) NextDA(ctx context.Context) (da.Entry, error) {
for len(dq.da) == 0 {
err := dq.getNextData(ctx)
if err != nil {
return nil, err
for {
for len(dq.da) == 0 {
err := dq.getNextData(ctx)
if err != nil {
return nil, err
}
}

daEntry := dq.da[0]
dq.da = dq.da[1:]

if daEntry.BatchIndex() < dq.initialBatch {
continue
}

return daEntry, nil
}
daEntry := dq.da[0]
dq.da = dq.da[1:]
return daEntry, nil
}

func (dq *DAQueue) getNextData(ctx context.Context) error {
Expand Down
55 changes: 46 additions & 9 deletions rollup/da_syncer/da_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (

"github.com/scroll-tech/go-ethereum/core"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rollup/ccc"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/da"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
)

var (
Expand All @@ -14,35 +16,70 @@ var (
)

type DASyncer struct {
blockchain *core.BlockChain
asyncChecker *ccc.AsyncChecker
l2EndBlock uint64
blockchain *core.BlockChain
}

func NewDASyncer(blockchain *core.BlockChain) *DASyncer {
return &DASyncer{
func NewDASyncer(blockchain *core.BlockChain, cccEnable bool, cccNumWorkers int, l2EndBlock uint64) *DASyncer {
s := &DASyncer{
l2EndBlock: l2EndBlock,
blockchain: blockchain,
}

if cccEnable {
s.asyncChecker = ccc.NewAsyncChecker(blockchain, cccNumWorkers, false)
}

return s
}

// SyncOneBlock receives a PartialBlock, makes sure it's the next block in the chain, executes it and inserts it to the blockchain.
func (s *DASyncer) SyncOneBlock(block *da.PartialBlock) error {
func (s *DASyncer) SyncOneBlock(block *da.PartialBlock, override bool, sign bool) error {
currentBlock := s.blockchain.CurrentBlock()

// we expect blocks to be consecutive. block.PartialHeader.Number == parentBlock.Number+1.
if block.PartialHeader.Number <= currentBlock.Number.Uint64() {
// if override is true, we allow blocks to be lower than the current block number and replace the blocks.
if !override && block.PartialHeader.Number <= currentBlock.Number.Uint64() {
log.Debug("block number is too low", "block number", block.PartialHeader.Number, "parent block number", currentBlock.Number.Uint64())
return ErrBlockTooLow
} else if block.PartialHeader.Number > currentBlock.Number.Uint64()+1 {
log.Debug("block number is too high", "block number", block.PartialHeader.Number, "parent block number", currentBlock.Number.Uint64())
return ErrBlockTooHigh
}

parentBlock := s.blockchain.GetBlockByNumber(currentBlock.Number.Uint64())
if _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions); err != nil {
parentBlockNumber := currentBlock.Number.Uint64()
if override {
parentBlockNumber = block.PartialHeader.Number - 1
}

parentBlock := s.blockchain.GetBlockByNumber(parentBlockNumber)
if parentBlock == nil {
return fmt.Errorf("failed getting parent block, number: %d", parentBlockNumber)
}

fullBlock, _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions, sign)
colinlyguo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed building and writing block, number: %d, error: %v", block.PartialHeader.Number, err)
}

if s.blockchain.CurrentBlock().Number.Uint64()%1000 == 0 {
log.Info("L1 sync progress", "blockhain height", s.blockchain.CurrentBlock().Number.Uint64(), "block hash", s.blockchain.CurrentBlock().Hash(), "root", s.blockchain.CurrentBlock().Root)
if s.asyncChecker != nil {
_ = s.asyncChecker.Check(fullBlock)
colinlyguo marked this conversation as resolved.
Show resolved Hide resolved
}

currentBlock = s.blockchain.CurrentBlock()
if override && block.PartialHeader.Number != currentBlock.Number.Uint64() && block.PartialHeader.Number%100 == 0 {
newBlock := s.blockchain.GetHeaderByNumber(block.PartialHeader.Number)
log.Info("L1 sync progress", "processed block ", newBlock.Number.Uint64(), "block hash", newBlock.Hash(), "root", newBlock.Root)
log.Info("L1 sync progress", "blockhain height", currentBlock.Number.Uint64(), "block hash", currentBlock.Hash(), "root", currentBlock.Root)
} else if currentBlock.Number.Uint64()%100 == 0 {
log.Info("L1 sync progress", "blockhain height", currentBlock.Number.Uint64(), "block hash", currentBlock.Hash(), "root", currentBlock.Root)
}

if s.l2EndBlock > 0 && s.l2EndBlock == block.PartialHeader.Number {
newBlock := s.blockchain.GetHeaderByNumber(block.PartialHeader.Number)
log.Warn("L1 sync reached L2EndBlock: you can terminate recovery mode now", "L2EndBlock", newBlock.Number.Uint64(), "block hash", newBlock.Hash(), "root", newBlock.Root)
return serrors.Terminated
}

return nil
Expand Down
1 change: 1 addition & 0 deletions rollup/da_syncer/serrors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
var (
TemporaryError = NewTemporaryError(nil)
EOFError = NewEOFError(nil)
Terminated = fmt.Errorf("terminated")
)

type Type uint8
Expand Down
Loading
Loading