diff --git a/core/src/validator.rs b/core/src/validator.rs index d776c7923efb8d..5c91578d09a7f5 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -83,6 +83,7 @@ use { }, solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, solana_rpc::{ + cluster_tpu_info::ClusterTpuInfo, max_slots::MaxSlots, optimistically_confirmed_bank_tracker::{ BankNotificationSenderConfig, OptimisticallyConfirmedBank, @@ -120,11 +121,15 @@ use { hard_forks::HardForks, hash::Hash, pubkey::Pubkey, + quic::NotifyKeyUpdate, shred_version::compute_shred_version, signature::{Keypair, Signer}, timing::timestamp, }, - solana_send_transaction_service::send_transaction_service, + solana_send_transaction_service::{ + send_transaction_service, + transaction_client::{ConnectionCacheClient, TpuClientNextClient}, + }, solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes}, solana_turbine::{self, broadcast_stage::BroadcastStageType}, solana_unified_scheduler_pool::DefaultSchedulerPool, @@ -138,7 +143,7 @@ use { path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, Mutex, RwLock, + Arc, LazyLock, Mutex, RwLock, }, thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, @@ -146,9 +151,21 @@ use { strum::VariantNames, strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr}, thiserror::Error, - tokio::runtime::Runtime as TokioRuntime, + tokio::runtime::{self, Runtime as TokioRuntime}, }; +static GLOBAL_RUNTIME: LazyLock = LazyLock::new(|| { + runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime") +}); + +// Function to get a handle to the runtime +fn get_runtime_handle() -> tokio::runtime::Handle { + GLOBAL_RUNTIME.handle().clone() +} + const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000; const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80; // Right now since we reuse the wait for supermajority code, the @@ -286,6 +303,7 @@ pub struct ValidatorConfig { pub replay_transactions_threads: NonZeroUsize, pub tvu_shred_sigverify_threads: NonZeroUsize, pub delay_leader_block_for_pending_fork: bool, + pub use_tpu_client_next: bool, } impl Default for ValidatorConfig { @@ -358,6 +376,7 @@ impl Default for ValidatorConfig { replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"), delay_leader_block_for_pending_fork: false, + use_tpu_client_next: false, } } } @@ -989,6 +1008,9 @@ impl Validator { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); + // ConnectionCache might be used for JsonRpc and for Forwarding. Since + // the later is not migrated yet to the tpu-client-next, create + // ConnectionCache regardless of config.use_tpu_client_next for now. let connection_cache = match use_quic { true => { let connection_cache = ConnectionCache::new_with_client_options( @@ -1024,6 +1046,7 @@ impl Validator { rpc_completed_slots_service, optimistically_confirmed_bank_tracker, bank_notification_sender, + client_updater, ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs { assert_eq!( node.info @@ -1042,31 +1065,91 @@ impl Validator { None }; - let json_rpc_service = JsonRpcService::new( - rpc_addr, - config.rpc_config.clone(), - Some(config.snapshot_config.clone()), - bank_forks.clone(), - block_commitment_cache.clone(), - blockstore.clone(), - cluster_info.clone(), - Some(poh_recorder.clone()), - genesis_config.hash(), - ledger_path, - config.validator_exit.clone(), - exit.clone(), - rpc_override_health_check.clone(), - startup_verification_complete, - optimistically_confirmed_bank.clone(), - config.send_transaction_service_config.clone(), - max_slots.clone(), - leader_schedule_cache.clone(), - connection_cache.clone(), - max_complete_transaction_status_slot, - max_complete_rewards_slot, - prioritization_fee_cache.clone(), - ) - .map_err(ValidatorError::Other)?; + let leader_info = ClusterTpuInfo::new(cluster_info.clone(), poh_recorder.clone()); + let (json_rpc_service, client_updater) = if config.use_tpu_client_next { + let my_tpu_address = cluster_info + .my_contact_info() + .tpu(Protocol::QUIC) + .map_err(|err| ValidatorError::Other(format!("{err}")))?; + + let client = TpuClientNextClient::new( + get_runtime_handle(), + my_tpu_address, + config.send_transaction_service_config.tpu_peers.clone(), + Some(leader_info), + config.send_transaction_service_config.leader_forward_count, + Some(identity_keypair.insecure_clone()), + ); + + let json_rpc_service = JsonRpcService::new( + rpc_addr, + config.rpc_config.clone(), + Some(config.snapshot_config.clone()), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore.clone(), + cluster_info.clone(), + genesis_config.hash(), + ledger_path, + config.validator_exit.clone(), + exit.clone(), + rpc_override_health_check.clone(), + startup_verification_complete, + optimistically_confirmed_bank.clone(), + config.send_transaction_service_config.clone(), + max_slots.clone(), + leader_schedule_cache.clone(), + client.clone(), + max_complete_transaction_status_slot, + max_complete_rewards_slot, + prioritization_fee_cache.clone(), + ) + .map_err(ValidatorError::Other)?; + ( + json_rpc_service, + Arc::new(client) as Arc, + ) + } else { + let my_tpu_address = cluster_info + .my_contact_info() + .tpu(connection_cache.protocol()) + .map_err(|err| ValidatorError::Other(format!("{err}")))?; + let client = ConnectionCacheClient::new( + connection_cache.clone(), + my_tpu_address, + config.send_transaction_service_config.tpu_peers.clone(), + Some(leader_info), + config.send_transaction_service_config.leader_forward_count, + ); + let json_rpc_service = JsonRpcService::new( + rpc_addr, + config.rpc_config.clone(), + Some(config.snapshot_config.clone()), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore.clone(), + cluster_info.clone(), + genesis_config.hash(), + ledger_path, + config.validator_exit.clone(), + exit.clone(), + rpc_override_health_check.clone(), + startup_verification_complete, + optimistically_confirmed_bank.clone(), + config.send_transaction_service_config.clone(), + max_slots.clone(), + leader_schedule_cache.clone(), + client.clone(), + max_complete_transaction_status_slot, + max_complete_rewards_slot, + prioritization_fee_cache.clone(), + ) + .map_err(ValidatorError::Other)?; + ( + json_rpc_service, + Arc::new(client) as Arc, + ) + }; let pubsub_service = if !config.rpc_config.full_api { None @@ -1142,9 +1225,10 @@ impl Validator { rpc_completed_slots_service, optimistically_confirmed_bank_tracker, bank_notification_sender_config, + Some(client_updater), ) } else { - (None, None, None, None, None, None, None) + (None, None, None, None, None, None, None, None) }; if config.halt_at_slot.is_some() { @@ -1414,7 +1498,10 @@ impl Validator { config.wait_to_vote_slot, accounts_background_request_sender.clone(), config.runtime_config.log_messages_bytes_limit, - json_rpc_service.is_some().then_some(&connection_cache), // for the cache warmer only used for STS for RPC service + // for the cache warmer only used for STS for RPC service when + // tpu-client-next is not used + (json_rpc_service.is_some() && !config.use_tpu_client_next) + .then_some(&connection_cache), &prioritization_fee_cache, banking_tracer.clone(), turbine_quic_endpoint_sender.clone(), @@ -1507,7 +1594,12 @@ impl Validator { ); *start_progress.write().unwrap() = ValidatorStartProgress::Running; - key_notifies.push(connection_cache); + if let Some(client_updater) = client_updater { + key_notifies.push(client_updater); + } else { + // add connection_cache because it is still used in Forwarder. + key_notifies.push(connection_cache); + } *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit { bank_forks: bank_forks.clone(), diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index e90475aad2a06f..506ffc95ee4bec 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -73,6 +73,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { replay_transactions_threads: config.replay_transactions_threads, tvu_shred_sigverify_threads: config.tvu_shred_sigverify_threads, delay_leader_block_for_pending_fork: config.delay_leader_block_for_pending_fork, + use_tpu_client_next: config.use_tpu_client_next, } } diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index bbda760bac0493..d747afafa9f9ae 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -71,6 +71,7 @@ solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } solana-runtime-transaction = { workspace = true, features = [ "dev-context-only-utils", ] } +solana-send-transaction-service = { workspace = true, features = ["dev-context-only-utils"] } solana-stake-program = { workspace = true } spl-pod = { workspace = true } symlink = { workspace = true } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 9763ebd791a162..21aea6d8dc20d6 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -1,5 +1,5 @@ #![allow(clippy::arithmetic_side_effects)] -mod cluster_tpu_info; +pub mod cluster_tpu_info; pub mod filter; pub mod max_slots; pub mod optimistically_confirmed_bank_tracker; diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index b479536de0eaf5..8e3a2dbcf54f2d 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -115,15 +115,14 @@ use { }; #[cfg(test)] use { - solana_client::connection_cache::ConnectionCache, solana_gossip::contact_info::ContactInfo, solana_ledger::get_tmp_ledger_path, solana_runtime::commitment::CommitmentSlots, solana_send_transaction_service::{ - send_transaction_service::SendTransactionService, tpu_info::NullTpuInfo, - transaction_client::ConnectionCacheClient, + send_transaction_service::SendTransactionService, test_utils::ClientWithCreator, }, solana_streamer::socket::SocketAddrSpace, + tokio::runtime::Handle, }; pub mod account_resolver; @@ -357,10 +356,10 @@ impl JsonRpcRequestProcessor { } #[cfg(test)] - pub fn new_from_bank( + pub fn new_from_bank( + runtime: Option, bank: Bank, socket_addr_space: SocketAddrSpace, - connection_cache: Arc, ) -> Self { let genesis_hash = bank.hash(); let bank_forks = BankForks::new_rw_arc(bank); @@ -375,19 +374,10 @@ impl JsonRpcRequestProcessor { ); ClusterInfo::new(contact_info, keypair, socket_addr_space) }); - let tpu_address = cluster_info - .my_contact_info() - .tpu(connection_cache.protocol()) - .unwrap(); + let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); let (sender, receiver) = unbounded(); - let client = ConnectionCacheClient::::new( - connection_cache.clone(), - tpu_address, - None, - None, - 1, - ); + let client = Client::create_client(runtime, my_tpu_address, None, 1); SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone()); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); @@ -4387,7 +4377,8 @@ pub mod tests { vote::state::VoteState, }, solana_send_transaction_service::{ - tpu_info::NullTpuInfo, transaction_client::ConnectionCacheClient, + tpu_info::NullTpuInfo, + transaction_client::{ConnectionCacheClient, TpuClientNextClient}, }, solana_transaction_status::{ EncodedConfirmedBlock, EncodedTransaction, EncodedTransactionWithStatusMeta, @@ -4408,6 +4399,7 @@ pub mod tests { state::{AccountState as TokenAccountState, Mint}, }, std::{borrow::Cow, collections::HashMap, net::Ipv4Addr}, + tokio::runtime::Handle, }; const TEST_MINT_LAMPORTS: u64 = 1_000_000_000; @@ -4768,16 +4760,14 @@ pub mod tests { } } - #[test] - fn test_rpc_request_processor_new() { + fn rpc_request_processor_new(maybe_runtime: Option) { let bob_pubkey = solana_sdk::pubkey::new_rand(); let genesis = create_genesis_config(100); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::( + maybe_runtime, bank, SocketAddrSpace::Unspecified, - connection_cache, ); let bank = meta.bank_forks.read().unwrap().root_bank(); @@ -4792,15 +4782,23 @@ pub mod tests { } #[test] - fn test_rpc_get_balance() { + fn test_rpc_request_processor_new_connection_cache() { + rpc_request_processor_new::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_request_processor_new_tpu_client_next() { + rpc_request_processor_new::>(Some(Handle::current())); + } + + fn rpc_get_balance(maybe_runtime: Option) { let genesis = create_genesis_config(20); let mint_pubkey = genesis.mint_keypair.pubkey(); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::( + maybe_runtime, bank, SocketAddrSpace::Unspecified, - connection_cache, ); let mut io = MetaIoHandler::default(); @@ -4824,15 +4822,23 @@ pub mod tests { } #[test] - fn test_rpc_get_balance_via_client() { + fn test_rpc_get_balance_new_connection_cache() { + rpc_get_balance::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_get_balance_new_tpu_client_next() { + rpc_get_balance::>(Some(Handle::current())); + } + + fn rpc_get_balance_via_client(maybe_runtime: Option) { let genesis = create_genesis_config(20); let mint_pubkey = genesis.mint_keypair.pubkey(); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::( + maybe_runtime, bank, SocketAddrSpace::Unspecified, - connection_cache, ); let mut io = MetaIoHandler::default(); @@ -4857,6 +4863,16 @@ pub mod tests { assert_eq!(response, 20); } + #[test] + fn test_rpc_get_balance_via_client_connection_cache() { + rpc_get_balance_via_client::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_get_balance_via_client_tpu_client_next() { + rpc_get_balance_via_client::>(Some(Handle::current())); + } + #[test] fn test_rpc_get_cluster_nodes() { let rpc = RpcHandler::start(); @@ -4951,16 +4967,14 @@ pub mod tests { assert_eq!(result, expected); } - #[test] - fn test_rpc_get_tx_count() { + fn rpc_get_tx_count(maybe_runtime: Option) { let bob_pubkey = solana_sdk::pubkey::new_rand(); let genesis = create_genesis_config(10); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::( + maybe_runtime, bank, SocketAddrSpace::Unspecified, - connection_cache, ); let mut io = MetaIoHandler::default(); @@ -4987,6 +5001,17 @@ pub mod tests { assert_eq!(result, expected); } + #[test] + fn test_rpc_get_tx_count_connection_cache() { + rpc_get_tx_count::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + + async fn test_rpc_get_tx_count_tpu_client_next() { + rpc_get_tx_count::>(Some(Handle::current())); + } + #[test] fn test_rpc_minimum_ledger_slot() { let rpc = RpcHandler::start(); @@ -6422,15 +6447,13 @@ pub mod tests { assert_eq!(result, expected); } - #[test] - fn test_rpc_send_bad_tx() { + fn rpc_send_bad_tx(maybe_runtime: Option) { let genesis = create_genesis_config(100); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::( + maybe_runtime, bank, SocketAddrSpace::Unspecified, - connection_cache, ); let mut io = MetaIoHandler::default(); @@ -6444,7 +6467,16 @@ pub mod tests { } #[test] - fn test_rpc_send_transaction_preflight() { + fn test_rpc_send_bad_tx_connection_cache() { + rpc_send_bad_tx::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_send_bad_tx_tpu_client_next() { + rpc_send_bad_tx::>(Some(Handle::current())); + } + + fn rpc_send_transaction_preflight(maybe_runtime: Option) { let exit = Arc::new(AtomicBool::new(false)); let validator_exit = create_validator_exit(exit.clone()); let ledger_path = get_tmp_ledger_path!(); @@ -6470,11 +6502,7 @@ pub mod tests { ); ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified) }); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let tpu_address = cluster_info - .my_contact_info() - .tpu(connection_cache.protocol()) - .unwrap(); + let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); let (meta, receiver) = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), None, @@ -6494,13 +6522,7 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); - let client = ConnectionCacheClient::::new( - connection_cache.clone(), - tpu_address, - None, - None, - 1, - ); + let client = C::create_client(maybe_runtime, my_tpu_address, None, 1); SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone()); let mut bad_transaction = system_transaction::transfer( @@ -6604,6 +6626,16 @@ pub mod tests { ); } + #[test] + fn test_rpc_send_transaction_preflight_with_connection_cache() { + rpc_send_transaction_preflight::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_send_transaction_preflight_with_tpu_client_next() { + rpc_send_transaction_preflight::>(Some(Handle::current())); + } + #[test] fn test_rpc_verify_filter() { let filter = RpcFilterType::Memcmp(Memcmp::new( @@ -6716,8 +6748,7 @@ pub mod tests { assert_eq!(result, expected); } - #[test] - fn test_rpc_processor_get_block_commitment() { + fn rpc_processor_get_block_commitment(maybe_runtime: Option) { let exit = Arc::new(AtomicBool::new(false)); let validator_exit = create_validator_exit(exit.clone()); let bank_forks = new_bank_forks().0; @@ -6740,11 +6771,7 @@ pub mod tests { ))); let cluster_info = Arc::new(new_test_cluster_info()); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let tpu_address = cluster_info - .my_contact_info() - .tpu(connection_cache.protocol()) - .unwrap(); + let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let (request_processor, receiver) = JsonRpcRequestProcessor::new( @@ -6766,14 +6793,8 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); - let client = ConnectionCacheClient::::new( - connection_cache.clone(), - tpu_address, - None, - None, - 1, - ); - SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone()); + let client = C::create_client(maybe_runtime, my_tpu_address, None, 1); + SendTransactionService::new(&bank_forks, receiver, client, 1000, exit); assert_eq!( request_processor.get_block_commitment(0), @@ -6798,6 +6819,18 @@ pub mod tests { ); } + #[test] + fn test_rpc_processor_get_block_commitment_with_connection_cache() { + rpc_processor_get_block_commitment::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_processor_get_block_commitment_with_tpu_client_next() { + rpc_processor_get_block_commitment::>(Some( + Handle::current(), + )); + } + #[test] fn test_rpc_get_block_commitment() { let rpc = RpcHandler::start(); diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 2c582be72ea5a6..2d09765e08c784 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -2,7 +2,6 @@ use { crate::{ - cluster_tpu_info::ClusterTpuInfo, max_slots::MaxSlots, optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, rpc::{rpc_accounts::*, rpc_accounts_scan::*, rpc_bank::*, rpc_full::*, rpc_minimal::*, *}, @@ -16,7 +15,6 @@ use { RequestMiddlewareAction, ServerBuilder, }, regex::Regex, - solana_client::connection_cache::ConnectionCache, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ bigtable_upload::ConfirmedBlockUploadConfig, @@ -25,7 +23,6 @@ use { }, solana_metrics::inc_new_counter_info, solana_perf::thread::renice_this_thread, - solana_poh::poh_recorder::PohRecorder, solana_runtime::{ bank_forks::BankForks, commitment::BlockCommitmentCache, prioritization_fee_cache::PrioritizationFeeCache, @@ -38,7 +35,7 @@ use { }, solana_send_transaction_service::{ send_transaction_service::{self, SendTransactionService}, - transaction_client::ConnectionCacheClient, + transaction_client::TransactionClient, }, solana_storage_bigtable::CredentialType, std::{ @@ -335,7 +332,7 @@ fn process_rest(bank_forks: &Arc>, path: &str) -> Option( rpc_addr: SocketAddr, config: JsonRpcConfig, snapshot_config: Option, @@ -343,7 +340,6 @@ impl JsonRpcService { block_commitment_cache: Arc>, blockstore: Arc, cluster_info: Arc, - poh_recorder: Option>>, genesis_hash: Hash, ledger_path: &Path, validator_exit: Arc>, @@ -354,7 +350,7 @@ impl JsonRpcService { send_transaction_service_config: send_transaction_service::Config, max_slots: Arc, leader_schedule_cache: Arc, - connection_cache: Arc, + client: Client, max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, @@ -376,11 +372,6 @@ impl JsonRpcService { LARGEST_ACCOUNTS_CACHE_DURATION, ))); - let tpu_address = cluster_info - .my_contact_info() - .tpu(connection_cache.protocol()) - .map_err(|err| format!("{err}"))?; - // sadly, some parts of our current rpc implemention block the jsonrpc's // _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU, // causing no further processing of incoming requests and ultimatily innocent clients timing-out. @@ -475,15 +466,6 @@ impl JsonRpcService { prioritization_fee_cache, ); - let leader_info = - poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder)); - let client = ConnectionCacheClient::new( - connection_cache, - tpu_address, - send_transaction_service_config.tpu_peers.clone(), - leader_info, - send_transaction_service_config.leader_forward_count, - ); let _send_transaction_service = SendTransactionService::new_with_config( &bank_forks, receiver, @@ -591,6 +573,7 @@ mod tests { use { super::*, crate::rpc::{create_validator_exit, tests::new_test_cluster_info}, + solana_client::connection_cache::Protocol, solana_ledger::{ genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path_auto_delete, @@ -601,15 +584,20 @@ mod tests { genesis_config::{ClusterType, DEFAULT_GENESIS_ARCHIVE}, signature::Signer, }, + solana_send_transaction_service::{ + send_transaction_service::{self}, + test_utils::ClientWithCreator, + tpu_info::NullTpuInfo, + transaction_client::{ConnectionCacheClient, TpuClientNextClient}, + }, std::{ io::Write, net::{IpAddr, Ipv4Addr}, }, - tokio::runtime::Runtime, + tokio::runtime::{Handle, Runtime}, }; - #[test] - fn test_rpc_new() { + fn rpc_new(maybe_runtime: Option) { let GenesisConfigInfo { genesis_config, mint_keypair, @@ -630,7 +618,20 @@ mod tests { let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + + let send_transaction_service_config = send_transaction_service::Config { + retry_rate_ms: 1000, + leader_forward_count: 1, + ..send_transaction_service::Config::default() + }; + + let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); + let client = C::create_client( + maybe_runtime, + my_tpu_address, + send_transaction_service_config.tpu_peers.clone(), + send_transaction_service_config.leader_forward_count, + ); let mut rpc_service = JsonRpcService::new( rpc_addr, JsonRpcConfig::default(), @@ -639,7 +640,6 @@ mod tests { block_commitment_cache, blockstore, cluster_info, - None, Hash::default(), &PathBuf::from("farf"), validator_exit, @@ -647,14 +647,10 @@ mod tests { Arc::new(AtomicBool::new(false)), Arc::new(AtomicBool::new(true)), optimistically_confirmed_bank, - send_transaction_service::Config { - retry_rate_ms: 1000, - leader_forward_count: 1, - ..send_transaction_service::Config::default() - }, + send_transaction_service_config, Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), - connection_cache, + client, Arc::new(AtomicU64::default()), Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), @@ -675,6 +671,16 @@ mod tests { rpc_service.join().unwrap(); } + #[test] + fn test_rpc_new_with_connection_cache() { + rpc_new::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_new_with_tpu_client_next() { + rpc_new::>(Some(Handle::current())); + } + fn create_bank_forks() -> Arc> { let GenesisConfigInfo { mut genesis_config, .. diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 19a2b7f77e7b0a..c5bff7edfd6020 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1571,6 +1571,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { was created before we started creating ours.", ), ) + .arg( + Arg::with_name("use_tpu_client_next") + .long("use-tpu-client-next") + .takes_value(false) + .help( + "Use tpu-client-next crate to send transactions over TPU ports. If not set,\ + ConnectionCache is used instead." + ), + ) .arg( Arg::with_name("block_verification_method") .long("block-verification-method") diff --git a/validator/src/main.rs b/validator/src/main.rs index bee65e487ba92a..1672ac6d4801c0 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1599,6 +1599,7 @@ pub fn main() { tvu_shred_sigverify_threads: tvu_sigverify_threads, delay_leader_block_for_pending_fork: matches .is_present("delay_leader_block_for_pending_fork"), + use_tpu_client_next: matches.is_present("use_tpu_client_next"), wen_restart_proto_path: value_t!(matches, "wen_restart", PathBuf).ok(), wen_restart_coordinator: value_t!(matches, "wen_restart_coordinator", Pubkey).ok(), ..ValidatorConfig::default()