diff --git a/cmd/geth/config.go b/cmd/geth/config.go index a9f16531c3..b1744c8040 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -32,7 +32,6 @@ import ( "github.com/ethereum/go-ethereum/accounts/scwallet" "github.com/ethereum/go-ethereum/accounts/usbwallet" "github.com/ethereum/go-ethereum/cmd/utils" - "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/flags" @@ -158,7 +157,6 @@ func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) { cfg.Ethstats.URL = ctx.String(utils.EthStatsURLFlag.Name) } applyMetricConfig(ctx, &cfg) - applyStateExpiryConfig(ctx, &cfg) return stack, cfg } @@ -272,21 +270,6 @@ func applyMetricConfig(ctx *cli.Context, cfg *gethConfig) { } } -func applyStateExpiryConfig(ctx *cli.Context, cfg *gethConfig) { - - if ctx.IsSet(utils.StateExpiryEnableFlag.Name) { - enableStateExpiry := ctx.Bool(utils.StateExpiryEnableFlag.Name) - if enableStateExpiry && ctx.IsSet(utils.StateSchemeFlag.Name) && ctx.String(utils.StateSchemeFlag.Name) == rawdb.HashScheme { - log.Warn("State expiry is not supported with hash scheme. Disabling state expiry") - enableStateExpiry = false - } - cfg.Eth.StateExpiryEnable = enableStateExpiry - } - if ctx.IsSet(utils.StateExpiryFullStateEndpointFlag.Name) { - cfg.Eth.StateExpiryFullStateEndpoint = ctx.String(utils.StateExpiryFullStateEndpointFlag.Name) - } -} - func deprecated(field string) bool { switch field { case "ethconfig.Config.EVMInterpreter": diff --git a/cmd/geth/main.go b/cmd/geth/main.go index abbe56a38e..6332506689 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -219,6 +219,10 @@ var ( stateExpiryFlags = []cli.Flag{ utils.StateExpiryEnableFlag, utils.StateExpiryFullStateEndpointFlag, + utils.StateExpiryStateEpoch1BlockFlag, + utils.StateExpiryStateEpoch2BlockFlag, + utils.StateExpiryStateEpochPeriodFlag, + utils.StateExpiryEnableLocalReviveFlag, } ) diff --git a/cmd/geth/snapshot.go b/cmd/geth/snapshot.go index 86b12b3533..78d1e6a9e1 100644 --- a/cmd/geth/snapshot.go +++ b/cmd/geth/snapshot.go @@ -436,13 +436,12 @@ func pruneState(ctx *cli.Context) error { Preimages: cfg.Eth.Preimages, StateHistory: cfg.Eth.StateHistory, StateScheme: cfg.Eth.StateScheme, - EnableStateExpiry: cfg.Eth.StateExpiryEnable, - RemoteEndPoint: cfg.Eth.StateExpiryFullStateEndpoint, + StateExpiryCfg: cfg.Eth.StateExpiryCfg, } prunerconfig := pruner.Config{ Datadir: stack.ResolvePath(""), BloomSize: ctx.Uint64(utils.BloomFilterSizeFlag.Name), - EnableStateExpiry: cfg.Eth.StateExpiryEnable, + EnableStateExpiry: cfg.Eth.StateExpiryCfg.EnableExpiry(), ChainConfig: chainConfig, CacheConfig: cacheConfig, } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index b35b5644b9..63ea196949 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -23,6 +23,8 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" "math" "math/big" "net" @@ -1125,6 +1127,26 @@ var ( Usage: "set state expiry remote full state rpc endpoint, every expired state will fetch from remote", Category: flags.StateExpiryCategory, } + StateExpiryStateEpoch1BlockFlag = &cli.Uint64Flag{ + Name: "state-expiry.epoch1", + Usage: "set state expiry epoch1 block number", + Category: flags.StateExpiryCategory, + } + StateExpiryStateEpoch2BlockFlag = &cli.Uint64Flag{ + Name: "state-expiry.epoch2", + Usage: "set state expiry epoch2 block number", + Category: flags.StateExpiryCategory, + } + StateExpiryStateEpochPeriodFlag = &cli.Uint64Flag{ + Name: "state-expiry.period", + Usage: "set state expiry epoch period after epoch2", + Category: flags.StateExpiryCategory, + } + StateExpiryEnableLocalReviveFlag = &cli.BoolFlag{ + Name: "state-expiry.localrevive", + Usage: "if enable local revive", + Category: flags.StateExpiryCategory, + } ) func init() { @@ -1940,13 +1962,18 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { cfg.StateHistory = ctx.Uint64(StateHistoryFlag.Name) } // Parse state scheme, abort the process if it's not compatible. - chaindb := tryMakeReadOnlyDatabase(ctx, stack) + chaindb := MakeChainDatabase(ctx, stack, false, false) scheme, err := ParseStateScheme(ctx, chaindb) - chaindb.Close() if err != nil { Fatalf("%v", err) } cfg.StateScheme = scheme + seCfg, err := ParseStateExpiryConfig(ctx, chaindb, scheme) + if err != nil { + Fatalf("%v", err) + } + cfg.StateExpiryCfg = seCfg + chaindb.Close() // Parse transaction history flag, if user is still using legacy config // file with 'TxLookupLimit' configured, copy the value to 'TransactionHistory'. @@ -2521,6 +2548,79 @@ func ParseStateScheme(ctx *cli.Context, disk ethdb.Database) (string, error) { return "", fmt.Errorf("incompatible state scheme, stored: %s, provided: %s", stored, scheme) } +func ParseStateExpiryConfig(ctx *cli.Context, disk ethdb.Database, scheme string) (*types.StateExpiryConfig, error) { + enc := rawdb.ReadStateExpiryCfg(disk) + var stored *types.StateExpiryConfig + if len(enc) > 0 { + var cfg types.StateExpiryConfig + if err := rlp.DecodeBytes(enc, &cfg); err != nil { + return nil, err + } + stored = &cfg + } + newCfg := &types.StateExpiryConfig{StateScheme: scheme} + if ctx.IsSet(StateExpiryEnableFlag.Name) { + newCfg.Enable = ctx.Bool(StateExpiryEnableFlag.Name) + } + if ctx.IsSet(StateExpiryFullStateEndpointFlag.Name) { + newCfg.FullStateEndpoint = ctx.String(StateExpiryFullStateEndpointFlag.Name) + } + + // some config will use stored default + if ctx.IsSet(StateExpiryStateEpoch1BlockFlag.Name) { + newCfg.StateEpoch1Block = ctx.Uint64(StateExpiryStateEpoch1BlockFlag.Name) + } else if stored != nil { + newCfg.StateEpoch1Block = stored.StateEpoch1Block + } + if ctx.IsSet(StateExpiryStateEpoch2BlockFlag.Name) { + newCfg.StateEpoch2Block = ctx.Uint64(StateExpiryStateEpoch2BlockFlag.Name) + } else if stored != nil { + newCfg.StateEpoch2Block = stored.StateEpoch2Block + } + if ctx.IsSet(StateExpiryStateEpochPeriodFlag.Name) { + newCfg.StateEpochPeriod = ctx.Uint64(StateExpiryStateEpochPeriodFlag.Name) + } else if stored != nil { + newCfg.StateEpochPeriod = stored.StateEpochPeriod + } + if ctx.IsSet(StateExpiryEnableLocalReviveFlag.Name) { + newCfg.EnableLocalRevive = ctx.Bool(StateExpiryEnableLocalReviveFlag.Name) + } + + // override prune level + newCfg.PruneLevel = types.StateExpiryPruneLevel1 + switch newCfg.StateScheme { + case rawdb.HashScheme: + // TODO(0xbundler): will stop support HBSS later. + newCfg.PruneLevel = types.StateExpiryPruneLevel0 + case rawdb.PathScheme: + newCfg.PruneLevel = types.StateExpiryPruneLevel1 + default: + return nil, fmt.Errorf("not support the state scheme: %v", newCfg.StateScheme) + } + + if err := newCfg.Validation(); err != nil { + return nil, err + } + if err := stored.CheckCompatible(newCfg); err != nil { + return nil, err + } + + log.Info("Apply State Expiry", "cfg", newCfg) + if !newCfg.Enable { + return newCfg, nil + } + + // save it into db + enc, err := rlp.EncodeToBytes(newCfg) + if err != nil { + return nil, err + } + if err = rawdb.WriteStateExpiryCfg(disk, enc); err != nil { + return nil, err + } + return newCfg, nil +} + // MakeTrieDatabase constructs a trie database based on the configured scheme. func MakeTrieDatabase(ctx *cli.Context, disk ethdb.Database, preimage bool, readOnly bool) *trie.Database { config := &trie.Config{ diff --git a/core/blockchain.go b/core/blockchain.go index 5a13efc7c0..dee98a48a4 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -160,8 +160,7 @@ type CacheConfig struct { SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it // state expiry feature - EnableStateExpiry bool - RemoteEndPoint string + StateExpiryCfg *types.StateExpiryConfig } // TriedbConfig derives the configures for trie database. @@ -170,7 +169,7 @@ func (c *CacheConfig) TriedbConfig() *trie.Config { Cache: c.TrieCleanLimit, Preimages: c.Preimages, NoTries: c.NoTries, - EnableStateExpiry: c.EnableStateExpiry, + EnableStateExpiry: c.StateExpiryCfg.EnableExpiry(), } if c.StateScheme == rawdb.HashScheme { config.HashDB = &hashdb.Config{ @@ -300,8 +299,8 @@ type BlockChain struct { doubleSignMonitor *monitor.DoubleSignMonitor // state expiry feature - enableStateExpiry bool - fullStateDB ethdb.FullStateDB + stateExpiryCfg *types.StateExpiryConfig + fullStateDB ethdb.FullStateDB } // NewBlockChain returns a fully initialised block chain using information @@ -374,10 +373,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis bc.processor = NewStateProcessor(chainConfig, bc, engine) var err error - if cacheConfig.EnableStateExpiry { - log.Info("enable state expiry feature", "RemoteEndPoint", cacheConfig.RemoteEndPoint) - bc.enableStateExpiry = true - bc.fullStateDB, err = ethdb.NewFullStateRPCServer(cacheConfig.RemoteEndPoint) + if cacheConfig.StateExpiryCfg.EnableExpiry() { + log.Info("enable state expiry feature", "RemoteEndPoint", cacheConfig.StateExpiryCfg.FullStateEndpoint) + bc.stateExpiryCfg = cacheConfig.StateExpiryCfg + bc.fullStateDB, err = ethdb.NewFullStateRPCServer(cacheConfig.StateExpiryCfg.FullStateEndpoint) if err != nil { return nil, err } @@ -624,7 +623,15 @@ func (bc *BlockChain) cacheBlock(hash common.Hash, block *types.Block) { } func (bc *BlockChain) EnableStateExpiry() bool { - return bc.enableStateExpiry + return bc.stateExpiryCfg.EnableExpiry() +} + +func (bc *BlockChain) EnableStateExpiryLocalRevive() bool { + if bc.EnableStateExpiry() { + return bc.stateExpiryCfg.EnableLocalRevive + } + + return false } func (bc *BlockChain) FullStateDB() ethdb.FullStateDB { @@ -1052,8 +1059,8 @@ func (bc *BlockChain) StateAtWithSharedPool(root, startAtBlockHash common.Hash, if err != nil { return nil, err } - if bc.enableStateExpiry { - stateDB.InitStateExpiryFeature(bc.chainConfig, bc.fullStateDB, startAtBlockHash, height) + if bc.EnableStateExpiry() { + stateDB.InitStateExpiryFeature(bc.stateExpiryCfg, bc.fullStateDB, startAtBlockHash, height) } return stateDB, err } @@ -2059,8 +2066,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) return it.index, err } bc.updateHighestVerifiedHeader(block.Header()) - if bc.enableStateExpiry { - statedb.InitStateExpiryFeature(bc.chainConfig, bc.fullStateDB, parent.Hash(), block.Number()) + if bc.EnableStateExpiry() { + statedb.InitStateExpiryFeature(bc.stateExpiryCfg, bc.fullStateDB, parent.Hash(), block.Number()) } // Enable prefetching to pull in trie node paths while processing transactions @@ -2071,8 +2078,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) // do Prefetch in a separate goroutine to avoid blocking the critical path // 1.do state prefetch for snapshot cache throwaway := statedb.CopyDoPrefetch() - if throwaway != nil && bc.enableStateExpiry { - throwaway.InitStateExpiryFeature(bc.chainConfig, bc.fullStateDB, parent.Hash(), block.Number()) + if throwaway != nil && bc.EnableStateExpiry() { + throwaway.InitStateExpiryFeature(bc.stateExpiryCfg, bc.fullStateDB, parent.Hash(), block.Number()) } go bc.prefetcher.Prefetch(block, throwaway, &bc.vmConfig, interruptCh) @@ -2156,7 +2163,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) stats.usedGas += usedGas dirty, _ := bc.triedb.Size() - stats.report(chain, it.index, dirty, setHead) + stats.report(chain, it.index, dirty, setHead, bc.stateExpiryCfg) if !setHead { // After merge we expect few side chains. Simply count diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go index 3e913cb87b..7a35df0721 100644 --- a/core/blockchain_insert.go +++ b/core/blockchain_insert.go @@ -39,7 +39,7 @@ const statsReportLimit = 8 * time.Second // report prints statistics if some number of blocks have been processed // or more than a few seconds have passed since the last message. -func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize, setHead bool) { +func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize, setHead bool, config *types.StateExpiryConfig) { // Fetch the timings for the batch var ( now = mclock.Now() @@ -60,6 +60,9 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor "blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000, "elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed), } + if config.EnableExpiry() { + context = append(context, []interface{}{"stateEpoch", types.GetStateEpoch(config, end.Number())}...) + } if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute { context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...) } diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 540a70ad00..2369d8a7da 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -356,8 +356,8 @@ func (bc *BlockChain) StateAt(startAtRoot common.Hash, startAtBlockHash common.H if err != nil { return nil, err } - if bc.enableStateExpiry { - sdb.InitStateExpiryFeature(bc.chainConfig, bc.fullStateDB, startAtBlockHash, expectHeight) + if bc.EnableStateExpiry() { + sdb.InitStateExpiryFeature(bc.stateExpiryCfg, bc.fullStateDB, startAtBlockHash, expectHeight) } return sdb, err } @@ -365,6 +365,8 @@ func (bc *BlockChain) StateAt(startAtRoot common.Hash, startAtBlockHash common.H // Config retrieves the chain's fork configuration. func (bc *BlockChain) Config() *params.ChainConfig { return bc.chainConfig } +func (bc *BlockChain) StateExpiryConfig() *types.StateExpiryConfig { return bc.stateExpiryCfg } + // Engine retrieves the blockchain's consensus engine. func (bc *BlockChain) Engine() consensus.Engine { return bc.engine } diff --git a/core/rawdb/accessors_epoch_meta.go b/core/rawdb/accessors_epoch_meta.go index 39c33ad034..9c72f36dc9 100644 --- a/core/rawdb/accessors_epoch_meta.go +++ b/core/rawdb/accessors_epoch_meta.go @@ -45,6 +45,15 @@ func DeleteEpochMetaPlainState(db ethdb.KeyValueWriter, addr common.Hash, path s return db.Delete(epochMetaPlainStateKey(addr, path)) } +func ReadStateExpiryCfg(db ethdb.Reader) []byte { + val, _ := db.Get(stateExpiryCfgKey) + return val +} + +func WriteStateExpiryCfg(db ethdb.KeyValueWriter, val []byte) error { + return db.Put(stateExpiryCfgKey, val) +} + func epochMetaPlainStateKey(addr common.Hash, path string) []byte { key := make([]byte, len(EpochMetaPlainStatePrefix)+len(addr)+len(path)) copy(key[:], EpochMetaPlainStatePrefix) diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 2ba2dc1b11..9fdb959704 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -608,6 +608,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { accountSnaps stat storageSnaps stat snapJournal stat + trieJournal stat preimages stat bloomBits stat cliqueSnaps stat @@ -691,6 +692,8 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { epochMetaMetaSize.Add(size) case bytes.Equal(key, snapshotJournalKey): snapJournal.Add(size) + case bytes.Equal(key, trieJournalKey): + trieJournal.Add(size) case bytes.Equal(key, epochMetaSnapshotJournalKey): epochMetaSnapJournalSize.Add(size) case bytes.HasPrefix(key, EpochMetaPlainStatePrefix) && len(key) >= (len(EpochMetaPlainStatePrefix)+common.HashLength): @@ -702,7 +705,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { lastPivotKey, fastTrieProgressKey, snapshotDisabledKey, SnapshotRootKey, snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey, - persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, + persistentStateIDKey, snapshotSyncStatusKey, } { if bytes.Equal(key, meta) { metadata.Add(size) @@ -735,6 +738,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { {"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()}, {"Key-Value store", "Path trie account nodes", accountTries.Size(), accountTries.Count()}, {"Key-Value store", "Path trie storage nodes", storageTries.Size(), storageTries.Count()}, + {"Key-Value store", "Path trie snap journal", trieJournal.Size(), trieJournal.Count()}, {"Key-Value store", "Trie preimages", preimages.Size(), preimages.Count()}, {"Key-Value store", "Account snapshot", accountSnaps.Size(), accountSnaps.Count()}, {"Key-Value store", "Storage snapshot", storageSnaps.Size(), storageSnaps.Count()}, diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 779f5570ba..99498174d4 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -110,6 +110,9 @@ var ( // epochMetaPlainStateMeta save disk layer meta data epochMetaPlainStateMeta = []byte("epochMetaPlainStateMeta") + // stateExpiryCfgKey save state expiry persistence config + stateExpiryCfgKey = []byte("stateExpiryCfgKey") + // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td diff --git a/core/state/iterator.go b/core/state/iterator.go index 7cb212ff15..24fe4a77d5 100644 --- a/core/state/iterator.go +++ b/core/state/iterator.go @@ -127,8 +127,8 @@ func (it *nodeIterator) step() error { if err != nil { return err } - if it.state.enableStateExpiry { - dataTrie.SetEpoch(it.state.epoch) + if it.state.EnableExpire() { + dataTrie.SetEpoch(it.state.Epoch()) } it.dataIt, err = dataTrie.NodeIterator(nil) if err != nil { diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go index 5102b8f3d8..1d34025328 100644 --- a/core/state/pruner/pruner.go +++ b/core/state/pruner/pruner.go @@ -29,6 +29,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" "github.com/prometheus/tsdb/fileutil" @@ -694,7 +695,7 @@ func (p *Pruner) ExpiredPrune(height *big.Int, root common.Hash) error { var ( pruneExpiredTrieCh = make(chan *snapshot.ContractItem, 100000) pruneExpiredInDiskCh = make(chan *trie.NodeInfo, 100000) - epoch = types.GetStateEpoch(p.config.ChainConfig, height) + epoch = types.GetStateEpoch(p.config.CacheConfig.StateExpiryCfg, height) rets = make([]error, 3) tasksWG sync.WaitGroup ) @@ -740,8 +741,13 @@ func (p *Pruner) ExpiredPrune(height *big.Int, root common.Hash) error { // here are some issues when just delete it from hash-based storage, because it's shared kv in hbss // but it's ok for pbss. func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch types.StateEpoch, expireContractCh chan *snapshot.ContractItem, pruneExpiredInDisk chan *trie.NodeInfo) error { + var ( + trieCount atomic.Uint64 + start = time.Now() + logged = time.Now() + ) for item := range expireContractCh { - log.Info("start scan trie expired state", "addr", item.Addr, "root", item.Root) + log.Info("start scan trie expired state", "addrHash", item.Addr, "root", item.Root) tr, err := trie.New(&trie.ID{ StateRoot: stateRoot, Owner: item.Addr, @@ -752,11 +758,16 @@ func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch type return err } tr.SetEpoch(epoch) - if err = tr.PruneExpired(pruneExpiredInDisk); err != nil { + if err = tr.PruneExpired(pruneExpiredInDisk, &trieCount); err != nil { log.Error("asyncScanExpiredInTrie, PruneExpired err", "id", item, "err", err) return err } + if time.Since(logged) > 8*time.Second { + log.Info("Pruning expired states", "trieNodes", trieCount.Load()) + logged = time.Now() + } } + log.Info("Scan expired states", "trieNodes", trieCount.Load(), "elapsed", common.PrettyDuration(time.Since(start))) close(pruneExpiredInDisk) return nil } @@ -780,32 +791,39 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch addr := info.Addr // delete trie kv trieCount++ - trieSize += common.StorageSize(len(info.Key) + 32) switch scheme { case rawdb.PathScheme: + val := rawdb.ReadTrieNode(diskdb, addr, info.Path, info.Hash, rawdb.PathScheme) + trieSize += common.StorageSize(len(val) + 33 + len(info.Path)) rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.PathScheme) case rawdb.HashScheme: // hbss has shared kv, so using bloom to filter them out. if !bloom.Contain(info.Hash.Bytes()) { + val := rawdb.ReadTrieNode(diskdb, addr, info.Path, info.Hash, rawdb.HashScheme) + trieSize += common.StorageSize(len(val) + 33) rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.HashScheme) } } // delete epoch meta if info.IsBranch { epochMetaCount++ - epochMetaSize += common.StorageSize(32 + len(info.Path) + 32) + val := rawdb.ReadEpochMetaPlainState(diskdb, addr, string(info.Path)) + epochMetaSize += common.StorageSize(33 + len(info.Path) + len(val)) rawdb.DeleteEpochMetaPlainState(batch, addr, string(info.Path)) } // replace snapshot kv only epoch if info.IsLeaf { snapCount++ - snapSize += common.StorageSize(32) - if err := snapshot.ShrinkExpiredLeaf(batch, addr, info.Key, info.Epoch, scheme); err != nil { + size, err := snapshot.ShrinkExpiredLeaf(batch, diskdb, addr, info.Key, info.Epoch, scheme) + if err != nil { log.Error("ShrinkExpiredLeaf err", "addr", addr, "key", info.Key, "err", err) } + snapSize += common.StorageSize(size) } if batch.ValueSize() >= ethdb.IdealBatchSize { - batch.Write() + if err := batch.Write(); err != nil { + log.Error("asyncPruneExpiredStorageInDisk, batch write err", "err", err) + } batch.Reset() } if time.Since(logged) > 8*time.Second { @@ -816,7 +834,9 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch } } if batch.ValueSize() > 0 { - batch.Write() + if err := batch.Write(); err != nil { + log.Error("asyncPruneExpiredStorageInDisk, batch write err", "err", err) + } batch.Reset() } log.Info("Pruned expired states", "trieNodes", trieCount, "trieSize", trieSize, diff --git a/core/state/snapshot/snapshot_expire.go b/core/state/snapshot/snapshot_expire.go index a4be6a1f84..29520160b0 100644 --- a/core/state/snapshot/snapshot_expire.go +++ b/core/state/snapshot/snapshot_expire.go @@ -8,17 +8,19 @@ import ( ) // ShrinkExpiredLeaf tool function for snapshot kv prune -func ShrinkExpiredLeaf(db ethdb.KeyValueWriter, accountHash common.Hash, storageHash common.Hash, epoch types.StateEpoch, scheme string) error { +func ShrinkExpiredLeaf(writer ethdb.KeyValueWriter, reader ethdb.KeyValueReader, accountHash common.Hash, storageHash common.Hash, epoch types.StateEpoch, scheme string) (int64, error) { switch scheme { case rawdb.HashScheme: //cannot prune snapshot in hbss, because it will used for trie prune, but it's ok in pbss. case rawdb.PathScheme: + val := rawdb.ReadStorageSnapshot(reader, accountHash, storageHash) valWithEpoch := NewValueWithEpoch(epoch, nil) enc, err := EncodeValueToRLPBytes(valWithEpoch) if err != nil { - return err + return 0, err } - rawdb.WriteStorageSnapshot(db, accountHash, storageHash, enc) + rawdb.WriteStorageSnapshot(writer, accountHash, storageHash, enc) + return int64(65 + len(val)), nil } - return nil + return 0, nil } diff --git a/core/state/snapshot/snapshot_expire_test.go b/core/state/snapshot/snapshot_expire_test.go index 50067c53b3..ac20e9c90f 100644 --- a/core/state/snapshot/snapshot_expire_test.go +++ b/core/state/snapshot/snapshot_expire_test.go @@ -21,7 +21,7 @@ func TestShrinkExpiredLeaf(t *testing.T) { db := memorydb.New() rawdb.WriteStorageSnapshot(db, accountHash, storageHash1, encodeSnapVal(NewRawValue([]byte("val1")))) - err := ShrinkExpiredLeaf(db, accountHash, storageHash1, types.StateEpoch0, rawdb.PathScheme) + _, err := ShrinkExpiredLeaf(db, db, accountHash, storageHash1, types.StateEpoch0, rawdb.PathScheme) assert.NoError(t, err) assert.Equal(t, encodeSnapVal(NewValueWithEpoch(types.StateEpoch0, nil)), rawdb.ReadStorageSnapshot(db, accountHash, storageHash1)) diff --git a/core/state/state_expiry.go b/core/state/state_expiry.go index 38458cca1b..8afce61b01 100644 --- a/core/state/state_expiry.go +++ b/core/state/state_expiry.go @@ -14,13 +14,26 @@ import ( var ( reviveStorageTrieTimer = metrics.NewRegisteredTimer("state/revivetrie/rt", nil) - EnableLocalRevive = false // indicate if using local revive ) +// stateExpiryMeta it contains all state expiry meta for target block +type stateExpiryMeta struct { + enableStateExpiry bool + enableLocalRevive bool + fullStateDB ethdb.FullStateDB + epoch types.StateEpoch + originalRoot common.Hash + originalHash common.Hash +} + +func defaultStateExpiryMeta() *stateExpiryMeta { + return &stateExpiryMeta{enableStateExpiry: false} +} + // fetchExpiredStorageFromRemote request expired state from remote full state node; -func fetchExpiredStorageFromRemote(fullDB ethdb.FullStateDB, stateRoot common.Hash, addr common.Address, root common.Hash, tr Trie, prefixKey []byte, key common.Hash) (map[string][]byte, error) { +func fetchExpiredStorageFromRemote(meta *stateExpiryMeta, addr common.Address, root common.Hash, tr Trie, prefixKey []byte, key common.Hash) (map[string][]byte, error) { log.Debug("fetching expired storage from remoteDB", "addr", addr, "prefix", prefixKey, "key", key) - if EnableLocalRevive { + if meta.enableLocalRevive { // if there need revive expired state, try to revive locally, when the node is not being pruned, just renew the epoch val, err := tr.TryLocalRevive(addr, key.Bytes()) log.Debug("fetchExpiredStorageFromRemote TryLocalRevive", "addr", addr, "key", key, "val", val, "err", err) @@ -40,7 +53,7 @@ func fetchExpiredStorageFromRemote(fullDB ethdb.FullStateDB, stateRoot common.Ha } // cannot revive locally, fetch remote proof - proofs, err := fullDB.GetStorageReviveProof(stateRoot, addr, root, []string{common.Bytes2Hex(prefixKey)}, []string{common.Bytes2Hex(key[:])}) + proofs, err := meta.fullStateDB.GetStorageReviveProof(meta.originalRoot, addr, root, []string{common.Bytes2Hex(prefixKey)}, []string{common.Bytes2Hex(key[:])}) log.Debug("fetchExpiredStorageFromRemote GetStorageReviveProof", "addr", addr, "key", key, "proofs", len(proofs), "err", err) if err != nil { return nil, err @@ -55,13 +68,13 @@ func fetchExpiredStorageFromRemote(fullDB ethdb.FullStateDB, stateRoot common.Ha } // batchFetchExpiredStorageFromRemote request expired state from remote full state node with a list of keys and prefixes. -func batchFetchExpiredFromRemote(fullDB ethdb.FullStateDB, stateRoot common.Hash, addr common.Address, root common.Hash, tr Trie, prefixKeys [][]byte, keys []common.Hash) ([]map[string][]byte, error) { +func batchFetchExpiredFromRemote(expiryMeta *stateExpiryMeta, addr common.Address, root common.Hash, tr Trie, prefixKeys [][]byte, keys []common.Hash) ([]map[string][]byte, error) { ret := make([]map[string][]byte, len(keys)) prefixKeysStr := make([]string, len(prefixKeys)) keysStr := make([]string, len(keys)) - if EnableLocalRevive { + if expiryMeta.enableLocalRevive { var expiredKeys []common.Hash var expiredPrefixKeys [][]byte for i, key := range keys { @@ -101,7 +114,7 @@ func batchFetchExpiredFromRemote(fullDB ethdb.FullStateDB, stateRoot common.Hash } // cannot revive locally, fetch remote proof - proofs, err := fullDB.GetStorageReviveProof(stateRoot, addr, root, prefixKeysStr, keysStr) + proofs, err := expiryMeta.fullStateDB.GetStorageReviveProof(expiryMeta.originalRoot, addr, root, prefixKeysStr, keysStr) log.Debug("fetchExpiredStorageFromRemote GetStorageReviveProof", "addr", addr, "keys", keysStr, "prefixKeys", prefixKeysStr, "proofs", len(proofs), "err", err) if err != nil { return nil, err diff --git a/core/state/state_object.go b/core/state/state_object.go index 168b2497c3..122b93388e 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -174,8 +174,8 @@ func (s *stateObject) getTrie() (Trie, error) { if err != nil { return nil, err } - if s.db.enableStateExpiry { - tr.SetEpoch(s.db.epoch) + if s.db.EnableExpire() { + tr.SetEpoch(s.db.Epoch()) } s.trie = tr } @@ -463,7 +463,7 @@ func (s *stateObject) updateTrie() (Trie, error) { // it must hit in cache value := s.GetState(key) dirtyStorage[key] = common.TrimLeftZeroes(value[:]) - log.Debug("updateTrie access state", "contract", s.address, "key", key, "epoch", s.db.epoch) + log.Debug("updateTrie access state", "contract", s.address, "key", key, "epoch", s.db.Epoch()) } } @@ -483,7 +483,7 @@ func (s *stateObject) updateTrie() (Trie, error) { s.db.setError(fmt.Errorf("state object pendingFutureReviveState err, contract: %v, key: %v, err: %v", s.address, key, err)) continue } - if _, err = fetchExpiredStorageFromRemote(s.db.fullStateDB, s.db.originalRoot, s.address, s.data.Root, tr, enErr.Path, key); err != nil { + if _, err = fetchExpiredStorageFromRemote(s.db.expiryMeta, s.address, s.data.Root, tr, enErr.Path, key); err != nil { s.db.setError(fmt.Errorf("state object pendingFutureReviveState fetchExpiredStorageFromRemote err, contract: %v, key: %v, path: %v, err: %v", s.address, key, enErr.Path, err)) } } @@ -529,7 +529,7 @@ func (s *stateObject) updateTrie() (Trie, error) { var snapshotVal []byte // Encoding []byte cannot fail, ok to ignore the error. if s.db.EnableExpire() { - snapshotVal, _ = snapshot.EncodeValueToRLPBytes(snapshot.NewValueWithEpoch(s.db.epoch, value)) + snapshotVal, _ = snapshot.EncodeValueToRLPBytes(snapshot.NewValueWithEpoch(s.db.Epoch(), value)) } else { snapshotVal, _ = rlp.EncodeToBytes(value) } @@ -815,7 +815,7 @@ func (s *stateObject) accessState(key common.Hash) { return } - if s.db.epoch > s.originStorageEpoch[key] { + if s.db.Epoch() > s.originStorageEpoch[key] { count := s.pendingAccessedState[key] s.pendingAccessedState[key] = count + 1 } @@ -860,7 +860,7 @@ func (s *stateObject) fetchExpiredFromRemote(prefixKey []byte, key common.Hash, prefixKey = enErr.Path } - kvs, err := fetchExpiredStorageFromRemote(s.db.fullStateDB, s.db.originalRoot, s.address, s.data.Root, tr, prefixKey, key) + kvs, err := fetchExpiredStorageFromRemote(s.db.expiryMeta, s.address, s.data.Root, tr, prefixKey, key) if err != nil { return nil, err } @@ -894,12 +894,12 @@ func (s *stateObject) getExpirySnapStorage(key common.Hash) ([]byte, error, erro } s.originStorageEpoch[key] = val.GetEpoch() - if !types.EpochExpired(val.GetEpoch(), s.db.epoch) { + if !types.EpochExpired(val.GetEpoch(), s.db.Epoch()) { return val.GetVal(), nil, nil } // if found value not been pruned, just return, local revive later - if EnableLocalRevive && len(val.GetVal()) > 0 { + if s.db.EnableLocalRevive() && len(val.GetVal()) > 0 { s.futureReviveState(key) log.Debug("getExpirySnapStorage GetVal", "addr", s.address, "key", key, "val", hex.EncodeToString(val.GetVal())) return val.GetVal(), nil, nil diff --git a/core/state/statedb.go b/core/state/statedb.go index bb5b743325..f9b47c9130 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -149,10 +149,7 @@ type StateDB struct { nextRevisionId int // state expiry feature - enableStateExpiry bool // default disable - epoch types.StateEpoch // epoch indicate stateDB start at which block's target epoch - fullStateDB ethdb.FullStateDB // RemoteFullStateNode - originalHash common.Hash + expiryMeta *stateExpiryMeta // Measurements gathered during execution for debugging purposes // MetricsMux should be used in more places, but will affect on performance, so following meteration is not accruate @@ -206,7 +203,7 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error) accessList: newAccessList(), transientStorage: newTransientStorage(), hasher: crypto.NewKeccakState(), - epoch: types.StateEpoch0, + expiryMeta: defaultStateExpiryMeta(), } if sdb.snaps != nil { @@ -248,15 +245,20 @@ func (s *StateDB) TransferPrefetcher(prev *StateDB) { // InitStateExpiryFeature it must set in initial, reset later will cause wrong result // Attention: startAtBlockHash corresponding to stateDB's originalRoot, expectHeight is the epoch indicator. -func (s *StateDB) InitStateExpiryFeature(config *params.ChainConfig, remote ethdb.FullStateDB, startAtBlockHash common.Hash, expectHeight *big.Int) *StateDB { +func (s *StateDB) InitStateExpiryFeature(config *types.StateExpiryConfig, remote ethdb.FullStateDB, startAtBlockHash common.Hash, expectHeight *big.Int) *StateDB { if config == nil || expectHeight == nil || remote == nil { panic("cannot init state expiry stateDB with nil config/height/remote") } - s.enableStateExpiry = true - s.fullStateDB = remote - s.epoch = types.GetStateEpoch(config, expectHeight) - s.originalHash = startAtBlockHash - log.Debug("StateDB enable state expiry feature", "expectHeight", expectHeight, "startAtBlockHash", startAtBlockHash, "epoch", s.epoch) + epoch := types.GetStateEpoch(config, expectHeight) + s.expiryMeta = &stateExpiryMeta{ + enableStateExpiry: config.Enable, + enableLocalRevive: config.EnableLocalRevive, + fullStateDB: remote, + epoch: epoch, + originalRoot: s.originalRoot, + originalHash: startAtBlockHash, + } + log.Debug("StateDB enable state expiry feature", "expectHeight", expectHeight, "startAtBlockHash", startAtBlockHash, "epoch", epoch) return s } @@ -280,7 +282,7 @@ func (s *StateDB) StartPrefetcher(namespace string) { } else { s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, common.Hash{}, namespace) } - s.prefetcher.InitStateExpiryFeature(s.epoch, s.originalHash, s.fullStateDB) + s.prefetcher.InitStateExpiryFeature(s.expiryMeta) } } @@ -394,7 +396,7 @@ func (s *StateDB) AddLog(log *types.Log) { } // GetLogs returns the logs matching the specified transaction hash, and annotates -// them with the given blockNumber and blockHash. +// them with the given blockNumber and originalHash. func (s *StateDB) GetLogs(hash common.Hash, blockNumber uint64, blockHash common.Hash) []*types.Log { logs := s.logs[hash] for _, l := range logs { @@ -995,10 +997,7 @@ func (s *StateDB) copyInternal(doPrefetch bool) *StateDB { hasher: crypto.NewKeccakState(), // state expiry copy - epoch: s.epoch, - enableStateExpiry: s.enableStateExpiry, - fullStateDB: s.fullStateDB, - originalHash: s.originalHash, + expiryMeta: s.expiryMeta, // In order for the block producer to be able to use and make additions // to the snapshot tree, we need to copy that as well. Otherwise, any @@ -1075,7 +1074,7 @@ func (s *StateDB) copyInternal(doPrefetch bool) *StateDB { state.transientStorage = s.transientStorage.Copy() state.prefetcher = s.prefetcher - if !s.enableStateExpiry && s.prefetcher != nil && !doPrefetch { + if !s.EnableExpire() && s.prefetcher != nil && !doPrefetch { // If there's a prefetcher running, make an inactive copy of it that can // only access data but does not actively preload (since the user will not // know that they need to explicitly terminate an active copy). @@ -1387,8 +1386,8 @@ func (s *StateDB) deleteStorage(addr common.Address, addrHash common.Hash, root if err != nil { return false, nil, nil, fmt.Errorf("failed to open storage trie, err: %w", err) } - if s.enableStateExpiry { - tr.SetEpoch(s.epoch) + if s.EnableExpire() { + tr.SetEpoch(s.Epoch()) } it, err := tr.NodeIterator(nil) if err != nil { @@ -1918,7 +1917,15 @@ func (s *StateDB) AddSlotToAccessList(addr common.Address, slot common.Hash) { } func (s *StateDB) EnableExpire() bool { - return s.enableStateExpiry + return s.expiryMeta.enableStateExpiry +} + +func (s *StateDB) Epoch() types.StateEpoch { + return s.expiryMeta.epoch +} + +func (s *StateDB) EnableLocalRevive() bool { + return s.expiryMeta.enableLocalRevive } // AddressInAccessList returns true if the given address is in the access list. diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index f1102e262e..62c48bc45b 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -17,8 +17,6 @@ package state import ( - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" "sync" "sync/atomic" @@ -67,10 +65,7 @@ type triePrefetcher struct { prefetchChan chan *prefetchMsg // no need to wait for return // state expiry feature - enableStateExpiry bool // default disable - epoch types.StateEpoch // epoch indicate stateDB start at which block's target epoch - fullStateDB ethdb.FullStateDB // RemoteFullStateNode - blockHash common.Hash + expiryMeta *stateExpiryMeta deliveryMissMeter metrics.Meter accountLoadMeter metrics.Meter @@ -116,6 +111,8 @@ func newTriePrefetcher(db Database, root, rootParent common.Hash, namespace stri accountStaleDupMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/dup", nil), accountStaleSkipMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/skip", nil), accountStaleWasteMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/waste", nil), + + expiryMeta: defaultStateExpiryMeta(), } go p.mainLoop() return p @@ -132,8 +129,8 @@ func (p *triePrefetcher) mainLoop() { fetcher := p.fetchers[id] if fetcher == nil { fetcher = newSubfetcher(p.db, p.root, pMsg.owner, pMsg.root, pMsg.addr) - if p.enableStateExpiry { - fetcher.initStateExpiryFeature(p.epoch, p.blockHash, p.fullStateDB) + if p.expiryMeta.enableStateExpiry { + fetcher.initStateExpiryFeature(p.expiryMeta) } fetcher.start() p.fetchersMutex.Lock() @@ -218,11 +215,8 @@ func (p *triePrefetcher) mainLoop() { } // InitStateExpiryFeature it must call in initial period. -func (p *triePrefetcher) InitStateExpiryFeature(epoch types.StateEpoch, blockHash common.Hash, fullStateDB ethdb.FullStateDB) { - p.enableStateExpiry = true - p.epoch = epoch - p.fullStateDB = fullStateDB - p.blockHash = blockHash +func (p *triePrefetcher) InitStateExpiryFeature(expiryMeta *stateExpiryMeta) { + p.expiryMeta = expiryMeta } // close iterates over all the subfetchers, aborts any that were left spinning @@ -376,10 +370,7 @@ type subfetcher struct { trie Trie // Trie being populated with nodes // state expiry feature - enableStateExpiry bool // default disable - epoch types.StateEpoch // epoch indicate stateDB start at which block's target epoch - fullStateDB ethdb.FullStateDB // RemoteFullStateNode - blockHash common.Hash + expiryMeta *stateExpiryMeta tasks [][]byte // Items queued up for retrieval lock sync.Mutex // Lock protecting the task queue @@ -401,16 +392,17 @@ type subfetcher struct { // particular root hash. func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher { sf := &subfetcher{ - db: db, - state: state, - owner: owner, - root: root, - addr: addr, - wake: make(chan struct{}, 1), - stop: make(chan struct{}), - term: make(chan struct{}), - copy: make(chan chan Trie), - seen: make(map[string]struct{}), + db: db, + state: state, + owner: owner, + root: root, + addr: addr, + wake: make(chan struct{}, 1), + stop: make(chan struct{}), + term: make(chan struct{}), + copy: make(chan chan Trie), + seen: make(map[string]struct{}), + expiryMeta: defaultStateExpiryMeta(), } return sf } @@ -420,11 +412,8 @@ func (sf *subfetcher) start() { } // InitStateExpiryFeature it must call in initial period. -func (sf *subfetcher) initStateExpiryFeature(epoch types.StateEpoch, blockHash common.Hash, fullStateDB ethdb.FullStateDB) { - sf.enableStateExpiry = true - sf.epoch = epoch - sf.fullStateDB = fullStateDB - sf.blockHash = blockHash +func (sf *subfetcher) initStateExpiryFeature(expiryMeta *stateExpiryMeta) { + sf.expiryMeta = expiryMeta } // schedule adds a batch of trie keys to the queue to prefetch. @@ -470,8 +459,8 @@ func (sf *subfetcher) scheduleParallel(keys [][]byte) { keysLeftSize := len(keysLeft) for i := 0; i*parallelTriePrefetchCapacity < keysLeftSize; i++ { child := newSubfetcher(sf.db, sf.state, sf.owner, sf.root, sf.addr) - if sf.enableStateExpiry { - child.initStateExpiryFeature(sf.epoch, sf.blockHash, sf.fullStateDB) + if sf.expiryMeta.enableStateExpiry { + child.initStateExpiryFeature(sf.expiryMeta) } child.start() sf.paraChildren = append(sf.paraChildren, child) @@ -526,8 +515,8 @@ func (sf *subfetcher) loop() { trie, err = sf.db.OpenTrie(sf.root) } else { trie, err = sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root) - if err == nil && sf.enableStateExpiry { - trie.SetEpoch(sf.epoch) + if err == nil && sf.expiryMeta.enableStateExpiry { + trie.SetEpoch(sf.expiryMeta.epoch) } } if err != nil { @@ -547,8 +536,8 @@ func (sf *subfetcher) loop() { } else { // address is useless sf.trie, err = sf.db.OpenStorageTrie(sf.state, sf.addr, sf.root) - if err == nil && sf.enableStateExpiry { - trie.SetEpoch(sf.epoch) + if err == nil && sf.expiryMeta.enableStateExpiry { + trie.SetEpoch(sf.expiryMeta.epoch) } } if err != nil { @@ -587,7 +576,7 @@ func (sf *subfetcher) loop() { } else { _, err := sf.trie.GetStorage(sf.addr, task) // handle expired state - if sf.enableStateExpiry { + if sf.expiryMeta.enableStateExpiry { if exErr, match := err.(*trie2.ExpiredNodeError); match { reviveKeys = append(reviveKeys, common.BytesToHash(task)) revivePaths = append(revivePaths, exErr.Path) @@ -601,7 +590,7 @@ func (sf *subfetcher) loop() { } if len(reviveKeys) != 0 { - _, err = batchFetchExpiredFromRemote(sf.fullStateDB, sf.state, sf.addr, sf.root, sf.trie, revivePaths, reviveKeys) + _, err = batchFetchExpiredFromRemote(sf.expiryMeta, sf.addr, sf.root, sf.trie, revivePaths, reviveKeys) if err != nil { log.Error("subfetcher batchFetchExpiredFromRemote err", "addr", sf.addr, "state", sf.state, "revivePaths", revivePaths, "reviveKeys", reviveKeys, "err", err) } diff --git a/core/types/state_epoch.go b/core/types/state_epoch.go index b03b96ea18..7846b9e051 100644 --- a/core/types/state_epoch.go +++ b/core/types/state_epoch.go @@ -4,7 +4,6 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/params" ) const ( @@ -22,7 +21,7 @@ type StateEpoch uint16 // ClaudeBlock indicates start state epoch1. // ElwoodBlock indicates start state epoch2 and start epoch rotate by StateEpochPeriod. // When N>=2 and epochN started, epoch(N-2)'s state will expire. -func GetStateEpoch(config *params.ChainConfig, blockNumber *big.Int) StateEpoch { +func GetStateEpoch(config *StateExpiryConfig, blockNumber *big.Int) StateEpoch { if blockNumber == nil || config == nil { return StateEpoch0 } @@ -30,10 +29,10 @@ func GetStateEpoch(config *params.ChainConfig, blockNumber *big.Int) StateEpoch epoch1Block := epochPeriod epoch2Block := new(big.Int).Add(epoch1Block, epochPeriod) - if config.Parlia != nil && config.Parlia.StateEpochPeriod != 0 { - epochPeriod = new(big.Int).SetUint64(config.Parlia.StateEpochPeriod) - epoch1Block = new(big.Int).SetUint64(config.Parlia.StateEpoch1Block) - epoch2Block = new(big.Int).SetUint64(config.Parlia.StateEpoch2Block) + if config != nil { + epochPeriod = new(big.Int).SetUint64(config.StateEpochPeriod) + epoch1Block = new(big.Int).SetUint64(config.StateEpoch1Block) + epoch2Block = new(big.Int).SetUint64(config.StateEpoch2Block) } if isBlockReached(blockNumber, epoch2Block) { ret := new(big.Int).Sub(blockNumber, epoch2Block) diff --git a/core/types/state_expiry.go b/core/types/state_expiry.go new file mode 100644 index 0000000000..1f5856b9ff --- /dev/null +++ b/core/types/state_expiry.go @@ -0,0 +1,103 @@ +package types + +import ( + "errors" + "fmt" + "github.com/ethereum/go-ethereum/log" + "strings" +) + +const ( + StateExpiryPruneLevel0 = iota // StateExpiryPruneLevel0 is for HBSS, in HBSS we cannot prune any expired snapshot, it need rebuild trie for old tire node prune, it also cannot prune any shared trie node too. + StateExpiryPruneLevel1 // StateExpiryPruneLevel1 is the default level, it left some expired snapshot meta for performance friendly. + StateExpiryPruneLevel2 // StateExpiryPruneLevel2 will prune all expired snapshot kvs and trie nodes, but it will access more times in tire when execution. TODO(0xbundler): will support it later +) + +type StateExpiryConfig struct { + Enable bool + FullStateEndpoint string + StateScheme string + PruneLevel uint8 + StateEpoch1Block uint64 + StateEpoch2Block uint64 + StateEpochPeriod uint64 + EnableLocalRevive bool +} + +func (s *StateExpiryConfig) EnableExpiry() bool { + if s == nil { + return false + } + return s.Enable +} + +func (s *StateExpiryConfig) Validation() error { + if s == nil || !s.Enable { + return nil + } + + s.FullStateEndpoint = strings.TrimSpace(s.FullStateEndpoint) + if s.StateEpoch1Block == 0 || + s.StateEpoch2Block == 0 || + s.StateEpochPeriod == 0 { + return errors.New("StateEpoch1Block or StateEpoch2Block or StateEpochPeriod cannot be 0") + } + + if s.StateEpoch1Block >= s.StateEpoch2Block { + return errors.New("StateEpoch1Block cannot >= StateEpoch2Block") + } + + if s.StateEpochPeriod < DefaultStateEpochPeriod { + log.Warn("The State Expiry state period is too small and may result in frequent expiration affecting performance", + "input", s.StateEpochPeriod, "default", DefaultStateEpochPeriod) + } + + return nil +} + +func (s *StateExpiryConfig) CheckCompatible(newCfg *StateExpiryConfig) error { + if s == nil || newCfg == nil { + return nil + } + + if s.Enable && !newCfg.Enable { + return errors.New("disable state expiry is dangerous after enabled, expired state may pruned") + } + + if err := s.CheckStateEpochCompatible(newCfg.StateEpoch1Block, newCfg.StateEpoch2Block, newCfg.StateEpochPeriod); err != nil { + return err + } + + if s.StateScheme != newCfg.StateScheme { + return errors.New("StateScheme is incompatible") + } + + if s.PruneLevel != newCfg.PruneLevel { + return errors.New("state expiry PruneLevel is incompatible") + } + + return nil +} + +func (s *StateExpiryConfig) CheckStateEpochCompatible(StateEpoch1Block, StateEpoch2Block, StateEpochPeriod uint64) error { + if s == nil { + return nil + } + + if s.StateEpoch1Block != StateEpoch1Block || + s.StateEpoch2Block != StateEpoch2Block || + s.StateEpochPeriod != StateEpochPeriod { + return fmt.Errorf("state Epoch info is incompatible, StateEpoch1Block: [%v|%v], StateEpoch2Block: [%v|%v], StateEpochPeriod: [%v|%v]", + s.StateEpoch1Block, StateEpoch1Block, s.StateEpoch2Block, StateEpoch2Block, s.StateEpochPeriod, StateEpochPeriod) + } + + return nil +} + +func (s *StateExpiryConfig) String() string { + if !s.Enable { + return "State Expiry Disable" + } + return fmt.Sprintf("Enable State Expiry, RemoteEndpoint: %v, StateEpoch: [%v|%v|%v], StateScheme: %v, PruneLevel: %v", + s.FullStateEndpoint, s.StateEpoch1Block, s.StateEpoch2Block, s.StateEpochPeriod, s.StateScheme, s.PruneLevel) +} diff --git a/eth/backend.go b/eth/backend.go index 6cca4141fe..2f191539bb 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -215,8 +215,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { Preimages: config.Preimages, StateHistory: config.StateHistory, StateScheme: config.StateScheme, - EnableStateExpiry: config.StateExpiryEnable, - RemoteEndPoint: config.StateExpiryFullStateEndpoint, + StateExpiryCfg: config.StateExpiryCfg, } ) bcOps := make([]core.BlockChainOption, 0) diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index c2a5c94d3b..091e7c65ca 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -19,6 +19,7 @@ package ethconfig import ( "errors" + "github.com/ethereum/go-ethereum/core/types" "time" "github.com/ethereum/go-ethereum/common" @@ -104,8 +105,7 @@ type Config struct { DisablePeerTxBroadcast bool // state expiry configs - StateExpiryEnable bool - StateExpiryFullStateEndpoint string + StateExpiryCfg *types.StateExpiryConfig // This can be set to list of enrtree:// URLs which will be queried for // for nodes to connect to. diff --git a/eth/state_accessor.go b/eth/state_accessor.go index 445a57fc4d..0a9b9c985f 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -72,7 +72,7 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u database = state.NewDatabaseWithConfig(eth.chainDb, trie.HashDefaults) if statedb, err = state.New(block.Root(), database, nil); err == nil { if eth.blockchain.EnableStateExpiry() { - statedb.InitStateExpiryFeature(eth.blockchain.Config(), eth.blockchain.FullStateDB(), block.Hash(), block.Number()) + statedb.InitStateExpiryFeature(eth.blockchain.StateExpiryConfig(), eth.blockchain.FullStateDB(), block.Hash(), block.Number()) } log.Info("Found disk backend for state trie", "root", block.Root(), "number", block.Number()) return statedb, noopReleaser, nil @@ -102,7 +102,7 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u statedb, err = state.New(current.Root(), database, nil) if err == nil { if eth.blockchain.EnableStateExpiry() { - statedb.InitStateExpiryFeature(eth.blockchain.Config(), eth.blockchain.FullStateDB(), current.Hash(), block.Number()) + statedb.InitStateExpiryFeature(eth.blockchain.StateExpiryConfig(), eth.blockchain.FullStateDB(), current.Hash(), block.Number()) } return statedb, noopReleaser, nil } @@ -124,7 +124,7 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u statedb, err = state.New(current.Root(), database, nil) if err == nil { if eth.blockchain.EnableStateExpiry() { - statedb.InitStateExpiryFeature(eth.blockchain.Config(), eth.blockchain.FullStateDB(), current.Hash(), block.Number()) + statedb.InitStateExpiryFeature(eth.blockchain.StateExpiryConfig(), eth.blockchain.FullStateDB(), current.Hash(), block.Number()) } break } @@ -176,7 +176,7 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u return nil, nil, fmt.Errorf("state reset after block %d failed: %v", current.NumberU64(), err) } if eth.blockchain.EnableStateExpiry() { - statedb.InitStateExpiryFeature(eth.blockchain.Config(), eth.blockchain.FullStateDB(), current.Hash(), new(big.Int).Add(current.Number(), common.Big1)) + statedb.InitStateExpiryFeature(eth.blockchain.StateExpiryConfig(), eth.blockchain.FullStateDB(), current.Hash(), new(big.Int).Add(current.Number(), common.Big1)) } // Hold the state reference and also drop the parent state // to prevent accumulating too many nodes in memory. diff --git a/params/config.go b/params/config.go index b4a8cc050e..fc1f40e6de 100644 --- a/params/config.go +++ b/params/config.go @@ -512,11 +512,8 @@ func (c *CliqueConfig) String() string { // ParliaConfig is the consensus engine configs for proof-of-staked-authority based sealing. type ParliaConfig struct { - Period uint64 `json:"period"` // Number of seconds between blocks to enforce - Epoch uint64 `json:"epoch"` // Epoch length to update validatorSet - StateEpoch1Block uint64 `json:"stateEpoch1Block"` - StateEpoch2Block uint64 `json:"stateEpoch2Block"` - StateEpochPeriod uint64 `json:"stateEpochPeriod"` + Period uint64 `json:"period"` // Number of seconds between blocks to enforce + Epoch uint64 `json:"epoch"` // Epoch length to update validatorSet } // String implements the stringer interface, returning the consensus engine details. diff --git a/trie/inspect_trie.go b/trie/inspect_trie.go index 818ad63cdd..d00c6fde75 100644 --- a/trie/inspect_trie.go +++ b/trie/inspect_trie.go @@ -229,19 +229,16 @@ func (inspect *Inspector) ConcurrentTraversal(theTrie *Trie, theTrieTreeStat *Tr case *shortNode: path = append(path, current.Key...) inspect.ConcurrentTraversal(theTrie, theTrieTreeStat, current.Val, height+1, path) - path = path[:len(path)-len(current.Key)] case *fullNode: for idx, child := range current.Children { if child == nil { continue } - childPath := path - childPath = append(childPath, byte(idx)) if len(inspect.concurrentQueue)*2 < cap(inspect.concurrentQueue) { inspect.wg.Add(1) - go inspect.SubConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, childPath) + go inspect.SubConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, copyNewSlice(path, []byte{byte(idx)})) } else { - inspect.ConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, childPath) + inspect.ConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, append(path, byte(idx))) } } case hashNode: diff --git a/trie/trie.go b/trie/trie.go index 6a74636b45..f6b747f467 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/trie/trienode" + "sync/atomic" ) var ( @@ -1430,7 +1431,7 @@ type NodeInfo struct { } // PruneExpired traverses the storage trie and prunes all expired nodes. -func (t *Trie) PruneExpired(pruneItemCh chan *NodeInfo) error { +func (t *Trie) PruneExpired(pruneItemCh chan *NodeInfo, stats *atomic.Uint64) error { if !t.enableExpiry { return nil @@ -1444,7 +1445,7 @@ func (t *Trie) PruneExpired(pruneItemCh chan *NodeInfo) error { if pruneErr := t.recursePruneExpiredNode(n, path, epoch, pruneItemCh); pruneErr != nil { log.Error("recursePruneExpiredNode err", "Path", path, "err", pruneErr) } - }) + }, stats) if err != nil { return err } @@ -1452,7 +1453,7 @@ func (t *Trie) PruneExpired(pruneItemCh chan *NodeInfo) error { return nil } -func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, pruner func(n node, path []byte, epoch types.StateEpoch)) error { +func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, pruner func(n node, path []byte, epoch types.StateEpoch), stats *atomic.Uint64) error { // Upon reaching expired node, it will recursively traverse downwards to all the child nodes // and collect their hashes. Then, the corresponding key-value pairs will be deleted from the // database by batches. @@ -1463,16 +1464,22 @@ func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, p switch n := n.(type) { case *shortNode: - err := t.findExpiredSubTree(n.Val, append(path, n.Key...), epoch, pruner) + if stats != nil { + stats.Add(1) + } + err := t.findExpiredSubTree(n.Val, append(path, n.Key...), epoch, pruner, stats) if err != nil { return err } return nil case *fullNode: + if stats != nil { + stats.Add(1) + } var err error // Go through every child and recursively delete expired nodes for i, child := range n.Children { - err = t.findExpiredSubTree(child, append(path, byte(i)), epoch, pruner) + err = t.findExpiredSubTree(child, append(path, byte(i)), epoch, pruner, stats) if err != nil { return err } @@ -1481,6 +1488,9 @@ func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, p case hashNode: resolve, err := t.resolveAndTrack(n, path) + if _, ok := err.(*MissingNodeError); ok { + return nil + } if err != nil { return err } @@ -1488,7 +1498,7 @@ func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, p return err } - return t.findExpiredSubTree(resolve, path, epoch, pruner) + return t.findExpiredSubTree(resolve, path, epoch, pruner, stats) case valueNode: return nil case nil: