diff --git a/chain/src/chain.rs b/chain/src/chain.rs index cf2e6bc326..415f2d9fd3 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -2,7 +2,8 @@ #![allow(missing_docs)] use crate::orphan_block_pool::OrphanBlockPool; -use ckb_channel::{self as channel, select, Receiver, Sender}; +use ckb_chain_spec::versionbits::VersionbitsIndexer; +use ckb_channel::{self as channel, select, Receiver, SendError, Sender}; use ckb_constant::sync::BLOCK_DOWNLOAD_WINDOW; use ckb_error::{Error, InternalErrorKind}; use ckb_logger::Level::Trace; @@ -228,6 +229,9 @@ pub struct ChainService { unverified_tx: Sender<(Byte32, Switch)>, unverified_rx: Receiver<(Byte32, Switch)>, + + new_block_tx: Sender, + new_block_rx: Receiver, } impl ChainService { @@ -236,20 +240,28 @@ impl ChainService { let (unverified_tx, unverified_rx) = channel::bounded::<(Byte32, Switch)>(BLOCK_DOWNLOAD_WINDOW as usize); + let (new_block_tx, new_block_rx) = + channel::bounded::(BLOCK_DOWNLOAD_WINDOW as usize); + ChainService { shared, proposal_table: Arc::new(Mutex::new(proposal_table)), orphan_blocks_broker: Arc::new(OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE)), unverified_tx, unverified_rx, + new_block_tx, + new_block_rx, } } /// start background single-threaded service with specified thread_name. pub fn start(mut self, thread_name: Option) -> ChainController { + let orphan_blocks_broker_clone = Arc::clone(&self.orphan_blocks_broker); + let (signal_sender, signal_receiver) = channel::bounded::<()>(SIGNAL_CHANNEL_SIZE); let (process_block_sender, process_block_receiver) = channel::bounded(BLOCK_DOWNLOAD_WINDOW as usize); + let (truncate_sender, truncate_receiver) = channel::bounded(1); // Mainly for test: give an empty thread_name @@ -259,52 +271,58 @@ impl ChainService { } let tx_control = self.shared.tx_pool_controller().clone(); let (unverified_queue_stop_tx, unverified_queue_stop_rx) = ckb_channel::bounded::<()>(1); - let orphan_blocks_broker_clone = Arc::clone(&self.orphan_blocks_broker); + let (search_orphan_pool_stop_tx, search_orphan_pool_stop_rx) = + ckb_channel::bounded::<()>(1); - let thread = thread_builder + let unverified_consumer_thread = thread::Builder::new() + .name("verify_blocks".into()) .spawn({ - let chain_service_clone = self.clone(); - move || { - let unverified_consumer_thread = thread::Builder::new() - .name("verify_blocks".into()) - .spawn(move || { - chain_service_clone.consume_unverified_blocks(unverified_queue_stop_rx) - }) - .expect("start unverified_queue consumer thread should ok"); - - loop { - select! { - recv(signal_receiver) -> _ => { - debug!("ChainService received signal"); - unverified_queue_stop_tx.send(()); - unverified_consumer_thread.join(); - break; - }, - recv(process_block_receiver) -> msg => match msg { - Ok(Request { responder, arguments: (block, verify) }) => { - let _ = responder.send(Ok(false)); - - let _ = tx_control.suspend_chunk_process(); - self.process_block_v2(block, verify); - let _ = tx_control.continue_chunk_process(); - }, - _ => { - error!("process_block_receiver closed"); - break; - }, - }, - recv(truncate_receiver) -> msg => match msg { - Ok(Request { responder, arguments: target_tip_hash }) => { - let _ = tx_control.suspend_chunk_process(); - let _ = responder.send(self.truncate(&target_tip_hash)); - let _ = tx_control.continue_chunk_process(); - }, - _ => { - error!("truncate_receiver closed"); - break; - }, - } - } + let chain_service = self.clone(); + move || chain_service.start_consume_unverified_blocks(unverified_queue_stop_rx) + }) + .expect("start unverified_queue consumer thread should ok"); + + let search_orphan_pool_thread = thread::Builder::new() + .name("search_orphan".into()) + .spawn({ + let chain_service = self.clone(); + move || chain_service.start_search_orphan_pool(search_orphan_pool_stop_rx) + }) + .expect("start search_orphan_pool thread should ok"); + + let thread = thread_builder + .spawn(move || loop { + select! { + recv(signal_receiver) -> _ => { + debug!("ChainService received signal"); + unverified_queue_stop_tx.send(()); + search_orphan_pool_stop_tx.send(()); + + search_orphan_pool_thread.join(); + unverified_consumer_thread.join(); + break; + }, + recv(process_block_receiver) -> msg => match msg { + Ok(Request { responder, arguments: (block, verify) }) => { + let _ = tx_control.suspend_chunk_process(); + let _ = responder.send(self.process_block_v2(block, verify)); + let _ = tx_control.continue_chunk_process(); + }, + _ => { + error!("process_block_receiver closed"); + break; + }, + }, + recv(truncate_receiver) -> msg => match msg { + Ok(Request { responder, arguments: target_tip_hash }) => { + let _ = tx_control.suspend_chunk_process(); + let _ = responder.send(self.truncate(&target_tip_hash)); + let _ = tx_control.continue_chunk_process(); + }, + _ => { + error!("truncate_receiver closed"); + break; + }, } } }) @@ -323,7 +341,7 @@ impl ChainService { ) } - fn consume_unverified_blocks(&self, unverified_queue_stop_rx: Receiver<()>) { + fn start_consume_unverified_blocks(&self, unverified_queue_stop_rx: Receiver<()>) { loop { select! { recv(unverified_queue_stop_rx) -> _ => { @@ -333,32 +351,10 @@ impl ChainService { recv(self.unverified_rx) -> msg => match msg { Ok((block_hash, switch)) => { // process this unverified block - match self.verify_block(&block_hash, switch) { - Ok(_) => { - self.shared.remove_block_status(&block_hash); - self.shared.remove_header_view(&block_hash); - } - Err(err) => { - error!("verify block {} failed: {}", block_hash, err); - // TODO punish the peer who give me the bad block - - // TODO decrease unverified_tip - let chain_tip = self - .shared - .store() - .get_tip_header() - .expect("tip_header must exist"); - self.shared.set_unverified_tip(chain_tip.clone()); - error!( - "set_unverified tip to {}-{}", - chain_tip.number(), - chain_tip.hash() - ); - } - } + self.consume_unverified_blocks((block_hash, switch)) }, - _ => { - error!("unverified_rx closed"); + Err(err) => { + error!("unverified_rx err: {}", err); return; }, }, @@ -367,6 +363,130 @@ impl ChainService { } } + fn consume_unverified_blocks(&self, (block_hash, switch): (Byte32, Switch)) { + // process this unverified block + match self.verify_block(&block_hash, switch) { + Ok(_) => { + self.shared.remove_block_status(&block_hash); + self.shared.remove_header_view(&block_hash); + } + Err(err) => { + error!("verify block {} failed: {}", block_hash.clone(), err); + // TODO punish the peer who give me the bad block + + // TODO decrease unverified_tip + let tip = self + .shared + .store() + .get_tip_header() + .expect("tip_header must exist"); + let tip_ext = self + .shared + .store() + .get_block_ext(&tip.hash()) + .expect("tip header's ext must exist"); + + self.shared.set_unverified_tip(ckb_shared::HeaderView::new( + tip.clone(), + tip_ext.total_difficulty, + )); + + self.shared + .insert_block_status(block_hash.clone(), BlockStatus::BLOCK_INVALID); + error!( + "set_unverified tip to {}-{}, beacause verify {} failed: {}", + tip.number(), + tip.hash(), + block_hash, + err + ); + } + } + } + + fn start_search_orphan_pool(&self, search_orphan_pool_stop_rx: Receiver<()>) { + loop { + select! { + recv(search_orphan_pool_stop_rx) -> _ => { + info!("unverified_queue_consumer got exit signal, exit now"); + return; + }, + recv(self.new_block_rx) -> msg => match msg { + Ok(switch) => { + self.search_orphan_pool(switch) + }, + Err(err) => { + error!("new_block_rx err: {}", err); + return + } + }, + } + } + } + fn search_orphan_pool(&self, switch: Switch) { + for leader_hash in self.orphan_blocks_broker.clone_leaders() { + if !self + .shared + .contains_block_status(&leader_hash, BlockStatus::BLOCK_PARTIAL_STORED) + { + trace!("orphan leader: {} not partial stored", leader_hash); + continue; + } + + let descendants = self + .orphan_blocks_broker + .remove_blocks_by_parent(&leader_hash); + if descendants.is_empty() { + continue; + } + let mut accept_error_occurred = false; + for descendant in &descendants { + match self.accept_block(descendant) { + Err(err) => { + accept_error_occurred = true; + error!("accept block {} failed: {}", descendant.hash(), err); + continue; + } + Ok(total_difficulty) => { + match self + .unverified_tx + .send_timeout((descendant.hash(), switch), Duration::from_secs(1)) + { + Ok(_) => {} + Err(err) => error!("send unverified_tx failed: {}", err), + }; + + if total_difficulty.gt(self.shared.get_unverified_tip().total_difficulty()) + { + self.shared.set_unverified_tip(ckb_shared::HeaderView::new( + descendant.header(), + total_difficulty, + )); + } + + debug!( + "set unverified_tip to {}-{}, while unverified_tip - verified_tip = {}", + descendant.number(), + descendant.hash(), + descendant + .number() + .saturating_sub(self.shared.snapshot().tip_number()) + ) + } + } + } + + if !accept_error_occurred { + info!( + "accept {} blocks [{}->{}] success", + descendants.len(), + descendants.first().expect("descendants not empty").number(), + descendants.last().expect("descendants not empty").number(), + ) + } + } + } + fn make_fork_for_truncate(&self, target: &HeaderView, current_tip: &HeaderView) -> ForkChanges { let mut fork = ForkChanges::default(); let store = self.shared.store(); @@ -467,84 +587,30 @@ impl ChainService { pub fn process_block_v2(&self, block: Arc, switch: Switch) -> Result { let block_number = block.number(); let block_hash = block.hash(); - if block_number < 1 { warn!("receive 0 number block: 0-{}", block_hash); } - let db_txn = Arc::new(self.shared.store().begin_transaction()); + // if self + // .shared + // .contains_block_status(&block_hash, BlockStatus::BLOCK_RECEIVED) + // { + // debug!("block {}-{} has been stored", block_number, block_hash); + // return Ok(false); + // } + if !switch.disable_non_contextual() { self.non_contextual_verify(&block)?; } - if db_txn.block_exists(&block_hash) { - debug!("block {}-{} has been stored", block_number, block_hash); - return Ok(false); - } - db_txn.insert_block(block.as_ref())?; - db_txn.commit()?; - - self.shared - .insert_block_status(block.hash(), BlockStatus::BLOCK_PARTIAL_STORED); self.orphan_blocks_broker.insert(block.as_ref().to_owned()); - for leader_hash in self.orphan_blocks_broker.clone_leaders() { - if !self - .shared - .contains_block_status(&leader_hash, BlockStatus::BLOCK_PARTIAL_STORED) - { - trace!("orphan leader: {} not partial stored", leader_hash); - continue; - } - - let descendants = self - .orphan_blocks_broker - .remove_blocks_by_parent(&leader_hash); - if descendants.is_empty() { - continue; - } - let mut accept_error_occurred = false; - for descendant in &descendants { - match self.accept_block(descendant) { - Err(err) => { - accept_error_occurred = true; - error!("accept block {} failed: {}", descendant.hash(), err); - continue; - } - Ok(_) => { - match self - .unverified_tx - .send_timeout((descendant.hash(), switch), Duration::from_secs(1)) - { - Ok(_) => {} - Err(err) => error!("send unverified_tx failed: {}", err), - }; - - // TODO set unverified_tip based on total difficulty - self.shared.set_unverified_tip(descendant.header()); - - debug!( - "set unverified_tip to {}-{}, while unverified_tip - verified_tip = {}", - descendant.number(), - descendant.hash(), - descendant - .number() - .saturating_sub(self.shared.snapshot().tip_number()) - ) - } - } - } - - if !accept_error_occurred { - info!( - "accept {} blocks [{}->{}] success", - descendants.len(), - descendants.first().expect("descendants not empty").number(), - descendants.last().expect("descendants not empty").number(), - ) + match self.new_block_tx.send(switch) { + Ok(_) => {} + Err(err) => { + error!("notify new block to orphan pool err: {}", err) } } - debug!( "processing block: {}-{}, orphan_len: {}, (tip:unverified_tip):({}:{})", block_number, @@ -557,10 +623,21 @@ impl ChainService { Ok(false) } - fn accept_block(&self, block: &BlockView) -> Result<(), Error> { - let hash = block.hash(); + fn accept_block(&self, block: &BlockView) -> Result { + let (block_number, block_hash) = (block.number(), block.hash()); + + if self + .shared + .contains_block_status(&block_hash, BlockStatus::BLOCK_PARTIAL_STORED) + { + debug!("block {}-{} has been stored", block_number, block_hash); + return Ok(U256::zero()); + } + let db_txn = Arc::new(self.shared.store().begin_transaction()); + db_txn.insert_block(block)?; + // let block = self // .shared // .store() @@ -624,7 +701,11 @@ impl ChainService { db_txn.insert_block_ext(&block.header().hash(), &ext)?; db_txn.commit()?; - Ok(()) + + self.shared + .insert_block_status(block_hash, BlockStatus::BLOCK_PARTIAL_STORED); + + Ok(cannon_total_difficulty) } fn verify_block(&self, hash: &Byte32, switch: Switch) -> Result { diff --git a/shared/src/shared.rs b/shared/src/shared.rs index 392dc99b8c..d087a6bb12 100644 --- a/shared/src/shared.rs +++ b/shared/src/shared.rs @@ -66,7 +66,7 @@ pub struct Shared { pub(crate) header_map: Arc, pub(crate) block_status_map: DashMap, - pub(crate) unverified_tip: Arc>, + pub(crate) unverified_tip: Arc>, } impl Shared { @@ -83,9 +83,12 @@ impl Shared { ibd_finished: Arc, header_map: HeaderMap, ) -> Shared { - let unverified_tip = store - .get_tip_header() - .unwrap_or(consensus.genesis_block().header()); + let unverified_tip = crate::HeaderView::new( + store + .get_tip_header() + .unwrap_or(consensus.genesis_block().header()), + consensus.genesis_block.difficulty(), + ); Shared { store, tx_pool_controller, @@ -397,10 +400,10 @@ impl Shared { ) } - pub fn set_unverified_tip(&self, header: HeaderView) { + pub fn set_unverified_tip(&self, header: crate::HeaderView) { self.unverified_tip.store(Arc::new(header)); } - pub fn get_unverified_tip(&self) -> HeaderView { + pub fn get_unverified_tip(&self) -> crate::HeaderView { self.unverified_tip.load().as_ref().clone() } diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 244bee225c..f5af2faab1 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -50,7 +50,7 @@ pub const TIMEOUT_EVICTION_TOKEN: u64 = 3; pub const NO_PEER_CHECK_TOKEN: u64 = 255; const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_secs(1); -const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(500); +const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40); const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200); #[derive(Copy, Clone)] diff --git a/sync/src/types/mod.rs b/sync/src/types/mod.rs index 40d2ca5432..acb00cacee 100644 --- a/sync/src/types/mod.rs +++ b/sync/src/types/mod.rs @@ -1781,7 +1781,7 @@ impl ActiveChain { } pub fn unverified_tip_header(&self) -> core::HeaderView { - self.shared.shared.get_unverified_tip() + self.shared.shared.get_unverified_tip().into_inner() } pub fn unverified_tip_hash(&self) -> Byte32 {