diff --git a/core/src/next_leader.rs b/core/src/next_leader.rs index 024f31f0f0adb7..7e77ecd869e4a1 100644 --- a/core/src/next_leader.rs +++ b/core/src/next_leader.rs @@ -1,10 +1,38 @@ use { + itertools::Itertools, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_poh::poh_recorder::PohRecorder, solana_sdk::{clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, pubkey::Pubkey}, std::{net::SocketAddr, sync::RwLock}, }; +/// Returns a list of tpu vote sockets for the leaders of the next N fanout +/// slots. Leaders and sockets are deduped. +pub(crate) fn upcoming_leader_tpu_vote_sockets( + cluster_info: &ClusterInfo, + poh_recorder: &RwLock, + fanout_slots: u64, +) -> Vec { + let upcoming_leaders = { + let poh_recorder = poh_recorder.read().unwrap(); + (1..=fanout_slots) + .filter_map(|n_slots| poh_recorder.leader_after_n_slots(n_slots)) + .collect_vec() + }; + + upcoming_leaders + .into_iter() + .dedup() + .filter_map(|leader_pubkey| { + cluster_info + .lookup_contact_info(&leader_pubkey, ContactInfo::tpu_vote)? + .ok() + }) + // dedup again since leaders could potentially share the same tpu vote socket + .dedup() + .collect() +} + pub(crate) fn next_leader_tpu_vote( cluster_info: &ClusterInfo, poh_recorder: &RwLock, diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index 31ccf5c6885ad5..14443ab9c7947c 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -1,13 +1,16 @@ use { crate::{ consensus::tower_storage::{SavedTowerVersions, TowerStorage}, - next_leader::next_leader_tpu_vote, + next_leader::upcoming_leader_tpu_vote_sockets, }, crossbeam_channel::Receiver, solana_gossip::cluster_info::ClusterInfo, solana_measure::measure::Measure, solana_poh::poh_recorder::PohRecorder, - solana_sdk::{clock::Slot, transaction::Transaction}, + solana_sdk::{ + clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET}, + transaction::Transaction, + }, std::{ sync::{Arc, RwLock}, thread::{self, Builder, JoinHandle}, @@ -78,12 +81,25 @@ impl VotingService { trace!("{measure}"); } - let _ = cluster_info.send_transaction( - vote_op.tx(), - next_leader_tpu_vote(cluster_info, poh_recorder) - .map(|(_pubkey, target_addr)| target_addr), + // Attempt to send our vote transaction to the leaders for the next few slots + const UPCOMING_LEADER_FANOUT_SLOTS: u64 = FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET; + #[cfg(test)] + static_assertions::const_assert_eq!(UPCOMING_LEADER_FANOUT_SLOTS, 2); + let upcoming_leader_sockets = upcoming_leader_tpu_vote_sockets( + cluster_info, + poh_recorder, + UPCOMING_LEADER_FANOUT_SLOTS, ); + if !upcoming_leader_sockets.is_empty() { + for tpu_vote_socket in upcoming_leader_sockets { + let _ = cluster_info.send_transaction(vote_op.tx(), Some(tpu_vote_socket)); + } + } else { + // Send to our own tpu vote socket if we cannot find a leader to send to + let _ = cluster_info.send_transaction(vote_op.tx(), None); + } + match vote_op { VoteOp::PushVote { tx, tower_slots, ..