From 4be7123a97d07d6b5694d12515f3533e292cb1cb Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 6 Mar 2024 09:31:53 -0600 Subject: [PATCH] adds api to obtain the parent node in the turbine retransmit tree Following commits will use this api to check retransmitter's signature on incoming shreds. --- turbine/src/cluster_nodes.rs | 159 ++++++++++++++++++++++++++++++----- 1 file changed, 136 insertions(+), 23 deletions(-) diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index 0c55cb41e56472..8313a93ad7d5aa 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -152,8 +152,7 @@ impl ClusterNodes { } pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> { - let shred_seed = shred.seed(&self.pubkey); - let mut rng = ChaChaRng::from_seed(shred_seed); + let mut rng = get_seeded_rng(/*leader:*/ &self.pubkey, shred); let index = self.weighted_shuffle.first(&mut rng)?; self.nodes[index].contact_info() } @@ -187,7 +186,6 @@ impl ClusterNodes { shred: &ShredId, fanout: usize, ) -> Result { - let shred_seed = shred.seed(slot_leader); let mut weighted_shuffle = self.weighted_shuffle.clone(); // Exclude slot leader from list of nodes. if slot_leader == &self.pubkey { @@ -200,7 +198,7 @@ impl ClusterNodes { weighted_shuffle.remove_index(*index); } let mut addrs = HashMap::::with_capacity(self.nodes.len()); - let mut rng = ChaChaRng::from_seed(shred_seed); + let mut rng = get_seeded_rng(slot_leader, shred); let protocol = get_broadcast_protocol(shred); let nodes: Vec<_> = weighted_shuffle .shuffle(&mut rng) @@ -233,6 +231,43 @@ impl ClusterNodes { addrs, }) } + + // Returns the parent node in the turbine broadcast tree. + // Returns None if the node is the root of the tree or is not staked. + #[allow(unused)] + fn get_retransmit_parent( + &self, + leader: &Pubkey, + shred: &ShredId, + fanout: usize, + ) -> Result, Error> { + // Exclude slot leader from list of nodes. + if leader == &self.pubkey { + return Err(Error::Loopback { + leader: *leader, + shred: *shred, + }); + } + // Unstaked nodes' position in the turbine tree is not deterministic + // and depends on gossip propagation of contact-infos. Therefore, if + // this node is not staked return None. + if self.nodes[self.index[&self.pubkey]].stake == 0 { + return Ok(None); + } + let mut weighted_shuffle = self.weighted_shuffle.clone(); + if let Some(index) = self.index.get(leader).copied() { + weighted_shuffle.remove_index(index); + } + let mut rng = get_seeded_rng(leader, shred); + // Only need shuffled nodes until this node itself. + let nodes: Vec<_> = weighted_shuffle + .shuffle(&mut rng) + .map(|index| &self.nodes[index]) + .take_while(|node| node.pubkey() != self.pubkey) + .collect(); + let parent = get_retransmit_parent(fanout, nodes.len(), &nodes); + Ok(parent.map(Node::pubkey)) + } } pub fn new_cluster_nodes( @@ -296,6 +331,11 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap) -> Vec ChaChaRng { + let seed = shred.seed(leader); + ChaChaRng::from_seed(seed) +} + // root : [0] // 1st layer: [1, 2, ..., fanout] // 2nd layer: [[fanout + 1, ..., fanout * 2], @@ -327,6 +367,21 @@ fn get_retransmit_peers( .copied() } +// Returns the parent node in the turbine broadcast tree. +// Returns None if the node is the root of the tree. +fn get_retransmit_parent( + fanout: usize, + index: usize, // Local node's index within the nodes slice. + nodes: &[T], +) -> Option { + // Node's index within its neighborhood. + let offset = index.saturating_sub(1) % fanout; + let index = index.checked_sub(1)? / fanout; + let index = index - index.saturating_sub(1) % fanout; + let index = if index == 0 { index } else { index + offset }; + nodes.get(index).copied() +} + impl ClusterNodesCache { pub fn new( // Capacity of underlying LRU-cache in terms of number of epochs. @@ -589,10 +644,36 @@ mod tests { } } + fn check_retransmit_nodes(fanout: usize, nodes: &[usize], peers: Vec>) { + let index: HashMap<_, _> = nodes + .iter() + .copied() + .enumerate() + .map(|(k, node)| (node, k)) + .collect(); + let offset = peers.len(); + // Root node's parent is None. + assert_eq!(get_retransmit_parent(fanout, 0, nodes), None); + for (k, peers) in peers.into_iter().enumerate() { + assert_eq!( + get_retransmit_peers(fanout, k, nodes).collect::>(), + peers + ); + let parent = Some(nodes[k]); + for peer in peers { + assert_eq!(get_retransmit_parent(fanout, index[&peer], nodes), parent); + } + } + // Remaining nodes have no children. + for k in offset..=nodes.len() { + assert_eq!(get_retransmit_peers(fanout, k, nodes).next(), None); + } + } + #[test] - fn test_get_retransmit_peers() { + fn test_get_retransmit_nodes() { // fanout 2 - let index = vec![ + let nodes = [ 7, // root 6, 10, // 1st layer // 2nd layer @@ -620,16 +701,9 @@ mod tests { vec![16, 9], vec![8], ]; - for (k, peers) in peers.into_iter().enumerate() { - let retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index); - assert_eq!(retransmit_peers.collect::>(), peers); - } - for k in 10..=index.len() { - let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index); - assert_eq!(retransmit_peers.next(), None); - } + check_retransmit_nodes(/*fanout:*/ 2, &nodes, peers); // fanout 3 - let index = vec![ + let nodes = [ 19, // root 14, 15, 28, // 1st layer // 2nd layer @@ -661,13 +735,52 @@ mod tests { vec![24, 32], vec![34], ]; - for (k, peers) in peers.into_iter().enumerate() { - let retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index); - assert_eq!(retransmit_peers.collect::>(), peers); - } - for k in 13..=index.len() { - let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index); - assert_eq!(retransmit_peers.next(), None); - } + check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers); + let nodes = [ + 5, // root + 34, 52, 8, // 1st layer + // 2nd layar + 44, 18, 2, // 1st neigborhood + 42, 47, 46, // 2nd + 11, 26, 28, // 3rd + // 3rd layer + 53, 23, 37, // 1st neighborhood + 40, 13, 7, // 2nd + 50, 35, 22, // 3rd + 3, 27, 31, // 4th + 10, 48, 15, // 5th + 19, 6, 30, // 6th + 36, 45, 1, // 7th + 38, 12, 17, // 8th + 4, 32, 16, // 9th + // 4th layer + 41, 49, 24, // 1st neighborhood + 14, 9, 0, // 2nd + 29, 21, 39, // 3rd + 43, 51, 33, // 4th + 25, 20, // 5th + ]; + let peers = vec![ + vec![34, 52, 8], + vec![44, 42, 11], + vec![18, 47, 26], + vec![2, 46, 28], + vec![53, 40, 50], + vec![23, 13, 35], + vec![37, 7, 22], + vec![3, 10, 19], + vec![27, 48, 6], + vec![31, 15, 30], + vec![36, 38, 4], + vec![45, 12, 32], + vec![1, 17, 16], + vec![41, 14, 29], + vec![49, 9, 21], + vec![24, 0, 39], + vec![43, 25], + vec![51, 20], + vec![33], + ]; + check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers); } }