diff --git a/sync/src/types/header_map/backend.rs b/sync/src/types/header_map/backend.rs deleted file mode 100644 index befc1986828..00000000000 --- a/sync/src/types/header_map/backend.rs +++ /dev/null @@ -1,23 +0,0 @@ -use std::path; - -use ckb_types::packed::Byte32; - -use crate::types::HeaderView; - -pub(crate) trait KeyValueBackend { - fn new

<P>(tmpdir: Option<P>
) -> Self - where - P: AsRef; - - fn len(&self) -> usize; - fn is_empty(&self) -> bool { - self.len() == 0 - } - - fn contains_key(&self, key: &Byte32) -> bool; - fn get(&self, key: &Byte32) -> Option; - fn insert(&self, value: &HeaderView) -> Option<()>; - fn insert_batch(&self, values: &[HeaderView]); - fn remove(&self, key: &Byte32) -> Option; - fn remove_no_return(&self, key: &Byte32); -} diff --git a/sync/src/types/header_map/backend_sled.rs b/sync/src/types/header_map/backend_sled.rs deleted file mode 100644 index a41523d1864..00000000000 --- a/sync/src/types/header_map/backend_sled.rs +++ /dev/null @@ -1,104 +0,0 @@ -use super::KeyValueBackend; -use crate::types::HeaderView; -use ckb_types::{packed::Byte32, prelude::*}; -use sled::Db; -use std::path; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -use tempfile::TempDir; - -pub(crate) struct SledBackend { - count: AtomicUsize, - db: Db, - _tmpdir: TempDir, -} - -impl KeyValueBackend for SledBackend { - fn new

<P>(tmp_path: Option<P>
) -> Self - where - P: AsRef, - { - let mut builder = tempfile::Builder::new(); - builder.prefix("ckb-tmp-"); - let tmpdir = if let Some(ref path) = tmp_path { - builder.tempdir_in(path) - } else { - builder.tempdir() - } - .expect("failed to create a tempdir to save header map into disk"); - - let db: Db = sled::open(tmpdir.path()) - .expect("failed to open a key-value database to save header map into disk"); - Self { - db, - _tmpdir: tmpdir, - count: AtomicUsize::new(0), - } - } - - fn len(&self) -> usize { - self.count.load(Ordering::SeqCst) - } - - fn contains_key(&self, key: &Byte32) -> bool { - self.db - .contains_key(key.as_slice()) - .expect("sled contains_key") - } - - fn get(&self, key: &Byte32) -> Option { - self.db - .get(key.as_slice()) - .unwrap_or_else(|err| panic!("read header map from disk should be ok, but {err}")) - .map(|slice| HeaderView::from_slice_should_be_ok(slice.as_ref())) - } - - fn insert(&self, value: &HeaderView) -> Option<()> { - let key = value.hash(); - let last_value = self - .db - .insert(key.as_slice(), value.to_vec()) - .expect("failed to insert item to sled"); - if last_value.is_none() { - self.count.fetch_add(1, Ordering::SeqCst); - } - last_value.map(|_| ()) - } - - fn insert_batch(&self, values: &[HeaderView]) { - let mut count = 0; - for value in values { - let key = value.hash(); - let last_value = self - .db - .insert(key.as_slice(), value.to_vec()) - .expect("failed to insert item to sled"); - if last_value.is_none() { - count += 1; - } - } - self.count.fetch_add(count, Ordering::SeqCst); - } - - fn remove(&self, key: &Byte32) -> Option { - let old_value = self - .db - .remove(key.as_slice()) - .expect("failed to remove item from sled"); - - old_value.map(|slice| { - self.count.fetch_sub(1, Ordering::SeqCst); - HeaderView::from_slice_should_be_ok(&slice) - }) - } - - fn remove_no_return(&self, key: &Byte32) { - let old_value = self - .db - .remove(key.as_slice()) - .expect("failed to remove item from sled"); - if old_value.is_some() { - self.count.fetch_sub(1, Ordering::SeqCst); - } - } -} diff --git a/sync/src/types/header_map/kernel_lru.rs b/sync/src/types/header_map/kernel_lru.rs deleted file mode 100644 index 69eb1e8f8c0..00000000000 --- a/sync/src/types/header_map/kernel_lru.rs +++ /dev/null @@ -1,236 +0,0 @@ -use std::path; - -#[cfg(feature = "stats")] -use ckb_logger::trace; -#[cfg(feature = "stats")] -use ckb_util::{Mutex, MutexGuard}; - -use ckb_types::packed::Byte32; - -use super::{KeyValueBackend, MemoryMap}; -use crate::types::HeaderView; - -pub(crate) struct HeaderMapKernel -where - Backend: KeyValueBackend, -{ - pub(crate) memory: MemoryMap, - pub(crate) backend: Backend, - // Configuration - memory_limit: usize, - // Statistics - #[cfg(feature = "stats")] - stats: Mutex, -} - -#[cfg(feature = "stats")] -#[derive(Default)] -struct HeaderMapKernelStats { - frequency: usize, - - trace_progress: usize, - - primary_contain: usize, - primary_select: usize, - primary_insert: usize, - primary_delete: usize, - - backend_contain: usize, - backend_delete: usize, -} - -impl HeaderMapKernel -where - Backend: KeyValueBackend, -{ - pub(crate) fn new

<P>(tmpdir: Option<P>
, memory_limit: usize) -> Self - where - P: AsRef, - { - let memory = Default::default(); - let backend = Backend::new(tmpdir); - - #[cfg(not(feature = "stats"))] - { - Self { - memory, - backend, - memory_limit, - } - } - - #[cfg(feature = "stats")] - { - Self { - memory, - backend, - memory_limit, - stats: Mutex::new(HeaderMapKernelStats::new(50_000)), - } - } - } - - pub(crate) fn contains_key(&self, hash: &Byte32) -> bool { - #[cfg(feature = "stats")] - { - self.stats().tick_primary_contain(); - } - if self.memory.contains_key(hash) { - return true; - } - if self.backend.is_empty() { - return false; - } - #[cfg(feature = "stats")] - { - self.stats().tick_backend_contain(); - } - self.backend.contains_key(hash) - } - - pub(crate) fn get(&self, hash: &Byte32) -> Option { - #[cfg(feature = "stats")] - { - self.stats().tick_primary_select(); - } - if let Some(view) = self.memory.get_refresh(hash) { - return Some(view); - } - if self.backend.is_empty() { - return None; - } - #[cfg(feature = "stats")] - { - self.stats().tick_backend_delete(); - } - if let Some(view) = self.backend.remove(hash) { - #[cfg(feature = "stats")] - { - self.stats().tick_primary_insert(); - } - self.memory.insert(view.hash(), view.clone()); - Some(view) - } else { - None - } - } - - pub(crate) fn insert(&self, view: HeaderView) -> Option<()> { - #[cfg(feature = "stats")] - { - self.trace(); - self.stats().tick_primary_insert(); - } - self.memory.insert(view.hash(), view) - } - - pub(crate) fn remove(&self, hash: &Byte32) { - #[cfg(feature = "stats")] - { - self.trace(); - self.stats().tick_primary_delete(); - } - self.memory.remove(hash); - if self.backend.is_empty() { - return; - } - self.backend.remove_no_return(hash); - } - - pub(crate) fn limit_memory(&self) { - if let Some(values) = self.memory.front_n(self.memory_limit) { - tokio::task::block_in_place(|| { - self.backend.insert_batch(&values); - }); - self.memory - .remove_batch(values.iter().map(|value| value.hash())); - } - } - - #[cfg(feature = "stats")] - fn trace(&self) { - let mut stats = self.stats(); - let progress = stats.trace_progress(); - let frequency = stats.frequency(); - if progress % frequency == 0 { - trace!( - "Header Map Statistics\ - \n>\t| storage | length | limit | contain | select | insert | delete |\ - \n>\t|---------+---------+---------+---------+------------+---------+---------|\ - \n>\t| memory |{:>9}|{:>9}|{:>9}|{:>12}|{:>9}|{:>9}|\ - \n>\t| backend |{:>9}|{:>9}|{:>9}|{:>12}|{:>9}|{:>9}|\ - ", - self.memory.len(), - self.memory_limit, - stats.primary_contain, - stats.primary_select, - stats.primary_insert, - stats.primary_delete, - self.backend.len(), - '-', - stats.backend_contain, - '-', - '-', - stats.backend_delete, - ); - stats.trace_progress_reset(); - } else { - stats.trace_progress_tick(); - } - } - - #[cfg(feature = "stats")] - fn stats(&self) -> MutexGuard { - self.stats.lock() - } -} - -#[cfg(feature = "stats")] -impl HeaderMapKernelStats { - fn new(frequency: usize) -> Self { - Self { - frequency, - ..Default::default() - } - } - - fn frequency(&self) -> usize { - self.frequency - } - - fn trace_progress(&self) -> usize { - self.trace_progress - } - - fn trace_progress_reset(&mut self) { - self.trace_progress = 1; - } - - fn trace_progress_tick(&mut self) { - self.trace_progress += 1; - } - - fn tick_primary_contain(&mut self) { - self.primary_contain += 1; - } - - fn tick_backend_contain(&mut self) { - self.backend_contain += 1; - } - - fn tick_primary_select(&mut self) { - self.primary_select += 1; - } - - fn 
tick_primary_insert(&mut self) { - self.primary_insert += 1; - } - - fn tick_primary_delete(&mut self) { - self.primary_delete += 1; - } - - fn tick_backend_delete(&mut self) { - self.backend_delete += 1; - } -} diff --git a/sync/src/types/header_map/memory.rs b/sync/src/types/header_map/memory.rs deleted file mode 100644 index 39d85506c7a..00000000000 --- a/sync/src/types/header_map/memory.rs +++ /dev/null @@ -1,62 +0,0 @@ -use crate::types::HeaderView; -use crate::types::SHRINK_THRESHOLD; -use ckb_types::packed::Byte32; -use ckb_util::shrink_to_fit; -use ckb_util::LinkedHashMap; -use ckb_util::RwLock; -use std::default; - -pub(crate) struct MemoryMap(RwLock>); - -impl default::Default for MemoryMap { - fn default() -> Self { - Self(RwLock::new(default::Default::default())) - } -} - -impl MemoryMap { - #[cfg(feature = "stats")] - pub(crate) fn len(&self) -> usize { - self.0.read().len() - } - - pub(crate) fn contains_key(&self, key: &Byte32) -> bool { - self.0.read().contains_key(key) - } - - pub(crate) fn get_refresh(&self, key: &Byte32) -> Option { - let mut guard = self.0.write(); - guard.get_refresh(key).cloned() - } - - pub(crate) fn insert(&self, key: Byte32, value: HeaderView) -> Option<()> { - let mut guard = self.0.write(); - guard.insert(key, value).map(|_| ()) - } - - pub(crate) fn remove(&self, key: &Byte32) -> Option { - let mut guard = self.0.write(); - let ret = guard.remove(key); - shrink_to_fit!(guard, SHRINK_THRESHOLD); - ret - } - - pub(crate) fn front_n(&self, size_limit: usize) -> Option> { - let guard = self.0.read(); - let size = guard.len(); - if size > size_limit { - let num = size - size_limit; - Some(guard.values().take(num).cloned().collect()) - } else { - None - } - } - - pub(crate) fn remove_batch(&self, keys: impl Iterator) { - let mut guard = self.0.write(); - for key in keys { - guard.remove(&key); - } - shrink_to_fit!(guard, SHRINK_THRESHOLD); - } -} diff --git a/sync/src/types/header_map/mod.rs b/sync/src/types/header_map/mod.rs deleted file mode 100644 index d6dde661f4f..00000000000 --- a/sync/src/types/header_map/mod.rs +++ /dev/null @@ -1,90 +0,0 @@ -use crate::types::HeaderView; -use ckb_async_runtime::Handle; -use ckb_stop_handler::{SignalSender, StopHandler}; -use ckb_types::packed::{self, Byte32}; -use std::path; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::oneshot; -use tokio::time::MissedTickBehavior; - -mod backend; -mod backend_sled; -mod kernel_lru; -mod memory; - -pub(crate) use self::{ - backend::KeyValueBackend, backend_sled::SledBackend, kernel_lru::HeaderMapKernel, - memory::MemoryMap, -}; - -pub struct HeaderMap { - inner: Arc>, - stop: StopHandler<()>, -} - -impl Drop for HeaderMap { - fn drop(&mut self) { - self.stop.try_send(()); - } -} - -const INTERVAL: Duration = Duration::from_millis(500); -// key + total_difficulty + skip_hash -const ITEM_BYTES_SIZE: usize = packed::HeaderView::TOTAL_SIZE + 32 * 3; -const WARN_THRESHOLD: usize = ITEM_BYTES_SIZE * 100_000; - -impl HeaderMap { - pub(crate) fn new

<P>(tmpdir: Option<P>
, memory_limit: usize, async_handle: &Handle) -> Self - where - P: AsRef, - { - if memory_limit < ITEM_BYTES_SIZE { - panic!("The limit setting is too low"); - } - if memory_limit < WARN_THRESHOLD { - ckb_logger::warn!( - "The low memory limit setting {} will result in inefficient synchronization", - memory_limit - ); - } - let size_limit = memory_limit / ITEM_BYTES_SIZE; - let inner = Arc::new(HeaderMapKernel::new(tmpdir, size_limit)); - let map = Arc::clone(&inner); - let (stop, mut stop_rx) = oneshot::channel::<()>(); - - async_handle.spawn(async move { - let mut interval = tokio::time::interval(INTERVAL); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - loop { - tokio::select! { - _ = interval.tick() => { - map.limit_memory(); - } - _ = &mut stop_rx => break, - } - } - }); - - Self { - inner, - stop: StopHandler::new(SignalSender::Tokio(stop), None, "HeaderMap".to_string()), - } - } - - pub(crate) fn contains_key(&self, hash: &Byte32) -> bool { - self.inner.contains_key(hash) - } - - pub(crate) fn get(&self, hash: &Byte32) -> Option { - self.inner.get(hash) - } - - pub(crate) fn insert(&self, view: HeaderView) -> Option<()> { - self.inner.insert(view) - } - - pub(crate) fn remove(&self, hash: &Byte32) { - self.inner.remove(hash) - } -} diff --git a/sync/src/types/mod.rs b/sync/src/types/mod.rs index 72495dcebd6..f22d45b9ac4 100644 --- a/sync/src/types/mod.rs +++ b/sync/src/types/mod.rs @@ -1,4 +1,3 @@ -use crate::block_status::BlockStatus; use crate::orphan_block_pool::OrphanBlockPool; use crate::utils::is_internal_db_error; use crate::{Status, StatusCode, FAST_INDEX, LOW_INDEX, NORMAL_INDEX, TIME_TRACE_SIZE}; @@ -17,6 +16,7 @@ use ckb_error::Error as CKBError; use ckb_logger::{debug, error, trace}; use ckb_network::{CKBProtocolContext, PeerIndex, SupportProtocols}; use ckb_shared::{shared::Shared, Snapshot}; +use ckb_shared::{BlockStatus, HeaderView}; use ckb_store::{ChainDB, ChainStore}; use ckb_systemtime::unix_time_as_millis; use ckb_traits::HeaderProvider; @@ -40,8 +40,6 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use std::{cmp, fmt, iter}; -mod header_map; - use crate::utils::send_message; use ckb_types::core::EpochNumber; pub use header_map::HeaderMap; @@ -976,174 +974,6 @@ impl Peers { } } -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct HeaderView { - inner: core::HeaderView, - total_difficulty: U256, - // pointer to the index of some further predecessor of this block - pub(crate) skip_hash: Option, -} - -impl HeaderView { - pub fn new(inner: core::HeaderView, total_difficulty: U256) -> Self { - HeaderView { - inner, - total_difficulty, - skip_hash: None, - } - } - - pub fn number(&self) -> BlockNumber { - self.inner.number() - } - - pub fn hash(&self) -> Byte32 { - self.inner.hash() - } - - pub fn parent_hash(&self) -> Byte32 { - self.inner.data().raw().parent_hash() - } - - pub fn timestamp(&self) -> u64 { - self.inner.timestamp() - } - - pub fn total_difficulty(&self) -> &U256 { - &self.total_difficulty - } - - pub fn inner(&self) -> &core::HeaderView { - &self.inner - } - - pub fn into_inner(self) -> core::HeaderView { - self.inner - } - - pub fn build_skip(&mut self, tip_number: BlockNumber, get_header_view: F, fast_scanner: G) - where - F: FnMut(&Byte32, Option) -> Option, - G: Fn(BlockNumber, &HeaderView) -> Option, - { - if self.inner.is_genesis() { - return; - } - self.skip_hash = self - .clone() - .get_ancestor( - tip_number, - get_skip_height(self.number()), - get_header_view, - fast_scanner, - ) - .map(|header| header.hash()); - } 
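For orientation: HeaderMap::new above converts the configured byte budget into an entry count by dividing by ITEM_BYTES_SIZE (one packed header plus three 32-byte fields), panicking below one entry and warning below the 100,000-entry WARN_THRESHOLD, then spawns a background task that ticks every 500 ms and calls limit_memory, which spills the oldest in-memory entries into the sled backend (front_n → insert_batch → remove_batch). A minimal sketch of that loop against plain tokio — `Kernel` and `spawn_evictor` are illustrative stand-ins, not the deleted API:

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::time::MissedTickBehavior;

// Stand-in for HeaderMapKernel: the only capability the loop needs.
trait Kernel: Send + Sync + 'static {
    fn limit_memory(&self);
}

// Returns the sender used to stop the loop, mirroring the StopHandler wiring.
fn spawn_evictor<K: Kernel>(kernel: Arc<K>) -> tokio::sync::oneshot::Sender<()> {
    let (stop_tx, mut stop_rx) = tokio::sync::oneshot::channel::<()>();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_millis(500));
        // Skip missed ticks instead of bursting to catch up.
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            tokio::select! {
                _ = interval.tick() => kernel.limit_memory(),
                _ = &mut stop_rx => break,
            }
        }
    });
    stop_tx
}
```

In the deleted module the stop sender is wrapped in a StopHandler so that dropping HeaderMap shuts the task down.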
- - // NOTE: get_header_view may change source state, for cache or for tests - pub fn get_ancestor( - self, - tip_number: BlockNumber, - number: BlockNumber, - mut get_header_view: F, - fast_scanner: G, - ) -> Option - where - F: FnMut(&Byte32, Option) -> Option, - G: Fn(BlockNumber, &HeaderView) -> Option, - { - let mut current = self; - if number > current.number() { - return None; - } - - let mut number_walk = current.number(); - while number_walk > number { - let number_skip = get_skip_height(number_walk); - let number_skip_prev = get_skip_height(number_walk - 1); - let store_first = current.number() <= tip_number; - match current.skip_hash { - Some(ref hash) - if number_skip == number - || (number_skip > number - && !(number_skip_prev + 2 < number_skip - && number_skip_prev >= number)) => - { - // Only follow skip if parent->skip isn't better than skip->parent - current = get_header_view(hash, Some(store_first))?; - number_walk = number_skip; - } - _ => { - current = get_header_view(¤t.parent_hash(), Some(store_first))?; - number_walk -= 1; - } - } - if let Some(target) = fast_scanner(number, ¤t) { - current = target; - break; - } - } - Some(current).map(HeaderView::into_inner) - } - - pub fn is_better_than(&self, total_difficulty: &U256) -> bool { - self.total_difficulty() > total_difficulty - } - - fn from_slice_should_be_ok(slice: &[u8]) -> Self { - let len_size = packed::Uint32Reader::TOTAL_SIZE; - if slice.len() < len_size { - panic!("failed to unpack item in header map: header part is broken"); - } - let mut idx = 0; - let inner_len = { - let reader = packed::Uint32Reader::from_slice_should_be_ok(&slice[idx..idx + len_size]); - Unpack::::unpack(&reader) as usize - }; - idx += len_size; - let total_difficulty_len = packed::Uint256Reader::TOTAL_SIZE; - if slice.len() < len_size + inner_len + total_difficulty_len { - panic!("failed to unpack item in header map: body part is broken"); - } - let inner = { - let reader = - packed::HeaderViewReader::from_slice_should_be_ok(&slice[idx..idx + inner_len]); - Unpack::::unpack(&reader) - }; - idx += inner_len; - let total_difficulty = { - let reader = packed::Uint256Reader::from_slice_should_be_ok( - &slice[idx..idx + total_difficulty_len], - ); - Unpack::::unpack(&reader) - }; - idx += total_difficulty_len; - let skip_hash = { - packed::Byte32OptReader::from_slice_should_be_ok(&slice[idx..]) - .to_entity() - .to_opt() - }; - Self { - inner, - total_difficulty, - skip_hash, - } - } - - fn to_vec(&self) -> Vec { - let mut v = Vec::new(); - let inner: packed::HeaderView = self.inner.pack(); - let total_difficulty: packed::Uint256 = self.total_difficulty.pack(); - let skip_hash: packed::Byte32Opt = Pack::pack(&self.skip_hash); - let inner_len: packed::Uint32 = (inner.as_slice().len() as u32).pack(); - v.extend_from_slice(inner_len.as_slice()); - v.extend_from_slice(inner.as_slice()); - v.extend_from_slice(total_difficulty.as_slice()); - v.extend_from_slice(skip_hash.as_slice()); - v - } -} - // Compute what height to jump back to with the skip pointer. fn get_skip_height(height: BlockNumber) -> BlockNumber { // Turn the lowest '1' bit in the binary representation of a number into a '0'. 
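The body of get_skip_height is cut off in this excerpt. The scheme mirrors Bitcoin Core's GetSkipHeight: clearing low bits of the height produces exponentially spaced skip targets, so get_ancestor above can walk back N blocks in O(log N) hops via skip_hash. A reconstruction consistent with the surrounding code — BlockNumber is u64, the helper name invert_lowest_one follows the comment, and the exact body should be read as a sketch rather than a verbatim copy:

```rust
type BlockNumber = u64;

fn get_skip_height(height: BlockNumber) -> BlockNumber {
    // Turn the lowest '1' bit in the binary representation of a number into a '0'.
    fn invert_lowest_one(n: i64) -> i64 {
        n & (n - 1)
    }

    if height < 2 {
        return 0;
    }

    // Jump a little less far from odd heights; this keeps the skip targets well
    // distributed and never jumps past the destination.
    if (height & 1) > 0 {
        invert_lowest_one(invert_lowest_one(height as i64 - 1)) as u64 + 1
    } else {
        invert_lowest_one(height as i64) as u64
    }
}
```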
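Taken together, the deleted header_map module keeps recently touched HeaderViews in a bounded LinkedHashMap and pushes the overflow into a throwaway sled database, pulling entries back into memory when they are read again. A condensed, self-contained model of that flow — plain HashMaps stand in for MemoryMap and SledBackend, so eviction order here is arbitrary rather than least-recently-refreshed:

```rust
use std::collections::HashMap;

struct TwoTierMap {
    memory: HashMap<String, Vec<u8>>,  // hot tier (MemoryMap / LinkedHashMap upstream)
    backend: HashMap<String, Vec<u8>>, // cold tier (SledBackend upstream)
    memory_limit: usize,
}

impl TwoTierMap {
    fn get(&mut self, key: &str) -> Option<Vec<u8>> {
        if let Some(v) = self.memory.get(key) {
            return Some(v.clone());
        }
        // A backend hit moves the entry back into the hot tier, mirroring
        // HeaderMapKernel::get, which calls backend.remove then memory.insert.
        let v = self.backend.remove(key)?;
        self.memory.insert(key.to_owned(), v.clone());
        Some(v)
    }

    fn insert(&mut self, key: String, value: Vec<u8>) {
        // New entries always land in memory; they only reach the backend when
        // limit_memory spills them.
        self.memory.insert(key, value);
    }

    fn limit_memory(&mut self) {
        // Upstream this takes the least-recently-refreshed entries (front_n),
        // batch-inserts them into sled, then batch-removes them from memory.
        while self.memory.len() > self.memory_limit {
            let Some(key) = self.memory.keys().next().cloned() else { break };
            if let Some(v) = self.memory.remove(&key) {
                self.backend.insert(key, v);
            }
        }
    }
}
```

The real kernel also short-circuits backend lookups while the backend is empty, which keeps the common all-in-memory case free of disk reads.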
@@ -1222,7 +1052,6 @@ impl SyncShared { let state = SyncState { shared_best_header, - header_map, block_status_map: DashMap::new(), tx_filter: Mutex::new(TtlFilter::default()), unknown_tx_hashes: Mutex::new(KeyedPriorityQueue::new()), @@ -1350,7 +1179,7 @@ impl SyncShared { .state .clean_expired_blocks(self.active_chain().epoch_ext().number()); for hash in hashes { - self.state.remove_header_view(&hash); + self.shared().remove_header_view(&hash); } } @@ -1428,7 +1257,7 @@ impl SyncShared { } }, ); - self.state.header_map.insert(header_view.clone()); + self.shared().header_map().insert(header_view.clone()); self.state .peers() .may_set_best_known_header(peer, header_view.clone()); @@ -1450,9 +1279,9 @@ impl SyncShared { .get_block_ext(hash) .map(|block_ext| HeaderView::new(header, block_ext.total_difficulty)) }) - .or_else(|| self.state.header_map.get(hash)) + .or_else(|| self.shared().header_map().get(hash)) } else { - self.state.header_map.get(hash).or_else(|| { + self.shared().header_map().get(hash).or_else(|| { store.get_block_header(hash).and_then(|header| { store .get_block_ext(hash) @@ -1472,12 +1301,28 @@ impl SyncShared { pub fn get_epoch_ext(&self, hash: &Byte32) -> Option { self.store().get_block_epoch(hash) } + + pub fn insert_peer_unknown_header_list(&self, pi: PeerIndex, header_list: Vec) { + // update peer's unknown_header_list only once + if self.state().peers.unknown_header_list_is_empty(pi) { + // header list is an ordered list, sorted from highest to lowest, + // so here you discard and exit early + for hash in header_list { + if let Some(header) = self.shared().header_map().get(&hash) { + self.state().peers.may_set_best_known_header(pi, header); + break; + } else { + self.state().peers.insert_unknown_header_hash(pi, hash) + } + } + } + } } impl HeaderProvider for SyncShared { fn get_header(&self, hash: &Byte32) -> Option { - self.state - .header_map + self.shared() + .header_map() .get(hash) .map(HeaderView::into_inner) .or_else(|| self.store().get_block_header(hash)) @@ -1549,7 +1394,6 @@ impl PartialOrd for UnknownTxHashPriority { pub struct SyncState { /* Status irrelevant to peers */ shared_best_header: RwLock, - header_map: HeaderMap, block_status_map: DashMap, tx_filter: Mutex>, @@ -1632,10 +1476,6 @@ impl SyncState { self.shared_best_header.read() } - pub fn header_map(&self) -> &HeaderMap { - &self.header_map - } - pub fn may_set_shared_best_header(&self, header: HeaderView) { if !header.is_better_than(self.shared_best_header.read().total_difficulty()) { return; @@ -1647,10 +1487,6 @@ impl SyncState { *self.shared_best_header.write() = header; } - pub fn remove_header_view(&self, hash: &Byte32) { - self.header_map.remove(hash); - } - pub(crate) fn suspend_sync(&self, peer_state: &mut PeerState) { if peer_state.sync_started() { assert_ne!( @@ -1896,22 +1732,6 @@ impl SyncState { pub fn clean_expired_blocks(&self, epoch: EpochNumber) -> Vec { self.orphan_block_pool.clean_expired_blocks(epoch) } - - pub fn insert_peer_unknown_header_list(&self, pi: PeerIndex, header_list: Vec) { - // update peer's unknown_header_list only once - if self.peers.unknown_header_list_is_empty(pi) { - // header list is an ordered list, sorted from highest to lowest, - // so here you discard and exit early - for hash in header_list { - if let Some(header) = self.header_map.get(&hash) { - self.peers.may_set_best_known_header(pi, header); - break; - } else { - self.peers.insert_unknown_header_hash(pi, hash) - } - } - } - } } /** ActiveChain captures a point-in-time view of indexed chain of 
blocks. */ @@ -2260,7 +2080,7 @@ impl ActiveChain { match self.state.block_status_map.get(block_hash) { Some(status_ref) => *status_ref.value(), None => { - if self.state.header_map.contains_key(block_hash) { + if self.shared().shared().header_map().contains_key(block_hash) { BlockStatus::HEADER_VALID } else { let verified = self