Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/state, trie, api: batch mode revive, remoteDB cache, unit tests #120

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/scwallet"
"github.com/ethereum/go-ethereum/accounts/usbwallet"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/internal/flags"
Expand Down Expand Up @@ -272,8 +273,14 @@ func applyMetricConfig(ctx *cli.Context, cfg *gethConfig) {
}

func applyStateExpiryConfig(ctx *cli.Context, cfg *gethConfig) {

if ctx.IsSet(utils.StateExpiryEnableFlag.Name) {
cfg.Eth.StateExpiryEnable = ctx.Bool(utils.StateExpiryEnableFlag.Name)
enableStateExpiry := ctx.Bool(utils.StateExpiryEnableFlag.Name)
if enableStateExpiry && ctx.IsSet(utils.StateSchemeFlag.Name) && ctx.String(utils.StateSchemeFlag.Name) == rawdb.HashScheme {
log.Warn("State expiry is not supported with hash scheme. Disabling state expiry")
enableStateExpiry = false
}
cfg.Eth.StateExpiryEnable = enableStateExpiry
}
if ctx.IsSet(utils.StateExpiryFullStateEndpointFlag.Name) {
cfg.Eth.StateExpiryFullStateEndpoint = ctx.String(utils.StateExpiryFullStateEndpointFlag.Name)
Expand Down
71 changes: 71 additions & 0 deletions core/state/state_expiry.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,77 @@ func fetchExpiredStorageFromRemote(fullDB ethdb.FullStateDB, stateRoot common.Ha
return reviveStorageTrie(addr, tr, proofs[0], key)
}

// batchFetchExpiredStorageFromRemote request expired state from remote full state node with a list of keys and prefixes.
func batchFetchExpiredFromRemote(fullDB ethdb.FullStateDB, stateRoot common.Hash, addr common.Address, root common.Hash, tr Trie, prefixKeys [][]byte, keys []common.Hash) ([]map[string][]byte, error) {

ret := make([]map[string][]byte, len(keys))
prefixKeysStr := make([]string, len(prefixKeys))
keysStr := make([]string, len(keys))

if EnableLocalRevive {
var expiredKeys []common.Hash
var expiredPrefixKeys [][]byte
for i, key := range keys {
val, err := tr.TryLocalRevive(addr, key.Bytes())
log.Debug("fetchExpiredStorageFromRemote TryLocalRevive", "addr", addr, "key", key, "val", val, "err", err)
if _, ok := err.(*trie.MissingNodeError); !ok {
return nil, err
}
switch err.(type) {
case *trie.MissingNodeError:
expiredKeys = append(expiredKeys, key)
expiredPrefixKeys = append(expiredPrefixKeys, prefixKeys[i])
case nil:
kv := make(map[string][]byte, 1)
kv[key.String()] = val
ret = append(ret, kv)
default:
return nil, err
}
}

for i, prefix := range expiredPrefixKeys {
prefixKeysStr[i] = common.Bytes2Hex(prefix)
}
for i, key := range expiredKeys {
keysStr[i] = common.Bytes2Hex(key[:])
}

} else {
for i, prefix := range prefixKeys {
prefixKeysStr[i] = common.Bytes2Hex(prefix)
}

for i, key := range keys {
keysStr[i] = common.Bytes2Hex(key[:])
}
}

// cannot revive locally, fetch remote proof
proofs, err := fullDB.GetStorageReviveProof(stateRoot, addr, root, prefixKeysStr, keysStr)
log.Debug("fetchExpiredStorageFromRemote GetStorageReviveProof", "addr", addr, "keys", keysStr, "prefixKeys", prefixKeysStr, "proofs", len(proofs), "err", err)
if err != nil {
return nil, err
}

if len(proofs) == 0 {
log.Error("cannot find any revive proof from remoteDB", "addr", addr, "keys", keysStr, "prefixKeys", prefixKeysStr)
return nil, fmt.Errorf("cannot find any revive proof from remoteDB")
}

for i, proof := range proofs {
// kvs, err := reviveStorageTrie(addr, tr, proof, common.HexToHash(keysStr[i])) // TODO(asyukii): this logically should work but it doesn't because of some reason, will need to investigate
kvs, err := reviveStorageTrie(addr, tr, proof, common.HexToHash(proof.Key))
if err != nil {
log.Error("reviveStorageTrie failed", "addr", addr, "key", keys[i], "err", err)
continue
}
ret = append(ret, kvs)
}

return ret, nil
}

// reviveStorageTrie revive trie's expired state from proof
func reviveStorageTrie(addr common.Address, tr Trie, proof types.ReviveStorageProof, targetKey common.Hash) (map[string][]byte, error) {
defer func(start time.Time) {
Expand Down
2 changes: 1 addition & 1 deletion core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
continue
}
if _, err = fetchExpiredStorageFromRemote(s.db.fullStateDB, s.db.originalRoot, s.address, s.data.Root, tr, enErr.Path, key); err != nil {
s.db.setError(fmt.Errorf("state object pendingFutureReviveState fetchExpiredStorageFromRemote err, contract: %v, key: %v, err: %v", s.address, key, err))
s.db.setError(fmt.Errorf("state object pendingFutureReviveState fetchExpiredStorageFromRemote err, contract: %v, key: %v, path: %v, err: %v", s.address, key, enErr.Path, err))
}
}
}
Expand Down
16 changes: 11 additions & 5 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,8 @@ func (sf *subfetcher) loop() {
sf.tasks = nil
sf.lock.Unlock()

reviveKeys := make([]common.Hash, 0, len(tasks))
revivePaths := make([][]byte, 0, len(tasks))
// Prefetch any tasks until the loop is interrupted
for i, task := range tasks {
select {
Expand All @@ -587,11 +589,8 @@ func (sf *subfetcher) loop() {
// handle expired state
if sf.enableStateExpiry {
if exErr, match := err.(*trie2.ExpiredNodeError); match {
key := common.BytesToHash(task)
_, err = fetchExpiredStorageFromRemote(sf.fullStateDB, sf.state, sf.addr, sf.root, sf.trie, exErr.Path, key)
if err != nil {
log.Error("subfetcher fetchExpiredStorageFromRemote err", "addr", sf.addr, "path", exErr.Path, "err", err)
}
reviveKeys = append(reviveKeys, common.BytesToHash(task))
revivePaths = append(revivePaths, exErr.Path)
}
}
}
Expand All @@ -601,6 +600,13 @@ func (sf *subfetcher) loop() {
}
}

if len(reviveKeys) != 0 {
_, err = batchFetchExpiredFromRemote(sf.fullStateDB, sf.state, sf.addr, sf.root, sf.trie, revivePaths, reviveKeys)
if err != nil {
log.Error("subfetcher batchFetchExpiredFromRemote err", "addr", sf.addr, "state", sf.state, "revivePaths", revivePaths, "reviveKeys", reviveKeys, "err", err)
}
}

case ch := <-sf.copy:
// Somebody wants a copy of the current trie, grant them
ch <- sf.db.CopyTrie(sf.trie)
Expand Down
6 changes: 3 additions & 3 deletions ethdb/fullstatedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (f *FullStateRPCServer) GetStorageReviveProof(stateRoot common.Hash, accoun
uncachedKeys := make([]string, 0, len(keys))
ret := make([]types.ReviveStorageProof, 0, len(keys))
for i, key := range keys {
val, ok := f.cache.Get(proofCacheKey(account, root, prefixKeys[i], key))
val, ok := f.cache.Get(ProofCacheKey(account, root, prefixKeys[i], key))
log.Debug("GetStorageReviveProof hit cache", "account", account, "key", key, "ok", ok)
if !ok {
uncachedPrefixKeys = append(uncachedPrefixKeys, prefixKeys[i])
Expand All @@ -98,14 +98,14 @@ func (f *FullStateRPCServer) GetStorageReviveProof(stateRoot common.Hash, accoun

// add to cache
for _, proof := range proofs {
f.cache.Add(proofCacheKey(account, root, proof.PrefixKey, proof.Key), proof)
f.cache.Add(ProofCacheKey(account, root, proof.PrefixKey, proof.Key), proof)
}

ret = append(ret, proofs...)
return ret, err
}

func proofCacheKey(account common.Address, root common.Hash, prefix, key string) string {
func ProofCacheKey(account common.Address, root common.Hash, prefix, key string) string {
buf := bytes.NewBuffer(make([]byte, 0, 67+len(prefix)+len(key)))
buf.Write(account[:])
buf.WriteByte('$')
Expand Down
28 changes: 25 additions & 3 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,21 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/tracers/logger"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie"
lru "github.com/hashicorp/golang-lru"
"github.com/tyler-smith/go-bip39"
)

const UnHealthyTimeout = 5 * time.Second
const (
UnHealthyTimeout = 5 * time.Second
APICache = 10000
)

// max is a helper function which returns the larger of the two given integers.
func max(a, b int64) int64 {
Expand Down Expand Up @@ -624,12 +629,20 @@ func (s *PersonalAccountAPI) Unpair(ctx context.Context, url string, pin string)

// BlockChainAPI provides an API to access Ethereum blockchain data.
type BlockChainAPI struct {
b Backend
b Backend
cache *lru.Cache
}

// NewBlockChainAPI creates a new Ethereum blockchain API.
func NewBlockChainAPI(b Backend) *BlockChainAPI {
return &BlockChainAPI{b}
cache, err := lru.New(APICache)
if err != nil {
return nil
}
return &BlockChainAPI{
b: b,
cache: cache,
}
}

// ChainId is the EIP-155 replay-protection chain id for the current Ethereum chain config.
Expand Down Expand Up @@ -845,6 +858,14 @@ func (s *BlockChainAPI) GetStorageReviveProof(ctx context.Context, stateRoot com

var proof proofList
prefixKey := prefixKeys[i]

// Check if request has been cached
val, ok := s.cache.Get(ethdb.ProofCacheKey(address, root, storagePrefixKeys[i], storageKeys[i]))
if ok {
storageProof[i] = val.(types.ReviveStorageProof)
continue
}

if err := storageTrie.ProveByPath(crypto.Keccak256(key.Bytes()), prefixKey, &proof); err != nil {
return nil, err
}
Expand All @@ -853,6 +874,7 @@ func (s *BlockChainAPI) GetStorageReviveProof(ctx context.Context, stateRoot com
PrefixKey: storagePrefixKeys[i],
Proof: proof,
}
s.cache.Add(ethdb.ProofCacheKey(address, root, storagePrefixKeys[i], storageKeys[i]), storageProof[i])
}

return &types.ReviveResult{
Expand Down
132 changes: 130 additions & 2 deletions trie/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,8 @@ func TestRevive(t *testing.T) {
assert.NoError(t, err, "TryRevive failed, key %x, prefixKey %x, val %x", key, prefixKey, val)

// Verifiy value exists after revive
v := trie.MustGet(key)
v, err := trie.Get(key)
assert.NoError(t, err, "Get failed, key %x, prefixKey %x, val %x", key, prefixKey, val)
assert.Equal(t, val, v, "value mismatch, got %x, exp %x, key %x, prefixKey %x", v, val, key, prefixKey)

// Verify root hash
Expand Down Expand Up @@ -1066,7 +1067,8 @@ func TestReviveCustom(t *testing.T) {
_, err = trie.TryRevive(key, proofCache.cacheNubs)
assert.NoError(t, err, "TryRevive failed, key %x, prefixKey %x, val %x", key, prefixKey, val)

res := trie.MustGet(key)
res, err := trie.Get(key)
assert.NoError(t, err, "Get failed, key %x, prefixKey %x, val %x", key, prefixKey, val)
assert.Equal(t, val, res, "value mismatch, got %x, exp %x, key %x, prefixKey %x", res, val, key, prefixKey)

// Verify root hash
Expand All @@ -1079,6 +1081,132 @@ func TestReviveCustom(t *testing.T) {
}
}

// TestReviveBadProof tests that a trie cannot be revived from a bad proof
func TestReviveBadProof(t *testing.T) {

dataA := map[string]string{
"abcd": "A", "abce": "B", "abde": "C", "abdf": "D",
"defg": "E", "defh": "F", "degh": "G", "degi": "H",
}

dataB := map[string]string{
"qwer": "A", "qwet": "B", "qwrt": "C", "qwry": "D",
"abcd": "E", "abce": "F", "abde": "G", "abdf": "H",
}

trieA := createCustomTrie(dataA, 0)
trieB := createCustomTrie(dataB, 0)

var proofB proofList

err := trieB.ProveByPath([]byte("abcd"), nil, &proofB)
assert.NoError(t, err)

// Expire trie A
trieA.ExpireByPrefix(nil)

// Construct MPTProofCache
proofCache := makeRawMPTProofCache(nil, proofB)

// VerifyProof
err = proofCache.VerifyProof()
assert.NoError(t, err)

// Revive trie
_, err = trieA.TryRevive([]byte("abcd"), proofCache.cacheNubs)
assert.Error(t, err)

// Verify value does exists after revive
val, err := trieA.Get([]byte("abcd"))
assert.NoError(t, err, "Get failed, key %x, val %x", []byte("abcd"), val)
assert.NotEqual(t, []byte("A"), val)
}

// TestReviveBadProofAfterUpdate tests that after reviving a path and
// then update the value, old proof should be invalid
func TestReviveBadProofAfterUpdate(t *testing.T) {
trie, vals := nonRandomTrieWithExpiry(100)

for _, kv := range vals {
key := kv.k
val := kv.v
prefixKeys := getFullNodePrefixKeys(trie, key)
for _, prefixKey := range prefixKeys {
// Generate proof
var proof proofList
err := trie.ProveByPath(key, prefixKey, &proof)
assert.NoError(t, err)

// Expire trie
trie.ExpireByPrefix(prefixKey)

proofCache := makeRawMPTProofCache(prefixKey, proof)
err = proofCache.VerifyProof()
assert.NoError(t, err)

// Revive trie
_, err = trie.TryRevive(key, proofCache.CacheNubs())
assert.NoError(t, err, "TryRevive failed, key %x, prefixKey %x, val %x", key, prefixKey, val)

// Verify value exists after revive
v, err := trie.Get(key)
assert.NoError(t, err, "Get failed, key %x, prefixKey %x, val %x", key, prefixKey, val)
assert.Equal(t, val, v, "value mismatch, got %x, exp %x, key %x, prefixKey %x", v, val, key, prefixKey)

trie.Update(key, []byte("new value"))
v, err = trie.Get(key)
assert.NoError(t, err, "Get failed, key %x, prefixKey %x, val %x", key, prefixKey, val)
assert.Equal(t, []byte("new value"), v, "value mismatch, got %x, exp %x, key %x, prefixKey %x", v, val, key, prefixKey)

_, err = trie.TryRevive(key, proofCache.CacheNubs())
assert.NoError(t, err, "TryRevive failed, key %x, prefixKey %x, val %x", key, prefixKey, val)

v, err = trie.Get(key)
assert.NoError(t, err, "Get failed, key %x, prefixKey %x, val %x", key, prefixKey, val)
assert.Equal(t, []byte("new value"), v, "value mismatch, got %x, exp %x, key %x, prefixKey %x", v, val, key, prefixKey)

// Reset trie
trie, _ = nonRandomTrieWithExpiry(100)
}
}
}

func TestPartialReviveFullProof(t *testing.T) {
data := map[string]string{
"abcd": "A", "abce": "B", "abde": "C", "abdf": "D",
"defg": "E", "defh": "F", "degh": "G", "degi": "H",
}

trie := createCustomTrie(data, 10)
key := []byte("abcd")
val := []byte("A")

// Get proof
var proof proofList
err := trie.ProveByPath(key, nil, &proof)
assert.NoError(t, err)

// Expire trie
err = trie.ExpireByPrefix([]byte{6, 1})
assert.NoError(t, err)

// Construct MPTProofCache
proofCache := makeRawMPTProofCache(nil, proof)

// Verify proof
err = proofCache.VerifyProof()
assert.NoError(t, err)

// Revive trie
_, err = trie.TryRevive(key, proofCache.cacheNubs)
assert.NoError(t, err)

// Validate trie
resVal, err := trie.Get(key)
assert.NoError(t, err)
assert.Equal(t, val, resVal)
}

func createCustomTrie(data map[string]string, epoch types.StateEpoch) *Trie {
db := NewDatabase(rawdb.NewMemoryDatabase(), nil)
trie := NewEmpty(db)
Expand Down