Skip to content

Commit

Permalink
adds api to obtain the parent node in the turbine retransmit tree
Browse files Browse the repository at this point in the history
Following commits will use this api to check retransmitter's signature
on incoming shreds.
  • Loading branch information
behzadnouri committed Mar 6, 2024
1 parent c161351 commit 4be7123
Showing 1 changed file with 136 additions and 23 deletions.
159 changes: 136 additions & 23 deletions turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ impl ClusterNodes<BroadcastStage> {
}

pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
let shred_seed = shred.seed(&self.pubkey);
let mut rng = ChaChaRng::from_seed(shred_seed);
let mut rng = get_seeded_rng(/*leader:*/ &self.pubkey, shred);
let index = self.weighted_shuffle.first(&mut rng)?;
self.nodes[index].contact_info()
}
Expand Down Expand Up @@ -187,7 +186,6 @@ impl ClusterNodes<RetransmitStage> {
shred: &ShredId,
fanout: usize,
) -> Result<RetransmitPeers, Error> {
let shred_seed = shred.seed(slot_leader);
let mut weighted_shuffle = self.weighted_shuffle.clone();
// Exclude slot leader from list of nodes.
if slot_leader == &self.pubkey {
Expand All @@ -200,7 +198,7 @@ impl ClusterNodes<RetransmitStage> {
weighted_shuffle.remove_index(*index);
}
let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
let mut rng = ChaChaRng::from_seed(shred_seed);
let mut rng = get_seeded_rng(slot_leader, shred);
let protocol = get_broadcast_protocol(shred);
let nodes: Vec<_> = weighted_shuffle
.shuffle(&mut rng)
Expand Down Expand Up @@ -233,6 +231,43 @@ impl ClusterNodes<RetransmitStage> {
addrs,
})
}

// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree or is not staked.
#[allow(unused)]
fn get_retransmit_parent(
&self,
leader: &Pubkey,
shred: &ShredId,
fanout: usize,
) -> Result<Option<Pubkey>, Error> {
// Exclude slot leader from list of nodes.
if leader == &self.pubkey {
return Err(Error::Loopback {
leader: *leader,
shred: *shred,
});
}
// Unstaked nodes' position in the turbine tree is not deterministic
// and depends on gossip propagation of contact-infos. Therefore, if
// this node is not staked return None.
if self.nodes[self.index[&self.pubkey]].stake == 0 {
return Ok(None);
}
let mut weighted_shuffle = self.weighted_shuffle.clone();
if let Some(index) = self.index.get(leader).copied() {
weighted_shuffle.remove_index(index);
}
let mut rng = get_seeded_rng(leader, shred);
// Only need shuffled nodes until this node itself.
let nodes: Vec<_> = weighted_shuffle
.shuffle(&mut rng)
.map(|index| &self.nodes[index])
.take_while(|node| node.pubkey() != self.pubkey)
.collect();
let parent = get_retransmit_parent(fanout, nodes.len(), &nodes);
Ok(parent.map(Node::pubkey))
}
}

pub fn new_cluster_nodes<T: 'static>(
Expand Down Expand Up @@ -296,6 +331,11 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
.collect()
}

fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng {
let seed = shred.seed(leader);
ChaChaRng::from_seed(seed)
}

// root : [0]
// 1st layer: [1, 2, ..., fanout]
// 2nd layer: [[fanout + 1, ..., fanout * 2],
Expand Down Expand Up @@ -327,6 +367,21 @@ fn get_retransmit_peers<T: Copy>(
.copied()
}

// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree.
fn get_retransmit_parent<T: Copy>(
fanout: usize,
index: usize, // Local node's index within the nodes slice.
nodes: &[T],
) -> Option<T> {
// Node's index within its neighborhood.
let offset = index.saturating_sub(1) % fanout;
let index = index.checked_sub(1)? / fanout;
let index = index - index.saturating_sub(1) % fanout;
let index = if index == 0 { index } else { index + offset };
nodes.get(index).copied()
}

impl<T> ClusterNodesCache<T> {
pub fn new(
// Capacity of underlying LRU-cache in terms of number of epochs.
Expand Down Expand Up @@ -589,10 +644,36 @@ mod tests {
}
}

fn check_retransmit_nodes(fanout: usize, nodes: &[usize], peers: Vec<Vec<usize>>) {
let index: HashMap<_, _> = nodes
.iter()
.copied()
.enumerate()
.map(|(k, node)| (node, k))
.collect();
let offset = peers.len();
// Root node's parent is None.
assert_eq!(get_retransmit_parent(fanout, 0, nodes), None);
for (k, peers) in peers.into_iter().enumerate() {
assert_eq!(
get_retransmit_peers(fanout, k, nodes).collect::<Vec<_>>(),
peers
);
let parent = Some(nodes[k]);
for peer in peers {
assert_eq!(get_retransmit_parent(fanout, index[&peer], nodes), parent);
}
}
// Remaining nodes have no children.
for k in offset..=nodes.len() {
assert_eq!(get_retransmit_peers(fanout, k, nodes).next(), None);
}
}

#[test]
fn test_get_retransmit_peers() {
fn test_get_retransmit_nodes() {
// fanout 2
let index = vec![
let nodes = [
7, // root
6, 10, // 1st layer
// 2nd layer
Expand Down Expand Up @@ -620,16 +701,9 @@ mod tests {
vec![16, 9],
vec![8],
];
for (k, peers) in peers.into_iter().enumerate() {
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
}
for k in 10..=index.len() {
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
assert_eq!(retransmit_peers.next(), None);
}
check_retransmit_nodes(/*fanout:*/ 2, &nodes, peers);
// fanout 3
let index = vec![
let nodes = [
19, // root
14, 15, 28, // 1st layer
// 2nd layer
Expand Down Expand Up @@ -661,13 +735,52 @@ mod tests {
vec![24, 32],
vec![34],
];
for (k, peers) in peers.into_iter().enumerate() {
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
}
for k in 13..=index.len() {
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
assert_eq!(retransmit_peers.next(), None);
}
check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
let nodes = [
5, // root
34, 52, 8, // 1st layer
// 2nd layar
44, 18, 2, // 1st neigborhood
42, 47, 46, // 2nd
11, 26, 28, // 3rd
// 3rd layer
53, 23, 37, // 1st neighborhood
40, 13, 7, // 2nd
50, 35, 22, // 3rd
3, 27, 31, // 4th
10, 48, 15, // 5th
19, 6, 30, // 6th
36, 45, 1, // 7th
38, 12, 17, // 8th
4, 32, 16, // 9th
// 4th layer
41, 49, 24, // 1st neighborhood
14, 9, 0, // 2nd
29, 21, 39, // 3rd
43, 51, 33, // 4th
25, 20, // 5th
];
let peers = vec![
vec![34, 52, 8],
vec![44, 42, 11],
vec![18, 47, 26],
vec![2, 46, 28],
vec![53, 40, 50],
vec![23, 13, 35],
vec![37, 7, 22],
vec![3, 10, 19],
vec![27, 48, 6],
vec![31, 15, 30],
vec![36, 38, 4],
vec![45, 12, 32],
vec![1, 17, 16],
vec![41, 14, 29],
vec![49, 9, 21],
vec![24, 0, 39],
vec![43, 25],
vec![51, 20],
vec![33],
];
check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
}
}

0 comments on commit 4be7123

Please sign in to comment.