Skip to content

Commit

Permalink
adds api to obtain the parent node in the turbine retransmit tree
Browse files Browse the repository at this point in the history
Subsequent commits will use this API to verify the retransmitter's
signature on incoming shreds.
  • Loading branch information
behzadnouri committed Mar 6, 2024
1 parent c161351 commit bd649d6
Showing 1 changed file with 131 additions and 20 deletions.
151 changes: 131 additions & 20 deletions turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,43 @@ impl ClusterNodes<RetransmitStage> {
addrs,
})
}

// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree or is not staked.
#[allow(unused)]
// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree or is not staked.
#[allow(unused)]
fn get_retransmit_parent(
    &self,
    slot_leader: &Pubkey,
    shred: &ShredId,
    fanout: usize,
) -> Result<Option<Pubkey>, Error> {
    // The slot leader never appears in its own retransmit tree; a shred
    // "from" the leader addressed to the leader indicates a loopback.
    if &self.pubkey == slot_leader {
        return Err(Error::Loopback {
            leader: *slot_leader,
            shred: *shred,
        });
    }
    // Unstaked nodes' positions in the turbine tree are not deterministic
    // (they depend on gossip propagation of contact-infos), so no parent
    // can be reliably identified for them.
    if self.nodes[self.index[&self.pubkey]].stake == 0 {
        return Ok(None);
    }
    // Re-derive the deterministic shuffle for this shred, excluding the
    // slot leader from the candidate nodes.
    let mut shuffle = self.weighted_shuffle.clone();
    if let Some(leader_index) = self.index.get(slot_leader) {
        shuffle.remove_index(*leader_index);
    }
    let mut rng = ChaChaRng::from_seed(shred.seed(slot_leader));
    // Only the shuffled nodes preceding this node are needed to locate
    // the parent; this node's position equals the prefix length.
    let tree: Vec<_> = shuffle
        .shuffle(&mut rng)
        .map(|node_index| &self.nodes[node_index])
        .take_while(|node| node.pubkey() != self.pubkey)
        .collect();
    Ok(get_retransmit_parent(fanout, tree.len(), &tree).map(Node::pubkey))
}
}

pub fn new_cluster_nodes<T: 'static>(
Expand Down Expand Up @@ -327,6 +364,21 @@ fn get_retransmit_peers<T: Copy>(
.copied()
}

// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree.
// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree.
fn get_retransmit_parent<T: Copy>(
    fanout: usize,
    index: usize, // Local node's index within the nodes slice.
    nodes: &[T],
) -> Option<T> {
    // This node's offset within its neighborhood of `fanout` siblings.
    let offset = index.saturating_sub(1) % fanout;
    // The root (index zero) has no parent.
    let anchor = index.checked_sub(1)? / fanout;
    // Step back to the anchor (first) node of the parent's neighborhood.
    let anchor = anchor - anchor.saturating_sub(1) % fanout;
    // The root feeds the first layer directly; otherwise the parent sits
    // at the same offset within its neighborhood as this node does.
    let parent = if anchor == 0 { anchor } else { anchor + offset };
    nodes.get(parent).copied()
}

impl<T> ClusterNodesCache<T> {
pub fn new(
// Capacity of underlying LRU-cache in terms of number of epochs.
Expand Down Expand Up @@ -589,10 +641,36 @@ mod tests {
}
}

// Cross-checks get_retransmit_peers against get_retransmit_parent: for a
// given fanout and node ordering, every peer a node retransmits to must
// report that node as its parent.
fn check_retransmit_nodes(fanout: usize, nodes: &[usize], peers: Vec<Vec<usize>>) {
    // Reverse lookup: node value -> its position within the nodes slice.
    let index: HashMap<_, _> = nodes
        .iter()
        .copied()
        .enumerate()
        .map(|(position, node)| (node, position))
        .collect();
    let num_parents = peers.len();
    // The root of the tree has no parent.
    assert_eq!(get_retransmit_parent(fanout, 0, nodes), None);
    for (k, expected_peers) in peers.into_iter().enumerate() {
        // Node k retransmits to exactly the expected peers ...
        let retransmit_peers: Vec<_> = get_retransmit_peers(fanout, k, nodes).collect();
        assert_eq!(retransmit_peers, expected_peers);
        // ... and each of those peers resolves node k as its parent.
        let parent = Some(nodes[k]);
        for peer in expected_peers {
            assert_eq!(get_retransmit_parent(fanout, index[&peer], nodes), parent);
        }
    }
    // Nodes past the listed parents (including one index past the end of
    // the slice) have no children.
    for k in num_parents..=nodes.len() {
        assert_eq!(get_retransmit_peers(fanout, k, nodes).next(), None);
    }
}

#[test]
fn test_get_retransmit_peers() {
fn test_get_retransmit_nodes() {
// fanout 2
let index = vec![
let nodes = [
7, // root
6, 10, // 1st layer
// 2nd layer
Expand Down Expand Up @@ -620,16 +698,9 @@ mod tests {
vec![16, 9],
vec![8],
];
for (k, peers) in peers.into_iter().enumerate() {
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
}
for k in 10..=index.len() {
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
assert_eq!(retransmit_peers.next(), None);
}
// fanout 3
let index = vec![
check_retransmit_nodes(/*fanout:*/ 2, &nodes, peers);
// fanout 3, 3 layers
let nodes = [
19, // root
14, 15, 28, // 1st layer
// 2nd layer
Expand Down Expand Up @@ -661,13 +732,53 @@ mod tests {
vec![24, 32],
vec![34],
];
for (k, peers) in peers.into_iter().enumerate() {
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
}
for k in 13..=index.len() {
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
assert_eq!(retransmit_peers.next(), None);
}
check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
// fanout 3, 4 layers
let nodes = [
5, // root
34, 52, 8, // 1st layer
// 2nd layar
44, 18, 2, // 1st neigborhood
42, 47, 46, // 2nd
11, 26, 28, // 3rd
// 3rd layer
53, 23, 37, // 1st neighborhood
40, 13, 7, // 2nd
50, 35, 22, // 3rd
3, 27, 31, // 4th
10, 48, 15, // 5th
19, 6, 30, // 6th
36, 45, 1, // 7th
38, 12, 17, // 8th
4, 32, 16, // 9th
// 4th layer
41, 49, 24, // 1st neighborhood
14, 9, 0, // 2nd
29, 21, 39, // 3rd
43, 51, 33, // 4th
25, 20, // 5th
];
let peers = vec![
vec![34, 52, 8],
vec![44, 42, 11],
vec![18, 47, 26],
vec![2, 46, 28],
vec![53, 40, 50],
vec![23, 13, 35],
vec![37, 7, 22],
vec![3, 10, 19],
vec![27, 48, 6],
vec![31, 15, 30],
vec![36, 38, 4],
vec![45, 12, 32],
vec![1, 17, 16],
vec![41, 14, 29],
vec![49, 9, 21],
vec![24, 0, 39],
vec![43, 25],
vec![51, 20],
vec![33],
];
check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
}
}

0 comments on commit bd649d6

Please sign in to comment.