Skip to content

Commit

Permalink
pruner: opt expired prune, add more logs;
Browse files Browse the repository at this point in the history
  • Loading branch information
0xbundler committed Oct 17, 2023
1 parent 28ce9c8 commit 2a21175
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 50 deletions.
51 changes: 32 additions & 19 deletions core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ const (
// triggering range compaction. It's a quite arbitrary number but just
// to avoid triggering range compaction because of small deletion.
rangeCompactionThreshold = 100000

FixedPrefixAndAddrSize = 33
)

// Config includes all the configurations for pruning.
Expand Down Expand Up @@ -136,6 +138,9 @@ func NewPruner(db ethdb.Database, config Config, triesInMemory uint64) (*Pruner,

flattenBlockHash := rawdb.ReadCanonicalHash(db, headBlock.NumberU64()-triesInMemory)
flattenBlock := rawdb.ReadHeader(db, flattenBlockHash, headBlock.NumberU64()-triesInMemory)
if flattenBlock == nil {
return nil, fmt.Errorf("cannot find %v depth block, it cannot prune", triesInMemory)
}
return &Pruner{
config: config,
chainHeader: headBlock.Header(),
Expand Down Expand Up @@ -831,7 +836,7 @@ func asyncScanUnExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch ty
return err
}
if time.Since(logged) > 8*time.Second {
log.Info("Pruning expired states", "trieNodes", trieCount.Load())
log.Info("Scan unexpired states", "trieNodes", trieCount.Load())
logged = time.Now()
}
}
Expand Down Expand Up @@ -866,7 +871,7 @@ func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch type
return err
}
if time.Since(logged) > 8*time.Second {
log.Info("Pruning expired states", "trieNodes", trieCount.Load())
log.Info("Scan unexpired states", "trieNodes", trieCount.Load())
logged = time.Now()
}
}
Expand All @@ -877,6 +882,7 @@ func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch type

func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk chan *trie.NodeInfo, bloom *bloomfilter.Filter, scheme string) error {
var (
itemCount = 0
trieCount = 0
epochMetaCount = 0
snapCount = 0
Expand All @@ -891,46 +897,53 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch
log.Debug("found expired state", "addr", info.Addr, "path",
hex.EncodeToString(info.Path), "epoch", info.Epoch, "isBranch",
info.IsBranch, "isLeaf", info.IsLeaf)
itemCount++
addr := info.Addr
// delete trie kv
trieCount++
switch scheme {
case rawdb.PathScheme:
val := rawdb.ReadTrieNode(diskdb, addr, info.Path, info.Hash, rawdb.PathScheme)
if len(val) == 0 {
log.Warn("cannot find source trie?", "addr", addr, "path", info.Path, "hash", info.Hash)
log.Debug("cannot find source trie?", "addr", addr, "path", info.Path, "hash", info.Hash, "epoch", info.Epoch)
} else {
trieCount++
trieSize += common.StorageSize(len(val) + FixedPrefixAndAddrSize + len(info.Path))
rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.PathScheme)
}
trieSize += common.StorageSize(len(val) + 33 + len(info.Path))
rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.PathScheme)
case rawdb.HashScheme:
// hbss has shared kv, so using bloom to filter them out.
if bloom == nil || !bloom.Contains(stateBloomHasher(info.Hash.Bytes())) {
val := rawdb.ReadTrieNode(diskdb, addr, info.Path, info.Hash, rawdb.HashScheme)
if len(val) == 0 {
log.Warn("cannot find source trie?", "addr", addr, "path", info.Path, "hash", info.Hash)
log.Debug("cannot find source trie?", "addr", addr, "path", info.Path, "hash", info.Hash, "epoch", info.Epoch)
} else {
trieCount++
trieSize += common.StorageSize(len(val) + FixedPrefixAndAddrSize)
rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.HashScheme)
}
trieSize += common.StorageSize(len(val) + 33)
rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.HashScheme)
}
}
// delete epoch meta
if info.IsBranch {
epochMetaCount++
val := rawdb.ReadEpochMetaPlainState(diskdb, addr, string(info.Path))
if len(val) == 0 {
log.Warn("cannot find source epochmeta?", "addr", addr, "path", info.Path, "hash", info.Hash)
if len(val) == 0 && info.Epoch > types.StateEpoch0 {
log.Debug("cannot find source epochmeta?", "addr", addr, "path", info.Path, "hash", info.Hash, "epoch", info.Epoch)
}
if len(val) > 0 {
epochMetaCount++
epochMetaSize += common.StorageSize(FixedPrefixAndAddrSize + len(info.Path) + len(val))
rawdb.DeleteEpochMetaPlainState(batch, addr, string(info.Path))
}
epochMetaSize += common.StorageSize(33 + len(info.Path) + len(val))
rawdb.DeleteEpochMetaPlainState(batch, addr, string(info.Path))
}
// replace snapshot kv only epoch
if info.IsLeaf {
snapCount++
size, err := snapshot.ShrinkExpiredLeaf(batch, diskdb, addr, info.Key, info.Epoch, scheme)
if err != nil {
log.Error("ShrinkExpiredLeaf err", "addr", addr, "key", info.Key, "err", err)
}
snapSize += common.StorageSize(size)
if size > 0 {
snapCount++
snapSize += common.StorageSize(size)
}
}
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
Expand All @@ -939,7 +952,7 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch
batch.Reset()
}
if time.Since(logged) > 8*time.Second {
log.Info("Pruning expired states", "trieNodes", trieCount, "trieSize", trieSize,
log.Info("Pruning expired states", "items", itemCount, "trieNodes", trieCount, "trieSize", trieSize,
"SnapKV", snapCount, "SnapKVSize", snapSize, "EpochMeta", epochMetaCount,
"EpochMetaSize", epochMetaSize)
logged = time.Now()
Expand All @@ -951,7 +964,7 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch
}
batch.Reset()
}
log.Info("Pruned expired states", "trieNodes", trieCount, "trieSize", trieSize,
log.Info("Pruned expired states", "items", itemCount, "trieNodes", trieCount, "trieSize", trieSize,
"SnapKV", snapCount, "SnapKVSize", snapSize, "EpochMeta", epochMetaCount,
"EpochMetaSize", epochMetaSize, "elapsed", common.PrettyDuration(time.Since(start)))
// Start compactions, will remove the deleted data from the disk immediately.
Expand Down
3 changes: 2 additions & 1 deletion core/state/snapshot/snapshot_expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ func ShrinkExpiredLeaf(writer ethdb.KeyValueWriter, reader ethdb.KeyValueReader,
case rawdb.PathScheme:
val := rawdb.ReadStorageSnapshot(reader, accountHash, storageHash)
if len(val) == 0 {
log.Warn("cannot find source snapshot?", "addr", accountHash, "path", storageHash)
log.Warn("cannot find source snapshot?", "addr", accountHash, "key", storageHash, "epoch", epoch)
return 0, nil
}
valWithEpoch := NewValueWithEpoch(epoch, nil)
enc, err := EncodeValueToRLPBytes(valWithEpoch)
Expand Down
2 changes: 1 addition & 1 deletion trie/epochmeta/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

const (
defaultDiskLayerCacheSize = 100000
defaultDiskLayerCacheSize = 1024000
)

type diskLayer struct {
Expand Down
2 changes: 1 addition & 1 deletion trie/inspect_trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (inspect *Inspector) ConcurrentTraversal(theTrie *Trie, theTrieTreeStat *Tr
}
if len(inspect.concurrentQueue)*2 < cap(inspect.concurrentQueue) {
inspect.wg.Add(1)
go inspect.SubConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, copyNewSlice(path, []byte{byte(idx)}))
go inspect.SubConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, copy2NewBytes(path, []byte{byte(idx)}))
} else {
inspect.ConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, append(path, byte(idx)))
}
Expand Down
12 changes: 9 additions & 3 deletions trie/proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ func (m *MPTProofCache) VerifyProof() error {
prefix := m.RootKeyHex
for i := 0; i < len(m.cacheNodes); i++ {
if i-1 >= 0 {
prefix = copyNewSlice(prefix, m.cacheHexPath[i-1])
prefix = copy2NewBytes(prefix, m.cacheHexPath[i-1])
}
// prefix = append(prefix, m.cacheHexPath[i]...)
n1 := m.cacheNodes[i]
Expand All @@ -938,7 +938,7 @@ func (m *MPTProofCache) VerifyProof() error {
}
if merge {
i++
prefix = copyNewSlice(prefix, m.cacheHexPath[i-1])
prefix = copy2NewBytes(prefix, m.cacheHexPath[i-1])
nub.n2 = m.cacheNodes[i]
nub.n2PrefixKey = prefix
}
Expand All @@ -948,13 +948,19 @@ func (m *MPTProofCache) VerifyProof() error {
return nil
}

func copyNewSlice(s1, s2 []byte) []byte {
func copy2NewBytes(s1, s2 []byte) []byte {
ret := make([]byte, len(s1)+len(s2))
copy(ret, s1)
copy(ret[len(s1):], s2)
return ret
}

// renewBytes returns a freshly allocated copy of s. The result never shares
// a backing array with s, so later appends to or writes through s cannot
// affect it. The result is always non-nil, even when s is nil or empty.
func renewBytes(s []byte) []byte {
	// Pre-size the destination so the append below never reallocates and the
	// copy ends up with exactly len(s) capacity.
	dup := make([]byte, 0, len(s))
	return append(dup, s...)
}

// CacheNubs returns the cacheNubs slice held by the proof cache. The slice is
// returned as-is (not copied), so the caller shares it with m.
func (m *MPTProofCache) CacheNubs() []*MPTProofNub {
return m.cacheNubs
}
Expand Down
46 changes: 25 additions & 21 deletions trie/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -1582,52 +1582,56 @@ func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, p
func (t *Trie) recursePruneExpiredNode(n node, path []byte, epoch types.StateEpoch, pruneItemCh chan *NodeInfo) error {
switch n := n.(type) {
case *shortNode:
subPath := append(path, n.Key...)
key := common.Hash{}
_, ok := n.Val.(valueNode)
if ok {
key = common.BytesToHash(hexToKeybytes(append(path, n.Key...)))
_, isLeaf := n.Val.(valueNode)
if isLeaf {
key = common.BytesToHash(hexToKeybytes(subPath))
}
err := t.recursePruneExpiredNode(n.Val, append(path, n.Key...), epoch, pruneItemCh)
if err != nil {
return err
}
// prune child first
pruneItemCh <- &NodeInfo{
Addr: t.owner,
Hash: common.BytesToHash(n.flags.hash),
Path: path,
Path: renewBytes(path),
Key: key,
Epoch: epoch,
IsLeaf: ok,
IsLeaf: isLeaf,
}

err := t.recursePruneExpiredNode(n.Val, subPath, epoch, pruneItemCh)
if err != nil {
return err
}
return nil
case *fullNode:
pruneItemCh <- &NodeInfo{
Addr: t.owner,
Hash: common.BytesToHash(n.flags.hash),
Path: renewBytes(path),
Epoch: epoch,
IsBranch: true,
}
// recurse child, and except valueNode
for i := 0; i < BranchNodeLength-1; i++ {
err := t.recursePruneExpiredNode(n.Children[i], append(path, byte(i)), n.EpochMap[i], pruneItemCh)
if err != nil {
return err
}
}
// prune child first
pruneItemCh <- &NodeInfo{
Addr: t.owner,
Hash: common.BytesToHash(n.flags.hash),
Path: path,
Epoch: epoch,
IsBranch: true,
}
return nil
case hashNode:
// hashNode is an index into trie node storage; it does not need to be pruned itself.
resolve, err := t.resolveAndTrack(n, path)
rn, err := t.resolveAndTrack(n, path)
// if a missing node is encountered, just skip it
if _, ok := err.(*MissingNodeError); ok {
return nil
}
if err != nil {
return err
}
if err = t.resolveEpochMetaAndTrack(resolve, epoch, path); err != nil {
if err = t.resolveEpochMetaAndTrack(rn, epoch, path); err != nil {
return err
}
return t.recursePruneExpiredNode(resolve, path, epoch, pruneItemCh)
return t.recursePruneExpiredNode(rn, path, epoch, pruneItemCh)
case valueNode:
// a value node is not a standalone storage unit, so there is nothing to prune here.
return nil
Expand Down
2 changes: 1 addition & 1 deletion trie/triedb/pathdb/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (dl *diffLayer) node(owner common.Hash, path []byte, hash common.Hash, dept
// bubble up an error here. It shouldn't happen at all.
if n.Hash != hash {
dirtyFalseMeter.Mark(1)
log.Error("Unexpected trie node in diff layer", "root", dl.root, "owner", owner, "path", path, "expect", hash, "got", n.Hash)
log.Debug("Unexpected trie node in diff layer", "root", dl.root, "owner", owner, "path", path, "expect", hash, "got", n.Hash)
return nil, newUnexpectedNodeError("diff", hash, n.Hash, owner, path)
}
dirtyHitMeter.Mark(1)
Expand Down
4 changes: 2 additions & 2 deletions trie/triedb/pathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (dl *diskLayer) Node(owner common.Hash, path []byte, hash common.Hash) ([]b
return blob, nil
}
cleanFalseMeter.Mark(1)
log.Error("Unexpected trie node in clean cache", "owner", owner, "path", path, "expect", hash, "got", got)
log.Debug("Unexpected trie node in clean cache", "owner", owner, "path", path, "expect", hash, "got", got)
}
cleanMissMeter.Mark(1)
}
Expand All @@ -149,7 +149,7 @@ func (dl *diskLayer) Node(owner common.Hash, path []byte, hash common.Hash) ([]b
}
if nHash != hash {
diskFalseMeter.Mark(1)
log.Error("Unexpected trie node in disk", "owner", owner, "path", path, "expect", hash, "got", nHash)
log.Debug("Unexpected trie node in disk", "owner", owner, "path", path, "expect", hash, "got", nHash)
return nil, newUnexpectedNodeError("disk", hash, nHash, owner, path)
}
if dl.cleans != nil && len(nBlob) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion trie/triedb/pathdb/nodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (b *nodebuffer) node(owner common.Hash, path []byte, hash common.Hash) (*tr
}
if n.Hash != hash {
dirtyFalseMeter.Mark(1)
log.Error("Unexpected trie node in node buffer", "owner", owner, "path", path, "expect", hash, "got", n.Hash)
log.Debug("Unexpected trie node in node buffer", "owner", owner, "path", path, "expect", hash, "got", n.Hash)
return nil, newUnexpectedNodeError("dirty", hash, n.Hash, owner, path)
}
return n, nil
Expand Down

0 comments on commit 2a21175

Please sign in to comment.