diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index c9039091813575..57ffe5f93817c6 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -259,14 +259,17 @@ impl<'a> AsRef<[u8]> for SignedData<'a> {
 pub struct ShredId(Slot, /*shred index:*/ u32, ShredType);
 
 impl ShredId {
+    #[inline]
     pub(crate) fn new(slot: Slot, index: u32, shred_type: ShredType) -> ShredId {
         ShredId(slot, index, shred_type)
     }
 
+    #[inline]
     pub fn slot(&self) -> Slot {
         self.0
     }
 
+    #[inline]
     pub(crate) fn unpack(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
         (self.0, self.1, self.2)
     }
diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs
index fd5b3e5e6dc454..000eeef25df740 100644
--- a/turbine/src/cluster_nodes.rs
+++ b/turbine/src/cluster_nodes.rs
@@ -46,6 +46,10 @@ const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 10;
 
 #[derive(Debug, Error)]
 pub enum Error {
+    #[error("Duplicate shred")]
+    DuplicateShred,
+    #[error("Invalid shred slot")]
+    InvalidShredSlot,
     #[error("Loopback from slot leader: {leader}, shred: {shred:?}")]
     Loopback { leader: Pubkey, shred: ShredId },
 }
diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs
index db9d1f96662a16..175bcc9499cfb2 100644
--- a/turbine/src/retransmit_stage.rs
+++ b/turbine/src/retransmit_stage.rs
@@ -5,7 +5,6 @@ use {
     crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
     bytes::Bytes,
     crossbeam_channel::{Receiver, RecvTimeoutError},
-    itertools::{izip, Itertools},
     lru::LruCache,
     rand::Rng,
     rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
@@ -33,8 +32,7 @@ use {
     },
     static_assertions::const_assert_eq,
     std::{
-        collections::HashMap,
-        iter::repeat,
+        collections::{HashMap, HashSet},
         net::{SocketAddr, UdpSocket},
         ops::AddAssign,
         sync::{
@@ -74,7 +72,7 @@ struct RetransmitStats {
     num_addrs_failed: AtomicUsize,
     num_loopback_errs: AtomicUsize,
     num_shreds: usize,
-    num_shreds_skipped: usize,
+    num_shreds_skipped: AtomicUsize,
     num_small_batches: usize,
     total_batches: usize,
     total_time: u64,
@@ -112,7 +110,11 @@ impl RetransmitStats {
             ("num_addrs_failed", *self.num_addrs_failed.get_mut(), i64),
             ("num_loopback_errs", *self.num_loopback_errs.get_mut(), i64),
             ("num_shreds", self.num_shreds, i64),
-            ("num_shreds_skipped", self.num_shreds_skipped, i64),
+            (
+                "num_shreds_skipped",
+                *self.num_shreds_skipped.get_mut(),
+                i64
+            ),
             ("retransmit_total", *self.retransmit_total.get_mut(), i64),
             (
                 "compute_turbine",
@@ -132,9 +134,11 @@ impl RetransmitStats {
 
     fn record_error(&self, err: &Error) {
         match err {
+            // Duplicate shreds and invalid shred slots are already handled.
+            Error::DuplicateShred | Error::InvalidShredSlot => (),
             Error::Loopback { .. } => {
                 error!("retransmit_shred: {err}");
-                self.num_loopback_errs.fetch_add(1, Ordering::Relaxed)
+                self.num_loopback_errs.fetch_add(1, Ordering::Relaxed);
             }
         };
     }
@@ -165,6 +169,8 @@ impl ShredDeduper {
             .maybe_reset(rng, false_positive_rate, reset_cycle);
     }
 
+    // Returns true if the shred is a duplicate and should be discarded.
+    #[must_use]
     fn dedup(&self, key: ShredId, shred: &[u8], max_duplicate_count: usize) -> bool {
         // In order to detect duplicate blocks across cluster, we retransmit
         // max_duplicate_count different shreds for each ShredId.
@@ -213,20 +219,12 @@ fn retransmit(
     epoch_cache_update.stop();
     stats.epoch_cache_update += epoch_cache_update.as_us();
     // Lookup slot leader and cluster nodes for each slot.
-    let shreds: Vec<_> = shreds
-        .into_iter()
-        .filter_map(|shred| {
-            let key = shred::layout::get_shred_id(&shred)?;
-            if shred_deduper.dedup(key, &shred, MAX_DUPLICATE_COUNT) {
-                stats.num_shreds_skipped += 1;
-                None
-            } else {
-                Some((key, shred))
-            }
-        })
-        .into_group_map_by(|(key, _shred)| key.slot())
+    let cache: HashMap<Slot, (Pubkey, Arc<ClusterNodes<RetransmitStage>>)> = shreds
+        .iter()
+        .filter_map(|shred| shred::layout::get_slot(shred))
+        .collect::<HashSet<Slot>>()
         .into_iter()
-        .filter_map(|(slot, shreds)| {
+        .filter_map(|slot: Slot| {
             max_slots.retransmit.fetch_max(slot, Ordering::Relaxed);
             // TODO: consider using root-bank here for leader lookup!
             // Shreds' signatures should be verified before they reach here,
@@ -240,10 +238,10 @@ fn retransmit(
             };
             let cluster_nodes =
                 cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
-            Some(izip!(shreds, repeat(slot_leader), repeat(cluster_nodes)))
+            Some((slot, (slot_leader, cluster_nodes)))
         })
-        .flatten()
         .collect();
+
     let socket_addr_space = cluster_info.socket_addr_space();
     let record = |mut stats: HashMap<Slot, RetransmitSlotStats>,
                   (slot, root_distance, num_nodes)| {
@@ -257,13 +255,12 @@
         shreds
             .into_iter()
             .enumerate()
-            .filter_map(|(index, ((key, shred), slot_leader, cluster_nodes))| {
-                let (root_distance, num_nodes) = retransmit_shred(
-                    &key,
+            .filter_map(|(index, shred)| {
+                retransmit_shred(
                     &shred,
-                    &slot_leader,
                     &root_bank,
-                    &cluster_nodes,
+                    &shred_deduper,
+                    &cache,
                     socket_addr_space,
                     &sockets[index % sockets.len()],
                     quic_endpoint_sender,
@@ -272,22 +269,20 @@
                 .inspect_err(|err| {
                     stats.record_error(err);
                 })
-                .ok()?;
-                Some((key.slot(), root_distance, num_nodes))
+                .ok()
             })
             .fold(HashMap::new(), record)
     } else {
         thread_pool.install(|| {
             shreds
                 .into_par_iter()
-                .filter_map(|((key, shred), slot_leader, cluster_nodes)| {
+                .filter_map(|shred| {
                     let index = thread_pool.current_thread_index().unwrap();
-                    let (root_distance, num_nodes) = retransmit_shred(
-                        &key,
+                    retransmit_shred(
                         &shred,
-                        &slot_leader,
                         &root_bank,
-                        &cluster_nodes,
+                        &shred_deduper,
+                        &cache,
                         socket_addr_space,
                         &sockets[index % sockets.len()],
                         quic_endpoint_sender,
@@ -296,8 +291,7 @@
                     .inspect_err(|err| {
                         stats.record_error(err);
                     })
-                    .ok()?;
-                    Some((key.slot(), root_distance, num_nodes))
+                    .ok()
                 })
                 .fold(HashMap::new, record)
                 .reduce(HashMap::new, RetransmitSlotStats::merge)
@@ -316,21 +310,33 @@
 }
 
 fn retransmit_shred(
-    key: &ShredId,
     shred: &[u8],
-    slot_leader: &Pubkey,
     root_bank: &Bank,
-    cluster_nodes: &ClusterNodes<RetransmitStage>,
+    shred_deduper: &ShredDeduper<2>,
+    cache: &HashMap<Slot, (Pubkey, Arc<ClusterNodes<RetransmitStage>>)>,
     socket_addr_space: &SocketAddrSpace,
     socket: &UdpSocket,
     quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
     stats: &RetransmitStats,
-) -> Result<(/*root_distance:*/ usize, /*num_nodes:*/ usize), Error> {
+) -> Result<
+    (
+        Slot,  // shred slot
+        usize, // This node's distance from the turbine root.
+        usize, // num_nodes the shred is retransmitted to.
+    ),
+    Error,
+> {
     let mut compute_turbine_peers = Measure::start("turbine_start");
+    let key = shred::layout::get_shred_id(&shred).ok_or(Error::InvalidShredSlot)?;
+    let (slot_leader, cluster_nodes) = cache.get(&key.slot()).ok_or(Error::InvalidShredSlot)?;
+    if shred_deduper.dedup(key, &shred, MAX_DUPLICATE_COUNT) {
+        stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
+        return Err(Error::DuplicateShred);
+    }
     let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank);
     let (root_distance, addrs) = cluster_nodes.get_retransmit_addrs(
         slot_leader,
-        key,
+        &key,
         data_plane_fanout,
         socket_addr_space,
     )?;
@@ -341,7 +347,7 @@ fn retransmit_shred(
 
     let mut retransmit_time = Measure::start("retransmit_to");
     let num_addrs = addrs.len();
-    let num_nodes = match cluster_nodes::get_broadcast_protocol(key) {
+    let num_nodes = match cluster_nodes::get_broadcast_protocol(&key) {
         Protocol::QUIC => {
             let shred = Bytes::copy_from_slice(shred);
             addrs
@@ -370,7 +376,7 @@ fn retransmit_shred(
     stats
         .retransmit_total
         .fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
-    Ok((root_distance, num_nodes))
+    Ok((key.slot(), root_distance, num_nodes))
 }
 
 /// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
@@ -501,7 +507,7 @@ impl RetransmitStats {
             num_addrs_failed: AtomicUsize::default(),
             num_loopback_errs: AtomicUsize::default(),
             num_shreds: 0usize,
-            num_shreds_skipped: 0usize,
+            num_shreds_skipped: AtomicUsize::default(),
             total_batches: 0usize,
             num_small_batches: 0usize,
             total_time: 0u64,
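
To make the restructuring above easier to follow: the batch-level pipeline now only builds a per-slot `(slot leader, cluster nodes)` table, while shred-id parsing, cache lookup, and dedup all happen per shred inside `retransmit_shred`, which is what the new `DuplicateShred` and `InvalidShredSlot` error variants are for. Below is a minimal, self-contained sketch of that flow, not the actual agave code: `build_cache`, `retransmit_one`, `DeduperStub`, `ClusterNodesStub`, and `Leader` are illustrative stand-ins, and the deduper is reduced to an exact set instead of the real `ShredDeduper`, which retransmits up to `MAX_DUPLICATE_COUNT` distinct payloads per `ShredId` to help detect duplicate blocks.

```rust
use std::{
    cell::RefCell,
    collections::{HashMap, HashSet},
};

type Slot = u64;
type Leader = [u8; 32]; // stand-in for Pubkey

// Stand-in for ShredId: (slot, shred index).
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ShredId(Slot, u32);

#[derive(Debug)]
enum Error {
    DuplicateShred,
    InvalidShredSlot,
}

#[derive(Clone)]
struct ClusterNodesStub; // stand-in for Arc<ClusterNodes<RetransmitStage>>

// Stand-in for ShredDeduper: an exact, single-threaded set.
struct DeduperStub(RefCell<HashSet<ShredId>>);

impl DeduperStub {
    // Returns true if the shred was seen before and should be discarded.
    fn dedup(&self, key: ShredId) -> bool {
        !self.0.borrow_mut().insert(key)
    }
}

// Batch-level setup: resolve (slot leader, cluster nodes) once per distinct slot.
fn build_cache(
    shreds: &[(ShredId, Vec<u8>)],
    lookup: impl Fn(Slot) -> Option<(Leader, ClusterNodesStub)>,
) -> HashMap<Slot, (Leader, ClusterNodesStub)> {
    shreds
        .iter()
        .map(|(key, _)| key.0)
        .collect::<HashSet<Slot>>()
        .into_iter()
        .filter_map(|slot| Some((slot, lookup(slot)?)))
        .collect()
}

// Per-shred work, now done inside each worker: cache lookup, then dedup.
fn retransmit_one(
    key: ShredId,
    cache: &HashMap<Slot, (Leader, ClusterNodesStub)>,
    deduper: &DeduperStub,
) -> Result<Slot, Error> {
    let (_leader, _nodes) = cache.get(&key.0).ok_or(Error::InvalidShredSlot)?;
    if deduper.dedup(key) {
        return Err(Error::DuplicateShred);
    }
    // ... compute turbine peers and send the shred here ...
    Ok(key.0)
}

fn main() {
    let shreds = vec![
        (ShredId(7, 0), vec![1u8]),
        (ShredId(7, 0), vec![1u8]), // duplicate: dropped inside retransmit_one
        (ShredId(8, 3), vec![2u8]),
    ];
    let cache = build_cache(&shreds, |_slot| Some(([0u8; 32], ClusterNodesStub)));
    let deduper = DeduperStub(RefCell::new(HashSet::new()));
    for (key, _bytes) in &shreds {
        println!("{key:?} -> {:?}", retransmit_one(*key, &cache, &deduper));
    }
}
```

The payoff of this shape is that per-shred work, including dedup, runs on whichever thread handles the shred, while the batch thread only touches each distinct slot once when building the cache.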
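A note on why `num_shreds_skipped` changes from `usize` to `AtomicUsize`: dedup used to run on the consumer thread, which could bump the counter through exclusive access to `RetransmitStats`, but it now runs inside `retransmit_shred` on the rayon workers, which only see a shared `&RetransmitStats`. A small sketch of that pattern, with a hypothetical `Stats` struct standing in for `RetransmitStats`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative stand-in: once the counter is updated through a shared
// reference from multiple worker threads, it must be an AtomicUsize.
#[derive(Default)]
struct Stats {
    num_shreds_skipped: AtomicUsize,
}

fn main() {
    let mut stats = Stats::default();
    std::thread::scope(|scope| {
        for _ in 0..4 {
            scope.spawn(|| {
                // Relaxed suffices: this is a metrics counter, not a
                // synchronization point.
                stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
            });
        }
    });
    // Reporting happens with exclusive access, so get_mut() is the cheapest read,
    // mirroring the *self.num_shreds_skipped.get_mut() call in the diff.
    assert_eq!(*stats.num_shreds_skipped.get_mut(), 4);
}
```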