diff --git a/Cargo.lock b/Cargo.lock index 5d1a16aaf..8b25d1066 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -312,7 +312,7 @@ dependencies = [ "futures", "futures-utils-wasm", "lru", - "parking_lot 0.12.3", + "parking_lot", "pin-project", "schnellru", "serde", @@ -2111,7 +2111,7 @@ dependencies = [ "hashbrown 0.14.5", "lock_api", "once_cell", - "parking_lot_core 0.9.10", + "parking_lot_core", ] [[package]] @@ -2125,7 +2125,7 @@ dependencies = [ "hashbrown 0.14.5", "lock_api", "once_cell", - "parking_lot_core 0.9.10", + "parking_lot_core", ] [[package]] @@ -2562,7 +2562,7 @@ dependencies = [ "hashbrown 0.15.2", "hex", "log", - "parking_lot 0.12.3", + "parking_lot", "rand", "rlp 0.6.1", "uuid 1.11.0", @@ -2914,18 +2914,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "fallible-iterator" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" - -[[package]] -name = "fallible-streaming-iterator" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" - [[package]] name = "fast-float" version = "0.2.0" @@ -3618,15 +3606,6 @@ dependencies = [ "fxhash", ] -[[package]] -name = "hashlink" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" -dependencies = [ - "hashbrown 0.14.5", -] - [[package]] name = "heck" version = "0.4.1" @@ -3709,7 +3688,7 @@ dependencies = [ "ipconfig", "lru-cache", "once_cell", - "parking_lot 0.12.3", + "parking_lot", "rand", "resolv-conf", "smallvec", @@ -4564,7 +4543,7 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "jsonrpsee-types 0.24.7", - "parking_lot 0.12.3", + "parking_lot", "rand", "rustc-hash 2.1.0", "serde", @@ -4958,7 +4937,7 @@ dependencies = [ "multihash", "multistream-select", "once_cell", - "parking_lot 0.12.3", + "parking_lot", "pin-project", "quick-protobuf", "rand", @@ -4981,7 +4960,7 @@ dependencies = [ "hickory-resolver", "libp2p-core", "libp2p-identity", - "parking_lot 0.12.3", + "parking_lot", "smallvec", "tracing", ] @@ -5158,7 +5137,7 @@ dependencies = [ "libp2p-core", "libp2p-identity", "libp2p-tls", - "parking_lot 0.12.3", + "parking_lot", "quinn", "rand", "ring 0.17.8", @@ -5294,17 +5273,6 @@ dependencies = [ "libc", ] -[[package]] -name = "libsqlite3-sys" -version = "0.30.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" -dependencies = [ - "cc", - "pkg-config", - "vcpkg", -] - [[package]] name = "libssh2-sys" version = "0.3.0" @@ -6117,17 +6085,6 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" -[[package]] -name = "parking_lot" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core 0.8.6", -] - [[package]] name = "parking_lot" version = "0.12.3" @@ -6135,21 +6092,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", - "parking_lot_core 0.9.10", -] - -[[package]] -name = "parking_lot_core" -version = 
"0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" -dependencies = [ - "cfg-if", - "instant", - "libc", - "redox_syscall 0.2.16", - "smallvec", - "winapi", + "parking_lot_core", ] [[package]] @@ -6160,7 +6103,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.5.7", + "redox_syscall", "smallvec", "windows-targets 0.52.6", ] @@ -6533,7 +6476,7 @@ dependencies = [ "log", "nix", "once_cell", - "parking_lot 0.12.3", + "parking_lot", "smallvec", "symbolic-demangle", "tempfile", @@ -6627,7 +6570,7 @@ checksum = "504ee9ff529add891127c4827eb481bd69dc0ebc72e9a682e187db4caa60c3ca" dependencies = [ "dtoa", "itoa", - "parking_lot 0.12.3", + "parking_lot", "prometheus-client-derive-encode", ] @@ -6915,12 +6858,13 @@ dependencies = [ ] [[package]] -name = "redox_syscall" -version = "0.2.16" +name = "redb" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +checksum = "84b1de48a7cf7ba193e81e078d17ee2b786236eed1d3f7c60f8a09545efc4925" dependencies = [ - "bitflags 1.3.2", + "libc", + "log", ] [[package]] @@ -7335,20 +7279,6 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48fd7bd8a6377e15ad9d42a8ec25371b94ddc67abe7c8b9127bec79bebaaae18" -[[package]] -name = "rusqlite" -version = "0.32.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" -dependencies = [ - "bitflags 2.6.0", - "fallible-iterator", - "fallible-streaming-iterator", - "hashlink", - "libsqlite3-sys", - "smallvec", -] - [[package]] name = "rustc-demangle" version = "0.1.24" @@ -8100,22 +8030,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "sled" -version = "0.34.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935" -dependencies = [ - "crc32fast", - "crossbeam-epoch", - "crossbeam-utils", - "fs2", - "fxhash", - "libc", - "log", - "parking_lot 0.11.2", -] - [[package]] name = "slug" version = "0.1.6" @@ -8285,7 +8199,7 @@ checksum = "f91138e76242f575eb1d3b38b4f1362f10d3a43f47d182a5b359af488a02293b" dependencies = [ "new_debug_unreachable", "once_cell", - "parking_lot 0.12.3", + "parking_lot", "phf_shared 0.10.0", "precomputed-hash", ] @@ -8779,7 +8693,7 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot 0.12.3", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", @@ -9551,7 +9465,7 @@ checksum = "0048ad49a55b9deb3953841fa1fc5858f0efbcb7a18868c899a360269fac1b23" dependencies = [ "futures", "js-sys", - "parking_lot 0.12.3", + "parking_lot", "pin-utils", "slab", "wasm-bindgen", @@ -9980,7 +9894,7 @@ dependencies = [ "futures", "log", "nohash-hasher", - "parking_lot 0.12.3", + "parking_lot", "pin-project", "rand", "static_assertions", @@ -9995,7 +9909,7 @@ dependencies = [ "futures", "log", "nohash-hasher", - "parking_lot 0.12.3", + "parking_lot", "pin-project", "rand", "static_assertions", @@ -10269,9 +10183,9 @@ dependencies = [ "rand_chacha", "rand_core", "rayon", + "redb", "revm", "revm-inspectors", - "rusqlite", "scilla-parser 2.0.0", "scopeguard", "semver 1.0.23", @@ -10281,7 +10195,6 @@ dependencies = [ "serde_repr", "sha2", "sha3", - "sled", "tempfile", "thiserror 2.0.3", "time", 
diff --git a/eth-trie.rs/src/db.rs b/eth-trie.rs/src/db.rs index f4b964f0a..e9188583b 100644 --- a/eth-trie.rs/src/db.rs +++ b/eth-trie.rs/src/db.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, error::Error, sync::Arc}; +use std::{collections::HashMap, fmt::Display, sync::Arc}; use parking_lot::RwLock; @@ -8,7 +8,7 @@ use crate::errors::MemDBError; /// You should first write the data to the cache and write the data /// to the database in bulk after the end of a set of operations. pub trait DB: Send + Sync { - type Error: Error; + type Error: Display; fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>; diff --git a/z2/src/converter.rs b/z2/src/converter.rs index 417f5df52..547302f40 100644 --- a/z2/src/converter.rs +++ b/z2/src/converter.rs @@ -39,7 +39,7 @@ use zilliqa::{ constants::SCILLA_INVOKE_CHECKER, contracts, crypto::{self, Hash, SecretKey}, - db::Db, + db::{ArcDb, Db}, exec::{store_external_libraries, BaseFeeCheck}, inspector, message::{Block, BlockHeader, QuorumCertificate, Vote, MAX_COMMITTEE_SIZE}, @@ -95,7 +95,7 @@ fn invoke_checker(state: &State, code: &str, init_data: &[ParamValue]) -> Result #[allow(clippy::type_complexity)] fn convert_scilla_state( zq1_db: &zq1::Db, - zq2_db: &Db, + zq2_db: &Arc<Db>, state: &State, code: &str, init_data: &[ParamValue], @@ -201,7 +201,7 @@ fn convert_scilla_state( Ok((storage_root, field_types, transitions)) } -fn convert_evm_state(zq1_db: &zq1::Db, zq2_db: &Db, address: Address) -> Result { +fn convert_evm_state(zq1_db: &zq1::Db, zq2_db: &Arc<Db>, address: Address) -> Result { let prefix = create_acc_query_prefix(address); let storage_entries_iter = zq1_db.get_contract_state_data_with_prefix(&prefix); @@ -464,7 +464,7 @@ pub async fn convert_persistence( .get_tx_blocks_aux("MaxTxBlockNumber")? .unwrap_or_default(); - let current_block = zq2_db.get_finalized_view()?.unwrap_or(1); + let current_block = zq2_db.read()?.finalized_view()?.get()?.unwrap_or(1); let progress = ProgressBar::new(max_block) .with_style(style.clone()) @@ -562,7 +562,7 @@ pub async fn convert_persistence( parent_hash, zq1_block.block_num - 1, ); - let block = Block::from_qc( + let mut block = Block::from_qc( secret_key, zq1_block.block_num, zq1_block.block_num, @@ -576,6 +576,7 @@ pub async fn convert_persistence( ScillaGas(zq1_block.gas_used).into(), ScillaGas(zq1_block.gas_limit).into(), ); + block.header.hash = zq1_block.block_hash.into(); // For each receipt update block hash.
This can be done once all receipts build receipt_root_hash which is used for calculating block hash for receipt in &mut receipts { @@ -584,53 +585,42 @@ pub async fn convert_persistence( parent_hash = zq1_block.block_hash.into(); - zq2_db.with_sqlite_tx(|sqlite_tx| { - zq2_db.insert_block_with_hash_with_db_tx( - sqlite_tx, - zq1_block.block_hash.into(), - &block, - )?; - zq2_db.set_high_qc_with_db_tx(sqlite_tx, block.header.qc)?; - zq2_db.set_finalized_view_with_db_tx(sqlite_tx, block.view())?; - trace!("{} block inserted", block.number()); - - for (hash, transaction) in &transactions { - if let Err(err) = zq2_db.insert_transaction_with_db_tx(sqlite_tx, hash, transaction) - { - warn!( - "Unable to insert transaction with id: {:?} to db, err: {:?}", - *hash, err - ); - } + let write = zq2_db.write_non_durable()?; + + write.blocks()?.insert(&block)?; + write.high_qc()?.set(&block.header.qc)?; + write.finalized_view()?.set(block.view())?; + trace!("{} block inserted", block.number()); + { + let mut transactions_table = write.transactions()?; + for (hash, txn) in &transactions { + transactions_table.insert(*hash, txn)?; } + let mut receipts_table = write.receipts()?; for receipt in &receipts { - if let Err(err) = - zq2_db.insert_transaction_receipt_with_db_tx(sqlite_tx, receipt.to_owned()) - { - warn!( - "Unable to insert receipt with id: {:?} into db, err: {:?}", - receipt.tx_hash, err - ); - } + receipts_table.insert(receipt)?; } - Ok(()) - })?; + } + write.commit()?; } // Let's insert another block (empty) which will be used as high_qc block when zq2 starts from converted persistence - let highest_block = zq2_db.get_highest_canonical_block_number()?.unwrap(); - let highest_block = zq2_db.get_block_by_view(highest_block)?.unwrap(); + let highest_block = zq2_db.read()?.blocks()?.max_canonical_by_view()?.unwrap(); - zq2_db.with_sqlite_tx(|sqlite_tx| { - let empty_high_qc_block = create_empty_block_from_parent(&highest_block, secret_key); - zq2_db.insert_block_with_db_tx(sqlite_tx, &empty_high_qc_block)?; - zq2_db.set_high_qc_with_db_tx(sqlite_tx, empty_high_qc_block.header.qc)?; - Ok(()) - })?; + let write = zq2_db.write_non_durable()?; + let empty_high_qc_block = create_empty_block_from_parent(&highest_block, secret_key); + write.blocks()?.insert(&empty_high_qc_block)?; + write.high_qc()?.set(&empty_high_qc_block.header.qc)?; + write.commit()?; println!( "Persistence conversion done up to block {}", - zq2_db.get_highest_canonical_block_number()?.unwrap_or(0) + zq2_db + .read()? + .blocks()? + .max_canonical_by_view()? 
+ .map(|b| b.number()) + .unwrap_or(0) ); Ok(()) diff --git a/zilliqa/Cargo.toml b/zilliqa/Cargo.toml index 82b04cd97..831383c2c 100644 --- a/zilliqa/Cargo.toml +++ b/zilliqa/Cargo.toml @@ -59,13 +59,11 @@ rand_chacha = "0.3.1" rand_core = "0.6.4" revm = { version = "18.0.0", features = ["optional_no_base_fee"] } revm-inspectors = { version = "0.11.0", features = ["js-tracer"] } -rusqlite = { version = "0.32.1", features = ["bundled", "trace"] } serde = { version = "1.0.215", features = ["derive", "rc"] } serde_bytes = "0.11.14" serde_json = { version = "1.0.133", features = ["raw_value","arbitrary_precision"] } sha2 = "0.10.8" sha3 = "0.10.8" -sled = "0.34.7" tempfile = "3.14.0" time = { version = "0.3.36", features = ["formatting", "macros"] } tokio = { version = "1.41.1", features = ["macros", "rt-multi-thread", "signal", "sync"] } @@ -85,6 +83,7 @@ serde_repr = "0.1.19" thiserror = "2.0.3" lru-mem = "0.3.0" opentelemetry-semantic-conventions = { version = "0.27.0", features = ["semconv_experimental"] } +redb = { version = "2.2.0", features = ["logging"] } [dev-dependencies] alloy = { version = "0.6.4", default-features = false, features = ["network", "rand", "signers", "signer-local"] } diff --git a/zilliqa/src/api/eth.rs b/zilliqa/src/api/eth.rs index 6fda14fbd..5e1766cbf 100644 --- a/zilliqa/src/api/eth.rs +++ b/zilliqa/src/api/eth.rs @@ -13,7 +13,7 @@ use alloy::{ }; use anyhow::{anyhow, Result}; use http::Extensions; -use itertools::{Either, Itertools}; +use itertools::Either; use jsonrpsee::{ core::StringError, types::{ @@ -502,71 +502,55 @@ fn get_logs(params: Params, node: &Arc<Mutex<Node>>) -> Result<Vec<eth::Log>> { } }; - // Get the receipts for each transaction. This is an iterator of (receipt, txn_index, txn_hash, block_number, block_hash). - let receipts = blocks - .map(|block: Result<_>| { - let block = block?; - let block_number = block.number(); - let block_hash = block.hash(); - let receipts = node.get_transaction_receipts_in_block(block_hash)?; - - Ok(block - .transactions - .into_iter() - .enumerate() - .zip(receipts) - .map(move |((txn_index, txn_hash), receipt)| { - (receipt, txn_index, txn_hash, block_number, block_hash) - })) - }) - .flatten_ok(); - - // Get the logs from each receipt and filter them based on the provided parameters. This is an iterator of (log, log_index, txn_index, txn_hash, block_number, block_hash). - let logs = receipts - .map(|r: Result<_>| { - let (receipt, txn_index, txn_hash, block_number, block_hash) = r?; - Ok(receipt - .logs - .into_iter() - .map(|log| match log { - Log::Evm(log) => log, - Log::Scilla(log) => log.into_evm(), - }) - .enumerate() - .map(move |(i, l)| (l, i, txn_index, txn_hash, block_number, block_hash))) - }) - .flatten_ok() - .filter_ok(|(log, _, _, _, _, _)| { - params - .address - .as_ref() - .map(|a| a.contains(&log.address)) - .unwrap_or(true) - }) - .filter_ok(|(log, _, _, _, _, _)| { - params - .topics - .iter() - .zip(log.topics.iter()) - .all(|(filter_topic, log_topic)| { - filter_topic.is_empty() || filter_topic.contains(log_topic) - }) - }); - - // Finally convert the iterator to our response format. - let logs = logs.map(|l: Result<_>| { - let (log, log_index, txn_index, txn_hash, block_number, block_hash) = l?; - Ok(eth::Log::new( - log, - log_index, - txn_index, - txn_hash, - block_number, - block_hash, - )) - }); + let mut logs = vec![]; + + for block in blocks { + let block = block?; + + for (txn_index, txn_hash) in block.transactions.iter().enumerate() { + let receipt = node + .get_transaction_receipt(*txn_hash)?
+ .ok_or(anyhow!("missing receipt"))?; + + for (log_index, log) in receipt.logs.into_iter().enumerate() { + let log = match log { + Log::Evm(l) => l, + Log::Scilla(l) => l.into_evm(), + }; + + if !params + .address + .as_ref() + .map(|a| a.contains(&log.address)) + .unwrap_or(true) + { + continue; + } + + if !params + .topics + .iter() + .zip(log.topics.iter()) + .all(|(filter_topic, log_topic)| { + filter_topic.is_empty() || filter_topic.contains(log_topic) + }) + { + continue; + } + + logs.push(eth::Log::new( + log, + log_index, + txn_index, + *txn_hash, + block.number(), + block.hash(), + )); + } + } + } - logs.collect() + Ok(logs) } fn get_transaction_by_block_hash_and_index( diff --git a/zilliqa/src/block_store.rs b/zilliqa/src/block_store.rs index 58b8bddf4..789dcb99b 100644 --- a/zilliqa/src/block_store.rs +++ b/zilliqa/src/block_store.rs @@ -141,15 +141,14 @@ impl BlockCache { pub fn destructive_proposals_from_parent_hashes( &mut self, - hashes: &Vec<Hash>, + hashes: impl Iterator<Item = Hash>, ) -> Vec<(PeerId, Proposal)> { // For each hash, find the list of blocks that have it as the parent. let cache_keys = hashes - .iter() - .filter_map(|x| self.by_parent_hash.remove(x)) + .filter_map(|x| self.by_parent_hash.remove(&x)) .flatten() .collect::<Vec<_>>(); - trace!("block_store::destructive.. : parent hashes {hashes:?} maps to keys {cache_keys:?}"); + trace!("block_store::destructive.. : parent hashes maps to keys {cache_keys:?}"); let maybe = cache_keys .iter() .filter_map(|key| { @@ -173,7 +172,7 @@ impl BlockCache { self.fork_counter = 0; } trace!( - "block_store::destructive.. : pulled blocks for parent hashes {hashes:?} - {}", + "block_store::destructive.. : pulled blocks for parent hashes - {}", maybe .iter() .map(|(_, v)| format!("v={} b={}", v.header.view, v.header.number)) @@ -517,64 +516,6 @@ impl PeerInfo { } } -/// Data about a peer -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct PeerInfoStatus { - availability: BlockAvailability, - availability_updated_at: Option<u64>, - pending_requests: Vec<(String, SystemTime, u64, u64)>, - last_request_failed_at: Option<u64>, -} - -/// Data about the block store, used for debugging. -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct BlockStoreStatus { - highest_known_view: u64, - views_held: Vec<Range<u64>>, - peers: Vec<(String, PeerInfoStatus)>, - availability: Option>, -} - -impl BlockStoreStatus { - pub fn new(block_store: &mut BlockStore) -> Result<Self> { - let peers = block_store - .peers - .iter() - .map(|(k, v)| (format!("{:?}", k), PeerInfoStatus::new(v))) - .collect::<Vec<_>>(); - Ok(Self { - highest_known_view: block_store.highest_known_view, - views_held: block_store.db.get_view_ranges()?, - peers, - availability: block_store.availability()?, - }) - } -} - -impl PeerInfoStatus { - // Annoyingly, this can't (easily) be allowed to fail without making generating debug info hard.
- fn new(info: &PeerInfo) -> Self { - fn s_from_time(q: Option<SystemTime>) -> Option<u64> { - q.map(|z| { - z.duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or(Duration::ZERO) - .as_secs() - }) - } - let pending_requests = info - .pending_requests - .iter() - .map(|(k, v)| (format!("{:?}", k), v.0, v.1, v.2)) - .collect::<Vec<_>>(); - Self { - availability: info.availability.clone(), - availability_updated_at: s_from_time(info.availability_updated_at), - pending_requests, - last_request_failed_at: s_from_time(info.last_request_failed_at), - } - } -} - impl BlockAvailability { pub fn new() -> Self { Self { @@ -586,13 +527,14 @@ impl BlockStore { pub fn new(config: &NodeConfig, db: Arc<Db>, message_sender: MessageSender) -> Result<Self> { - let available_blocks = - db.get_view_ranges()? - .iter() - .fold(RangeMap::new(), |mut range_map, range| { - range_map.with_range(range); - range_map - }); + let read = db.read()?; + let blocks = read.blocks()?; + let min = blocks.min_by_view()?.map(|b| b.view()).unwrap_or_default(); + let max = blocks + .max_canonical_by_view()? + .map(|b| b.view()) + .unwrap_or_default(); + let available_blocks = RangeMap::from_closed_interval(min, max); Ok(BlockStore { db, block_cache: Arc::new(RwLock::new(LruCache::new(NonZeroUsize::new(5).unwrap()))), @@ -1036,7 +978,7 @@ impl BlockStore { if let Some(block) = block_cache.get(&hash) { return Ok(Some(block.clone())); } - let Some(block) = self.db.get_block_by_hash(&hash)? else { + let Some(block) = self.db.read()?.blocks()?.by_hash(hash)? else { return Ok(None); }; block_cache.put(hash, block.clone()); @@ -1044,18 +986,15 @@ } pub fn get_block_by_view(&self, view: u64) -> Result<Option<Block>> { - let Some(hash) = self.db.get_block_hash_by_view(view)? else { - return Ok(None); - }; - self.get_block(hash) + self.db.read()?.blocks()?.by_view(view) } - pub fn get_highest_canonical_block_number(&self) -> Result<Option<u64>> { - self.db.get_highest_canonical_block_number() + pub fn get_highest_block(&self) -> Result<Option<Block>> { + self.db.read()?.blocks()?.max_canonical_by_view() } pub fn get_canonical_block_by_number(&self, number: u64) -> Result<Option<Block>> { - self.db.get_canonical_block_by_number(number) + self.db.read()?.blocks()?.canonical_by_height(number) } /// Called to process a block which can be added to the chain. @@ -1072,7 +1011,9 @@ block: Block, ) -> Result<Vec<(PeerId, Proposal)>> { trace!(?from, number = block.number(), hash = ?block.hash(), "block_store::process_block() : starting"); - self.db.insert_block(&block)?; + let write = self.db.write()?; + write.blocks()?.insert(&block)?; + write.commit()?; self.available_blocks.with_elem(block.view()); if let Some(from) = from { @@ -1086,7 +1027,7 @@ // There are two sets let result = self .buffered - .destructive_proposals_from_parent_hashes(&vec![block.hash()]); + .destructive_proposals_from_parent_hashes(std::iter::once(block.hash())); // Update highest_confirmed_view, but don't trim the cache if // we're not changing anything. @@ -1119,25 +1060,21 @@ self.peers.entry(peer).or_insert_with(PeerInfo::new) } - pub fn forget_block_range(&mut self, blocks: Range<u64>) -> Result<()> { - self.db.forget_block_range(blocks) - } - - pub fn contains_block(&mut self, block_hash: &Hash) -> Result<bool> { - self.db.contains_block(block_hash) + pub fn contains_block(&mut self, view: u64) -> Result<bool> { + self.db.read()?.blocks()?.contains(view) } // Retrieve the plausible next blocks for the block with this hash // Because of forks there might be many of these.
pub fn obtain_child_block_candidates_for( &mut self, - hashes: &Vec<Hash>, + blocks: &[Block], ) -> Result<Vec<(PeerId, Proposal)>> { - trace!("block_store::obtain_child_block_candidates_for : {hashes:?}"); + trace!("block_store::obtain_child_block_candidates_for"); // The easy case is that there's something in the buffer with us as its parent hash. let with_parent_hashes = self .buffered - .destructive_proposals_from_parent_hashes(hashes); + .destructive_proposals_from_parent_hashes(blocks.iter().map(|b| b.hash())); if with_parent_hashes.is_empty() { // There isn't. There are three cases: // @@ -1152,7 +1089,13 @@ // In any case, deleting any cached block that calls itself the next block is // the right thing to do - if it really was the next block, we would not be // executing this branch. - if let Some(highest_block_number) = self.db.get_highest_canonical_block_number()? { + if let Some(highest_block_number) = self + .db + .read()? + .blocks()? + .max_canonical_by_view()? + .map(|b| b.number()) + { self.buffered.delete_blocks_up_to(highest_block_number + 1); trace!( "block_store::obtain_child_block_candidates : deleted cached blocks up to and including {0}", @@ -1162,12 +1105,17 @@ let fork_elems = self.buffered.inc_fork_counter() * (1 + constants::EXAMINE_BLOCKS_PER_FORK_COUNT); - let parent_hashes = self.db.get_highest_block_hashes(fork_elems)?; + let parents = self + .db + .read()? + .blocks()? + .max_canonical_by_view_count(fork_elems)?; + let parent_hashes = parents.iter().map(|b| b.hash()); let revised = self .buffered - .destructive_proposals_from_parent_hashes(&parent_hashes); + .destructive_proposals_from_parent_hashes(parent_hashes); trace!( - "block_store::obtain_child_block_candidates : fork evasion of {fork_elems} elements - {parent_hashes:?} produces {revised:?}" + "block_store::obtain_child_block_candidates : fork evasion of {fork_elems} elements produces {revised:?}" ); if !revised.is_empty() { // Found some! @@ -1185,7 +1133,9 @@ self.obtain_child_block_candidates_for( &self .db - .get_highest_block_hashes(constants::EXAMINE_BLOCKS_PER_FORK_COUNT)?, + .read()? + .blocks()? + .max_canonical_by_view_count(constants::EXAMINE_BLOCKS_PER_FORK_COUNT)?, ) } @@ -1233,8 +1183,8 @@ Ok(()) } - pub fn get_num_transactions(&self) -> Result<usize> { - let count = self.db.get_total_transaction_count()?; + pub fn get_num_transactions(&self) -> Result<u64> { + let count = self.db.read()?.transactions()?.count()?; Ok(count) } @@ -1252,13 +1202,10 @@ /// Returns (am_syncing, current_highest_block) pub fn am_syncing(&self) -> Result<(bool, Block)> { - let current_block = self - .db - .get_canonical_block_by_number( - self.db - .get_highest_canonical_block_number()? - .ok_or_else(|| anyhow!("no highest block"))?, - )? + let read = self.db.read()?; + let current_block = read + .blocks()? + .max_canonical_by_view()?
.ok_or_else(|| anyhow!("missing highest block"))?; Ok(( (self.highest_known_view + 2) > current_block.view(), diff --git a/zilliqa/src/consensus.rs b/zilliqa/src/consensus.rs index fff830449..33bd52814 100644 --- a/zilliqa/src/consensus.rs +++ b/zilliqa/src/consensus.rs @@ -23,7 +23,7 @@ use crate::{ blockhooks, cfg::{ConsensusConfig, NodeConfig}, crypto::{verify_messages, Hash, NodePublicKey, NodeSignature, SecretKey}, - db::{self, Db}, + db::{self, ArcDb, Db}, exec::{PendingState, TransactionApplyResult}, inspector::{self, ScillaInspector, TouchedAddressInspector}, message::{ @@ -207,7 +207,9 @@ impl Consensus { let block_store = BlockStore::new(&config, db.clone(), message_sender.clone())?; let latest_block = db - .get_finalized_view()? + .read()? + .finalized_view()? + .get()? .map(|view| { block_store .get_block_by_view(view)? @@ -241,22 +243,28 @@ impl Consensus { }; let (start_view, finalized_view, high_qc) = { - match db.get_high_qc()? { - Some(qc) => { + match db.read()?.high_qc()?.get()? { + Some((qc, _)) => { let high_block = block_store .get_block(qc.block_hash)? .ok_or_else(|| anyhow!("missing block that high QC points to!"))?; let finalized_number = db - .get_finalized_view()? + .read()? + .finalized_view()? + .get()? .ok_or_else(|| anyhow!("missing latest finalized view!"))?; let finalized_block = db - .get_block_by_view(finalized_number)? + .read()? + .blocks()? + .by_view(finalized_number)? .ok_or_else(|| anyhow!("missing finalized block!"))?; // If latest view was written to disk then always start from there. Otherwise start from (highest out of high block and finalised block) + 1 let start_view = db - .get_view()? + .read()? + .view()? + .get()? .or_else(|| { Some(std::cmp::max(high_block.view(), finalized_block.view()) + 1) }) @@ -279,14 +287,13 @@ impl Consensus { // If we have newer blocks, erase them // @todo .. more elegantly :-) + let write = db.write()?; loop { - let highest_block_number = db - .get_highest_canonical_block_number()? - .ok_or_else(|| anyhow!("can't find highest block num in database!"))?; - - let head_block = block_store - .get_canonical_block_by_number(highest_block_number)? + let head_block = write + .blocks()? + .max_canonical_by_view()? 
.ok_or_else(|| anyhow!("missing head block!"))?; + let highest_block_number = head_block.number(); trace!( "recovery: highest_block_number {highest_block_number} view {0}", head_block.view() ); if head_block.view() > high_block.view() && head_block.view() > finalized_number { + for txn_hash in &head_block.transactions { + write.delete_transaction(*txn_hash)?; + } + write.blocks()?.delete(head_block.view())?; trace!("recovery: stored block {0} reverted", highest_block_number); - db.remove_transactions_executed_in_block(&head_block.hash())?; - db.remove_block(&head_block)?; } else { break; } } + write.commit()?; info!( "During recovery, starting consensus at view {}, finalised view {}", @@ -341,8 +351,10 @@ new_transactions: broadcast::Sender::new(128), new_transaction_hashes: broadcast::Sender::new(128), }; - consensus.db.set_view(start_view)?; - consensus.set_finalized_view(finalized_view)?; + let write = consensus.db.write()?; + write.view()?.set(start_view)?; + write.finalized_view()?.set(finalized_view)?; + write.commit()?; // If we're at genesis, add the genesis block and return if latest_block_view == 0 { @@ -373,7 +385,9 @@ // If timestamp of when current high_qc was written exists then use it to estimate the minimum number of blocks the network has moved on since shut down // This is useful in scenarios in which consensus has failed since this node went down - if let Some(latest_high_qc_timestamp) = consensus.db.get_high_qc_updated_at()? { + if let Some(latest_high_qc_timestamp) = + consensus.db.read()?.high_qc()?.get()?.map(|(_, t)| t) + { let view_diff = Consensus::minimum_views_in_time_difference( latest_high_qc_timestamp.elapsed()?, consensus.config.consensus.consensus_timeout, @@ -386,7 +400,9 @@ view_diff, min_view_since_high_qc_updated ); - consensus.db.set_view(min_view_since_high_qc_updated)?; + let write = consensus.db.write()?; + write.view()?.set(min_view_since_high_qc_updated)?; + write.commit()?; } // Remind block_store of our peers and request any potentially missing blocks @@ -449,13 +465,12 @@ } pub fn head_block(&self) -> Block { - let highest_block_number = self - .block_store - .get_highest_canonical_block_number() + self.db + .read() .unwrap() - .unwrap(); - self.block_store - .get_canonical_block_by_number(highest_block_number) + .blocks() + .unwrap() + .max_canonical_by_view() .unwrap() .unwrap() } @@ -641,7 +656,7 @@ block.hash() ); - if self.block_store.contains_block(&block.hash())? { + if self.block_store.contains_block(block.view())? { trace!("ignoring block proposal, block store contains this block already"); return Ok(None); } @@ -976,7 +991,7 @@ } pub fn get_touched_transactions(&self, address: Address) -> Result<Vec<Hash>> { - self.db.get_touched_transactions(address) + self.db.read()?.touched_address_index()?.get(address) } /// Clear up anything in memory that is no longer required. This is to avoid memory leaks. @@ -1256,14 +1271,13 @@ let (qc, parent) = match agg { // Create dummy QC for now if aggQC not provided None => { - // Start with highest canonical block - let num = self - .db - .get_highest_canonical_block_number()? - .context("no canonical blocks")?; // get highest canonical block number + // Start with highest block let block = self - .get_canonical_block_by_number(num)? - .context("missing canonical block")?; // retrieve highest canonical block + .db + .read()? + .blocks()?
+ .max_canonical_by_view()? + .ok_or(anyhow!("no blocks"))?; ( QuorumCertificate::new_with_identity(block.hash(), block.view()), block, @@ -1389,7 +1403,9 @@ proposal.header, &mut inspector, )?; - self.db.insert_transaction(&tx.hash, &tx.tx)?; + let write = self.db.write()?; + write.transactions()?.insert(tx.hash, &tx.tx)?; + write.commit()?; // Skip transactions whose execution resulted in an error and drop them. let Some(result) = result else { @@ -1511,14 +1527,13 @@ } /// Assembles a Pending block. fn assemble_pending_block_at(&self, state: &mut State) -> Result<Option<Block>> { - // Start with highest canonical block - let num = self - .db - .get_highest_canonical_block_number()? - .context("no canonical blocks")?; // get highest canonical block number + // Start with highest block let block = self - .get_canonical_block_by_number(num)? - .context("missing canonical block")?; // retrieve highest canonical block + .db + .read()? + .blocks()? + .max_canonical_by_view()? + .ok_or(anyhow!("no blocks"))?; // Generate early QC let early_qc = QuorumCertificate::new_with_identity(block.hash(), block.view()); @@ -1579,7 +1594,9 @@ executed_block_header, inspector::noop(), )?; - self.db.insert_transaction(&txn.hash, &txn.tx)?; + let write = self.db.write()?; + write.transactions()?.insert(txn.hash, &txn.tx)?; + write.commit()?; // Skip transactions whose execution resulted in an error let Some(result) = result else { @@ -1888,7 +1905,7 @@ /// Returns (flag, outcome). /// flag is true if the transaction was newly added to the pool - ie. if it validated correctly and has not been seen before. pub fn new_transaction(&mut self, txn: VerifiedTransaction) -> Result<TxAddResult> { - if self.db.contains_transaction(&txn.hash)? { + if self.db.read()?.transactions()?.contains(txn.hash)? { debug!("Transaction {:?} already in mempool", txn.hash); return Ok(TxAddResult::Duplicate(txn.hash)); } @@ -1945,20 +1962,16 @@ pub fn get_transaction_by_hash(&self, hash: Hash) -> Result<Option<VerifiedTransaction>> { Ok(self .db - .get_transaction(&hash)? + .read()? + .transactions()? + .get(hash)? .map(|tx| tx.verify()) .transpose()? .or_else(|| self.transaction_pool.get_transaction(hash).cloned())) } pub fn get_transaction_receipt(&self, hash: &Hash) -> Result<Option<TransactionReceipt>> { - let Some(block_hash) = self.db.get_block_hash_reverse_index(hash)?
else { - return Ok(None); - }; - let block_receipts = self.db.get_transaction_receipts_in_block(&block_hash)?; - Ok(block_receipts - .into_iter() - .find(|receipt| receipt.tx_hash == *hash)) + self.db.read()?.receipts()?.get(*hash) } fn update_high_qc_and_view( @@ -1977,7 +1990,9 @@ if self.high_qc.block_hash == Hash::ZERO { trace!("received high qc, self high_qc is currently uninitialized, setting to the new one."); - self.db.set_high_qc(new_high_qc)?; + let write = self.db.write()?; + write.high_qc()?.set(&new_high_qc)?; + write.commit()?; self.high_qc = new_high_qc; } else { let current_high_qc_view = self @@ -1994,7 +2009,9 @@ new_high_qc_block_view + 1, current_high_qc_view, ); - self.db.set_high_qc(new_high_qc)?; + let write = self.db.write()?; + write.high_qc()?.set(&new_high_qc)?; + write.commit()?; self.high_qc = new_high_qc; if new_high_qc_block_view >= view { self.set_view(new_high_qc_block_view + 1)?; @@ -2140,7 +2157,16 @@ ); self.set_finalized_view(block.view())?; - let receipts = self.db.get_transaction_receipts_in_block(&block.hash())?; + let read = self.db.read()?; + let receipts: Vec<_> = block + .transactions + .iter() + .map(|txn_hash| { + read.receipts()? + .get(*txn_hash)? + .ok_or(anyhow!("missing receipt")) + }) + .collect::<Result<_>>()?; for (destination_shard, intershard_call) in blockhooks::get_cross_shard_messages(&receipts)? { @@ -2171,17 +2197,16 @@ && self.epoch_is_checkpoint(self.epoch_number(block.number())) { if let Some(checkpoint_path) = self.db.get_checkpoint_dir()? { - let parent = self - .db - .get_block_by_hash(&block.parent_hash())? - .ok_or(anyhow!( - "Trying to checkpoint block, but we don't have its parent" - ))?; + let read = self.db.read()?; + let parent = read + .blocks()? + .by_hash(block.parent_hash())? + .ok_or(anyhow!("missing block"))?; let transactions: Vec<SignedTransaction> = block .transactions .iter() .map(|txn_hash| { - let tx = self.db.get_transaction(txn_hash)?.ok_or(anyhow!( + let tx = read.transactions()?.get(*txn_hash)?.ok_or(anyhow!( "failed to fetch transaction {} for checkpoint parent {}", txn_hash, parent.hash() @@ -2213,7 +2238,9 @@ .ok_or(anyhow!("No such block number {block_number}"))?; let parent = self .db - .get_block_by_hash(&block.parent_hash())? + .read()? + .blocks()? + .by_hash(block.parent_hash())? .ok_or(anyhow!( "Trying to checkpoint block, but we don't have its parent" ))?; @@ -2221,11 +2248,16 @@ .transactions .iter() .map(|txn_hash| { - let tx = self.db.get_transaction(txn_hash)?.ok_or(anyhow!( - "failed to fetch transaction {} for checkpoint parent {}", - txn_hash, - parent.hash() - ))?; + let tx = self + .db + .read()? + .transactions()? + .get(*txn_hash)? + .ok_or(anyhow!( + "failed to fetch transaction {} for checkpoint parent {}", + txn_hash, + parent.hash() + ))?; Ok::<_, anyhow::Error>(tx) }) .collect::<Result<Vec<_>>>()?; @@ -2529,18 +2561,21 @@ } fn set_finalized_view(&mut self, view: u64) -> Result<()> { - self.db.set_finalized_view(view) + let write = self.db.write()?; + write.finalized_view()?.set(view)?; + write.commit() } pub fn get_finalized_view(&self) -> Result<u64> { - Ok(self.db.get_finalized_view()?.unwrap_or_else(|| { + Ok(self.db.read()?.finalized_view()?.get()?.unwrap_or_else(|| { warn!("no finalised view found in table. Defaulting to 0"); 0 })) } fn set_view(&mut self, view: u64) -> Result<()> { - if self.db.set_view(view)? { + let write = self.db.write()?; + if write.view()?.set(view)?
{ self.view_updated_at = SystemTime::now(); } else { warn!( @@ -2548,11 +2583,11 @@ view ); } - Ok(()) + write.commit() } pub fn get_view(&self) -> Result<u64> { - Ok(self.db.get_view()?.unwrap_or_else(|| { + Ok(self.db.read()?.view()?.get()?.unwrap_or_else(|| { warn!("no view found in table. Defaulting to 0"); 0 })) } @@ -2795,28 +2830,24 @@ // Then, revert the blocks from the head block to the common ancestor // Then, apply the blocks (forward) from the common ancestor to the parent of the new block let mut head = self.head_block(); - let mut head_height = head.number(); let mut proposed_block = block.clone(); - let mut proposed_block_height = block.number(); trace!( "Dealing with fork: from block {} (height {}), back to block {} (height {})", head.hash(), - head_height, + head.number(), proposed_block.hash(), - proposed_block_height + proposed_block.number(), ); // Need to make sure both pointers are at the same height - while head_height > proposed_block_height { + while head.number() > proposed_block.number() { trace!("Stepping back head block pointer"); head = self.get_block(&head.parent_hash())?.unwrap(); - head_height = head.number(); } - while proposed_block_height > head_height { + while proposed_block.number() > head.number() { trace!("Stepping back proposed block pointer"); proposed_block = self.get_block(&proposed_block.parent_hash())?.unwrap(); - proposed_block_height = proposed_block.number(); } // We now have both hash pointers at the same height, we can walk back until they are equal. @@ -2860,6 +2891,7 @@ self.transaction_pool.insert_transaction(txn, account_nonce); } + let write = self.db.write()?; // block transactions need to be removed from self.transactions and re-injected for tx_hash in &head_block.transactions { let orig_tx = self.get_transaction_by_hash(*tx_hash)?.unwrap(); @@ ... let account_nonce = self.state.get_account(orig_tx.signer)?.nonce; self.transaction_pool .insert_transaction(orig_tx, account_nonce); + // purge from the db + write.delete_transaction(*tx_hash)?; } - // then purge them all from the db, including receipts and indexes - self.db - .remove_transactions_executed_in_block(&head_block.hash())?; // this block is no longer in the main chain - self.db.mark_block_as_non_canonical(head_block.hash())?; + write.blocks()?.set_non_canonical(head_block.view())?; + write.commit()?; } // Now, we execute forward from the common ancestor to the new block parent which can @@ -2950,6 +2982,7 @@ let mut block_receipts = Vec::new(); + let write = self.db.write()?; for (tx_index, txn_hash) in block.transactions.iter().enumerate() { let (receipt, addresses) = self .receipts_cache @@ ... // Apply 'touched-address' from cache for address in addresses { - self.db.add_touched_address(address, *txn_hash)?; + write.touched_address_index()?.insert(address, *txn_hash)?; } } + write.commit()?; // fast-forward state self.state.set_to_root(block.state_root_hash().into()); @@ -3039,17 +3073,18 @@ debug!(?receipt, "applied transaction {:?}", receipt); block_receipts.push((receipt, tx_index)); } - self.db.with_sqlite_tx(|sqlite_tx| { + let write = self.db.write()?; + { + let mut transactions = write.transactions()?; for txn in &verified_txns { - self.db - .insert_transaction_with_db_tx(sqlite_tx, &txn.hash, &txn.tx)?; + transactions.insert(txn.hash, &txn.tx)?; } + let mut touched_address_index = write.touched_address_index()?; for
(addr, txn_hash) in touched_addresses { - self.db - .add_touched_address_with_db_tx(sqlite_tx, addr, txn_hash)?; + touched_address_index.insert(addr, txn_hash)?; } - Ok(()) - })?; + } + write.commit()?; if cumulative_gas_used != block.gas_used() { warn!("Cumulative gas used by executing all transactions: {cumulative_gas_used} is different that the one provided in the block: {}", block.gas_used()); @@ -3129,25 +3164,20 @@ // Important - only add blocks we are going to execute because they can potentially // overwrite the mapping of block height to block, which there should only be one of. // for example, this HAS to be after the deal with fork call - if !self.db.contains_block(&block.hash())? { + if !self.db.read()?.blocks()?.contains(block.view())? { // Only tell the block store where this block came from if it wasn't from ourselves. let from = from.filter(|peer_id| *peer_id != self.peer_id()); // If we were the proposer we would've already processed the block, hence the check self.add_block(from, block.clone())?; } - { - // helper scope to shadow db, to avoid moving it into the closure - // closure has to be move to take ownership of block_receipts - let db = &self.db; - self.db.with_sqlite_tx(move |sqlite_tx| { - for (receipt, _) in block_receipts { - db.insert_transaction_receipt_with_db_tx(sqlite_tx, receipt)?; - } - Ok(()) - })?; + + let write = self.db.write()?; + for (receipt, _) in block_receipts { + write.receipts()?.insert(&receipt)?; } - self.db.mark_block_as_canonical(block.hash())?; + write.blocks()?.set_canonical(block.view())?; + write.commit()?; Ok(()) } diff --git a/zilliqa/src/db.rs b/zilliqa/src/db.rs deleted file mode 100644 index 0ae2ba8bd..000000000 --- a/zilliqa/src/db.rs +++ /dev/null @@ -1,1373 +0,0 @@ -use std::{ - collections::BTreeMap, - fmt::Debug, - fs::{self, File}, - io::{BufReader, BufWriter, Read, Write}, - ops::Range, - path::{Path, PathBuf}, - sync::{Arc, Mutex}, - time::Duration, -}; - -use alloy::primitives::Address; -use anyhow::{anyhow, Context, Result}; -use eth_trie::{EthTrie, Trie}; -use itertools::Itertools; -use lru_mem::LruCache; -use lz4::{Decoder, EncoderBuilder}; -use rusqlite::{ - named_params, - types::{FromSql, FromSqlError, ToSqlOutput}, - Connection, OptionalExtension, Row, ToSql, -}; -use serde::{Deserialize, Serialize}; -use tracing::warn; - -use crate::{ - crypto::{Hash, NodeSignature}, - exec::{ScillaError, ScillaException, ScillaTransition}, - message::{AggregateQc, Block, BlockHeader, QuorumCertificate}, - state::Account, - time::SystemTime, - transaction::{EvmGas, Log, SignedTransaction, TransactionReceipt}, -}; - -macro_rules! sqlify_with_bincode { - ($type: ty) => { - impl ToSql for $type { - fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> { - let data = bincode::serialize(self) - .map_err(|e| rusqlite::Error::ToSqlConversionFailure(e))?; - Ok(ToSqlOutput::from(data)) - } - } - impl FromSql for $type { - fn column_result( - value: rusqlite::types::ValueRef<'_>, - ) -> rusqlite::types::FromSqlResult<Self> { - let blob = value.as_blob()?; - bincode::deserialize(blob).map_err(|e| FromSqlError::Other(e)) - } - } - }; -} - -/// Creates a thin wrapper for a type with proper From traits. To ease implementing To/FromSql on -/// foreign types. -macro_rules! make_wrapper { - ($old: ty, $new: ident) => { - paste::paste!
{ - #[derive(Serialize, Deserialize)] - struct $new($old); - - impl From<$old> for $new { - fn from(value: $old) -> Self { - Self(value) - } - } - - impl From<$new> for $old { - fn from(value: $new) -> Self { - value.0 - } - } - } - }; -} - -sqlify_with_bincode!(AggregateQc); -sqlify_with_bincode!(QuorumCertificate); -sqlify_with_bincode!(NodeSignature); -sqlify_with_bincode!(SignedTransaction); - -make_wrapper!(Vec<ScillaException>, VecScillaExceptionSqlable); -sqlify_with_bincode!(VecScillaExceptionSqlable); -make_wrapper!(BTreeMap<u64, Vec<ScillaError>>, MapScillaErrorSqlable); -sqlify_with_bincode!(MapScillaErrorSqlable); - -make_wrapper!(Vec<Log>, VecLogSqlable); -sqlify_with_bincode!(VecLogSqlable); - -make_wrapper!(Vec<ScillaTransition>, VecScillaTransitionSqlable); -sqlify_with_bincode!(VecScillaTransitionSqlable); - -make_wrapper!(SystemTime, SystemTimeSqlable); -impl ToSql for SystemTimeSqlable { - fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> { - use std::mem::size_of; - - let since_epoch = self.0.duration_since(SystemTime::UNIX_EPOCH).unwrap(); - - let mut buf = [0u8; size_of::<u64>() + size_of::<u32>()]; - - buf[..size_of::<u64>()].copy_from_slice(&since_epoch.as_secs().to_be_bytes()[..]); - buf[size_of::<u64>()..].copy_from_slice(&since_epoch.subsec_nanos().to_be_bytes()[..]); - - Ok(ToSqlOutput::from(buf.to_vec())) - } -} -impl FromSql for SystemTimeSqlable { - fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> { - use std::mem::size_of; - - let blob = value.as_blob()?; - - if blob.len() != size_of::<u64>() + size_of::<u32>() { - return Err(FromSqlError::InvalidBlobSize { - expected_size: size_of::<u64>() + size_of::<u32>(), - blob_size: blob.len(), - }); - } - - let mut secs_buf = [0u8; size_of::<u64>()]; - let mut subsec_nanos_buf = [0u8; size_of::<u32>()]; - - secs_buf.copy_from_slice(&blob[..size_of::<u64>()]); - subsec_nanos_buf.copy_from_slice(&blob[size_of::<u64>()..]); - - let secs = u64::from_be_bytes(secs_buf); - let subsec_nanos = u32::from_be_bytes(subsec_nanos_buf); - - Ok(SystemTimeSqlable( - SystemTime::UNIX_EPOCH + Duration::new(secs, subsec_nanos), - )) - } -} - -make_wrapper!(Address, AddressSqlable); -impl ToSql for AddressSqlable { - fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> { - Ok(ToSqlOutput::from(self.0.as_slice())) - } -} -impl FromSql for AddressSqlable { - fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> { - Ok(AddressSqlable(Address::from(<[u8; 20]>::column_result( - value, - )?))) - } -} - -impl ToSql for Hash { - fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> { - Ok(ToSqlOutput::from(self.0.to_vec())) - } -} -impl FromSql for Hash { - fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> { - Ok(Hash(<[u8; 32]>::column_result(value)?)) - } -} - -impl ToSql for EvmGas { - fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> { - self.0.to_sql() - } -} - -impl FromSql for EvmGas { - fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> { - Ok(Self(u64::column_result(value)?)) - } -} - -enum BlockFilter { - Hash(Hash), - View(u64), - Height(u64), -} - -const CHECKPOINT_HEADER_BYTES: [u8; 8] = *b"ZILCHKPT"; - -#[derive(Debug)] -pub struct Db { - db: Arc<Mutex<Connection>>, - state_cache: Arc<Mutex<LruCache<Vec<u8>, Vec<u8>>>>, - path: Option<Box<Path>>, -} - -impl Db { - pub fn new<P>(data_dir: Option<P>, shard_id: u64, state_cache_size: usize) -> Result<Self> - where - P: AsRef<Path>, - { - let (mut connection, path) = match data_dir { - Some(path) => { - let path = path.as_ref().join(shard_id.to_string()); - fs::create_dir_all(&path).context(format!("Unable to create {path:?}"))?; - let db_path = path.join("db.sqlite3"); - ( - Connection::open(&db_path) - .context(format!("Cannot access sqlite db {0:?}", &db_path))?, - Some(path.into_boxed_path()), - ) - } - None => (Connection::open_in_memory()?, None), - }; - - // SQLite performance tweaks - - // large page_size is more compact/efficient - connection.pragma_update(None, "page_size", 1 << 15)?; - let page_size: i32 = connection.pragma_query_value(None, "page_size", |r| r.get(0))?; - - // reduced non-critical fsync() calls - connection.pragma_update(None, "synchronous", "NORMAL")?; - let synchronous: i8 = connection.pragma_query_value(None, "synchronous", |r| r.get(0))?; - - // store temporary tables/indices in-memory - connection.pragma_update(None, "temp_store", "MEMORY")?; - let temp_store: i8 = connection.pragma_query_value(None, "temp_store", |r| r.get(0))?; - - // general read/write performance improvement - let journal_mode: String = - connection.pragma_update_and_check(None, "journal_mode", "WAL", |r| r.get(0))?; - - // retain journal size of 32MB - based on observations - let journal_size_limit: i32 = - connection - .pragma_update_and_check(None, "journal_size_limit", 1 << 25, |r| r.get(0))?; - - // cache 1-days data (256MB) in-memory - connection.pragma_update(None, "cache_size", (1 << 28) / page_size)?; - let cache_size: i32 = connection.pragma_query_value(None, "cache_size", |r| r.get(0))?; - - tracing::info!( - ?journal_mode, - ?journal_size_limit, - ?synchronous, - ?temp_store, - ?page_size, - ?cache_size, - "PRAGMA" - ); - - // Add tracing - logs all SQL statements - connection.trace(Some(|statement| tracing::trace!(statement, "sql executed"))); - - Self::ensure_schema(&connection)?; - - Ok(Db { - db: Arc::new(Mutex::new(connection)), - state_cache: Arc::new(Mutex::new(LruCache::new(state_cache_size))), - path, - }) - } - - fn ensure_schema(connection: &Connection) -> Result<()> { - connection.execute_batch( - "CREATE TABLE IF NOT EXISTS blocks ( - block_hash BLOB NOT NULL PRIMARY KEY, - view INTEGER NOT NULL UNIQUE, - height INTEGER NOT NULL, - signature BLOB NOT NULL, - state_root_hash BLOB NOT NULL, - transactions_root_hash BLOB NOT NULL, - receipts_root_hash BLOB NOT NULL, - timestamp BLOB NOT NULL, - gas_used INTEGER NOT NULL, - gas_limit INTEGER NOT NULL, - qc BLOB NOT NULL, - agg BLOB, - is_canonical BOOLEAN NOT NULL) WITHOUT ROWID; - CREATE INDEX IF NOT EXISTS idx_blocks_height ON blocks(height); - CREATE TABLE IF NOT EXISTS transactions ( - tx_hash BLOB NOT NULL PRIMARY KEY, - data BLOB NOT NULL) WITHOUT ROWID; - CREATE TABLE IF NOT EXISTS receipts ( - tx_hash BLOB NOT NULL PRIMARY KEY REFERENCES transactions (tx_hash) ON DELETE CASCADE, - block_hash BLOB NOT NULL REFERENCES blocks (block_hash), -- the touched_address_index needs to be updated for all the txs in the block, so delete txs first - thus no cascade here - tx_index INTEGER NOT NULL, - success INTEGER NOT NULL, - gas_used INTEGER NOT NULL, - cumulative_gas_used INTEGER NOT NULL, - contract_address BLOB, - logs BLOB, - transitions BLOB, - accepted INTEGER, - errors BLOB, - exceptions BLOB); - CREATE INDEX IF NOT EXISTS block_hash_index ON receipts (block_hash); - CREATE TABLE IF NOT EXISTS touched_address_index ( - address BLOB, - tx_hash BLOB REFERENCES
transactions (tx_hash) ON DELETE CASCADE, - PRIMARY KEY (address, tx_hash)) WITHOUT ROWID; - CREATE TABLE IF NOT EXISTS tip_info ( - finalized_view INTEGER, - view INTEGER, - high_qc BLOB, - high_qc_updated_at BLOB, - _single_row INTEGER DEFAULT 0 NOT NULL UNIQUE CHECK (_single_row = 0)); -- max 1 row - CREATE TABLE IF NOT EXISTS state_trie (key BLOB NOT NULL PRIMARY KEY, value BLOB NOT NULL) WITHOUT ROWID; - ", - )?; - Ok(()) - } - - pub fn get_checkpoint_dir(&self) -> Result<Option<Box<Path>>> { - let Some(base_path) = &self.path else { - // If we don't have on-disk persistency, disable checkpoints too - warn!( - "Attempting to create checkpoint, but no persistence directory has been configured" - ); - return Ok(None); - }; - Ok(Some(base_path.join("checkpoints").into_boxed_path())) - } - - /// Fetch checkpoint data from file and initialise db state - /// Return checkpointed block and transactions which must be executed after this function - /// Return None if checkpoint already loaded - pub fn load_trusted_checkpoint<P: AsRef<Path>>( - &self, - path: P, - hash: &Hash, - our_shard_id: u64, - ) -> Result<Option<(Block, Vec<SignedTransaction>, Block)>> { - // For now, only support a single version: you want to load the latest checkpoint, anyway. - const SUPPORTED_VERSION: u32 = 3; - - // Decompress file and write to temp file - let input_filename = path.as_ref(); - let temp_filename = input_filename.with_extension("part"); - decompress_file(input_filename, &temp_filename)?; - - // Read decompressed file - let input = File::open(&temp_filename)?; - - let mut reader = BufReader::with_capacity(8192 * 1024, input); // 8 MiB read chunks - let trie_storage = Arc::new(self.state_trie()?); - let mut state_trie = EthTrie::new(trie_storage.clone()); - - // Decode and validate header - let mut header: [u8; 21] = [0u8; 21]; - reader.read_exact(&mut header)?; - let header = header; - if header[0..8] != CHECKPOINT_HEADER_BYTES // magic bytes - || header[20] != b'\n' - // header must end in newline - { - return Err(anyhow!("Invalid checkpoint file: invalid header")); - } - let version = u32::from_be_bytes(header[8..12].try_into()?); - // Only support a single version right now.
- if version != SUPPORTED_VERSION { - return Err(anyhow!("Invalid checkpoint file: unsupported version.")); - } - let shard_id = u64::from_be_bytes(header[12..20].try_into()?); - if shard_id != our_shard_id { - return Err(anyhow!("Invalid checkpoint file: wrong shard ID.")); - } - - // Decode and validate checkpoint block, its transactions and parent block - let mut block_len_buf = [0u8; std::mem::size_of::<u64>()]; - reader.read_exact(&mut block_len_buf)?; - let mut block_ser = vec![0u8; usize::try_from(u64::from_be_bytes(block_len_buf))?]; - reader.read_exact(&mut block_ser)?; - let block: Block = bincode::deserialize(&block_ser)?; - if block.hash() != *hash { - return Err(anyhow!("Checkpoint does not match trusted hash")); - } - block.verify_hash()?; - - let mut transactions_len_buf = [0u8; std::mem::size_of::<u64>()]; - reader.read_exact(&mut transactions_len_buf)?; - let mut transactions_ser = - vec![0u8; usize::try_from(u64::from_be_bytes(transactions_len_buf))?]; - reader.read_exact(&mut transactions_ser)?; - let transactions: Vec<SignedTransaction> = bincode::deserialize(&transactions_ser)?; - - let mut parent_len_buf = [0u8; std::mem::size_of::<u64>()]; - reader.read_exact(&mut parent_len_buf)?; - let mut parent_ser = vec![0u8; usize::try_from(u64::from_be_bytes(parent_len_buf))?]; - reader.read_exact(&mut parent_ser)?; - let parent: Block = bincode::deserialize(&parent_ser)?; - if block.parent_hash() != parent.hash() { - return Err(anyhow!("Invalid checkpoint file: parent's blockhash does not correspond to checkpoint block")); - } - - if state_trie.iter().next().is_some() - || self.get_highest_canonical_block_number()?.is_some() - { - // If checkpointed block already exists then assume checkpoint load already complete. Return None - if self.get_block_by_hash(&block.hash())?.is_some() { - return Ok(None); - } - // This may not be strictly necessary, as in theory old values will, at worst, be orphaned - // values not part of any state trie of any known block. With some effort, this could - // even be supported. - // However, without such explicit support, having old blocks MAY in fact cause - // unexpected and unwanted behaviour. Thus we currently forbid loading a checkpoint in - // a node that already contains previous state, until (and unless) there's ever a - // usecase for going through the effort to support it and ensure it works as expected. - if let Some(db_block) = self.get_block_by_hash(&parent.hash())? { - if db_block.parent_hash() != parent.parent_hash() { - return Err(anyhow!("Inconsistent checkpoint file: block loaded from checkpoint and block stored in database with same hash have differing parent hashes")); - } else { - // In this case, the database already has the block contained in this checkpoint. We assume the - // database contains the full state for that block too and thus return early, without actually - // loading the checkpoint file.
- return Ok(Some((block, transactions, parent))); - } - } else { - return Err(anyhow!("Inconsistent checkpoint file: block loaded from checkpoint file does not exist in non-empty database")); - } - } - - // then decode state - loop { - // Read account key and the serialised Account - let mut account_hash = [0u8; 32]; - match reader.read_exact(&mut account_hash) { - // Read successful - Ok(_) => (), - // Break loop here if weve reached the end of the file - Err(ref e) if e.kind() == std::io::ErrorKind::UnexpectedEof => { - break; - } - Err(e) => return Err(e.into()), - }; - - let mut serialised_account_len_buf = [0u8; std::mem::size_of::<u64>()]; - reader.read_exact(&mut serialised_account_len_buf)?; - let mut serialised_account = - vec![0u8; usize::try_from(u64::from_be_bytes(serialised_account_len_buf))?]; - reader.read_exact(&mut serialised_account)?; - - // Read entire account storage as a buffer - let mut account_storage_len_buf = [0u8; std::mem::size_of::<u64>()]; - reader.read_exact(&mut account_storage_len_buf)?; - let account_storage_len = usize::try_from(u64::from_be_bytes(account_storage_len_buf))?; - let mut account_storage = vec![0u8; account_storage_len]; - reader.read_exact(&mut account_storage)?; - - // Pull out each storage key and value - let mut account_trie = EthTrie::new(trie_storage.clone()); - let mut pointer: usize = 0; - while account_storage_len > pointer { - let storage_key_len_buf: &[u8] = - &account_storage[pointer..(pointer + std::mem::size_of::<u64>())]; - let storage_key_len = - usize::try_from(u64::from_be_bytes(storage_key_len_buf.try_into()?))?; - pointer += std::mem::size_of::<u64>(); - let storage_key: &[u8] = &account_storage[pointer..(pointer + storage_key_len)]; - pointer += storage_key_len; - - let storage_val_len_buf: &[u8] = - &account_storage[pointer..(pointer + std::mem::size_of::<u64>())]; - let storage_val_len = - usize::try_from(u64::from_be_bytes(storage_val_len_buf.try_into()?))?; - pointer += std::mem::size_of::<u64>(); - let storage_val: &[u8] = &account_storage[pointer..(pointer + storage_val_len)]; - pointer += storage_val_len; - - account_trie.insert(storage_key, storage_val)?; - } - - let account_trie_root = - bincode::deserialize::<Account>(&serialised_account)?.storage_root; - if account_trie.root_hash()?.as_slice() != account_trie_root { - return Err(anyhow!( - "Invalid checkpoint file: account trie root hash mismatch: calculated {}, checkpoint file contained {}", hex::encode(account_trie.root_hash()?.as_slice()), hex::encode(account_trie_root) - )); - } - state_trie.insert(&account_hash, &serialised_account)?; - } - - if state_trie.root_hash()? != parent.state_root_hash().0 { - return Err(anyhow!("Invalid checkpoint file: state root hash mismatch")); - } - - let parent_ref: &Block = &parent; // for moving into the closure - self.with_sqlite_tx(move |tx| { - self.insert_block_with_db_tx(tx, parent_ref)?; - self.set_finalized_view_with_db_tx(tx, parent_ref.view())?; - self.set_high_qc_with_db_tx(tx, block.header.qc)?; - self.set_view_with_db_tx(tx, parent_ref.view() + 1)?; - Ok(()) - })?; - - fs::remove_file(temp_filename)?; - - Ok(Some((block, transactions, parent))) - } - - pub fn state_trie(&self) -> Result<TrieStorage> { - Ok(TrieStorage { - db: self.db.clone(), - cache: self.state_cache.clone(), - }) - } - - pub fn with_sqlite_tx(&self, operations: impl FnOnce(&Connection) -> Result<()>) -> Result<()> { - let mut sqlite_tx = self.db.lock().unwrap(); - let sqlite_tx = sqlite_tx.transaction()?; - operations(&sqlite_tx)?; - Ok(sqlite_tx.commit()?)
-    }
-
-    pub fn get_block_hash_by_view(&self, view: u64) -> Result<Option<Hash>> {
-        Ok(self
-            .db
-            .lock()
-            .unwrap()
-            .query_row_and_then(
-                "SELECT block_hash FROM blocks WHERE view = ?1",
-                [view],
-                |row| row.get(0),
-            )
-            .optional()?)
-    }
-
-    pub fn set_finalized_view_with_db_tx(&self, sqlite_tx: &Connection, view: u64) -> Result<()> {
-        sqlite_tx
-            .execute("INSERT INTO tip_info (finalized_view) VALUES (?1) ON CONFLICT DO UPDATE SET finalized_view = ?1",
-            [view])?;
-        Ok(())
-    }
-
-    pub fn set_finalized_view(&self, view: u64) -> Result<()> {
-        self.set_finalized_view_with_db_tx(&self.db.lock().unwrap(), view)
-    }
-
-    pub fn get_finalized_view(&self) -> Result<Option<u64>> {
-        Ok(self
-            .db
-            .lock()
-            .unwrap()
-            .query_row("SELECT finalized_view FROM tip_info", (), |row| row.get(0))
-            .optional()
-            .unwrap_or(None))
-    }
-
-    /// Write view and timestamp to table if view is larger than current. Return true if write was successful
-    pub fn set_view_with_db_tx(&self, sqlite_tx: &Connection, view: u64) -> Result<bool> {
-        let res = sqlite_tx
-            .execute("INSERT INTO tip_info (view) VALUES (?1) ON CONFLICT(_single_row) DO UPDATE SET view = ?1 WHERE tip_info.view IS NULL OR tip_info.view < ?1",
-            [view])?;
-        Ok(res != 0)
-    }
-
-    pub fn set_view(&self, view: u64) -> Result<bool> {
-        self.set_view_with_db_tx(&self.db.lock().unwrap(), view)
-    }
-
-    pub fn get_view(&self) -> Result<Option<u64>> {
-        Ok(self
-            .db
-            .lock()
-            .unwrap()
-            .query_row("SELECT view FROM tip_info", (), |row| row.get(0))
-            .optional()
-            .unwrap_or(None))
-    }
-
-    // Deliberately not named get_highest_block_number() because there used to be one
-    // of those with unclear semantics, so changing name to force the compiler to error
-    // if it was used.
-    pub fn get_highest_recorded_block_number(&self) -> Result<Option<u64>> {
-        Ok(self
-            .db
-            .lock()
-            .unwrap()
-            .query_row_and_then(
-                "SELECT height FROM blocks ORDER BY height DESC LIMIT 1",
-                (),
-                |row| row.get(0),
-            )
-            .optional()?)
-    }
-
-    pub fn get_highest_canonical_block_number(&self) -> Result<Option<u64>> {
-        Ok(self
-            .db
-            .lock()
-            .unwrap()
-            .query_row_and_then(
-                "SELECT height FROM blocks WHERE is_canonical = TRUE ORDER BY height DESC LIMIT 1",
-                (),
-                |row| row.get(0),
-            )
-            .optional()?)
-    }
-
-    pub fn get_highest_block_hashes(&self, how_many: usize) -> Result<Vec<Hash>> {
-        Ok(self
-            .db
-            .lock()
-            .unwrap()
-            .prepare_cached(
-                "select block_hash from blocks where is_canonical = true order by height desc limit ?1")?
-            .query_map([how_many], |row| row.get(0))?.collect::<Result<Vec<_>, _>>()?)
-    }
-
-    pub fn set_high_qc_with_db_tx(
-        &self,
-        sqlite_tx: &Connection,
-        high_qc: QuorumCertificate,
-    ) -> Result<()> {
-        sqlite_tx.execute(
-            "INSERT INTO tip_info (high_qc, high_qc_updated_at) VALUES (:high_qc, :timestamp) ON CONFLICT DO UPDATE SET high_qc = :high_qc, high_qc_updated_at = :timestamp",
-            named_params! {
-                ":high_qc": high_qc,
-                ":timestamp": SystemTimeSqlable(SystemTime::now())
-            })?;
-        Ok(())
-    }
-
-    pub fn set_high_qc(&self, high_qc: QuorumCertificate) -> Result<()> {
-        self.set_high_qc_with_db_tx(&self.db.lock().unwrap(), high_qc)
-    }
-
-    pub fn get_high_qc(&self) -> Result<Option<QuorumCertificate>> {
-        Ok(self
-            .db
-            .lock()
-            .unwrap()
-            .query_row("SELECT high_qc FROM tip_info", (), |row| row.get(0))
-            .optional()?
-            .flatten())
-    }
-
-    pub fn get_high_qc_updated_at(&self) -> Result<Option<SystemTime>> {
-        Ok(self
-            .db
-            .lock()
-            .unwrap()
-            .query_row("SELECT high_qc_updated_at FROM tip_info", (), |row| {
-                row.get::<_, SystemTimeSqlable>(0)
-            })
-            .optional()
-            .unwrap_or(None)
-            .map(Into::<SystemTime>::into))
-    }
-
-    pub fn add_touched_address_with_db_tx(
-        &self,
-        sqlite_tx: &Connection,
-        address: Address,
-        txn_hash: Hash,
-    ) -> Result<()> {
-        sqlite_tx.execute(
-            "INSERT OR IGNORE INTO touched_address_index (address, tx_hash) VALUES (?1, ?2)",
-            (AddressSqlable(address), txn_hash),
-        )?;
-        Ok(())
-    }
-
-    pub fn add_touched_address(&self, address: Address, txn_hash: Hash) -> Result<()> {
-        self.add_touched_address_with_db_tx(&self.db.lock().unwrap(), address, txn_hash)
-    }
-
-    pub fn get_touched_transactions(&self, address: Address) -> Result<Vec<Hash>> {
-        // TODO: this is only ever used in one API, so keep an eye on performance - in case e.g.
-        // the index table might need to be denormalised to simplify this lookup
-        Ok(self
-            .db
-            .lock()
-            .unwrap()
-            .prepare_cached("SELECT tx_hash FROM touched_address_index JOIN receipts USING (tx_hash) JOIN blocks USING (block_hash) WHERE address = ?1 ORDER BY blocks.height, receipts.tx_index")?
-            .query_map([AddressSqlable(address)], |row| row.get(0))?
-            .collect::<Result<Vec<_>, _>>()?)
-    }
-
-    pub fn get_transaction(&self, txn_hash: &Hash) -> Result<Option<SignedTransaction>> {
-        Ok(self
-            .db
-            .lock()
-            .unwrap()
-            .query_row(
-                "SELECT data FROM transactions WHERE tx_hash = ?1",
-                [txn_hash],
-                |row| row.get(0),
-            )
-            .optional()?)
-    }
-
-    pub fn contains_transaction(&self, hash: &Hash) -> Result<bool> {
-        Ok(self
-            .db
-            .lock()
-            .unwrap()
-            .query_row(
-                "SELECT 1 FROM transactions WHERE tx_hash = ?1",
-                [hash],
-                |row| row.get::<_, i64>(0),
-            )
-            .optional()?
-            .is_some())
-    }
-
-    pub fn insert_transaction_with_db_tx(
-        &self,
-        sqlite_tx: &Connection,
-        hash: &Hash,
-        tx: &SignedTransaction,
-    ) -> Result<()> {
-        sqlite_tx.execute(
-            "INSERT OR IGNORE INTO transactions (tx_hash, data) VALUES (?1, ?2)",
-            (hash, tx),
-        )?;
-        Ok(())
-    }
-
-    /// Insert a transaction whose hash was precalculated, to save a call to calculate_hash() if it
-    /// is already known
-    pub fn insert_transaction(&self, hash: &Hash, tx: &SignedTransaction) -> Result<()> {
-        self.insert_transaction_with_db_tx(&self.db.lock().unwrap(), hash, tx)
-    }
-
-    pub fn remove_transactions_executed_in_block(&self, block_hash: &Hash) -> Result<()> {
-        // foreign key triggers will take care of receipts and touched_address_index
-        self.db.lock().unwrap().execute(
-            "DELETE FROM transactions WHERE tx_hash IN (SELECT tx_hash FROM receipts WHERE block_hash = ?1)",
-            [block_hash],
-        )?;
-        Ok(())
-    }
-
-    pub fn get_block_hash_reverse_index(&self, tx_hash: &Hash) -> Result<Option<Hash>> {
-        Ok(self
-            .db
-            .lock()
-            .unwrap()
-            .query_row(
-                "SELECT block_hash FROM receipts WHERE tx_hash = ?1",
-                [tx_hash],
-                |row| row.get(0),
-            )
-            .optional()?)
-    }
-
-    pub fn insert_block_with_db_tx(&self, sqlite_tx: &Connection, block: &Block) -> Result<()> {
-        self.insert_block_with_hash_with_db_tx(sqlite_tx, block.hash(), block)
-    }
-
-    pub fn insert_block_with_hash_with_db_tx(
-        &self,
-        sqlite_tx: &Connection,
-        hash: Hash,
-        block: &Block,
-    ) -> Result<()> {
-        sqlite_tx.execute(
-            "INSERT INTO blocks
-            (block_hash, view, height, qc, signature, state_root_hash, transactions_root_hash, receipts_root_hash, timestamp, gas_used, gas_limit, agg, is_canonical)
-            VALUES (:block_hash, :view, :height, :qc, :signature, :state_root_hash, :transactions_root_hash, :receipts_root_hash, :timestamp, :gas_used, :gas_limit, :agg, TRUE)",
-            named_params! {
-                ":block_hash": hash,
-                ":view": block.header.view,
-                ":height": block.header.number,
-                ":qc": block.header.qc,
-                ":signature": block.header.signature,
-                ":state_root_hash": block.header.state_root_hash,
-                ":transactions_root_hash": block.header.transactions_root_hash,
-                ":receipts_root_hash": block.header.receipts_root_hash,
-                ":timestamp": SystemTimeSqlable(block.header.timestamp),
-                ":gas_used": block.header.gas_used,
-                ":gas_limit": block.header.gas_limit,
-                ":agg": block.agg,
-            })?;
-        Ok(())
-    }
-
-    pub fn mark_block_as_canonical(&self, hash: Hash) -> Result<()> {
-        self.db.lock().unwrap().execute(
-            "UPDATE blocks SET is_canonical = TRUE WHERE block_hash = ?1",
-            [hash],
-        )?;
-        Ok(())
-    }
-
-    pub fn mark_block_as_non_canonical(&self, hash: Hash) -> Result<()> {
-        self.db.lock().unwrap().execute(
-            "UPDATE blocks SET is_canonical = FALSE WHERE block_hash = ?1",
-            [hash],
-        )?;
-        Ok(())
-    }
-
-    pub fn insert_block(&self, block: &Block) -> Result<()> {
-        self.insert_block_with_db_tx(&self.db.lock().unwrap(), block)
-    }
-
-    pub fn remove_block(&self, block: &Block) -> Result<()> {
-        self.db.lock().unwrap().execute(
-            "DELETE FROM blocks WHERE block_hash = ?1",
-            [block.header.hash],
-        )?;
-        Ok(())
-    }
-
-    fn get_transactionless_block(&self, filter: BlockFilter) -> Result<Option<Block>> {
-        fn make_block(row: &Row) -> rusqlite::Result<Block> {
-            Ok(Block {
-                header: BlockHeader {
-                    hash: row.get(0)?,
-                    view: row.get(1)?,
-                    number: row.get(2)?,
-                    qc: row.get(3)?,
-                    signature: row.get(4)?,
-                    state_root_hash: row.get(5)?,
-                    transactions_root_hash: row.get(6)?,
-                    receipts_root_hash: row.get(7)?,
-                    timestamp: row.get::<_, SystemTimeSqlable>(8)?.into(),
-                    gas_used: row.get(9)?,
-                    gas_limit: row.get(10)?,
-                },
-                agg: row.get(11)?,
-                transactions: vec![],
-            })
-        }
-        macro_rules! query_block {
-            ($cond: tt, $key: tt) => {
-                self.db.lock().unwrap().query_row(concat!("SELECT block_hash, view, height, qc, signature, state_root_hash, transactions_root_hash, receipts_root_hash, timestamp, gas_used, gas_limit, agg FROM blocks WHERE ", $cond), [$key], make_block).optional()?
-            };
-        }
-        Ok(match filter {
-            BlockFilter::Hash(hash) => {
-                query_block!("block_hash = ?1", hash)
-            }
-            BlockFilter::View(view) => {
-                query_block!("view = ?1", view)
-            }
-            BlockFilter::Height(height) => {
-                query_block!("height = ?1 AND is_canonical = TRUE", height)
-            }
-        })
-    }
-
-    fn get_block(&self, filter: BlockFilter) -> Result<Option<Block>> {
-        let Some(mut block) = self.get_transactionless_block(filter)? else {
-            return Ok(None);
-        };
-        let transaction_hashes = self
-            .db
-            .lock()
-            .unwrap()
-            .prepare_cached("SELECT tx_hash FROM receipts WHERE block_hash = ?1")?
-            .query_map([block.header.hash], |row| row.get(0))?
-            .collect::<Result<Vec<_>, _>>()?;
-        block.transactions = transaction_hashes;
-        Ok(Some(block))
-    }
-
-    pub fn get_block_by_hash(&self, block_hash: &Hash) -> Result<Option<Block>> {
-        self.get_block(BlockFilter::Hash(*block_hash))
-    }
-
-    pub fn get_block_by_view(&self, view: u64) -> Result<Option<Block>> {
-        self.get_block(BlockFilter::View(view))
-    }
-
-    pub fn get_canonical_block_by_number(&self, number: u64) -> Result<Option<Block>> {
-        self.get_block(BlockFilter::Height(number))
-    }
-
-    pub fn contains_block(&self, block_hash: &Hash) -> Result<bool> {
-        Ok(self
-            .db
-            .lock()
-            .unwrap()
-            .query_row(
-                "SELECT 1 FROM blocks WHERE block_hash = ?1",
-                [block_hash],
-                |row| row.get::<_, i64>(0),
-            )
-            .optional()?
-            .is_some())
-    }
-
-    fn make_view_range(row: &Row) -> rusqlite::Result<Range<u64>> {
-        // Add one to end because the range returned from SQL is inclusive.
-        let start: u64 = row.get(0)?;
-        let end_inc: u64 = row.get(1)?;
-        Ok(Range {
-            start,
-            end: end_inc + 1,
-        })
-    }
-
-    fn make_receipt(row: &Row) -> rusqlite::Result<TransactionReceipt> {
-        Ok(TransactionReceipt {
-            tx_hash: row.get(0)?,
-            block_hash: row.get(1)?,
-            index: row.get(2)?,
-            success: row.get(3)?,
-            gas_used: row.get(4)?,
-            cumulative_gas_used: row.get(5)?,
-            contract_address: row.get::<_, Option<AddressSqlable>>(6)?.map(|a| a.into()),
-            logs: row.get::<_, VecLogSqlable>(7)?.into(),
-            transitions: row.get::<_, VecScillaTransitionSqlable>(8)?.into(),
-            accepted: row.get(9)?,
-            errors: row.get::<_, MapScillaErrorSqlable>(10)?.into(),
-            exceptions: row.get::<_, VecScillaExceptionSqlable>(11)?.into(),
-        })
-    }
-
-    pub fn insert_transaction_receipt_with_db_tx(
-        &self,
-        sqlite_tx: &Connection,
-        receipt: TransactionReceipt,
-    ) -> Result<()> {
-        sqlite_tx.execute(
-            "INSERT INTO receipts
-                (tx_hash, block_hash, tx_index, success, gas_used, cumulative_gas_used, contract_address, logs, transitions, accepted, errors, exceptions)
-                VALUES (:tx_hash, :block_hash, :tx_index, :success, :gas_used, :cumulative_gas_used, :contract_address, :logs, :transitions, :accepted, :errors, :exceptions)",
-            named_params! {
-                ":tx_hash": receipt.tx_hash,
-                ":block_hash": receipt.block_hash,
-                ":tx_index": receipt.index,
-                ":success": receipt.success,
-                ":gas_used": receipt.gas_used,
-                ":cumulative_gas_used": receipt.cumulative_gas_used,
-                ":contract_address": receipt.contract_address.map(AddressSqlable),
-                ":logs": VecLogSqlable(receipt.logs),
-                ":transitions": VecScillaTransitionSqlable(receipt.transitions),
-                ":accepted": receipt.accepted,
-                ":errors": MapScillaErrorSqlable(receipt.errors),
-                ":exceptions": VecScillaExceptionSqlable(receipt.exceptions),
-            })?;
-
-        Ok(())
-    }
-
-    pub fn insert_transaction_receipt(&self, receipt: TransactionReceipt) -> Result<()> {
-        self.insert_transaction_receipt_with_db_tx(&self.db.lock().unwrap(), receipt)
-    }
-
-    pub fn get_transaction_receipt(&self, txn_hash: &Hash) -> Result<Option<TransactionReceipt>> {
-        Ok(self.db.lock().unwrap().query_row("SELECT tx_hash, block_hash, tx_index, success, gas_used, cumulative_gas_used, contract_address, logs, transitions, accepted, errors, exceptions FROM receipts WHERE tx_hash = ?1", [txn_hash], Self::make_receipt).optional()?)
-    }
-
-    pub fn get_transaction_receipts_in_block(
-        &self,
-        block_hash: &Hash,
-    ) -> Result<Vec<TransactionReceipt>> {
-        Ok(self.db.lock().unwrap().prepare_cached("SELECT tx_hash, block_hash, tx_index, success, gas_used, cumulative_gas_used, contract_address, logs, transitions, accepted, errors, exceptions FROM receipts WHERE block_hash = ?1")?.query_map([block_hash], Self::make_receipt)?.collect::<Result<Vec<_>, _>>()?)
-    }
-
-    pub fn remove_transaction_receipts_in_block(&self, block_hash: &Hash) -> Result<()> {
-        self.db
-            .lock()
-            .unwrap()
-            .execute("DELETE FROM receipts WHERE block_hash = ?1", [block_hash])?;
-        Ok(())
-    }
-
-    pub fn get_total_transaction_count(&self) -> Result<usize> {
-        let count: usize =
-            self.db
-                .lock()
-                .unwrap()
-                .query_row("SELECT COUNT(*) FROM transactions", [], |row| row.get(0))?;
-        Ok(count)
-    }
-
-    /// Retrieve a list of the views in our db.
-    /// This is a bit horrific. What we actually do here is to find the view lower and upper bounds for the contiguous block ranges in the database.
-    /// See block_store.rs::availability() for details.
-    pub fn get_view_ranges(&self) -> Result<Vec<Range<u64>>> {
-        // The island field is technically redundant, but it helps with debugging.
-        //
-        // First off, note that this function returns all available blocks - it is up to the ultimate receiver of those blocks
-        // to decide if they are _canonical_ blocks or not. We take no view and serve everything we have.
-        //
-        // This query:
-        //
-        // R1 = SELECT height, MIN(view) as vlb, MAX(view) as vub from blocks GROUP BY height
-        //    - Take everything in the blocks table, group by height and retrieve the max and min view for each block height.
-        //
-        // R2 = SELECT height, vlb, vub, ROW_NUMBER() OVER (ORDER BY height) AS rank FROM R1
-        //    - order the result by height, and find me the height, vlb, vub, and row number in the results (which we call rank).
-        //      (OVER is sqlite magic - see the docs for details)
-        //
-        // R3 = SELECT MIN(vlb), MAX(vub), MIN(height), MAX(height), height-rank AS island FROM R2 GROUP BY island ORDER BY MIN(height) ASC
-        //    - now group R2 by island number (i.e. a contiguous range of heights), and select the max view, min view, max height and min height for this range.
-        //      Return this list ordered by MIN(height) for convenience.
-        //
-        // And now you have the set of ranges you can advertise that you can serve. You could get the same result by SELECT height FROM blocks, putting the results in
-        // a RangeMap and then iterating the resulting ranges - this query just makes the database do the work (and returns the associated views, since block requests
-        // are made by view).
-        Ok(self.db.lock().unwrap()
-            .prepare_cached("SELECT MIN(vlb), MAX(vub), MIN(height),MAX(height),height-rank AS island FROM ( SELECT height,vlb,vub,ROW_NUMBER() OVER (ORDER BY height) AS rank FROM
-            (SELECT height,MIN(view) as vlb, MAX(view) as vub from blocks GROUP BY height ) ) GROUP BY island ORDER BY MIN(height) ASC")?
-            .query_map([], Self::make_view_range)?.collect::<Result<Vec<_>, _>>()?)
-    }
-
-    /// Forget about a range of blocks; this saves space, but also allows us to test our block fetch algorithm.
-    pub fn forget_block_range(&self, blocks: Range<u64>) -> Result<()> {
-        self.with_sqlite_tx(move |tx| {
-            // Remove everything!
-            tx.execute("DELETE FROM tip_info WHERE finalized_view IN (SELECT view FROM blocks WHERE height >= :low AND height < :high)",
-                named_params! {
-                    ":low" : blocks.start,
-                    ":high" : blocks.end } )?;
-            tx.execute("DELETE FROM receipts WHERE block_hash IN (SELECT block_hash FROM blocks WHERE height >= :low AND height < :high)",
-                named_params! {
-                    ":low": blocks.start,
-                    ":high": blocks.end })?;
-            tx.execute(
-                "DELETE FROM blocks WHERE height >= :low AND height < :high",
-                named_params!
-                {
-                    ":low": blocks.start,
-                    ":high": blocks.end },
-            )?;
-            Ok(())
-        })
-    }
-}
-
-pub fn get_checkpoint_filename<P: AsRef<Path> + Debug>(
-    output_dir: P,
-    block: &Block,
-) -> Result<PathBuf> {
-    Ok(output_dir.as_ref().join(block.number().to_string()))
-}
-
-/// Build checkpoint and write to disk.
-/// A description of the data written can be found in docs/checkpoints
-pub fn checkpoint_block_with_state<P: AsRef<Path> + Debug>(
-    block: &Block,
-    transactions: &Vec<SignedTransaction>,
-    parent: &Block,
-    state_trie_storage: TrieStorage,
-    shard_id: u64,
-    output_dir: P,
-) -> Result<()> {
-    const VERSION: u32 = 3;
-
-    fs::create_dir_all(&output_dir)?;
-
-    let state_trie_storage = Arc::new(state_trie_storage);
-    // quick sanity check
-    if block.parent_hash() != parent.hash() {
-        return Err(anyhow!(
-            "Parent block parameter must match the checkpoint block's parent hash"
-        ));
-    }
-
-    // Note: we ignore any existing file
-    let output_filename = get_checkpoint_filename(output_dir, block)?;
-    let temp_filename = output_filename.with_extension("part");
-    let outfile_temp = File::create_new(&temp_filename)?;
-    let mut writer = BufWriter::with_capacity(8192 * 1024, outfile_temp); // 8 MiB chunks
-
-    // write the header:
-    writer.write_all(&CHECKPOINT_HEADER_BYTES)?; // file identifier
-    writer.write_all(&VERSION.to_be_bytes())?; // 4 BE bytes for version
-    writer.write_all(&shard_id.to_be_bytes())?; // 8 BE bytes for shard ID
-    writer.write_all(b"\n")?;
-
-    // write the block...
-    let block_ser = &bincode::serialize(&block)?;
-    writer.write_all(&u64::try_from(block_ser.len())?.to_be_bytes())?;
-    writer.write_all(block_ser)?;
-
-    // write transactions
-    let transactions_ser = &bincode::serialize(&transactions)?;
-    writer.write_all(&u64::try_from(transactions_ser.len())?.to_be_bytes())?;
-    writer.write_all(transactions_ser)?;
-
-    // and its parent, to keep the qc tracked
-    let parent_ser = &bincode::serialize(&parent)?;
-    writer.write_all(&u64::try_from(parent_ser.len())?.to_be_bytes())?;
-    writer.write_all(parent_ser)?;
-
-    // then write state for each account
-    let accounts =
-        EthTrie::new(state_trie_storage.clone()).at_root(parent.state_root_hash().into());
-    let account_storage = EthTrie::new(state_trie_storage);
-    let mut account_key_buf = [0u8; 32]; // save a few allocations, since account keys are fixed length
-
-    for (key, serialised_account) in accounts.iter() {
-        // export the account itself
-        account_key_buf.copy_from_slice(&key);
-        writer.write_all(&account_key_buf)?;
-
-        writer.write_all(&u64::try_from(serialised_account.len())?.to_be_bytes())?;
-        writer.write_all(&serialised_account)?;
-
-        // now write the entire account storage map
-        let account_storage = account_storage
-            .at_root(bincode::deserialize::<Account>(&serialised_account)?.storage_root);
-        let mut account_storage_buf = vec![];
-        for (storage_key, storage_val) in account_storage.iter() {
-            account_storage_buf.extend_from_slice(&u64::try_from(storage_key.len())?.to_be_bytes());
-            account_storage_buf.extend_from_slice(&storage_key);
-
-            account_storage_buf.extend_from_slice(&u64::try_from(storage_val.len())?.to_be_bytes());
-            account_storage_buf.extend_from_slice(&storage_val);
-        }
-        writer.write_all(&u64::try_from(account_storage_buf.len())?.to_be_bytes())?;
-        writer.write_all(&account_storage_buf)?;
-    }
-    writer.flush()?;
-
-    // lz4 compress and write to output
-    compress_file(&temp_filename, &output_filename)?;
-
-    fs::remove_file(temp_filename)?;
-
-    Ok(())
-}
-
-/// Read temp file, compress using lz4, write into output file
-fn compress_file<P: AsRef<Path> + Debug>(input_file_path: P, output_file_path: P) -> Result<()> {
-    let mut reader = BufReader::new(File::open(input_file_path)?);
-
-    let mut encoder = EncoderBuilder::new().build(File::create(output_file_path)?)?;
-    let mut buffer = [0u8; 1024 * 64]; // read 64KB chunks at a time
-    loop {
-        let bytes_read = reader.read(&mut buffer)?; // Read a chunk of decompressed data
-        if bytes_read == 0 {
-            break; // End of file
-        }
-        encoder.write_all(&buffer[..bytes_read])?;
-    }
-    encoder.finish().1?;
-
-    Ok(())
-}
-
-/// Read lz4 compressed file and write into output file
-fn decompress_file<P: AsRef<Path> + Debug>(input_file_path: P, output_file_path: P) -> Result<()> {
-    let reader: BufReader<File> = BufReader::new(File::open(input_file_path)?);
-    let mut decoder = Decoder::new(reader)?;
-
-    let mut writer = BufWriter::new(File::create(output_file_path)?);
-    let mut buffer = [0u8; 1024 * 64]; // read 64KB chunks at a time
-    loop {
-        let bytes_read = decoder.read(&mut buffer)?; // Read a chunk of decompressed data
-        if bytes_read == 0 {
-            break; // End of file
-        }
-        writer.write_all(&buffer[..bytes_read])?;
-    }
-
-    writer.flush()?;
-
-    Ok(())
-}
-
-/// An implementor of [eth_trie::DB] which uses a [Connection] to persist data.
-#[derive(Debug, Clone)]
-pub struct TrieStorage {
-    db: Arc<Mutex<Connection>>,
-    cache: Arc<Mutex<LruCache<Vec<u8>, Vec<u8>>>>,
-}
-
-impl eth_trie::DB for TrieStorage {
-    type Error = rusqlite::Error;
-
-    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
-        if let Some(cached) = self.cache.lock().unwrap().get(key).map(|v| v.to_vec()) {
-            return Ok(Some(cached));
-        }
-
-        let value: Option<Vec<u8>> = self
-            .db
-            .lock()
-            .unwrap()
-            .query_row(
-                "SELECT value FROM state_trie WHERE key = ?1",
-                [key],
-                |row| row.get(0),
-            )
-            .optional()?;
-
-        let mut cache = self.cache.lock().unwrap();
-        if !cache.contains(key) {
-            if let Some(value) = &value {
-                let _ = cache.insert(key.to_vec(), value.clone());
-            }
-        }
-
-        Ok(value)
-    }
-
-    fn insert(&self, key: &[u8], value: Vec<u8>) -> Result<(), Self::Error> {
-        self.db.lock().unwrap().execute(
-            "INSERT OR REPLACE INTO state_trie (key, value) VALUES (?1, ?2)",
-            (key, &value),
-        )?;
-        let _ = self.cache.lock().unwrap().insert(key.to_vec(), value);
-        Ok(())
-    }
-
-    fn insert_batch(&self, keys: Vec<Vec<u8>>, values: Vec<Vec<u8>>) -> Result<(), Self::Error> {
-        if keys.is_empty() {
-            return Ok(());
-        }
-
-        assert_eq!(keys.len(), values.len());
-
-        // https://www.sqlite.org/limits.html#max_variable_number
-        let maximum_sql_parameters = 32766;
-        // Each key-value pair needs two parameters.
-        let chunk_size = maximum_sql_parameters / 2;
-
-        let keys = keys.chunks(chunk_size);
-        let values = values.chunks(chunk_size);
-
-        for (keys, values) in keys.zip(values) {
-            // Generate the SQL substring of the form "(?1, ?2), (?3, ?4), (?5, ?6), ...". There will be one pair of
-            // parameters for each key. Note that parameters are one-indexed.
-            #[allow(unstable_name_collisions)]
-            let params_stmt: String = (0..keys.len())
-                .map(|i| format!("(?{}, ?{})", i * 2 + 1, i * 2 + 2))
-                .intersperse(",".to_owned())
-                .collect();
-            let query =
-                format!("INSERT OR REPLACE INTO state_trie (key, value) VALUES {params_stmt}");
-
-            let params = keys.iter().zip(values).flat_map(|(k, v)| [k, v]);
-            self.db
-                .lock()
-                .unwrap()
-                .execute(&query, rusqlite::params_from_iter(params))?;
-            for (key, value) in keys.iter().zip(values) {
-                let _ = self
-                    .cache
-                    .lock()
-                    .unwrap()
-                    .insert(key.to_vec(), value.to_vec());
-            }
-        }
-
-        Ok(())
-    }
-
-    fn remove(&self, _key: &[u8]) -> Result<(), Self::Error> {
-        // we keep old state to function as an archive node, therefore no-op
-        Ok(())
-    }
-
-    fn remove_batch(&self, _: &[Vec<u8>]) -> Result<(), Self::Error> {
-        // we keep old state to function as an archive node, therefore no-op
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use alloy::consensus::EMPTY_ROOT_HASH;
-    use rand::{
-        distributions::{Distribution, Uniform},
-        Rng, SeedableRng,
-    };
-    use rand_chacha::ChaCha8Rng;
-    use tempfile::tempdir;
-
-    use super::*;
-    use crate::{crypto::SecretKey, state::State};
-
-    #[test]
-    fn checkpoint_export_import() {
-        let base_path = tempdir().unwrap();
-        let base_path = base_path.path();
-        let db = Db::new(Some(base_path), 0, 1024).unwrap();
-
-        // Seed db with data
-        let mut rng = ChaCha8Rng::seed_from_u64(0);
-        let distribution = Uniform::new(1, 50);
-        let mut root_trie = EthTrie::new(Arc::new(db.state_trie().unwrap()));
-        for _ in 0..100 {
-            let account_address: [u8; 20] = rng.gen();
-            let mut account_trie = EthTrie::new(Arc::new(db.state_trie().unwrap()));
-            let mut key = Vec::<u8>::with_capacity(50);
-            let mut value = Vec::<u8>::with_capacity(50);
-            for _ in 0..distribution.sample(&mut rng) {
-                for _ in 0..distribution.sample(&mut rng) {
-                    key.push(rng.gen());
-                }
-                for _ in 0..distribution.sample(&mut rng) {
-                    value.push(rng.gen());
-                }
-                account_trie.insert(&key, &value).unwrap();
-            }
-            let account = Account {
-                storage_root: account_trie.root_hash().unwrap(),
-                ..Default::default()
-            };
-            root_trie
-                .insert(
-                    &State::account_key(account_address.into()).0,
-                    &bincode::serialize(&account).unwrap(),
-                )
-                .unwrap();
-        }
-
-        let state_hash = root_trie.root_hash().unwrap();
-        let checkpoint_parent = Block::genesis(state_hash.into());
-        // bit of a hack to generate a successor block
-        let mut qc2 = QuorumCertificate::genesis();
-        qc2.block_hash = checkpoint_parent.hash();
-        qc2.view = 1;
-        let checkpoint_block = Block::from_qc(
-            SecretKey::new().unwrap(),
-            1,
-            1,
-            qc2,
-            None,
-            state_hash.into(),
-            EMPTY_ROOT_HASH.into(),
-            EMPTY_ROOT_HASH.into(),
-            vec![],
-            SystemTime::now(),
-            EvmGas(0),
-            EvmGas(0),
-        );
-
-        let checkpoint_path = db.get_checkpoint_dir().unwrap().unwrap();
-
-        const SHARD_ID: u64 = 5000;
-
-        let checkpoint_transactions = vec![];
-        checkpoint_block_with_state(
-            &checkpoint_block,
-            &checkpoint_transactions,
-            &checkpoint_parent,
-            db.state_trie().unwrap(),
-            SHARD_ID,
-            &checkpoint_path,
-        )
-        .unwrap();
-
-        // now load the checkpoint
-        let (block, transactions, parent) = db
-            .load_trusted_checkpoint(
-                checkpoint_path.join(checkpoint_block.number().to_string()),
-                &checkpoint_block.hash(),
-                SHARD_ID,
-            )
-            .unwrap()
-            .unwrap();
-        assert_eq!(checkpoint_block, block);
-        assert_eq!(checkpoint_transactions, transactions);
-        assert_eq!(checkpoint_parent, parent);
-
-        // load the checkpoint again, to ensure idempotency
-        let (block, transactions, parent) = db
-            .load_trusted_checkpoint(
-                checkpoint_path.join(checkpoint_block.number().to_string()),
-                &checkpoint_block.hash(),
-                SHARD_ID,
-            )
-            .unwrap()
-            .unwrap();
-        assert_eq!(checkpoint_block, block);
-        assert_eq!(checkpoint_transactions, transactions);
-        assert_eq!(checkpoint_parent, parent);
-
-        // Return None if checkpointed block already executed
-        db.insert_block(&checkpoint_block).unwrap();
-        let result = db
-            .load_trusted_checkpoint(
-                checkpoint_path.join(checkpoint_block.number().to_string()),
-                &checkpoint_block.hash(),
-                SHARD_ID,
-            )
-            .unwrap();
-        assert!(result.is_none());
-    }
-}
diff --git a/zilliqa/src/db/mod.rs b/zilliqa/src/db/mod.rs
new file mode 100644
index 000000000..986583fc4
--- /dev/null
+++ b/zilliqa/src/db/mod.rs
@@ -0,0 +1,601 @@
+mod tables;
+
+use std::{
+    fmt::Debug,
+    fs::{self, File},
+    io::{BufReader, BufWriter, Read, Write},
+    path::{Path, PathBuf},
+    sync::{Arc, Mutex},
+};
+
+use anyhow::{anyhow, Context, Result};
+use eth_trie::{EthTrie, Trie};
+use lru_mem::LruCache;
+use lz4::{Decoder, EncoderBuilder};
+use redb::{backends::InMemoryBackend, Database};
+pub use tables::*;
+use tracing::warn;
+
+use crate::{crypto::Hash, message::Block, state::Account, transaction::SignedTransaction};
+
+const CHECKPOINT_HEADER_BYTES: [u8; 8] = *b"ZILCHKPT";
+
+#[derive(Debug)]
+pub struct Db {
+    db: Arc<Database>,
+    state_cache: Arc<Mutex<LruCache<Vec<u8>, Vec<u8>>>>,
+    path: Option<Box<Path>>,
+}
+
+pub trait ArcDb {
+    fn state_trie(&self) -> Result<TrieStorage>;
+    fn load_trusted_checkpoint<P: AsRef<Path>>(
+        &self,
+        path: P,
+        hash: &Hash,
+        our_shard_id: u64,
+    ) -> Result<Option<(Block, Vec<SignedTransaction>, Block)>>;
+}
+
+impl ArcDb for Arc<Db> {
+    fn state_trie(&self) -> Result<TrieStorage> {
+        Ok(TrieStorage { db: self.clone() })
+    }
+
+    /// Fetch checkpoint data from file and initialise db state
+    /// Return checkpointed block and transactions which must be executed after this function
+    /// Return None if checkpoint already loaded
+    fn load_trusted_checkpoint<P: AsRef<Path>>(
+        &self,
+        path: P,
+        hash: &Hash,
+        our_shard_id: u64,
+    ) -> Result<Option<(Block, Vec<SignedTransaction>, Block)>> {
+        // For now, only support a single version: you want to load the latest checkpoint, anyway.
+        const SUPPORTED_VERSION: u32 = 3;
+
+        // Decompress file and write to temp file
+        let input_filename = path.as_ref();
+        let temp_filename = input_filename.with_extension("part");
+        decompress_file(input_filename, &temp_filename)?;
+
+        // Read decompressed file
+        let input = File::open(&temp_filename)?;
+
+        let mut reader = BufReader::with_capacity(8192 * 1024, input); // 8 MiB read chunks
+        let trie_storage = Arc::new(self.state_trie()?);
+        let mut state_trie = EthTrie::new(trie_storage.clone());
+
+        // Decode and validate header
+        let mut header: [u8; 21] = [0u8; 21];
+        reader.read_exact(&mut header)?;
+        let header = header;
+        if header[0..8] != CHECKPOINT_HEADER_BYTES // magic bytes
+            || header[20] != b'\n'
+        // header must end in newline
+        {
+            return Err(anyhow!("Invalid checkpoint file: invalid header"));
+        }
+        let version = u32::from_be_bytes(header[8..12].try_into()?);
+        // Only support a single version right now.
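+        // Header layout (21 bytes), mirroring what `checkpoint_block_with_state`
+        // below writes:
+        //   bytes 0..8   magic bytes b"ZILCHKPT"
+        //   bytes 8..12  version, u32 BE
+        //   bytes 12..20 shard ID, u64 BE
+        //   byte  20     b'\n' terminator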
+        if version != SUPPORTED_VERSION {
+            return Err(anyhow!("Invalid checkpoint file: unsupported version."));
+        }
+        let shard_id = u64::from_be_bytes(header[12..20].try_into()?);
+        if shard_id != our_shard_id {
+            return Err(anyhow!("Invalid checkpoint file: wrong shard ID."));
+        }
+
+        // Decode and validate checkpoint block, its transactions and parent block
+        let mut block_len_buf = [0u8; std::mem::size_of::<u64>()];
+        reader.read_exact(&mut block_len_buf)?;
+        let mut block_ser = vec![0u8; usize::try_from(u64::from_be_bytes(block_len_buf))?];
+        reader.read_exact(&mut block_ser)?;
+        let block: Block = bincode::deserialize(&block_ser)?;
+        if block.hash() != *hash {
+            return Err(anyhow!("Checkpoint does not match trusted hash"));
+        }
+        block.verify_hash()?;
+
+        let mut transactions_len_buf = [0u8; std::mem::size_of::<u64>()];
+        reader.read_exact(&mut transactions_len_buf)?;
+        let mut transactions_ser =
+            vec![0u8; usize::try_from(u64::from_be_bytes(transactions_len_buf))?];
+        reader.read_exact(&mut transactions_ser)?;
+        let transactions: Vec<SignedTransaction> = bincode::deserialize(&transactions_ser)?;
+
+        let mut parent_len_buf = [0u8; std::mem::size_of::<u64>()];
+        reader.read_exact(&mut parent_len_buf)?;
+        let mut parent_ser = vec![0u8; usize::try_from(u64::from_be_bytes(parent_len_buf))?];
+        reader.read_exact(&mut parent_ser)?;
+        let parent: Block = bincode::deserialize(&parent_ser)?;
+        if block.parent_hash() != parent.hash() {
+            return Err(anyhow!("Invalid checkpoint file: parent's blockhash does not correspond to checkpoint block"));
+        }
+
+        let read = self.read()?;
+
+        if state_trie.iter().next().is_some() || read.blocks()?.max_canonical_by_view()?.is_some() {
+            // If checkpointed block already exists then assume checkpoint load already complete. Return None
+            if read.blocks()?.by_hash(block.hash())?.is_some() {
+                return Ok(None);
+            }
+            // This may not be strictly necessary, as in theory old values will, at worst, be orphaned
+            // values not part of any state trie of any known block. With some effort, this could
+            // even be supported.
+            // However, without such explicit support, having old blocks MAY in fact cause
+            // unexpected and unwanted behaviour. Thus we currently forbid loading a checkpoint in
+            // a node that already contains previous state, until (and unless) there's ever a
+            // usecase for going through the effort to support it and ensure it works as expected.
+            if let Some(db_block) = read.blocks()?.by_hash(parent.hash())? {
+                if db_block.parent_hash() != parent.parent_hash() {
+                    return Err(anyhow!("Inconsistent checkpoint file: block loaded from checkpoint and block stored in database with same hash have differing parent hashes"));
+                } else {
+                    // In this case, the database already has the block contained in this checkpoint. We assume the
+                    // database contains the full state for that block too and thus return early, without actually
+                    // loading the checkpoint file.
+                    return Ok(Some((block, transactions, parent)));
+                }
+            } else {
+                return Err(anyhow!("Inconsistent checkpoint file: block loaded from checkpoint file does not exist in non-empty database"));
+            }
+        }
+
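+        // Record layout for the state section that follows (it mirrors the
+        // writer in `checkpoint_block_with_state`): for each account,
+        //   32 bytes       account key (hash)
+        //   u64 BE + bytes serialised Account
+        //   u64 BE + bytes storage buffer, itself a sequence of
+        //                  (u64 BE key length, key, u64 BE value length, value)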
+        // then decode state
+        loop {
+            // Read account key and the serialised Account
+            let mut account_hash = [0u8; 32];
+            match reader.read_exact(&mut account_hash) {
+                // Read successful
+                Ok(_) => (),
+                // Break loop here if we've reached the end of the file
+                Err(ref e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
+                    break;
+                }
+                Err(e) => return Err(e.into()),
+            };
+
+            let mut serialised_account_len_buf = [0u8; std::mem::size_of::<u64>()];
+            reader.read_exact(&mut serialised_account_len_buf)?;
+            let mut serialised_account =
+                vec![0u8; usize::try_from(u64::from_be_bytes(serialised_account_len_buf))?];
+            reader.read_exact(&mut serialised_account)?;
+
+            // Read entire account storage as a buffer
+            let mut account_storage_len_buf = [0u8; std::mem::size_of::<u64>()];
+            reader.read_exact(&mut account_storage_len_buf)?;
+            let account_storage_len = usize::try_from(u64::from_be_bytes(account_storage_len_buf))?;
+            let mut account_storage = vec![0u8; account_storage_len];
+            reader.read_exact(&mut account_storage)?;
+
+            // Pull out each storage key and value
+            let mut account_trie = EthTrie::new(trie_storage.clone());
+            let mut pointer: usize = 0;
+            while account_storage_len > pointer {
+                let storage_key_len_buf: &[u8] =
+                    &account_storage[pointer..(pointer + std::mem::size_of::<u64>())];
+                let storage_key_len =
+                    usize::try_from(u64::from_be_bytes(storage_key_len_buf.try_into()?))?;
+                pointer += std::mem::size_of::<u64>();
+                let storage_key: &[u8] = &account_storage[pointer..(pointer + storage_key_len)];
+                pointer += storage_key_len;
+
+                let storage_val_len_buf: &[u8] =
+                    &account_storage[pointer..(pointer + std::mem::size_of::<u64>())];
+                let storage_val_len =
+                    usize::try_from(u64::from_be_bytes(storage_val_len_buf.try_into()?))?;
+                pointer += std::mem::size_of::<u64>();
+                let storage_val: &[u8] = &account_storage[pointer..(pointer + storage_val_len)];
+                pointer += storage_val_len;
+
+                account_trie.insert(storage_key, storage_val)?;
+            }
+
+            let account_trie_root =
+                bincode::deserialize::<Account>(&serialised_account)?.storage_root;
+            if account_trie.root_hash()?.as_slice() != account_trie_root {
+                return Err(anyhow!(
+                    "Invalid checkpoint file: account trie root hash mismatch: calculated {}, checkpoint file contained {}", hex::encode(account_trie.root_hash()?.as_slice()), hex::encode(account_trie_root)
+                ));
+            }
+            state_trie.insert(&account_hash, &serialised_account)?;
+        }
+
+        if state_trie.root_hash()? != parent.state_root_hash().0 {
+            return Err(anyhow!("Invalid checkpoint file: state root hash mismatch"));
+        }
+
+        let write = self.write()?;
+        write.blocks()?.insert(&parent)?;
+        write.finalized_view()?.set(parent.view())?;
+        write.high_qc()?.set(&block.header.qc)?;
+        write.view()?.set(parent.view() + 1)?;
+        write.commit()?;
+
+        fs::remove_file(temp_filename)?;
+
+        Ok(Some((block, transactions, parent)))
+    }
+}
+
+impl Db {
+    pub fn new<P>(data_dir: Option<P>, shard_id: u64, state_cache_size: usize) -> Result<Self>
+    where
+        P: AsRef<Path>,
+    {
+        let (db, path) = match data_dir {
+            Some(path) => {
+                let path = path.as_ref().join(shard_id.to_string());
+                fs::create_dir_all(&path).context(format!("Unable to create {path:?}"))?;
+                let db_path = path.join("db.redb");
+                (
+                    Database::builder().create(db_path)?,
+                    Some(path.into_boxed_path()),
+                )
+            }
+            None => (
+                Database::builder().create_with_backend(InMemoryBackend::new())?,
+                None,
+            ),
+        };
+
+        let db = Db {
+            db: Arc::new(db),
+            state_cache: Arc::new(Mutex::new(LruCache::new(state_cache_size))),
+            path,
+        };
+
+        // Ensure tables exist.
+        let write = db.write()?;
+        write.create_all()?;
+        write.commit()?;
+
+        Ok(db)
+    }
+
+    pub fn get_checkpoint_dir(&self) -> Result<Option<Box<Path>>> {
+        let Some(base_path) = &self.path else {
+            // If we don't have on-disk persistency, disable checkpoints too
+            warn!(
+                "Attempting to create checkpoint, but no persistence directory has been configured"
+            );
+            return Ok(None);
+        };
+        Ok(Some(base_path.join("checkpoints").into_boxed_path()))
+    }
+}
+
+pub fn get_checkpoint_filename<P: AsRef<Path> + Debug>(
+    output_dir: P,
+    block: &Block,
+) -> Result<PathBuf> {
+    Ok(output_dir.as_ref().join(block.number().to_string()))
+}
+
+/// Build checkpoint and write to disk.
+/// A description of the data written can be found in docs/checkpoints
+pub fn checkpoint_block_with_state<P: AsRef<Path> + Debug>(
+    block: &Block,
+    transactions: &Vec<SignedTransaction>,
+    parent: &Block,
+    state_trie_storage: TrieStorage,
+    shard_id: u64,
+    output_dir: P,
+) -> Result<()> {
+    const VERSION: u32 = 3;
+
+    fs::create_dir_all(&output_dir)?;
+
+    let state_trie_storage = Arc::new(state_trie_storage);
+    // quick sanity check
+    if block.parent_hash() != parent.hash() {
+        return Err(anyhow!(
+            "Parent block parameter must match the checkpoint block's parent hash"
+        ));
+    }
+
+    // Note: we ignore any existing file
+    let output_filename = get_checkpoint_filename(output_dir, block)?;
+    let temp_filename = output_filename.with_extension("part");
+    let outfile_temp = File::create_new(&temp_filename)?;
+    let mut writer = BufWriter::with_capacity(8192 * 1024, outfile_temp); // 8 MiB chunks
+
+    // write the header:
+    writer.write_all(&CHECKPOINT_HEADER_BYTES)?; // file identifier
+    writer.write_all(&VERSION.to_be_bytes())?; // 4 BE bytes for version
+    writer.write_all(&shard_id.to_be_bytes())?; // 8 BE bytes for shard ID
+    writer.write_all(b"\n")?;
+
+    // write the block...
+    let block_ser = &bincode::serialize(&block)?;
+    writer.write_all(&u64::try_from(block_ser.len())?.to_be_bytes())?;
+    writer.write_all(block_ser)?;
+
+    // write transactions
+    let transactions_ser = &bincode::serialize(&transactions)?;
+    writer.write_all(&u64::try_from(transactions_ser.len())?.to_be_bytes())?;
+    writer.write_all(transactions_ser)?;
+
+    // and its parent, to keep the qc tracked
+    let parent_ser = &bincode::serialize(&parent)?;
+    writer.write_all(&u64::try_from(parent_ser.len())?.to_be_bytes())?;
+    writer.write_all(parent_ser)?;
+
+    // then write state for each account
+    let accounts =
+        EthTrie::new(state_trie_storage.clone()).at_root(parent.state_root_hash().into());
+    let account_storage = EthTrie::new(state_trie_storage);
+    let mut account_key_buf = [0u8; 32]; // save a few allocations, since account keys are fixed length
+
+    for (key, serialised_account) in accounts.iter() {
+        // export the account itself
+        account_key_buf.copy_from_slice(&key);
+        writer.write_all(&account_key_buf)?;
+
+        writer.write_all(&u64::try_from(serialised_account.len())?.to_be_bytes())?;
+        writer.write_all(&serialised_account)?;
+
+        // now write the entire account storage map
+        let account_storage = account_storage
+            .at_root(bincode::deserialize::<Account>(&serialised_account)?.storage_root);
+        let mut account_storage_buf = vec![];
+        for (storage_key, storage_val) in account_storage.iter() {
+            account_storage_buf.extend_from_slice(&u64::try_from(storage_key.len())?.to_be_bytes());
+            account_storage_buf.extend_from_slice(&storage_key);
+
+            account_storage_buf.extend_from_slice(&u64::try_from(storage_val.len())?.to_be_bytes());
+            account_storage_buf.extend_from_slice(&storage_val);
+        }
+        writer.write_all(&u64::try_from(account_storage_buf.len())?.to_be_bytes())?;
+        writer.write_all(&account_storage_buf)?;
+    }
+    writer.flush()?;
+
+    // lz4 compress and write to output
+    compress_file(&temp_filename, &output_filename)?;
+
+    fs::remove_file(temp_filename)?;
+
+    Ok(())
+}
+
+/// Read temp file, compress using lz4, write into output file
+fn compress_file<P: AsRef<Path> + Debug>(input_file_path: P, output_file_path: P) -> Result<()> {
+    let mut reader = BufReader::new(File::open(input_file_path)?);
+
+    let mut encoder = EncoderBuilder::new().build(File::create(output_file_path)?)?;
+    let mut buffer = [0u8; 1024 * 64]; // read 64KB chunks at a time
+    loop {
+        let bytes_read = reader.read(&mut buffer)?; // Read a chunk of decompressed data
+        if bytes_read == 0 {
+            break; // End of file
+        }
+        encoder.write_all(&buffer[..bytes_read])?;
+    }
+    encoder.finish().1?;
+
+    Ok(())
+}
+
+/// Read lz4 compressed file and write into output file
+fn decompress_file<P: AsRef<Path> + Debug>(input_file_path: P, output_file_path: P) -> Result<()> {
+    let reader: BufReader<File> = BufReader::new(File::open(input_file_path)?);
+    let mut decoder = Decoder::new(reader)?;
+
+    let mut writer = BufWriter::new(File::create(output_file_path)?);
+    let mut buffer = [0u8; 1024 * 64]; // read 64KB chunks at a time
+    loop {
+        let bytes_read = decoder.read(&mut buffer)?; // Read a chunk of decompressed data
+        if bytes_read == 0 {
+            break; // End of file
+        }
+        writer.write_all(&buffer[..bytes_read])?;
+    }
+
+    writer.flush()?;
+
+    Ok(())
+}
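+
+// A usage sketch (illustrative only; these calls appear in
+// `load_trusted_checkpoint` above and the tests below): `TrieStorage` is handed
+// to `eth_trie` as its backing store,
+//
+//     let trie_storage = Arc::new(db.state_trie()?);
+//     let mut state_trie = EthTrie::new(trie_storage);
+//     state_trie.insert(key, value)?;
+//     let root = state_trie.root_hash()?;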
+
+/// An implementor of [eth_trie::DB] which uses a [Db] to persist data.
+#[derive(Debug, Clone)]
+pub struct TrieStorage {
+    db: Arc<Db>,
+}
+
+impl eth_trie::DB for TrieStorage {
+    type Error = anyhow::Error;
+
+    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
+        if let Some(cached) = self
+            .db
+            .state_cache
+            .lock()
+            .unwrap()
+            .get(key)
+            .map(|v| v.to_vec())
+        {
+            return Ok(Some(cached));
+        }
+
+        let value = self.db.read()?.state_trie()?.get(key)?;
+
+        let mut cache = self.db.state_cache.lock().unwrap();
+        if !cache.contains(key) {
+            if let Some(value) = &value {
+                let _ = cache.insert(key.to_vec(), value.clone());
+            }
+        }
+
+        Ok(value)
+    }
+
+    fn insert(&self, key: &[u8], value: Vec<u8>) -> Result<(), Self::Error> {
+        let write = self.db.write()?;
+        write.state_trie()?.insert(key, &value)?;
+        write.commit()?;
+        let _ = self
+            .db
+            .state_cache
+            .lock()
+            .unwrap()
+            .insert(key.to_vec(), value);
+        Ok(())
+    }
+
+    fn insert_batch(&self, keys: Vec<Vec<u8>>, values: Vec<Vec<u8>>) -> Result<(), Self::Error> {
+        if keys.is_empty() {
+            return Ok(());
+        }
+
+        assert_eq!(keys.len(), values.len());
+
+        let write = self.db.write()?;
+        for (key, value) in keys.into_iter().zip(values) {
+            write.state_trie()?.insert(&key, &value)?;
+            let _ = self.db.state_cache.lock().unwrap().insert(key, value);
+        }
+        write.commit()?;
+
+        Ok(())
+    }
+
+    fn remove(&self, _key: &[u8]) -> Result<(), Self::Error> {
+        // we keep old state to function as an archive node, therefore no-op
+        Ok(())
+    }
+
+    fn remove_batch(&self, _: &[Vec<u8>]) -> Result<(), Self::Error> {
+        // we keep old state to function as an archive node, therefore no-op
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use alloy::consensus::EMPTY_ROOT_HASH;
+    use rand::{
+        distributions::{Distribution, Uniform},
+        Rng, SeedableRng,
+    };
+    use rand_chacha::ChaCha8Rng;
+    use tempfile::tempdir;
+
+    use super::*;
+    use crate::{
+        crypto::SecretKey, message::QuorumCertificate, state::State, time::SystemTime,
+        transaction::EvmGas,
+    };
+
+    #[test]
+    fn checkpoint_export_import() {
+        let base_path = tempdir().unwrap();
+        let base_path = base_path.path();
+        let db = Arc::new(Db::new(Some(base_path), 0, 1024).unwrap());
+
+        // Seed db with data
+        let mut rng = ChaCha8Rng::seed_from_u64(0);
+        let distribution = Uniform::new(1, 50);
+        let mut root_trie = EthTrie::new(Arc::new(db.state_trie().unwrap()));
+        for _ in 0..100 {
+            let account_address: [u8; 20] = rng.gen();
+            let mut account_trie = EthTrie::new(Arc::new(db.state_trie().unwrap()));
+            let mut key = Vec::<u8>::with_capacity(50);
+            let mut value = Vec::<u8>::with_capacity(50);
+            for _ in 0..distribution.sample(&mut rng) {
+                for _ in 0..distribution.sample(&mut rng) {
+                    key.push(rng.gen());
+                }
+                for _ in 0..distribution.sample(&mut rng) {
+                    value.push(rng.gen());
+                }
+                account_trie.insert(&key, &value).unwrap();
+            }
+            let account = Account {
+                storage_root: account_trie.root_hash().unwrap(),
+                ..Default::default()
+            };
+            root_trie
+                .insert(
+                    &State::account_key(account_address.into()).0,
+                    &bincode::serialize(&account).unwrap(),
+                )
+                .unwrap();
+        }
+
+        let state_hash = root_trie.root_hash().unwrap();
+        let checkpoint_parent = Block::genesis(state_hash.into());
+        // bit of a hack to generate a successor block
+        let mut qc2 = QuorumCertificate::genesis();
+        qc2.block_hash = checkpoint_parent.hash();
+        qc2.view = 1;
+        let checkpoint_block = Block::from_qc(
+            SecretKey::new().unwrap(),
+            1,
+            1,
+            qc2,
+            None,
+            state_hash.into(),
+            EMPTY_ROOT_HASH.into(),
+            EMPTY_ROOT_HASH.into(),
+            vec![],
+            SystemTime::now(),
+            EvmGas(0),
+            EvmGas(0),
+        );
+
+        let checkpoint_path = db.get_checkpoint_dir().unwrap().unwrap();
+
+        const SHARD_ID: u64 = 5000;
+
+        let checkpoint_transactions = vec![];
+        checkpoint_block_with_state(
+            &checkpoint_block,
+            &checkpoint_transactions,
+            &checkpoint_parent,
+            db.state_trie().unwrap(),
+            SHARD_ID,
+            &checkpoint_path,
+        )
+        .unwrap();
+
+        // now load the checkpoint
+        let (block, transactions, parent) = db
+            .load_trusted_checkpoint(
+                checkpoint_path.join(checkpoint_block.number().to_string()),
+                &checkpoint_block.hash(),
+                SHARD_ID,
+            )
+            .unwrap()
+            .unwrap();
+        assert_eq!(checkpoint_block, block);
+        assert_eq!(checkpoint_transactions, transactions);
+        assert_eq!(checkpoint_parent, parent);
+
+        // load the checkpoint again, to ensure idempotency
+        let (block, transactions, parent) = db
+            .load_trusted_checkpoint(
+                checkpoint_path.join(checkpoint_block.number().to_string()),
+                &checkpoint_block.hash(),
+                SHARD_ID,
+            )
+            .unwrap()
+            .unwrap();
+        assert_eq!(checkpoint_block, block);
+        assert_eq!(checkpoint_transactions, transactions);
+        assert_eq!(checkpoint_parent, parent);
+
+        // Return None if checkpointed block already executed
+        let write = db.write().unwrap();
+        write.blocks().unwrap().insert(&checkpoint_block).unwrap();
+        write.commit().unwrap();
+        let result = db
+            .load_trusted_checkpoint(
+                checkpoint_path.join(checkpoint_block.number().to_string()),
+                &checkpoint_block.hash(),
+                SHARD_ID,
+            )
+            .unwrap();
+        assert!(result.is_none());
+    }
+}
diff --git a/zilliqa/src/db/tables.rs b/zilliqa/src/db/tables.rs
new file mode 100644
index 000000000..687f44d66
--- /dev/null
+++ b/zilliqa/src/db/tables.rs
@@ -0,0 +1,536 @@
+//! This module defines the tables in our database and provides the abstractions used to interact with them.
+//!
+//! Each logical table may be backed by one or more concrete tables, for additional indices.
+
+#![allow(clippy::type_complexity)]
+
+use std::time::Duration;
+
+use anyhow::{anyhow, Result};
+use redb::{
+    Durability, MultimapTable, MultimapTableDefinition, ReadOnlyMultimapTable, ReadOnlyTable,
+    ReadTransaction, ReadableMultimapTable, ReadableTable, ReadableTableMetadata, Table,
+    TableDefinition, WriteTransaction,
+};
+use revm::primitives::Address;
+
+use super::Db;
+use crate::{
+    crypto::Hash,
+    message::{Block, QuorumCertificate},
+    time::SystemTime,
+    transaction::{SignedTransaction, TransactionReceipt},
+};
+
+// Each logical table consists of:
+// 1. The `TableDefinition`s backing this table.
+// 2. A table `struct` which contains the methods to access this table. The struct is generic, but in practice only
+//    takes two possible values - one returned by `TxRead` and one returned by `TxWrite`. The concrete table consists of
+//    a tuple of the opened `redb` tables.
+// 3. An `impl` block which contains all the read-only methods for the table. The implementation is generic over the
+//    `ReadableTable` trait, which means the methods are callable on both `TxRead`s and `TxWrite`s.
+// 4. An `impl` block which contains all the write-only methods for the table. The implementation uses the concrete
+//    mutable `Table` types and thus is only callable on a `TxWrite`.
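+//
+// A sketch of the resulting API (illustrative; all of these calls are defined
+// below and in mod.rs):
+//
+//     let read = db.read()?;                       // TxRead
+//     let block = read.blocks()?.by_hash(hash)?;   // read-only table view
+//
+//     let write = db.write()?;                     // TxWrite
+//     write.blocks()?.insert(&block)?;             // mutable table view
+//     write.commit()?;                             // nothing persists until commit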
+
+// blocks: view -> block
+// blocks_hash_index: hash -> view
+// blocks_height_index: height -> [view]
+// block_is_canonical: view -> bool
+const BLOCKS: TableDefinition<u64, Vec<u8>> = TableDefinition::new("blocks");
+const BLOCKS_HASH_INDEX: TableDefinition<&[u8; 32], u64> =
+    TableDefinition::new("blocks_hash_index");
+const BLOCKS_HEIGHT_INDEX: MultimapTableDefinition<u64, u64> =
+    MultimapTableDefinition::new("blocks_height_index");
+const BLOCK_IS_CANONICAL: TableDefinition<u64, bool> = TableDefinition::new("block_is_canonical");
+
+pub struct BlocksTable<T>(T);
+
+impl<T1, T2, T3, T4> BlocksTable<(T1, T2, T3, T4)>
+where
+    T1: ReadableTable<u64, Vec<u8>>,
+    T2: ReadableTable<&'static [u8; 32], u64>,
+    T3: ReadableMultimapTable<u64, u64>,
+    T4: ReadableTable<u64, bool>,
+{
+    pub fn by_view(&self, view: u64) -> Result<Option<Block>> {
+        let Some(block) = self.0 .0.get(view)? else {
+            return Ok(None);
+        };
+        Ok(Some(bincode::deserialize(&block.value())?))
+    }
+
+    pub fn max_canonical_by_view(&self) -> Result<Option<Block>> {
+        // Search the `block_is_canonical` table in reverse until we find the canonical block with the maximum view.
+        for kv in self.0 .3.iter()?.rev() {
+            let (view, canonical) = kv?;
+            if canonical.value() {
+                let view = view.value();
+                return self.by_view(view);
+            }
+        }
+
+        // There are no canonical blocks.
+        Ok(None)
+    }
+
+    pub fn min_by_view(&self) -> Result<Option<Block>> {
+        let Some((_, block)) = self.0 .0.first()? else {
+            return Ok(None);
+        };
+        Ok(Some(bincode::deserialize(&block.value())?))
+    }
+
+    pub fn max_canonical_by_view_count(&self, count: usize) -> Result<Vec<Block>> {
+        let mut blocks = Vec::with_capacity(count);
+        for kv in self.0 .3.iter()?.rev() {
+            // Stop once we have collected `count` blocks.
+            if blocks.len() == count {
+                break;
+            }
+            let (view, canonical) = kv?;
+            if canonical.value() {
+                let view = view.value();
+                blocks.push(self.by_view(view)?.ok_or(anyhow!("missing block"))?);
+            }
+        }
+        Ok(blocks)
+    }
+
+    pub fn by_hash(&self, hash: Hash) -> Result<Option<Block>> {
+        let Some(view) = self.0 .1.get(&hash.0)? else {
+            return Ok(None);
+        };
+        self.by_view(view.value())
+    }
+
+    pub fn canonical_by_height(&self, height: u64) -> Result<Option<Block>> {
+        for view in self.0 .2.get(height)? {
+            // Check if this block is canonical.
+            let view = view?.value();
+            let canonical = self
+                .0
+                 .3
+                .get(view)?
+                .ok_or(anyhow!("missing canonical"))?
+                .value();
+            if canonical {
+                return self.by_view(view);
+            }
+        }
+
+        Ok(None)
+    }
+
+    pub fn contains(&self, view: u64) -> Result<bool> {
+        Ok(self.0 .0.get(view)?.is_some())
+    }
+}
+
+impl
+    BlocksTable<(
+        Table<'_, u64, Vec<u8>>,
+        Table<'_, &[u8; 32], u64>,
+        MultimapTable<'_, u64, u64>,
+        Table<'_, u64, bool>,
+    )>
+{
+    pub fn insert(&mut self, block: &Block) -> Result<()> {
+        self.0 .0.insert(block.view(), bincode::serialize(block)?)?;
+        self.0 .1.insert(&block.hash().0, block.view())?;
+        self.0 .2.insert(block.number(), block.view())?;
+        self.0 .3.insert(block.view(), true)?;
+        Ok(())
+    }
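+
+    // For example (illustrative): inserting a block at view 7, height 5 with
+    // hash H writes blocks[7] = block, blocks_hash_index[H] = 7,
+    // blocks_height_index[5] += 7 and block_is_canonical[7] = true, so the
+    // view, hash and height lookup paths stay consistent; `delete` below
+    // unwinds all four entries.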
+
+    pub fn delete(&mut self, view: u64) -> Result<()> {
+        let Some(block) = self.0 .0.remove(view)? else {
+            return Ok(());
+        };
+        let block: Block = bincode::deserialize(&block.value())?;
+        self.0 .1.remove(&block.hash().0)?;
+        self.0 .2.remove(block.number(), block.view())?;
+        self.0 .3.remove(block.view())?;
+        Ok(())
+    }
+
+    pub fn set_canonical(&mut self, view: u64) -> Result<()> {
+        self.0 .3.insert(view, true)?;
+        Ok(())
+    }
+
+    pub fn set_non_canonical(&mut self, view: u64) -> Result<()> {
+        self.0 .3.insert(view, false)?;
+        Ok(())
+    }
+}
+
+const TRANSACTIONS: TableDefinition<&[u8; 32], Vec<u8>> = TableDefinition::new("transactions");
+
+pub struct TransactionsTable<T>(T);
+
+impl<T: ReadableTable<&'static [u8; 32], Vec<u8>>> TransactionsTable<T> {
+    pub fn get(&self, hash: Hash) -> Result<Option<SignedTransaction>> {
+        let Some(txn) = self.0.get(&hash.0)? else {
+            return Ok(None);
+        };
+        Ok(Some(bincode::deserialize(&txn.value())?))
+    }
+
+    pub fn contains(&self, hash: Hash) -> Result<bool> {
+        Ok(self.0.get(&hash.0)?.is_some())
+    }
+}
+
+impl<T: ReadableTableMetadata> TransactionsTable<T> {
+    pub fn count(&self) -> Result<u64> {
+        Ok(self.0.len()?)
+    }
+}
+
+impl TransactionsTable<Table<'_, &[u8; 32], Vec<u8>>> {
+    pub fn insert(&mut self, hash: Hash, txn: &SignedTransaction) -> Result<()> {
+        self.0.insert(&hash.0, bincode::serialize(&txn)?)?;
+        Ok(())
+    }
+
+    fn delete(&mut self, hash: Hash) -> Result<()> {
+        self.0.remove(&hash.0)?;
+        Ok(())
+    }
+}
+
+const RECEIPTS: TableDefinition<&[u8; 32], Vec<u8>> = TableDefinition::new("receipts");
+
+pub struct ReceiptsTable<T>(T);
+
+impl<T: ReadableTable<&'static [u8; 32], Vec<u8>>> ReceiptsTable<T> {
+    pub fn get(&self, hash: Hash) -> Result<Option<TransactionReceipt>> {
+        let Some(txn) = self.0.get(&hash.0)? else {
+            return Ok(None);
+        };
+        Ok(Some(bincode::deserialize(&txn.value())?))
+    }
+}
+
+impl ReceiptsTable<Table<'_, &[u8; 32], Vec<u8>>> {
+    pub fn insert(&mut self, receipt: &TransactionReceipt) -> Result<()> {
+        self.0
+            .insert(&receipt.tx_hash.0, bincode::serialize(receipt)?)?;
+        Ok(())
+    }
+
+    fn delete(&mut self, hash: Hash) -> Result<()> {
+        self.0.remove(&hash.0)?;
+        Ok(())
+    }
+}
+
+// touched_address_index: address -> [(index, txn_hash)]
+// The index of each entry is contiguous. This ensures values are returned in the same order they were inserted.
+// touched_address_reverse_index: txn_hash -> (index, address)
+const TOUCHED_ADDRESS_INDEX: MultimapTableDefinition<&[u8; 20], (u64, &[u8; 32])> =
+    MultimapTableDefinition::new("touched_address_index");
+const TOUCHED_ADDRESS_REVERSE_INDEX: TableDefinition<&[u8; 32], (u64, &[u8; 20])> =
+    TableDefinition::new("touched_address_reverse_index");
+
+pub struct TouchedAddressIndex<T>(T);
+
+impl<T1, T2> TouchedAddressIndex<(T1, T2)>
+where
+    T1: ReadableMultimapTable<&'static [u8; 20], (u64, &'static [u8; 32])>,
+    T2: ReadableTable<&'static [u8; 32], (u64, &'static [u8; 20])>,
+{
+    pub fn get(&self, address: Address) -> Result<Vec<Hash>> {
+        let hashes = self.0 .0.get(&<[u8; 20]>::from(address))?;
+        hashes
+            .map(|hash| Ok(Hash(*hash?.value().1)))
+            .collect::<Result<Vec<_>>>()
+    }
+}
+
+impl
+    TouchedAddressIndex<(
+        MultimapTable<'_, &[u8; 20], (u64, &[u8; 32])>,
+        Table<'_, &[u8; 32], (u64, &[u8; 20])>,
+    )>
+{
+    pub fn insert(&mut self, address: Address, txn_hash: Hash) -> Result<()> {
+        let key = &<[u8; 20]>::from(address);
+        let next_index = self
+            .0
+             .0
+            .get(key)?
+            .next_back()
+            .map(|value| Ok::<_, redb::Error>(value?.value().0 + 1))
+            .transpose()?
+            .unwrap_or(0);
+
+        self.0 .0.insert(key, (next_index, &txn_hash.0))?;
+        self.0 .1.insert(&txn_hash.0, (next_index, key))?;
+        Ok(())
+    }
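+
+    // For example (illustrative): inserting hashes A, B, C for the same address
+    // stores (0, A), (1, B), (2, C). redb keeps multimap values sorted, so the
+    // leading index makes `get` yield [A, B, C] - the insertion order.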
+
+    pub fn delete_by_txn_hash(&mut self, txn_hash: Hash) -> Result<()> {
+        let Some(value) = self.0 .1.remove(&txn_hash.0)? else {
+            return Ok(());
+        };
+        let (index, address) = value.value();
+        self.0 .0.remove(address, (index, &txn_hash.0))?;
+        Ok(())
+    }
+}
+
+const FINALIZED_VIEW: TableDefinition<(), u64> = TableDefinition::new("finalized_view");
+
+pub struct FinalizedViewTable<T>(T);
+
+impl<T: ReadableTable<(), u64>> FinalizedViewTable<T> {
+    pub fn get(&self) -> Result<Option<u64>> {
+        Ok(self.0.get(())?.map(|v| v.value()))
+    }
+}
+
+impl FinalizedViewTable<Table<'_, (), u64>> {
+    pub fn set(&mut self, finalized_view: u64) -> Result<()> {
+        self.0.insert((), finalized_view)?;
+        Ok(())
+    }
+}
+
+const VIEW: TableDefinition<(), u64> = TableDefinition::new("view");
+
+pub struct ViewTable<T>(T);
+
+impl<T: ReadableTable<(), u64>> ViewTable<T> {
+    pub fn get(&self) -> Result<Option<u64>> {
+        Ok(self.0.get(())?.map(|v| v.value()))
+    }
+}
+
+impl ViewTable<Table<'_, (), u64>> {
+    /// Sets the provided view if it is greater than the existing view. Returns true if the value was updated.
+    pub fn set(&mut self, view: u64) -> Result<bool> {
+        let current = self.get()?;
+        let update = current.map(|c| view > c).unwrap_or(true);
+        if update {
+            self.0.insert((), view)?;
+        }
+        Ok(update)
+    }
+}
+
+const HIGH_QC: TableDefinition<(), (Vec<u8>, u64, u32)> = TableDefinition::new("high_qc");
+
+pub struct HighQcTable<T>(T);
+
+impl<T: ReadableTable<(), (Vec<u8>, u64, u32)>> HighQcTable<T> {
+    pub fn get(&self) -> Result<Option<(QuorumCertificate, SystemTime)>> {
+        let Some(value) = self.0.get(())? else {
+            return Ok(None);
+        };
+        let (high_qc, updated_at_secs, updated_at_subsec_nanos) = value.value();
+        let high_qc = bincode::deserialize(&high_qc)?;
+        let high_qc_updated_at =
+            SystemTime::UNIX_EPOCH + Duration::new(updated_at_secs, updated_at_subsec_nanos);
+        Ok(Some((high_qc, high_qc_updated_at)))
+    }
+}
+
+impl HighQcTable<Table<'_, (), (Vec<u8>, u64, u32)>> {
+    pub fn set(&mut self, high_qc: &QuorumCertificate) -> Result<()> {
+        let high_qc = bincode::serialize(high_qc)?;
+        let high_qc_updated_at = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?;
+        self.0.insert(
+            (),
+            (
+                high_qc,
+                high_qc_updated_at.as_secs(),
+                high_qc_updated_at.subsec_nanos(),
+            ),
+        )?;
+        Ok(())
+    }
+}
+
+const STATE_TRIE: TableDefinition<&[u8; 32], Vec<u8>> = TableDefinition::new("state_trie");
+
+pub struct StateTrieTable<T>(T);
+
+impl<T: ReadableTable<&'static [u8; 32], Vec<u8>>> StateTrieTable<T> {
+    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
+        Ok(self.0.get(&<[u8; 32]>::try_from(key)?)?.map(|v| v.value()))
+    }
+}
+
+impl StateTrieTable<Table<'_, &[u8; 32], Vec<u8>>> {
+    pub fn insert(&mut self, key: &[u8], value: &Vec<u8>) -> Result<()> {
+        self.0.insert(&<[u8; 32]>::try_from(key)?, value)?;
+        Ok(())
+    }
+}
+
+impl Db {
+    /// Begin a read transaction.
+    ///
+    /// Captures a snapshot of the database, so that only data committed before calling this method is visible in the
+    /// transaction.
+    ///
+    /// Read transactions may exist concurrently with writes.
+    pub fn read(&self) -> Result<TxRead> {
+        Ok(TxRead(self.db.begin_read()?))
+    }
+
+    /// Begin a write transaction.
+    ///
+    /// Only a single write may be in progress at a time. If a write is in progress, this function will block until it
+    /// completes.
+    ///
+    /// You must call `[TxWrite::commit]` to persist the writes performed in this transaction. After committing, all
+    /// writes will be visible to future transactions.
+    pub fn write(&self) -> Result<TxWrite> {
+        Ok(TxWrite(self.db.begin_write()?))
+    }
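+
+    // For example (illustrative): a read transaction begun before a write
+    // commits will not observe that write,
+    //
+    //     let read = db.read()?;                 // snapshot taken here
+    //     let write = db.write()?;
+    //     write.view()?.set(42)?;
+    //     write.commit()?;
+    //     assert_eq!(read.view()?.get()?, None); // still the old snapshot
+    //
+    // assuming no view had been set before the snapshot was taken.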
+    pub fn write_non_durable(&self) -> Result<TxWrite> {
+        let mut tx = self.db.begin_write()?;
+        tx.set_durability(Durability::Eventual);
+        Ok(TxWrite(tx))
+    }
+}
+
+pub struct TxRead(ReadTransaction);
+
+impl TxRead {
+    pub fn blocks(
+        &self,
+    ) -> Result<
+        BlocksTable<(
+            ReadOnlyTable<u64, Vec<u8>>,
+            ReadOnlyTable<&'static [u8; 32], u64>,
+            ReadOnlyMultimapTable<u64, u64>,
+            ReadOnlyTable<u64, bool>,
+        )>,
+    > {
+        let blocks = self.0.open_table(BLOCKS)?;
+        let blocks_hash_index = self.0.open_table(BLOCKS_HASH_INDEX)?;
+        let blocks_height_index = self.0.open_multimap_table(BLOCKS_HEIGHT_INDEX)?;
+        let block_is_canonical = self.0.open_table(BLOCK_IS_CANONICAL)?;
+        Ok(BlocksTable((
+            blocks,
+            blocks_hash_index,
+            blocks_height_index,
+            block_is_canonical,
+        )))
+    }
+    pub fn transactions(
+        &self,
+    ) -> Result<TransactionsTable<ReadOnlyTable<&'static [u8; 32], Vec<u8>>>> {
+        Ok(TransactionsTable(self.0.open_table(TRANSACTIONS)?))
+    }
+    pub fn receipts(&self) -> Result<ReceiptsTable<ReadOnlyTable<&'static [u8; 32], Vec<u8>>>> {
+        Ok(ReceiptsTable(self.0.open_table(RECEIPTS)?))
+    }
+    pub fn touched_address_index(
+        &self,
+    ) -> Result<
+        TouchedAddressIndex<(
+            ReadOnlyMultimapTable<&'static [u8; 20], (u64, &'static [u8; 32])>,
+            ReadOnlyTable<&'static [u8; 32], (u64, &'static [u8; 20])>,
+        )>,
+    > {
+        let index = self.0.open_multimap_table(TOUCHED_ADDRESS_INDEX)?;
+        let reverse_index = self.0.open_table(TOUCHED_ADDRESS_REVERSE_INDEX)?;
+        Ok(TouchedAddressIndex((index, reverse_index)))
+    }
+    pub fn finalized_view(&self) -> Result<FinalizedViewTable<ReadOnlyTable<(), u64>>> {
+        Ok(FinalizedViewTable(self.0.open_table(FINALIZED_VIEW)?))
+    }
+    pub fn view(&self) -> Result<ViewTable<ReadOnlyTable<(), u64>>> {
+        Ok(ViewTable(self.0.open_table(VIEW)?))
+    }
+    pub fn high_qc(&self) -> Result<HighQcTable<ReadOnlyTable<(), (Vec<u8>, u64, u32)>>> {
+        Ok(HighQcTable(self.0.open_table(HIGH_QC)?))
+    }
+    pub fn state_trie(&self) -> Result<StateTrieTable<ReadOnlyTable<&'static [u8; 32], Vec<u8>>>> {
+        Ok(StateTrieTable(self.0.open_table(STATE_TRIE)?))
+    }
+}
+
+pub struct TxWrite(WriteTransaction);
+
+impl TxWrite {
+    pub fn commit(self) -> Result<()> {
+        self.0.commit()?;
+        Ok(())
+    }
+
+    pub fn blocks(
+        &self,
+    ) -> Result<
+        BlocksTable<(
+            Table<u64, Vec<u8>>,
+            Table<&'static [u8; 32], u64>,
+            MultimapTable<u64, u64>,
+            Table<u64, bool>,
+        )>,
+    > {
+        let blocks = self.0.open_table(BLOCKS)?;
+        let blocks_hash_index = self.0.open_table(BLOCKS_HASH_INDEX)?;
+        let blocks_height_index = self.0.open_multimap_table(BLOCKS_HEIGHT_INDEX)?;
+        let block_is_canonical = self.0.open_table(BLOCK_IS_CANONICAL)?;
+        Ok(BlocksTable((
+            blocks,
+            blocks_hash_index,
+            blocks_height_index,
+            block_is_canonical,
+        )))
+    }
+    pub fn transactions(&self) -> Result<TransactionsTable<Table<&'static [u8; 32], Vec<u8>>>> {
+        Ok(TransactionsTable(self.0.open_table(TRANSACTIONS)?))
+    }
+    pub fn receipts(&self) -> Result<ReceiptsTable<Table<&'static [u8; 32], Vec<u8>>>> {
+        Ok(ReceiptsTable(self.0.open_table(RECEIPTS)?))
+    }
+    pub fn touched_address_index(
+        &self,
+    ) -> Result<
+        TouchedAddressIndex<(
+            MultimapTable<&'static [u8; 20], (u64, &'static [u8; 32])>,
+            Table<&'static [u8; 32], (u64, &'static [u8; 20])>,
+        )>,
+    > {
+        let index = self.0.open_multimap_table(TOUCHED_ADDRESS_INDEX)?;
+        let reverse_index = self.0.open_table(TOUCHED_ADDRESS_REVERSE_INDEX)?;
+        Ok(TouchedAddressIndex((index, reverse_index)))
+    }
+    pub fn finalized_view(&self) -> Result<FinalizedViewTable<Table<(), u64>>> {
+        Ok(FinalizedViewTable(self.0.open_table(FINALIZED_VIEW)?))
+    }
+    pub fn view(&self) -> Result<ViewTable<Table<(), u64>>> {
+        Ok(ViewTable(self.0.open_table(VIEW)?))
+    }
+    pub fn high_qc(&self) -> Result<HighQcTable<Table<(), (Vec<u8>, u64, u32)>>> {
+        Ok(HighQcTable(self.0.open_table(HIGH_QC)?))
+    }
+    pub fn state_trie(&self) -> Result<StateTrieTable<Table<&'static [u8; 32], Vec<u8>>>> {
+        Ok(StateTrieTable(self.0.open_table(STATE_TRIE)?))
+    }
+
+    /// Ensure all tables are created.
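+    ///
+    /// redb only creates a table the first time it is opened in a write transaction, so on a fresh
+    /// database a read transaction would otherwise fail when opening a missing table.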
+    pub fn create_all(&self) -> Result<()> {
+        self.blocks()?;
+        self.transactions()?;
+        self.receipts()?;
+        self.touched_address_index()?;
+        self.finalized_view()?;
+        self.view()?;
+        self.high_qc()?;
+        self.state_trie()?;
+        Ok(())
+    }
+
+    /// Convenience method for deleting all references to a transaction.
+    pub fn delete_transaction(&self, txn_hash: Hash) -> Result<()> {
+        self.transactions()?.delete(txn_hash)?;
+        self.receipts()?.delete(txn_hash)?;
+        self.touched_address_index()?.delete_by_txn_hash(txn_hash)?;
+        Ok(())
+    }
+}
diff --git a/zilliqa/src/exec.rs b/zilliqa/src/exec.rs
index 04a3644af..0af2955bd 100644
--- a/zilliqa/src/exec.rs
+++ b/zilliqa/src/exec.rs
@@ -1066,9 +1066,11 @@ impl PendingState {
     }
 
     pub fn get_highest_canonical_block_number(&self) -> Result<Option<u64>> {
-        self.pre_state
+        Ok(self
+            .pre_state
             .block_store
-            .get_highest_canonical_block_number()
+            .get_highest_block()?
+            .map(|b| b.number()))
     }
 
     pub fn load_account(&mut self, address: Address) -> Result<&mut PendingAccount> {
diff --git a/zilliqa/src/node.rs b/zilliqa/src/node.rs
index a090b893d..1f486bdd5 100644
--- a/zilliqa/src/node.rs
+++ b/zilliqa/src/node.rs
@@ -434,10 +434,11 @@ impl Node {
             BlockNumberOrTag::Latest => Ok(Some(self.consensus.head_block())),
             BlockNumberOrTag::Pending => self.consensus.get_pending_block(),
             BlockNumberOrTag::Finalized => {
-                let Some(view) = self.db.get_finalized_view()? else {
+                let read = self.db.read()?;
+                let Some(view) = read.finalized_view()?.get()? else {
                     return self.resolve_block_number(BlockNumberOrTag::Earliest);
                 };
-                let Some(block) = self.db.get_block_by_view(view)? else {
+                let Some(block) = read.blocks()?.by_view(view)? else {
                     return self.resolve_block_number(BlockNumberOrTag::Earliest);
                 };
                 Ok(Some(block))
@@ -858,13 +859,6 @@ impl Node {
         self.consensus.head_block().header.number
     }
 
-    pub fn get_transaction_receipts_in_block(
-        &self,
-        block_hash: Hash,
-    ) -> Result<Vec<TransactionReceipt>> {
-        self.db.get_transaction_receipts_in_block(&block_hash)
-    }
-
     pub fn get_finalized_height(&self) -> Result<u64> {
         self.consensus.get_finalized_view()
     }
diff --git a/zilliqa/src/pool.rs b/zilliqa/src/pool.rs
index 183fcab77..120e8aa66 100644
--- a/zilliqa/src/pool.rs
+++ b/zilliqa/src/pool.rs
@@ -412,7 +412,7 @@ mod tests {
         block_store::BlockStore,
         cfg::{ConsensusConfig, NodeConfig, *},
         crypto::Hash,
-        db::Db,
+        db::{ArcDb, Db},
         node::{MessageSender, RequestId},
         state::State,
         transaction::{EvmGas, SignedTransaction, TxIntershard, VerifiedTransaction},