Skip to content

Commit

Permalink
refactor: move broadcast channel init into tree (paradigmxyz#4894)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Oct 3, 2023
1 parent b1a7a87 commit d3cc4cc
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 33 deletions.
23 changes: 9 additions & 14 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,20 +281,15 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
Factory::new(self.chain.clone()),
Arc::clone(&self.chain),
);
let tree_config = BlockchainTreeConfig::default();
// The size of the broadcast is twice the maximum reorg depth, because at maximum reorg
// depth at least N blocks must be sent at once.
let (canon_state_notification_sender, _receiver) =
tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2);
let blockchain_tree = ShareableBlockchainTree::new(
BlockchainTree::new(
tree_externals,
canon_state_notification_sender.clone(),
tree_config,
prune_config.clone().map(|config| config.parts),
)?
.with_sync_metrics_tx(metrics_tx.clone()),
);
let _tree_config = BlockchainTreeConfig::default();
let tree = BlockchainTree::new(
tree_externals,
BlockchainTreeConfig::default(),
prune_config.clone().map(|config| config.parts),
)?
.with_sync_metrics_tx(metrics_tx.clone());
let canon_state_notification_sender = tree.canon_state_notification_sender();
let blockchain_tree = ShareableBlockchainTree::new(tree);

// fetch the head block from the database
let head = self.lookup_head(Arc::clone(&db)).wrap_err("the head block is missing")?;
Expand Down
2 changes: 1 addition & 1 deletion crates/blockchain-tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ reth-stages = { path = "../stages" }
parking_lot.workspace = true
lru = "0.11"
tracing.workspace = true
tokio = { workspace = true, features = ["macros", "sync"] }

# metrics
reth-metrics = { workspace = true, features = ["common"] }
Expand All @@ -41,7 +42,6 @@ reth-primitives = { workspace = true , features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
parking_lot.workspace = true
assert_matches.workspace = true
tokio = { workspace = true, features = ["macros", "sync"] }

[features]
test-utils = []
37 changes: 22 additions & 15 deletions crates/blockchain-tree/src/blockchain_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,18 @@ pub struct BlockchainTree<DB: Database, C: Consensus, EF: ExecutorFactory> {
prune_modes: Option<PruneModes>,
}

/// A container that wraps chains and block indices to allow searching for block hashes across all
/// sidechains.
#[derive(Debug)]
pub struct BlockHashes<'a> {
/// The current tracked chains.
pub chains: &'a mut HashMap<BlockChainId, AppendableChain>,
/// The block indices for all chains.
pub indices: &'a BlockIndices,
}

impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF> {
/// Create a new blockchain tree.
pub fn new(
externals: TreeExternals<DB, C, EF>,
canon_state_notification_sender: CanonStateNotificationSender,
config: BlockchainTreeConfig,
prune_modes: Option<PruneModes>,
) -> RethResult<Self> {
let max_reorg_depth = config.max_reorg_depth();
// The size of the broadcast is twice the maximum reorg depth, because at maximum reorg
// depth at least N blocks must be sent at once.
let (canon_state_notification_sender, _receiver) =
tokio::sync::broadcast::channel(max_reorg_depth as usize * 2);

let last_canonical_hashes = externals
.db
Expand Down Expand Up @@ -1069,11 +1062,16 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>

/// Subscribe to new blocks events.
///
/// Note: Only canonical blocks are send.
/// Note: Only canonical blocks are emitted by the tree.
pub fn subscribe_canon_state(&self) -> CanonStateNotifications {
self.canon_state_notification_sender.subscribe()
}

/// Returns a clone of the sender for the canonical state notifications.
pub fn canon_state_notification_sender(&self) -> CanonStateNotificationSender {
self.canon_state_notification_sender.clone()
}

/// Canonicalize the given chain and commit it to the database.
fn commit_canonical(&self, chain: Chain) -> RethResult<()> {
let provider = DatabaseProvider::new_rw(
Expand Down Expand Up @@ -1169,6 +1167,16 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
}
}

/// A container that wraps chains and block indices to allow searching for block hashes across all
/// sidechains.
#[derive(Debug)]
pub struct BlockHashes<'a> {
/// The current tracked chains.
pub chains: &'a mut HashMap<BlockChainId, AppendableChain>,
/// The block indices for all chains.
pub indices: &'a BlockIndices,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -1316,10 +1324,9 @@ mod tests {

// make tree
let config = BlockchainTreeConfig::new(1, 2, 3, 2);
let (sender, mut canon_notif) = tokio::sync::broadcast::channel(10);
let mut tree =
BlockchainTree::new(externals, sender, config, None).expect("failed to create tree");
let mut tree = BlockchainTree::new(externals, config, None).expect("failed to create tree");

let mut canon_notif = tree.subscribe_canon_state();
// genesis block 10 is already canonical
tree.make_canonical(&B256::ZERO).unwrap();

Expand Down
4 changes: 1 addition & 3 deletions crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,10 +506,8 @@ where
self.base_config.chain_spec.clone(),
);
let config = BlockchainTreeConfig::new(1, 2, 3, 2);
let (canon_state_notification_sender, _) = tokio::sync::broadcast::channel(3);
let tree = ShareableBlockchainTree::new(
BlockchainTree::new(externals, canon_state_notification_sender, config, None)
.expect("failed to create tree"),
BlockchainTree::new(externals, config, None).expect("failed to create tree"),
);
let shareable_db = ProviderFactory::new(db.clone(), self.base_config.chain_spec.clone());
let latest = self.base_config.chain_spec.genesis_header().seal_slow();
Expand Down

0 comments on commit d3cc4cc

Please sign in to comment.