Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds api to obtain the parent node in the turbine retransmit tree #115

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions turbine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tokio = { workspace = true }
assert_matches = { workspace = true }
solana-logger = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
test-case = { workspace = true }

[[bench]]
name = "cluster_info"
Expand Down
199 changes: 177 additions & 22 deletions turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ impl ClusterNodes<BroadcastStage> {
}

pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
let shred_seed = shred.seed(&self.pubkey);
let mut rng = ChaChaRng::from_seed(shred_seed);
let mut rng = get_seeded_rng(/*leader:*/ &self.pubkey, shred);
let index = self.weighted_shuffle.first(&mut rng)?;
self.nodes[index].contact_info()
}
Expand Down Expand Up @@ -187,7 +186,6 @@ impl ClusterNodes<RetransmitStage> {
shred: &ShredId,
fanout: usize,
) -> Result<RetransmitPeers, Error> {
let shred_seed = shred.seed(slot_leader);
let mut weighted_shuffle = self.weighted_shuffle.clone();
// Exclude slot leader from list of nodes.
if slot_leader == &self.pubkey {
Expand All @@ -200,7 +198,7 @@ impl ClusterNodes<RetransmitStage> {
weighted_shuffle.remove_index(*index);
}
let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
let mut rng = ChaChaRng::from_seed(shred_seed);
let mut rng = get_seeded_rng(slot_leader, shred);
let protocol = get_broadcast_protocol(shred);
let nodes: Vec<_> = weighted_shuffle
.shuffle(&mut rng)
Expand Down Expand Up @@ -233,6 +231,43 @@ impl ClusterNodes<RetransmitStage> {
addrs,
})
}

// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree or if it is not staked.
#[allow(unused)]
fn get_retransmit_parent(
&self,
leader: &Pubkey,
shred: &ShredId,
fanout: usize,
) -> Result<Option<Pubkey>, Error> {
// Exclude slot leader from list of nodes.
if leader == &self.pubkey {
return Err(Error::Loopback {
leader: *leader,
shred: *shred,
});
}
// Unstaked nodes' position in the turbine tree is not deterministic
// and depends on gossip propagation of contact-infos. Therefore, if
// this node is not staked return None.
if self.nodes[self.index[&self.pubkey]].stake == 0 {
return Ok(None);
bw-solana marked this conversation as resolved.
Show resolved Hide resolved
}
let mut weighted_shuffle = self.weighted_shuffle.clone();
if let Some(index) = self.index.get(leader).copied() {
weighted_shuffle.remove_index(index);
}
let mut rng = get_seeded_rng(leader, shred);
// Only need shuffled nodes until this node itself.
let nodes: Vec<_> = weighted_shuffle
.shuffle(&mut rng)
.map(|index| &self.nodes[index])
.take_while(|node| node.pubkey() != self.pubkey)
.collect();
let parent = get_retransmit_parent(fanout, nodes.len(), &nodes);
Ok(parent.map(Node::pubkey))
}
}

pub fn new_cluster_nodes<T: 'static>(
Expand Down Expand Up @@ -296,6 +331,11 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
.collect()
}

fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng {
let seed = shred.seed(leader);
ChaChaRng::from_seed(seed)
}

// root : [0]
// 1st layer: [1, 2, ..., fanout]
// 2nd layer: [[fanout + 1, ..., fanout * 2],
Expand Down Expand Up @@ -327,6 +367,21 @@ fn get_retransmit_peers<T: Copy>(
.copied()
}

// Returns the parent node in the turbine broadcast tree.
// Returns None if the node is the root of the tree.
fn get_retransmit_parent<T: Copy>(
fanout: usize,
index: usize, // Local node's index within the nodes slice.
nodes: &[T],
) -> Option<T> {
// Node's index within its neighborhood.
let offset = index.saturating_sub(1) % fanout;
let index = index.checked_sub(1)? / fanout;
let index = index - index.saturating_sub(1) % fanout;
let index = if index == 0 { index } else { index + offset };
nodes.get(index).copied()
}

impl<T> ClusterNodesCache<T> {
pub fn new(
// Capacity of underlying LRU-cache in terms of number of epochs.
Expand Down Expand Up @@ -516,7 +571,11 @@ pub fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &

#[cfg(test)]
mod tests {
use super::*;
use {
super::*,
std::{fmt::Debug, hash::Hash},
test_case::test_case,
};

#[test]
fn test_cluster_nodes_retransmit() {
Expand Down Expand Up @@ -589,10 +648,42 @@ mod tests {
}
}

bw-solana marked this conversation as resolved.
Show resolved Hide resolved
// Checks (1) computed retransmit children against expected children and
// (2) computed parent of each child against the expected parent.
fn check_retransmit_nodes<T>(fanout: usize, nodes: &[T], peers: Vec<Vec<T>>)
where
T: Copy + Eq + PartialEq + Debug + Hash,
{
// Map node identities to their index within the shuffled tree.
let index: HashMap<_, _> = nodes

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be helpful to name this something like node_to_turbine_index

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a comment.

.iter()
.copied()
.enumerate()
.map(|(k, node)| (node, k))
.collect();
let offset = peers.len();
// Root node's parent is None.
assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, nodes), None);
for (k, peers) in peers.into_iter().enumerate() {
assert_eq!(
get_retransmit_peers(fanout, k, nodes).collect::<Vec<_>>(),
peers
);
let parent = Some(nodes[k]);
for peer in peers {
assert_eq!(get_retransmit_parent(fanout, index[&peer], nodes), parent);
}
}
// Remaining nodes have no children.
for k in offset..=nodes.len() {
assert_eq!(get_retransmit_peers(fanout, k, nodes).next(), None);
}
}

#[test]
fn test_get_retransmit_peers() {
fn test_get_retransmit_nodes() {
// fanout 2
let index = vec![
let nodes = [
7, // root
6, 10, // 1st layer
// 2nd layer
Expand Down Expand Up @@ -620,16 +711,9 @@ mod tests {
vec![16, 9],
vec![8],
];
for (k, peers) in peers.into_iter().enumerate() {
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
}
for k in 10..=index.len() {
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 2, k, &index);
assert_eq!(retransmit_peers.next(), None);
}
check_retransmit_nodes(/*fanout:*/ 2, &nodes, peers);
// fanout 3
let index = vec![
let nodes = [
19, // root
14, 15, 28, // 1st layer
// 2nd layer
Expand Down Expand Up @@ -661,13 +745,84 @@ mod tests {
vec![24, 32],
vec![34],
];
for (k, peers) in peers.into_iter().enumerate() {
let retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
assert_eq!(retransmit_peers.collect::<Vec<_>>(), peers);
check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
let nodes = [
5, // root
34, 52, 8, // 1st layer
// 2nd layar
44, 18, 2, // 1st neigborhood
42, 47, 46, // 2nd
11, 26, 28, // 3rd
// 3rd layer
53, 23, 37, // 1st neighborhood
40, 13, 7, // 2nd
50, 35, 22, // 3rd
3, 27, 31, // 4th
10, 48, 15, // 5th
19, 6, 30, // 6th
36, 45, 1, // 7th
38, 12, 17, // 8th
4, 32, 16, // 9th
// 4th layer
41, 49, 24, // 1st neighborhood
14, 9, 0, // 2nd
29, 21, 39, // 3rd
43, 51, 33, // 4th
25, 20, // 5th
];
let peers = vec![
bw-solana marked this conversation as resolved.
Show resolved Hide resolved
vec![34, 52, 8],
vec![44, 42, 11],
vec![18, 47, 26],
vec![2, 46, 28],
vec![53, 40, 50],
vec![23, 13, 35],
vec![37, 7, 22],
vec![3, 10, 19],
vec![27, 48, 6],
vec![31, 15, 30],
vec![36, 38, 4],
vec![45, 12, 32],
vec![1, 17, 16],
vec![41, 14, 29],
vec![49, 9, 21],
vec![24, 0, 39],
vec![43, 25],
vec![51, 20],
vec![33],
];
check_retransmit_nodes(/*fanout:*/ 3, &nodes, peers);
}

#[test_case(2, 1_347)]
#[test_case(3, 1_359)]
#[test_case(4, 4_296)]
#[test_case(5, 3_925)]
#[test_case(6, 8_778)]
#[test_case(7, 9_879)]
fn test_get_retransmit_nodes_round_trip(fanout: usize, size: usize) {
let mut rng = rand::thread_rng();
let mut nodes: Vec<_> = (0..size).collect();
nodes.shuffle(&mut rng);
// Map node identities to their index within the shuffled tree.
let index: HashMap<_, _> = nodes
.iter()
.copied()
.enumerate()
.map(|(k, node)| (node, k))
.collect();
// Root node's parent is None.
assert_eq!(get_retransmit_parent(fanout, /*index:*/ 0, &nodes), None);
for k in 1..size {
let parent = get_retransmit_parent(fanout, k, &nodes).unwrap();
let mut peers = get_retransmit_peers(fanout, index[&parent], &nodes);
assert_eq!(peers.find(|&peer| peer == nodes[k]), Some(nodes[k]));
}
for k in 13..=index.len() {
let mut retransmit_peers = get_retransmit_peers(/*fanout:*/ 3, k, &index);
assert_eq!(retransmit_peers.next(), None);
for k in 0..size {
let parent = Some(nodes[k]);
for peer in get_retransmit_peers(fanout, k, &nodes) {
assert_eq!(get_retransmit_parent(fanout, index[&peer], &nodes), parent);
}
}
}
}
Loading