Skip to content

Commit

Permalink
minor changes/fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
i-norden committed May 21, 2020
1 parent 7ba6efa commit 97454eb
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 184 deletions.
81 changes: 31 additions & 50 deletions statediff/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ import (
"bytes"
"fmt"

"github.com/ethereum/go-ethereum/core/types"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
Expand Down Expand Up @@ -154,14 +153,14 @@ func (sdb *builder) buildStateDiffWithIntermediateStateNodes(args Args, params P
// collect a slice of all the intermediate nodes that were touched and exist at B
// a map of their leafkey to all the accounts that were touched and exist at B
// and a slice of all the paths for the nodes in both of the above sets
createdOrUpdatedIntermediateNodes, diffAccountsAtB, diffPathsAtB, err := sdb.createdAndUpdatedState(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}))
createdOrUpdatedIntermediateNodes, diffAccountsAtB, diffPathsAtB, err := sdb.createdAndUpdatedStateWithIntermediateNodes(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}))
if err != nil {
return StateObject{}, fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err)
}

// collect a slice of all the nodes that existed at a path in A that doesn't exist in B
// a map of their leafkey to all the accounts that were touched and exist at A
deletedNodes, diffAccountsAtA, err := sdb.deletedOrUpdatedState(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}), diffPathsAtB)
emptiedPaths, diffAccountsAtA, err := sdb.deletedOrUpdatedState(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}), diffPathsAtB)
if err != nil {
return StateObject{}, fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err)
}
Expand Down Expand Up @@ -191,7 +190,7 @@ func (sdb *builder) buildStateDiffWithIntermediateStateNodes(args Args, params P
return StateObject{
BlockNumber: args.BlockNumber,
BlockHash: args.BlockHash,
Nodes: append(append(append(updatedAccounts, createdAccounts...), createdOrUpdatedIntermediateNodes...), deletedNodes...),
Nodes: append(append(append(updatedAccounts, createdAccounts...), createdOrUpdatedIntermediateNodes...), emptiedPaths...),
}, nil
}

Expand All @@ -207,13 +206,15 @@ func (sdb *builder) buildStateDiffWithoutIntermediateStateNodes(args Args, param
}

// collect a map of their leafkey to all the accounts that were touched and exist at B
diffAccountsAtB, err := sdb.collectDiffAccounts(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}), params.WatchedAddresses)
// and a slice of all the paths for the nodes in both of the above sets
diffAccountsAtB, diffPathsAtB, err := sdb.createdAndUpdatedState(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}), params.WatchedAddresses)
if err != nil {
return StateObject{}, fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err)
}

// collect a map of their leafkey to all the accounts that were touched and exist at A
diffAccountsAtA, err := sdb.collectDiffAccounts(newTrie.NodeIterator([]byte{}), oldTrie.NodeIterator([]byte{}), params.WatchedAddresses)
// collect a slice of all the nodes that existed at a path in A that doesn't exist in B
// a map of their leafkey to all the accounts that were touched and exist at A
emptiedPaths, diffAccountsAtA, err := sdb.deletedOrUpdatedState(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}), diffPathsAtB)
if err != nil {
return StateObject{}, fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err)
}
Expand All @@ -238,23 +239,20 @@ func (sdb *builder) buildStateDiffWithoutIntermediateStateNodes(args Args, param
if err != nil {
return StateObject{}, fmt.Errorf("error building diff for created accounts: %v", err)
}
// build the diff nodes for deleted accounts
deletedAccounts, err := sdb.buildAccountDeletions(diffAccountsAtA)
if err != nil {
return StateObject{}, fmt.Errorf("error building diff for deleted accounts: %v", err)
}

// assemble all of the nodes into the statediff object
return StateObject{
BlockNumber: args.BlockNumber,
BlockHash: args.BlockHash,
Nodes: append(append(updatedAccounts, createdAccounts...), deletedAccounts...),
Nodes: append(append(updatedAccounts, createdAccounts...), emptiedPaths...),
}, nil
}

// collectDiffAccounts returns a mapping of their leafkeys to all the accounts that exist in a different state at B than A
// restricts the returned set to the watchedAddresses if any are provided
func (sdb *builder) collectDiffAccounts(a, b trie.NodeIterator, watchedAddresses []common.Address) (AccountMap, error) {
// createdAndUpdatedState returns
// a mapping of their leafkeys to all the accounts that exist in a different state at B than A
// and a slice of the paths for all of the nodes included in both
func (sdb *builder) createdAndUpdatedState(a, b trie.NodeIterator, watchedAddresses []common.Address) (AccountMap, map[string]bool, error) {
diffPathsAtB := make(map[string]bool)
diffAcountsAtB := make(AccountMap)
it, _ := trie.NewDifferenceIterator(a, b)
for it.Next(true) {
Expand All @@ -269,23 +267,22 @@ func (sdb *builder) collectDiffAccounts(a, b trie.NodeIterator, watchedAddresses
copy(nodePath, it.Path())
node, err := sdb.stateCache.TrieDB().Node(it.Hash())
if err != nil {
return nil, err
return nil, nil, err
}
var nodeElements []interface{}
if err := rlp.DecodeBytes(node, &nodeElements); err != nil {
return nil, err
return nil, nil, err
}
ty, err := CheckKeyType(nodeElements)
if err != nil {
return nil, err
return nil, nil, err
}
switch ty {
case Leaf:
if ty == Leaf {
// created vs updated is important for leaf nodes since we need to diff their storage
// so we need to map all changed accounts at B to their leafkey, since account can change pathes but not leafkey
var account state.Account
if err := rlp.DecodeBytes(nodeElements[1].([]byte), &account); err != nil {
return nil, fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", nodePath, err)
return nil, nil, fmt.Errorf("error decoding account for leaf node at path %x nerror: %v", nodePath, err)
}
partialPath := trie.CompactToHex(nodeElements[0].([]byte))
valueNodePath := append(nodePath, partialPath...)
Expand All @@ -300,19 +297,18 @@ func (sdb *builder) collectDiffAccounts(a, b trie.NodeIterator, watchedAddresses
Account: &account,
}
}
case Extension, Branch:
// fall through to next iteration
default:
return nil, fmt.Errorf("unexpected node type %s", ty)
}
// add both intermediate and leaf node paths to the list of diffPathsAtB
diffPathsAtB[common.Bytes2Hex(nodePath)] = true
}
return diffAcountsAtB, nil
return diffAcountsAtB, diffPathsAtB, nil
}

// createdAndUpdatedState returns a slice of all the intermediate nodes that exist in a different state at B than A
// createdAndUpdatedStateWithIntermediateNodes returns
// a slice of all the intermediate nodes that exist in a different state at B than A
// a mapping of their leafkeys to all the accounts that exist in a different state at B than A
// and a slice of the paths for all of the nodes included in both
func (sdb *builder) createdAndUpdatedState(a, b trie.NodeIterator) ([]StateNode, AccountMap, map[string]bool, error) {
func (sdb *builder) createdAndUpdatedStateWithIntermediateNodes(a, b trie.NodeIterator) ([]StateNode, AccountMap, map[string]bool, error) {
createdOrUpdatedIntermediateNodes := make([]StateNode, 0)
diffPathsAtB := make(map[string]bool)
diffAcountsAtB := make(AccountMap)
Expand Down Expand Up @@ -375,10 +371,10 @@ func (sdb *builder) createdAndUpdatedState(a, b trie.NodeIterator) ([]StateNode,
return createdOrUpdatedIntermediateNodes, diffAcountsAtB, diffPathsAtB, nil
}

// deletedOrUpdatedState returns a slice of all the nodes that exist at A but not at B
// deletedOrUpdatedState returns a slice of all the pathes that are emptied at B
// and a mapping of their leafkeys to all the accounts that exist in a different state at A than B
func (sdb *builder) deletedOrUpdatedState(a, b trie.NodeIterator, diffPathsAtB map[string]bool) ([]StateNode, AccountMap, error) {
deletedNodes := make([]StateNode, 0)
emptiedPaths := make([]StateNode, 0)
diffAccountAtA := make(AccountMap)
it, _ := trie.NewDifferenceIterator(b, a)
for it.Next(true) {
Expand All @@ -395,10 +391,10 @@ func (sdb *builder) deletedOrUpdatedState(a, b trie.NodeIterator, diffPathsAtB m
// that means the node at this path was deleted (or moved) in B
// emit an empty "removed" diff to signify as such
if _, ok := diffPathsAtB[common.Bytes2Hex(nodePath)]; !ok {
deletedNodes = append(deletedNodes, StateNode{
NodeType: Removed,
emptiedPaths = append(emptiedPaths, StateNode{
Path: nodePath,
NodeValue: []byte{},
NodeType: Removed,
})
}
node, err := sdb.stateCache.TrieDB().Node(it.Hash())
Expand Down Expand Up @@ -437,7 +433,7 @@ func (sdb *builder) deletedOrUpdatedState(a, b trie.NodeIterator, diffPathsAtB m
return nil, nil, fmt.Errorf("unexpected node type %s", ty)
}
}
return deletedNodes, diffAccountAtA, nil
return emptiedPaths, diffAccountAtA, nil
}

// buildAccountUpdates uses the account diffs maps for A => B and B => A and the known intersection of their leafkeys
Expand Down Expand Up @@ -494,21 +490,6 @@ func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKey
return accountDiffs, nil
}

// buildAccountDeletions returns the state diff "removed" empty node objects
// to represent when an account exists at A but not at B
func (sdb *builder) buildAccountDeletions(accounts AccountMap) ([]StateNode, error) {
accountDiffs := make([]StateNode, 0, len(accounts))
for _, val := range accounts {
// For account deletions, we can not have any storage or the account would not be deleted
accountDiffs = append(accountDiffs, StateNode{
NodeType: Removed,
Path: val.Path,
NodeValue: []byte{},
})
}
return accountDiffs, nil
}

// buildStorageNodesEventual builds the storage diff node objects for a created account
// i.e. it returns all the storage nodes at this state, since there is no previous state
func (sdb *builder) buildStorageNodesEventual(sr common.Hash, watchedStorageKeys []common.Hash, intermediateNodes bool) ([]StorageNode, error) {
Expand Down
11 changes: 5 additions & 6 deletions statediff/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1968,12 +1968,11 @@ func TestBuilderWithMovedAccountOnlyLeafs(t *testing.T) {
NodeType: statediff.Removed,
NodeValue: []byte{},
},
// For accounts that move up a level due to the deletion of the only other child account of a shared parent branch node,
// the leaf-only diffing process emits a node for the account at the new path
// but does not emit a "removed" node object for the now empty path
// Fix this, not a major issue since if you are not watching intermediate nodes you aren't worried about the complete picture in the first place
// One solution is to simply use the same process as when including intermediate nodes and simply discard the intermediate nodes
// But that method is significantly more memory intensive
{
Path: []byte{'\x00'},
NodeType: statediff.Removed,
NodeValue: []byte{},
},
},
},
},
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,8 @@ func TestBuilderOnMainnetBlocks(t *testing.T) {
startingArguments statediff.Args
expected *statediff.StateObject
}{
// note that block0 (genesis) has over 1000 nodes due to the pre-allocation for the crowd-sale
// it is not feasible to write a unit test of that size at this time
{
"testBlock1",
//10000 transferred from testBankAddress to account1Addr
Expand Down
105 changes: 44 additions & 61 deletions statediff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (sds *Service) streamStateDiff(currentBlock *types.Block, parentRoot common
for id, sub := range subs {
select {
case sub.PayloadChan <- *payload:
log.Debug(fmt.Sprintf("sending statediff payload to subscription %s", id))
log.Debug(fmt.Sprintf("sending statediff payload at head height %d to subscription %s", currentBlock.Number(), id))
default:
log.Info(fmt.Sprintf("unable to send statediff payload to subscription %s; channel has no receiver", id))
}
Expand All @@ -181,6 +181,18 @@ func (sds *Service) streamStateDiff(currentBlock *types.Block, parentRoot common
sds.Unlock()
}

// StateDiffAt returns a state diff object payload at the specific blockheight
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, error) {
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info(fmt.Sprintf("sending state diff at block %d", blockNumber))
if blockNumber == 0 {
return sds.processStateDiff(currentBlock, common.Hash{}, params)
}
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
return sds.processStateDiff(currentBlock, parentBlock.Root(), params)
}

// processStateDiff method builds the state diff payload from the current block, parent state root, and provided params
func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot common.Hash, params Params) (*Payload, error) {
stateDiff, err := sds.Builder.BuildStateDiffObject(Args{
Expand All @@ -196,28 +208,52 @@ func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot commo
if err != nil {
return nil, err
}
payload := Payload{
StateObjectRlp: stateDiffRlp,
return sds.newPayload(stateDiffRlp, currentBlock, params)
}

func (sds *Service) newPayload(stateObject []byte, block *types.Block, params Params) (*Payload, error) {
payload := &Payload{
StateObjectRlp: stateObject,
}
if params.IncludeBlock {
blockBuff := new(bytes.Buffer)
if err = currentBlock.EncodeRLP(blockBuff); err != nil {
if err := block.EncodeRLP(blockBuff); err != nil {
return nil, err
}
payload.BlockRlp = blockBuff.Bytes()
}
if params.IncludeTD {
payload.TotalDifficulty = sds.BlockChain.GetTdByHash(currentBlock.Hash())
payload.TotalDifficulty = sds.BlockChain.GetTdByHash(block.Hash())
}
if params.IncludeReceipts {
receiptBuff := new(bytes.Buffer)
receipts := sds.BlockChain.GetReceiptsByHash(currentBlock.Hash())
if err = rlp.Encode(receiptBuff, receipts); err != nil {
receipts := sds.BlockChain.GetReceiptsByHash(block.Hash())
if err := rlp.Encode(receiptBuff, receipts); err != nil {
return nil, err
}
payload.ReceiptsRlp = receiptBuff.Bytes()
}
return &payload, nil
return payload, nil
}

// StateTrieAt returns a state trie object payload at the specified blockheight
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateTrieAt(blockNumber uint64, params Params) (*Payload, error) {
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info(fmt.Sprintf("sending state trie at block %d", blockNumber))
return sds.processStateTrie(currentBlock, params)
}

func (sds *Service) processStateTrie(block *types.Block, params Params) (*Payload, error) {
stateNodes, err := sds.Builder.BuildStateTrieObject(block)
if err != nil {
return nil, err
}
stateTrieRlp, err := rlp.EncodeToBytes(stateNodes)
if err != nil {
return nil, err
}
return sds.newPayload(stateTrieRlp, block, params)
}

// Subscribe is used by the API to subscribe to the service loop
Expand Down Expand Up @@ -277,59 +313,6 @@ func (sds *Service) Start(*p2p.Server) error {
return nil
}

// StateDiffAt returns a state diff object payload at the specific blockheight
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, error) {
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info(fmt.Sprintf("sending state diff at %d", blockNumber))
if blockNumber == 0 {
return sds.processStateDiff(currentBlock, common.Hash{}, params)
}
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
return sds.processStateDiff(currentBlock, parentBlock.Root(), params)
}

// StateTrieAt returns a state trie object payload at the specified blockheight
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data
func (sds *Service) StateTrieAt(blockNumber uint64, params Params) (*Payload, error) {
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info(fmt.Sprintf("sending state trie at %d", blockNumber))
return sds.stateTrieAt(currentBlock, params)
}

func (sds *Service) stateTrieAt(block *types.Block, params Params) (*Payload, error) {
stateNodes, err := sds.Builder.BuildStateTrieObject(block)
if err != nil {
return nil, err
}
stateTrieRlp, err := rlp.EncodeToBytes(stateNodes)
if err != nil {
return nil, err
}
payload := Payload{
StateObjectRlp: stateTrieRlp,
}
if params.IncludeBlock {
blockBuff := new(bytes.Buffer)
if err = block.EncodeRLP(blockBuff); err != nil {
return nil, err
}
payload.BlockRlp = blockBuff.Bytes()
}
if params.IncludeTD {
payload.TotalDifficulty = sds.BlockChain.GetTdByHash(block.Hash())
}
if params.IncludeReceipts {
receiptBuff := new(bytes.Buffer)
receipts := sds.BlockChain.GetReceiptsByHash(block.Hash())
if err = rlp.Encode(receiptBuff, receipts); err != nil {
return nil, err
}
payload.ReceiptsRlp = receiptBuff.Bytes()
}
return &payload, nil
}

// Stop is used to close down the service
func (sds *Service) Stop() error {
log.Info("Stopping statediff service")
Expand Down
Loading

0 comments on commit 97454eb

Please sign in to comment.