removes intermediate vector allocations in ClusterNodes::get_retransmit_addrs

ClusterNodes::get_retransmit_addrs does 2 intermediate collects:
https://github.com/anza-xyz/agave/blob/3890ce5bc/turbine/src/cluster_nodes.rs#L222
https://github.com/anza-xyz/agave/blob/3890ce5bc/turbine/src/cluster_nodes.rs#L239

The commit avoids both by chaining iterator operations.
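
For illustration, the shape of the change (a minimal sketch with made-up names and data, not the actual turbine code): each intermediate collect materializes a Vec that the next stage immediately consumes, whereas a single chained pipeline allocates only once at the end.

// Minimal sketch of the pattern; names and data are illustrative, not from the codebase.

// Before: every stage materializes a Vec before the next stage runs.
fn eager(input: &[u32]) -> Vec<u32> {
    let shuffled: Vec<u32> = input.iter().rev().copied().collect(); // intermediate collect #1
    let kept: Vec<u32> = shuffled.into_iter().filter(|x| x % 2 == 0).collect(); // intermediate collect #2
    kept.into_iter().map(|x| x + 1).collect() // final collect
}

// After: one lazy pipeline; only the final collect allocates.
fn chained(input: &[u32]) -> Vec<u32> {
    input
        .iter()
        .rev()
        .copied()
        .filter(|x| x % 2 == 0)
        .map(|x| x + 1)
        .collect()
}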
behzadnouri committed Dec 16, 2024
1 parent 9e59baa commit f063280
Showing 2 changed files with 53 additions and 61 deletions.
2 changes: 1 addition & 1 deletion turbine/benches/cluster_nodes.rs
@@ -46,7 +46,7 @@ fn get_retransmit_peers_deterministic(
             0,
         );
         let _retransmit_peers =
-            cluster_nodes.get_retransmit_peers(slot_leader, &shred.id(), /*fanout:*/ 200);
+            cluster_nodes.get_retransmit_addrs(slot_leader, &shred.id(), /*fanout:*/ 200);
     }
 }

112 changes: 52 additions & 60 deletions turbine/src/cluster_nodes.rs
@@ -28,7 +28,7 @@ use {
     std::{
         any::TypeId,
         cmp::Reverse,
-        collections::HashMap,
+        collections::{HashMap, HashSet},
         iter::repeat_with,
         marker::PhantomData,
         net::{IpAddr, SocketAddr},
@@ -83,14 +83,6 @@ pub struct ClusterNodesCache<T> {
     ttl: Duration, // Time to live.
 }
 
-pub struct RetransmitPeers<'a> {
-    root_distance: usize, // distance from the root node
-    children: Vec<&'a Node>,
-    // Maps tvu addresses to the first node
-    // in the shuffle with the same address.
-    addrs: HashMap<SocketAddr, Pubkey>, // tvu addresses
-}
-
 impl Node {
     #[inline]
     fn pubkey(&self) -> Pubkey {
@@ -168,33 +160,12 @@ impl ClusterNodes<BroadcastStage> {
 }
 
 impl ClusterNodes<RetransmitStage> {
-    pub(crate) fn get_retransmit_addrs(
+    pub fn get_retransmit_addrs(
         &self,
         slot_leader: &Pubkey,
         shred: &ShredId,
         fanout: usize,
     ) -> Result<(/*root_distance:*/ usize, Vec<SocketAddr>), Error> {
-        let RetransmitPeers {
-            root_distance,
-            children,
-            addrs,
-        } = self.get_retransmit_peers(slot_leader, shred, fanout)?;
-        let protocol = get_broadcast_protocol(shred);
-        let peers = children.into_iter().filter_map(|node| {
-            node.contact_info()?
-                .tvu(protocol)
-                .ok()
-                .filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
-        });
-        Ok((root_distance, peers.collect()))
-    }
-
-    pub fn get_retransmit_peers(
-        &self,
-        slot_leader: &Pubkey,
-        shred: &ShredId,
-        fanout: usize,
-    ) -> Result<RetransmitPeers, Error> {
         let mut weighted_shuffle = self.weighted_shuffle.clone();
         // Exclude slot leader from list of nodes.
         if slot_leader == &self.pubkey {
@@ -206,39 +177,46 @@ impl ClusterNodes<RetransmitStage> {
         if let Some(index) = self.index.get(slot_leader) {
             weighted_shuffle.remove_index(*index);
         }
-        let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
         let mut rng = get_seeded_rng(slot_leader, shred);
-        let protocol = get_broadcast_protocol(shred);
-        let nodes: Vec<_> = weighted_shuffle
-            .shuffle(&mut rng)
-            .map(|index| &self.nodes[index])
-            .inspect(|node| {
-                if let Some(node) = node.contact_info() {
-                    if let Ok(addr) = node.tvu(protocol) {
-                        addrs.entry(addr).or_insert(*node.pubkey());
-                    }
-                }
+        let mut nodes = {
+            let protocol = get_broadcast_protocol(shred);
+            // If there are 2 nodes in the shuffle with the same socket-addr,
+            // we only send shreds to the first one. The hash-set below allows
+            // to track if a socket-addr was observed earlier in the shuffle.
+            let mut addrs = HashSet::<SocketAddr>::with_capacity(self.nodes.len());
+            weighted_shuffle.shuffle(&mut rng).map(move |index| {
+                let node = &self.nodes[index];
+                let addr: Option<SocketAddr> = node
+                    .contact_info()
+                    .and_then(|node| node.tvu(protocol).ok())
+                    .filter(|&addr| addrs.insert(addr));
+                (node, addr)
             })
-            .collect();
-        let self_index = nodes
-            .iter()
-            .position(|node| node.pubkey() == self.pubkey)
+        };
+        // This node's index within shuffled nodes.
+        let index = nodes
+            .by_ref()
+            .position(|(node, _)| node.pubkey() == self.pubkey)
             .unwrap();
-        let root_distance = if self_index == 0 {
-            0
-        } else if self_index <= fanout {
-            1
-        } else if self_index <= fanout.saturating_add(1).saturating_mul(fanout) {
-            2
-        } else {
-            3 // If changed, update MAX_NUM_TURBINE_HOPS.
+        let root_distance = get_root_distance(index, fanout);
+        let peers: Vec<SocketAddr> = {
+            // Node's index within its neighborhood.
+            let offset = index.saturating_sub(1) % fanout;
+            // First node in the neighborhood.
+            let anchor = index - offset;
+            let step = if index == 0 { 1 } else { fanout };
+            (anchor * fanout + offset + 1..)
+                .step_by(step)
+                .take(fanout)
+                .scan(index, |state, k| -> Option<Option<SocketAddr>> {
+                    let (_, addr) = nodes.by_ref().nth(k - *state - 1)?;
+                    *state = k;
+                    Some(addr)
+                })
+                .flatten()
+                .collect()
         };
-        let peers = get_retransmit_peers(fanout, self_index, &nodes);
-        Ok(RetransmitPeers {
-            root_distance,
-            children: peers.collect(),
-            addrs,
-        })
+        Ok((root_distance, peers))
     }
 
     // Returns the parent node in the turbine broadcast tree.
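
The deduplication inside the new pipeline hinges on HashSet::insert returning true only when the value was not already present: Option::filter(|&addr| addrs.insert(addr)) keeps an address for the first node in the shuffle that advertises it and yields None for later duplicates, while the duplicate nodes themselves stay in the stream so shuffle positions are undisturbed. A standalone sketch of the idiom (illustrative types and data, not the agave code):

use std::collections::HashSet;

// Keep the value only for its first occurrence in iteration order;
// later duplicates become None but still occupy a position.
fn mark_first_occurrences<T: std::hash::Hash + Eq + Copy>(items: &[T]) -> Vec<Option<T>> {
    let mut seen = HashSet::with_capacity(items.len());
    items
        .iter()
        .map(|&item| Some(item).filter(|&item| seen.insert(item)))
        .collect()
}

fn main() {
    let addrs = ["10.0.0.1:8001", "10.0.0.2:8001", "10.0.0.1:8001"];
    assert_eq!(
        mark_first_occurrences(&addrs),
        [Some("10.0.0.1:8001"), Some("10.0.0.2:8001"), None],
    );
}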
@@ -393,6 +371,7 @@ fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng {
 // Each other node retransmits shreds to fanout many nodes in the next layer.
 // For example the node k in the 1st layer will retransmit to nodes:
 // fanout + k, 2*fanout + k, ..., fanout*fanout + k
+#[cfg(test)]
 fn get_retransmit_peers<T: Copy>(
     fanout: usize,
     index: usize, // Local node's index within the nodes slice.
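
To make the comment's arithmetic concrete, here is a hypothetical helper (not in the codebase) that mirrors the anchor/offset/step computation from get_retransmit_addrs above and lists which shuffle indices the node at a given index retransmits to:

// Sketch: child indices of the node at `index` in a turbine tree with the
// given fanout, using the same anchor/offset/step arithmetic as the diff above.
fn child_indices(fanout: usize, index: usize) -> Vec<usize> {
    let offset = index.saturating_sub(1) % fanout; // index within its neighborhood
    let anchor = index - offset; // first node in the neighborhood
    let step = if index == 0 { 1 } else { fanout };
    (anchor * fanout + offset + 1..)
        .step_by(step)
        .take(fanout)
        .collect()
}

fn main() {
    // The root (index 0) feeds the whole first layer: 1..=fanout.
    assert_eq!(child_indices(3, 0), [1, 2, 3]);
    // Node k in the first layer feeds fanout + k, 2*fanout + k, ..., fanout*fanout + k.
    assert_eq!(child_indices(3, 1), [4, 7, 10]);
    assert_eq!(child_indices(3, 2), [5, 8, 11]);
}

In the actual diff these indices are pulled lazily from the still-live shuffle iterator via scan and nth, so the selection stays a single pass with no intermediate Vec of nodes.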
@@ -519,6 +498,19 @@ pub(crate) fn get_broadcast_protocol(_: &ShredId) -> Protocol {
     Protocol::UDP
 }
 
+#[inline]
+fn get_root_distance(index: usize, fanout: usize) -> usize {
+    if index == 0 {
+        0
+    } else if index <= fanout {
+        1
+    } else if index <= fanout.saturating_add(1).saturating_mul(fanout) {
+        2
+    } else {
+        3 // If changed, update MAX_NUM_TURBINE_HOPS.
+    }
+}
+
 pub fn make_test_cluster<R: Rng>(
     rng: &mut R,
     num_nodes: usize,
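As a sanity check of the layer boundaries encoded in get_root_distance, a hypothetical test (not part of the commit) with the fanout of 200 used in the bench above:

#[test]
fn root_distance_layer_boundaries() {
    let fanout = 200;
    assert_eq!(get_root_distance(0, fanout), 0); // the root itself
    assert_eq!(get_root_distance(1, fanout), 1); // first layer: 1..=200
    assert_eq!(get_root_distance(200, fanout), 1);
    assert_eq!(get_root_distance(201, fanout), 2); // second layer: 201..=40_200
    assert_eq!(get_root_distance(40_200, fanout), 2); // (fanout + 1) * fanout
    assert_eq!(get_root_distance(40_201, fanout), 3); // beyond: capped at 3 hops
}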
