From ce4a52d8b6081df55a99693b59a5b3520aa220fb Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 5 Dec 2024 18:26:42 -0800 Subject: [PATCH] TPU Vote using quic -- client side implementation (#3473) * Vote using QUIC Vote using QUIC send vote packet using the right sender removed dup declared functions rebase with master QuicServerParams removed remove_tpu_vote first part of sending votes using quic use quic for vote on client side with connection cache add debug messages turn on quic for vote by default for testing * remove unused import * turn DEFAULT_VOTE_USE_QUIC to false * Minor fixes * addressed some feedback from Behzad and Brennan * fixed one more merge conflict --- banking-bench/src/main.rs | 15 ++-- core/src/next_leader.rs | 3 +- core/src/replay_stage.rs | 57 ++++++++++++++ core/src/tvu.rs | 16 ++++ core/src/validator.rs | 116 +++++++++++++++++++++-------- core/src/voting_service.rs | 41 +++++++++- dos/src/main.rs | 11 ++- gossip/src/cluster_info.rs | 17 +---- local-cluster/src/local_cluster.rs | 43 +++++++---- rpc-test/tests/rpc.rs | 11 +-- test-validator/src/lib.rs | 14 ++-- tpu-client/src/tpu_client.rs | 1 + validator/src/admin_rpc_service.rs | 14 ++-- validator/src/cli.rs | 12 ++- validator/src/main.rs | 15 ++-- 15 files changed, 281 insertions(+), 105 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index c80e96005c8829..57f768e2773cd5 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -450,15 +450,16 @@ fn main() { }; let cluster_info = Arc::new(cluster_info); let tpu_disable_quic = matches.is_present("tpu_disable_quic"); - let connection_cache = match tpu_disable_quic { - false => ConnectionCache::new_quic( - "connection_cache_banking_bench_quic", - DEFAULT_TPU_CONNECTION_POOL_SIZE, - ), - true => ConnectionCache::with_udp( + let connection_cache = if tpu_disable_quic { + ConnectionCache::with_udp( "connection_cache_banking_bench_udp", 
DEFAULT_TPU_CONNECTION_POOL_SIZE, - ), + ) + } else { + ConnectionCache::new_quic( + "connection_cache_banking_bench_quic", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ) }; let banking_stage = BankingStage::new_num_threads( block_production_method, diff --git a/core/src/next_leader.rs b/core/src/next_leader.rs index e0196fe081af7a..d5091c1df6e7cf 100644 --- a/core/src/next_leader.rs +++ b/core/src/next_leader.rs @@ -16,6 +16,7 @@ pub(crate) fn upcoming_leader_tpu_vote_sockets( cluster_info: &ClusterInfo, poh_recorder: &RwLock, fanout_slots: u64, + protocol: Protocol, ) -> Vec { let upcoming_leaders = { let poh_recorder = poh_recorder.read().unwrap(); @@ -29,7 +30,7 @@ pub(crate) fn upcoming_leader_tpu_vote_sockets( .dedup() .filter_map(|leader_pubkey| { cluster_info - .lookup_contact_info(&leader_pubkey, |node| node.tpu_vote(Protocol::UDP))? + .lookup_contact_info(&leader_pubkey, |node| node.tpu_vote(protocol))? .ok() }) // dedup again since leaders could potentially share the same tpu vote socket diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 9b998ca9a99549..0b2cd69f7643ba 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -4233,6 +4233,7 @@ pub(crate) mod tests { }, crossbeam_channel::unbounded, itertools::Itertools, + solana_client::connection_cache::ConnectionCache, solana_entry::entry::{self, Entry}, solana_gossip::{cluster_info::Node, crds::Cursor}, solana_ledger::{ @@ -4263,6 +4264,7 @@ pub(crate) mod tests { transaction::TransactionError, }, solana_streamer::socket::SocketAddrSpace, + solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC}, solana_transaction_status::VersionedTransactionWithStatusMeta, solana_vote_program::{ vote_state::{self, TowerSync, VoteStateVersions}, @@ -7547,11 +7549,25 @@ pub(crate) mod tests { let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) .unwrap(); + + let connection_cache = if DEFAULT_VOTE_USE_QUIC { + ConnectionCache::new_quic( + 
"connection_cache_vote_quic", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ) + } else { + ConnectionCache::with_udp( + "connection_cache_vote_udp", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ) + }; + crate::voting_service::VotingService::handle_vote( &cluster_info, &poh_recorder, &tower_storage, vote_info, + Arc::new(connection_cache), ); let mut cursor = Cursor::default(); @@ -7622,12 +7638,27 @@ pub(crate) mod tests { let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) .unwrap(); + + let connection_cache = if DEFAULT_VOTE_USE_QUIC { + ConnectionCache::new_quic( + "connection_cache_vote_quic", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ) + } else { + ConnectionCache::with_udp( + "connection_cache_vote_udp", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ) + }; + crate::voting_service::VotingService::handle_vote( &cluster_info, &poh_recorder, &tower_storage, vote_info, + Arc::new(connection_cache), ); + let votes = cluster_info.get_votes(&mut cursor); assert_eq!(votes.len(), 1); let vote_tx = &votes[0]; @@ -7705,11 +7736,24 @@ pub(crate) mod tests { let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) .unwrap(); + let connection_cache = if DEFAULT_VOTE_USE_QUIC { + ConnectionCache::new_quic( + "connection_cache_vote_quic", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ) + } else { + ConnectionCache::with_udp( + "connection_cache_vote_udp", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ) + }; + crate::voting_service::VotingService::handle_vote( &cluster_info, &poh_recorder, &tower_storage, vote_info, + Arc::new(connection_cache), ); assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time); @@ -7820,11 +7864,24 @@ pub(crate) mod tests { let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) .unwrap(); + let connection_cache = if DEFAULT_VOTE_USE_QUIC { + ConnectionCache::new_quic( + "connection_cache_vote_quic", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ) + } else { + ConnectionCache::with_udp( + "connection_cache_vote_udp", + 
DEFAULT_TPU_CONNECTION_POOL_SIZE, + ) + }; + crate::voting_service::VotingService::handle_vote( cluster_info, poh_recorder, tower_storage, vote_info, + Arc::new(connection_cache), ); let votes = cluster_info.get_votes(cursor); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index d13b3114a89792..16e5be3634e3ae 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -163,6 +163,7 @@ impl Tvu { cluster_slots: Arc, wen_restart_repair_slots: Option>>>, slot_status_notifier: Option, + vote_connection_cache: Arc, ) -> Result { let in_wen_restart = wen_restart_repair_slots.is_some(); @@ -331,6 +332,7 @@ impl Tvu { cluster_info.clone(), poh_recorder.clone(), tower_storage, + vote_connection_cache, ); let warm_quic_cache_service = connection_cache.and_then(|connection_cache| { @@ -436,6 +438,7 @@ pub mod tests { solana_runtime::bank::Bank, solana_sdk::signature::{Keypair, Signer}, solana_streamer::socket::SocketAddrSpace, + solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC}, std::sync::atomic::{AtomicU64, Ordering}, }; @@ -494,6 +497,18 @@ pub mod tests { } else { None }; + let connection_cache = if DEFAULT_VOTE_USE_QUIC { + ConnectionCache::new_quic( + "connection_cache_vote_quic", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ) + } else { + ConnectionCache::with_udp( + "connection_cache_vote_udp", + DEFAULT_TPU_CONNECTION_POOL_SIZE, + ) + }; + let tvu = Tvu::new( &vote_keypair.pubkey(), Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])), @@ -555,6 +570,7 @@ pub mod tests { cluster_slots, wen_restart_repair_slots, None, + Arc::new(connection_cache), ) .expect("assume success"); if enable_wen_restart { diff --git a/core/src/validator.rs b/core/src/validator.rs index e2d05f3f09ede7..736dca8cb38ed6 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -476,6 +476,20 @@ struct TransactionHistoryServices { cache_block_meta_service: Option, } +/// A struct easing passing Validator TPU Configurations +pub struct ValidatorTpuConfig { + /// 
Controls whether to use QUIC for sending regular TPU transactions + pub use_quic: bool, + /// Controls whether to use QUIC for sending TPU votes + pub vote_use_quic: bool, + /// Controls the connection cache pool size + pub tpu_connection_pool_size: usize, + /// Controls whether to enable UDP for TPU transactions. + pub tpu_enable_udp: bool, + /// Controls the new maximum connections per IpAddr per minute + pub tpu_max_connections_per_ipaddr_per_minute: u64, +} + pub struct Validator { validator_exit: Arc>, json_rpc_service: Option, @@ -528,12 +542,17 @@ impl Validator { rpc_to_plugin_manager_receiver: Option>, start_progress: Arc>, socket_addr_space: SocketAddrSpace, - use_quic: bool, - tpu_connection_pool_size: usize, - tpu_enable_udp: bool, - tpu_max_connections_per_ipaddr_per_minute: u64, + tpu_config: ValidatorTpuConfig, admin_rpc_service_post_init: Arc>>, ) -> Result { + let ValidatorTpuConfig { + use_quic, + vote_use_quic, + tpu_connection_pool_size, + tpu_enable_udp, + tpu_max_connections_per_ipaddr_per_minute, + } = tpu_config; + let start_time = Instant::now(); // Initialize the global rayon pool first to ensure the value in config @@ -990,29 +1009,52 @@ impl Validator { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); - let connection_cache = match use_quic { - true => { - let connection_cache = ConnectionCache::new_with_client_options( - "connection_cache_tpu_quic", - tpu_connection_pool_size, - None, - Some(( - &identity_keypair, - node.info - .tpu(Protocol::UDP) - .map_err(|err| { - ValidatorError::Other(format!("Invalid TPU address: {err:?}")) - })? 
- .ip(), - )), - Some((&staked_nodes, &identity_keypair.pubkey())), - ); - Arc::new(connection_cache) - } - false => Arc::new(ConnectionCache::with_udp( + let connection_cache = if use_quic { + let connection_cache = ConnectionCache::new_with_client_options( + "connection_cache_tpu_quic", + tpu_connection_pool_size, + None, + Some(( + &identity_keypair, + node.info + .tpu(Protocol::UDP) + .map_err(|err| { + ValidatorError::Other(format!("Invalid TPU address: {err:?}")) + })? + .ip(), + )), + Some((&staked_nodes, &identity_keypair.pubkey())), + ); + Arc::new(connection_cache) + } else { + Arc::new(ConnectionCache::with_udp( "connection_cache_tpu_udp", tpu_connection_pool_size, - )), + )) + }; + + let vote_connection_cache = if vote_use_quic { + let vote_connection_cache = ConnectionCache::new_with_client_options( + "connection_cache_vote_quic", + tpu_connection_pool_size, + None, // client_endpoint + Some(( + &identity_keypair, + node.info + .tpu_vote(Protocol::QUIC) + .map_err(|err| { + ValidatorError::Other(format!("Invalid TPU Vote address: {err:?}")) + })? 
+ .ip(), + )), + Some((&staked_nodes, &identity_keypair.pubkey())), + ); + Arc::new(vote_connection_cache) + } else { + Arc::new(ConnectionCache::with_udp( + "connection_cache_vote_udp", + tpu_connection_pool_size, + )) }; let rpc_override_health_check = @@ -1428,6 +1470,7 @@ impl Validator { cluster_slots.clone(), wen_restart_repair_slots.clone(), slot_status_notifier, + vote_connection_cache, ) .map_err(ValidatorError::Other)?; @@ -2715,6 +2758,7 @@ mod tests { solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig}, solana_tpu_client::tpu_client::{ DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, + DEFAULT_VOTE_USE_QUIC, }, std::{fs::remove_dir_all, thread, time::Duration}, }; @@ -2753,10 +2797,13 @@ mod tests { None, // rpc_to_plugin_manager_receiver start_progress.clone(), SocketAddrSpace::Unspecified, - DEFAULT_TPU_USE_QUIC, - DEFAULT_TPU_CONNECTION_POOL_SIZE, - DEFAULT_TPU_ENABLE_UDP, - 32, // max connections per IpAddr per minute for test + ValidatorTpuConfig { + use_quic: DEFAULT_TPU_USE_QUIC, + vote_use_quic: DEFAULT_VOTE_USE_QUIC, + tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, + tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP, + tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test + }, Arc::new(RwLock::new(None)), ) .expect("assume successful validator start"); @@ -2972,10 +3019,13 @@ mod tests { None, // rpc_to_plugin_manager_receiver Arc::new(RwLock::new(ValidatorStartProgress::default())), SocketAddrSpace::Unspecified, - DEFAULT_TPU_USE_QUIC, - DEFAULT_TPU_CONNECTION_POOL_SIZE, - DEFAULT_TPU_ENABLE_UDP, - 32, // max connections per IpAddr per minute for test + ValidatorTpuConfig { + use_quic: DEFAULT_TPU_USE_QUIC, + vote_use_quic: DEFAULT_VOTE_USE_QUIC, + tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, + tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP, + tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test 
+ }, Arc::new(RwLock::new(None)), ) .expect("assume successful validator start") diff --git a/core/src/voting_service.rs b/core/src/voting_service.rs index 14443ab9c7947c..270446cb2e2a3d 100644 --- a/core/src/voting_service.rs +++ b/core/src/voting_service.rs @@ -3,8 +3,11 @@ use { consensus::tower_storage::{SavedTowerVersions, TowerStorage}, next_leader::upcoming_leader_tpu_vote_sockets, }, + bincode::serialize, crossbeam_channel::Receiver, - solana_gossip::cluster_info::ClusterInfo, + solana_client::connection_cache::ConnectionCache, + solana_connection_cache::client_connection::ClientConnection, + solana_gossip::{cluster_info::ClusterInfo, gossip_error::GossipError}, solana_measure::measure::Measure, solana_poh::poh_recorder::PohRecorder, solana_sdk::{ @@ -12,6 +15,7 @@ use { transaction::Transaction, }, std::{ + net::SocketAddr, sync::{Arc, RwLock}, thread::{self, Builder, JoinHandle}, }, @@ -38,6 +42,28 @@ impl VoteOp { } } +fn send_vote_transaction( + cluster_info: &ClusterInfo, + transaction: &Transaction, + tpu: Option, + connection_cache: &Arc, +) -> Result<(), GossipError> { + let tpu = tpu.map(Ok).unwrap_or_else(|| { + cluster_info + .my_contact_info() + .tpu(connection_cache.protocol()) + })?; + let buf = serialize(transaction)?; + let client = connection_cache.get_connection(&tpu); + match client.send_data_async(buf) { + Ok(_) => Ok(()), + Err(err) => { + trace!("Ran into an error when sending vote: {err:?} to {tpu:?}"); + Err(GossipError::SendError) + } + } +} + pub struct VotingService { thread_hdl: JoinHandle<()>, } @@ -48,6 +74,7 @@ impl VotingService { cluster_info: Arc, poh_recorder: Arc>, tower_storage: Arc, + connection_cache: Arc, ) -> Self { let thread_hdl = Builder::new() .name("solVoteService".to_string()) @@ -58,6 +85,7 @@ impl VotingService { &poh_recorder, tower_storage.as_ref(), vote_op, + connection_cache.clone(), ); } }) @@ -70,6 +98,7 @@ impl VotingService { poh_recorder: &RwLock, tower_storage: &dyn TowerStorage, vote_op: VoteOp, + 
connection_cache: Arc, ) { if let VoteOp::PushVote { saved_tower, .. } = &vote_op { let mut measure = Measure::start("tower storage save"); @@ -89,15 +118,21 @@ impl VotingService { cluster_info, poh_recorder, UPCOMING_LEADER_FANOUT_SLOTS, + connection_cache.protocol(), ); if !upcoming_leader_sockets.is_empty() { for tpu_vote_socket in upcoming_leader_sockets { - let _ = cluster_info.send_transaction(vote_op.tx(), Some(tpu_vote_socket)); + let _ = send_vote_transaction( + cluster_info, + vote_op.tx(), + Some(tpu_vote_socket), + &connection_cache, + ); } } else { // Send to our own tpu vote socket if we cannot find a leader to send to - let _ = cluster_info.send_transaction(vote_op.tx(), None); + let _ = send_vote_transaction(cluster_info, vote_op.tx(), None, &connection_cache); } match vote_op { diff --git a/dos/src/main.rs b/dos/src/main.rs index beb891e5a46a11..62af3c83af381c 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -259,14 +259,13 @@ fn create_sender_thread( tpu_use_quic: bool, ) -> thread::JoinHandle<()> { // ConnectionCache is used instead of client because it gives ~6% higher pps - let connection_cache = match tpu_use_quic { - true => ConnectionCache::new_quic( + let connection_cache = if tpu_use_quic { + ConnectionCache::new_quic( "connection_cache_dos_quic", DEFAULT_TPU_CONNECTION_POOL_SIZE, - ), - false => { - ConnectionCache::with_udp("connection_cache_dos_udp", DEFAULT_TPU_CONNECTION_POOL_SIZE) - } + ) + } else { + ConnectionCache::with_udp("connection_cache_dos_udp", DEFAULT_TPU_CONNECTION_POOL_SIZE) }; let connection = connection_cache.get_connection(target); diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 2dc840cc918a5a..a1ec2f21915d90 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -46,7 +46,6 @@ use { }, weighted_shuffle::WeightedShuffle, }, - bincode::serialize, crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, itertools::Itertools, rand::{seq::SliceRandom, thread_rng, 
CryptoRng, Rng}, @@ -154,7 +153,6 @@ pub struct ClusterInfo { my_contact_info: RwLock, ping_cache: Mutex, stats: GossipStats, - socket: UdpSocket, local_message_pending_push_queue: Mutex>, contact_debug_interval: u64, // milliseconds, 0 = disabled contact_save_interval: u64, // milliseconds, 0 = disabled @@ -224,7 +222,6 @@ impl ClusterInfo { GOSSIP_PING_CACHE_CAPACITY, )), stats: GossipStats::default(), - socket: bind_to_unspecified().unwrap(), local_message_pending_push_queue: Mutex::default(), contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS, instance: RwLock::new(NodeInstance::new(&mut thread_rng(), id, timestamp())), @@ -925,19 +922,6 @@ impl ClusterInfo { } } - pub fn send_transaction( - &self, - transaction: &Transaction, - tpu: Option, - ) -> Result<(), GossipError> { - let tpu = tpu - .map(Ok) - .unwrap_or_else(|| self.my_contact_info().tpu(contact_info::Protocol::UDP))?; - let buf = serialize(transaction)?; - self.socket.send_to(&buf, tpu)?; - Ok(()) - } - /// Returns votes inserted since the given cursor. 
pub fn get_votes(&self, cursor: &mut Cursor) -> Vec { let txs: Vec = self @@ -3147,6 +3131,7 @@ mod tests { protocol::tests::new_rand_remote_node, socketaddr, }, + bincode::serialize, itertools::izip, solana_ledger::shred::Shredder, solana_net_utils::bind_to, diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 309c2a41bb713b..35fcc5f2ed9333 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -11,7 +11,7 @@ use { solana_client::connection_cache::ConnectionCache, solana_core::{ consensus::tower_storage::FileTowerStorage, - validator::{Validator, ValidatorConfig, ValidatorStartProgress}, + validator::{Validator, ValidatorConfig, ValidatorStartProgress, ValidatorTpuConfig}, }, solana_gossip::{ cluster_info::Node, @@ -51,7 +51,7 @@ use { solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes}, solana_tpu_client::tpu_client::{ TpuClient, TpuClientConfig, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, - DEFAULT_TPU_USE_QUIC, + DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC, }, solana_vote_program::{ vote_instruction, @@ -97,6 +97,7 @@ pub struct ClusterConfig { pub additional_accounts: Vec<(Pubkey, AccountSharedData)>, pub tpu_use_quic: bool, pub tpu_connection_pool_size: usize, + pub vote_use_quic: bool, } impl ClusterConfig { @@ -136,6 +137,7 @@ impl Default for ClusterConfig { additional_accounts: vec![], tpu_use_quic: DEFAULT_TPU_USE_QUIC, tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, + vote_use_quic: DEFAULT_VOTE_USE_QUIC, } } } @@ -339,12 +341,15 @@ impl LocalCluster { None, // rpc_to_plugin_manager_receiver Arc::new(RwLock::new(ValidatorStartProgress::default())), socket_addr_space, - DEFAULT_TPU_USE_QUIC, - DEFAULT_TPU_CONNECTION_POOL_SIZE, - // We are turning tpu_enable_udp to true in order to prevent concurrent local cluster tests - // to use the same QUIC ports due to SO_REUSEPORT. 
- true, - 32, // max connections per IpAddr per minute + ValidatorTpuConfig { + use_quic: DEFAULT_TPU_USE_QUIC, + vote_use_quic: DEFAULT_VOTE_USE_QUIC, + tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, + // We are turning tpu_enable_udp to true in order to prevent concurrent local cluster tests + // from using the same QUIC ports due to SO_REUSEPORT. + tpu_enable_udp: true, + tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute + }, Arc::new(RwLock::new(None)), ) .expect("assume successful validator start"); @@ -546,10 +551,13 @@ impl LocalCluster { None, // rpc_to_plugin_manager_receiver Arc::new(RwLock::new(ValidatorStartProgress::default())), socket_addr_space, - DEFAULT_TPU_USE_QUIC, - DEFAULT_TPU_CONNECTION_POOL_SIZE, - DEFAULT_TPU_ENABLE_UDP, - 32, // max connections per IpAddr per mintute + ValidatorTpuConfig { + use_quic: DEFAULT_TPU_USE_QUIC, + vote_use_quic: DEFAULT_VOTE_USE_QUIC, + tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, + tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP, + tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute + }, Arc::new(RwLock::new(None)), ) .expect("assume successful validator start"); @@ -1079,10 +1087,13 @@ impl Cluster for LocalCluster { None, // rpc_to_plugin_manager_receiver Arc::new(RwLock::new(ValidatorStartProgress::default())), socket_addr_space, - DEFAULT_TPU_USE_QUIC, - DEFAULT_TPU_CONNECTION_POOL_SIZE, - DEFAULT_TPU_ENABLE_UDP, - 32, // max connections per IpAddr per minute, use higher value because of tests + ValidatorTpuConfig { + use_quic: DEFAULT_TPU_USE_QUIC, + vote_use_quic: DEFAULT_VOTE_USE_QUIC, + tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, + tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP, + tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute, use higher value because of tests + }, Arc::new(RwLock::new(None)), ) .expect("assume successful validator start"); diff --git 
a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index f82c0912678005..60ea60d4d06717 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -502,13 +502,10 @@ fn run_tpu_send_transaction(tpu_use_quic: bool) { test_validator.rpc_url(), CommitmentConfig::processed(), )); - let connection_cache = match tpu_use_quic { - true => { - ConnectionCache::new_quic("connection_cache_test", DEFAULT_TPU_CONNECTION_POOL_SIZE) - } - false => { - ConnectionCache::with_udp("connection_cache_test", DEFAULT_TPU_CONNECTION_POOL_SIZE) - } + let connection_cache = if tpu_use_quic { + ConnectionCache::new_quic("connection_cache_test", DEFAULT_TPU_CONNECTION_POOL_SIZE) + } else { + ConnectionCache::with_udp("connection_cache_test", DEFAULT_TPU_CONNECTION_POOL_SIZE) }; let recent_blockhash = rpc_client.get_latest_blockhash().unwrap(); let tx = diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs index 0c77705e04acce..66ea5a914647cc 100644 --- a/test-validator/src/lib.rs +++ b/test-validator/src/lib.rs @@ -13,7 +13,7 @@ use { solana_core::{ admin_rpc_post_init::AdminRpcRequestMetadataPostInit, consensus::tower_storage::TowerStorage, - validator::{Validator, ValidatorConfig, ValidatorStartProgress}, + validator::{Validator, ValidatorConfig, ValidatorStartProgress, ValidatorTpuConfig}, }, solana_feature_set::FEATURE_NAMES, solana_geyser_plugin_manager::{ @@ -58,6 +58,7 @@ use { solana_streamer::socket::SocketAddrSpace, solana_tpu_client::tpu_client::{ DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, + DEFAULT_VOTE_USE_QUIC, }, std::{ collections::{HashMap, HashSet}, @@ -1045,10 +1046,13 @@ impl TestValidator { rpc_to_plugin_manager_receiver, config.start_progress.clone(), socket_addr_space, - DEFAULT_TPU_USE_QUIC, - DEFAULT_TPU_CONNECTION_POOL_SIZE, - config.tpu_enable_udp, - 32, // max connections per IpAddr per minute for test + ValidatorTpuConfig { + use_quic: DEFAULT_TPU_USE_QUIC, + vote_use_quic: DEFAULT_VOTE_USE_QUIC, + 
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, + tpu_enable_udp: config.tpu_enable_udp, + tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test + }, config.admin_rpc_service_post_init.clone(), )?); diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index ca45cb4c51331e..1df53c54b12c9f 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -29,6 +29,7 @@ use { pub const DEFAULT_TPU_ENABLE_UDP: bool = false; pub const DEFAULT_TPU_USE_QUIC: bool = true; +pub const DEFAULT_VOTE_USE_QUIC: bool = false; /// The default connection count is set to 1 -- it should /// be sufficient for most use cases. Validators can use diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 021f169fe72c1d..851c2959e97c0b 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -871,7 +871,7 @@ mod tests { }, solana_core::{ consensus::tower_storage::NullTowerStorage, - validator::{Validator, ValidatorConfig}, + validator::{Validator, ValidatorConfig, ValidatorTpuConfig}, }, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_inline_spl::token, @@ -895,6 +895,7 @@ mod tests { solana_streamer::socket::SocketAddrSpace, solana_tpu_client::tpu_client::{ DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, + DEFAULT_VOTE_USE_QUIC, }, spl_token_2022::{ solana_program::{program_option::COption, program_pack::Pack}, @@ -1397,10 +1398,13 @@ mod tests { None, // rpc_to_plugin_manager_receiver start_progress.clone(), SocketAddrSpace::Unspecified, - DEFAULT_TPU_USE_QUIC, - DEFAULT_TPU_CONNECTION_POOL_SIZE, - DEFAULT_TPU_ENABLE_UDP, - 32, // max connections per IpAddr per minute for test + ValidatorTpuConfig { + use_quic: DEFAULT_TPU_USE_QUIC, + vote_use_quic: DEFAULT_VOTE_USE_QUIC, + tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE, + tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP, + 
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test + }, post_init, ) .expect("assume successful validator start"); diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 19a2b7f77e7b0a..aed7a3bffc1d9f 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -48,7 +48,7 @@ use { self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE, }, solana_streamer::quic::DEFAULT_QUIC_ENDPOINTS, - solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, + solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC}, solana_unified_scheduler_pool::DefaultSchedulerPool, std::{path::PathBuf, str::FromStr}, }; @@ -898,6 +898,14 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .hidden(hidden_unless_forced()) .help("Controls the rate of the clients connections per IpAddr per minute."), ) + .arg( + Arg::with_name("vote_use_quic") + .long("vote-use-quic") + .takes_value(true) + .default_value(&default_args.vote_use_quic) + .hidden(hidden_unless_forced()) + .help("Controls if to use QUIC to send votes."), + ) .arg( Arg::with_name("num_quic_endpoints") .long("num-quic-endpoints") @@ -2294,6 +2302,7 @@ pub struct DefaultArgs { pub tpu_connection_pool_size: String, pub tpu_max_connections_per_ipaddr_per_minute: String, pub num_quic_endpoints: String, + pub vote_use_quic: String, // Exit subcommand pub exit_min_idle_time: String, @@ -2385,6 +2394,7 @@ impl DefaultArgs { tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE.to_string(), tpu_max_connections_per_ipaddr_per_minute: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE.to_string(), + vote_use_quic: DEFAULT_VOTE_USE_QUIC.to_string(), num_quic_endpoints: DEFAULT_QUIC_ENDPOINTS.to_string(), rpc_max_request_body_size: MAX_REQUEST_BODY_SIZE.to_string(), exit_min_idle_time: "10".to_string(), diff --git a/validator/src/main.rs b/validator/src/main.rs index bee65e487ba92a..26552c7bec9f1f 100644 --- 
a/validator/src/main.rs +++ b/validator/src/main.rs @@ -34,7 +34,7 @@ use { tpu::DEFAULT_TPU_COALESCE, validator::{ is_snapshot_config_valid, BlockProductionMethod, BlockVerificationMethod, Validator, - ValidatorConfig, ValidatorError, ValidatorStartProgress, + ValidatorConfig, ValidatorError, ValidatorStartProgress, ValidatorTpuConfig, }, }, solana_gossip::{ @@ -1124,6 +1124,8 @@ pub fn main() { let accounts_shrink_optimize_total_space = value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool); let tpu_use_quic = !matches.is_present("tpu_disable_quic"); + let vote_use_quic = value_t_or_exit!(matches, "vote_use_quic", bool); + let tpu_enable_udp = if matches.is_present("tpu_enable_udp") { true } else { @@ -2077,10 +2079,13 @@ pub fn main() { rpc_to_plugin_manager_receiver, start_progress, socket_addr_space, - tpu_use_quic, - tpu_connection_pool_size, - tpu_enable_udp, - tpu_max_connections_per_ipaddr_per_minute, + ValidatorTpuConfig { + use_quic: tpu_use_quic, + vote_use_quic, + tpu_connection_pool_size, + tpu_enable_udp, + tpu_max_connections_per_ipaddr_per_minute, + }, admin_service_post_init, ) { Ok(validator) => validator,