Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/jt/permissionless-batches-recove…
Browse files Browse the repository at this point in the history
…ry' into jt/permissionless-batches-recovery-mine
  • Loading branch information
jonastheis committed Nov 18, 2024
2 parents 7056a14 + afdc961 commit d7a8506
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 20 deletions.
25 changes: 16 additions & 9 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2025,15 +2025,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
return it.index, err
}

func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions, sign bool) (WriteStatus, error) {
func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions, sign bool) (*types.Block, WriteStatus, error) {
if !bc.chainmu.TryLock() {
return NonStatTy, errInsertionInterrupted
return nil, NonStatTy, errInsertionInterrupted
}
defer bc.chainmu.Unlock()

statedb, err := state.New(parentBlock.Root(), bc.stateCache, bc.snaps)
if err != nil {
return NonStatTy, err
return nil, NonStatTy, err
}

statedb.StartPrefetcher("l1sync")
Expand All @@ -2044,7 +2044,7 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types
tempBlock := types.NewBlockWithHeader(header).WithBody(txs, nil)
receipts, logs, gasUsed, err := bc.processor.Process(tempBlock, statedb, bc.vmConfig)
if err != nil {
return NonStatTy, fmt.Errorf("error processing block %d: %w", header.Number.Uint64(), err)
return nil, NonStatTy, fmt.Errorf("error processing block %d: %w", header.Number.Uint64(), err)
}

// TODO: once we have the extra and difficulty we need to verify the signature of the block with Clique
Expand All @@ -2056,7 +2056,7 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types

err = bc.engine.Prepare(bc, header)
if err != nil {
return NonStatTy, fmt.Errorf("error preparing block %d: %w", tempBlock.Number().Uint64(), err)
return nil, NonStatTy, fmt.Errorf("error preparing block %d: %w", tempBlock.Number().Uint64(), err)
}

// we want to re-sign the block: set time to original value again.
Expand All @@ -2073,19 +2073,19 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types
if sign {
resultCh, stopCh := make(chan *types.Block), make(chan struct{})
if err = bc.engine.Seal(bc, fullBlock, resultCh, stopCh); err != nil {
return NonStatTy, fmt.Errorf("error sealing block %d: %w", fullBlock.Number().Uint64(), err)
return nil, NonStatTy, fmt.Errorf("error sealing block %d: %w", fullBlock.Number().Uint64(), err)
}
// Clique.Seal() will only wait for a second before giving up on us. So make sure there is nothing computational heavy
// or a call that blocks between the call to Seal and the line below. Seal might introduce some delay, so we keep track of
// that artificially added delay and subtract it from overall runtime of commit().
fullBlock = <-resultCh
if fullBlock == nil {
return NonStatTy, fmt.Errorf("sealing block failed %d: block is nil", header.Number.Uint64())
return nil, NonStatTy, fmt.Errorf("sealing block failed %d: block is nil", header.Number.Uint64())
}

// verify the generated block with local consensus engine to make sure everything is as expected
if err = bc.engine.VerifyHeader(bc, fullBlock.Header()); err != nil {
return NonStatTy, fmt.Errorf("error verifying signed block %d: %w", fullBlock.Number().Uint64(), err)
return nil, NonStatTy, fmt.Errorf("error verifying signed block %d: %w", fullBlock.Number().Uint64(), err)
}
}

Expand All @@ -2105,7 +2105,14 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types
l.BlockHash = blockHash
}

return bc.writeBlockAndSetHead(fullBlock, receipts, logs, statedb, false)
// Double check: even though we just built the block, make sure it is valid.
if err = bc.validator.ValidateState(fullBlock, statedb, receipts, gasUsed); err != nil {
bc.reportBlock(fullBlock, receipts, err)
return nil, NonStatTy, fmt.Errorf("error validating block %d: %w", fullBlock.Number().Uint64(), err)
}

writeStatus, err := bc.writeBlockAndSetHead(fullBlock, receipts, logs, statedb, false)
return fullBlock, writeStatus, err
}

// insertSideChain is called when an import batch hits upon a pruned ancestor
Expand Down
13 changes: 8 additions & 5 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,15 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl
if config.EnableDASyncing {
// Do not start syncing pipeline if we are producing blocks.
if !config.DA.ProduceBlocks {
eth.syncingPipeline, err = da_syncer.NewSyncingPipeline(context.Background(), eth.blockchain, chainConfig, eth.chainDb, l1Client, stack.Config().L1DeploymentBlock, config.DA)
if err != nil {
return nil, fmt.Errorf("cannot initialize da syncer: %w", err)
}
// Enable CCC if flag is set so that row consumption can be generated.
config.DA.CCCEnable = config.CheckCircuitCapacity
config.DA.CCCNumWorkers = config.CCCMaxWorkers
eth.syncingPipeline, err = da_syncer.NewSyncingPipeline(context.Background(), eth.blockchain, chainConfig, eth.chainDb, l1Client, stack.Config().L1DeploymentBlock, config.DA)
if err != nil {
return nil, fmt.Errorf("cannot initialize da syncer: %w", err)
}
eth.syncingPipeline.Start()

eth.syncingPipeline.Start()
}
}

Expand Down
23 changes: 18 additions & 5 deletions rollup/da_syncer/da_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/scroll-tech/go-ethereum/core"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rollup/ccc"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/da"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
)
Expand All @@ -15,15 +16,22 @@ var (
)

type DASyncer struct {
l2EndBlock uint64
blockchain *core.BlockChain
asyncChecker *ccc.AsyncChecker
l2EndBlock uint64
blockchain *core.BlockChain
}

func NewDASyncer(blockchain *core.BlockChain, l2EndBlock uint64) *DASyncer {
return &DASyncer{
func NewDASyncer(blockchain *core.BlockChain, cccEnable bool, cccNumWorkers int, l2EndBlock uint64) *DASyncer {
s := &DASyncer{
l2EndBlock: l2EndBlock,
blockchain: blockchain,
}

if cccEnable {
s.asyncChecker = ccc.NewAsyncChecker(blockchain, cccNumWorkers, false)
}

return s
}

// SyncOneBlock receives a PartialBlock, makes sure it's the next block in the chain, executes it and inserts it to the blockchain.
Expand All @@ -50,10 +58,15 @@ func (s *DASyncer) SyncOneBlock(block *da.PartialBlock, override bool, sign bool
return fmt.Errorf("failed getting parent block, number: %d", parentBlockNumber)
}

if _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions, sign); err != nil {
fullBlock, _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions, sign)
if err != nil {
return fmt.Errorf("failed building and writing block, number: %d, error: %v", block.PartialHeader.Number, err)
}

if s.asyncChecker != nil {
_ = s.asyncChecker.Check(fullBlock)
}

currentBlock = s.blockchain.CurrentBlock()
if override && block.PartialHeader.Number != currentBlock.Number.Uint64() && block.PartialHeader.Number%100 == 0 {
newBlock := s.blockchain.GetHeaderByNumber(block.PartialHeader.Number)
Expand Down
12 changes: 11 additions & 1 deletion rollup/da_syncer/syncing_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type Config struct {
BlockNativeAPIEndpoint string // BlockNative blob api endpoint
BeaconNodeAPIEndpoint string // Beacon node api endpoint

CCCEnable bool // enable CCC verification and generation of row consumption
CCCNumWorkers int // number of workers for CCC verification

RecoveryMode bool // Recovery mode is used to override existing blocks with the blocks read from the pipeline and start from a specific L1 block and batch
InitialL1Block uint64 // L1 block in which the InitialBatch was committed (or any earlier L1 block but requires more RPC requests)
InitialBatch uint64 // Batch number from which to start syncing and overriding blocks
Expand Down Expand Up @@ -94,6 +97,13 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi
}

initialL1Block = config.InitialL1Block
if initialL1Block == 0 {
return nil, errors.New("sync from DA: initial L1 block must be set in recovery mode")
}
if config.InitialBatch == 0 {
return nil, errors.New("sync from DA: initial batch must be set in recovery mode")
}

log.Info("sync from DA: initializing pipeline in recovery mode", "initialL1Block", initialL1Block, "initialBatch", config.InitialBatch)
} else {
initialL1Block = l1DeploymentBlock - 1
Expand All @@ -108,7 +118,7 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi
daQueue := NewDAQueue(initialL1Block, config.InitialBatch, dataSourceFactory)
batchQueue := NewBatchQueue(daQueue, db)
blockQueue := NewBlockQueue(batchQueue)
daSyncer := NewDASyncer(blockchain, config.L2EndBlock)
daSyncer := NewDASyncer(blockchain, config.CCCEnable, config.CCCNumWorkers, config.L2EndBlock)

ctx, cancel := context.WithCancel(ctx)
return &SyncingPipeline{
Expand Down

0 comments on commit d7a8506

Please sign in to comment.