moves shred deduper inside the thread-pool #4179

Merged
merged 2 commits into from Jan 3, 2025
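Summary of the change, as read from the diff below: the dedup check against ShredDeduper (and the num_shreds_skipped counter it bumps) moves out of the single-threaded pre-filter in retransmit() and into retransmit_shred(), which runs on the rayon thread-pool; the counter therefore becomes an AtomicUsize, and a per-slot leader/cluster-nodes cache is built up front instead of grouping shreds by slot. As a generic illustration of the pattern only (a standalone example, not agave code; all names below are invented), moving a per-item filter inside a rayon par_iter with a relaxed atomic skip counter looks roughly like this:

use rayon::prelude::*;
use std::{
    collections::HashSet,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Mutex,
    },
};

fn main() {
    let items = vec![1u64, 2, 2, 3, 3, 3, 4];
    let skipped = AtomicUsize::new(0);
    // Stand-in "deduper"; the real code uses a lock-free, bloom-style filter
    // rather than a Mutex<HashSet<_>>.
    let seen = Mutex::new(HashSet::new());
    let kept: Vec<u64> = items
        .into_par_iter()
        .filter_map(|item| {
            // The duplicate check runs on whichever worker thread picked up
            // the item, so the skip counter must be atomic.
            if !seen.lock().unwrap().insert(item) {
                skipped.fetch_add(1, Ordering::Relaxed);
                return None;
            }
            Some(item)
        })
        .collect();
    println!("kept={kept:?} skipped={}", skipped.load(Ordering::Relaxed));
}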
Changes from all commits
3 changes: 3 additions & 0 deletions ledger/src/shred.rs
@@ -259,14 +259,17 @@ impl<'a> AsRef<[u8]> for SignedData<'a> {
 pub struct ShredId(Slot, /*shred index:*/ u32, ShredType);
 
 impl ShredId {
+    #[inline]
     pub(crate) fn new(slot: Slot, index: u32, shred_type: ShredType) -> ShredId {
         ShredId(slot, index, shred_type)
     }
 
+    #[inline]
     pub fn slot(&self) -> Slot {
         self.0
     }
 
+    #[inline]
     pub(crate) fn unpack(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
         (self.0, self.1, self.2)
     }
126 changes: 54 additions & 72 deletions turbine/src/retransmit_stage.rs
@@ -5,7 +5,6 @@ use {
     crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
     bytes::Bytes,
     crossbeam_channel::{Receiver, RecvTimeoutError},
-    itertools::{izip, Itertools},
     lru::LruCache,
     rand::Rng,
     rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
@@ -33,8 +32,7 @@ use {
     },
     static_assertions::const_assert_eq,
     std::{
-        collections::HashMap,
-        iter::repeat,
+        collections::{HashMap, HashSet},
         net::{SocketAddr, UdpSocket},
         ops::AddAssign,
         sync::{
@@ -74,7 +72,7 @@ struct RetransmitStats {
     num_addrs_failed: AtomicUsize,
     num_loopback_errs: AtomicUsize,
     num_shreds: usize,
-    num_shreds_skipped: usize,
+    num_shreds_skipped: AtomicUsize,
     num_small_batches: usize,
     total_batches: usize,
     total_time: u64,
@@ -112,7 +110,11 @@ impl RetransmitStats {
             ("num_addrs_failed", *self.num_addrs_failed.get_mut(), i64),
             ("num_loopback_errs", *self.num_loopback_errs.get_mut(), i64),
             ("num_shreds", self.num_shreds, i64),
-            ("num_shreds_skipped", self.num_shreds_skipped, i64),
+            (
+                "num_shreds_skipped",
+                *self.num_shreds_skipped.get_mut(),
+                i64
+            ),
             ("retransmit_total", *self.retransmit_total.get_mut(), i64),
             (
                 "compute_turbine",
@@ -129,18 +131,9 @@ impl RetransmitStats {
         let old = std::mem::replace(self, Self::new(Instant::now()));
         self.slot_stats = old.slot_stats;
     }
-
-    fn record_error(&self, err: &Error) {
-        match err {
-            Error::Loopback { .. } => {
-                error!("retransmit_shred: {err}");
-                self.num_loopback_errs.fetch_add(1, Ordering::Relaxed)
-            }
-        };
-    }
 }
 
-struct ShredDeduper<const K: usize> {
+struct ShredDeduper<const K: usize = 2> {
     deduper: Deduper<K, /*shred:*/ [u8]>,
     shred_id_filter: Deduper<K, (ShredId, /*0..MAX_DUPLICATE_COUNT:*/ usize)>,
 }
@@ -196,7 +189,7 @@ fn retransmit(
     quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
     stats: &mut RetransmitStats,
     cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
-    shred_deduper: &mut ShredDeduper<2>,
+    shred_deduper: &mut ShredDeduper,
     max_slots: &MaxSlots,
     rpc_subscriptions: Option<&RpcSubscriptions>,
     slot_status_notifier: Option<&SlotStatusNotifier>,
@@ -225,20 +218,12 @@ fn retransmit(
     epoch_cache_update.stop();
     stats.epoch_cache_update += epoch_cache_update.as_us();
     // Lookup slot leader and cluster nodes for each slot.
-    let shreds: Vec<_> = shreds
+    let cache: HashMap<Slot, _> = shreds
+        .iter()
+        .filter_map(|shred| shred::layout::get_slot(shred))
+        .collect::<HashSet<Slot>>()
         .into_iter()
-        .filter_map(|shred| {
-            let key = shred::layout::get_shred_id(&shred)?;
-            if shred_deduper.dedup(key, &shred, MAX_DUPLICATE_COUNT) {
-                stats.num_shreds_skipped += 1;
-                None
-            } else {
-                Some((key, shred))
-            }
-        })
-        .into_group_map_by(|(key, _shred)| key.slot())
-        .into_iter()
-        .filter_map(|(slot, shreds)| {
+        .filter_map(|slot: Slot| {
             max_slots.retransmit.fetch_max(slot, Ordering::Relaxed);
             // TODO: consider using root-bank here for leader lookup!
             // Shreds' signatures should be verified before they reach here,
@@ -252,9 +237,8 @@ };
             };
             let cluster_nodes =
                 cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
-            Some(izip!(shreds, repeat(slot_leader), repeat(cluster_nodes)))
+            Some((slot, (slot_leader, cluster_nodes)))
         })
-        .flatten()
         .collect();
     let socket_addr_space = cluster_info.socket_addr_space();
     let record = |mut stats: HashMap<Slot, RetransmitSlotStats>,
@@ -269,47 +253,35 @@ fn retransmit(
         shreds
             .into_iter()
             .enumerate()
-            .filter_map(|(index, ((key, shred), slot_leader, cluster_nodes))| {
-                let (root_distance, num_nodes) = retransmit_shred(
-                    &key,
-                    &shred,
-                    &slot_leader,
+            .filter_map(|(index, shred)| {
+                retransmit_shred(
+                    shred,
                     &root_bank,
-                    &cluster_nodes,
+                    shred_deduper,
+                    &cache,
                     socket_addr_space,
                     &sockets[index % sockets.len()],
                     quic_endpoint_sender,
                     stats,
                 )
-                .inspect_err(|err| {
-                    stats.record_error(err);
-                })
-                .ok()?;
-                Some((key.slot(), root_distance, num_nodes))
             })
             .fold(HashMap::new(), record)
     } else {
         thread_pool.install(|| {
             shreds
                 .into_par_iter()
-                .filter_map(|((key, shred), slot_leader, cluster_nodes)| {
+                .filter_map(|shred| {
                     let index = thread_pool.current_thread_index().unwrap();
-                    let (root_distance, num_nodes) = retransmit_shred(
-                        &key,
-                        &shred,
-                        &slot_leader,
+                    retransmit_shred(
+                        shred,
                         &root_bank,
-                        &cluster_nodes,
+                        shred_deduper,
+                        &cache,
                         socket_addr_space,
                         &sockets[index % sockets.len()],
                         quic_endpoint_sender,
                         stats,
                     )
-                    .inspect_err(|err| {
-                        stats.record_error(err);
-                    })
-                    .ok()?;
-                    Some((key.slot(), root_distance, num_nodes))
                 })
                 .fold(HashMap::new, record)
                 .reduce(HashMap::new, RetransmitSlotStats::merge)
@@ -328,34 +300,46 @@ fn retransmit(
 }
 
 fn retransmit_shred(
-    key: &ShredId,
-    shred: &[u8],
-    slot_leader: &Pubkey,
+    shred: Vec<u8>,
     root_bank: &Bank,
-    cluster_nodes: &ClusterNodes<RetransmitStage>,
+    shred_deduper: &ShredDeduper,
+    cache: &HashMap<Slot, (/*leader:*/ Pubkey, Arc<ClusterNodes<RetransmitStage>>)>,
     socket_addr_space: &SocketAddrSpace,
     socket: &UdpSocket,
    quic_endpoint_sender: &AsyncSender<(SocketAddr, Bytes)>,
     stats: &RetransmitStats,
-) -> Result<(/*root_distance:*/ usize, /*num_nodes:*/ usize), Error> {
+) -> Option<(
+    Slot, // Shred slot.
+    usize, // This node's distance from the turbine root.
+    usize, // Number of nodes the shred is retransmitted to.
+)> {
+    let key = shred::layout::get_shred_id(&shred)?;
+    let (slot_leader, cluster_nodes) = cache.get(&key.slot())?;
+    if shred_deduper.dedup(key, &shred, MAX_DUPLICATE_COUNT) {
+        stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
+        return None;
+    }
     let mut compute_turbine_peers = Measure::start("turbine_start");
     let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank);
-    let (root_distance, addrs) = cluster_nodes.get_retransmit_addrs(
-        slot_leader,
-        key,
-        data_plane_fanout,
-        socket_addr_space,
-    )?;
+    let (root_distance, addrs) = cluster_nodes
+        .get_retransmit_addrs(slot_leader, &key, data_plane_fanout, socket_addr_space)
+        .inspect_err(|err| match err {
+            Error::Loopback { .. } => {
+                error!("retransmit_shred: {err}");
+                stats.num_loopback_errs.fetch_add(1, Ordering::Relaxed);
+            }
+        })
+        .ok()?;
     compute_turbine_peers.stop();
     stats
         .compute_turbine_peers_total
         .fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
 
     let mut retransmit_time = Measure::start("retransmit_to");
     let num_addrs = addrs.len();
-    let num_nodes = match cluster_nodes::get_broadcast_protocol(key) {
+    let num_nodes = match cluster_nodes::get_broadcast_protocol(&key) {
         Protocol::QUIC => {
-            let shred = Bytes::copy_from_slice(shred);
+            let shred = Bytes::from(shred);
             addrs
                 .into_iter()
                 .filter_map(|addr| quic_endpoint_sender.try_send((addr, shred.clone())).ok())
@@ -365,9 +349,7 @@ fn retransmit_shred(
             Ok(()) => addrs.len(),
             Err(SendPktsError::IoError(ioerr, num_failed)) => {
                 error!(
-                    "retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
-                    ioerr,
-                    num_failed,
+                    "retransmit_to multi_target_send error: {ioerr:?}, {num_failed}/{} packets failed",
                     addrs.len(),
                 );
                 addrs.len() - num_failed
@@ -382,7 +364,7 @@ fn retransmit_shred(
     stats
         .retransmit_total
         .fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
-    Ok((root_distance, num_nodes))
+    Some((key.slot(), root_distance, num_nodes))
 }
 
 /// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
@@ -409,7 +391,7 @@ pub fn retransmitter(
         CLUSTER_NODES_CACHE_TTL,
     );
     let mut rng = rand::thread_rng();
-    let mut shred_deduper = ShredDeduper::<2>::new(&mut rng, DEDUPER_NUM_BITS);
+    let mut shred_deduper = ShredDeduper::new(&mut rng, DEDUPER_NUM_BITS);
     let mut stats = RetransmitStats::new(Instant::now());
     #[allow(clippy::manual_clamp)]
     let num_threads = get_thread_count().min(8).max(sockets.len());
Expand Down Expand Up @@ -513,7 +495,7 @@ impl RetransmitStats {
num_addrs_failed: AtomicUsize::default(),
num_loopback_errs: AtomicUsize::default(),
num_shreds: 0usize,
num_shreds_skipped: 0usize,
num_shreds_skipped: AtomicUsize::default(),
total_batches: 0usize,
num_small_batches: 0usize,
total_time: 0u64,
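One piece of context the diff relies on but does not show: ShredDeduper::dedup itself is unchanged by this patch, so it never appears above. Roughly (this is a paraphrase of the surrounding file from memory, not a quoted excerpt, and may differ in detail), it combines the two filters held by the struct, treating a shred as a duplicate once its raw bytes have been seen or once MAX_DUPLICATE_COUNT distinct variants of its ShredId have already gone through:

    fn dedup(&self, key: ShredId, shred: &[u8], max_duplicate_count: usize) -> bool {
        // Retransmit up to max_duplicate_count different payloads per ShredId so
        // that duplicate blocks can still be detected across the cluster.
        self.deduper.dedup(shred)
            || (0..max_duplicate_count).all(|i| self.shred_id_filter.dedup(&(key, i)))
    }

Because dedup takes &self (retransmit_shred receives shred_deduper: &ShredDeduper), moving the call from the single-threaded pre-filter onto the rayon workers needs no extra locking here; only the num_shreds_skipped counter had to become atomic.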