Skip to content

Commit

Permalink
adds api to obtain the parent node in the turbine retransmit tree
Browse files Browse the repository at this point in the history
Following commits will use this api to check retransmitter's signature
on incoming shreds.
  • Loading branch information
behzadnouri committed Mar 7, 2024
1 parent 85cfe23 commit 345348c
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions turbine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tokio = { workspace = true }
assert_matches = { workspace = true }
solana-logger = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
test-case = { workspace = true }

[[bench]]
name = "cluster_info"
Expand Down
199 changes: 177 additions & 22 deletions turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ impl ClusterNodes<BroadcastStage> {
}

pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
let shred_seed = shred.seed(&self.pubkey);
let mut rng = ChaChaRng::from_seed(shred_seed);
let mut rng = get_seeded_rng(/*leader:*/ &self.pubkey, shred);
let index = self.weighted_shuffle.first(&mut rng)?;
self.nodes[index].contact_info()
}
Expand Down Expand Up @@ -187,7 +186,6 @@ impl ClusterNodes<RetransmitStage> {
shred: &ShredId,
fanout: usize,
) -> Result<RetransmitPeers, Error> {
let shred_seed = shred.seed(slot_leader);
let mut weighted_shuffle = self.weighted_shuffle.clone();
// Exclude slot leader from list of nodes.
if slot_leader == &self.pubkey {
Expand All @@ -200,7 +198,7 @@ impl ClusterNodes<RetransmitStage> {
weighted_shuffle.remove_index(*index);
}
let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
let mut rng = ChaChaRng::from_seed(shred_seed);
let mut rng = get_seeded_rng(slot_leader, shred);
let protocol = get_broadcast_protocol(shred);
let nodes: Vec<_> = weighted_shuffle
.shuffle(&mut rng)
Expand Down Expand Up @@ -233,6 +231,43 @@ impl ClusterNodes<RetransmitStage> {
addrs,
})
}

// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree or if it is not staked.
#[allow(unused)]
fn get_retransmit_parent(
&self,
leader: &Pubkey,
shred: &ShredId,
fanout: usize,
) -> Result<Option<Pubkey>, Error> {
// Exclude slot leader from list of nodes.
if leader == &self.pubkey {
return Err(Error::Loopback {
leader: *leader,
shred: *shred,
});
}
// Unstaked nodes' position in the turbine tree is not deterministic
// and depends on gossip propagation of contact-infos. Therefore, if
// this node is not staked return None.
if self.nodes[self.index[&self.pubkey]].stake == 0 {
return Ok(None);
}
let mut weighted_shuffle = self.weighted_shuffle.clone();
if let Some(index) = self.index.get(leader).copied() {
weighted_shuffle.remove_index(index);
}
let mut rng = get_seeded_rng(leader, shred);
// Only need shuffled nodes until this node itself.
let nodes: Vec<_> = weighted_shuffle
.shuffle(&mut rng)
.map(|index| &self.nodes[index])
.take_while(|node| node.pubkey() != self.pubkey)
.collect();
let parent = get_retransmit_parent(fanout, nodes.len(), &nodes);
Ok(parent.map(Node::pubkey))
}
}

pub fn new_cluster_nodes<T: 'static>(
Expand Down Expand Up @@ -296,6 +331,11 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
.collect()
}

fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng {
let seed = shred.seed(leader);
ChaChaRng::from_seed(seed)
}

// root : [0]
// 1st layer: [1, 2, ..., fanout]
// 2nd layer: [[fanout + 1, ..., fanout * 2],
Expand Down Expand Up @@ -327,6 +367,21 @@ fn get_retransmit_peers<T: Copy>(
.copied()
}

// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree.
fn get_retransmit_parent<T: Copy>(
fanout: usize,
index: usize, // Local node's index within the nodes slice.
nodes: &[T],
) -> Option<T> {
// Node's index within its neighborhood.
let offset = index.saturating_sub(1) % fanout;
let index = index.checked_sub(1)? / fanout;
let index = index - index.saturating_sub(1) % fanout;
let index = if index == 0 { index } else { index + offset };
nodes.get(index).copied()
}

impl<T> ClusterNodesCache<T> {
pub fn new(
// Capacity of underlying LRU-cache in terms of number of epochs.
Expand Down Expand Up @@ -516,7 +571,11 @@ pub fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &

#[cfg(test)]
mod tests {
use super::*;
use {
super::*,
std::{fmt::Debug, hash::Hash},
test_case::test_case,
};

#[test]
fn test_cluster_nodes_retransmit() {
Expand Down Expand Up @@ -589,10 +648,42 @@ mod tests {
}
}

// Checks (1) computed retransmit children against expected children and
// (2) computed parent of each child against the expected parent.
fn check_retransmit_nodes<T>(fanout: usize, nodes: &[T], peers: Vec<Vec<T>>)
where
T: Copy + Eq + PartialEq + Debug + Hash,
{
// Map node identities to their index within the shuffled tree.
let index: HashMap<_, _> = nodes
.iter()
.copied()
.enumerate()
.map(|(k, node)| (node, k))
.collect();
let offset = peers.len();
// Root node's parent is None.
assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, nodes), None);
for (k, peers) in peers.into_iter().enumerate() {
assert_eq!(
get_retransmit_peers(fanout, k, nodes).collect::<Vec<_>>(),
peers
);
let parent = Some(nodes[k]);
for peer in peers {
assert_eq!(get_retransmit_parent(fanout, index[&peer], nodes), parent);
}
}
// Remaining nodes have no children.
for k in offset..=nodes.len() {
assert_eq!(get_retransmit_peers(fanout, k, nodes).next(), None);
}
}

#[test]
fn test_get_retransmit_peers() {
fn test_get_retransmit_nodes() {
// fanout 2
let index = vec![
let nodes = [
7, // root
6, 10, // 1st layer
// 2nd layer
Expand Down Expand Up @@ -620,16 +711,9 @@ mod tests {
vec![16, 9],
vec![8],
];
for (k, peers) in peers.into_iter().enumerate() {
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
}
for k in 10..=index.len() {
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
assert_eq!(retransmit_peers.next(), None);
}
check_retransmit_nodes(/*fanout:*/ 2, &nodes, peers);
// fanout 3
let index = vec![
let nodes = [
19, // root
14, 15, 28, // 1st layer
// 2nd layer
Expand Down Expand Up @@ -661,13 +745,84 @@ mod tests {
vec![24, 32],
vec![34],
];
for (k, peers) in peers.into_iter().enumerate() {
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
let nodes = [
5, // root
34, 52, 8, // 1st layer
// 2nd layar
44, 18, 2, // 1st neigborhood
42, 47, 46, // 2nd
11, 26, 28, // 3rd
// 3rd layer
53, 23, 37, // 1st neighborhood
40, 13, 7, // 2nd
50, 35, 22, // 3rd
3, 27, 31, // 4th
10, 48, 15, // 5th
19, 6, 30, // 6th
36, 45, 1, // 7th
38, 12, 17, // 8th
4, 32, 16, // 9th
// 4th layer
41, 49, 24, // 1st neighborhood
14, 9, 0, // 2nd
29, 21, 39, // 3rd
43, 51, 33, // 4th
25, 20, // 5th
];
let peers = vec![
vec![34, 52, 8],
vec![44, 42, 11],
vec![18, 47, 26],
vec![2, 46, 28],
vec![53, 40, 50],
vec![23, 13, 35],
vec![37, 7, 22],
vec![3, 10, 19],
vec![27, 48, 6],
vec![31, 15, 30],
vec![36, 38, 4],
vec![45, 12, 32],
vec![1, 17, 16],
vec![41, 14, 29],
vec![49, 9, 21],
vec![24, 0, 39],
vec![43, 25],
vec![51, 20],
vec![33],
];
check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
}

#[test_case(2, 1_347)]
#[test_case(3, 1_359)]
#[test_case(4, 4_296)]
#[test_case(5, 3_925)]
#[test_case(6, 8_778)]
#[test_case(7, 9_879)]
fn test_get_retransmit_nodes_round_trip(fanout: usize, size: usize) {
let mut rng = rand::thread_rng();
let mut nodes: Vec<_> = (0..size).collect();
nodes.shuffle(&mut rng);
// Map node identities to their index within the shuffled tree.
let index: HashMap<_, _> = nodes
.iter()
.copied()
.enumerate()
.map(|(k, node)| (node, k))
.collect();
// Root node's parent is None.
assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, &nodes), None);
for k in 1..size {
let parent = get_retransmit_parent(fanout, k, &nodes).unwrap();
let mut peers = get_retransmit_peers(fanout, index[&parent], &nodes);
assert_eq!(peers.find(|&peer| peer == nodes[k]), Some(nodes[k]));
}
for k in 13..=index.len() {
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
assert_eq!(retransmit_peers.next(), None);
for k in 0..size {
let parent = Some(nodes[k]);
for peer in get_retransmit_peers(fanout, k, &nodes) {
assert_eq!(get_retransmit_parent(fanout, index[&peer], &nodes), parent);
}
}
}
}

0 comments on commit 345348c

Please sign in to comment.