From 47472a103736ecfcc351a9142403a435003ef293 Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Thu, 15 Aug 2024 08:30:00 +0000 Subject: [PATCH 1/2] fix: send votes to the immediate next leader --- core/src/next_leader.rs | 30 ++++++++++++++++++++++++++++++ core/src/voting_service.rs | 21 ++++++++++++++++----- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/core/src/next_leader.rs b/core/src/next_leader.rs index 024f31f0f0adb7..70261531437917 100644 --- a/core/src/next_leader.rs +++ b/core/src/next_leader.rs @@ -1,10 +1,40 @@ use { + itertools::Itertools, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_poh::poh_recorder::PohRecorder, solana_sdk::{clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, pubkey::Pubkey}, std::{net::SocketAddr, sync::RwLock}, }; +/// Returns a list of tpu vote sockets for the leaders of the next N fanout +/// slots. Leaders are deduped but the resulting list could have duplicate +/// sockets if two different leaders share the same tpu vote socket. +pub(crate) fn upcoming_leader_tpu_vote_sockets( + cluster_info: &ClusterInfo, + poh_recorder: &RwLock, + fanout_slots: usize, +) -> Vec { + let upcoming_leaders = { + let mut upcoming_leaders = Vec::with_capacity(fanout_slots); + let poh_recorder = poh_recorder.read().unwrap(); + for n_slots in 1..=fanout_slots { + upcoming_leaders.push(poh_recorder.leader_after_n_slots(n_slots as u64)); + } + upcoming_leaders + }; + + upcoming_leaders + .into_iter() + .flatten() + .unique() + .filter_map(|leader_pubkey| { + cluster_info + .lookup_contact_info(&leader_pubkey, ContactInfo::tpu_vote)? + .ok() + }) + .collect() +} + pub(crate) fn next_leader_tpu_vote( cluster_info: &ClusterInfo, poh_recorder: &RwLock, diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index 31ccf5c6885ad5..dc60ec49b5ce37 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -1,7 +1,7 @@ use { crate::{ consensus::tower_storage::{SavedTowerVersions, TowerStorage}, - next_leader::next_leader_tpu_vote, + next_leader::upcoming_leader_tpu_vote_sockets, }, crossbeam_channel::Receiver, solana_gossip::cluster_info::ClusterInfo, @@ -78,12 +78,23 @@ impl VotingService { trace!("{measure}"); } - let _ = cluster_info.send_transaction( - vote_op.tx(), - next_leader_tpu_vote(cluster_info, poh_recorder) - .map(|(_pubkey, target_addr)| target_addr), + // Attempt to send our vote transaction to the leaders for the next few slots + const UPCOMING_LEADER_FANOUT_SLOTS: usize = 2; + let upcoming_leader_sockets = upcoming_leader_tpu_vote_sockets( + cluster_info, + poh_recorder, + UPCOMING_LEADER_FANOUT_SLOTS, ); + if !upcoming_leader_sockets.is_empty() { + for tpu_vote_socket in upcoming_leader_sockets { + let _ = cluster_info.send_transaction(vote_op.tx(), Some(tpu_vote_socket)); + } + } else { + // Send to our own tpu vote socket if we cannot find a leader to send to + let _ = cluster_info.send_transaction(vote_op.tx(), None); + } + match vote_op { VoteOp::PushVote { tx, tower_slots, .. From 6e918e4a3481dde0923b2abcf3cf6f6a24dbedca Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Thu, 15 Aug 2024 23:50:27 +0000 Subject: [PATCH 2/2] feedback --- core/src/next_leader.rs | 18 ++++++++---------- core/src/voting_service.rs | 9 +++++++-- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/core/src/next_leader.rs b/core/src/next_leader.rs index 70261531437917..7e77ecd869e4a1 100644 --- a/core/src/next_leader.rs +++ b/core/src/next_leader.rs @@ -7,31 +7,29 @@ use { }; /// Returns a list of tpu vote sockets for the leaders of the next N fanout -/// slots. Leaders are deduped but the resulting list could have duplicate -/// sockets if two different leaders share the same tpu vote socket. +/// slots. Leaders and sockets are deduped. pub(crate) fn upcoming_leader_tpu_vote_sockets( cluster_info: &ClusterInfo, poh_recorder: &RwLock, - fanout_slots: usize, + fanout_slots: u64, ) -> Vec { let upcoming_leaders = { - let mut upcoming_leaders = Vec::with_capacity(fanout_slots); let poh_recorder = poh_recorder.read().unwrap(); - for n_slots in 1..=fanout_slots { - upcoming_leaders.push(poh_recorder.leader_after_n_slots(n_slots as u64)); - } - upcoming_leaders + (1..=fanout_slots) + .filter_map(|n_slots| poh_recorder.leader_after_n_slots(n_slots)) + .collect_vec() }; upcoming_leaders .into_iter() - .flatten() - .unique() + .dedup() .filter_map(|leader_pubkey| { cluster_info .lookup_contact_info(&leader_pubkey, ContactInfo::tpu_vote)? .ok() }) + // dedup again since leaders could potentially share the same tpu vote socket + .dedup() .collect() } diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index dc60ec49b5ce37..14443ab9c7947c 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -7,7 +7,10 @@ use { solana_gossip::cluster_info::ClusterInfo, solana_measure::measure::Measure, solana_poh::poh_recorder::PohRecorder, - solana_sdk::{clock::Slot, transaction::Transaction}, + solana_sdk::{ + clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET}, + transaction::Transaction, + }, std::{ sync::{Arc, RwLock}, thread::{self, Builder, JoinHandle}, @@ -79,7 +82,9 @@ impl VotingService { } // Attempt to send our vote transaction to the leaders for the next few slots - const UPCOMING_LEADER_FANOUT_SLOTS: usize = 2; + const UPCOMING_LEADER_FANOUT_SLOTS: u64 = FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET; + #[cfg(test)] + static_assertions::const_assert_eq!(UPCOMING_LEADER_FANOUT_SLOTS, 2); let upcoming_leader_sockets = upcoming_leader_tpu_vote_sockets( cluster_info, poh_recorder,