From 6bbb93c5410e0c25ff4bc5a5528f2c3f2a11b05a Mon Sep 17 00:00:00 2001 From: Eval EXEC <execvy@gmail.com> Date: Mon, 9 Oct 2023 12:18:00 +0800 Subject: [PATCH] Unify process_block's return type as `VerifyResult` --- chain/src/chain.rs | 36 +++++++++++++++++++++++------------- rpc/src/module/miner.rs | 23 +++++++++++------------ sync/src/relayer/mod.rs | 16 +++++++++++----- sync/src/types/mod.rs | 6 ++++-- 4 files changed, 49 insertions(+), 32 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index e3aa3f816c..59cce57ca3 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -54,12 +54,15 @@ const ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 2) as usize; type ProcessBlockRequest = Request<LonelyBlock, ()>; type TruncateRequest = Request<Byte32, Result<(), Error>>; -pub type VerifyCallback = dyn FnOnce(Result<VerifiedBlockStatus, ckb_error::Error>) + Send + Sync; +pub type VerifyResult = Result<VerifiedBlockStatus, Error>; + +pub type VerifyCallback = dyn FnOnce(VerifyResult) + Send + Sync; /// VerifiedBlockStatus is pub enum VerifiedBlockStatus { // The block is being seen for the first time. - FirstSeen, + FirstSeenAndVerified, + FirstSeenButNotVerified, // The block has been verified before. PreviouslyVerified, @@ -376,10 +379,7 @@ impl ChainService { } } - fn consume_unverified_blocks( - &self, - unverified_block: &UnverifiedBlock, - ) -> Result<VerifiedBlockStatus, Error> { + fn consume_unverified_blocks(&self, unverified_block: &UnverifiedBlock) -> VerifyResult { // process this unverified block let verify_result = self.verify_block(unverified_block); match &verify_result { @@ -687,7 +687,7 @@ impl ChainService { match lonely_block_tx.send(lonely_block) { Ok(_) => {} Err(SendError(lonely_block)) => { - error!("notify new block to orphan pool err: {}", err); + error!("failed to notify new block to orphan pool"); if let Some(verify_callback) = lonely_block.verify_callback { verify_callback(Err(InternalErrorKind::System .other("OrphanBlock broker disconnected") @@ -788,7 +788,7 @@ impl ChainService { Ok(Some((parent_header, cannon_total_difficulty))) } - fn verify_block(&self, unverified_block: &UnverifiedBlock) -> Result<(), Error> { + fn verify_block(&self, unverified_block: &UnverifiedBlock) -> VerifyResult { let log_now = std::time::Instant::now(); let UnverifiedBlock { @@ -803,20 +803,28 @@ impl ChainService { .shared .store() .get_block_ext(&block.data().header().raw().parent_hash()) - .expect("parent already store"); + .expect("parent should be stored already"); if let Some(ext) = self.shared.store().get_block_ext(&block.hash()) { match ext.verified { Some(verified) => { debug!( - "block {}-{} has been verified: {}", + "block {}-{} has been verified, previously verified result: {}", block.number(), block.hash(), verified ); - return Ok(()); + return if verified { + Ok(VerifiedBlockStatus::PreviouslyVerified) + } else { + Err(InternalErrorKind::Other + .other("block previously verified failed") + .into()) + }; + } + _ => { + // we didn't verify this block, going on verify now } - _ => {} } } @@ -934,6 +942,8 @@ impl ChainService { if let Some(metrics) = ckb_metrics::handle() { metrics.ckb_chain_tip.set(block.header().number() as i64); } + + Ok(VerifiedBlockStatus::FirstSeenAndVerified) } else { self.shared.refresh_snapshot(); info!( @@ -952,8 +962,8 @@ impl ChainService { error!("[verify block] notify new_uncle error {}", e); } } + Ok(VerifiedBlockStatus::FirstSeenButNotVerified) } - Ok(()) } fn insert_block(&mut self, block: Arc<BlockView>, switch: Switch) -> Result<bool, Error> { diff --git a/rpc/src/module/miner.rs b/rpc/src/module/miner.rs index c923158211..f625e83a9f 100644 --- a/rpc/src/module/miner.rs +++ b/rpc/src/module/miner.rs @@ -1,5 +1,5 @@ use crate::error::RPCError; -use ckb_chain::chain::ChainController; +use ckb_chain::chain::{ChainController, VerifiedBlockStatus, VerifyResult}; use ckb_jsonrpc_types::{Block, BlockTemplate, Uint64, Version}; use ckb_logger::{debug, error, info, warn}; use ckb_network::{NetworkController, PeerIndex, SupportProtocols, TargetSession}; @@ -271,17 +271,16 @@ impl MinerRpc for MinerRpcImpl { .verify(&header) .map_err(|err| handle_submit_error(&work_id, &err))?; - let (verify_result_tx, verify_result_rx) = - ckb_channel::oneshot::channel::<std::result::Result<(), ckb_error::Error>>(); - let verify_callback = - move |verify_result: std::result::Result<(), ckb_error::Error>| match verify_result_tx - .send(verify_result) - { - Err(_) => { - error!("send verify result failed, the Receiver in MinerRpc is disconnected") - } - _ => {} - }; + let (verify_result_tx, verify_result_rx) = ckb_channel::oneshot::channel::<VerifyResult>(); + let verify_callback = move |verify_result: std::result::Result< + VerifiedBlockStatus, + ckb_error::Error, + >| match verify_result_tx.send(verify_result) { + Err(_) => { + error!("send verify result failed, the Receiver in MinerRpc is disconnected") + } + _ => {} + }; self.chain .process_block_with_callback(Arc::clone(&block), Box::new(verify_callback)); diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index f1b531b070..24178b5478 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -23,7 +23,7 @@ use self::transactions_process::TransactionsProcess; use crate::types::{ActiveChain, SyncShared}; use crate::utils::{metric_ckb_message_bytes, send_message_to, MetricDirection}; use crate::{Status, StatusCode}; -use ckb_chain::chain::ChainController; +use ckb_chain::chain::{ChainController, VerifiedBlockStatus, VerifyResult}; use ckb_constant::sync::BAD_MESSAGE_BAN_TIME; use ckb_logger::{debug_target, error, error_target, info_target, trace_target, warn_target}; use ckb_network::{ @@ -313,12 +313,18 @@ impl Relayer { let broadcast_compact_block_tx = self.broadcast_compact_block_tx.clone(); let block = Arc::clone(&block); let peer = peer.clone(); - move |result: Result<(), ckb_error::Error>| match result { - Ok(()) => match broadcast_compact_block_tx.send((block, peer)) { - Err(_) => { - error!( + move |result: VerifyResult| match result { + Ok(verified_block_status) => match verified_block_status { + VerifiedBlockStatus::FirstSeenAndVerified + | VerifiedBlockStatus::FirstSeenAndVerified => { + match broadcast_compact_block_tx.send((block, peer)) { + Err(_) => { + error!( "send block to broadcast_compact_block_tx failed, this shouldn't happen", ); + } + _ => {} + } } _ => {} }, diff --git a/sync/src/types/mod.rs b/sync/src/types/mod.rs index 95a88e30be..b3f215bcdd 100644 --- a/sync/src/types/mod.rs +++ b/sync/src/types/mod.rs @@ -2,7 +2,9 @@ use crate::orphan_block_pool::OrphanBlockPool; use crate::utils::is_internal_db_error; use crate::{Status, StatusCode, FAST_INDEX, LOW_INDEX, NORMAL_INDEX, TIME_TRACE_SIZE}; use ckb_app_config::SyncConfig; -use ckb_chain::chain::{ChainController, LonelyBlock, VerifyCallback}; +use ckb_chain::chain::{ + ChainController, LonelyBlock, VerifiedBlockStatus, VerifyCallback, VerifyResult, +}; use ckb_chain_spec::consensus::Consensus; use ckb_channel::Receiver; use ckb_constant::sync::{ @@ -1082,7 +1084,7 @@ impl SyncShared { chain: &ChainController, block: Arc<core::BlockView>, peer_id: PeerIndex, - verify_success_callback: impl FnOnce(Result<(), ckb_error::Error>) + Send + Sync + 'static, + verify_success_callback: impl FnOnce(VerifyResult) + Send + Sync + 'static, ) { self.accept_block( chain,