Skip to content

Commit

Permalink
moves shred deduper inside the thread-pool
Browse files Browse the repository at this point in the history
Historically shred deduping was done outside the thread-pool because the
deduper required a mutable reference and/or locking.
Current deduper uses atomic operations and can be used inside the
thread-pool without creating contention.
  • Loading branch information
behzadnouri committed Jan 3, 2025
1 parent 062f86e commit a9ff2c8
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 69 deletions.
3 changes: 3 additions & 0 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,17 @@ impl<'a> AsRef<[u8]> for SignedData<'a> {
pub struct ShredId(Slot, /*shred index:*/ u32, ShredType);

impl ShredId {
#[inline]
pub(crate) fn new(slot: Slot, index: u32, shred_type: ShredType) -> ShredId {
ShredId(slot, index, shred_type)
}

#[inline]
pub fn slot(&self) -> Slot {
self.0
}

#[inline]
pub(crate) fn unpack(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
(self.0, self.1, self.2)
}
Expand Down
120 changes: 51 additions & 69 deletions turbine/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use {
crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
bytes::Bytes,
crossbeam_channel::{Receiver, RecvTimeoutError},
itertools::{izip, Itertools},
lru::LruCache,
rand::Rng,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
Expand Down Expand Up @@ -33,8 +32,7 @@ use {
},
static_assertions::const_assert_eq,
std::{
collections::HashMap,
iter::repeat,
collections::{HashMap, HashSet},
net::{SocketAddr, UdpSocket},
ops::AddAssign,
sync::{
Expand Down Expand Up @@ -74,7 +72,7 @@ struct RetransmitStats {
num_addrs_failed: AtomicUsize,
num_loopback_errs: AtomicUsize,
num_shreds: usize,
num_shreds_skipped: usize,
num_shreds_skipped: AtomicUsize,
num_small_batches: usize,
total_batches: usize,
total_time: u64,
Expand Down Expand Up @@ -112,7 +110,11 @@ impl RetransmitStats {
("num_addrs_failed", *self.num_addrs_failed.get_mut(), i64),
("num_loopback_errs", *self.num_loopback_errs.get_mut(), i64),
("num_shreds", self.num_shreds, i64),
("num_shreds_skipped", self.num_shreds_skipped, i64),
(
"num_shreds_skipped",
*self.num_shreds_skipped.get_mut(),
i64
),
("retransmit_total", *self.retransmit_total.get_mut(), i64),
(
"compute_turbine",
Expand All @@ -129,15 +131,6 @@ impl RetransmitStats {
let old = std::mem::replace(self, Self::new(Instant::now()));
self.slot_stats = old.slot_stats;
}

fn record_error(&self, err: &Error) {
match err {
Error::Loopback { .. } => {
error!("retransmit_shred: {err}");
self.num_loopback_errs.fetch_add(1, Ordering::Relaxed)
}
};
}
}

struct ShredDeduper<const K: usize> {
Expand Down Expand Up @@ -225,20 +218,12 @@ fn retransmit(
epoch_cache_update.stop();
stats.epoch_cache_update += epoch_cache_update.as_us();
// Lookup slot leader and cluster nodes for each slot.
let shreds: Vec<_> = shreds
let cache: HashMap<Slot, _> = shreds
.iter()
.filter_map(|shred| shred::layout::get_slot(shred))
.collect::<HashSet<Slot>>()
.into_iter()
.filter_map(|shred| {
let key = shred::layout::get_shred_id(&shred)?;
if shred_deduper.dedup(key, &shred, MAX_DUPLICATE_COUNT) {
stats.num_shreds_skipped += 1;
None
} else {
Some((key, shred))
}
})
.into_group_map_by(|(key, _shred)| key.slot())
.into_iter()
.filter_map(|(slot, shreds)| {
.filter_map(|slot: Slot| {
max_slots.retransmit.fetch_max(slot, Ordering::Relaxed);
// TODO: consider using root-bank here for leader lookup!
// Shreds' signatures should be verified before they reach here,
Expand All @@ -252,9 +237,8 @@ fn retransmit(
};
let cluster_nodes =
cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
Some(izip!(shreds, repeat(slot_leader), repeat(cluster_nodes)))
Some((slot, (slot_leader, cluster_nodes)))
})
.flatten()
.collect();
let socket_addr_space = cluster_info.socket_addr_space();
let record = |mut stats: HashMap<Slot, RetransmitSlotStats>,
Expand All @@ -269,47 +253,35 @@ fn retransmit(
shreds
.into_iter()
.enumerate()
.filter_map(|(index, ((key, shred), slot_leader, cluster_nodes))| {
let (root_distance, num_nodes) = retransmit_shred(
&key,
&shred,
&slot_leader,
.filter_map(|(index, shred)| {
retransmit_shred(
shred,
&root_bank,
&cluster_nodes,
shred_deduper,
&cache,
socket_addr_space,
&sockets[index % sockets.len()],
quic_endpoint_sender,
stats,
)
.inspect_err(|err| {
stats.record_error(err);
})
.ok()?;
Some((key.slot(), root_distance, num_nodes))
})
.fold(HashMap::new(), record)
} else {
thread_pool.install(|| {
shreds
.into_par_iter()
.filter_map(|((key, shred), slot_leader, cluster_nodes)| {
.filter_map(|shred| {
let index = thread_pool.current_thread_index().unwrap();
let (root_distance, num_nodes) = retransmit_shred(
&key,
&shred,
&slot_leader,
retransmit_shred(
shred,
&root_bank,
&cluster_nodes,
shred_deduper,
&cache,
socket_addr_space,
&sockets[index % sockets.len()],
quic_endpoint_sender,
stats,
)
.inspect_err(|err| {
stats.record_error(err);
})
.ok()?;
Some((key.slot(), root_distance, num_nodes))
})
.fold(HashMap::new, record)
.reduce(HashMap::new, RetransmitSlotStats::merge)
Expand All @@ -328,34 +300,46 @@ fn retransmit(
}

fn retransmit_shred(
key: &ShredId,
shred: &[u8],
slot_leader: &Pubkey,
shred: Vec<u8>,
root_bank: &Bank,
cluster_nodes: &ClusterNodes<RetransmitStage>,
shred_deduper: &ShredDeduper<2>,
cache: &HashMap<Slot, (/*leader:*/ Pubkey, Arc<ClusterNodes<RetransmitStage>>)>,
socket_addr_space: &SocketAddrSpace,
socket: &UdpSocket,
quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
stats: &RetransmitStats,
) -> Result<(/*root_distance:*/ usize, /*num_nodes:*/ usize), Error> {
) -> Option<(
Slot, // Shred slot.
usize, // This node's distance from the turbine root.
usize, // Number of nodes the shred is retransmitted to.
)> {
let key = shred::layout::get_shred_id(&shred)?;
let (slot_leader, cluster_nodes) = cache.get(&key.slot())?;
if shred_deduper.dedup(key, &shred, MAX_DUPLICATE_COUNT) {
stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
return None;
}
let mut compute_turbine_peers = Measure::start("turbine_start");
let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank);
let (root_distance, addrs) = cluster_nodes.get_retransmit_addrs(
slot_leader,
key,
data_plane_fanout,
socket_addr_space,
)?;
let (root_distance, addrs) = cluster_nodes
.get_retransmit_addrs(slot_leader, &key, data_plane_fanout, socket_addr_space)
.inspect_err(|err| match err {
Error::Loopback { .. } => {
error!("retransmit_shred: {err}");
stats.num_loopback_errs.fetch_add(1, Ordering::Relaxed);
}
})
.ok()?;
compute_turbine_peers.stop();
stats
.compute_turbine_peers_total
.fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);

let mut retransmit_time = Measure::start("retransmit_to");
let num_addrs = addrs.len();
let num_nodes = match cluster_nodes::get_broadcast_protocol(key) {
let num_nodes = match cluster_nodes::get_broadcast_protocol(&key) {
Protocol::QUIC => {
let shred = Bytes::copy_from_slice(shred);
let shred = Bytes::from(shred);
addrs
.into_iter()
.filter_map(|addr| quic_endpoint_sender.try_send((addr, shred.clone())).ok())
Expand All @@ -365,9 +349,7 @@ fn retransmit_shred(
Ok(()) => addrs.len(),
Err(SendPktsError::IoError(ioerr, num_failed)) => {
error!(
"retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
ioerr,
num_failed,
"retransmit_to multi_target_send error: {ioerr:?}, {num_failed}/{} packets failed",
addrs.len(),
);
addrs.len() - num_failed
Expand All @@ -382,7 +364,7 @@ fn retransmit_shred(
stats
.retransmit_total
.fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
Ok((root_distance, num_nodes))
Some((key.slot(), root_distance, num_nodes))
}

/// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
Expand Down Expand Up @@ -513,7 +495,7 @@ impl RetransmitStats {
num_addrs_failed: AtomicUsize::default(),
num_loopback_errs: AtomicUsize::default(),
num_shreds: 0usize,
num_shreds_skipped: 0usize,
num_shreds_skipped: AtomicUsize::default(),
total_batches: 0usize,
num_small_batches: 0usize,
total_time: 0u64,
Expand Down

0 comments on commit a9ff2c8

Please sign in to comment.