Skip to content

Commit

Permalink
updates get_retransmit_peers
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Dec 16, 2024
1 parent f063280 commit b671371
Showing 1 changed file with 39 additions and 45 deletions.
84 changes: 39 additions & 45 deletions turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl ClusterNodes<RetransmitStage> {
weighted_shuffle.remove_index(*index);
}
let mut rng = get_seeded_rng(slot_leader, shred);
let mut nodes = {
let nodes = {
let protocol = get_broadcast_protocol(shred);
// If there are 2 nodes in the shuffle with the same socket-addr,
// we only send shreds to the first one. The hash-set below allows
Expand All @@ -193,29 +193,10 @@ impl ClusterNodes<RetransmitStage> {
(node, addr)
})
};
// This node's index within shuffled nodes.
let index = nodes
.by_ref()
.position(|(node, _)| node.pubkey() == self.pubkey)
.unwrap();
let (index, peers) =
get_retransmit_peers(fanout, |(node, _)| node.pubkey() == self.pubkey, nodes);
let peers = peers.filter_map(|(_, addr)| addr).collect();
let root_distance = get_root_distance(index, fanout);
let peers: Vec<SocketAddr> = {
// Node's index within its neighborhood.
let offset = index.saturating_sub(1) % fanout;
// First node in the neighborhood.
let anchor = index - offset;
let step = if index == 0 { 1 } else { fanout };
(anchor * fanout + offset + 1..)
.step_by(step)
.take(fanout)
.scan(index, |state, k| -> Option<Option<SocketAddr>> {
let (_, addr) = nodes.by_ref().nth(k - *state - 1)?;
*state = k;
Some(addr)
})
.flatten()
.collect()
};
Ok((root_distance, peers))
}

Expand Down Expand Up @@ -371,23 +352,29 @@ fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng {
// Each other node retransmits shreds to fanout many nodes in the next layer.
// For example the node k in the 1st layer will retransmit to nodes:
// fanout + k, 2*fanout + k, ..., fanout*fanout + k
#[cfg(test)]
fn get_retransmit_peers<T: Copy>(
fn get_retransmit_peers<T>(
fanout: usize,
index: usize, // Local node's index within the nodes slice.
nodes: &[T],
) -> impl Iterator<Item = T> + '_ {
// Predicate fn which identifies this node in the shuffle.
pred: impl Fn(T) -> bool,
nodes: impl IntoIterator<Item = T>,
) -> (/*this node's index:*/ usize, impl Iterator<Item = T>) {
let mut nodes = nodes.into_iter();
// This node's index within shuffled nodes.
let index = nodes.by_ref().position(pred).unwrap();
// Node's index within its neighborhood.
let offset = index.saturating_sub(1) % fanout;
// First node in the neighborhood.
let anchor = index - offset;
let step = if index == 0 { 1 } else { fanout };
(anchor * fanout + offset + 1..)
let peers = (anchor * fanout + offset + 1..)
.step_by(step)
.take(fanout)
.map(|i| nodes.get(i))
.while_some()
.copied()
.scan(index, move |state, k| -> Option<T> {
let peer = nodes.by_ref().nth(k - *state - 1)?;
*state = k;
Some(peer)
});
(index, peers)
}

// Returns the parent node in the turbine broadcast tree.
Expand Down Expand Up @@ -702,7 +689,7 @@ mod tests {
T: Copy + Eq + PartialEq + Debug + Hash,
{
// Map node identities to their index within the shuffled tree.
let index: HashMap<_, _> = nodes
let cache: HashMap<_, _> = nodes
.iter()
.copied()
.enumerate()
Expand All @@ -712,18 +699,22 @@ mod tests {
// Root node's parent is None.
assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, nodes), None);
for (k, peers) in peers.into_iter().enumerate() {
assert_eq!(
get_retransmit_peers(fanout, k, nodes).collect::<Vec<_>>(),
peers
);
{
let (index, retransmit_peers) =
get_retransmit_peers(fanout, |node| node == &nodes[k], nodes);
assert_eq!(peers, retransmit_peers.copied().collect::<Vec<_>>());
assert_eq!(index, k);
}
let parent = Some(nodes[k]);
for peer in peers {
assert_eq!(get_retransmit_parent(fanout, index[&peer], nodes), parent);
assert_eq!(get_retransmit_parent(fanout, cache[&peer], nodes), parent);
}
}
// Remaining nodes have no children.
for k in offset..=nodes.len() {
assert_eq!(get_retransmit_peers(fanout, k, nodes).next(), None);
for k in offset..nodes.len() {
let (index, mut peers) = get_retransmit_peers(fanout, |node| node == &nodes[k], nodes);
assert_eq!(peers.next(), None);
assert_eq!(index, k);
}
}

Expand Down Expand Up @@ -852,7 +843,7 @@ mod tests {
let mut nodes: Vec<_> = (0..size).collect();
nodes.shuffle(&mut rng);
// Map node identities to their index within the shuffled tree.
let index: HashMap<_, _> = nodes
let cache: HashMap<_, _> = nodes
.iter()
.copied()
.enumerate()
Expand All @@ -862,13 +853,16 @@ mod tests {
assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, &nodes), None);
for k in 1..size {
let parent = get_retransmit_parent(fanout, k, &nodes).unwrap();
let mut peers = get_retransmit_peers(fanout, index[&parent], &nodes);
assert_eq!(peers.find(|&peer| peer == nodes[k]), Some(nodes[k]));
let (index, mut peers) = get_retransmit_peers(fanout, |node| node == &parent, &nodes);
assert_eq!(index, cache[&parent]);
assert_eq!(peers.find(|&&peer| peer == nodes[k]), Some(&nodes[k]));
}
for k in 0..size {
let parent = Some(nodes[k]);
for peer in get_retransmit_peers(fanout, k, &nodes) {
assert_eq!(get_retransmit_parent(fanout, index[&peer], &nodes), parent);
let (index, peers) = get_retransmit_peers(fanout, |node| node == &nodes[k], &nodes);
assert_eq!(index, k);
for peer in peers {
assert_eq!(get_retransmit_parent(fanout, cache[peer], &nodes), parent);
}
}
}
Expand Down

0 comments on commit b671371

Please sign in to comment.