diff --git a/streamer/src/socket.rs b/streamer/src/socket.rs index fe86f84319d4d4..d9bd0966e3ae3a 100644 --- a/streamer/src/socket.rs +++ b/streamer/src/socket.rs @@ -16,6 +16,7 @@ impl SocketAddrSpace { } /// Returns true if the IP address is valid. + #[inline] #[must_use] pub fn check(&self, addr: &SocketAddr) -> bool { if matches!(self, SocketAddrSpace::Unspecified) { diff --git a/turbine/benches/cluster_nodes.rs b/turbine/benches/cluster_nodes.rs index a36ac648b1c757..08862898196807 100644 --- a/turbine/benches/cluster_nodes.rs +++ b/turbine/benches/cluster_nodes.rs @@ -7,6 +7,7 @@ use { solana_gossip::contact_info::ContactInfo, solana_ledger::shred::{Shred, ShredFlags}, solana_sdk::{clock::Slot, genesis_config::ClusterType, pubkey::Pubkey}, + solana_streamer::socket::SocketAddrSpace, solana_turbine::{ cluster_nodes::{make_test_cluster, new_cluster_nodes, ClusterNodes}, retransmit_stage::RetransmitStage, @@ -45,8 +46,12 @@ fn get_retransmit_peers_deterministic( 0, 0, ); - let _retransmit_peers = - cluster_nodes.get_retransmit_addrs(slot_leader, &shred.id(), /*fanout:*/ 200); + let _retransmit_peers = cluster_nodes.get_retransmit_addrs( + slot_leader, + &shred.id(), + 200, // fanout + &SocketAddrSpace::Unspecified, + ); } } diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index 59b53e50544ae4..850b90cbb97232 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -165,6 +165,7 @@ impl ClusterNodes { slot_leader: &Pubkey, shred: &ShredId, fanout: usize, + socket_addr_space: &SocketAddrSpace, ) -> Result<(/*root_distance:*/ usize, Vec), Error> { let mut weighted_shuffle = self.weighted_shuffle.clone(); // Exclude slot leader from list of nodes. @@ -195,7 +196,10 @@ impl ClusterNodes { }; let (index, peers) = get_retransmit_peers(fanout, |(node, _)| node.pubkey() == self.pubkey, nodes); - let peers = peers.filter_map(|(_, addr)| addr).collect(); + let peers = peers + .filter_map(|(_, addr)| addr) + .filter(|addr| socket_addr_space.check(addr)) + .collect(); let root_distance = get_root_distance(index, fanout); Ok((root_distance, peers)) } diff --git a/turbine/src/retransmit_stage.rs b/turbine/src/retransmit_stage.rs index e820851d03e4ba..db9d1f96662a16 100644 --- a/turbine/src/retransmit_stage.rs +++ b/turbine/src/retransmit_stage.rs @@ -328,12 +328,12 @@ fn retransmit_shred( ) -> Result<(/*root_distance:*/ usize, /*num_nodes:*/ usize), Error> { let mut compute_turbine_peers = Measure::start("turbine_start"); let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank); - let (root_distance, addrs) = - cluster_nodes.get_retransmit_addrs(slot_leader, key, data_plane_fanout)?; - let addrs: Vec<_> = addrs - .into_iter() - .filter(|addr| socket_addr_space.check(addr)) - .collect(); + let (root_distance, addrs) = cluster_nodes.get_retransmit_addrs( + slot_leader, + key, + data_plane_fanout, + socket_addr_space, + )?; compute_turbine_peers.stop(); stats .compute_turbine_peers_total