Skip to content

Commit

Permalink
moves shred deduper inside the thread-pool
Browse files Browse the repository at this point in the history
Historically shred deduping was done outside the thread-pool because the
deduper required mutable references and/or locking.
Current deduper uses atomic operations and can be used inside the
thread-pool without creating contention.
  • Loading branch information
behzadnouri committed Dec 19, 2024
1 parent d57460b commit 361e4b2
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 43 deletions.
3 changes: 3 additions & 0 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,17 @@ impl<'a> AsRef<[u8]> for SignedData<'a> {
pub struct ShredId(Slot, /*shred index:*/ u32, ShredType);

impl ShredId {
/// Builds a `ShredId` from a slot, a shred index and a shred type.
///
/// The three arguments are stored positionally, in the same order in which
/// they are passed.
#[inline]
pub(crate) fn new(slot: Slot, index: u32, shred_type: ShredType) -> ShredId {
ShredId(slot, index, shred_type)
}

#[inline]
pub fn slot(&self) -> Slot {
self.0
}

#[inline]
pub(crate) fn unpack(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
(self.0, self.1, self.2)
}
Expand Down
4 changes: 4 additions & 0 deletions turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 10;

/// Errors reported while working out whether and where a shred should be
/// retransmitted.
#[derive(Debug, Error)]
pub enum Error {
/// The shred was flagged as a duplicate and is discarded rather than
/// retransmitted.
#[error("Duplicate shred")]
DuplicateShred,
/// The shred's id could not be obtained, or no leader / cluster-nodes
/// entry is available for the shred's slot.
#[error("Invalid shred slot")]
InvalidShredSlot,
/// Carries the slot leader's pubkey and the id of the offending shred.
// NOTE(review): presumably raised when the shred loops back from the slot
// leader itself — confirm against the code that constructs this variant.
#[error("Loopback from slot leader: {leader}, shred: {shred:?}")]
Loopback { leader: Pubkey, shred: ShredId },
}
Expand Down
92 changes: 49 additions & 43 deletions turbine/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use {
crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
bytes::Bytes,
crossbeam_channel::{Receiver, RecvTimeoutError},
itertools::{izip, Itertools},
lru::LruCache,
rand::Rng,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
Expand Down Expand Up @@ -33,8 +32,7 @@ use {
},
static_assertions::const_assert_eq,
std::{
collections::HashMap,
iter::repeat,
collections::{HashMap, HashSet},
net::{SocketAddr, UdpSocket},
ops::AddAssign,
sync::{
Expand Down Expand Up @@ -74,7 +72,7 @@ struct RetransmitStats {
num_addrs_failed: AtomicUsize,
num_loopback_errs: AtomicUsize,
num_shreds: usize,
num_shreds_skipped: usize,
num_shreds_skipped: AtomicUsize,
num_small_batches: usize,
total_batches: usize,
total_time: u64,
Expand Down Expand Up @@ -112,7 +110,11 @@ impl RetransmitStats {
("num_addrs_failed", *self.num_addrs_failed.get_mut(), i64),
("num_loopback_errs", *self.num_loopback_errs.get_mut(), i64),
("num_shreds", self.num_shreds, i64),
("num_shreds_skipped", self.num_shreds_skipped, i64),
(
"num_shreds_skipped",
*self.num_shreds_skipped.get_mut(),
i64
),
("retransmit_total", *self.retransmit_total.get_mut(), i64),
(
"compute_turbine",
Expand All @@ -132,9 +134,11 @@ impl RetransmitStats {

fn record_error(&self, err: &Error) {
match err {
// Duplicate shreds and invalid shred slots are already handled.
Error::DuplicateShred | Error::InvalidShredSlot => (),
Error::Loopback { .. } => {
error!("retransmit_shred: {err}");
self.num_loopback_errs.fetch_add(1, Ordering::Relaxed)
self.num_loopback_errs.fetch_add(1, Ordering::Relaxed);
}
};
}
Expand Down Expand Up @@ -165,6 +169,8 @@ impl<const K: usize> ShredDeduper<K> {
.maybe_reset(rng, false_positive_rate, reset_cycle);
}

// Returns true if the shred is duplicate and should be discarded.
#[must_use]
fn dedup(&self, key: ShredId, shred: &[u8], max_duplicate_count: usize) -> bool {
// In order to detect duplicate blocks across cluster, we retransmit
// max_duplicate_count different shreds for each ShredId.
Expand Down Expand Up @@ -213,20 +219,12 @@ fn retransmit(
epoch_cache_update.stop();
stats.epoch_cache_update += epoch_cache_update.as_us();
// Lookup slot leader and cluster nodes for each slot.
let shreds: Vec<_> = shreds
.into_iter()
.filter_map(|shred| {
let key = shred::layout::get_shred_id(&shred)?;
if shred_deduper.dedup(key, &shred, MAX_DUPLICATE_COUNT) {
stats.num_shreds_skipped += 1;
None
} else {
Some((key, shred))
}
})
.into_group_map_by(|(key, _shred)| key.slot())
let cache: HashMap<Slot, (/*leader:*/ Pubkey, Arc<ClusterNodes<RetransmitStage>>)> = shreds
.iter()
.filter_map(|shred| shred::layout::get_slot(shred))
.collect::<HashSet<Slot>>()
.into_iter()
.filter_map(|(slot, shreds)| {
.filter_map(|slot: Slot| {
max_slots.retransmit.fetch_max(slot, Ordering::Relaxed);
// TODO: consider using root-bank here for leader lookup!
// Shreds' signatures should be verified before they reach here,
Expand All @@ -240,10 +238,10 @@ fn retransmit(
};
let cluster_nodes =
cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
Some(izip!(shreds, repeat(slot_leader), repeat(cluster_nodes)))
Some((slot, (slot_leader, cluster_nodes)))
})
.flatten()
.collect();

let socket_addr_space = cluster_info.socket_addr_space();
let record = |mut stats: HashMap<Slot, RetransmitSlotStats>,
(slot, root_distance, num_nodes)| {
Expand All @@ -257,13 +255,12 @@ fn retransmit(
shreds
.into_iter()
.enumerate()
.filter_map(|(index, ((key, shred), slot_leader, cluster_nodes))| {
let (root_distance, num_nodes) = retransmit_shred(
&key,
.filter_map(|(index, shred)| {
retransmit_shred(
&shred,
&slot_leader,
&root_bank,
&cluster_nodes,
&shred_deduper,
&cache,
socket_addr_space,
&sockets[index % sockets.len()],
quic_endpoint_sender,
Expand All @@ -272,22 +269,20 @@ fn retransmit(
.inspect_err(|err| {
stats.record_error(err);
})
.ok()?;
Some((key.slot(), root_distance, num_nodes))
.ok()
})
.fold(HashMap::new(), record)
} else {
thread_pool.install(|| {
shreds
.into_par_iter()
.filter_map(|((key, shred), slot_leader, cluster_nodes)| {
.filter_map(|shred| {
let index = thread_pool.current_thread_index().unwrap();
let (root_distance, num_nodes) = retransmit_shred(
&key,
retransmit_shred(
&shred,
&slot_leader,
&root_bank,
&cluster_nodes,
&shred_deduper,
&cache,
socket_addr_space,
&sockets[index % sockets.len()],
quic_endpoint_sender,
Expand All @@ -296,8 +291,7 @@ fn retransmit(
.inspect_err(|err| {
stats.record_error(err);
})
.ok()?;
Some((key.slot(), root_distance, num_nodes))
.ok()
})
.fold(HashMap::new, record)
.reduce(HashMap::new, RetransmitSlotStats::merge)
Expand All @@ -316,21 +310,33 @@ fn retransmit(
}

fn retransmit_shred(
key: &ShredId,
shred: &[u8],
slot_leader: &Pubkey,
root_bank: &Bank,
cluster_nodes: &ClusterNodes<RetransmitStage>,
shred_deduper: &ShredDeduper<2>,
cache: &HashMap<Slot, (/*leader:*/ Pubkey, Arc<ClusterNodes<RetransmitStage>>)>,
socket_addr_space: &SocketAddrSpace,
socket: &UdpSocket,
quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
stats: &RetransmitStats,
) -> Result<(/*root_distance:*/ usize, /*num_nodes:*/ usize), Error> {
) -> Result<
(
Slot, // shred slot
usize, // This node's distance from the turbine root.
usize, // num_nodes the shred is retransmitted to.
),
Error,
> {
let mut compute_turbine_peers = Measure::start("turbine_start");
let key = shred::layout::get_shred_id(&shred).ok_or(Error::InvalidShredSlot)?;
let (slot_leader, cluster_nodes) = cache.get(&key.slot()).ok_or(Error::InvalidShredSlot)?;
if shred_deduper.dedup(key, &shred, MAX_DUPLICATE_COUNT) {
stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
return Err(Error::DuplicateShred);
}
let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank);
let (root_distance, addrs) = cluster_nodes.get_retransmit_addrs(
slot_leader,
key,
&key,
data_plane_fanout,
socket_addr_space,
)?;
Expand All @@ -341,7 +347,7 @@ fn retransmit_shred(

let mut retransmit_time = Measure::start("retransmit_to");
let num_addrs = addrs.len();
let num_nodes = match cluster_nodes::get_broadcast_protocol(key) {
let num_nodes = match cluster_nodes::get_broadcast_protocol(&key) {
Protocol::QUIC => {
let shred = Bytes::copy_from_slice(shred);
addrs
Expand Down Expand Up @@ -370,7 +376,7 @@ fn retransmit_shred(
stats
.retransmit_total
.fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
Ok((root_distance, num_nodes))
Ok((key.slot(), root_distance, num_nodes))
}

/// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
Expand Down Expand Up @@ -501,7 +507,7 @@ impl RetransmitStats {
num_addrs_failed: AtomicUsize::default(),
num_loopback_errs: AtomicUsize::default(),
num_shreds: 0usize,
num_shreds_skipped: 0usize,
num_shreds_skipped: AtomicUsize::default(),
total_batches: 0usize,
num_small_batches: 0usize,
total_time: 0u64,
Expand Down

0 comments on commit 361e4b2

Please sign in to comment.