From 646c71c5a5b43aec0718ba7c1df5a72c66f6bc77 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 16 Dec 2024 08:55:42 -0600 Subject: [PATCH 1/3] removes intermediate vector allocations in ClusterNodes::get_retransmit_addrs ClusterNodes::get_retransmit_addrs does 2 intermediate collects: https://github.com/anza-xyz/agave/blob/3890ce5bc/turbine/src/cluster_nodes.rs#L222 https://github.com/anza-xyz/agave/blob/3890ce5bc/turbine/src/cluster_nodes.rs#L239 The commit avoids both by chaining iterator operations. --- turbine/benches/cluster_nodes.rs | 2 +- turbine/src/cluster_nodes.rs | 112 ++++++++++++++----------------- 2 files changed, 53 insertions(+), 61 deletions(-) diff --git a/turbine/benches/cluster_nodes.rs b/turbine/benches/cluster_nodes.rs index 1daa304ee74ccc..a36ac648b1c757 100644 --- a/turbine/benches/cluster_nodes.rs +++ b/turbine/benches/cluster_nodes.rs @@ -46,7 +46,7 @@ fn get_retransmit_peers_deterministic( 0, ); let _retransmit_peers = - cluster_nodes.get_retransmit_peers(slot_leader, &shred.id(), /*fanout:*/ 200); + cluster_nodes.get_retransmit_addrs(slot_leader, &shred.id(), /*fanout:*/ 200); } } diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index 6f8aa6a1522aa5..713d21b539fd42 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -28,7 +28,7 @@ use { std::{ any::TypeId, cmp::Reverse, - collections::HashMap, + collections::{HashMap, HashSet}, iter::repeat_with, marker::PhantomData, net::{IpAddr, SocketAddr}, @@ -83,14 +83,6 @@ pub struct ClusterNodesCache { ttl: Duration, // Time to live. } -pub struct RetransmitPeers<'a> { - root_distance: usize, // distance from the root node - children: Vec<&'a Node>, - // Maps tvu addresses to the first node - // in the shuffle with the same address. - addrs: HashMap, // tvu addresses -} - impl Node { #[inline] fn pubkey(&self) -> Pubkey { @@ -168,33 +160,12 @@ impl ClusterNodes { } impl ClusterNodes { - pub(crate) fn get_retransmit_addrs( + pub fn get_retransmit_addrs( &self, slot_leader: &Pubkey, shred: &ShredId, fanout: usize, ) -> Result<(/*root_distance:*/ usize, Vec), Error> { - let RetransmitPeers { - root_distance, - children, - addrs, - } = self.get_retransmit_peers(slot_leader, shred, fanout)?; - let protocol = get_broadcast_protocol(shred); - let peers = children.into_iter().filter_map(|node| { - node.contact_info()? - .tvu(protocol) - .ok() - .filter(|addr| addrs.get(addr) == Some(&node.pubkey())) - }); - Ok((root_distance, peers.collect())) - } - - pub fn get_retransmit_peers( - &self, - slot_leader: &Pubkey, - shred: &ShredId, - fanout: usize, - ) -> Result { let mut weighted_shuffle = self.weighted_shuffle.clone(); // Exclude slot leader from list of nodes. if slot_leader == &self.pubkey { @@ -206,39 +177,46 @@ impl ClusterNodes { if let Some(index) = self.index.get(slot_leader) { weighted_shuffle.remove_index(*index); } - let mut addrs = HashMap::::with_capacity(self.nodes.len()); let mut rng = get_seeded_rng(slot_leader, shred); - let protocol = get_broadcast_protocol(shred); - let nodes: Vec<_> = weighted_shuffle - .shuffle(&mut rng) - .map(|index| &self.nodes[index]) - .inspect(|node| { - if let Some(node) = node.contact_info() { - if let Ok(addr) = node.tvu(protocol) { - addrs.entry(addr).or_insert(*node.pubkey()); - } - } + let mut nodes = { + let protocol = get_broadcast_protocol(shred); + // If there are 2 nodes in the shuffle with the same socket-addr, + // we only send shreds to the first one. The hash-set below allows + // to track if a socket-addr was observed earlier in the shuffle. + let mut addrs = HashSet::::with_capacity(self.nodes.len()); + weighted_shuffle.shuffle(&mut rng).map(move |index| { + let node = &self.nodes[index]; + let addr: Option = node + .contact_info() + .and_then(|node| node.tvu(protocol).ok()) + .filter(|&addr| addrs.insert(addr)); + (node, addr) }) - .collect(); - let self_index = nodes - .iter() - .position(|node| node.pubkey() == self.pubkey) + }; + // This node's index within shuffled nodes. + let index = nodes + .by_ref() + .position(|(node, _)| node.pubkey() == self.pubkey) .unwrap(); - let root_distance = if self_index == 0 { - 0 - } else if self_index <= fanout { - 1 - } else if self_index <= fanout.saturating_add(1).saturating_mul(fanout) { - 2 - } else { - 3 // If changed, update MAX_NUM_TURBINE_HOPS. + let root_distance = get_root_distance(index, fanout); + let peers: Vec = { + // Node's index within its neighborhood. + let offset = index.saturating_sub(1) % fanout; + // First node in the neighborhood. + let anchor = index - offset; + let step = if index == 0 { 1 } else { fanout }; + (anchor * fanout + offset + 1..) + .step_by(step) + .take(fanout) + .scan(index, |state, k| -> Option> { + let (_, addr) = nodes.by_ref().nth(k - *state - 1)?; + *state = k; + Some(addr) + }) + .flatten() + .collect() }; - let peers = get_retransmit_peers(fanout, self_index, &nodes); - Ok(RetransmitPeers { - root_distance, - children: peers.collect(), - addrs, - }) + Ok((root_distance, peers)) } // Returns the parent node in the turbine broadcast tree. @@ -393,6 +371,7 @@ fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng { // Each other node retransmits shreds to fanout many nodes in the next layer. // For example the node k in the 1st layer will retransmit to nodes: // fanout + k, 2*fanout + k, ..., fanout*fanout + k +#[cfg(test)] fn get_retransmit_peers( fanout: usize, index: usize, // Local node's index within the nodes slice. @@ -519,6 +498,19 @@ pub(crate) fn get_broadcast_protocol(_: &ShredId) -> Protocol { Protocol::UDP } +#[inline] +fn get_root_distance(index: usize, fanout: usize) -> usize { + if index == 0 { + 0 + } else if index <= fanout { + 1 + } else if index <= fanout.saturating_add(1).saturating_mul(fanout) { + 2 + } else { + 3 // If changed, update MAX_NUM_TURBINE_HOPS. + } +} + pub fn make_test_cluster( rng: &mut R, num_nodes: usize, From 69548079b6c85465dfd095347f66d932394d85be Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 16 Dec 2024 11:54:23 -0600 Subject: [PATCH 2/3] updates get_retransmit_peers --- turbine/src/cluster_nodes.rs | 84 +++++++++++++++++------------------- 1 file changed, 39 insertions(+), 45 deletions(-) diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index 713d21b539fd42..59b53e50544ae4 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -178,7 +178,7 @@ impl ClusterNodes { weighted_shuffle.remove_index(*index); } let mut rng = get_seeded_rng(slot_leader, shred); - let mut nodes = { + let nodes = { let protocol = get_broadcast_protocol(shred); // If there are 2 nodes in the shuffle with the same socket-addr, // we only send shreds to the first one. The hash-set below allows @@ -193,29 +193,10 @@ impl ClusterNodes { (node, addr) }) }; - // This node's index within shuffled nodes. - let index = nodes - .by_ref() - .position(|(node, _)| node.pubkey() == self.pubkey) - .unwrap(); + let (index, peers) = + get_retransmit_peers(fanout, |(node, _)| node.pubkey() == self.pubkey, nodes); + let peers = peers.filter_map(|(_, addr)| addr).collect(); let root_distance = get_root_distance(index, fanout); - let peers: Vec = { - // Node's index within its neighborhood. - let offset = index.saturating_sub(1) % fanout; - // First node in the neighborhood. - let anchor = index - offset; - let step = if index == 0 { 1 } else { fanout }; - (anchor * fanout + offset + 1..) - .step_by(step) - .take(fanout) - .scan(index, |state, k| -> Option> { - let (_, addr) = nodes.by_ref().nth(k - *state - 1)?; - *state = k; - Some(addr) - }) - .flatten() - .collect() - }; Ok((root_distance, peers)) } @@ -371,23 +352,29 @@ fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng { // Each other node retransmits shreds to fanout many nodes in the next layer. // For example the node k in the 1st layer will retransmit to nodes: // fanout + k, 2*fanout + k, ..., fanout*fanout + k -#[cfg(test)] -fn get_retransmit_peers( +fn get_retransmit_peers( fanout: usize, - index: usize, // Local node's index within the nodes slice. - nodes: &[T], -) -> impl Iterator + '_ { + // Predicate fn which identifies this node in the shuffle. + pred: impl Fn(T) -> bool, + nodes: impl IntoIterator, +) -> (/*this node's index:*/ usize, impl Iterator) { + let mut nodes = nodes.into_iter(); + // This node's index within shuffled nodes. + let index = nodes.by_ref().position(pred).unwrap(); // Node's index within its neighborhood. let offset = index.saturating_sub(1) % fanout; // First node in the neighborhood. let anchor = index - offset; let step = if index == 0 { 1 } else { fanout }; - (anchor * fanout + offset + 1..) + let peers = (anchor * fanout + offset + 1..) .step_by(step) .take(fanout) - .map(|i| nodes.get(i)) - .while_some() - .copied() + .scan(index, move |state, k| -> Option { + let peer = nodes.by_ref().nth(k - *state - 1)?; + *state = k; + Some(peer) + }); + (index, peers) } // Returns the parent node in the turbine broadcast tree. @@ -702,7 +689,7 @@ mod tests { T: Copy + Eq + PartialEq + Debug + Hash, { // Map node identities to their index within the shuffled tree. - let index: HashMap<_, _> = nodes + let cache: HashMap<_, _> = nodes .iter() .copied() .enumerate() @@ -712,18 +699,22 @@ mod tests { // Root node's parent is None. assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, nodes), None); for (k, peers) in peers.into_iter().enumerate() { - assert_eq!( - get_retransmit_peers(fanout, k, nodes).collect::>(), - peers - ); + { + let (index, retransmit_peers) = + get_retransmit_peers(fanout, |node| node == &nodes[k], nodes); + assert_eq!(peers, retransmit_peers.copied().collect::>()); + assert_eq!(index, k); + } let parent = Some(nodes[k]); for peer in peers { - assert_eq!(get_retransmit_parent(fanout, index[&peer], nodes), parent); + assert_eq!(get_retransmit_parent(fanout, cache[&peer], nodes), parent); } } // Remaining nodes have no children. - for k in offset..=nodes.len() { - assert_eq!(get_retransmit_peers(fanout, k, nodes).next(), None); + for k in offset..nodes.len() { + let (index, mut peers) = get_retransmit_peers(fanout, |node| node == &nodes[k], nodes); + assert_eq!(peers.next(), None); + assert_eq!(index, k); } } @@ -852,7 +843,7 @@ mod tests { let mut nodes: Vec<_> = (0..size).collect(); nodes.shuffle(&mut rng); // Map node identities to their index within the shuffled tree. - let index: HashMap<_, _> = nodes + let cache: HashMap<_, _> = nodes .iter() .copied() .enumerate() @@ -862,13 +853,16 @@ mod tests { assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, &nodes), None); for k in 1..size { let parent = get_retransmit_parent(fanout, k, &nodes).unwrap(); - let mut peers = get_retransmit_peers(fanout, index[&parent], &nodes); - assert_eq!(peers.find(|&peer| peer == nodes[k]), Some(nodes[k])); + let (index, mut peers) = get_retransmit_peers(fanout, |node| node == &parent, &nodes); + assert_eq!(index, cache[&parent]); + assert_eq!(peers.find(|&&peer| peer == nodes[k]), Some(&nodes[k])); } for k in 0..size { let parent = Some(nodes[k]); - for peer in get_retransmit_peers(fanout, k, &nodes) { - assert_eq!(get_retransmit_parent(fanout, index[&peer], &nodes), parent); + let (index, peers) = get_retransmit_peers(fanout, |node| node == &nodes[k], &nodes); + assert_eq!(index, k); + for peer in peers { + assert_eq!(get_retransmit_parent(fanout, cache[peer], &nodes), parent); } } } From e6f1b71fd08e972d5b807e886fb95bf19f9e2949 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 16 Dec 2024 14:08:35 -0600 Subject: [PATCH 3/3] moves socket-addr-space check to get_retransmit_addrs --- streamer/src/socket.rs | 1 + turbine/benches/cluster_nodes.rs | 9 +++++++-- turbine/src/cluster_nodes.rs | 6 +++++- turbine/src/retransmit_stage.rs | 12 ++++++------ 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/streamer/src/socket.rs b/streamer/src/socket.rs index fe86f84319d4d4..d9bd0966e3ae3a 100644 --- a/streamer/src/socket.rs +++ b/streamer/src/socket.rs @@ -16,6 +16,7 @@ impl SocketAddrSpace { } /// Returns true if the IP address is valid. + #[inline] #[must_use] pub fn check(&self, addr: &SocketAddr) -> bool { if matches!(self, SocketAddrSpace::Unspecified) { diff --git a/turbine/benches/cluster_nodes.rs b/turbine/benches/cluster_nodes.rs index a36ac648b1c757..08862898196807 100644 --- a/turbine/benches/cluster_nodes.rs +++ b/turbine/benches/cluster_nodes.rs @@ -7,6 +7,7 @@ use { solana_gossip::contact_info::ContactInfo, solana_ledger::shred::{Shred, ShredFlags}, solana_sdk::{clock::Slot, genesis_config::ClusterType, pubkey::Pubkey}, + solana_streamer::socket::SocketAddrSpace, solana_turbine::{ cluster_nodes::{make_test_cluster, new_cluster_nodes, ClusterNodes}, retransmit_stage::RetransmitStage, @@ -45,8 +46,12 @@ fn get_retransmit_peers_deterministic( 0, 0, ); - let _retransmit_peers = - cluster_nodes.get_retransmit_addrs(slot_leader, &shred.id(), /*fanout:*/ 200); + let _retransmit_peers = cluster_nodes.get_retransmit_addrs( + slot_leader, + &shred.id(), + 200, // fanout + &SocketAddrSpace::Unspecified, + ); } } diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index 59b53e50544ae4..850b90cbb97232 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -165,6 +165,7 @@ impl ClusterNodes { slot_leader: &Pubkey, shred: &ShredId, fanout: usize, + socket_addr_space: &SocketAddrSpace, ) -> Result<(/*root_distance:*/ usize, Vec), Error> { let mut weighted_shuffle = self.weighted_shuffle.clone(); // Exclude slot leader from list of nodes. @@ -195,7 +196,10 @@ impl ClusterNodes { }; let (index, peers) = get_retransmit_peers(fanout, |(node, _)| node.pubkey() == self.pubkey, nodes); - let peers = peers.filter_map(|(_, addr)| addr).collect(); + let peers = peers + .filter_map(|(_, addr)| addr) + .filter(|addr| socket_addr_space.check(addr)) + .collect(); let root_distance = get_root_distance(index, fanout); Ok((root_distance, peers)) } diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs index e820851d03e4ba..db9d1f96662a16 100644 --- a/turbine/src/retransmit_stage.rs +++ b/turbine/src/retransmit_stage.rs @@ -328,12 +328,12 @@ fn retransmit_shred( ) -> Result<(/*root_distance:*/ usize, /*num_nodes:*/ usize), Error> { let mut compute_turbine_peers = Measure::start("turbine_start"); let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank); - let (root_distance, addrs) = - cluster_nodes.get_retransmit_addrs(slot_leader, key, data_plane_fanout)?; - let addrs: Vec<_> = addrs - .into_iter() - .filter(|addr| socket_addr_space.check(addr)) - .collect(); + let (root_distance, addrs) = cluster_nodes.get_retransmit_addrs( + slot_leader, + key, + data_plane_fanout, + socket_addr_space, + )?; compute_turbine_peers.stop(); stats .compute_turbine_peers_total