From a9ff2c8c7743e6d6319e0d683cdc5b9210f1d8bd Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Thu, 19 Dec 2024 09:45:25 -0600
Subject: [PATCH 1/2] moves shred deduper inside the thread-pool

Historically, shred deduping was done outside the thread-pool because the
deduper required a mutable reference and/or locking. The current deduper
uses atomic operations and can be used inside the thread-pool without
creating contention.
---
 ledger/src/shred.rs             |   3 +
 turbine/src/retransmit_stage.rs | 120 ++++++++++++++------------------
 2 files changed, 54 insertions(+), 69 deletions(-)

diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index 7b721c9407483f..135a229d46a6d2 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -259,14 +259,17 @@ impl<'a> AsRef<[u8]> for SignedData<'a> {
 pub struct ShredId(Slot, /*shred index:*/ u32, ShredType);
 
 impl ShredId {
+    #[inline]
     pub(crate) fn new(slot: Slot, index: u32, shred_type: ShredType) -> ShredId {
         ShredId(slot, index, shred_type)
     }
 
+    #[inline]
     pub fn slot(&self) -> Slot {
         self.0
     }
 
+    #[inline]
     pub(crate) fn unpack(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
         (self.0, self.1, self.2)
     }
diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs
index 5aa42f6e428c32..e1feb5c47166c4 100644
--- a/turbine/src/retransmit_stage.rs
+++ b/turbine/src/retransmit_stage.rs
@@ -5,7 +5,6 @@ use {
     crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
     bytes::Bytes,
     crossbeam_channel::{Receiver, RecvTimeoutError},
-    itertools::{izip, Itertools},
     lru::LruCache,
     rand::Rng,
     rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
@@ -33,8 +32,7 @@ use {
     },
     static_assertions::const_assert_eq,
     std::{
-        collections::HashMap,
-        iter::repeat,
+        collections::{HashMap, HashSet},
         net::{SocketAddr, UdpSocket},
         ops::AddAssign,
         sync::{
@@ -74,7 +72,7 @@ struct RetransmitStats {
     num_addrs_failed: AtomicUsize,
     num_loopback_errs: AtomicUsize,
     num_shreds: usize,
-    num_shreds_skipped: usize,
+    num_shreds_skipped: AtomicUsize,
     num_small_batches: usize,
     total_batches: usize,
     total_time: u64,
@@ -112,7 +110,11 @@ impl RetransmitStats {
             ("num_addrs_failed", *self.num_addrs_failed.get_mut(), i64),
             ("num_loopback_errs", *self.num_loopback_errs.get_mut(), i64),
             ("num_shreds", self.num_shreds, i64),
-            ("num_shreds_skipped", self.num_shreds_skipped, i64),
+            (
+                "num_shreds_skipped",
+                *self.num_shreds_skipped.get_mut(),
+                i64
+            ),
             ("retransmit_total", *self.retransmit_total.get_mut(), i64),
             (
                 "compute_turbine",
@@ -129,15 +131,6 @@ impl RetransmitStats {
         let old = std::mem::replace(self, Self::new(Instant::now()));
         self.slot_stats = old.slot_stats;
     }
-
-    fn record_error(&self, err: &Error) {
-        match err {
-            Error::Loopback { .. } => {
-                error!("retransmit_shred: {err}");
-                self.num_loopback_errs.fetch_add(1, Ordering::Relaxed)
-            }
-        };
-    }
 }
 
 struct ShredDeduper<const K: usize> {
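For context on the commit message's claim: an atomic bit-set deduper only ever needs `&self`, which is what lets it move inside the rayon pool. Below is a minimal sketch of that idea, with hypothetical names and simplified hashing; it is not the actual `Deduper` used by the validator.

    use std::{
        collections::hash_map::DefaultHasher,
        hash::{Hash, Hasher},
        sync::atomic::{AtomicU64, Ordering},
    };

    struct AtomicFilter {
        bits: Vec<AtomicU64>, // 64 one-bit cells per word
    }

    impl AtomicFilter {
        fn new(num_bits: usize) -> Self {
            let words = num_bits.div_ceil(64);
            Self {
                bits: (0..words).map(|_| AtomicU64::new(0)).collect(),
            }
        }

        /// Returns true if `item` was (probably) seen before. Takes `&self`:
        /// marking an item seen is a single atomic fetch_or, so the filter can
        /// be shared across worker threads with no lock and no `&mut`.
        fn dedup<T: Hash>(&self, item: &T) -> bool {
            let mut hasher = DefaultHasher::new();
            item.hash(&mut hasher);
            let bit = (hasher.finish() as usize) % (self.bits.len() * 64);
            let mask = 1u64 << (bit % 64);
            self.bits[bit / 64].fetch_or(mask, Ordering::Relaxed) & mask != 0
        }
    }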
@@ -225,20 +218,12 @@ fn retransmit(
     epoch_cache_update.stop();
     stats.epoch_cache_update += epoch_cache_update.as_us();
     // Lookup slot leader and cluster nodes for each slot.
-    let shreds: Vec<_> = shreds
+    let cache: HashMap<Slot, _> = shreds
+        .iter()
+        .filter_map(|shred| shred::layout::get_slot(shred))
+        .collect::<HashSet<Slot>>()
         .into_iter()
-        .filter_map(|shred| {
-            let key = shred::layout::get_shred_id(&shred)?;
-            if shred_deduper.dedup(key, &shred, MAX_DUPLICATE_COUNT) {
-                stats.num_shreds_skipped += 1;
-                None
-            } else {
-                Some((key, shred))
-            }
-        })
-        .into_group_map_by(|(key, _shred)| key.slot())
-        .into_iter()
-        .filter_map(|(slot, shreds)| {
+        .filter_map(|slot: Slot| {
             max_slots.retransmit.fetch_max(slot, Ordering::Relaxed);
             // TODO: consider using root-bank here for leader lookup!
             // Shreds' signatures should be verified before they reach here,
@@ -252,9 +237,8 @@ fn retransmit(
             };
             let cluster_nodes =
                 cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
-            Some(izip!(shreds, repeat(slot_leader), repeat(cluster_nodes)))
+            Some((slot, (slot_leader, cluster_nodes)))
         })
-        .flatten()
         .collect();
     let socket_addr_space = cluster_info.socket_addr_space();
     let record = |mut stats: HashMap<Slot, RetransmitSlotStats>,
@@ -269,47 +253,35 @@ fn retransmit(
         shreds
             .into_iter()
             .enumerate()
-            .filter_map(|(index, ((key, shred), slot_leader, cluster_nodes))| {
-                let (root_distance, num_nodes) = retransmit_shred(
-                    &key,
-                    &shred,
-                    &slot_leader,
+            .filter_map(|(index, shred)| {
+                retransmit_shred(
+                    shred,
                     &root_bank,
-                    &cluster_nodes,
+                    shred_deduper,
+                    &cache,
                     socket_addr_space,
                     &sockets[index % sockets.len()],
                     quic_endpoint_sender,
                     stats,
                 )
-                .inspect_err(|err| {
-                    stats.record_error(err);
-                })
-                .ok()?;
-                Some((key.slot(), root_distance, num_nodes))
             })
             .fold(HashMap::new(), record)
     } else {
         thread_pool.install(|| {
             shreds
                 .into_par_iter()
-                .filter_map(|((key, shred), slot_leader, cluster_nodes)| {
+                .filter_map(|shred| {
                     let index = thread_pool.current_thread_index().unwrap();
-                    let (root_distance, num_nodes) = retransmit_shred(
-                        &key,
-                        &shred,
-                        &slot_leader,
+                    retransmit_shred(
+                        shred,
                         &root_bank,
-                        &cluster_nodes,
+                        shred_deduper,
+                        &cache,
                         socket_addr_space,
                         &sockets[index % sockets.len()],
                         quic_endpoint_sender,
                         stats,
                     )
-                    .inspect_err(|err| {
-                        stats.record_error(err);
-                    })
-                    .ok()?;
-                    Some((key.slot(), root_distance, num_nodes))
                 })
                 .fold(HashMap::new, record)
                 .reduce(HashMap::new, RetransmitSlotStats::merge)
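The point of the new `cache` is to hoist the expensive per-slot work (leader schedule and cluster-nodes lookups) out of the per-shred hot path: one entry per distinct slot, built before the fan-out, read-only afterwards. A standalone sketch of the same pattern, using hypothetical stand-ins for the validator types:

    use std::collections::{HashMap, HashSet};

    type Slot = u64; // hypothetical stand-in

    // Stands in for the leader-schedule / cluster-nodes lookups.
    fn slot_leader_lookup(slot: Slot) -> String {
        format!("leader-of-{slot}")
    }

    fn main() {
        let shreds: Vec<(Slot, &str)> = vec![(7, "shred-a"), (7, "shred-b"), (9, "shred-c")];
        // One expensive lookup per distinct slot, done up front...
        let cache: HashMap<Slot, String> = shreds
            .iter()
            .map(|&(slot, _)| slot)
            .collect::<HashSet<Slot>>()
            .into_iter()
            .map(|slot| (slot, slot_leader_lookup(slot)))
            .collect();
        // ...so each shred only does a cheap map read in the hot loop.
        for (slot, shred) in &shreds {
            println!("{shred} -> {}", cache[slot]);
        }
    }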
@@ -328,24 +300,36 @@
 }
 
 fn retransmit_shred(
-    key: &ShredId,
-    shred: &[u8],
-    slot_leader: &Pubkey,
+    shred: Vec<u8>,
     root_bank: &Bank,
-    cluster_nodes: &ClusterNodes<RetransmitStage>,
+    shred_deduper: &ShredDeduper<2>,
+    cache: &HashMap<Slot, (Pubkey, Arc<ClusterNodes<RetransmitStage>>)>,
     socket_addr_space: &SocketAddrSpace,
     socket: &UdpSocket,
     quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
     stats: &RetransmitStats,
-) -> Result<(/*root_distance:*/ usize, /*num_nodes:*/ usize), Error> {
+) -> Option<(
+    Slot,  // Shred slot.
+    usize, // This node's distance from the turbine root.
+    usize, // Number of nodes the shred is retransmitted to.
+)> {
+    let key = shred::layout::get_shred_id(&shred)?;
+    let (slot_leader, cluster_nodes) = cache.get(&key.slot())?;
+    if shred_deduper.dedup(key, &shred, MAX_DUPLICATE_COUNT) {
+        stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
+        return None;
+    }
     let mut compute_turbine_peers = Measure::start("turbine_start");
     let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank);
-    let (root_distance, addrs) = cluster_nodes.get_retransmit_addrs(
-        slot_leader,
-        key,
-        data_plane_fanout,
-        socket_addr_space,
-    )?;
+    let (root_distance, addrs) = cluster_nodes
+        .get_retransmit_addrs(slot_leader, &key, data_plane_fanout, socket_addr_space)
+        .inspect_err(|err| match err {
+            Error::Loopback { .. } => {
+                error!("retransmit_shred: {err}");
+                stats.num_loopback_errs.fetch_add(1, Ordering::Relaxed);
+            }
+        })
+        .ok()?;
     compute_turbine_peers.stop();
     stats
         .compute_turbine_peers_total
@@ -353,9 +337,9 @@ fn retransmit_shred(
 
     let mut retransmit_time = Measure::start("retransmit_to");
     let num_addrs = addrs.len();
-    let num_nodes = match cluster_nodes::get_broadcast_protocol(key) {
+    let num_nodes = match cluster_nodes::get_broadcast_protocol(&key) {
         Protocol::QUIC => {
-            let shred = Bytes::copy_from_slice(shred);
+            let shred = Bytes::from(shred);
             addrs
                 .into_iter()
                 .filter_map(|addr| quic_endpoint_sender.try_send((addr, shred.clone())).ok())
@@ -365,9 +349,7 @@ fn retransmit_shred(
             Ok(()) => addrs.len(),
             Err(SendPktsError::IoError(ioerr, num_failed)) => {
                 error!(
-                    "retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
-                    ioerr,
-                    num_failed,
+                    "retransmit_to multi_target_send error: {ioerr:?}, {num_failed}/{} packets failed",
                     addrs.len(),
                 );
                 addrs.len() - num_failed
@@ -382,7 +364,7 @@ fn retransmit_shred(
     stats
         .retransmit_total
        .fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
-    Ok((root_distance, num_nodes))
+    Some((key.slot(), root_distance, num_nodes))
 }
 
 /// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
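With `record_error` removed, error accounting is now inlined at the only call site: `inspect_err` bumps the loopback counter and `.ok()?` converts the `Result` into the function's `Option`-based early return. A toy version of that idiom, with a hypothetical error type (`Result::inspect_err` is stable since Rust 1.76):

    use std::sync::atomic::{AtomicUsize, Ordering};

    #[derive(Debug)]
    enum Error {
        Loopback, // hypothetical; mirrors cluster_nodes::Error::Loopback
    }

    fn get_addrs(node: u32) -> Result<Vec<u32>, Error> {
        if node == 0 {
            Err(Error::Loopback)
        } else {
            Ok(vec![node + 1, node + 2])
        }
    }

    // Record the error in shared stats, then early-return None.
    fn retransmit_one(node: u32, num_loopback_errs: &AtomicUsize) -> Option<usize> {
        let addrs = get_addrs(node)
            .inspect_err(|err| {
                eprintln!("retransmit_one: {err:?}");
                num_loopback_errs.fetch_add(1, Ordering::Relaxed);
            })
            .ok()?;
        Some(addrs.len())
    }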
@@ -513,7 +495,7 @@ impl RetransmitStats {
             num_addrs_failed: AtomicUsize::default(),
             num_loopback_errs: AtomicUsize::default(),
             num_shreds: 0usize,
-            num_shreds_skipped: 0usize,
+            num_shreds_skipped: AtomicUsize::default(),
             total_batches: 0usize,
             num_small_batches: 0usize,
             total_time: 0u64,

From 47c84f744c8843697105c23dbdd267da662bc23c Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Fri, 3 Jan 2025 14:44:50 -0600
Subject: [PATCH 2/2] adds default value for number of hashers in ShredDeduper

---
 turbine/src/retransmit_stage.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs
index e1feb5c47166c4..88cd3730e9b74f 100644
--- a/turbine/src/retransmit_stage.rs
+++ b/turbine/src/retransmit_stage.rs
@@ -133,7 +133,7 @@ impl RetransmitStats {
     }
 }
 
-struct ShredDeduper<const K: usize> {
+struct ShredDeduper<const K: usize = 2> {
     deduper: Deduper<K, /*shred:*/ [u8]>,
     shred_id_filter: Deduper<K, (ShredId, /*0..MAX_DUPLICATE_COUNT:*/ usize)>,
 }
@@ -189,7 +189,7 @@ fn retransmit(
     quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
     stats: &mut RetransmitStats,
     cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
-    shred_deduper: &mut ShredDeduper<2>,
+    shred_deduper: &mut ShredDeduper,
     max_slots: &MaxSlots,
     rpc_subscriptions: Option<&RpcSubscriptions>,
     slot_status_notifier: Option<&SlotStatusNotifier>,
@@ -302,7 +302,7 @@
 fn retransmit_shred(
     shred: Vec<u8>,
     root_bank: &Bank,
-    shred_deduper: &ShredDeduper<2>,
+    shred_deduper: &ShredDeduper,
     cache: &HashMap<Slot, (Pubkey, Arc<ClusterNodes<RetransmitStage>>)>,
     socket_addr_space: &SocketAddrSpace,
     socket: &UdpSocket,
@@ -391,7 +391,7 @@ pub fn retransmitter(
         CLUSTER_NODES_CACHE_TTL,
     );
     let mut rng = rand::thread_rng();
-    let mut shred_deduper = ShredDeduper::<2>::new(&mut rng, DEDUPER_NUM_BITS);
+    let mut shred_deduper = ShredDeduper::new(&mut rng, DEDUPER_NUM_BITS);
     let mut stats = RetransmitStats::new(Instant::now());
     #[allow(clippy::manual_clamp)]
     let num_threads = get_thread_count().min(8).max(sockets.len());
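Patch 2 relies on a default value for a const generic parameter (stable since Rust 1.59): with `ShredDeduper<const K: usize = 2>`, call sites can drop the `::<2>` turbofish while an explicit `K` remains available. A minimal sketch of the idiom, with hypothetical names:

    // `K` defaults to 2 unless the caller overrides it.
    struct Filter<const K: usize = 2> {
        seeds: [u64; K],
    }

    impl<const K: usize> Filter<K> {
        fn new() -> Self {
            Self { seeds: [0u64; K] }
        }
    }

    fn main() {
        let _default: Filter = Filter::new();      // K = 2; no turbofish needed
        let _wide: Filter<4> = Filter::<4>::new(); // an explicit K still works
    }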