From 6ef398fd784aba445115ef90be916062bb8e2da6 Mon Sep 17 00:00:00 2001 From: 0xbundler <124862913+0xbundler@users.noreply.github.com> Date: Sun, 8 Oct 2023 10:44:59 +0800 Subject: [PATCH] pruner: opt size statistic; trie/inspect: opt inspect in PBSS mode; --- core/blockchain.go | 2 +- core/blockchain_insert.go | 5 +-- core/rawdb/database.go | 6 +++- core/state/pruner/pruner.go | 36 ++++++++++++++++----- core/state/snapshot/snapshot_expire.go | 10 +++--- core/state/snapshot/snapshot_expire_test.go | 2 +- trie/inspect_trie.go | 7 ++-- trie/trie.go | 22 +++++++++---- 8 files changed, 62 insertions(+), 28 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 5a13efc7c0..929c1ed018 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2156,7 +2156,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) stats.usedGas += usedGas dirty, _ := bc.triedb.Size() - stats.report(chain, it.index, dirty, setHead) + stats.report(chain, it.index, dirty, setHead, bc.chainConfig) if !setHead { // After merge we expect few side chains. Simply count diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go index 3e913cb87b..453eca4c57 100644 --- a/core/blockchain_insert.go +++ b/core/blockchain_insert.go @@ -17,6 +17,7 @@ package core import ( + "github.com/ethereum/go-ethereum/params" "time" "github.com/ethereum/go-ethereum/common" @@ -39,7 +40,7 @@ const statsReportLimit = 8 * time.Second // report prints statistics if some number of blocks have been processed // or more than a few seconds have passed since the last message. -func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize, setHead bool) { +func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize, setHead bool, config *params.ChainConfig) { // Fetch the timings for the batch var ( now = mclock.Now() @@ -56,7 +57,7 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor // Assemble the log context and send it to the logger context := []interface{}{ - "number", end.Number(), "hash", end.Hash(), + "number", end.Number(), "hash", end.Hash(), "stateEpoch", types.GetStateEpoch(config, end.Number()), "blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000, "elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed), } diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 2ba2dc1b11..9fdb959704 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -608,6 +608,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { accountSnaps stat storageSnaps stat snapJournal stat + trieJournal stat preimages stat bloomBits stat cliqueSnaps stat @@ -691,6 +692,8 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { epochMetaMetaSize.Add(size) case bytes.Equal(key, snapshotJournalKey): snapJournal.Add(size) + case bytes.Equal(key, trieJournalKey): + trieJournal.Add(size) case bytes.Equal(key, epochMetaSnapshotJournalKey): epochMetaSnapJournalSize.Add(size) case bytes.HasPrefix(key, EpochMetaPlainStatePrefix) && len(key) >= (len(EpochMetaPlainStatePrefix)+common.HashLength): @@ -702,7 +705,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { lastPivotKey, fastTrieProgressKey, snapshotDisabledKey, SnapshotRootKey, snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey, - persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, + persistentStateIDKey, snapshotSyncStatusKey, } { if bytes.Equal(key, meta) { metadata.Add(size) @@ -735,6 +738,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { {"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()}, {"Key-Value store", "Path trie account nodes", accountTries.Size(), accountTries.Count()}, {"Key-Value store", "Path trie storage nodes", storageTries.Size(), storageTries.Count()}, + {"Key-Value store", "Path trie snap journal", trieJournal.Size(), trieJournal.Count()}, {"Key-Value store", "Trie preimages", preimages.Size(), preimages.Count()}, {"Key-Value store", "Account snapshot", accountSnaps.Size(), accountSnaps.Count()}, {"Key-Value store", "Storage snapshot", storageSnaps.Size(), storageSnaps.Count()}, diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go index 5102b8f3d8..f7ad97d7e1 100644 --- a/core/state/pruner/pruner.go +++ b/core/state/pruner/pruner.go @@ -29,6 +29,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" "github.com/prometheus/tsdb/fileutil" @@ -740,8 +741,13 @@ func (p *Pruner) ExpiredPrune(height *big.Int, root common.Hash) error { // here are some issues when just delete it from hash-based storage, because it's shared kv in hbss // but it's ok for pbss. func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch types.StateEpoch, expireContractCh chan *snapshot.ContractItem, pruneExpiredInDisk chan *trie.NodeInfo) error { + var ( + trieCount atomic.Uint64 + start = time.Now() + logged = time.Now() + ) for item := range expireContractCh { - log.Info("start scan trie expired state", "addr", item.Addr, "root", item.Root) + log.Info("start scan trie expired state", "addrHash", item.Addr, "root", item.Root) tr, err := trie.New(&trie.ID{ StateRoot: stateRoot, Owner: item.Addr, @@ -752,11 +758,16 @@ func asyncScanExpiredInTrie(db *trie.Database, stateRoot common.Hash, epoch type return err } tr.SetEpoch(epoch) - if err = tr.PruneExpired(pruneExpiredInDisk); err != nil { + if err = tr.PruneExpired(pruneExpiredInDisk, &trieCount); err != nil { log.Error("asyncScanExpiredInTrie, PruneExpired err", "id", item, "err", err) return err } + if time.Since(logged) > 8*time.Second { + log.Info("Pruning expired states", "trieNodes", trieCount.Load()) + logged = time.Now() + } } + log.Info("Scan expired states", "trieNodes", trieCount.Load(), "elapsed", common.PrettyDuration(time.Since(start))) close(pruneExpiredInDisk) return nil } @@ -780,32 +791,39 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch addr := info.Addr // delete trie kv trieCount++ - trieSize += common.StorageSize(len(info.Key) + 32) switch scheme { case rawdb.PathScheme: + val := rawdb.ReadTrieNode(diskdb, addr, info.Path, info.Hash, rawdb.PathScheme) + trieSize += common.StorageSize(len(val) + 33 + len(info.Path)) rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.PathScheme) case rawdb.HashScheme: // hbss has shared kv, so using bloom to filter them out. if !bloom.Contain(info.Hash.Bytes()) { + val := rawdb.ReadTrieNode(diskdb, addr, info.Path, info.Hash, rawdb.HashScheme) + trieSize += common.StorageSize(len(val) + 33) rawdb.DeleteTrieNode(batch, addr, info.Path, info.Hash, rawdb.HashScheme) } } // delete epoch meta if info.IsBranch { epochMetaCount++ - epochMetaSize += common.StorageSize(32 + len(info.Path) + 32) + val := rawdb.ReadEpochMetaPlainState(diskdb, addr, string(info.Path)) + epochMetaSize += common.StorageSize(33 + len(info.Path) + len(val)) rawdb.DeleteEpochMetaPlainState(batch, addr, string(info.Path)) } // replace snapshot kv only epoch if info.IsLeaf { snapCount++ - snapSize += common.StorageSize(32) - if err := snapshot.ShrinkExpiredLeaf(batch, addr, info.Key, info.Epoch, scheme); err != nil { + size, err := snapshot.ShrinkExpiredLeaf(batch, diskdb, addr, info.Key, info.Epoch, scheme) + if err != nil { log.Error("ShrinkExpiredLeaf err", "addr", addr, "key", info.Key, "err", err) } + snapSize += common.StorageSize(size) } if batch.ValueSize() >= ethdb.IdealBatchSize { - batch.Write() + if err := batch.Write(); err != nil { + log.Error("asyncPruneExpiredStorageInDisk, batch write err", "err", err) + } batch.Reset() } if time.Since(logged) > 8*time.Second { @@ -816,7 +834,9 @@ func asyncPruneExpiredStorageInDisk(diskdb ethdb.Database, pruneExpiredInDisk ch } } if batch.ValueSize() > 0 { - batch.Write() + if err := batch.Write(); err != nil { + log.Error("asyncPruneExpiredStorageInDisk, batch write err", "err", err) + } batch.Reset() } log.Info("Pruned expired states", "trieNodes", trieCount, "trieSize", trieSize, diff --git a/core/state/snapshot/snapshot_expire.go b/core/state/snapshot/snapshot_expire.go index a4be6a1f84..29520160b0 100644 --- a/core/state/snapshot/snapshot_expire.go +++ b/core/state/snapshot/snapshot_expire.go @@ -8,17 +8,19 @@ import ( ) // ShrinkExpiredLeaf tool function for snapshot kv prune -func ShrinkExpiredLeaf(db ethdb.KeyValueWriter, accountHash common.Hash, storageHash common.Hash, epoch types.StateEpoch, scheme string) error { +func ShrinkExpiredLeaf(writer ethdb.KeyValueWriter, reader ethdb.KeyValueReader, accountHash common.Hash, storageHash common.Hash, epoch types.StateEpoch, scheme string) (int64, error) { switch scheme { case rawdb.HashScheme: //cannot prune snapshot in hbss, because it will used for trie prune, but it's ok in pbss. case rawdb.PathScheme: + val := rawdb.ReadStorageSnapshot(reader, accountHash, storageHash) valWithEpoch := NewValueWithEpoch(epoch, nil) enc, err := EncodeValueToRLPBytes(valWithEpoch) if err != nil { - return err + return 0, err } - rawdb.WriteStorageSnapshot(db, accountHash, storageHash, enc) + rawdb.WriteStorageSnapshot(writer, accountHash, storageHash, enc) + return int64(65 + len(val)), nil } - return nil + return 0, nil } diff --git a/core/state/snapshot/snapshot_expire_test.go b/core/state/snapshot/snapshot_expire_test.go index 50067c53b3..ac20e9c90f 100644 --- a/core/state/snapshot/snapshot_expire_test.go +++ b/core/state/snapshot/snapshot_expire_test.go @@ -21,7 +21,7 @@ func TestShrinkExpiredLeaf(t *testing.T) { db := memorydb.New() rawdb.WriteStorageSnapshot(db, accountHash, storageHash1, encodeSnapVal(NewRawValue([]byte("val1")))) - err := ShrinkExpiredLeaf(db, accountHash, storageHash1, types.StateEpoch0, rawdb.PathScheme) + _, err := ShrinkExpiredLeaf(db, db, accountHash, storageHash1, types.StateEpoch0, rawdb.PathScheme) assert.NoError(t, err) assert.Equal(t, encodeSnapVal(NewValueWithEpoch(types.StateEpoch0, nil)), rawdb.ReadStorageSnapshot(db, accountHash, storageHash1)) diff --git a/trie/inspect_trie.go b/trie/inspect_trie.go index 818ad63cdd..d00c6fde75 100644 --- a/trie/inspect_trie.go +++ b/trie/inspect_trie.go @@ -229,19 +229,16 @@ func (inspect *Inspector) ConcurrentTraversal(theTrie *Trie, theTrieTreeStat *Tr case *shortNode: path = append(path, current.Key...) inspect.ConcurrentTraversal(theTrie, theTrieTreeStat, current.Val, height+1, path) - path = path[:len(path)-len(current.Key)] case *fullNode: for idx, child := range current.Children { if child == nil { continue } - childPath := path - childPath = append(childPath, byte(idx)) if len(inspect.concurrentQueue)*2 < cap(inspect.concurrentQueue) { inspect.wg.Add(1) - go inspect.SubConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, childPath) + go inspect.SubConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, copyNewSlice(path, []byte{byte(idx)})) } else { - inspect.ConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, childPath) + inspect.ConcurrentTraversal(theTrie, theTrieTreeStat, child, height+1, append(path, byte(idx))) } } case hashNode: diff --git a/trie/trie.go b/trie/trie.go index 6a74636b45..f6b747f467 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/trie/trienode" + "sync/atomic" ) var ( @@ -1430,7 +1431,7 @@ type NodeInfo struct { } // PruneExpired traverses the storage trie and prunes all expired nodes. -func (t *Trie) PruneExpired(pruneItemCh chan *NodeInfo) error { +func (t *Trie) PruneExpired(pruneItemCh chan *NodeInfo, stats *atomic.Uint64) error { if !t.enableExpiry { return nil @@ -1444,7 +1445,7 @@ func (t *Trie) PruneExpired(pruneItemCh chan *NodeInfo) error { if pruneErr := t.recursePruneExpiredNode(n, path, epoch, pruneItemCh); pruneErr != nil { log.Error("recursePruneExpiredNode err", "Path", path, "err", pruneErr) } - }) + }, stats) if err != nil { return err } @@ -1452,7 +1453,7 @@ func (t *Trie) PruneExpired(pruneItemCh chan *NodeInfo) error { return nil } -func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, pruner func(n node, path []byte, epoch types.StateEpoch)) error { +func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, pruner func(n node, path []byte, epoch types.StateEpoch), stats *atomic.Uint64) error { // Upon reaching expired node, it will recursively traverse downwards to all the child nodes // and collect their hashes. Then, the corresponding key-value pairs will be deleted from the // database by batches. @@ -1463,16 +1464,22 @@ func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, p switch n := n.(type) { case *shortNode: - err := t.findExpiredSubTree(n.Val, append(path, n.Key...), epoch, pruner) + if stats != nil { + stats.Add(1) + } + err := t.findExpiredSubTree(n.Val, append(path, n.Key...), epoch, pruner, stats) if err != nil { return err } return nil case *fullNode: + if stats != nil { + stats.Add(1) + } var err error // Go through every child and recursively delete expired nodes for i, child := range n.Children { - err = t.findExpiredSubTree(child, append(path, byte(i)), epoch, pruner) + err = t.findExpiredSubTree(child, append(path, byte(i)), epoch, pruner, stats) if err != nil { return err } @@ -1481,6 +1488,9 @@ func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, p case hashNode: resolve, err := t.resolveAndTrack(n, path) + if _, ok := err.(*MissingNodeError); ok { + return nil + } if err != nil { return err } @@ -1488,7 +1498,7 @@ func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, p return err } - return t.findExpiredSubTree(resolve, path, epoch, pruner) + return t.findExpiredSubTree(resolve, path, epoch, pruner, stats) case valueNode: return nil case nil: