diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 131c2b0785cc..54d9b92bbefb 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -160,6 +160,11 @@ var ( utils.DABlockNativeAPIEndpointFlag, utils.DABlobScanAPIEndpointFlag, utils.DABeaconNodeAPIEndpointFlag, + utils.DARecoveryModeFlag, + utils.DARecoveryInitialL1BlockFlag, + utils.DARecoveryInitialBatchFlag, + utils.DARecoverySignBlocksFlag, + utils.DARecoveryL2EndBlockFlag, }, utils.NetworkFlags, utils.DatabaseFlags) rpcFlags = []cli.Flag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 379f8d4ee7ac..bf0d82b18a38 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1053,6 +1053,26 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server. Name: "da.blob.beaconnode", Usage: "Beacon node API endpoint", } + DARecoveryModeFlag = &cli.BoolFlag{ + Name: "da.recovery", + Usage: "Enable recovery mode for DA syncing", + } + DARecoveryInitialL1BlockFlag = &cli.Uint64Flag{ + Name: "da.recovery.initiall1block", + Usage: "Initial L1 block to start recovery from", + } + DARecoveryInitialBatchFlag = &cli.Uint64Flag{ + Name: "da.recovery.initialbatch", + Usage: "Initial batch to start recovery from", + } + DARecoverySignBlocksFlag = &cli.BoolFlag{ + Name: "da.recovery.signblocks", + Usage: "Sign blocks during recovery (requires correct Clique signer key and history of blocks with Clique signatures)", + } + DARecoveryL2EndBlockFlag = &cli.Uint64Flag{ + Name: "da.recovery.l2endblock", + Usage: "End L2 block to recover to", + } ) var ( @@ -1816,6 +1836,21 @@ func setDA(ctx *cli.Context, cfg *ethconfig.Config) { if ctx.IsSet(DABeaconNodeAPIEndpointFlag.Name) { cfg.DA.BeaconNodeAPIEndpoint = ctx.String(DABeaconNodeAPIEndpointFlag.Name) } + if ctx.IsSet(DARecoveryModeFlag.Name) { + cfg.DA.RecoveryMode = ctx.Bool(DARecoveryModeFlag.Name) + } + if ctx.IsSet(DARecoveryInitialL1BlockFlag.Name) { + cfg.DA.InitialL1Block = ctx.Uint64(DARecoveryInitialL1BlockFlag.Name) + } + if ctx.IsSet(DARecoveryInitialBatchFlag.Name) { + cfg.DA.InitialBatch = ctx.Uint64(DARecoveryInitialBatchFlag.Name) + } + if ctx.IsSet(DARecoverySignBlocksFlag.Name) { + cfg.DA.SignBlocks = ctx.Bool(DARecoverySignBlocksFlag.Name) + } + if ctx.IsSet(DARecoveryL2EndBlockFlag.Name) { + cfg.DA.L2EndBlock = ctx.Uint64(DARecoveryL2EndBlockFlag.Name) + } } } diff --git a/core/blockchain.go b/core/blockchain.go index db608897e297..6b0697f36a24 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2021,15 +2021,15 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) return it.index, err } -func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions) (WriteStatus, error) { +func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types.Header, txs types.Transactions, sign bool) (*types.Block, WriteStatus, error) { if !bc.chainmu.TryLock() { - return NonStatTy, errInsertionInterrupted + return nil, NonStatTy, errInsertionInterrupted } defer bc.chainmu.Unlock() statedb, err := state.New(parentBlock.Root(), bc.stateCache, bc.snaps) if err != nil { - return NonStatTy, err + return nil, NonStatTy, err } statedb.StartPrefetcher("l1sync") @@ -2040,18 +2040,51 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types tempBlock := types.NewBlockWithHeader(header).WithBody(txs, nil) receipts, logs, gasUsed, err := bc.processor.Process(tempBlock, statedb, bc.vmConfig) if err != nil { - return NonStatTy, fmt.Errorf("error processing block: %w", err) + return nil, NonStatTy, fmt.Errorf("error processing block %d: %w", header.Number.Uint64(), err) } // TODO: once we have the extra and difficulty we need to verify the signature of the block with Clique // This should be done with https://github.com/scroll-tech/go-ethereum/pull/913. - // finalize and assemble block as fullBlock + if sign { + // remember the time as Clique will override it + originalTime := header.Time + + err = bc.engine.Prepare(bc, header) + if err != nil { + return nil, NonStatTy, fmt.Errorf("error preparing block %d: %w", tempBlock.Number().Uint64(), err) + } + + // we want to re-sign the block: set time to original value again. + header.Time = originalTime + } + + // finalize and assemble block as fullBlock: replicates consensus.FinalizeAndAssemble() header.GasUsed = gasUsed header.Root = statedb.IntermediateRoot(bc.chainConfig.IsEIP158(header.Number)) fullBlock := types.NewBlock(header, txs, nil, receipts, trie.NewStackTrie(nil)) + // Sign the block if requested + if sign { + resultCh, stopCh := make(chan *types.Block), make(chan struct{}) + if err = bc.engine.Seal(bc, fullBlock, resultCh, stopCh); err != nil { + return nil, NonStatTy, fmt.Errorf("error sealing block %d: %w", fullBlock.Number().Uint64(), err) + } + // Clique.Seal() will only wait for a second before giving up on us. So make sure there is nothing computational heavy + // or a call that blocks between the call to Seal and the line below. Seal might introduce some delay, so we keep track of + // that artificially added delay and subtract it from overall runtime of commit(). + fullBlock = <-resultCh + if fullBlock == nil { + return nil, NonStatTy, fmt.Errorf("sealing block failed %d: block is nil", header.Number.Uint64()) + } + + // verify the generated block with local consensus engine to make sure everything is as expected + if err = bc.engine.VerifyHeader(bc, fullBlock.Header()); err != nil { + return nil, NonStatTy, fmt.Errorf("error verifying signed block %d: %w", fullBlock.Number().Uint64(), err) + } + } + blockHash := fullBlock.Hash() // manually replace the block hash in the receipts for i, receipt := range receipts { @@ -2068,7 +2101,14 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types l.BlockHash = blockHash } - return bc.writeBlockAndSetHead(fullBlock, receipts, logs, statedb, false) + // Double check: even though we just built the block, make sure it is valid. + if err = bc.validator.ValidateState(fullBlock, statedb, receipts, gasUsed); err != nil { + bc.reportBlock(fullBlock, receipts, err) + return nil, NonStatTy, fmt.Errorf("error validating block %d: %w", fullBlock.Number().Uint64(), err) + } + + writeStatus, err := bc.writeBlockAndSetHead(fullBlock, receipts, logs, statedb, false) + return fullBlock, writeStatus, err } // insertSideChain is called when an import batch hits upon a pruned ancestor diff --git a/eth/backend.go b/eth/backend.go index e5db360f039f..9995ad1c05eb 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -254,6 +254,9 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl // simply let them run simultaneously. If messages are missing in DA syncing, it will be handled by the syncing pipeline // by waiting and retrying. if config.EnableDASyncing { + // Enable CCC if flag is set so that row consumption can be generated. + config.DA.CCCEnable = config.CheckCircuitCapacity + config.DA.CCCNumWorkers = config.CCCMaxWorkers eth.syncingPipeline, err = da_syncer.NewSyncingPipeline(context.Background(), eth.blockchain, chainConfig, eth.chainDb, l1Client, stack.Config().L1DeploymentBlock, config.DA) if err != nil { return nil, fmt.Errorf("cannot initialize da syncer: %w", err) diff --git a/rollup/da_syncer/da_queue.go b/rollup/da_syncer/da_queue.go index 64673a4a646b..13fd05b4ecbc 100644 --- a/rollup/da_syncer/da_queue.go +++ b/rollup/da_syncer/da_queue.go @@ -10,14 +10,17 @@ import ( // DAQueue is a pipeline stage that reads DA entries from a DataSource and provides them to the next stage. type DAQueue struct { - l1height uint64 + initialBatch uint64 + l1height uint64 + dataSourceFactory *DataSourceFactory dataSource DataSource da da.Entries } -func NewDAQueue(l1height uint64, dataSourceFactory *DataSourceFactory) *DAQueue { +func NewDAQueue(l1height uint64, initialBatch uint64, dataSourceFactory *DataSourceFactory) *DAQueue { return &DAQueue{ + initialBatch: initialBatch, l1height: l1height, dataSourceFactory: dataSourceFactory, dataSource: nil, @@ -26,15 +29,23 @@ func NewDAQueue(l1height uint64, dataSourceFactory *DataSourceFactory) *DAQueue } func (dq *DAQueue) NextDA(ctx context.Context) (da.Entry, error) { - for len(dq.da) == 0 { - err := dq.getNextData(ctx) - if err != nil { - return nil, err + for { + for len(dq.da) == 0 { + err := dq.getNextData(ctx) + if err != nil { + return nil, err + } + } + + daEntry := dq.da[0] + dq.da = dq.da[1:] + + if daEntry.BatchIndex() < dq.initialBatch { + continue } + + return daEntry, nil } - daEntry := dq.da[0] - dq.da = dq.da[1:] - return daEntry, nil } func (dq *DAQueue) getNextData(ctx context.Context) error { diff --git a/rollup/da_syncer/da_syncer.go b/rollup/da_syncer/da_syncer.go index c3c223ff22a9..825129292bd7 100644 --- a/rollup/da_syncer/da_syncer.go +++ b/rollup/da_syncer/da_syncer.go @@ -5,7 +5,9 @@ import ( "github.com/scroll-tech/go-ethereum/core" "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/rollup/ccc" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" + "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" ) var ( @@ -14,21 +16,31 @@ var ( ) type DASyncer struct { - blockchain *core.BlockChain + asyncChecker *ccc.AsyncChecker + l2EndBlock uint64 + blockchain *core.BlockChain } -func NewDASyncer(blockchain *core.BlockChain) *DASyncer { - return &DASyncer{ +func NewDASyncer(blockchain *core.BlockChain, cccEnable bool, cccNumWorkers int, l2EndBlock uint64) *DASyncer { + s := &DASyncer{ + l2EndBlock: l2EndBlock, blockchain: blockchain, } + + if cccEnable { + s.asyncChecker = ccc.NewAsyncChecker(blockchain, cccNumWorkers, false) + } + + return s } // SyncOneBlock receives a PartialBlock, makes sure it's the next block in the chain, executes it and inserts it to the blockchain. -func (s *DASyncer) SyncOneBlock(block *da.PartialBlock) error { +func (s *DASyncer) SyncOneBlock(block *da.PartialBlock, override bool, sign bool) error { currentBlock := s.blockchain.CurrentBlock() // we expect blocks to be consecutive. block.PartialHeader.Number == parentBlock.Number+1. - if block.PartialHeader.Number <= currentBlock.Number.Uint64() { + // if override is true, we allow blocks to be lower than the current block number and replace the blocks. + if !override && block.PartialHeader.Number <= currentBlock.Number.Uint64() { log.Debug("block number is too low", "block number", block.PartialHeader.Number, "parent block number", currentBlock.Number.Uint64()) return ErrBlockTooLow } else if block.PartialHeader.Number > currentBlock.Number.Uint64()+1 { @@ -36,13 +48,38 @@ func (s *DASyncer) SyncOneBlock(block *da.PartialBlock) error { return ErrBlockTooHigh } - parentBlock := s.blockchain.GetBlockByNumber(currentBlock.Number.Uint64()) - if _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions); err != nil { + parentBlockNumber := currentBlock.Number.Uint64() + if override { + parentBlockNumber = block.PartialHeader.Number - 1 + } + + parentBlock := s.blockchain.GetBlockByNumber(parentBlockNumber) + if parentBlock == nil { + return fmt.Errorf("failed getting parent block, number: %d", parentBlockNumber) + } + + fullBlock, _, err := s.blockchain.BuildAndWriteBlock(parentBlock, block.PartialHeader.ToHeader(), block.Transactions, sign) + if err != nil { return fmt.Errorf("failed building and writing block, number: %d, error: %v", block.PartialHeader.Number, err) } - if s.blockchain.CurrentBlock().Number.Uint64()%1000 == 0 { - log.Info("L1 sync progress", "blockhain height", s.blockchain.CurrentBlock().Number.Uint64(), "block hash", s.blockchain.CurrentBlock().Hash(), "root", s.blockchain.CurrentBlock().Root) + if s.asyncChecker != nil { + _ = s.asyncChecker.Check(fullBlock) + } + + currentBlock = s.blockchain.CurrentBlock() + if override && block.PartialHeader.Number != currentBlock.Number.Uint64() && block.PartialHeader.Number%100 == 0 { + newBlock := s.blockchain.GetHeaderByNumber(block.PartialHeader.Number) + log.Info("L1 sync progress", "processed block ", newBlock.Number.Uint64(), "block hash", newBlock.Hash(), "root", newBlock.Root) + log.Info("L1 sync progress", "blockhain height", currentBlock.Number.Uint64(), "block hash", currentBlock.Hash(), "root", currentBlock.Root) + } else if currentBlock.Number.Uint64()%100 == 0 { + log.Info("L1 sync progress", "blockhain height", currentBlock.Number.Uint64(), "block hash", currentBlock.Hash(), "root", currentBlock.Root) + } + + if s.l2EndBlock > 0 && s.l2EndBlock == block.PartialHeader.Number { + newBlock := s.blockchain.GetHeaderByNumber(block.PartialHeader.Number) + log.Warn("L1 sync reached L2EndBlock: you can terminate recovery mode now", "L2EndBlock", newBlock.Number.Uint64(), "block hash", newBlock.Hash(), "root", newBlock.Root) + return serrors.Terminated } return nil diff --git a/rollup/da_syncer/serrors/errors.go b/rollup/da_syncer/serrors/errors.go index aa0426f0771d..6dc373f22936 100644 --- a/rollup/da_syncer/serrors/errors.go +++ b/rollup/da_syncer/serrors/errors.go @@ -12,6 +12,7 @@ const ( var ( TemporaryError = NewTemporaryError(nil) EOFError = NewEOFError(nil) + Terminated = fmt.Errorf("terminated") ) type Type uint8 diff --git a/rollup/da_syncer/syncing_pipeline.go b/rollup/da_syncer/syncing_pipeline.go index 6795f2608e05..0ba2285c3448 100644 --- a/rollup/da_syncer/syncing_pipeline.go +++ b/rollup/da_syncer/syncing_pipeline.go @@ -26,6 +26,15 @@ type Config struct { BlobScanAPIEndpoint string // BlobScan blob api endpoint BlockNativeAPIEndpoint string // BlockNative blob api endpoint BeaconNodeAPIEndpoint string // Beacon node api endpoint + + CCCEnable bool // enable CCC verification and generation of row consumption + CCCNumWorkers int // number of workers for CCC verification + + RecoveryMode bool // Recovery mode is used to override existing blocks with the blocks read from the pipeline and start from a specific L1 block and batch + InitialL1Block uint64 // L1 block in which the InitialBatch was committed (or any earlier L1 block but requires more RPC requests) + InitialBatch uint64 // Batch number from which to start syncing and overriding blocks + SignBlocks bool // Whether to sign the blocks after reading them from the pipeline (requires correct Clique signer key) and history of blocks with Clique signatures + L2EndBlock uint64 // L2 block number to sync until } // SyncingPipeline is a derivation pipeline for syncing data from L1 and DA and transform it into @@ -36,7 +45,7 @@ type SyncingPipeline struct { wg sync.WaitGroup expBackoff *backoff.Exponential - l1DeploymentBlock uint64 + config Config db ethdb.Database blockchain *core.BlockChain @@ -75,28 +84,44 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi } dataSourceFactory := NewDataSourceFactory(blockchain, genesisConfig, config, l1Client, blobClientList, db) - syncedL1Height := l1DeploymentBlock - 1 - from := rawdb.ReadDASyncedL1BlockNumber(db) - if from != nil { - syncedL1Height = *from + + var initialL1Block uint64 + if config.RecoveryMode { + initialL1Block = config.InitialL1Block + if initialL1Block == 0 { + return nil, errors.New("sync from DA: initial L1 block must be set in recovery mode") + } + if config.InitialBatch == 0 { + return nil, errors.New("sync from DA: initial batch must be set in recovery mode") + } + + log.Info("sync from DA: initializing pipeline in recovery mode", "initialL1Block", initialL1Block, "initialBatch", config.InitialBatch) + } else { + initialL1Block = l1DeploymentBlock - 1 + config.InitialL1Block = initialL1Block + from := rawdb.ReadDASyncedL1BlockNumber(db) + if from != nil { + initialL1Block = *from + } + log.Info("sync from DA: initializing pipeline", "initialL1Block", initialL1Block) } - daQueue := NewDAQueue(syncedL1Height, dataSourceFactory) + daQueue := NewDAQueue(initialL1Block, config.InitialBatch, dataSourceFactory) batchQueue := NewBatchQueue(daQueue, db) blockQueue := NewBlockQueue(batchQueue) - daSyncer := NewDASyncer(blockchain) + daSyncer := NewDASyncer(blockchain, config.CCCEnable, config.CCCNumWorkers, config.L2EndBlock) ctx, cancel := context.WithCancel(ctx) return &SyncingPipeline{ - ctx: ctx, - cancel: cancel, - expBackoff: backoff.NewExponential(100*time.Millisecond, 10*time.Second, 100*time.Millisecond), - wg: sync.WaitGroup{}, - l1DeploymentBlock: l1DeploymentBlock, - db: db, - blockchain: blockchain, - blockQueue: blockQueue, - daSyncer: daSyncer, + ctx: ctx, + cancel: cancel, + expBackoff: backoff.NewExponential(100*time.Millisecond, 10*time.Second, 100*time.Millisecond), + wg: sync.WaitGroup{}, + config: config, + db: db, + blockchain: blockchain, + blockQueue: blockQueue, + daSyncer: daSyncer, }, nil } @@ -105,7 +130,10 @@ func (s *SyncingPipeline) Step() error { if err != nil { return err } - err = s.daSyncer.SyncOneBlock(block) + + // in recovery mode, we override already existing blocks with whatever we read from the pipeline + err = s.daSyncer.SyncOneBlock(block, s.config.RecoveryMode, s.config.SignBlocks) + return err } @@ -205,6 +233,9 @@ func (s *SyncingPipeline) mainLoop() { } else if errors.Is(err, context.Canceled) { log.Info("syncing pipeline stopped due to cancelled context", "err", err) return + } else if errors.Is(err, serrors.Terminated) { + log.Info("syncing pipeline stopped due to terminated state", "err", err) + return } log.Warn("syncing pipeline step failed due to unrecoverable error, stopping pipeline worker", "err", err) @@ -222,7 +253,7 @@ func (s *SyncingPipeline) Stop() { func (s *SyncingPipeline) reset(resetCounter int) { amount := 100 * uint64(resetCounter) - syncedL1Height := s.l1DeploymentBlock - 1 + syncedL1Height := s.config.InitialL1Block from := rawdb.ReadDASyncedL1BlockNumber(s.db) if from != nil && *from+amount > syncedL1Height { syncedL1Height = *from - amount