From fefd043509e05cbb404431339683900335a6b232 Mon Sep 17 00:00:00 2001 From: 0xbundler <124862913+0xbundler@users.noreply.github.com> Date: Thu, 12 Oct 2023 21:42:25 +0800 Subject: [PATCH] metrics: add some trace metrics for epoch meta; trie/trie: fix some update epoch bugs; state/state_object: fix epoch update issue; state/statedb: fix oom issue; --- core/blockchain.go | 14 +++++--- core/state/state_expiry.go | 3 ++ core/state/state_object.go | 19 ++++++---- core/state/statedb.go | 12 +++++++ trie/epochmeta/difflayer.go | 36 ++++++++++--------- trie/epochmeta/snapshot.go | 3 +- trie/trie.go | 64 ++++++++++++++++----------------- trie/trie_expiry.go | 8 ++--- trie/trie_reader.go | 17 +++++++++ trie/triedb/pathdb/layertree.go | 3 ++ 10 files changed, 115 insertions(+), 64 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index caec127fd5..b5541f053b 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1654,6 +1654,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // If node is running in path mode, skip explicit gc operation // which is unnecessary in this mode. if bc.triedb.Scheme() == rawdb.PathScheme { + err := bc.triedb.CommitEpochMeta(block.Root()) + if err != nil { + return err + } return nil } @@ -1668,10 +1672,12 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // Full but not archive node, do proper garbage collection triedb.Reference(block.Root(), common.Hash{}) // metadata reference to keep trie alive bc.triegc.Push(block.Root(), -int64(block.NumberU64())) - err := triedb.CommitEpochMeta(block.Root()) - if err != nil { - return err - } + // TODO(0xbundler): when opt commit later, remove it. + go triedb.CommitEpochMeta(block.Root()) + //err := triedb.CommitEpochMeta(block.Root()) + //if err != nil { + // return err + //} if current := block.NumberU64(); current > bc.triesInMemory { // If we exceeded our memory allowance, flush matured singleton nodes to disk diff --git a/core/state/state_expiry.go b/core/state/state_expiry.go index fbbd45f9a4..1f90760a43 100644 --- a/core/state/state_expiry.go +++ b/core/state/state_expiry.go @@ -112,6 +112,9 @@ func batchFetchExpiredFromRemote(expiryMeta *stateExpiryMeta, addr common.Addres keysStr[i] = common.Bytes2Hex(key[:]) } } + if len(prefixKeysStr) == 0 { + return ret, nil + } // cannot revive locally, fetch remote proof proofs, err := expiryMeta.fullStateDB.GetStorageReviveProof(expiryMeta.originalRoot, addr, root, prefixKeysStr, keysStr) diff --git a/core/state/state_object.go b/core/state/state_object.go index 122b93388e..626b2a05e2 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -169,7 +169,6 @@ func (s *stateObject) getTrie() (Trie, error) { s.trie = s.db.prefetcher.trie(s.addrHash, s.data.Root) } if s.trie == nil { - // TODO(0xbundler): if any change to open a storage trie in state expiry feature? tr, err := s.db.db.OpenStorageTrie(s.db.originalRoot, s.address, s.data.Root) if err != nil { return nil, err @@ -320,6 +319,7 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash { // handle state expiry situation if s.db.EnableExpire() { if enErr, ok := err.(*trie.ExpiredNodeError); ok { + log.Debug("GetCommittedState expired in trie", "addr", s.address, "key", key, "err", err) val, err = s.fetchExpiredFromRemote(enErr.Path, key, false) } // TODO(0xbundler): add epoch record cache for prevent frequency access epoch update, may implement later @@ -486,6 +486,7 @@ func (s *stateObject) updateTrie() (Trie, error) { if _, err = fetchExpiredStorageFromRemote(s.db.expiryMeta, s.address, s.data.Root, tr, enErr.Path, key); err != nil { s.db.setError(fmt.Errorf("state object pendingFutureReviveState fetchExpiredStorageFromRemote err, contract: %v, key: %v, path: %v, err: %v", s.address, key, enErr.Path, err)) } + log.Debug("updateTrie pendingFutureReviveState", "contract", s.address, "key", key, "epoch", s.db.Epoch(), "tr.epoch", tr.Epoch(), "tr", fmt.Sprintf("%p", tr), "ins", fmt.Sprintf("%p", s)) } } for key, value := range dirtyStorage { @@ -493,11 +494,13 @@ func (s *stateObject) updateTrie() (Trie, error) { if err := tr.DeleteStorage(s.address, key[:]); err != nil { s.db.setError(fmt.Errorf("state object update trie DeleteStorage err, contract: %v, key: %v, err: %v", s.address, key, err)) } + log.Debug("updateTrie DeleteStorage", "contract", s.address, "key", key, "epoch", s.db.Epoch(), "value", value, "tr.epoch", tr.Epoch(), "tr", fmt.Sprintf("%p", tr), "ins", fmt.Sprintf("%p", s)) s.db.StorageDeleted += 1 } else { if err := tr.UpdateStorage(s.address, key[:], value); err != nil { s.db.setError(fmt.Errorf("state object update trie UpdateStorage err, contract: %v, key: %v, err: %v", s.address, key, err)) } + log.Debug("updateTrie UpdateStorage", "contract", s.address, "key", key, "epoch", s.db.Epoch(), "value", value, "tr.epoch", tr.Epoch(), "tr", fmt.Sprintf("%p", tr), "ins", fmt.Sprintf("%p", s)) s.db.StorageUpdated += 1 } // Cache the items for preloading @@ -527,13 +530,16 @@ func (s *stateObject) updateTrie() (Trie, error) { // rlp-encoded value to be used by the snapshot var snapshotVal []byte - // Encoding []byte cannot fail, ok to ignore the error. - if s.db.EnableExpire() { - snapshotVal, _ = snapshot.EncodeValueToRLPBytes(snapshot.NewValueWithEpoch(s.db.Epoch(), value)) - } else { - snapshotVal, _ = rlp.EncodeToBytes(value) + if len(value) != 0 { + // Encoding []byte cannot fail, ok to ignore the error. + if s.db.EnableExpire() { + snapshotVal, _ = snapshot.EncodeValueToRLPBytes(snapshot.NewValueWithEpoch(s.db.Epoch(), value)) + } else { + snapshotVal, _ = rlp.EncodeToBytes(value) + } } storage[khash] = snapshotVal // snapshotVal will be nil if it's deleted + log.Debug("updateTrie UpdateSnapShot", "contract", s.address, "key", key, "epoch", s.db.Epoch(), "value", snapshotVal, "tr.epoch", tr.Epoch(), "tr", fmt.Sprintf("%p", tr), "ins", fmt.Sprintf("%p", s)) // Track the original value of slot only if it's mutated first time prev := s.originStorage[key] @@ -905,6 +911,7 @@ func (s *stateObject) getExpirySnapStorage(key common.Hash) ([]byte, error, erro return val.GetVal(), nil, nil } + log.Debug("GetCommittedState expired in snapshot", "addr", s.address, "key", key, "val", val, "enc", enc, "err", err) // handle from remoteDB, if got err just setError, or return to revert in consensus version. valRaw, err := s.fetchExpiredFromRemote(nil, key, true) if err != nil { diff --git a/core/state/statedb.go b/core/state/statedb.go index 142e434e59..a7eb0aa300 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -18,6 +18,7 @@ package state import ( + "bytes" "errors" "fmt" "github.com/ethereum/go-ethereum/ethdb" @@ -1800,6 +1801,7 @@ func (s *StateDB) Commit(block uint64, failPostCommitFunc func(), postCommitFunc if root == (common.Hash{}) { root = types.EmptyRootHash } + //log.Info("state commit", "nodes", stringfyEpochMeta(nodes.FlattenEpochMeta())) //origin := s.originalRoot //if origin == (common.Hash{}) { // origin = types.EmptyRootHash @@ -1824,6 +1826,16 @@ func (s *StateDB) Commit(block uint64, failPostCommitFunc func(), postCommitFunc return root, diffLayer, nil } +func stringfyEpochMeta(meta map[common.Hash]map[string][]byte) string { + buf := bytes.NewBuffer(nil) + for hash, m := range meta { + for k, v := range m { + buf.WriteString(fmt.Sprintf("%v: %v|%v, ", hash, []byte(k), common.Bytes2Hex(v))) + } + } + return buf.String() +} + func (s *StateDB) SnapToDiffLayer() ([]common.Address, []types.DiffAccount, []types.DiffStorage) { destructs := make([]common.Address, 0, len(s.stateObjectsDestruct)) for account := range s.stateObjectsDestruct { diff --git a/trie/epochmeta/difflayer.go b/trie/epochmeta/difflayer.go index 1dc8f9dbe0..2838e8a9d0 100644 --- a/trie/epochmeta/difflayer.go +++ b/trie/epochmeta/difflayer.go @@ -17,6 +17,7 @@ const ( // MaxEpochMetaDiffDepth default is 128 layers MaxEpochMetaDiffDepth = 128 journalVersion uint64 = 1 + enableBloomFilter = false ) var ( @@ -95,7 +96,6 @@ func (h storageBloomHasher) Sum64() uint64 { binary.BigEndian.Uint64([]byte(h.path[bloomStorageHasherOffset:bloomStorageHasherOffset+8])) } -// TODO(0xbundler): add bloom filter? type diffLayer struct { blockNumber *big.Int blockRoot common.Hash @@ -114,23 +114,25 @@ func newEpochMetaDiffLayer(blockNumber *big.Int, blockRoot common.Hash, parent s nodeSet: nodeSet, } - switch p := parent.(type) { - case *diffLayer: - dl.origin = p.origin - dl.diffed, _ = p.diffed.Copy() - case *diskLayer: - dl.origin = p - dl.diffed, _ = bloomfilter.New(uint64(bloomSize), uint64(bloomFuncs)) - default: - panic("newEpochMetaDiffLayer got wrong snapshot type") - } - - // Iterate over all the accounts and storage metas and index them - for accountHash, metas := range dl.nodeSet { - for path := range metas { - dl.diffed.Add(storageBloomHasher{accountHash, path}) + if enableBloomFilter { + switch p := parent.(type) { + case *diffLayer: + dl.origin = p.origin + dl.diffed, _ = p.diffed.Copy() + case *diskLayer: + dl.origin = p + dl.diffed, _ = bloomfilter.New(uint64(bloomSize), uint64(bloomFuncs)) + default: + panic("newEpochMetaDiffLayer got wrong snapshot type") + } + // Iterate over all the accounts and storage metas and index them + for accountHash, metas := range dl.nodeSet { + for path := range metas { + dl.diffed.Add(storageBloomHasher{accountHash, path}) + } } } + return dl } @@ -141,7 +143,7 @@ func (s *diffLayer) Root() common.Hash { // EpochMeta find target epoch meta from diff layer or disk layer func (s *diffLayer) EpochMeta(addrHash common.Hash, path string) ([]byte, error) { // if the diff chain not contain the meta or staled, try get from disk layer - if !s.diffed.Contains(storageBloomHasher{addrHash, path}) { + if s.diffed != nil && !s.diffed.Contains(storageBloomHasher{addrHash, path}) { return s.origin.EpochMeta(addrHash, path) } diff --git a/trie/epochmeta/snapshot.go b/trie/epochmeta/snapshot.go index 89675b760a..71201f961d 100644 --- a/trie/epochmeta/snapshot.go +++ b/trie/epochmeta/snapshot.go @@ -142,7 +142,7 @@ func (s *SnapshotTree) Cap(blockRoot common.Hash) error { diff.resetParent(newDiskLayer) } } - log.Info("SnapshotTree cap", "layers", len(s.layers), "children", len(s.children), "flatten", len(flatten)) + log.Debug("epochmeta snap tree cap", "root", blockRoot, "layers", len(s.layers), "flatten", len(flatten)) return nil } @@ -172,6 +172,7 @@ func (s *SnapshotTree) Update(parentRoot common.Hash, blockNumber *big.Int, bloc s.layers[blockRoot] = snap s.children[parentRoot] = append(s.children[parentRoot], blockRoot) + log.Debug("epochmeta snap tree update", "root", blockRoot, "number", blockNumber, "layers", len(s.layers)) return nil } diff --git a/trie/trie.go b/trie/trie.go index 8ab262cc97..fe364aac0b 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -274,6 +274,7 @@ func (t *Trie) getWithEpoch(origNode node, key []byte, pos int, epoch types.Stat if updateEpoch { n.setEpoch(t.currentEpoch) } + n.flags = t.newFlag() didResolve = true } return value, n, didResolve, err @@ -288,6 +289,7 @@ func (t *Trie) getWithEpoch(origNode node, key []byte, pos int, epoch types.Stat if updateEpoch && newnode != nil { n.UpdateChildEpoch(int(key[pos]), t.currentEpoch) } + n.flags = t.newFlag() didResolve = true } return value, n, didResolve, err @@ -297,10 +299,8 @@ func (t *Trie) getWithEpoch(origNode node, key []byte, pos int, epoch types.Stat return nil, n, true, err } - if child, ok := child.(*fullNode); ok { - if err = t.resolveEpochMeta(child, epoch, key[:pos]); err != nil { - return nil, n, true, err - } + if err = t.resolveEpochMeta(child, epoch, key[:pos]); err != nil { + return nil, n, true, err } value, newnode, _, err := t.getWithEpoch(child, key, pos, epoch, updateEpoch) return value, newnode, true, err @@ -626,7 +626,6 @@ func (t *Trie) insertWithEpoch(n node, prefix, key []byte, value node, epoch typ return false, nil, err } branch.UpdateChildEpoch(int(key[matchlen]), t.currentEpoch) - branch.setEpoch(t.currentEpoch) // Replace this shortNode with the branch if it occurs at index 0. if matchlen == 0 { @@ -672,10 +671,8 @@ func (t *Trie) insertWithEpoch(n node, prefix, key []byte, value node, epoch typ return false, nil, err } - if child, ok := rn.(*fullNode); ok { - if err = t.resolveEpochMeta(child, epoch, prefix); err != nil { - return false, nil, err - } + if err = t.resolveEpochMeta(rn, epoch, prefix); err != nil { + return false, nil, err } dirty, nn, err := t.insertWithEpoch(rn, prefix, key, value, epoch) @@ -813,7 +810,7 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) { // shortNode{..., shortNode{...}}. Since the entry // might not be loaded yet, resolve it just for this // check. - cnode, err := t.resolve(n.Children[pos], append(prefix, byte(pos))) + cnode, err := t.resolve(n.Children[pos], append(prefix, byte(pos)), n.GetChildEpoch(pos)) if err != nil { return false, nil, err } @@ -885,7 +882,7 @@ func (t *Trie) deleteWithEpoch(n node, prefix, key []byte, epoch types.StateEpoc // subtrie must contain at least two other values with keys // longer than n.Key. dirty, child, err := t.deleteWithEpoch(n.Val, append(prefix, key[:len(n.Key)]...), key[len(n.Key):], epoch) - if !t.renewNode(epoch, dirty, true) || err != nil { + if !dirty || err != nil { return false, n, err } switch child := child.(type) { @@ -907,7 +904,7 @@ func (t *Trie) deleteWithEpoch(n node, prefix, key []byte, epoch types.StateEpoc case *fullNode: dirty, nn, err := t.deleteWithEpoch(n.Children[key[0]], append(prefix, key[0]), key[1:], n.GetChildEpoch(int(key[0]))) - if !t.renewNode(epoch, dirty, true) || err != nil { + if !dirty || err != nil { return false, n, err } n = n.copy() @@ -952,7 +949,7 @@ func (t *Trie) deleteWithEpoch(n node, prefix, key []byte, epoch types.StateEpoc // shortNode{..., shortNode{...}}. Since the entry // might not be loaded yet, resolve it just for this // check. - cnode, err := t.resolve(n.Children[pos], append(prefix, byte(pos))) + cnode, err := t.resolve(n.Children[pos], append(prefix, byte(pos)), n.GetChildEpoch(pos)) if err != nil { return false, nil, err } @@ -990,14 +987,12 @@ func (t *Trie) deleteWithEpoch(n node, prefix, key []byte, epoch types.StateEpoc return false, nil, err } - if child, ok := rn.(*fullNode); ok { - if err = t.resolveEpochMeta(child, epoch, prefix); err != nil { - return false, nil, err - } + if err = t.resolveEpochMeta(rn, epoch, prefix); err != nil { + return false, nil, err } dirty, nn, err := t.deleteWithEpoch(rn, prefix, key, epoch) - if !t.renewNode(epoch, dirty, true) || err != nil { + if !dirty || err != nil { return false, rn, err } return true, nn, nil @@ -1015,9 +1010,16 @@ func concat(s1 []byte, s2 ...byte) []byte { return r } -func (t *Trie) resolve(n node, prefix []byte) (node, error) { +func (t *Trie) resolve(n node, prefix []byte, epoch types.StateEpoch) (node, error) { if n, ok := n.(hashNode); ok { - return t.resolveAndTrack(n, prefix) + n, err := t.resolveAndTrack(n, prefix) + if err != nil { + return nil, err + } + if err = t.resolveEpochMeta(n, epoch, prefix); err != nil { + return nil, err + } + return n, nil } return n, nil } @@ -1055,10 +1057,6 @@ func (t *Trie) resolveEpochMeta(n node, epoch types.StateEpoch, prefix []byte) e return nil case *fullNode: n.setEpoch(epoch) - // TODO if parent's epoch <= 1, just set Epoch0, opt in startup hit more time epochmeta problem - //if epoch <= types.StateEpoch1 { - // return nil - //} meta, err := t.reader.epochMeta(prefix) if err != nil { return err @@ -1298,11 +1296,8 @@ func (t *Trie) tryRevive(n node, key []byte, targetPrefixKey []byte, nub MPTProo if err != nil { return nil, false, err } - - if child, ok := child.(*fullNode); ok { - if err = t.resolveEpochMeta(child, epoch, key[:pos]); err != nil { - return nil, false, err - } + if err = t.resolveEpochMeta(child, epoch, key[:pos]); err != nil { + return nil, false, err } newNode, _, err := t.tryRevive(child, key, targetPrefixKey, nub, pos, epoch, isExpired) @@ -1480,7 +1475,7 @@ func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, p Hash: common.BytesToHash(n.flags.hash), } } - err := t.findExpiredSubTree(n.Val, append(path, n.Key...), epoch, pruner, stats, nil, false) + err := t.findExpiredSubTree(n.Val, append(path, n.Key...), epoch, pruner, stats, itemCh, findExpired) if err != nil { return err } @@ -1489,10 +1484,15 @@ func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, p if stats != nil { stats.Add(1) } + if !findExpired { + itemCh <- &NodeInfo{ + Hash: common.BytesToHash(n.flags.hash), + } + } var err error // Go through every child and recursively delete expired nodes for i, child := range n.Children { - err = t.findExpiredSubTree(child, append(path, byte(i)), n.GetChildEpoch(i), pruner, stats, nil, false) + err = t.findExpiredSubTree(child, append(path, byte(i)), n.GetChildEpoch(i), pruner, stats, itemCh, findExpired) if err != nil { return err } @@ -1511,7 +1511,7 @@ func (t *Trie) findExpiredSubTree(n node, path []byte, epoch types.StateEpoch, p return err } - return t.findExpiredSubTree(resolve, path, epoch, pruner, stats, nil, false) + return t.findExpiredSubTree(resolve, path, epoch, pruner, stats, itemCh, findExpired) case valueNode: return nil case nil: diff --git a/trie/trie_expiry.go b/trie/trie_expiry.go index 1dd6f098a1..c284e4b6ae 100644 --- a/trie/trie_expiry.go +++ b/trie/trie_expiry.go @@ -38,6 +38,7 @@ func (t *Trie) tryLocalRevive(origNode node, key []byte, pos int, epoch types.St n = n.copy() n.Val = newnode n.setEpoch(t.currentEpoch) + n.flags = t.newFlag() didResolve = true } return value, n, didResolve, err @@ -50,6 +51,7 @@ func (t *Trie) tryLocalRevive(origNode node, key []byte, pos int, epoch types.St if newnode != nil { n.UpdateChildEpoch(int(key[pos]), t.currentEpoch) } + n.flags = t.newFlag() didResolve = true } return value, n, didResolve, err @@ -59,10 +61,8 @@ func (t *Trie) tryLocalRevive(origNode node, key []byte, pos int, epoch types.St return nil, n, true, err } - if child, ok := child.(*fullNode); ok { - if err = t.resolveEpochMeta(child, epoch, key[:pos]); err != nil { - return nil, n, true, err - } + if err = t.resolveEpochMeta(child, epoch, key[:pos]); err != nil { + return nil, n, true, err } value, newnode, _, err := t.tryLocalRevive(child, key, pos, epoch) return value, newnode, true, err diff --git a/trie/trie_reader.go b/trie/trie_reader.go index e0d7b2bf3e..c1b9424765 100644 --- a/trie/trie_reader.go +++ b/trie/trie_reader.go @@ -22,9 +22,17 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/trie/epochmeta" "github.com/ethereum/go-ethereum/trie/triestate" "math/big" + "time" +) + +var ( + accountMetaTimer = metrics.NewRegisteredTimer("trie/reader/accountmeta", nil) + epochMetaTimer = metrics.NewRegisteredTimer("trie/reader/epochmeta", nil) + nodeTimer = metrics.NewRegisteredTimer("trie/reader/node", nil) ) // Reader wraps the Node method of a backing trie store. @@ -90,6 +98,9 @@ func newEmptyReader() *trieReader { // information. An MissingNodeError will be returned in case the node is // not found or any error is encountered. func (r *trieReader) node(path []byte, hash common.Hash) ([]byte, error) { + defer func(start time.Time) { + nodeTimer.Update(time.Since(start)) + }(time.Now()) // Perform the logics in tests for preventing trie node access. if r.banned != nil { if _, ok := r.banned[string(path)]; ok { @@ -123,6 +134,9 @@ func (l *trieLoader) OpenStorageTrie(stateRoot common.Hash, addrHash, root commo // epochMeta resolve from epoch meta storage func (r *trieReader) epochMeta(path []byte) (*epochmeta.BranchNodeEpochMeta, error) { + defer func(start time.Time) { + epochMetaTimer.Update(time.Since(start)) + }(time.Now()) if r.emReader == nil { return nil, fmt.Errorf("cannot resolve epochmeta without db, path: %#x", path) } @@ -144,6 +158,9 @@ func (r *trieReader) epochMeta(path []byte) (*epochmeta.BranchNodeEpochMeta, err // accountMeta resolve account metadata func (r *trieReader) accountMeta() (types.MetaNoConsensus, error) { + defer func(start time.Time) { + accountMetaTimer.Update(time.Since(start)) + }(time.Now()) if r.emReader == nil { return types.EmptyMetaNoConsensus, errors.New("cannot resolve epoch meta without db for account") } diff --git a/trie/triedb/pathdb/layertree.go b/trie/triedb/pathdb/layertree.go index d314779910..f7fc4930bc 100644 --- a/trie/triedb/pathdb/layertree.go +++ b/trie/triedb/pathdb/layertree.go @@ -19,6 +19,7 @@ package pathdb import ( "errors" "fmt" + "github.com/ethereum/go-ethereum/log" "sync" "github.com/ethereum/go-ethereum/common" @@ -105,6 +106,7 @@ func (tree *layerTree) add(root common.Hash, parentRoot common.Hash, block uint6 tree.lock.Lock() tree.layers[l.rootHash()] = l + log.Debug("pathdb snap tree update", "root", root, "number", block, "layers", len(tree.layers)) tree.lock.Unlock() return nil } @@ -190,6 +192,7 @@ func (tree *layerTree) cap(root common.Hash, layers int) error { remove(root) } } + log.Debug("pathdb snap tree cap", "root", root, "layers", len(tree.layers)) return nil }