Skip to content

Commit

Permalink
state/stateobject: support snap prune level0&level1;
Browse files Browse the repository at this point in the history
prune: opt prune logic for snap;
  • Loading branch information
0xbundler committed Nov 13, 2023
1 parent c1dabea commit 8a096dc
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 156 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ var (
utils.StateExpiryStateEpochPeriodFlag,
utils.StateExpiryEnableLocalReviveFlag,
utils.StateExpiryEnableRemoteModeFlag,
utils.StateExpiryPruneLevelFlag,
}
)

Expand Down
12 changes: 6 additions & 6 deletions cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,12 +440,12 @@ func pruneState(ctx *cli.Context) error {
StateExpiryCfg: cfg.Eth.StateExpiryCfg,
}
prunerconfig := pruner.Config{
Datadir: stack.ResolvePath(""),
BloomSize: ctx.Uint64(utils.BloomFilterSizeFlag.Name),
EnableStateExpiry: cfg.Eth.StateExpiryCfg.EnableExpiry(),
ChainConfig: chainConfig,
CacheConfig: cacheConfig,
MaxExpireThreads: ctx.Uint64(utils.StateExpiryMaxThreadFlag.Name),
Datadir: stack.ResolvePath(""),
BloomSize: ctx.Uint64(utils.BloomFilterSizeFlag.Name),
ExpiryCfg: cfg.Eth.StateExpiryCfg,
ChainConfig: chainConfig,
CacheConfig: cacheConfig,
MaxExpireThreads: ctx.Uint64(utils.StateExpiryMaxThreadFlag.Name),
}
pruner, err := pruner.NewPruner(chaindb, prunerconfig, ctx.Uint64(utils.TriesInMemoryFlag.Name))
if err != nil {
Expand Down
23 changes: 13 additions & 10 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,12 @@ var (
Value: false,
Category: flags.StateExpiryCategory,
}
StateExpiryPruneLevelFlag = &cli.UintFlag{
Name: "state-expiry.prunelevel",
Usage: "set prune level for state expiry,",
Value: types.StateExpiryPruneLevel0,
Category: flags.StateExpiryCategory,
}
)

func init() {
Expand Down Expand Up @@ -2607,20 +2613,17 @@ func ParseStateExpiryConfig(ctx *cli.Context, disk ethdb.Database, scheme string
} else if stored != nil {
newCfg.StateEpochPeriod = stored.StateEpochPeriod
}
if ctx.IsSet(StateExpiryPruneLevelFlag.Name) {
newCfg.PruneLevel = ctx.Uint(StateExpiryPruneLevelFlag.Name)
} else if stored != nil {
newCfg.PruneLevel = stored.PruneLevel
}
if ctx.IsSet(StateExpiryEnableLocalReviveFlag.Name) {
newCfg.EnableLocalRevive = ctx.Bool(StateExpiryEnableLocalReviveFlag.Name)
}

// override prune level
newCfg.PruneLevel = types.StateExpiryPruneLevel1
switch newCfg.StateScheme {
case rawdb.HashScheme:
// TODO(0xbundler): will stop support HBSS later.
newCfg.PruneLevel = types.StateExpiryPruneLevel0
case rawdb.PathScheme:
newCfg.PruneLevel = types.StateExpiryPruneLevel1
default:
return nil, fmt.Errorf("not support the state scheme: %v", newCfg.StateScheme)
if rawdb.HashScheme == newCfg.StateScheme && types.StateExpiryPruneLevel1 != newCfg.PruneLevel {
return nil, errors.New("PruneLevel must be StateExpiryPruneLevel1 in HBSS")
}

if err := newCfg.Validation(); err != nil {
Expand Down
68 changes: 30 additions & 38 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ const (
// to avoid triggering range compaction because of small deletion.
rangeCompactionThreshold = 1000000

FixedPrefixAndAddrSize = 33
ContractTrieNodeAvgSize = 105 // This is an experience value

ContractEpochMetaAvgSize = 16 // This is an experience value

defaultReportDuration = 60 * time.Second

Expand All @@ -74,12 +76,12 @@ const (

// Config includes all the configurations for pruning.
type Config struct {
Datadir string // The directory of the state database
BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
EnableStateExpiry bool
ChainConfig *params.ChainConfig
CacheConfig *core.CacheConfig
MaxExpireThreads uint64
Datadir string // The directory of the state database
BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
ExpiryCfg *types.StateExpiryConfig
ChainConfig *params.ChainConfig
CacheConfig *core.CacheConfig
MaxExpireThreads uint64
}

// Pruner is an offline tool to prune the stale state with the
Expand Down Expand Up @@ -702,7 +704,7 @@ func (p *Pruner) Prune(root common.Hash) error {
// ExpiredPrune it must run later to prune, using bloom filter in HBSS to prevent pruning in use trie node, cannot prune concurrently.
// but in PBSS, it need not bloom filter
func (p *Pruner) ExpiredPrune(height *big.Int, root common.Hash) error {
if !p.config.EnableStateExpiry {
if !p.config.ExpiryCfg.EnableExpiry() {
log.Info("stop prune expired state, disable state expiry", "height", height, "root", root, "scheme", p.config.CacheConfig.StateScheme)
return nil
}
Expand Down Expand Up @@ -742,7 +744,7 @@ func (p *Pruner) ExpiredPrune(height *big.Int, root common.Hash) error {
}()
go func() {
defer tasksWG.Done()
rets[1] = asyncPruneExpiredStorageInDisk(p.db, pruneExpiredInDiskCh, bloom, p.config.CacheConfig.StateScheme)
rets[1] = asyncPruneExpiredStorageInDisk(p.db, pruneExpiredInDiskCh, bloom, p.config.ExpiryCfg)
}()
rets[2] = snapshot.TraverseContractTrie(p.snaptree, root, scanExpiredTrieCh)

Expand Down Expand Up @@ -874,6 +876,14 @@ func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch type
return err
}
tr.SetEpoch(epoch)
if st.MoreThread() {
st.Schedule(func() {
if err = tr.ScanForPrune(st); err != nil {
log.Error("asyncScanExpiredInTrie, ScanForPrune err", "id", item, "err", err)
}
})
continue
}
if err = tr.ScanForPrune(st); err != nil {
log.Error("asyncScanExpiredInTrie, ScanForPrune err", "id", item, "err", err)
return err
Expand All @@ -883,7 +893,7 @@ func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch type
return nil
}

func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk chan *trie.NodeInfo, bloom *bloomfilter.Filter, scheme string) error {
func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk chan *trie.NodeInfo, bloom *bloomfilter.Filter, cfg *types.StateExpiryConfig) error {
var (
itemCount = 0
trieCount = 0
Expand All @@ -902,44 +912,26 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch
info.IsBranch, "isLeaf", info.IsLeaf)
itemCount++
addr := info.Addr
switch scheme {
trieCount++
trieSize += ContractTrieNodeAvgSize
switch cfg.StateScheme {
case rawdb.PathScheme:
val := rawdb.ReadTrieNode(diskdb, addr, info.Path, info.Hash, rawdb.PathScheme)
if len(val) == 0 {
log.Debug("cannot find source trie?", "addr", addr, "path", info.Path, "hash", info.Hash, "epoch", info.Epoch)
} else {
trieCount++
trieSize += common.StorageSize(len(val) + FixedPrefixAndAddrSize + len(info.Path))
rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.PathScheme)
}
rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.PathScheme)
case rawdb.HashScheme:
// hbss has shared kv, so using bloom to filter them out.
if bloom == nil || !bloom.Contains(stateBloomHasher(info.Hash.Bytes())) {
val := rawdb.ReadTrieNode(diskdb, addr, info.Path, info.Hash, rawdb.HashScheme)
if len(val) == 0 {
log.Debug("cannot find source trie?", "addr", addr, "path", info.Path, "hash", info.Hash, "epoch", info.Epoch)
} else {
trieCount++
trieSize += common.StorageSize(len(val) + FixedPrefixAndAddrSize)
rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.HashScheme)
}
rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.HashScheme)
}
}
// delete epoch meta in HBSS
if info.IsBranch && rawdb.HashScheme == scheme {
val := rawdb.ReadEpochMetaPlainState(diskdb, addr, string(info.Path))
if len(val) == 0 && info.Epoch > types.StateEpoch0 {
log.Debug("cannot find source epochmeta?", "addr", addr, "path", info.Path, "hash", info.Hash, "epoch", info.Epoch)
}
if len(val) > 0 {
epochMetaCount++
epochMetaSize += common.StorageSize(FixedPrefixAndAddrSize + len(info.Path) + len(val))
rawdb.DeleteEpochMetaPlainState(batch, addr, string(info.Path))
}
if info.IsBranch && rawdb.HashScheme == cfg.StateScheme {
epochMetaCount++
epochMetaSize += ContractEpochMetaAvgSize
rawdb.DeleteEpochMetaPlainState(batch, addr, string(info.Path))
}
// replace snapshot kv only epoch
if info.IsLeaf {
size, err := snapshot.ShrinkExpiredLeaf(batch, diskdb, addr, info.Key, info.Epoch, scheme)
size, err := snapshot.ShrinkExpiredLeaf(batch, diskdb, addr, info.Key, cfg)
if err != nil {
log.Error("ShrinkExpiredLeaf err", "addr", addr, "key", info.Key, "err", err)
}
Expand Down
33 changes: 10 additions & 23 deletions core/state/snapshot/snapshot_expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,18 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
)

const (
ContractSnapshotAvgSize = 17 // This is an experience value
)

// ShrinkExpiredLeaf tool function for snapshot kv prune
func ShrinkExpiredLeaf(writer ethdb.KeyValueWriter, reader ethdb.KeyValueReader, accountHash common.Hash, storageHash common.Hash, epoch types.StateEpoch, scheme string) (int64, error) {
switch scheme {
case rawdb.HashScheme:
//cannot prune snapshot in hbss, because it will used for trie prune, but it's ok in pbss.
case rawdb.PathScheme:
val := rawdb.ReadStorageSnapshot(reader, accountHash, storageHash)
if len(val) == 0 {
log.Debug("cannot find source snapshot?", "addr", accountHash, "key", storageHash, "epoch", epoch)
return 0, nil
}
valWithEpoch := NewValueWithEpoch(epoch, nil)
enc, err := EncodeValueToRLPBytes(valWithEpoch)
if err != nil {
return 0, err
}
rawdb.WriteStorageSnapshot(writer, accountHash, storageHash, enc)
shrinkSize := len(val) - len(enc)
if shrinkSize < 0 {
shrinkSize = 0
}
return int64(shrinkSize), nil
func ShrinkExpiredLeaf(writer ethdb.KeyValueWriter, reader ethdb.KeyValueReader, accountHash common.Hash, storageHash common.Hash, cfg *types.StateExpiryConfig) (int64, error) {
if types.StateExpiryPruneLevel1 == cfg.PruneLevel {
return 0, nil
}
return 0, nil

rawdb.DeleteStorageSnapshot(writer, accountHash, storageHash)
return ContractSnapshotAvgSize, nil
}
27 changes: 24 additions & 3 deletions core/state/snapshot/snapshot_expire_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,35 @@ var (
storageHash1 = common.HexToHash("0x0bb2f3e66816c6fd12513f053d5ee034b1fa2d448a1dc8ee7f56e4c87d6c53fe")
)

func TestShrinkExpiredLeaf(t *testing.T) {
func TestShrinkExpiredLeaf_Level1(t *testing.T) {
db := memorydb.New()
rawdb.WriteStorageSnapshot(db, accountHash, storageHash1, encodeSnapVal(NewRawValue([]byte("val1"))))

_, err := ShrinkExpiredLeaf(db, db, accountHash, storageHash1, types.StateEpoch0, rawdb.PathScheme)
cfg := &types.StateExpiryConfig{
StateScheme: rawdb.PathScheme,
PruneLevel: types.StateExpiryPruneLevel0,
}

_, err := ShrinkExpiredLeaf(db, db, accountHash, storageHash1, cfg)
assert.NoError(t, err)

assert.True(t, len(rawdb.ReadStorageSnapshot(db, accountHash, storageHash1)) == 0)
}

func TestShrinkExpiredLeaf_Level0(t *testing.T) {
db := memorydb.New()
raw := encodeSnapVal(NewRawValue([]byte("val1")))
rawdb.WriteStorageSnapshot(db, accountHash, storageHash1, raw)

cfg := &types.StateExpiryConfig{
StateScheme: rawdb.PathScheme,
PruneLevel: types.StateExpiryPruneLevel1,
}

_, err := ShrinkExpiredLeaf(db, db, accountHash, storageHash1, cfg)
assert.NoError(t, err)

assert.Equal(t, encodeSnapVal(NewValueWithEpoch(types.StateEpoch0, nil)), rawdb.ReadStorageSnapshot(db, accountHash, storageHash1))
assert.Equal(t, raw, rawdb.ReadStorageSnapshot(db, accountHash, storageHash1))
}

func encodeSnapVal(val SnapValue) []byte {
Expand Down
1 change: 1 addition & 0 deletions core/state/state_expiry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type stateExpiryMeta struct {
epoch types.StateEpoch
originalRoot common.Hash
originalHash common.Hash
pruneLevel uint
}

func defaultStateExpiryMeta() *stateExpiryMeta {
Expand Down
Loading

0 comments on commit 8a096dc

Please sign in to comment.