Skip to content

Commit

Permalink
debug orphan block broker
Browse files Browse the repository at this point in the history
  • Loading branch information
eval-exec committed Apr 24, 2023
1 parent eff0b47 commit 21db95f
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 27 deletions.
39 changes: 28 additions & 11 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! CKB chain service.
#![allow(missing_docs)]

use crate::orphan_block_pool::OrphanBlockPool;
use ckb_channel::{self as channel, select, Receiver, Sender};
use ckb_error::{Error, InternalErrorKind};
use ckb_logger::Level::Trace;
Expand All @@ -11,7 +12,7 @@ use ckb_merkle_mountain_range::leaf_index_to_mmr_size;
use ckb_proposal_table::ProposalTable;
#[cfg(debug_assertions)]
use ckb_rust_unstable_port::IsSorted;
use ckb_shared::{shared::Shared, OrphanBlockPool};
use ckb_shared::shared::Shared;
use ckb_stop_handler::{SignalSender, StopHandler};
use ckb_store::{attach_block_cell, detach_block_cell, ChainStore, StoreTransaction};
use ckb_systemtime::unix_time_as_millis;
Expand Down Expand Up @@ -39,6 +40,8 @@ use std::sync::Arc;
use std::time::Duration;
use std::{cmp, thread};

const ORPHAN_BLOCK_SIZE: usize = 100000;

type ProcessBlockRequest = Request<(Arc<BlockView>, Switch), Result<bool, Error>>;
type TruncateRequest = Request<Byte32, Result<(), Error>>;

Expand All @@ -54,6 +57,8 @@ pub struct ChainController {
truncate_sender: Sender<TruncateRequest>,
// Used for testing only
stop: Option<StopHandler<()>>,

orphan_block_broker: Arc<OrphanBlockPool>,
}

impl Drop for ChainController {
Expand All @@ -68,11 +73,13 @@ impl ChainController {
process_block_sender: Sender<ProcessBlockRequest>,
truncate_sender: Sender<TruncateRequest>,
stop: StopHandler<()>,
orphan_block_broker: Arc<OrphanBlockPool>,
) -> Self {
ChainController {
process_block_sender,
truncate_sender,
stop: Some(stop),
orphan_block_broker,
}
}
/// Inserts the block into database.
Expand Down Expand Up @@ -125,8 +132,14 @@ impl ChainController {
stop: None,
truncate_sender: self.truncate_sender.clone(),
process_block_sender: self.process_block_sender.clone(),
orphan_block_broker: self.orphan_block_broker.clone(),
}
}

// Relay need this
pub fn get_orphan_block(&self, hash: &Byte32) -> Option<BlockView> {
todo!("load orphan block")
}
}

/// The struct represent fork
Expand Down Expand Up @@ -208,6 +221,8 @@ pub struct ChainService {
shared: Shared,
proposal_table: Arc<Mutex<ProposalTable>>,

orphan_blocks_broker: Arc<OrphanBlockPool>,

unverified_queue: Arc<crossbeam::queue::SegQueue<(Byte32, Switch)>>,
}

Expand All @@ -220,13 +235,10 @@ impl ChainService {
shared,
proposal_table: Arc::new(Mutex::new(proposal_table)),
unverified_queue,
orphan_blocks_broker: Arc::new(OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE)),
}
}

fn orphan_blocks_broker(&self) -> Arc<OrphanBlockPool> {
self.shared.orphan_block_pool()
}

/// start background single-threaded service with specified thread_name.
pub fn start<S: ToString>(mut self, thread_name: Option<S>) -> ChainController {
let (signal_sender, signal_receiver) = channel::bounded::<()>(SIGNAL_CHANNEL_SIZE);
Expand All @@ -240,6 +252,7 @@ impl ChainService {
}
let tx_control = self.shared.tx_pool_controller().clone();
let (unverified_queue_stop_tx, unverified_queue_stop_rx) = ckb_channel::bounded::<()>(1);
let orphan_blocks_broker_clone = Arc::clone(&self.orphan_blocks_broker);

let thread = thread_builder
.spawn({
Expand Down Expand Up @@ -293,7 +306,12 @@ impl ChainService {
"chain".to_string(),
);

ChainController::new(process_block_sender, truncate_sender, stop)
ChainController::new(
process_block_sender,
truncate_sender,
stop,
orphan_blocks_broker_clone,
)
}

fn consume_unverified_blocks(&self, unverified_queue_stop_rx: Receiver<()>) {
Expand Down Expand Up @@ -465,23 +483,22 @@ impl ChainService {
db_txn.insert_block(block.as_ref())?;
db_txn.commit()?;

self.orphan_blocks_broker()
.insert(block.as_ref().to_owned());
self.orphan_blocks_broker.insert(block.as_ref().to_owned());
{
debug!(
"there are {} orphan blocks in orphan_blocks_broker",
self.orphan_blocks_broker().len()
self.orphan_blocks_broker.len()
);
}

for leader_hash in self.orphan_blocks_broker().clone_leaders() {
for leader_hash in self.orphan_blocks_broker.clone_leaders() {
if !db_txn.get_block_epoch_index(&leader_hash).is_some() {
trace!("a orphan leader: {} not stored", leader_hash);
continue;
}

for descendant in self
.orphan_blocks_broker()
.orphan_blocks_broker
.remove_blocks_by_parent(&leader_hash)
{
match self.accept_block(descendant.hash()) {
Expand Down
1 change: 1 addition & 0 deletions chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
//! [`ChainController`]: chain/struct.ChainController.html
pub mod chain;
mod orphan_block_pool;
#[cfg(test)]
mod tests;
File renamed without changes.
2 changes: 0 additions & 2 deletions shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
// num_cpus is used in proc_macro
mod block_status;
mod orphan_block_pool;
pub mod shared;
mod types;

pub use block_status::BlockStatus;
pub use ckb_snapshot::{Snapshot, SnapshotMgr};
pub use orphan_block_pool::{OrphanBlockPool, ParentHash};
pub use shared::Shared;
pub use types::{HeaderMap, HeaderView};
14 changes: 1 addition & 13 deletions shared/src/shared.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! TODO(doc): @quake
use crate::block_status::BlockStatus;
use crate::types::HeaderMap;
use crate::{OrphanBlockPool, Snapshot, SnapshotMgr};
use crate::{Snapshot, SnapshotMgr};
use arc_swap::Guard;
use ckb_async_runtime::Handle;
use ckb_chain_spec::consensus::Consensus;
Expand Down Expand Up @@ -37,7 +37,6 @@ const FREEZER_INTERVAL: Duration = Duration::from_secs(60);
const THRESHOLD_EPOCH: EpochNumber = 2;
const MAX_FREEZE_LIMIT: BlockNumber = 30_000;
const SHRINK_THRESHOLD: usize = 300;
const ORPHAN_BLOCK_SIZE: usize = 100000;

/// An owned permission to close on a freezer thread
pub struct FreezerClose {
Expand Down Expand Up @@ -67,7 +66,6 @@ pub struct Shared {
pub(crate) header_map: Arc<HeaderMap>,
pub(crate) block_status_map: DashMap<Byte32, BlockStatus>,
pub(crate) unverified_tip: Arc<Mutex<HeaderView>>,
pub(crate) orphan_block_pool: Arc<OrphanBlockPool>,
}

impl Shared {
Expand Down Expand Up @@ -100,7 +98,6 @@ impl Shared {
header_map: Arc::new(header_map),
block_status_map: DashMap::new(),
unverified_tip: Arc::new(Mutex::new(unverified_tip)),
orphan_block_pool: Arc::new(OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE)),
}
}
/// Spawn freeze background thread that periodically checks and moves ancient data from the kv database into the freezer.
Expand Down Expand Up @@ -457,13 +454,4 @@ impl Shared {
pub fn remove_header_view(&self, hash: &Byte32) {
self.header_map().remove(hash);
}

pub fn orphan_block_pool(&self) -> Arc<OrphanBlockPool> {
Arc::clone(&self.orphan_block_pool)
}

pub fn get_orphan_block(&self, block_hash: &Byte32) -> Option<BlockView> {
todo!("// TODO move orphan blocks from ckb_chain to ckb_shared");
self.orphan_block_pool.get_block(block_hash)
}
}
2 changes: 1 addition & 1 deletion sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ impl Relayer {
}
}
BlockStatus::BLOCK_RECEIVED => {
if let Some(uncle) = self.shared.shared().get_orphan_block(&uncle_hash) {
if let Some(uncle) = self.chain.get_orphan_block(&uncle_hash) {
uncles.push(uncle.as_uncle().data());
} else {
debug_target!(
Expand Down

0 comments on commit 21db95f

Please sign in to comment.