Skip to content
This repository has been archived by the owner on Nov 18, 2024. It is now read-only.

Commit

Permalink
add trie prefetching
Browse files Browse the repository at this point in the history
  • Loading branch information
dvush committed Oct 1, 2024
1 parent d18e9ba commit 5e2713b
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 36 deletions.
4 changes: 2 additions & 2 deletions benches/trie_nodes_benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ fn cloning(c: &mut Criterion) {
c.bench_function(
&format!("hashing_{}_branch_node_size_elements", TRIE_SIZE),
|b| {
let mut buff = Vec::new();
let mut buff = Vec::new();
b.iter(|| {
data.encode(&mut buff);
data.encode(&mut buff);
black_box(keccak256(&buff));
})
},
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ pub mod reth_sparse_trie;
pub mod sparse_mpt;
pub mod utils;

pub use reth_sparse_trie::{calculate_root_hash_with_sparse_trie, SparseTrieSharedCache};
pub use reth_sparse_trie::{
calculate_root_hash_with_sparse_trie, prefetch_tries_for_accounts, ChangedAccountData,
SparseTrieSharedCache,
};
70 changes: 48 additions & 22 deletions src/reth_sparse_trie/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use revm::db::BundleAccount;
use revm_primitives::AccountInfo;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
use crate::ChangedAccountData;

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ETHTrieChangeSet {
pub account_trie_deletes: Vec<Bytes>,

Expand All @@ -16,17 +18,44 @@ pub struct ETHTrieChangeSet {
pub storage_trie_deleted_keys: Vec<Vec<Bytes>>,
}

/// Builds a key-only [`ETHTrieChangeSet`] describing which trie paths must be available
/// for the given changed accounts.
///
/// Only hashed keys are recorded: `account_trie_updates_info` and
/// `storage_trie_updated_values` are left empty.
///
/// * Deleted accounts contribute their hashed address to `account_trie_deletes` and
///   nothing else (no storage entries are pushed for them).
/// * Every other account contributes its hashed address to `account_trie_updates` plus one
///   entry in both `storage_trie_updated_keys` and `storage_trie_deleted_keys`, holding the
///   hashed slots split by their `deleted` flag (either list may be empty).
pub fn prepare_change_set_for_prefetch<'a>(
    changed_data: impl Iterator<Item = &'a ChangedAccountData>,
) -> ETHTrieChangeSet {
    let mut change_set = ETHTrieChangeSet::default();

    for account in changed_data {
        let account_key = Bytes::copy_from_slice(keccak256(account.address).as_slice());

        // A deleted account has no storage paths to record.
        if account.account_deleted {
            change_set.account_trie_deletes.push(account_key);
            continue;
        }
        change_set.account_trie_updates.push(account_key);

        let mut updated_slot_keys: Vec<Bytes> = Vec::new();
        let mut deleted_slot_keys: Vec<Bytes> = Vec::new();
        for (slot, is_deleted) in account.slots.iter() {
            let slot_key = Bytes::copy_from_slice(keccak256(B256::from(*slot)).as_slice());
            // Route the hashed slot into the list matching its `deleted` flag.
            let target = if *is_deleted {
                &mut deleted_slot_keys
            } else {
                &mut updated_slot_keys
            };
            target.push(slot_key);
        }

        change_set.storage_trie_updated_keys.push(updated_slot_keys);
        change_set.storage_trie_deleted_keys.push(deleted_slot_keys);
    }

    change_set
}

pub fn prepare_change_set<'a>(
changes: impl Iterator<Item = (Address, &'a BundleAccount)>,
) -> ETHTrieChangeSet {
let mut account_trie_deletes: Vec<Bytes> = Vec::new();
let mut result = ETHTrieChangeSet::default();

let mut account_trie_updates: Vec<Bytes> = Vec::new();
let mut account_trie_updates_info: Vec<AccountInfo> = Vec::new();

let mut storage_trie_updated_keys: Vec<Vec<Bytes>> = Vec::new();
let mut storage_trie_updated_values: Vec<Vec<Bytes>> = Vec::new();
let mut storage_trie_deleted_keys: Vec<Vec<Bytes>> = Vec::new();
for (address, bundle_account) in changes {
let status = bundle_account.status;
if status.is_not_modified() {
Expand All @@ -39,12 +68,14 @@ pub fn prepare_change_set<'a>(
match bundle_account.account_info() {
// account was modified
Some(account) => {
account_trie_updates.push(hashed_address);
account_trie_updates_info.push(account.without_code());
result.account_trie_updates.push(hashed_address);
result
.account_trie_updates_info
.push(account.without_code());
}
// account was destroyed
None => {
account_trie_deletes.push(hashed_address);
result.account_trie_deletes.push(hashed_address);
continue;
}
}
Expand All @@ -68,17 +99,12 @@ pub fn prepare_change_set<'a>(
storage_updates_values.push(value);
}
}
storage_trie_updated_keys.push(storage_updates_keys);
storage_trie_updated_values.push(storage_updates_values);
storage_trie_deleted_keys.push(storage_deleted_keys);
result.storage_trie_updated_keys.push(storage_updates_keys);
result
.storage_trie_updated_values
.push(storage_updates_values);
result.storage_trie_deleted_keys.push(storage_deleted_keys);
}

ETHTrieChangeSet {
account_trie_deletes,
account_trie_updates,
account_trie_updates_info,
storage_trie_updated_keys,
storage_trie_updated_values,
storage_trie_deleted_keys,
}
result
}
53 changes: 50 additions & 3 deletions src/reth_sparse_trie/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use alloy_primitives::B256;
use alloy_primitives::{Address, B256};
use change_set::prepare_change_set;
use change_set::prepare_change_set_for_prefetch;
use hash::RootHashError;
use reth_db_api::database::Database;
use reth_provider::providers::ConsistentDbView;
Expand Down Expand Up @@ -36,15 +37,61 @@ pub enum SparseTrieError {
#[error("Error while computing root hash: {0:?}")]
RootHash(RootHashError),
#[error("Error while fetching trie nodes from db: {0:?}")]
FetchNode(FetchNodeError),
FetchNode(#[from] FetchNodeError),
#[error("Error while updated shared cache: {0:?}")]
FailedToUpdateSharedCache(AddNodeError),
FailedToUpdateSharedCache(#[from] AddNodeError),
/// This might indicate bug in the library
/// or incorrect underlying storage (e.g. when deletes can't be applied to the trie because it does not have those keys)
#[error("Failed to fetch data")]
FailedToFetchData,
}

/// Describes one changed account: which account it is, whether it was removed,
/// and which of its storage slots were touched.
///
/// Consumed by `prefetch_tries_for_accounts` to decide which trie paths to load.
#[derive(Debug)]
pub struct ChangedAccountData {
    /// Plain (unhashed) address of the account; it is hashed when the change set is built.
    pub address: Address,
    /// `true` when the account itself was deleted.
    pub account_deleted: bool,
    /// Touched storage slots as `(slot, deleted)` pairs.
    pub slots: Vec<(B256, bool)>,
}

impl ChangedAccountData {
    /// Creates an entry for `address` with the given deletion flag and no storage slots.
    ///
    /// Slots can be added afterwards by pushing `(slot, deleted)` pairs onto `slots`.
    pub fn new(address: Address, account_deleted: bool) -> Self {
        let slots = Vec::new();
        Self {
            address,
            account_deleted,
            slots,
        }
    }
}

/// Loads into `shared_cache` the trie nodes needed for the given changed accounts.
///
/// A key-only change set is built from `changed_data`, then the cache is asked to gather
/// the tries for it. Whenever nodes are reported missing they are fetched through
/// `consistent_db_view` and added to the cache before the next attempt. At most three
/// gather/fetch rounds are made.
///
/// # Errors
///
/// * Errors from fetching the missing nodes or from updating the shared cache are
///   converted into [`SparseTrieError`] and returned immediately.
/// * [`SparseTrieError::FailedToFetchData`] is returned when nodes are still missing
///   after the third round.
pub fn prefetch_tries_for_accounts<'a, DB, Provider>(
    consistent_db_view: ConsistentDbView<DB, Provider>,
    shared_cache: SparseTrieSharedCache,
    changed_data: impl Iterator<Item = &'a ChangedAccountData>,
) -> Result<(), SparseTrieError>
where
    DB: Database,
    Provider: DatabaseProviderFactory<DB> + Send + Sync,
{
    let change_set = prepare_change_set_for_prefetch(changed_data);
    let fetcher = TrieFetcher::new(consistent_db_view);

    for _round in 0..3 {
        // Bind the result first so nothing from the gather call outlives this statement.
        let gathered = shared_cache.gather_tries_for_changes(&change_set);

        if let Err(missing_nodes) = gathered {
            // Pull the missing nodes from the database and retry on the next round.
            let multiproof = fetcher.fetch_missing_nodes(missing_nodes)?;
            shared_cache.update_cache_with_fetched_nodes(multiproof)?;
        } else {
            // Everything required is already cached.
            return Ok(());
        }
    }

    Err(SparseTrieError::FailedToFetchData)
}

/// Calculate root hash for the given outcome on top of the block defined by consistent_db_view.
/// * shared_cache should be created once for each parent block and it stores fetched parts of the trie
/// It uses rayon for parallelism and the thread pool should be configured from outside.
Expand Down
2 changes: 1 addition & 1 deletion src/sparse_mpt/diff_trie/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl DiffChildPtr {
pub struct DiffBranchNode {
pub fixed: Option<Arc<FixedBranchNode>>,
/// this must have an element for children that we have in the diff trie
pub changed_children: SmallVec<[(u8, Option<DiffChildPtr>); 4]>,
pub changed_children: SmallVec<[(u8, Option<DiffChildPtr>); 16]>,
pub aux_bits: u16,
}

Expand Down
12 changes: 5 additions & 7 deletions src/sparse_mpt/fixed_trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,11 @@ impl FixedTrieNode {
},
})
}
FixedTrieNode::Branch { node, .. } => {
DiffTrieNodeKind::Branch(DiffBranchNode {
fixed: Some(Arc::clone(node)),
changed_children: SmallVec::new(),
aux_bits: node.child_mask,
})
}
FixedTrieNode::Branch { node, .. } => DiffTrieNodeKind::Branch(DiffBranchNode {
fixed: Some(Arc::clone(node)),
changed_children: SmallVec::new(),
aux_bits: node.child_mask,
}),
FixedTrieNode::Null => DiffTrieNodeKind::Null,
};
DiffTrieNode {
Expand Down

0 comments on commit 5e2713b

Please sign in to comment.