From b6713714041ae011d62408e1a2ba46e2a8b6e85f Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 16 Dec 2024 11:54:23 -0600 Subject: [PATCH] updates get_retransmit_peers --- turbine/src/cluster_nodes.rs | 84 +++++++++++++++++------------------- 1 file changed, 39 insertions(+), 45 deletions(-) diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index 713d21b539fd42..59b53e50544ae4 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -178,7 +178,7 @@ impl ClusterNodes { weighted_shuffle.remove_index(*index); } let mut rng = get_seeded_rng(slot_leader, shred); - let mut nodes = { + let nodes = { let protocol = get_broadcast_protocol(shred); // If there are 2 nodes in the shuffle with the same socket-addr, // we only send shreds to the first one. The hash-set below allows @@ -193,29 +193,10 @@ impl ClusterNodes { (node, addr) }) }; - // This node's index within shuffled nodes. - let index = nodes - .by_ref() - .position(|(node, _)| node.pubkey() == self.pubkey) - .unwrap(); + let (index, peers) = + get_retransmit_peers(fanout, |(node, _)| node.pubkey() == self.pubkey, nodes); + let peers = peers.filter_map(|(_, addr)| addr).collect(); let root_distance = get_root_distance(index, fanout); - let peers: Vec = { - // Node's index within its neighborhood. - let offset = index.saturating_sub(1) % fanout; - // First node in the neighborhood. - let anchor = index - offset; - let step = if index == 0 { 1 } else { fanout }; - (anchor * fanout + offset + 1..) - .step_by(step) - .take(fanout) - .scan(index, |state, k| -> Option> { - let (_, addr) = nodes.by_ref().nth(k - *state - 1)?; - *state = k; - Some(addr) - }) - .flatten() - .collect() - }; Ok((root_distance, peers)) } @@ -371,23 +352,29 @@ fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng { // Each other node retransmits shreds to fanout many nodes in the next layer. // For example the node k in the 1st layer will retransmit to nodes: // fanout + k, 2*fanout + k, ..., fanout*fanout + k -#[cfg(test)] -fn get_retransmit_peers( +fn get_retransmit_peers( fanout: usize, - index: usize, // Local node's index within the nodes slice. - nodes: &[T], -) -> impl Iterator + '_ { + // Predicate fn which identifies this node in the shuffle. + pred: impl Fn(T) -> bool, + nodes: impl IntoIterator, +) -> (/*this node's index:*/ usize, impl Iterator) { + let mut nodes = nodes.into_iter(); + // This node's index within shuffled nodes. + let index = nodes.by_ref().position(pred).unwrap(); // Node's index within its neighborhood. let offset = index.saturating_sub(1) % fanout; // First node in the neighborhood. let anchor = index - offset; let step = if index == 0 { 1 } else { fanout }; - (anchor * fanout + offset + 1..) + let peers = (anchor * fanout + offset + 1..) .step_by(step) .take(fanout) - .map(|i| nodes.get(i)) - .while_some() - .copied() + .scan(index, move |state, k| -> Option { + let peer = nodes.by_ref().nth(k - *state - 1)?; + *state = k; + Some(peer) + }); + (index, peers) } // Returns the parent node in the turbine broadcast tree. @@ -702,7 +689,7 @@ mod tests { T: Copy + Eq + PartialEq + Debug + Hash, { // Map node identities to their index within the shuffled tree. - let index: HashMap<_, _> = nodes + let cache: HashMap<_, _> = nodes .iter() .copied() .enumerate() @@ -712,18 +699,22 @@ mod tests { // Root node's parent is None. assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, nodes), None); for (k, peers) in peers.into_iter().enumerate() { - assert_eq!( - get_retransmit_peers(fanout, k, nodes).collect::>(), - peers - ); + { + let (index, retransmit_peers) = + get_retransmit_peers(fanout, |node| node == &nodes[k], nodes); + assert_eq!(peers, retransmit_peers.copied().collect::>()); + assert_eq!(index, k); + } let parent = Some(nodes[k]); for peer in peers { - assert_eq!(get_retransmit_parent(fanout, index[&peer], nodes), parent); + assert_eq!(get_retransmit_parent(fanout, cache[&peer], nodes), parent); } } // Remaining nodes have no children. - for k in offset..=nodes.len() { - assert_eq!(get_retransmit_peers(fanout, k, nodes).next(), None); + for k in offset..nodes.len() { + let (index, mut peers) = get_retransmit_peers(fanout, |node| node == &nodes[k], nodes); + assert_eq!(peers.next(), None); + assert_eq!(index, k); } } @@ -852,7 +843,7 @@ mod tests { let mut nodes: Vec<_> = (0..size).collect(); nodes.shuffle(&mut rng); // Map node identities to their index within the shuffled tree. - let index: HashMap<_, _> = nodes + let cache: HashMap<_, _> = nodes .iter() .copied() .enumerate() @@ -862,13 +853,16 @@ mod tests { assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, &nodes), None); for k in 1..size { let parent = get_retransmit_parent(fanout, k, &nodes).unwrap(); - let mut peers = get_retransmit_peers(fanout, index[&parent], &nodes); - assert_eq!(peers.find(|&peer| peer == nodes[k]), Some(nodes[k])); + let (index, mut peers) = get_retransmit_peers(fanout, |node| node == &parent, &nodes); + assert_eq!(index, cache[&parent]); + assert_eq!(peers.find(|&&peer| peer == nodes[k]), Some(&nodes[k])); } for k in 0..size { let parent = Some(nodes[k]); - for peer in get_retransmit_peers(fanout, k, &nodes) { - assert_eq!(get_retransmit_parent(fanout, index[&peer], &nodes), parent); + let (index, peers) = get_retransmit_peers(fanout, |node| node == &nodes[k], &nodes); + assert_eq!(index, k); + for peer in peers { + assert_eq!(get_retransmit_parent(fanout, cache[peer], &nodes), parent); } } }