Skip to content

Commit

Permalink
pruner: support PBSS expired state prune;
Browse files Browse the repository at this point in the history
  • Loading branch information
0xbundler committed Sep 26, 2023
1 parent 1c26e5d commit a8222b5
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 63 deletions.
23 changes: 21 additions & 2 deletions cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,20 +424,39 @@ func pruneState(ctx *cli.Context) error {
return err
}

if rawdb.ReadStateScheme(chaindb) != rawdb.HashScheme {
log.Crit("Offline pruning is not required for path scheme")
cacheConfig := &core.CacheConfig{
TrieCleanLimit: cfg.Eth.TrieCleanCache,
TrieCleanNoPrefetch: cfg.Eth.NoPrefetch,
TrieDirtyLimit: cfg.Eth.TrieDirtyCache,
TrieDirtyDisabled: cfg.Eth.NoPruning,
TrieTimeLimit: cfg.Eth.TrieTimeout,
NoTries: cfg.Eth.TriesVerifyMode != core.LocalVerify,
SnapshotLimit: cfg.Eth.SnapshotCache,
TriesInMemory: cfg.Eth.TriesInMemory,
Preimages: cfg.Eth.Preimages,
StateHistory: cfg.Eth.StateHistory,
StateScheme: cfg.Eth.StateScheme,
EnableStateExpiry: cfg.Eth.StateExpiryEnable,
RemoteEndPoint: cfg.Eth.StateExpiryFullStateEndpoint,
}
prunerconfig := pruner.Config{
Datadir: stack.ResolvePath(""),
BloomSize: ctx.Uint64(utils.BloomFilterSizeFlag.Name),
EnableStateExpiry: cfg.Eth.StateExpiryEnable,
ChainConfig: chainConfig,
CacheConfig: cacheConfig,
}
pruner, err := pruner.NewPruner(chaindb, prunerconfig, ctx.Uint64(utils.TriesInMemoryFlag.Name))
if err != nil {
log.Error("Failed to open snapshot tree", "err", err)
return err
}

if cfg.Eth.StateScheme == rawdb.PathScheme {
// when using PathScheme, only prune expired state
return pruner.ExpiredPrune(common.Big0, common.Hash{})
}

if ctx.NArg() > 1 {
log.Error("Too many arguments given")
return errors.New("too many arguments")
Expand Down
6 changes: 3 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ type CacheConfig struct {
RemoteEndPoint string
}

// triedbConfig derives the configures for trie database.
func (c *CacheConfig) triedbConfig() *trie.Config {
// TriedbConfig derives the configures for trie database.
func (c *CacheConfig) TriedbConfig() *trie.Config {
config := &trie.Config{
Cache: c.TrieCleanLimit,
Preimages: c.Preimages,
Expand Down Expand Up @@ -322,7 +322,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
diffLayerChanCache, _ := exlru.New(diffLayerCacheLimit)

// Open trie database with provided config
triedb := trie.NewDatabase(db, cacheConfig.triedbConfig())
triedb := trie.NewDatabase(db, cacheConfig.TriedbConfig())

// Setup the genesis block, commit the provided genesis specification
// to database if the genesis block is not present yet, or load the
Expand Down
126 changes: 77 additions & 49 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"github.com/ethereum/go-ethereum/params"
"math"
"math/big"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -69,6 +70,7 @@ type Config struct {
BloomSize uint64 // The Megabytes of memory allocated to bloom-filter
EnableStateExpiry bool
ChainConfig *params.ChainConfig
CacheConfig *core.CacheConfig
}

// Pruner is an offline tool to prune the stale state with the
Expand All @@ -89,6 +91,7 @@ type Pruner struct {
stateBloom *stateBloom
snaptree *snapshot.Tree
triesInMemory uint64
flattenBlock *types.Header
}

type BlockPruner struct {
Expand Down Expand Up @@ -128,13 +131,17 @@ func NewPruner(db ethdb.Database, config Config, triesInMemory uint64) (*Pruner,
if err != nil {
return nil, err
}

flattenBlockHash := rawdb.ReadCanonicalHash(db, headBlock.NumberU64()-triesInMemory)
flattenBlock := rawdb.ReadHeader(db, flattenBlockHash, headBlock.NumberU64()-triesInMemory)
return &Pruner{
config: config,
chainHeader: headBlock.Header(),
db: db,
stateBloom: stateBloom,
snaptree: snaptree,
triesInMemory: triesInMemory,
flattenBlock: flattenBlock,
}, nil
}

Expand Down Expand Up @@ -662,53 +669,69 @@ func (p *Pruner) Prune(root common.Hash) error {
return err
}

// it must run later to prune, using bloom filter to prevent pruning in use trie node, cannot prune concurrently.
if p.config.EnableStateExpiry {
var (
pruneExpiredTrieCh = make(chan *snapshot.ContractItem, 100000)
pruneExpiredInDiskCh = make(chan *trie.NodeInfo, 100000)
epoch = types.GetStateEpoch(p.config.ChainConfig, p.chainHeader.Number)
rets = make([]error, 3)
expiryWG sync.WaitGroup
)
trieDB := trie.NewDatabase(p.db, &trie.Config{
EnableStateExpiry: true,
PathDB: nil, // TODO(0xbundler): support later
})
expiryWG.Add(2)
go func() {
defer expiryWG.Done()
rets[0] = asyncScanExpiredInTrie(trieDB, root, epoch, pruneExpiredTrieCh, pruneExpiredInDiskCh)
}()
go func() {
defer expiryWG.Done()
rets[1] = asyncPruneExpiredStorageInDisk(p.db, pruneExpiredInDiskCh, p.stateBloom)
}()
rets[2] = snapshot.TraverseContractTrie(p.snaptree, root, pruneExpiredTrieCh)

// wait task done
expiryWG.Wait()
for i, item := range rets {
if item != nil {
log.Error("prune expired state got error", "index", i, "err", item)
}
}
if err = p.ExpiredPrune(p.chainHeader.Number, root); err != nil {
return err
}

// recap epoch meta snap, save journal
snap := trieDB.EpochMetaSnapTree()
if snap != nil {
log.Info("epoch meta snap handle", "root", root)
if err := snap.Cap(root); err != nil {
log.Error("asyncPruneExpired, SnapTree Cap err", "err", err)
return err
}
if err := snap.Journal(); err != nil {
log.Error("asyncPruneExpired, SnapTree Journal err", "err", err)
return err
}
return nil
}

// ExpiredPrune it must run later to prune, using bloom filter in HBSS to prevent pruning in use trie node, cannot prune concurrently.
// but in PBSS, it need not bloom filter
func (p *Pruner) ExpiredPrune(height *big.Int, root common.Hash) error {
if !p.config.EnableStateExpiry {
log.Info("stop prune expired state, disable state expiry", "height", height, "root", root, "scheme", p.config.CacheConfig.StateScheme)
return nil
}

// if root is empty, using the deepest snap block to prune expired state
if root == (common.Hash{}) {
height = p.flattenBlock.Number
root = p.flattenBlock.Root
}
log.Info("start prune expired state", "height", height, "root", root, "scheme", p.config.CacheConfig.StateScheme)

var (
pruneExpiredTrieCh = make(chan *snapshot.ContractItem, 100000)
pruneExpiredInDiskCh = make(chan *trie.NodeInfo, 100000)
epoch = types.GetStateEpoch(p.config.ChainConfig, height)
rets = make([]error, 3)
tasksWG sync.WaitGroup
)
trieDB := trie.NewDatabase(p.db, p.config.CacheConfig.TriedbConfig())
tasksWG.Add(2)
go func() {
defer tasksWG.Done()
rets[0] = asyncScanExpiredInTrie(trieDB, root, epoch, pruneExpiredTrieCh, pruneExpiredInDiskCh)
}()
go func() {
defer tasksWG.Done()
rets[1] = asyncPruneExpiredStorageInDisk(p.db, pruneExpiredInDiskCh, p.stateBloom, p.config.CacheConfig.StateScheme)
}()
rets[2] = snapshot.TraverseContractTrie(p.snaptree, root, pruneExpiredTrieCh)

// wait task done
tasksWG.Wait()
for i, item := range rets {
if item != nil {
log.Error("prune expired state got error", "index", i, "err", item)
}
}

// recap epoch meta snap, save journal
snap := trieDB.EpochMetaSnapTree()
if snap != nil {
log.Info("epoch meta snap handle", "root", root)
if err := snap.Cap(root); err != nil {
log.Error("asyncPruneExpired, SnapTree Cap err", "err", err)
return err
}
if err := snap.Journal(); err != nil {
log.Error("asyncPruneExpired, SnapTree Journal err", "err", err)
return err
}
log.Info("Expired State pruning successful")
}
log.Info("Expired State pruning successful")

return nil
}
Expand Down Expand Up @@ -738,7 +761,7 @@ func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch type
return nil
}

func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk chan *trie.NodeInfo, bloom *stateBloom) error {
func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk chan *trie.NodeInfo, bloom *stateBloom, scheme string) error {
var (
trieCount = 0
epochMetaCount = 0
Expand All @@ -758,9 +781,14 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch
// delete trie kv
trieCount++
trieSize += common.StorageSize(len(info.Key) + 32)
// hbss has shared kv, so using bloom to filter them out.
if !bloom.Contain(info.Hash.Bytes()) {
rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.HashScheme)
switch scheme {
case rawdb.PathScheme:
rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.PathScheme)
case rawdb.HashScheme:
// hbss has shared kv, so using bloom to filter them out.
if !bloom.Contain(info.Hash.Bytes()) {
rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.HashScheme)
}
}
// delete epoch meta
if info.IsBranch {
Expand All @@ -772,7 +800,7 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch
if info.IsLeaf {
snapCount++
snapSize += common.StorageSize(32)
if err := snapshot.ShrinkExpiredLeaf(batch, addr, info.Key, info.Epoch); err != nil {
if err := snapshot.ShrinkExpiredLeaf(batch, addr, info.Key, info.Epoch, scheme); err != nil {
log.Error("ShrinkExpiredLeaf err", "addr", addr, "key", info.Key, "err", err)
}
}
Expand Down
21 changes: 13 additions & 8 deletions core/state/snapshot/snapshot_expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ package snapshot

import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
)

// ShrinkExpiredLeaf tool function for snapshot kv prune
func ShrinkExpiredLeaf(db ethdb.KeyValueWriter, accountHash common.Hash, storageHash common.Hash, epoch types.StateEpoch) error {
// TODO: cannot prune snapshot in hbss, because it will used for trie prune, but it's ok in pbss.
//valWithEpoch := NewValueWithEpoch(epoch, nil)
//enc, err := EncodeValueToRLPBytes(valWithEpoch)
//if err != nil {
// return err
//}
//rawdb.WriteStorageSnapshot(db, accountHash, storageHash, enc)
func ShrinkExpiredLeaf(db ethdb.KeyValueWriter, accountHash common.Hash, storageHash common.Hash, epoch types.StateEpoch, scheme string) error {
switch scheme {
case rawdb.HashScheme:
//cannot prune snapshot in hbss, because it will used for trie prune, but it's ok in pbss.
case rawdb.PathScheme:
valWithEpoch := NewValueWithEpoch(epoch, nil)
enc, err := EncodeValueToRLPBytes(valWithEpoch)
if err != nil {
return err
}
rawdb.WriteStorageSnapshot(db, accountHash, storageHash, enc)
}
return nil
}
2 changes: 1 addition & 1 deletion core/state/snapshot/snapshot_expire_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestShrinkExpiredLeaf(t *testing.T) {
db := memorydb.New()
rawdb.WriteStorageSnapshot(db, accountHash, storageHash1, encodeSnapVal(NewRawValue([]byte("val1"))))

err := ShrinkExpiredLeaf(db, accountHash, storageHash1, types.StateEpoch0)
err := ShrinkExpiredLeaf(db, accountHash, storageHash1, types.StateEpoch0, rawdb.PathScheme)
assert.NoError(t, err)

assert.Equal(t, encodeSnapVal(NewValueWithEpoch(types.StateEpoch0, nil)), rawdb.ReadStorageSnapshot(db, accountHash, storageHash1))
Expand Down

0 comments on commit a8222b5

Please sign in to comment.