diff --git a/chain/src/chain.rs b/chain/src/chain.rs index aec614756f6..1c94d85ef35 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -1,6 +1,7 @@ //! CKB chain service. #![allow(missing_docs)] +use crate::orphan_block_pool::OrphanBlockPool; use ckb_channel::{self as channel, select, Receiver, Sender}; use ckb_error::{Error, InternalErrorKind}; use ckb_logger::Level::Trace; @@ -11,7 +12,7 @@ use ckb_merkle_mountain_range::leaf_index_to_mmr_size; use ckb_proposal_table::ProposalTable; #[cfg(debug_assertions)] use ckb_rust_unstable_port::IsSorted; -use ckb_shared::{shared::Shared, OrphanBlockPool}; +use ckb_shared::shared::Shared; use ckb_stop_handler::{SignalSender, StopHandler}; use ckb_store::{attach_block_cell, detach_block_cell, ChainStore, StoreTransaction}; use ckb_systemtime::unix_time_as_millis; @@ -39,6 +40,8 @@ use std::sync::Arc; use std::time::Duration; use std::{cmp, thread}; +const ORPHAN_BLOCK_SIZE: usize = 100000; + type ProcessBlockRequest = Request<(Arc, Switch), Result>; type TruncateRequest = Request>; @@ -54,6 +57,8 @@ pub struct ChainController { truncate_sender: Sender, // Used for testing only stop: Option>, + + orphan_block_broker: Arc, } impl Drop for ChainController { @@ -68,11 +73,13 @@ impl ChainController { process_block_sender: Sender, truncate_sender: Sender, stop: StopHandler<()>, + orphan_block_broker: Arc, ) -> Self { ChainController { process_block_sender, truncate_sender, stop: Some(stop), + orphan_block_broker, } } /// Inserts the block into database. @@ -125,8 +132,14 @@ impl ChainController { stop: None, truncate_sender: self.truncate_sender.clone(), process_block_sender: self.process_block_sender.clone(), + orphan_block_broker: self.orphan_block_broker.clone(), } } + + // Relay need this + pub fn get_orphan_block(&self, hash: &Byte32) -> Option { + todo!("load orphan block") + } } /// The struct represent fork @@ -208,6 +221,8 @@ pub struct ChainService { shared: Shared, proposal_table: Arc>, + orphan_blocks_broker: Arc, + unverified_queue: Arc>, } @@ -220,13 +235,10 @@ impl ChainService { shared, proposal_table: Arc::new(Mutex::new(proposal_table)), unverified_queue, + orphan_blocks_broker: Arc::new(OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE)), } } - fn orphan_blocks_broker(&self) -> Arc { - self.shared.orphan_block_pool() - } - /// start background single-threaded service with specified thread_name. pub fn start(mut self, thread_name: Option) -> ChainController { let (signal_sender, signal_receiver) = channel::bounded::<()>(SIGNAL_CHANNEL_SIZE); @@ -240,6 +252,7 @@ impl ChainService { } let tx_control = self.shared.tx_pool_controller().clone(); let (unverified_queue_stop_tx, unverified_queue_stop_rx) = ckb_channel::bounded::<()>(1); + let orphan_blocks_broker_clone = Arc::clone(&self.orphan_blocks_broker); let thread = thread_builder .spawn({ @@ -293,7 +306,12 @@ impl ChainService { "chain".to_string(), ); - ChainController::new(process_block_sender, truncate_sender, stop) + ChainController::new( + process_block_sender, + truncate_sender, + stop, + orphan_blocks_broker_clone, + ) } fn consume_unverified_blocks(&self, unverified_queue_stop_rx: Receiver<()>) { @@ -465,23 +483,22 @@ impl ChainService { db_txn.insert_block(block.as_ref())?; db_txn.commit()?; - self.orphan_blocks_broker() - .insert(block.as_ref().to_owned()); + self.orphan_blocks_broker.insert(block.as_ref().to_owned()); { debug!( "there are {} orphan blocks in orphan_blocks_broker", - self.orphan_blocks_broker().len() + self.orphan_blocks_broker.len() ); } - for leader_hash in self.orphan_blocks_broker().clone_leaders() { + for leader_hash in self.orphan_blocks_broker.clone_leaders() { if !db_txn.get_block_epoch_index(&leader_hash).is_some() { trace!("a orphan leader: {} not stored", leader_hash); continue; } for descendant in self - .orphan_blocks_broker() + .orphan_blocks_broker .remove_blocks_by_parent(&leader_hash) { match self.accept_block(descendant.hash()) { diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 5898633b834..2def5177fc8 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -7,5 +7,6 @@ //! [`ChainController`]: chain/struct.ChainController.html pub mod chain; +mod orphan_block_pool; #[cfg(test)] mod tests; diff --git a/shared/src/orphan_block_pool.rs b/chain/src/orphan_block_pool.rs similarity index 100% rename from shared/src/orphan_block_pool.rs rename to chain/src/orphan_block_pool.rs diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 699b1f81548..c9d0791610c 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -2,12 +2,10 @@ // num_cpus is used in proc_macro mod block_status; -mod orphan_block_pool; pub mod shared; mod types; pub use block_status::BlockStatus; pub use ckb_snapshot::{Snapshot, SnapshotMgr}; -pub use orphan_block_pool::{OrphanBlockPool, ParentHash}; pub use shared::Shared; pub use types::{HeaderMap, HeaderView}; diff --git a/shared/src/shared.rs b/shared/src/shared.rs index 89d25300ddd..f4c7ee31315 100644 --- a/shared/src/shared.rs +++ b/shared/src/shared.rs @@ -1,7 +1,7 @@ //! TODO(doc): @quake use crate::block_status::BlockStatus; use crate::types::HeaderMap; -use crate::{OrphanBlockPool, Snapshot, SnapshotMgr}; +use crate::{Snapshot, SnapshotMgr}; use arc_swap::Guard; use ckb_async_runtime::Handle; use ckb_chain_spec::consensus::Consensus; @@ -37,7 +37,6 @@ const FREEZER_INTERVAL: Duration = Duration::from_secs(60); const THRESHOLD_EPOCH: EpochNumber = 2; const MAX_FREEZE_LIMIT: BlockNumber = 30_000; const SHRINK_THRESHOLD: usize = 300; -const ORPHAN_BLOCK_SIZE: usize = 100000; /// An owned permission to close on a freezer thread pub struct FreezerClose { @@ -67,7 +66,6 @@ pub struct Shared { pub(crate) header_map: Arc, pub(crate) block_status_map: DashMap, pub(crate) unverified_tip: Arc>, - pub(crate) orphan_block_pool: Arc, } impl Shared { @@ -100,7 +98,6 @@ impl Shared { header_map: Arc::new(header_map), block_status_map: DashMap::new(), unverified_tip: Arc::new(Mutex::new(unverified_tip)), - orphan_block_pool: Arc::new(OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE)), } } /// Spawn freeze background thread that periodically checks and moves ancient data from the kv database into the freezer. @@ -457,13 +454,4 @@ impl Shared { pub fn remove_header_view(&self, hash: &Byte32) { self.header_map().remove(hash); } - - pub fn orphan_block_pool(&self) -> Arc { - Arc::clone(&self.orphan_block_pool) - } - - pub fn get_orphan_block(&self, block_hash: &Byte32) -> Option { - todo!("// TODO move orphan blocks from ckb_chain to ckb_shared"); - self.orphan_block_pool.get_block(block_hash) - } } diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index ad9a5455d51..16b2bddb185 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -458,7 +458,7 @@ impl Relayer { } } BlockStatus::BLOCK_RECEIVED => { - if let Some(uncle) = self.shared.shared().get_orphan_block(&uncle_hash) { + if let Some(uncle) = self.chain.get_orphan_block(&uncle_hash) { uncles.push(uncle.as_uncle().data()); } else { debug_target!(