Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
eval-exec committed Apr 23, 2023
1 parent 74f1842 commit 4311267
Show file tree
Hide file tree
Showing 43 changed files with 1,182 additions and 322 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

126 changes: 79 additions & 47 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
//! CKB chain service.
#![allow(missing_docs)]

use crate::orphan_block_pool::OrphanBlockPool;
use ckb_channel::{self as channel, select, Sender};
use ckb_channel::{self as channel, select, Receiver, Sender};
use ckb_error::{Error, InternalErrorKind};
use ckb_logger::Level::Trace;
use ckb_logger::{
Expand All @@ -12,7 +11,7 @@ use ckb_merkle_mountain_range::leaf_index_to_mmr_size;
use ckb_proposal_table::ProposalTable;
#[cfg(debug_assertions)]
use ckb_rust_unstable_port::IsSorted;
use ckb_shared::shared::Shared;
use ckb_shared::{shared::Shared, OrphanBlockPool};
use ckb_stop_handler::{SignalSender, StopHandler};
use ckb_store::{attach_block_cell, detach_block_cell, ChainStore, StoreTransaction};
use ckb_systemtime::unix_time_as_millis;
Expand Down Expand Up @@ -210,25 +209,24 @@ pub struct ChainService {
proposal_table: Arc<Mutex<ProposalTable>>,

unverified_queue: Arc<crossbeam::queue::SegQueue<(Byte32, Switch)>>,
orphan_blocks_broker: Arc<OrphanBlockPool>,
}

const ORPHAN_BLOCK_SIZE: usize = 100000;

impl ChainService {
/// Create a new ChainService instance with shared and initial proposal_table.
pub fn new(shared: Shared, proposal_table: ProposalTable) -> ChainService {
let unverified_queue = Arc::new(crossbeam::queue::SegQueue::<(Byte32, Switch)>::new());
let orphan_blocks_broker = Arc::new(OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE));

ChainService {
shared,
proposal_table: Arc::new(Mutex::new(proposal_table)),
unverified_queue,
orphan_blocks_broker,
}
}

fn orphan_blocks_broker(&self) -> Arc<OrphanBlockPool> {
self.shared.orphan_block_pool()
}

/// start background single-threaded service with specified thread_name.
pub fn start<S: ToString>(mut self, thread_name: Option<S>) -> ChainController {
let (signal_sender, signal_receiver) = channel::bounded::<()>(SIGNAL_CHANNEL_SIZE);
Expand All @@ -248,38 +246,9 @@ impl ChainService {
let chain_service_clone = self.clone();
move || {
let unverified_consumer_thread = thread::Builder::new()
.name("verify_blocks".into())
.spawn(move || {
loop {
match chain_service_clone.unverified_queue.pop() {
Some((block_hash, switch)) => {
// process this unverified block
match chain_service_clone.verify_block(&block_hash, switch) {
Ok(_) => {}
Err(err) => {
error!("verify block {} failed: {}", block_hash, err);
// TODO punish the peer who give me the bad block

// TODO decrease unverified_tip
let chain_tip= chain_service_clone.shared.store().get_tip_header().expect("tip_header must exist");
chain_service_clone.shared.set_unverified_tip(chain_tip.clone());
info!("set_unverified tip to {}-{}",chain_tip.number(),chain_tip.hash() );
}
}
}
None => {
std::thread::sleep(Duration::from_millis(100));
}
}
select! {
recv(unverified_queue_stop_rx) -> _ => {
if chain_service_clone.unverified_queue.len() == 0{
info!("unverified_queue_consumer got exit signal, exit now");
return;
}
},
default => {},
}
}
chain_service_clone.consume_unverified_blocks(unverified_queue_stop_rx)
})
.expect("start unverified_queue consumer thread should ok");

Expand Down Expand Up @@ -327,6 +296,58 @@ impl ChainService {
ChainController::new(process_block_sender, truncate_sender, stop)
}

fn consume_unverified_blocks(&self, unverified_queue_stop_rx: Receiver<()>) {
loop {
let queue_length = self.unverified_queue.len();
if queue_length > 1 {
debug!(
"there are {} unverified blocks in unverified_queue",
queue_length
);
}
match self.unverified_queue.pop() {
Some((block_hash, switch)) => {
// process this unverified block
match self.verify_block(&block_hash, switch) {
Ok(_) => {
self.shared.remove_block_status(&block_hash);
self.shared.remove_header_view(&block_hash);
}
Err(err) => {
error!("verify block {} failed: {}", block_hash, err);
// TODO punish the peer who give me the bad block

// TODO decrease unverified_tip
let chain_tip = self
.shared
.store()
.get_tip_header()
.expect("tip_header must exist");
self.shared.set_unverified_tip(chain_tip.clone());
info!(
"set_unverified tip to {}-{}",
chain_tip.number(),
chain_tip.hash()
);
}
}
}
None => {
std::thread::sleep(Duration::from_millis(100));
}
}
select! {
recv(unverified_queue_stop_rx) -> _ => {
if self.unverified_queue.len() == 0{
info!("unverified_queue_consumer got exit signal, exit now");
return;
}
},
default => {},
}
}
}

fn make_fork_for_truncate(&self, target: &HeaderView, current_tip: &HeaderView) -> ForkChanges {
let mut fork = ForkChanges::default();
let store = self.shared.store();
Expand Down Expand Up @@ -444,16 +465,23 @@ impl ChainService {
db_txn.insert_block(block.as_ref())?;
db_txn.commit()?;

self.orphan_blocks_broker.insert(block.as_ref().to_owned());
self.orphan_blocks_broker()
.insert(block.as_ref().to_owned());
{
debug!(
"there are {} orphan blocks in orphan_blocks_broker",
self.orphan_blocks_broker().len()
);
}

for leader_hash in self.orphan_blocks_broker.clone_leaders() {
for leader_hash in self.orphan_blocks_broker().clone_leaders() {
if !db_txn.get_block_epoch_index(&leader_hash).is_some() {
debug!("block {}'s block_epoch_index not stored", leader_hash);
continue;
}

for descendant in self
.orphan_blocks_broker
.orphan_blocks_broker()
.remove_blocks_by_parent(&leader_hash)
{
match self.accept_block(descendant.hash()) {
Expand All @@ -468,9 +496,12 @@ impl ChainService {
self.shared.set_unverified_tip(descendant.header());

debug!(
"set unverified_tip to {}-{}",
"set unverified_tip to {}-{}, while unverified_tip - verified_tip = {}",
descendant.number(),
descendant.hash()
descendant.hash(),
descendant
.number()
.saturating_sub(self.shared.snapshot().tip_number())
)
}
}
Expand Down Expand Up @@ -617,10 +648,11 @@ impl ChainService {

if new_best_block {
debug!(
"[verify block] new best block found: {} => {:#x}, difficulty diff = {:#x}",
"[verify block] new best block found: {} => {:#x}, difficulty diff = {:#x}, unverified_tip: {}",
block.header().number(),
block.header().hash(),
&cannon_total_difficulty - &current_total_difficulty
&cannon_total_difficulty - &current_total_difficulty,
self.shared.get_unverified_tip().number(),
);
self.find_fork(&mut fork, current_tip_header.number(), &block, ext);
self.rollback(&fork, &db_txn)?;
Expand Down Expand Up @@ -678,7 +710,7 @@ impl ChainService {
self.shared
.notify_controller()
.notify_new_block(block_ref.clone());
if log_enabled!(ckb_logger::Level::Debug) {
if log_enabled!(ckb_logger::Level::Trace) {
self.print_chain(10);
}
if let Some(metrics) = ckb_metrics::handle() {
Expand Down
1 change: 0 additions & 1 deletion chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@
//! [`ChainController`]: chain/struct.ChainController.html
pub mod chain;
mod orphan_block_pool;
#[cfg(test)]
mod tests;
1 change: 1 addition & 0 deletions error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod util;
use derive_more::Display;
pub use internal::{InternalError, InternalErrorKind, OtherError, SilentError};
use prelude::*;
pub use util::is_internal_db_error;

/// A wrapper around a dynamic error type.
#[derive(Clone)]
Expand Down
23 changes: 23 additions & 0 deletions error/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Error-related macros
use crate::{Error, ErrorKind, InternalError, InternalErrorKind};
/// Compare two errors.
///
/// NOTE: Used for testing only!
Expand Down Expand Up @@ -192,3 +193,25 @@ macro_rules! def_error_base_on_kind {
def_error_base_on_kind!($error, $error_kind, "/// TODO(doc): @keroro520");
};
}

/// return whether the error's kind is `InternalErrorKind::Database`
///
/// ### Panic
///
/// Panic if the error kind is `InternalErrorKind::DataCorrupted`.
/// If the database is corrupted, panic is better than handle it silently.
pub fn is_internal_db_error(error: &Error) -> bool {
if error.kind() == ErrorKind::Internal {
let error_kind = error
.downcast_ref::<InternalError>()
.expect("error kind checked")
.kind();
if error_kind == InternalErrorKind::DataCorrupted {
panic!("{}", error)
} else {
return error_kind == InternalErrorKind::Database
|| error_kind == InternalErrorKind::System;
}
}
false
}
2 changes: 1 addition & 1 deletion rpc/src/module/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ impl NetRpc for NetRpcImpl {
ibd: chain.is_initial_block_download(),
best_known_block_number: best_known.number().into(),
best_known_block_timestamp: best_known.timestamp().into(),
orphan_blocks_count: (state.orphan_pool().len() as u64).into(),
// orphan_blocks_count: (state.orphan_pool().len() as u64).into(),
inflight_blocks_count: (state.read_inflight_blocks().total_inflight_count() as u64)
.into(),
fast_time: fast_time.into(),
Expand Down
5 changes: 5 additions & 0 deletions shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ ckb-channel = { path = "../util/channel", version = "= 0.110.0-pre" }
ckb-constant = { path = "../util/constant", version = "= 0.110.0-pre" }
ckb-systemtime = { path = "../util/systemtime", version = "= 0.110.0-pre" }
ckb-util = { path = "../util", version = "= 0.110.0-pre" }
dashmap = "4.0"
bitflags = "1.0"
sled = "0.34.7"
tempfile.workspace = true
tokio = { version = "1", features = ["sync"] }

[dev-dependencies]
ckb-systemtime = { path = "../util/systemtime", version = "= 0.110.0-pre", features = ["enable_faketime"] }
Expand Down
File renamed without changes.
6 changes: 6 additions & 0 deletions shared/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
//! TODO(doc): @quake
// num_cpus is used in proc_macro
mod block_status;
mod orphan_block_pool;
pub mod shared;
mod types;

pub use block_status::BlockStatus;
pub use ckb_snapshot::{Snapshot, SnapshotMgr};
pub use orphan_block_pool::{OrphanBlockPool, ParentHash};
pub use shared::Shared;
pub use types::{HeaderMap, HeaderView};
File renamed without changes.
Loading

0 comments on commit 4311267

Please sign in to comment.