From cefd3f13a2b8c1f3f8332ebeea3d7bf44d547a9a Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 22 Nov 2024 23:01:57 +1100 Subject: [PATCH] v2.1: Add not unique leader discovery (backport of #3546) (#3658) Add not unique leader discovery (#3546) * rename get_leader_sockets to get_unique_leader_sockets * Add methods for getting not unique tpu sockets (cherry picked from commit dffcdb4fbcd9704639c7d36dc6f558b037ae0c6f) Co-authored-by: kirill lykov --- tpu-client/src/nonblocking/tpu_client.rs | 47 +++++++++++++++++++----- tpu-client/src/tpu_client.rs | 2 +- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 1b63059a6b8c86..75886b250bd2fd 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -19,7 +19,7 @@ use { response::{RpcContactInfo, SlotUpdate}, }, solana_sdk::{ - clock::{Slot, DEFAULT_MS_PER_SLOT}, + clock::{Slot, DEFAULT_MS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS}, commitment_config::CommitmentConfig, epoch_info::EpochInfo, pubkey::Pubkey, @@ -122,24 +122,43 @@ impl LeaderTpuCache { ) } - // Get the TPU sockets for the current leader and upcoming leaders according to fanout size + // Get the TPU sockets for the current leader and upcoming *unique* leaders according to fanout size. + fn get_unique_leader_sockets( + &self, + estimated_current_slot: Slot, + fanout_slots: u64, + ) -> Vec { + let all_leader_sockets = self.get_leader_sockets(estimated_current_slot, fanout_slots); + + let mut unique_sockets = Vec::new(); + let mut seen = HashSet::new(); + + for socket in all_leader_sockets { + if seen.insert(socket) { + unique_sockets.push(socket); + } + } + + unique_sockets + } + + // Get the TPU sockets for the current leader and upcoming leaders according to fanout size. fn get_leader_sockets( &self, estimated_current_slot: Slot, fanout_slots: u64, ) -> Vec { - let mut leader_set = HashSet::new(); let mut leader_sockets = Vec::new(); // `first_slot` might have been advanced since caller last read the `estimated_current_slot` // value. Take the greater of the two values to ensure we are reading from the latest // leader schedule. let current_slot = std::cmp::max(estimated_current_slot, self.first_slot); - for leader_slot in current_slot..current_slot + fanout_slots { + for leader_slot in (current_slot..current_slot + fanout_slots) + .step_by(NUM_CONSECUTIVE_LEADER_SLOTS as usize) + { if let Some(leader) = self.get_slot_leader(leader_slot) { if let Some(tpu_socket) = self.leader_tpu_map.get(leader) { - if leader_set.insert(*leader) { - leader_sockets.push(*tpu_socket); - } + leader_sockets.push(*tpu_socket); } else { // The leader is probably delinquent trace!("TPU not available for leader {}", leader); @@ -411,7 +430,7 @@ where ) -> TransportResult<()> { let leaders = self .leader_tpu_service - .leader_tpu_sockets(self.fanout_slots); + .unique_leader_tpu_sockets(self.fanout_slots); let futures = leaders .iter() .map(|addr| { @@ -455,7 +474,7 @@ where ) -> TransportResult<()> { let leaders = self .leader_tpu_service - .leader_tpu_sockets(self.fanout_slots); + .unique_leader_tpu_sockets(self.fanout_slots); let futures = leaders .iter() .map(|addr| { @@ -568,7 +587,7 @@ where let wire_transaction = serialize(transaction).unwrap(); let leaders = self .leader_tpu_service - .leader_tpu_sockets(self.fanout_slots); + .unique_leader_tpu_sockets(self.fanout_slots); futures.extend(send_wire_transaction_futures( &progress_bar, &progress, @@ -803,6 +822,14 @@ impl LeaderTpuService { self.recent_slots.estimated_current_slot() } + pub fn unique_leader_tpu_sockets(&self, fanout_slots: u64) -> Vec { + let current_slot = self.recent_slots.estimated_current_slot(); + self.leader_tpu_cache + .read() + .unwrap() + .get_unique_leader_sockets(current_slot, fanout_slots) + } + pub fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec { let current_slot = self.recent_slots.estimated_current_slot(); self.leader_tpu_cache diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index 469b4105fe27ec..3fecab0941771b 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -119,7 +119,7 @@ where let leaders = self .tpu_client .get_leader_tpu_service() - .leader_tpu_sockets(self.tpu_client.get_fanout_slots()); + .unique_leader_tpu_sockets(self.tpu_client.get_fanout_slots()); for tpu_address in &leaders { let cache = self.tpu_client.get_connection_cache();