Skip to content

Commit

Permalink
core/state: subfetcher revive state in batch mode
Browse files Browse the repository at this point in the history
  • Loading branch information
asyukii committed Sep 27, 2023
1 parent 2d82109 commit edbb0bc
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 6 deletions.
71 changes: 71 additions & 0 deletions core/state/state_expiry.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,77 @@ func fetchExpiredStorageFromRemote(fullDB ethdb.FullStateDB, stateRoot common.Ha
return reviveStorageTrie(addr, tr, proofs[0], key)
}

// batchFetchExpiredStorageFromRemote request expired state from remote full state node with a list of keys and prefixes.
func batchFetchExpiredFromRemote(fullDB ethdb.FullStateDB, stateRoot common.Hash, addr common.Address, root common.Hash, tr Trie, prefixKeys [][]byte, keys []common.Hash) ([]map[string][]byte, error) {

ret := make([]map[string][]byte, len(keys))
prefixKeysStr := make([]string, len(prefixKeys))
keysStr := make([]string, len(keys))

if EnableLocalRevive {
var expiredKeys []common.Hash
var expiredPrefixKeys [][]byte
for i, key := range keys {
val, err := tr.TryLocalRevive(addr, key.Bytes())
log.Debug("fetchExpiredStorageFromRemote TryLocalRevive", "addr", addr, "key", key, "val", val, "err", err)
if _, ok := err.(*trie.MissingNodeError); !ok {
return nil, err
}
switch err.(type) {
case *trie.MissingNodeError:
expiredKeys = append(expiredKeys, key)
expiredPrefixKeys = append(expiredPrefixKeys, prefixKeys[i])
case nil:
kv := make(map[string][]byte, 1)
kv[key.String()] = val
ret = append(ret, kv)
default:
return nil, err
}
}

for i, prefix := range expiredPrefixKeys {
prefixKeysStr[i] = common.Bytes2Hex(prefix)
}
for i, key := range expiredKeys {
keysStr[i] = common.Bytes2Hex(key[:])
}

} else {
for i, prefix := range prefixKeys {
prefixKeysStr[i] = common.Bytes2Hex(prefix)
}

for i, key := range keys {
keysStr[i] = common.Bytes2Hex(key[:])
}
}

// cannot revive locally, fetch remote proof
proofs, err := fullDB.GetStorageReviveProof(stateRoot, addr, root, prefixKeysStr, keysStr)
log.Debug("fetchExpiredStorageFromRemote GetStorageReviveProof", "addr", addr, "keys", keysStr, "prefixKeys", prefixKeysStr, "proofs", len(proofs), "err", err)
if err != nil {
return nil, err
}

if len(proofs) == 0 {
log.Error("cannot find any revive proof from remoteDB", "addr", addr, "keys", keysStr, "prefixKeys", prefixKeysStr)
return nil, fmt.Errorf("cannot find any revive proof from remoteDB")
}

for i, proof := range proofs {
// kvs, err := reviveStorageTrie(addr, tr, proof, common.HexToHash(keysStr[i])) // TODO(asyukii): this logically should work but it doesn't because of some reason, will need to investigate
kvs, err := reviveStorageTrie(addr, tr, proof, common.HexToHash(proof.Key))
if err != nil {
log.Error("reviveStorageTrie failed", "addr", addr, "key", keys[i], "err", err)
continue
}
ret = append(ret, kvs)
}

return ret, nil
}

// reviveStorageTrie revive trie's expired state from proof
func reviveStorageTrie(addr common.Address, tr Trie, proof types.ReviveStorageProof, targetKey common.Hash) (map[string][]byte, error) {
defer func(start time.Time) {
Expand Down
2 changes: 1 addition & 1 deletion core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
continue
}
if _, err = fetchExpiredStorageFromRemote(s.db.fullStateDB, s.db.originalRoot, s.address, s.data.Root, tr, enErr.Path, key); err != nil {
s.db.setError(fmt.Errorf("state object pendingFutureReviveState fetchExpiredStorageFromRemote err, contract: %v, key: %v, err: %v", s.address, key, err))
s.db.setError(fmt.Errorf("state object pendingFutureReviveState fetchExpiredStorageFromRemote err, contract: %v, key: %v, path: %v, err: %v", s.address, key, enErr.Path, err))
}
}
}
Expand Down
16 changes: 11 additions & 5 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,8 @@ func (sf *subfetcher) loop() {
sf.tasks = nil
sf.lock.Unlock()

reviveKeys := make([]common.Hash, 0, len(tasks))
revivePaths := make([][]byte, 0, len(tasks))
// Prefetch any tasks until the loop is interrupted
for i, task := range tasks {
select {
Expand All @@ -587,11 +589,8 @@ func (sf *subfetcher) loop() {
// handle expired state
if sf.enableStateExpiry {
if exErr, match := err.(*trie2.ExpiredNodeError); match {
key := common.BytesToHash(task)
_, err = fetchExpiredStorageFromRemote(sf.fullStateDB, sf.state, sf.addr, sf.root, sf.trie, exErr.Path, key)
if err != nil {
log.Error("subfetcher fetchExpiredStorageFromRemote err", "addr", sf.addr, "path", exErr.Path, "err", err)
}
reviveKeys = append(reviveKeys, common.BytesToHash(task))
revivePaths = append(revivePaths, exErr.Path)
}
}
}
Expand All @@ -601,6 +600,13 @@ func (sf *subfetcher) loop() {
}
}

if len(reviveKeys) != 0 {
_, err = batchFetchExpiredFromRemote(sf.fullStateDB, sf.state, sf.addr, sf.root, sf.trie, revivePaths, reviveKeys)
if err != nil {
log.Error("subfetcher batchFetchExpiredFromRemote err", "addr", sf.addr, "state", sf.state, "revivePaths", revivePaths, "reviveKeys", reviveKeys, "err", err)
}
}

case ch := <-sf.copy:
// Somebody wants a copy of the current trie, grant them
ch <- sf.db.CopyTrie(sf.trie)
Expand Down

0 comments on commit edbb0bc

Please sign in to comment.