Skip to content

Commit

Permalink
Unify process_block's return type as VerifyResult
Browse files Browse the repository at this point in the history
  • Loading branch information
eval-exec committed Oct 16, 2023
1 parent 6ee86a7 commit 571d3f2
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 32 deletions.
36 changes: 23 additions & 13 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ const ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 2) as usize;
type ProcessBlockRequest = Request<LonelyBlock, ()>;
type TruncateRequest = Request<Byte32, Result<(), Error>>;

pub type VerifyCallback = dyn FnOnce(Result<VerifiedBlockStatus, ckb_error::Error>) + Send + Sync;
pub type VerifyResult = Result<VerifiedBlockStatus, Error>;

pub type VerifyCallback = dyn FnOnce(VerifyResult) + Send + Sync;

/// VerifiedBlockStatus is
pub enum VerifiedBlockStatus {
// The block is being seen for the first time.
FirstSeen,
FirstSeenAndVerified,
FirstSeenButNotVerified,

// The block has been verified before.
PreviouslyVerified,
Expand Down Expand Up @@ -376,10 +379,7 @@ impl ChainService {
}
}

fn consume_unverified_blocks(
&self,
unverified_block: &UnverifiedBlock,
) -> Result<VerifiedBlockStatus, Error> {
fn consume_unverified_blocks(&self, unverified_block: &UnverifiedBlock) -> VerifyResult {
// process this unverified block
let verify_result = self.verify_block(unverified_block);
match &verify_result {
Expand Down Expand Up @@ -687,7 +687,7 @@ impl ChainService {
match lonely_block_tx.send(lonely_block) {
Ok(_) => {}
Err(SendError(lonely_block)) => {
error!("notify new block to orphan pool err: {}", err);
error!("failed to notify new block to orphan pool");
if let Some(verify_callback) = lonely_block.verify_callback {
verify_callback(Err(InternalErrorKind::System
.other("OrphanBlock broker disconnected")
Expand Down Expand Up @@ -788,7 +788,7 @@ impl ChainService {
Ok(Some((parent_header, cannon_total_difficulty)))
}

fn verify_block(&self, unverified_block: &UnverifiedBlock) -> Result<(), Error> {
fn verify_block(&self, unverified_block: &UnverifiedBlock) -> VerifyResult {
let log_now = std::time::Instant::now();

let UnverifiedBlock {
Expand All @@ -803,20 +803,28 @@ impl ChainService {
.shared
.store()
.get_block_ext(&block.data().header().raw().parent_hash())
.expect("parent already store");
.expect("parent should be stored already");

if let Some(ext) = self.shared.store().get_block_ext(&block.hash()) {
match ext.verified {
Some(verified) => {
debug!(
"block {}-{} has been verified: {}",
"block {}-{} has been verified, previously verified result: {}",
block.number(),
block.hash(),
verified
);
return Ok(());
return if verified {
Ok(VerifiedBlockStatus::PreviouslyVerified)
} else {
Err(InternalErrorKind::Other
.other("block previously verified failed")
.into())
};
}
_ => {
// we didn't verify this block, going on verify now
}
_ => {}
}
}

Expand Down Expand Up @@ -934,6 +942,8 @@ impl ChainService {
if let Some(metrics) = ckb_metrics::handle() {
metrics.ckb_chain_tip.set(block.header().number() as i64);
}

Ok(VerifiedBlockStatus::FirstSeenAndVerified)
} else {
self.shared.refresh_snapshot();
info!(
Expand All @@ -952,8 +962,8 @@ impl ChainService {
error!("[verify block] notify new_uncle error {}", e);
}
}
Ok(VerifiedBlockStatus::FirstSeenButNotVerified)
}
Ok(())
}

fn insert_block(&mut self, block: Arc<BlockView>, switch: Switch) -> Result<bool, Error> {
Expand Down
23 changes: 11 additions & 12 deletions rpc/src/module/miner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::RPCError;
use ckb_chain::chain::ChainController;
use ckb_chain::chain::{ChainController, VerifiedBlockStatus, VerifyResult};
use ckb_jsonrpc_types::{Block, BlockTemplate, Uint64, Version};
use ckb_logger::{debug, error, info, warn};
use ckb_network::{NetworkController, PeerIndex, SupportProtocols, TargetSession};
Expand Down Expand Up @@ -271,17 +271,16 @@ impl MinerRpc for MinerRpcImpl {
.verify(&header)
.map_err(|err| handle_submit_error(&work_id, &err))?;

let (verify_result_tx, verify_result_rx) =
ckb_channel::oneshot::channel::<std::result::Result<(), ckb_error::Error>>();
let verify_callback =
move |verify_result: std::result::Result<(), ckb_error::Error>| match verify_result_tx
.send(verify_result)
{
Err(_) => {
error!("send verify result failed, the Receiver in MinerRpc is disconnected")
}
_ => {}
};
let (verify_result_tx, verify_result_rx) = ckb_channel::oneshot::channel::<VerifyResult>();
let verify_callback = move |verify_result: std::result::Result<
VerifiedBlockStatus,
ckb_error::Error,
>| match verify_result_tx.send(verify_result) {
Err(_) => {
error!("send verify result failed, the Receiver in MinerRpc is disconnected")
}
_ => {}
};

self.chain
.process_block_with_callback(Arc::clone(&block), Box::new(verify_callback));
Expand Down
16 changes: 11 additions & 5 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use self::transactions_process::TransactionsProcess;
use crate::types::{ActiveChain, SyncShared};
use crate::utils::{metric_ckb_message_bytes, send_message_to, MetricDirection};
use crate::{Status, StatusCode};
use ckb_chain::chain::ChainController;
use ckb_chain::chain::{ChainController, VerifiedBlockStatus, VerifyResult};
use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
use ckb_logger::{debug_target, error, error_target, info_target, trace_target, warn_target};
use ckb_network::{
Expand Down Expand Up @@ -313,12 +313,18 @@ impl Relayer {
let broadcast_compact_block_tx = self.broadcast_compact_block_tx.clone();
let block = Arc::clone(&block);
let peer = peer.clone();
move |result: Result<(), ckb_error::Error>| match result {
Ok(()) => match broadcast_compact_block_tx.send((block, peer)) {
Err(_) => {
error!(
move |result: VerifyResult| match result {
Ok(verified_block_status) => match verified_block_status {
VerifiedBlockStatus::FirstSeenAndVerified
| VerifiedBlockStatus::FirstSeenAndVerified => {
match broadcast_compact_block_tx.send((block, peer)) {
Err(_) => {
error!(
"send block to broadcast_compact_block_tx failed, this shouldn't happen",
);
}
_ => {}
}
}
_ => {}
},
Expand Down
6 changes: 4 additions & 2 deletions sync/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::orphan_block_pool::OrphanBlockPool;
use crate::utils::is_internal_db_error;
use crate::{Status, StatusCode, FAST_INDEX, LOW_INDEX, NORMAL_INDEX, TIME_TRACE_SIZE};
use ckb_app_config::SyncConfig;
use ckb_chain::chain::{ChainController, LonelyBlock, VerifyCallback};
use ckb_chain::chain::{
ChainController, LonelyBlock, VerifiedBlockStatus, VerifyCallback, VerifyResult,
};
use ckb_chain_spec::consensus::Consensus;
use ckb_channel::Receiver;
use ckb_constant::sync::{
Expand Down Expand Up @@ -1082,7 +1084,7 @@ impl SyncShared {
chain: &ChainController,
block: Arc<core::BlockView>,
peer_id: PeerIndex,
verify_success_callback: impl FnOnce(Result<(), ckb_error::Error>) + Send + Sync + 'static,
verify_success_callback: impl FnOnce(VerifyResult) + Send + Sync + 'static,
) {
self.accept_block(
chain,
Expand Down

0 comments on commit 571d3f2

Please sign in to comment.