From e916a8aaf439fc1824ded0d96c0c1eb42f25ffbf Mon Sep 17 00:00:00 2001 From: will-2012 <117156346+will-2012@users.noreply.github.com> Date: Tue, 21 May 2024 17:33:15 +0800 Subject: [PATCH] feat: add proof keeper (#90) Co-authored-by: will@2012 --- cmd/geth/main.go | 2 + cmd/utils/flags.go | 21 +- common/types.go | 33 ++ core/blockchain.go | 27 +- core/blockchain_reader.go | 3 + core/proof_keeper.go | 605 +++++++++++++++++++++++ core/proof_keeper_test.go | 198 ++++++++ core/rawdb/accessors_proof.go | 75 +++ core/rawdb/accessors_proof_test.go | 87 ++++ core/rawdb/ancient_scheme.go | 17 +- core/rawdb/ancient_utils.go | 20 + core/rawdb/schema.go | 8 + core/state/statedb.go | 29 ++ eth/api_backend.go | 8 +- eth/backend.go | 2 + eth/ethconfig/config.go | 2 + ethclient/gethclient/gethclient.go | 26 +- internal/ethapi/api.go | 52 +- internal/ethapi/api_test.go | 4 + internal/ethapi/backend.go | 2 + internal/ethapi/transaction_args_test.go | 4 + trie/secure_trie.go | 22 + trie/triedb/pathdb/database.go | 3 +- trie/triedb/pathdb/disklayer.go | 11 +- trie/triedb/pathdb/journal.go | 4 +- trie/triedb/pathdb/nodebufferlist.go | 102 +++- 26 files changed, 1293 insertions(+), 74 deletions(-) create mode 100644 core/proof_keeper.go create mode 100644 core/proof_keeper_test.go create mode 100644 core/rawdb/accessors_proof.go create mode 100644 core/rawdb/accessors_proof_test.go diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 01944d373b..fa7ec78f12 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -98,6 +98,8 @@ var ( utils.StateHistoryFlag, utils.ProposeBlockIntervalFlag, utils.PathDBNodeBufferTypeFlag, + utils.EnableProofKeeperFlag, + utils.KeepProofBlockSpanFlag, utils.LightServeFlag, // deprecated utils.LightIngressFlag, // deprecated utils.LightEgressFlag, // deprecated diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 4c1b799daf..221be07e7d 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -34,12 +34,11 @@ import ( "strings" "time" + "github.com/ethereum/go-ethereum/core/opcodeCompiler/compiler" pcsclite "github.com/gballet/go-libpcsclite" gopsutil "github.com/shirou/gopsutil/mem" "github.com/urfave/cli/v2" - "github.com/ethereum/go-ethereum/core/opcodeCompiler/compiler" - "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" @@ -329,6 +328,18 @@ var ( Value: pathdb.DefaultProposeBlockInterval, Category: flags.StateCategory, } + EnableProofKeeperFlag = &cli.BoolFlag{ + Name: "pathdb.enableproofkeeper", + Usage: "Enable path db proof keeper to store proposed proof", + Value: false, + Category: flags.StateCategory, + } + KeepProofBlockSpanFlag = &cli.Uint64Flag{ + Name: "pathdb.keepproofblockspan", + Usage: "Block span of keep proof (default = 90,000 blocks)", + Value: params.FullImmutabilityThreshold, + Category: flags.StateCategory, + } StateHistoryFlag = &cli.Uint64Flag{ Name: "history.state", Usage: "Number of recent blocks to retain state history for (default = 90,000 blocks, 0 = entire chain)", @@ -1861,6 +1872,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.IsSet(ProposeBlockIntervalFlag.Name) { cfg.ProposeBlockInterval = ctx.Uint64(ProposeBlockIntervalFlag.Name) } + if ctx.IsSet(EnableProofKeeperFlag.Name) { + cfg.EnableProofKeeper = ctx.Bool(EnableProofKeeperFlag.Name) + } + if ctx.IsSet(KeepProofBlockSpanFlag.Name) { + cfg.KeepProofBlockSpan = ctx.Uint64(KeepProofBlockSpanFlag.Name) + } if ctx.String(GCModeFlag.Name) == "archive" && cfg.TransactionHistory != 0 { cfg.TransactionHistory = 0 log.Warn("Disabled transaction unindexing for archive node") diff --git a/common/types.go b/common/types.go index aadca87f82..37a21f276b 100644 --- a/common/types.go +++ b/common/types.go @@ -475,3 +475,36 @@ func (d *Decimal) UnmarshalJSON(input []byte) error { return err } } + +// ProofList implements ethdb.KeyValueWriter and collects the proofs as +// hex-strings for delivery to rpc-caller. +type ProofList []string + +// Put implements ethdb.KeyValueWriter put interface. +func (n *ProofList) Put(key []byte, value []byte) error { + *n = append(*n, hexutil.Encode(value)) + return nil +} + +// Delete implements ethdb.KeyValueWriter delete interface. +func (n *ProofList) Delete(key []byte) error { + panic("not supported") +} + +// AccountResult is the result of a GetProof operation. +type AccountResult struct { + Address Address `json:"address"` + AccountProof []string `json:"accountProof"` + Balance *big.Int `json:"balance"` + CodeHash Hash `json:"codeHash"` + Nonce uint64 `json:"nonce"` + StorageHash Hash `json:"storageHash"` + StorageProof []StorageResult `json:"storageProof"` +} + +// StorageResult provides a proof for a key-value pair. +type StorageResult struct { + Key string `json:"key"` + Value *big.Int `json:"value"` + Proof []string `json:"proof"` +} diff --git a/core/blockchain.go b/core/blockchain.go index 6e9a32059d..d62871ab2c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -158,6 +158,8 @@ type CacheConfig struct { StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top PathNodeBuffer pathdb.NodeBufferType // Type of trienodebuffer to cache trie nodes in disklayer ProposeBlockInterval uint64 // Propose block to L1 block interval. + EnableProofKeeper bool // Whether to enable proof keeper + KeepProofBlockSpan uint64 // Block span of keep proof SnapshotNoBuild bool // Whether the background generation is allowed SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it @@ -165,7 +167,7 @@ type CacheConfig struct { } // triedbConfig derives the configures for trie database. -func (c *CacheConfig) triedbConfig() *trie.Config { +func (c *CacheConfig) triedbConfig(keepFunc pathdb.NotifyKeepFunc) *trie.Config { config := &trie.Config{ Preimages: c.Preimages, NoTries: c.NoTries, @@ -182,6 +184,7 @@ func (c *CacheConfig) triedbConfig() *trie.Config { CleanCacheSize: c.TrieCleanLimit * 1024 * 1024, DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024, ProposeBlockInterval: c.ProposeBlockInterval, + NotifyKeep: keepFunc, } } return config @@ -232,6 +235,7 @@ type BlockChain struct { flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state triedb *trie.Database // The database handler for maintaining trie nodes. stateCache state.Database // State database to reuse between imports (contains state cache) + proofKeeper *ProofKeeper // Store/Query op-proposal proof to ensure consistent. // txLookupLimit is the maximum number of blocks from head whose tx indices // are reserved: @@ -292,8 +296,16 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis if cacheConfig == nil { cacheConfig = defaultCacheConfig } + opts := &proofKeeperOptions{ + enable: cacheConfig.EnableProofKeeper, + keepProofBlockSpan: cacheConfig.KeepProofBlockSpan, + watchStartKeepCh: make(chan *pathdb.KeepRecord), + notifyFinishKeepCh: make(chan error), + } + proofKeeper := newProofKeeper(opts) // Open trie database with provided config - triedb := trie.NewDatabase(db, cacheConfig.triedbConfig()) + trieConfig := cacheConfig.triedbConfig(proofKeeper.GetNotifyKeepRecordFunc()) + triedb := trie.NewDatabase(db, trieConfig) // Setup the genesis block, commit the provided genesis specification // to database if the genesis block is not present yet, or load the @@ -341,7 +353,12 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine) bc.processor = NewStateProcessor(chainConfig, bc, engine) - var err error + err := proofKeeper.Start(bc, db) + if err != nil { + return nil, err + } + bc.proofKeeper = proofKeeper + bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped) if err != nil { return nil, err @@ -1037,6 +1054,9 @@ func (bc *BlockChain) Stop() { if err := bc.triedb.Journal(bc.CurrentBlock().Root); err != nil { log.Info("Failed to journal in-memory trie nodes", "err", err) } + if err := bc.proofKeeper.Stop(); err != nil { + log.Info("Failed to stop proof keeper", "err", err) + } } else { // Ensure the state of a recent block is also stored to disk before exiting. // We're writing three different states to catch different restart scenarios: @@ -1057,7 +1077,6 @@ func (bc *BlockChain) Stop() { for _, offset := range blockOffsets { if number := bc.CurrentBlock().Number.Uint64(); number > offset { recent := bc.GetBlockByNumber(number - offset) - log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root()) if err := triedb.Commit(recent.Root(), true); err != nil { log.Error("Failed to commit recent state trie", "err", err) diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index e98f462380..716ee2fccc 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -442,3 +442,6 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { return bc.scope.Track(bc.blockProcFeed.Subscribe(ch)) } + +// ProofKeeper returns block chain proof keeper. +func (bc *BlockChain) ProofKeeper() *ProofKeeper { return bc.proofKeeper } diff --git a/core/proof_keeper.go b/core/proof_keeper.go new file mode 100644 index 0000000000..720e808d1c --- /dev/null +++ b/core/proof_keeper.go @@ -0,0 +1,605 @@ +package core + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/params" + trie2 "github.com/ethereum/go-ethereum/trie" + "github.com/ethereum/go-ethereum/trie/triedb/pathdb" +) + +const ( + // l2ToL1MessagePasser pre-deploy address. + l2ToL1MessagePasser = "0x4200000000000000000000000000000000000016" + + // gcProofIntervalSecond is used to control gc loop interval. + gcProofIntervalSecond = 3600 + + // maxKeeperMetaNumber is used to gc keep meta, trigger gc workflow + // when meta number > maxKeeperMetaNumber && meta.block_id < latest_block_id - keep_block_span. + maxKeeperMetaNumber = 100 +) + +var ( + l2ToL1MessagePasserAddr = common.HexToAddress(l2ToL1MessagePasser) + addProofTimer = metrics.NewRegisteredTimer("proofkeeper/addproof/time", nil) + getInnerProofTimer = metrics.NewRegisteredTimer("proofkeeper/getinnerproof/time", nil) + queryProofTimer = metrics.NewRegisteredTimer("proofkeeper/queryproof/time", nil) +) + +// keeperMetaRecord is used to ensure proof continuous in scenarios such as enable/disable keeper, interval changes, reorg, etc. +// which is stored in kv db, indexed by prefix+block-id. +type keeperMetaRecord struct { + BlockID uint64 `json:"blockID"` + ProofID uint64 `json:"proofID"` + KeepInterval uint64 `json:"keepInterval"` +} + +// proofDataRecord is used to store proposed proof data. +// which is stored in ancient db, indexed by proof-id. +type proofDataRecord struct { + ProofID uint64 `json:"proofID"` + BlockID uint64 `json:"blockID"` + StateRoot common.Hash `json:"stateRoot"` + + Address common.Address `json:"address"` + AccountProof []string `json:"accountProof"` + Balance *big.Int `json:"balance"` + CodeHash common.Hash `json:"codeHash"` + Nonce uint64 `json:"nonce"` + StorageHash common.Hash `json:"storageHash"` + StorageProof []common.StorageResult `json:"storageProof"` +} + +// proofKeeperOptions defines proof keeper options. +type proofKeeperOptions struct { + enable bool + keepProofBlockSpan uint64 + gcInterval uint64 + watchStartKeepCh chan *pathdb.KeepRecord + notifyFinishKeepCh chan error +} + +// ProofKeeper is used to store proposed proof and op-proposer can query. +type ProofKeeper struct { + opts *proofKeeperOptions + blockChain *BlockChain + keeperMetaDB ethdb.Database + proofDataDB *rawdb.ResettableFreezer + + queryProofCh chan uint64 + waitQueryProofCh chan *proofDataRecord + stopCh chan struct{} + waitStopCh chan error + latestBlockID uint64 +} + +// newProofKeeper returns a proof keeper instance. +func newProofKeeper(opts *proofKeeperOptions) *ProofKeeper { + if opts.keepProofBlockSpan == 0 { + opts.keepProofBlockSpan = params.FullImmutabilityThreshold + } + if opts.gcInterval == 0 { + opts.gcInterval = gcProofIntervalSecond + } + keeper := &ProofKeeper{ + opts: opts, + queryProofCh: make(chan uint64), + waitQueryProofCh: make(chan *proofDataRecord), + stopCh: make(chan struct{}), + waitStopCh: make(chan error), + } + log.Info("Succeed to init proof keeper", "options", opts) + return keeper +} + +// Start is used to start event loop. +func (keeper *ProofKeeper) Start(blockChain *BlockChain, keeperMetaDB ethdb.Database) error { + if !keeper.opts.enable { + return nil + } + var ( + err error + ancientDir string + ) + + keeper.blockChain = blockChain + keeper.keeperMetaDB = keeperMetaDB + if ancientDir, err = keeper.keeperMetaDB.AncientDatadir(); err != nil { + log.Error("Failed to get ancient data dir", "error", err) + return err + } + if keeper.proofDataDB, err = rawdb.NewProofFreezer(ancientDir, false); err != nil { + log.Error("Failed to new proof ancient freezer", "error", err) + return err + } + + go keeper.eventLoop() + log.Info("Succeed to start proof keeper") + return nil +} + +// Stop is used to sync ancient db and stop the event loop. +func (keeper *ProofKeeper) Stop() error { + if !keeper.opts.enable { + return nil + } + + close(keeper.stopCh) + err := <-keeper.waitStopCh + log.Info("Succeed to stop proof keeper", "error", err) + return err +} + +// GetNotifyKeepRecordFunc returns a keeper callback func which is used by path db node buffer list. +// This is a synchronous operation. +func (keeper *ProofKeeper) GetNotifyKeepRecordFunc() pathdb.NotifyKeepFunc { + return func(keepRecord *pathdb.KeepRecord) { + if keeper == nil { + return + } + if keeper.opts == nil || keeper.opts.watchStartKeepCh == nil || keeper.opts.notifyFinishKeepCh == nil { + return + } + if !keeper.opts.enable { + return + } + if keepRecord.BlockID == 0 || keepRecord.KeepInterval == 0 { + return + } + if keepRecord.BlockID%keepRecord.KeepInterval != 0 { + return + } + + var ( + startTimestamp time.Time + err error + ) + + startTimestamp = time.Now() + defer func() { + addProofTimer.UpdateSince(startTimestamp) + log.Info("Keep a new proof", "record", keepRecord, "elapsed", common.PrettyDuration(time.Since(startTimestamp)), "error", err) + }() + + keeper.opts.watchStartKeepCh <- keepRecord + err = <-keeper.opts.notifyFinishKeepCh + } +} + +// getInnerProof is used to make proof by state db interface. +func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofDataRecord, error) { + var ( + err error + header *types.Header + stateDB *state.StateDB + worldTrie *trie2.StateTrie + accountProof common.ProofList + pRecord *proofDataRecord + ) + + startTimestamp := time.Now() + defer func() { + getInnerProofTimer.UpdateSince(startTimestamp) + // log.Info("Succeed to get proof", "proof_record", pRecord, "error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp))) + }() + + if header = keeper.blockChain.GetHeaderByNumber(kRecord.BlockID); header == nil { + return nil, fmt.Errorf("block is not found, block_id=%d", kRecord.BlockID) + } + if worldTrie, err = trie2.NewStateTrieByInnerReader( + trie2.StateTrieID(header.Root), + keeper.blockChain.stateCache.TrieDB(), + kRecord.PinnedInnerTrieReader); err != nil { + return nil, err + } + if err = worldTrie.Prove(crypto.Keccak256(l2ToL1MessagePasserAddr.Bytes()), &accountProof); err != nil { + return nil, err + } + if stateDB, err = state.NewStateDBByTrie(worldTrie, keeper.blockChain.stateCache, keeper.blockChain.snaps); err != nil { + return nil, err + } + + pRecord = &proofDataRecord{ + BlockID: kRecord.BlockID, + StateRoot: kRecord.StateRoot, + Address: l2ToL1MessagePasserAddr, + AccountProof: accountProof, + Balance: stateDB.GetBalance(l2ToL1MessagePasserAddr), + CodeHash: stateDB.GetCodeHash(l2ToL1MessagePasserAddr), + Nonce: stateDB.GetNonce(l2ToL1MessagePasserAddr), + StorageHash: stateDB.GetStorageRoot(l2ToL1MessagePasserAddr), + StorageProof: make([]common.StorageResult, 0), + } + err = stateDB.Error() + return pRecord, err +} + +// eventLoop is used to update/query keeper meta and proof data in the event loop, which ensure thread-safe. +func (keeper *ProofKeeper) eventLoop() { + if !keeper.opts.enable { + return + } + var ( + err error + putKeeperMetaRecordOnce bool + ancientInitSequenceID uint64 + ) + + gcProofTicker := time.NewTicker(time.Second * time.Duration(keeper.opts.gcInterval)) + defer gcProofTicker.Stop() + + for { + select { + case keepRecord := <-keeper.opts.watchStartKeepCh: + var ( + hasTruncatedMeta bool + curProofID uint64 + proofRecord *proofDataRecord + ) + + proofRecord, err = keeper.getInnerProof(keepRecord) + if err == nil { + hasTruncatedMeta = keeper.truncateKeeperMetaRecordHeadIfNeeded(keepRecord.BlockID) + metaList := keeper.getKeeperMetaRecordList() + if len(metaList) == 0 { + keeper.proofDataDB.Reset() + curProofID = ancientInitSequenceID + } else { + keeper.truncateProofDataRecordHeadIfNeeded(keepRecord.BlockID) + latestProofData := keeper.getLatestProofDataRecord() + if latestProofData != nil { + curProofID = latestProofData.ProofID + 1 + } else { + curProofID = ancientInitSequenceID + } + } + + if hasTruncatedMeta || !putKeeperMetaRecordOnce { + putKeeperMetaRecordOnce = true + keeper.putKeeperMetaRecord(&keeperMetaRecord{ + BlockID: keepRecord.BlockID, + ProofID: curProofID, + KeepInterval: keepRecord.KeepInterval, + }) + } + proofRecord.ProofID = curProofID + err = keeper.putProofDataRecord(proofRecord) + keeper.latestBlockID = keepRecord.BlockID + } + keeper.opts.notifyFinishKeepCh <- err + + case queryBlockID := <-keeper.queryProofCh: + var resultProofRecord *proofDataRecord + metaList := keeper.getKeeperMetaRecordList() + if len(metaList) != 0 && (queryBlockID+keeper.opts.keepProofBlockSpan > keeper.latestBlockID) { + proofID := uint64(0) + index := len(metaList) - 1 + for index >= 0 { + m := metaList[index] + if queryBlockID >= m.BlockID { + if m.KeepInterval == 0 || queryBlockID%m.KeepInterval != 0 { // check + break + } + + proofID = m.ProofID + (queryBlockID-m.BlockID)/m.KeepInterval + resultProofRecord = keeper.getProofDataRecord(proofID) + break + } + index = index - 1 + } + } + keeper.waitQueryProofCh <- resultProofRecord + + case <-keeper.stopCh: + err = keeper.proofDataDB.Sync() + if err == nil { + err = keeper.proofDataDB.Close() + } + keeper.waitStopCh <- err + return + + case <-gcProofTicker.C: + log.Info("Start to gc proof", "latest_block_id", keeper.latestBlockID, "keep_block_span", keeper.opts.keepProofBlockSpan) + if keeper.latestBlockID > keeper.opts.keepProofBlockSpan { + gcBeforeBlockID := keeper.latestBlockID - keeper.opts.keepProofBlockSpan + var gcBeforeKeepMetaRecord *keeperMetaRecord + var gcBeforeProofDataRecord *proofDataRecord + metaList := keeper.getKeeperMetaRecordList() + proofID := uint64(0) + if len(metaList) != 0 { + index := len(metaList) - 1 + for index >= 0 { + m := metaList[index] + if gcBeforeBlockID >= m.BlockID { + gcBeforeKeepMetaRecord = m + proofID = m.ProofID + (gcBeforeBlockID-m.BlockID)/m.KeepInterval + gcBeforeProofDataRecord = keeper.getProofDataRecord(proofID) + break + } + index = index - 1 + } + } + keeper.gcKeeperMetaRecordIfNeeded(gcBeforeKeepMetaRecord) + keeper.gcProofDataRecordIfNeeded(gcBeforeProofDataRecord) + + } + } + } +} + +// getKeeperMetaRecordList returns keeper meta list. +func (keeper *ProofKeeper) getKeeperMetaRecordList() []*keeperMetaRecord { + var ( + metaList []*keeperMetaRecord + err error + iter ethdb.Iterator + ) + + iter = rawdb.IterateKeeperMeta(keeper.keeperMetaDB) + defer iter.Release() + for iter.Next() { + keyBlockID := binary.BigEndian.Uint64(iter.Key()[1:]) + m := keeperMetaRecord{} + if err = json.Unmarshal(iter.Value(), &m); err != nil { + log.Error("Failed to unmarshal keeper meta record", "key_block_id", keyBlockID, "error", err) + continue + } + if keyBlockID != m.BlockID { // check + log.Error("Failed to check consistency between key and value", "key_block_id", keyBlockID, "value_block_id", m.BlockID) + continue + } + // log.Info("Keep meta", "key_block_id", keyBlockID, "meta_record", m) + metaList = append(metaList, &m) + } + // log.Info("Succeed to get meta list", "list", metaList) + return metaList +} + +// truncateKeeperMetaRecordHeadIfNeeded is used to truncate keeper meta record head, +// which is used in reorg. +func (keeper *ProofKeeper) truncateKeeperMetaRecordHeadIfNeeded(blockID uint64) bool { + var ( + err error + iter ethdb.Iterator + batch ethdb.Batch + hasTruncated bool + ) + + iter = rawdb.IterateKeeperMeta(keeper.keeperMetaDB) + defer iter.Release() + batch = keeper.keeperMetaDB.NewBatch() + for iter.Next() { + m := keeperMetaRecord{} + if err = json.Unmarshal(iter.Value(), &m); err != nil { + continue + } + if m.BlockID >= blockID { + hasTruncated = true + rawdb.DeleteKeeperMeta(batch, m.BlockID) + } + } + err = batch.Write() + if err != nil { + log.Crit("Failed to truncate keeper meta head", "err", err) + } + // log.Info("Succeed to truncate keeper meta", "block_id", blockID, "has_truncated", hasTruncated) + return hasTruncated +} + +// putKeeperMetaRecord puts a new keeper meta record. +func (keeper *ProofKeeper) putKeeperMetaRecord(m *keeperMetaRecord) { + meta, err := json.Marshal(*m) + if err != nil { + log.Crit("Failed to marshal keeper meta record", "err", err) + } + rawdb.PutKeeperMeta(keeper.keeperMetaDB, m.BlockID, meta) + log.Info("Succeed to put keeper meta", "record", m) +} + +// gcKeeperMetaRecordIfNeeded is used to the older keeper meta record. +func (keeper *ProofKeeper) gcKeeperMetaRecordIfNeeded(meta *keeperMetaRecord) { + if !keeper.opts.enable { + return + } + if meta == nil { + return + } + metaList := keeper.getKeeperMetaRecordList() + if len(metaList) < maxKeeperMetaNumber { + return + } + + var ( + err error + iter ethdb.Iterator + batch ethdb.Batch + gcCounter uint64 + ) + + iter = rawdb.IterateKeeperMeta(keeper.keeperMetaDB) + defer iter.Release() + batch = keeper.keeperMetaDB.NewBatch() + for iter.Next() { + m := keeperMetaRecord{} + if err = json.Unmarshal(iter.Value(), &m); err != nil { + continue + } + if m.BlockID < meta.BlockID { + rawdb.DeleteKeeperMeta(batch, m.BlockID) + gcCounter++ + } + } + err = batch.Write() + if err != nil { + log.Crit("Failed to gc keeper meta", "err", err) + } + log.Info("Succeed to gc keeper meta", "gc_before_keep_meta", meta, "gc_counter", gcCounter) +} + +// truncateProofDataRecordHeadIfNeeded is used to truncate proof data record head, +// which is used in reorg. +func (keeper *ProofKeeper) truncateProofDataRecordHeadIfNeeded(blockID uint64) { + latestProofDataRecord := keeper.getLatestProofDataRecord() + if latestProofDataRecord == nil { + log.Info("Skip to truncate proof data due to proof data is empty") + return + } + if blockID > latestProofDataRecord.BlockID { + // log.Info("Skip to truncate proof data due to block id is newer") + return + } + + truncateProofID := uint64(0) + proofID := latestProofDataRecord.ProofID + for proofID > 0 { + proof := keeper.getProofDataRecord(proofID) + if proof == nil { + keeper.proofDataDB.Reset() + return + } + if proof.BlockID < blockID { + truncateProofID = proof.ProofID + break + } + proofID = proofID - 1 + } + rawdb.TruncateProofDataHead(keeper.proofDataDB, truncateProofID) + log.Info("Succeed to truncate proof data", "block_id", blockID, "truncate_proof_id", truncateProofID) +} + +// getLatestProofDataRecord return the latest proof data record. +func (keeper *ProofKeeper) getLatestProofDataRecord() *proofDataRecord { + latestProofData := rawdb.GetLatestProofData(keeper.proofDataDB) + if latestProofData == nil { + log.Info("Skip get latest proof data record due to empty") + return nil + } + var data proofDataRecord + err := json.Unmarshal(latestProofData, &data) + if err != nil { + log.Crit("Failed to unmarshal proof data", "err", err) + } + // log.Info("Succeed to get latest proof data", "record", data) + return &data +} + +// getProofDataRecord returns proof record by proofid. +func (keeper *ProofKeeper) getProofDataRecord(proofID uint64) *proofDataRecord { + latestProofData := rawdb.GetProofData(keeper.proofDataDB, proofID) + if latestProofData == nil { + log.Info("Skip get proof data record due not found", "proof_id", proofID) + return nil + } + var data proofDataRecord + err := json.Unmarshal(latestProofData, &data) + if err != nil { + log.Crit("Failed to unmarshal proof data", "err", err) + } + // log.Info("Succeed to get proof data", "record", data) + return &data +} + +// putProofDataRecord puts a new proof data record. +func (keeper *ProofKeeper) putProofDataRecord(p *proofDataRecord) error { + proof, err := json.Marshal(*p) + if err != nil { + log.Error("Failed to marshal proof data", "error", err) + return err + } + err = rawdb.PutProofData(keeper.proofDataDB, p.ProofID, proof) + // log.Info("Succeed to put proof data", "record", p, "error", err) + return err +} + +// gcProofDataRecordIfNeeded is used to the older proof data record. +func (keeper *ProofKeeper) gcProofDataRecordIfNeeded(data *proofDataRecord) { + if !keeper.opts.enable { + return + } + if data == nil { + return + } + if data.ProofID == 0 { + return + } + + rawdb.TruncateProofDataTail(keeper.proofDataDB, data.ProofID) + log.Info("Succeed to gc proof data", "gc_before_proof_data", data) +} + +// IsProposeProofQuery is used to determine whether it is proposed proof. +func (keeper *ProofKeeper) IsProposeProofQuery(address common.Address, storageKeys []string, blockID uint64) bool { + if !keeper.opts.enable { + return false + } + if l2ToL1MessagePasserAddr.Cmp(address) != 0 { + return false + } + if len(storageKeys) != 0 { + return false + } + // blockID%keepInterval == 0 is not checked because keepInterval may have been adjusted before. + _ = blockID + return true +} + +// QueryProposeProof is used to get proof which is stored in ancient proof. +func (keeper *ProofKeeper) QueryProposeProof(blockID uint64, stateRoot common.Hash) (*common.AccountResult, error) { + var ( + result *common.AccountResult + err error + startTimestamp time.Time + ) + + startTimestamp = time.Now() + defer func() { + queryProofTimer.UpdateSince(startTimestamp) + log.Info("Query propose proof", + "block_id", blockID, + "state_root", stateRoot.String(), + "error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp))) + }() + + keeper.queryProofCh <- blockID + resultProofRecord := <-keeper.waitQueryProofCh + if resultProofRecord == nil { + // Maybe the keeper was disabled for a certain period of time before. + err = fmt.Errorf("proof is not found, block_id=%d", blockID) + return nil, err + } + if resultProofRecord.BlockID != blockID { + // Maybe expected_block_id proof is not kept due to disabled or some bug + err = fmt.Errorf("proof is not found due to block is mismatch, expected_block_id=%d, actual_block_id=%d", + blockID, resultProofRecord.BlockID) + return nil, err + } + if resultProofRecord.StateRoot.Cmp(stateRoot) != 0 { + // Impossible, unless there is a bug. + err = fmt.Errorf("proof is not found due to state root is mismatch, expected_state_root=%s, actual_state_root=%s", + stateRoot.String(), resultProofRecord.StateRoot.String()) + return nil, err + } + result = &common.AccountResult{ + Address: resultProofRecord.Address, + AccountProof: resultProofRecord.AccountProof, + Balance: resultProofRecord.Balance, + CodeHash: resultProofRecord.CodeHash, + Nonce: resultProofRecord.Nonce, + StorageHash: resultProofRecord.StorageHash, + StorageProof: resultProofRecord.StorageProof, + } + return result, nil +} diff --git a/core/proof_keeper_test.go b/core/proof_keeper_test.go new file mode 100644 index 0000000000..a081f58b1a --- /dev/null +++ b/core/proof_keeper_test.go @@ -0,0 +1,198 @@ +package core + +import ( + "os" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/trie/triedb/pathdb" + "github.com/stretchr/testify/assert" +) + +const ( + testProofKeeperDBDir = "./test_proof_keeper_db" +) + +var ( + mockBlockChain *BlockChain + mockKeeperMetaDB ethdb.Database +) + +func setupTestEnv() { + mockKeeperMetaDB, _ = rawdb.Open(rawdb.OpenOptions{ + Type: "pebble", + Directory: testProofKeeperDBDir, + AncientsDirectory: testProofKeeperDBDir + "/ancient", + Namespace: "test_proof_keeper", + Cache: 10, + Handles: 10, + ReadOnly: false, + }) +} + +func cleanupTestEnv() { + mockKeeperMetaDB.Close() + os.RemoveAll(testProofKeeperDBDir) +} + +func TestProofKeeperStartAndStop(t *testing.T) { + setupTestEnv() + + keeperOpts := &proofKeeperOptions{ + enable: true, + keepProofBlockSpan: 100, + watchStartKeepCh: make(chan *pathdb.KeepRecord), + notifyFinishKeepCh: make(chan error), + } + keeper := newProofKeeper(keeperOpts) + assert.NotNil(t, keeper) + + err := keeper.Start(mockBlockChain, mockKeeperMetaDB) + assert.Nil(t, err) + + err = keeper.Stop() + assert.Nil(t, err) + + cleanupTestEnv() +} + +func TestProofKeeperGC(t *testing.T) { + setupTestEnv() + keeperOpts := &proofKeeperOptions{ + enable: true, + keepProofBlockSpan: 100, + gcInterval: 1, + watchStartKeepCh: make(chan *pathdb.KeepRecord), + notifyFinishKeepCh: make(chan error), + } + keeper := newProofKeeper(keeperOpts) + assert.NotNil(t, keeper) + + err := keeper.Start(mockBlockChain, mockKeeperMetaDB) + assert.Nil(t, err) + + for i := uint64(1); i <= 100; i++ { + keeper.putKeeperMetaRecord(&keeperMetaRecord{ + BlockID: i, + ProofID: i - 1, + KeepInterval: 1, + }) + keeper.putProofDataRecord(&proofDataRecord{ + ProofID: i - 1, + BlockID: i, + StateRoot: common.Hash{}, + Address: common.Address{}, + AccountProof: nil, + Balance: nil, + CodeHash: common.Hash{}, + Nonce: 0, + StorageHash: common.Hash{}, + StorageProof: nil, + }) + } + keeper.latestBlockID = 100 + time.Sleep(2 * time.Second) // wait gc loop + + // no gc, becase keeper.latestBlockID <= keeper.opts.keepProofBlockSpan + metaList := keeper.getKeeperMetaRecordList() + assert.Equal(t, 100, len(metaList)) + + for i := uint64(101); i <= 105; i++ { + keeper.putKeeperMetaRecord(&keeperMetaRecord{ + BlockID: i, + ProofID: i - 1, + KeepInterval: 1, + }) + keeper.putProofDataRecord(&proofDataRecord{ + ProofID: i - 1, + BlockID: i, + StateRoot: common.Hash{}, + Address: common.Address{}, + AccountProof: nil, + Balance: nil, + CodeHash: common.Hash{}, + Nonce: 0, + StorageHash: common.Hash{}, + StorageProof: nil, + }) + } + + keeper.latestBlockID = 105 + time.Sleep(2 * time.Second) // wait gc loop + + // gc keep meta which block_id < 5(latestBlockID - keepProofBlockSpan), and 1/2/3/4 blockid keeper meta is truncated. + metaList = keeper.getKeeperMetaRecordList() + assert.Equal(t, 101, len(metaList)) + + // gc proof data, truncate proof id = 4, and 0/1/2/3 proofid proof data is truncated. + assert.NotNil(t, keeper.getProofDataRecord(4)) + assert.Nil(t, keeper.getProofDataRecord(3)) + + err = keeper.Stop() + assert.Nil(t, err) + + cleanupTestEnv() +} + +func TestProofKeeperQuery(t *testing.T) { + setupTestEnv() + + keeperOpts := &proofKeeperOptions{ + enable: true, + watchStartKeepCh: make(chan *pathdb.KeepRecord), + notifyFinishKeepCh: make(chan error), + } + keeper := newProofKeeper(keeperOpts) + assert.NotNil(t, keeper) + + err := keeper.Start(mockBlockChain, mockKeeperMetaDB) + assert.Nil(t, err) + + for i := uint64(1); i <= 100; i++ { + if i%15 == 0 { + keeper.putKeeperMetaRecord(&keeperMetaRecord{ + BlockID: i, + ProofID: i - 1, + KeepInterval: 1, + }) + } + keeper.putProofDataRecord(&proofDataRecord{ + ProofID: i - 1, + BlockID: i, + StateRoot: common.Hash{}, + Address: common.Address{}, + AccountProof: nil, + Balance: nil, + CodeHash: common.Hash{}, + Nonce: 0, + StorageHash: common.Hash{}, + StorageProof: nil, + }) + + } + + keeper.latestBlockID = 100 + result, err := keeper.QueryProposeProof(45, common.Hash{}) + assert.Nil(t, err) + assert.NotNil(t, result) + result, err = keeper.QueryProposeProof(46, common.Hash{}) + assert.Nil(t, err) + assert.NotNil(t, result) + result, err = keeper.QueryProposeProof(1, common.Hash{}) // should >= 15 + assert.NotNil(t, err) + assert.Nil(t, result) + result, err = keeper.QueryProposeProof(100, common.Hash{}) + assert.Nil(t, err) + assert.NotNil(t, result) + result, err = keeper.QueryProposeProof(101, common.Hash{}) // should <= 100 + assert.NotNil(t, err) + assert.Nil(t, result) + + err = keeper.Stop() + assert.Nil(t, err) + + cleanupTestEnv() +} diff --git a/core/rawdb/accessors_proof.go b/core/rawdb/accessors_proof.go new file mode 100644 index 0000000000..36f2c9d72b --- /dev/null +++ b/core/rawdb/accessors_proof.go @@ -0,0 +1,75 @@ +package rawdb + +import ( + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +const ( + blockNumberLength = 8 // uint64 is 8 bytes. +) + +// IterateKeeperMeta returns keep meta iterator. +func IterateKeeperMeta(db ethdb.Iteratee) ethdb.Iterator { + return NewKeyLengthIterator(db.NewIterator(proofKeeperMetaPrefix, nil), len(proofKeeperMetaPrefix)+blockNumberLength) +} + +// DeleteKeeperMeta is used to remove the specified keeper meta. +func DeleteKeeperMeta(db ethdb.KeyValueWriter, blockID uint64) { + if err := db.Delete(proofKeeperMetaKey(blockID)); err != nil { + log.Crit("Failed to delete keeper meta", "err", err) + } +} + +// PutKeeperMeta add a new keeper meta. +func PutKeeperMeta(db ethdb.KeyValueWriter, blockID uint64, meta []byte) { + key := proofKeeperMetaKey(blockID) + if err := db.Put(key, meta); err != nil { + log.Crit("Failed to store keeper meta", "err", err) + } +} + +// GetLatestProofData returns the latest head proof data. +func GetLatestProofData(f *ResettableFreezer) []byte { + proofTable := f.freezer.tables[proposeProofTable] + if proofTable == nil { + return nil + } + blob, err := f.Ancient(proposeProofTable, proofTable.items.Load()-1) + if err != nil { + log.Error("Failed to get latest proof data", "latest_proof_id", proofTable.items.Load()-1, "error", err) + return nil + } + return blob +} + +// GetProofData returns the specified proof data. +func GetProofData(f *ResettableFreezer, proofID uint64) []byte { + proofTable := f.freezer.tables[proposeProofTable] + if proofTable == nil { + return nil + } + blob, err := f.Ancient(proposeProofTable, proofID) + if err != nil { + return nil + } + return blob +} + +// TruncateProofDataHead truncates [proofID, end...]. +func TruncateProofDataHead(f *ResettableFreezer, proofID uint64) { + f.freezer.TruncateHead(proofID) +} + +// TruncateProofDataTail truncates [start..., proofID). +func TruncateProofDataTail(f *ResettableFreezer, proofID uint64) { + f.freezer.TruncateTail(proofID) +} + +// PutProofData appends a new proof to ancient proof db, the proofID should be continuous. +func PutProofData(db ethdb.AncientWriter, proofID uint64, proof []byte) error { + _, err := db.ModifyAncients(func(op ethdb.AncientWriteOp) error { + return op.AppendRaw(proposeProofTable, proofID, proof) + }) + return err +} diff --git a/core/rawdb/accessors_proof_test.go b/core/rawdb/accessors_proof_test.go new file mode 100644 index 0000000000..601b9b90bd --- /dev/null +++ b/core/rawdb/accessors_proof_test.go @@ -0,0 +1,87 @@ +package rawdb + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +const ( + testAncientProofDir = "./test_ancient_proof" +) + +var ( + testAncientProofDB *ResettableFreezer + mockData1 = []byte{'a'} + mockData2 = []byte{'1'} +) + +func setupTestEnv() { + testAncientProofDB, _ = NewProofFreezer(testAncientProofDir, false) +} + +func cleanupTestEnv() { + testAncientProofDB.Close() + os.RemoveAll(testAncientProofDir) +} + +func TestProofDataAPI(t *testing.T) { + setupTestEnv() + var proofData []byte + + // case1: empty db + proofData = GetLatestProofData(testAncientProofDB) + assert.Nil(t, proofData) + + // case2: mismatch sequence put failed + mismatchProofID := uint64(2) // should=0 + err := PutProofData(testAncientProofDB, mismatchProofID, mockData1) + assert.NotNil(t, err) + proofData = GetLatestProofData(testAncientProofDB) + assert.Nil(t, proofData) + + // case3: put/get succeed + matchProofID := uint64(0) + err = PutProofData(testAncientProofDB, matchProofID, mockData1) + assert.Nil(t, err) + err = PutProofData(testAncientProofDB, matchProofID+1, mockData2) + assert.Nil(t, err) + proofData = GetLatestProofData(testAncientProofDB) + assert.Equal(t, proofData, mockData2) + proofData = GetProofData(testAncientProofDB, 0) + assert.Equal(t, proofData, mockData1) + proofData = GetProofData(testAncientProofDB, 1) + assert.Equal(t, proofData, mockData2) + + // case4: truncate head + TruncateProofDataHead(testAncientProofDB, 1) + proofData = GetProofData(testAncientProofDB, 1) + assert.Nil(t, proofData) + proofData = GetProofData(testAncientProofDB, 0) + assert.Equal(t, proofData, mockData1) + + // case5: restart + testAncientProofDB.Close() + setupTestEnv() + proofData = GetProofData(testAncientProofDB, 0) + assert.Equal(t, proofData, mockData1) + proofData = GetLatestProofData(testAncientProofDB) + assert.Equal(t, proofData, mockData1) + + // case6: truncate tail + PutProofData(testAncientProofDB, matchProofID+1, mockData2) + proofData = GetProofData(testAncientProofDB, matchProofID) + assert.Equal(t, proofData, mockData1) + PutProofData(testAncientProofDB, matchProofID+2, mockData2) + TruncateProofDataTail(testAncientProofDB, matchProofID+1) + proofData = GetProofData(testAncientProofDB, matchProofID) + assert.Nil(t, proofData) + proofData = GetProofData(testAncientProofDB, matchProofID+1) + assert.Equal(t, proofData, mockData2) + TruncateProofDataTail(testAncientProofDB, matchProofID+2) + proofData = GetProofData(testAncientProofDB, matchProofID+1) + assert.Nil(t, proofData) + + cleanupTestEnv() +} diff --git a/core/rawdb/ancient_scheme.go b/core/rawdb/ancient_scheme.go index e88867af0e..d6b3f1b76d 100644 --- a/core/rawdb/ancient_scheme.go +++ b/core/rawdb/ancient_scheme.go @@ -66,16 +66,31 @@ var stateFreezerNoSnappy = map[string]bool{ stateHistoryStorageData: false, } +const ( + proposeProofTable = "propose.proof" +) + +var proofFreezerNoSnappy = map[string]bool{ + proposeProofTable: true, +} + // The list of identifiers of ancient stores. var ( ChainFreezerName = "chain" // the folder name of chain segment ancient store. StateFreezerName = "state" // the folder name of reverse diff ancient store. + ProofFreezerName = "proof" // the folder name of propose withdraw proof store. + ) // freezers the collections of all builtin freezers. -var freezers = []string{ChainFreezerName, StateFreezerName} +var freezers = []string{ChainFreezerName, StateFreezerName, ProofFreezerName} // NewStateFreezer initializes the freezer for state history. func NewStateFreezer(ancientDir string, readOnly bool) (*ResettableFreezer, error) { return NewResettableFreezer(filepath.Join(ancientDir, StateFreezerName), "eth/db/state", readOnly, stateHistoryTableSize, stateFreezerNoSnappy) } + +// NewProofFreezer initializes the freezer for propose withdraw proof. +func NewProofFreezer(ancientDir string, readOnly bool) (*ResettableFreezer, error) { + return NewResettableFreezer(filepath.Join(ancientDir, ProofFreezerName), "eth/db/proof", readOnly, stateHistoryTableSize, proofFreezerNoSnappy) +} diff --git a/core/rawdb/ancient_utils.go b/core/rawdb/ancient_utils.go index 428cda544b..9b2a4b1cf5 100644 --- a/core/rawdb/ancient_utils.go +++ b/core/rawdb/ancient_utils.go @@ -108,6 +108,26 @@ func inspectFreezers(db ethdb.Database) ([]freezerInfo, error) { } infos = append(infos, info) + case ProofFreezerName: + if ReadStateScheme(db) != PathScheme { + continue + } + datadir, err := db.AncientDatadir() + if err != nil { + return nil, err + } + f, err := NewProofFreezer(datadir, true) + if err != nil { + return nil, err + } + defer f.Close() + + info, err := inspect(ProofFreezerName, proofFreezerNoSnappy, f) + if err != nil { + return nil, err + } + infos = append(infos, info) + default: return nil, fmt.Errorf("unknown freezer, supported ones: %v", freezers) } diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index be03723553..4fd1d3fcc9 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -115,6 +115,9 @@ var ( trieNodeStoragePrefix = []byte("O") // trieNodeStoragePrefix + accountHash + hexPath -> trie node stateIDPrefix = []byte("L") // stateIDPrefix + state root -> state id + // which is used by proof keeper. + proofKeeperMetaPrefix = []byte("p") // proofKeeperMetaPrefix + num (uint64 big endian) -> proof keeper meta + PreimagePrefix = []byte("secure-key-") // PreimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db genesisPrefix = []byte("ethereum-genesis-") // genesis state prefix for the db @@ -165,6 +168,11 @@ func headerKey(number uint64, hash common.Hash) []byte { return append(append(headerPrefix, encodeBlockNumber(number)...), hash.Bytes()...) } +// proofKeeperMetaKey = proofKeeperMetaPrefix + num (uint64 big endian) +func proofKeeperMetaKey(number uint64) []byte { + return append(proofKeeperMetaPrefix, encodeBlockNumber(number)...) +} + // headerTDKey = headerPrefix + num (uint64 big endian) + hash + headerTDSuffix func headerTDKey(number uint64, hash common.Hash) []byte { return append(headerKey(number, hash), headerTDSuffix...) diff --git a/core/state/statedb.go b/core/state/statedb.go index a371d4a5f0..adee9fdd20 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -175,6 +175,35 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) return sdb, nil } +// NewStateDBByTrie creates a new state db by a given trie. +func NewStateDBByTrie(tr Trie, db Database, snaps *snapshot.Tree) (*StateDB, error) { + sdb := &StateDB{ + db: db, + trie: tr, + originalRoot: tr.Hash(), + snaps: snaps, + accounts: make(map[common.Hash][]byte), + storages: make(map[common.Hash]map[common.Hash][]byte), + accountsOrigin: make(map[common.Address][]byte), + storagesOrigin: make(map[common.Address]map[common.Hash][]byte), + stateObjects: make(map[common.Address]*stateObject), + stateObjectsPending: make(map[common.Address]struct{}), + stateObjectsDirty: make(map[common.Address]struct{}), + stateObjectsDestruct: make(map[common.Address]*types.StateAccount), + logs: make(map[common.Hash][]*types.Log), + preimages: make(map[common.Hash][]byte), + journal: newJournal(), + accessList: newAccessList(), + transientStorage: newTransientStorage(), + hasher: crypto.NewKeccakState(), + } + if sdb.snaps != nil { + sdb.snap = sdb.snaps.Snapshot(tr.Hash()) + } + _, sdb.noTrie = tr.(*trie.EmptyTrie) + return sdb, nil +} + // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. diff --git a/eth/api_backend.go b/eth/api_backend.go index b6280a1847..ee6b2db9b8 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -54,6 +54,10 @@ type EthAPIBackend struct { gpo *gasprice.Oracle } +func (b *EthAPIBackend) ProofKeeper() *core.ProofKeeper { + return b.eth.blockchain.ProofKeeper() +} + // ChainConfig returns the active chain configuration. func (b *EthAPIBackend) ChainConfig() *params.ChainConfig { return b.eth.blockchain.Config() @@ -210,7 +214,7 @@ func (b *EthAPIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.B } stateDb, err := b.eth.BlockChain().StateAt(header.Root) if err != nil { - return nil, nil, err + return nil, header, err } return stateDb, header, nil } @@ -232,7 +236,7 @@ func (b *EthAPIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockN } stateDb, err := b.eth.BlockChain().StateAt(header.Root) if err != nil { - return nil, nil, err + return nil, header, err } return stateDb, header, nil } diff --git a/eth/backend.go b/eth/backend.go index 821aa19cff..2ae27ea7f7 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -215,6 +215,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { TrieCommitInterval: config.TrieCommitInterval, PathNodeBuffer: config.PathNodeBuffer, ProposeBlockInterval: config.ProposeBlockInterval, + EnableProofKeeper: config.EnableProofKeeper, + KeepProofBlockSpan: config.KeepProofBlockSpan, } ) // Override the chain config with provided settings. diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 8ef1ba083a..93d39c9f34 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -132,6 +132,8 @@ type Config struct { StateScheme string `toml:",omitempty"` PathNodeBuffer pathdb.NodeBufferType `toml:",omitempty"` // Type of trienodebuffer to cache trie nodes in disklayer ProposeBlockInterval uint64 `toml:",omitempty"` // Keep the same with op-proposer propose block interval + EnableProofKeeper bool `toml:",omitempty"` // Whether to enable proof keeper + KeepProofBlockSpan uint64 `toml:",omitempty"` // Span block of keep proof // RequiredBlocks is a set of block number -> hash mappings which must be in the // canonical chain of all remote peers. Setting the option makes geth verify the diff --git a/ethclient/gethclient/gethclient.go b/ethclient/gethclient/gethclient.go index e2c0ef3ed0..5cb703ea5b 100644 --- a/ethclient/gethclient/gethclient.go +++ b/ethclient/gethclient/gethclient.go @@ -60,27 +60,9 @@ func (ec *Client) CreateAccessList(ctx context.Context, msg ethereum.CallMsg) (* return result.Accesslist, uint64(result.GasUsed), result.Error, nil } -// AccountResult is the result of a GetProof operation. -type AccountResult struct { - Address common.Address `json:"address"` - AccountProof []string `json:"accountProof"` - Balance *big.Int `json:"balance"` - CodeHash common.Hash `json:"codeHash"` - Nonce uint64 `json:"nonce"` - StorageHash common.Hash `json:"storageHash"` - StorageProof []StorageResult `json:"storageProof"` -} - -// StorageResult provides a proof for a key-value pair. -type StorageResult struct { - Key string `json:"key"` - Value *big.Int `json:"value"` - Proof []string `json:"proof"` -} - // GetProof returns the account and storage values of the specified account including the Merkle-proof. // The block number can be nil, in which case the value is taken from the latest known block. -func (ec *Client) GetProof(ctx context.Context, account common.Address, keys []string, blockNumber *big.Int) (*AccountResult, error) { +func (ec *Client) GetProof(ctx context.Context, account common.Address, keys []string, blockNumber *big.Int) (*common.AccountResult, error) { type storageResult struct { Key string `json:"key"` Value *hexutil.Big `json:"value"` @@ -105,15 +87,15 @@ func (ec *Client) GetProof(ctx context.Context, account common.Address, keys []s var res accountResult err := ec.c.CallContext(ctx, &res, "eth_getProof", account, keys, toBlockNumArg(blockNumber)) // Turn hexutils back to normal datatypes - storageResults := make([]StorageResult, 0, len(res.StorageProof)) + storageResults := make([]common.StorageResult, 0, len(res.StorageProof)) for _, st := range res.StorageProof { - storageResults = append(storageResults, StorageResult{ + storageResults = append(storageResults, common.StorageResult{ Key: st.Key, Value: st.Value.ToInt(), Proof: st.Proof, }) } - result := AccountResult{ + result := common.AccountResult{ Address: res.Address, AccountProof: res.AccountProof, Balance: res.Balance.ToInt(), diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index a790d9e6f9..9dffba0323 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -681,21 +681,8 @@ type StorageResult struct { Proof []string `json:"proof"` } -// proofList implements ethdb.KeyValueWriter and collects the proofs as -// hex-strings for delivery to rpc-caller. -type proofList []string - -func (n *proofList) Put(key []byte, value []byte) error { - *n = append(*n, hexutil.Encode(value)) - return nil -} - -func (n *proofList) Delete(key []byte) error { - panic("not supported") -} - // GetProof returns the Merkle-proof for a given account and optionally some storage keys. -func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, storageKeys []string, blockNrOrHash rpc.BlockNumberOrHash) (*AccountResult, error) { +func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, storageKeys []string, blockNrOrHash rpc.BlockNumberOrHash) (result *AccountResult, err error) { header, err := headerByNumberOrHash(ctx, s.b, blockNrOrHash) if err != nil { return nil, err @@ -703,7 +690,7 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st if s.b.ChainConfig().IsOptimismPreBedrock(header.Number) { if s.b.HistoricalRPCService() != nil { var res AccountResult - err := s.b.HistoricalRPCService().CallContext(ctx, &res, "eth_getProof", address, storageKeys, blockNrOrHash) + err = s.b.HistoricalRPCService().CallContext(ctx, &res, "eth_getProof", address, storageKeys, blockNrOrHash) if err != nil { return nil, fmt.Errorf("historical backend error: %w", err) } @@ -712,6 +699,26 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st return nil, rpc.ErrNoHistoricalFallback } } + + defer func() { + if proofKeeper := s.b.ProofKeeper(); err != nil && proofKeeper != nil && header.Number != nil { + if proofKeeper.IsProposeProofQuery(address, storageKeys, header.Number.Uint64()) { + if innerResult, innerError := proofKeeper.QueryProposeProof(header.Number.Uint64(), header.Root); innerError == nil { + result = &AccountResult{ + Address: innerResult.Address, + AccountProof: innerResult.AccountProof, + Balance: (*hexutil.Big)(innerResult.Balance), + CodeHash: innerResult.CodeHash, + Nonce: hexutil.Uint64(innerResult.Nonce), + StorageHash: innerResult.StorageHash, + StorageProof: make([]StorageResult, 0), + } + err = nil + } + } + } + }() + var ( keys = make([]common.Hash, len(storageKeys)) keyLengths = make([]int, len(storageKeys)) @@ -719,7 +726,6 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st ) // Deserialize all keys. This prevents state access on invalid input. for i, hexKey := range storageKeys { - var err error keys[i], keyLengths[i], err = decodeHash(hexKey) if err != nil { return nil, err @@ -736,7 +742,8 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st var storageTrie state.Trie if storageRoot != types.EmptyRootHash && storageRoot != (common.Hash{}) { id := trie.StorageTrieID(header.Root, crypto.Keccak256Hash(address.Bytes()), storageRoot) - st, err := trie.NewStateTrie(id, statedb.Database().TrieDB()) + var st *trie.StateTrie + st, err = trie.NewStateTrie(id, statedb.Database().TrieDB()) if err != nil { return nil, err } @@ -758,8 +765,8 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st storageProof[i] = StorageResult{outputKey, &hexutil.Big{}, []string{}} continue } - var proof proofList - if err := storageTrie.Prove(crypto.Keccak256(key.Bytes()), &proof); err != nil { + var proof common.ProofList + if err = storageTrie.Prove(crypto.Keccak256(key.Bytes()), &proof); err != nil { return nil, err } value := (*hexutil.Big)(statedb.GetState(address, key).Big()) @@ -771,10 +778,11 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st if err != nil { return nil, err } - var accountProof proofList - if err := tr.Prove(crypto.Keccak256(address.Bytes()), &accountProof); err != nil { + var accountProof common.ProofList + if err = tr.Prove(crypto.Keccak256(address.Bytes()), &accountProof); err != nil { return nil, err } + err = statedb.Error() return &AccountResult{ Address: address, AccountProof: accountProof, @@ -783,7 +791,7 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st Nonce: hexutil.Uint64(statedb.GetNonce(address)), StorageHash: storageRoot, StorageProof: storageProof, - }, statedb.Error() + }, err } // decodeHash parses a hex-encoded 32-byte hash. The input may optionally diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 132663e085..505de06205 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -593,6 +593,10 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, engine consensus.E return backend } +func (b testBackend) ProofKeeper() *core.ProofKeeper { + return nil +} + func (b *testBackend) setPendingBlock(block *types.Block) { b.pending = block } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 94c6771c43..7da74f1fc9 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -73,6 +73,8 @@ type Backend interface { SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription + ProofKeeper() *core.ProofKeeper + // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction) error GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index b4b6f732a5..85006a073d 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -294,6 +294,10 @@ func newBackendMock() *backendMock { } } +func (b *backendMock) ProofKeeper() *core.ProofKeeper { + return nil +} + func (b *backendMock) setFork(fork string) error { if fork == "legacy" { b.current.Number = big.NewInt(900) diff --git a/trie/secure_trie.go b/trie/secure_trie.go index 7f0685e306..423205ec4e 100644 --- a/trie/secure_trie.go +++ b/trie/secure_trie.go @@ -72,6 +72,28 @@ func NewStateTrie(id *ID, db *Database) (*StateTrie, error) { return &StateTrie{trie: *trie, preimages: db.preimages}, nil } +// NewStateTrieByInnerReader creates a trie with an existing root node from a backing database and a inner reader. +// Which is used by proof keeper to avoid deadlock in pathdb read/write. +func NewStateTrieByInnerReader(id *ID, db *Database, innerReader Reader) (*StateTrie, error) { + if db == nil || innerReader == nil { + panic("trie.NewStateTrieByInnerReader called without a database or a inner reader") + } + reader := &trieReader{owner: id.Owner, reader: innerReader} + trie := &Trie{ + owner: id.Owner, + reader: reader, + tracer: newTracer(), + } + if id.Root != (common.Hash{}) && id.Root != types.EmptyRootHash { + rootnode, err := trie.resolveAndTrack(id.Root[:], nil) + if err != nil { + return nil, err + } + trie.root = rootnode + } + return &StateTrie{trie: *trie, preimages: db.preimages}, nil +} + // MustGet returns the value for key stored in the trie. // The value bytes must not be modified by the caller. // diff --git a/trie/triedb/pathdb/database.go b/trie/triedb/pathdb/database.go index aba1890d99..962f7a8a64 100644 --- a/trie/triedb/pathdb/database.go +++ b/trie/triedb/pathdb/database.go @@ -100,6 +100,7 @@ type Config struct { DirtyCacheSize int // Maximum memory allowance (in bytes) for caching dirty nodes ReadOnly bool // Flag whether the database is opened in read only mode. ProposeBlockInterval uint64 // Propose block to L1 block interval. + NotifyKeep NotifyKeepFunc // NotifyKeep is used to keep the proof which maybe queried by op-proposer. } // sanitize checks the provided user configurations and changes anything that's @@ -335,7 +336,7 @@ func (db *Database) Enable(root common.Hash) error { } // Re-construct a new disk layer backed by persistent state // with **empty clean cache and node buffer**. - nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval) + nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.NotifyKeep) dl := newDiskLayer(root, 0, db, nil, nb) nb.setClean(dl.cleans) db.tree.reset(dl) diff --git a/trie/triedb/pathdb/disklayer.go b/trie/triedb/pathdb/disklayer.go index e4b134c886..8d935e9422 100644 --- a/trie/triedb/pathdb/disklayer.go +++ b/trie/triedb/pathdb/disklayer.go @@ -113,11 +113,18 @@ func GetNodeBufferType(name string) NodeBufferType { return nodeBufferStringToType[name] } -func NewTrieNodeBuffer(db ethdb.Database, trieNodeBufferType NodeBufferType, limit int, nodes map[common.Hash]map[string]*trienode.Node, layers, proposeBlockInterval uint64) trienodebuffer { +func NewTrieNodeBuffer( + db ethdb.Database, + trieNodeBufferType NodeBufferType, + limit int, + nodes map[common.Hash]map[string]*trienode.Node, + layers, proposeBlockInterval uint64, + keepFunc NotifyKeepFunc, +) trienodebuffer { log.Info("init trie node buffer", "type", nodeBufferTypeToString[trieNodeBufferType]) switch trieNodeBufferType { case NodeBufferList: - return newNodeBufferList(db, uint64(limit), nodes, layers, proposeBlockInterval) + return newNodeBufferList(db, uint64(limit), nodes, layers, proposeBlockInterval, keepFunc) case AsyncNodeBuffer: return newAsyncNodeBuffer(limit, nodes, layers) case SyncNodeBuffer: diff --git a/trie/triedb/pathdb/journal.go b/trie/triedb/pathdb/journal.go index 98fe99390d..4e23497519 100644 --- a/trie/triedb/pathdb/journal.go +++ b/trie/triedb/pathdb/journal.go @@ -130,7 +130,7 @@ func (db *Database) loadLayers() layer { log.Info("Failed to load journal, discard it", "err", err) } // Return single layer with persistent state. - nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval) + nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval, db.config.NotifyKeep) dl := newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nb) nb.setClean(dl.cleans) return dl @@ -173,7 +173,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) { nodes[entry.Owner] = subset } // Calculate the internal state transitions by id difference. - nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval) + nb := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval, db.config.NotifyKeep) base := newDiskLayer(root, id, db, nil, nb) nb.setClean(base.cleans) return base, nil diff --git a/trie/triedb/pathdb/nodebufferlist.go b/trie/triedb/pathdb/nodebufferlist.go index af9bd828f9..f0954e826d 100644 --- a/trie/triedb/pathdb/nodebufferlist.go +++ b/trie/triedb/pathdb/nodebufferlist.go @@ -29,6 +29,14 @@ const ( DefaultReserveMultiDifflayerNumber = 3 ) +type KeepRecord struct { + BlockID uint64 + StateRoot common.Hash + KeepInterval uint64 + PinnedInnerTrieReader layer +} +type NotifyKeepFunc func(*KeepRecord) + var _ trienodebuffer = &nodebufferlist{} // nodebufferlist implements the trienodebuffer interface, it is designed to meet @@ -58,9 +66,13 @@ type nodebufferlist struct { baseMux sync.RWMutex // The mutex of base multiDifflayer and persistID. flushMux sync.RWMutex // The mutex of flushing base multiDifflayer for reorg corner case. - isFlushing atomic.Bool // Flag indicates writing disk under background. - stopFlushing atomic.Bool // Flag stops writing disk under background. - stopCh chan struct{} + isFlushing atomic.Bool // Flag indicates writing disk under background. + stopFlushing atomic.Bool // Flag stops writing disk under background. + stopCh chan struct{} // Trigger stop background event loop. + waitStopCh chan struct{} // Wait stop background event loop. + forceKeepCh chan struct{} // Trigger force keep event loop. + waitForceKeepCh chan struct{} // Wait force keep event loop. + keepFunc NotifyKeepFunc // Used to keep op-proposer output proof. } // newNodeBufferList initializes the node buffer list with the provided nodes @@ -69,7 +81,8 @@ func newNodeBufferList( limit uint64, nodes map[common.Hash]map[string]*trienode.Node, layers uint64, - proposeBlockInterval uint64) *nodebufferlist { + proposeBlockInterval uint64, + keepFunc NotifyKeepFunc) *nodebufferlist { var ( rsevMdNum uint64 dlInMd uint64 @@ -99,17 +112,21 @@ func newNodeBufferList( base := newMultiDifflayer(limit, size, common.Hash{}, nodes, layers) ele := newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0) nf := &nodebufferlist{ - db: db, - wpBlocks: wpBlocks, - rsevMdNum: rsevMdNum, - dlInMd: dlInMd, - limit: limit, - base: base, - head: ele, - tail: ele, - count: 1, - persistID: rawdb.ReadPersistentStateID(db), - stopCh: make(chan struct{}), + db: db, + wpBlocks: wpBlocks, + rsevMdNum: rsevMdNum, + dlInMd: dlInMd, + limit: limit, + base: base, + head: ele, + tail: ele, + count: 1, + persistID: rawdb.ReadPersistentStateID(db), + stopCh: make(chan struct{}), + waitStopCh: make(chan struct{}), + forceKeepCh: make(chan struct{}), + waitForceKeepCh: make(chan struct{}), + keepFunc: keepFunc, } go nf.loop() @@ -228,6 +245,9 @@ func (nf *nodebufferlist) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, return nil } + nf.forceKeepCh <- struct{}{} + <-nf.waitForceKeepCh + // hang user read/write and background write nf.mux.Lock() nf.baseMux.Lock() @@ -334,6 +354,7 @@ func (nf *nodebufferlist) getLayers() uint64 { // waitAndStopFlushing will block unit writing the trie nodes of trienodebuffer to disk. func (nf *nodebufferlist) waitAndStopFlushing() { close(nf.stopCh) + <-nf.waitStopCh nf.stopFlushing.Store(true) for nf.isFlushing.Load() { time.Sleep(time.Second) @@ -454,6 +475,17 @@ func (nf *nodebufferlist) diffToBase() { log.Crit("committed block number misaligned", "block", buffer.block) } + if nf.keepFunc != nil { // keep in background flush stage + nf.keepFunc(&KeepRecord{ + BlockID: buffer.block, + StateRoot: buffer.root, + KeepInterval: nf.wpBlocks, + PinnedInnerTrieReader: &proposedBlockReader{ + nf: nf, + diff: buffer, + }}) + } + nf.baseMux.Lock() err := nf.base.commit(buffer.root, buffer.id, buffer.block, buffer.layers, buffer.nodes) nf.baseMux.Unlock() @@ -508,10 +540,48 @@ func (nf *nodebufferlist) backgroundFlush() { // loop runs the background task, collects the nodes for writing to disk. func (nf *nodebufferlist) loop() { mergeTicker := time.NewTicker(time.Second * mergeMultiDifflayerInterval) + defer mergeTicker.Stop() for { select { case <-nf.stopCh: + if nf.keepFunc != nil { // keep in stop stage + nf.mux.RLock() + traverseKeepFunc := func(buffer *multiDifflayer) bool { + nf.keepFunc(&KeepRecord{ + BlockID: buffer.block, + StateRoot: buffer.root, + KeepInterval: nf.wpBlocks, + PinnedInnerTrieReader: &proposedBlockReader{ + nf: nf, + diff: buffer, + }}) + return true + } + nf.traverseReverse(traverseKeepFunc) + nf.mux.RUnlock() + } + nf.waitStopCh <- struct{}{} return + + case <-nf.forceKeepCh: + if nf.keepFunc != nil { // keep in force flush stage + nf.mux.RLock() + traverseKeepFunc := func(buffer *multiDifflayer) bool { + nf.keepFunc(&KeepRecord{ + BlockID: buffer.block, + StateRoot: buffer.root, + KeepInterval: nf.wpBlocks, + PinnedInnerTrieReader: &proposedBlockReader{ + nf: nf, + diff: buffer, + }}) + return true + } + nf.traverseReverse(traverseKeepFunc) + nf.mux.RUnlock() + } + nf.waitForceKeepCh <- struct{}{} + case <-mergeTicker.C: if nf.stopFlushing.Load() { continue @@ -809,7 +879,7 @@ func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, commitBytesMeter.Mark(int64(size)) commitNodesMeter.Mark(int64(nodes)) commitTimeTimer.UpdateSince(start) - log.Debug("Persisted pathdb nodes", "nodes", len(mf.nodes), "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start))) + log.Info("Persisted pathdb nodes", "nodes", len(mf.nodes), "bytes", common.StorageSize(size), "state_id", id, "elapsed", common.PrettyDuration(time.Since(start))) return nil }