From 5de1266f1bf0b1755fc5dded6fca9aedc68c6697 Mon Sep 17 00:00:00 2001
From: Kirill Lykov
Date: Tue, 3 Dec 2024 14:04:12 +0100
Subject: [PATCH 1/7] Add update_key for transaction client

---
 .../src/transaction_client.rs | 71 +++++++++++++++++--
 1 file changed, 64 insertions(+), 7 deletions(-)

diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs
index 29d1c3c3411944..f098c95643320b 100644
--- a/send-transaction-service/src/transaction_client.rs
+++ b/send-transaction-service/src/transaction_client.rs
@@ -5,7 +5,7 @@ use {
     solana_client::connection_cache::{ConnectionCache, Protocol},
     solana_connection_cache::client_connection::ClientConnection as TpuConnection,
     solana_measure::measure::Measure,
-    solana_sdk::signature::Keypair,
+    solana_sdk::{quic::NotifyKeyUpdate, signature::Keypair},
     solana_tpu_client_next::{
        connection_workers_scheduler::{
            ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
@@ -152,6 +152,15 @@ where
     }
 }
 
+impl<T> NotifyKeyUpdate for ConnectionCacheClient<T>
+where
+    T: TpuInfoWithSendStatic,
+{
+    fn update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
+        self.connection_cache.update_key(identity)
+    }
+}
+
 #[derive(Clone)]
 pub struct SendTransactionServiceLeaderUpdater<T: TpuInfoWithSendStatic> {
     leader_info_provider: CurrentLeaderInfo<T>,
@@ -192,12 +201,6 @@ type TpuClientJoinHandle =
 /// * Update the validator identity keypair and propagate the changes to the
 ///   scheduler. Most of the complexity of this structure arises from this
 ///   functionality.
-///
-#[allow(
-    dead_code,
-    reason = "Unused fields will be utilized soon,\
-    added in advance to avoid larger changes in the code."
-)]
 #[derive(Clone)]
 pub struct TpuClientNextClient<T>
 where
@@ -285,6 +288,60 @@ where
         lock.1.cancel();
         Ok(())
     }
+
+    async fn do_update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
+        let runtime_handle = self.runtime_handle.clone();
+        let config = Self::create_config(
+            Some(identity.insecure_clone()),
+            self.leader_forward_count as usize,
+        );
+        let leader_updater = self.leader_updater.clone();
+        let handle = self.join_and_cancel.clone();
+
+        let join_handle = {
+            let Ok(mut lock) = handle.lock() else {
+                return Err("TpuClientNext task panicked.".into());
+            };
+            lock.1.cancel();
+            lock.0.take() // Take the `join_handle` out of the critical section
+        };
+
+        if let Some(join_handle) = join_handle {
+            let Ok(result) = join_handle.await else {
+                return Err("TpuClientNext task panicked.".into());
+            };
+
+            match result {
+                Ok((_stats, receiver)) => {
+                    let cancel = CancellationToken::new();
+                    let join_handle = runtime_handle.spawn(ConnectionWorkersScheduler::run(
+                        config,
+                        Box::new(leader_updater),
+                        receiver,
+                        cancel.clone(),
+                    ));
+
+                    let Ok(mut lock) = handle.lock() else {
+                        return Err("TpuClientNext task panicked.".into());
+                    };
+                    *lock = (Some(join_handle), cancel);
+                }
+                Err(error) => {
+                    return Err(Box::new(error));
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+impl<T> NotifyKeyUpdate for TpuClientNextClient<T>
+where
+    T: TpuInfoWithSendStatic + Clone,
+{
+    fn update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
+        self.runtime_handle.block_on(self.do_update_key(identity))
+    }
 }
 
 impl<T> TransactionClient for TpuClientNextClient<T>

From 8c17ca63c51bd1a58deae265e3c8bcd2b32f8a98 Mon Sep 17 00:00:00 2001
From: Kirill Lykov
Date: Tue, 3 Dec 2024 17:47:36 +0100
Subject: [PATCH 2/7] use tpu-client-next in validator

---
 core/src/validator.rs                  | 154 +++++++++++++++++-----
 local-cluster/src/validator_configs.rs |   1 +
 rpc/Cargo.toml                         |   1 +
 rpc/src/lib.rs                         |   2 +-
rpc/src/rpc.rs | 170 +++++++++++++++---------- rpc/src/rpc_service.rs | 70 +++++----- validator/src/cli.rs | 9 ++ validator/src/main.rs | 1 + 8 files changed, 276 insertions(+), 132 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index c3318ee070f2bc..5a36158f341147 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -82,6 +82,7 @@ use { solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, solana_rpc::{ cache_block_meta_service::{CacheBlockMetaSender, CacheBlockMetaService}, + cluster_tpu_info::ClusterTpuInfo, max_slots::MaxSlots, optimistically_confirmed_bank_tracker::{ BankNotificationSenderConfig, OptimisticallyConfirmedBank, @@ -120,11 +121,15 @@ use { hard_forks::HardForks, hash::Hash, pubkey::Pubkey, + quic::NotifyKeyUpdate, shred_version::compute_shred_version, signature::{Keypair, Signer}, timing::timestamp, }, - solana_send_transaction_service::send_transaction_service, + solana_send_transaction_service::{ + send_transaction_service, + transaction_client::{ConnectionCacheClient, TpuClientNextClient}, + }, solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes}, solana_turbine::{self, broadcast_stage::BroadcastStageType}, solana_unified_scheduler_pool::DefaultSchedulerPool, @@ -138,7 +143,7 @@ use { path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, Mutex, RwLock, + Arc, LazyLock, Mutex, RwLock, }, thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, @@ -146,9 +151,21 @@ use { strum::VariantNames, strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr}, thiserror::Error, - tokio::runtime::Runtime as TokioRuntime, + tokio::runtime::{self, Runtime as TokioRuntime}, }; +static GLOBAL_RUNTIME: LazyLock = LazyLock::new(|| { + runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime") +}); + +// Function to get a handle to the runtime +fn get_runtime_handle() -> tokio::runtime::Handle { + GLOBAL_RUNTIME.handle().clone() +} + const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000; const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80; // Right now since we reuse the wait for supermajority code, the @@ -286,6 +303,7 @@ pub struct ValidatorConfig { pub replay_transactions_threads: NonZeroUsize, pub tvu_shred_sigverify_threads: NonZeroUsize, pub delay_leader_block_for_pending_fork: bool, + pub use_tpu_client_next: bool, } impl Default for ValidatorConfig { @@ -358,6 +376,7 @@ impl Default for ValidatorConfig { replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"), delay_leader_block_for_pending_fork: false, + use_tpu_client_next: false, } } } @@ -1009,6 +1028,9 @@ impl Validator { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); + // ConnectionCache might be used for JsonRpc and for Forwarding. Since + // the later is not migrated yet to the tpu-client-next, create + // ConnectionCache regardless of config.use_tpu_client_next for now. 
let connection_cache = if use_quic { let connection_cache = ConnectionCache::new_with_client_options( "connection_cache_tpu_quic", @@ -1067,6 +1089,7 @@ impl Validator { rpc_completed_slots_service, optimistically_confirmed_bank_tracker, bank_notification_sender, + client_updater, ) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs { assert_eq!( node.info @@ -1085,31 +1108,91 @@ impl Validator { None }; - let json_rpc_service = JsonRpcService::new( - rpc_addr, - config.rpc_config.clone(), - Some(config.snapshot_config.clone()), - bank_forks.clone(), - block_commitment_cache.clone(), - blockstore.clone(), - cluster_info.clone(), - Some(poh_recorder.clone()), - genesis_config.hash(), - ledger_path, - config.validator_exit.clone(), - exit.clone(), - rpc_override_health_check.clone(), - startup_verification_complete, - optimistically_confirmed_bank.clone(), - config.send_transaction_service_config.clone(), - max_slots.clone(), - leader_schedule_cache.clone(), - connection_cache.clone(), - max_complete_transaction_status_slot, - max_complete_rewards_slot, - prioritization_fee_cache.clone(), - ) - .map_err(ValidatorError::Other)?; + let leader_info = ClusterTpuInfo::new(cluster_info.clone(), poh_recorder.clone()); + let (json_rpc_service, client_updater) = if config.use_tpu_client_next { + let my_tpu_address = cluster_info + .my_contact_info() + .tpu(Protocol::QUIC) + .map_err(|err| ValidatorError::Other(format!("{err}")))?; + + let client = TpuClientNextClient::new( + get_runtime_handle(), + my_tpu_address, + config.send_transaction_service_config.tpu_peers.clone(), + Some(leader_info), + config.send_transaction_service_config.leader_forward_count, + Some(identity_keypair.insecure_clone()), + ); + + let json_rpc_service = JsonRpcService::new( + rpc_addr, + config.rpc_config.clone(), + Some(config.snapshot_config.clone()), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore.clone(), + cluster_info.clone(), + genesis_config.hash(), + ledger_path, + config.validator_exit.clone(), + exit.clone(), + rpc_override_health_check.clone(), + startup_verification_complete, + optimistically_confirmed_bank.clone(), + config.send_transaction_service_config.clone(), + max_slots.clone(), + leader_schedule_cache.clone(), + client.clone(), + max_complete_transaction_status_slot, + max_complete_rewards_slot, + prioritization_fee_cache.clone(), + ) + .map_err(ValidatorError::Other)?; + ( + json_rpc_service, + Arc::new(client) as Arc, + ) + } else { + let my_tpu_address = cluster_info + .my_contact_info() + .tpu(connection_cache.protocol()) + .map_err(|err| ValidatorError::Other(format!("{err}")))?; + let client = ConnectionCacheClient::new( + connection_cache.clone(), + my_tpu_address, + config.send_transaction_service_config.tpu_peers.clone(), + Some(leader_info), + config.send_transaction_service_config.leader_forward_count, + ); + let json_rpc_service = JsonRpcService::new( + rpc_addr, + config.rpc_config.clone(), + Some(config.snapshot_config.clone()), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore.clone(), + cluster_info.clone(), + genesis_config.hash(), + ledger_path, + config.validator_exit.clone(), + exit.clone(), + rpc_override_health_check.clone(), + startup_verification_complete, + optimistically_confirmed_bank.clone(), + config.send_transaction_service_config.clone(), + max_slots.clone(), + leader_schedule_cache.clone(), + client.clone(), + max_complete_transaction_status_slot, + max_complete_rewards_slot, + prioritization_fee_cache.clone(), + ) + 
.map_err(ValidatorError::Other)?; + ( + json_rpc_service, + Arc::new(client) as Arc, + ) + }; let pubsub_service = if !config.rpc_config.full_api { None @@ -1185,9 +1268,10 @@ impl Validator { rpc_completed_slots_service, optimistically_confirmed_bank_tracker, bank_notification_sender_config, + Some(client_updater), ) } else { - (None, None, None, None, None, None, None) + (None, None, None, None, None, None, None, None) }; if config.halt_at_slot.is_some() { @@ -1457,7 +1541,10 @@ impl Validator { config.wait_to_vote_slot, accounts_background_request_sender.clone(), config.runtime_config.log_messages_bytes_limit, - json_rpc_service.is_some().then_some(&connection_cache), // for the cache warmer only used for STS for RPC service + // for the cache warmer only used for STS for RPC service when + // tpu-client-next is not used + (json_rpc_service.is_some() && !config.use_tpu_client_next) + .then_some(&connection_cache), &prioritization_fee_cache, banking_tracer.clone(), turbine_quic_endpoint_sender.clone(), @@ -1551,7 +1638,12 @@ impl Validator { ); *start_progress.write().unwrap() = ValidatorStartProgress::Running; - key_notifies.push(connection_cache); + if let Some(client_updater) = client_updater { + key_notifies.push(client_updater); + } else { + // add connection_cache because it is still used in Forwarder. + key_notifies.push(connection_cache); + } *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit { bank_forks: bank_forks.clone(), diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index e90475aad2a06f..506ffc95ee4bec 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -73,6 +73,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { replay_transactions_threads: config.replay_transactions_threads, tvu_shred_sigverify_threads: config.tvu_shred_sigverify_threads, delay_leader_block_for_pending_fork: config.delay_leader_block_for_pending_fork, + use_tpu_client_next: config.use_tpu_client_next, } } diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index bbda760bac0493..d747afafa9f9ae 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -71,6 +71,7 @@ solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } solana-runtime-transaction = { workspace = true, features = [ "dev-context-only-utils", ] } +solana-send-transaction-service = { workspace = true, features = ["dev-context-only-utils"] } solana-stake-program = { workspace = true } spl-pod = { workspace = true } symlink = { workspace = true } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 38db7b12e3d942..4a71c835a825d9 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -1,6 +1,6 @@ #![allow(clippy::arithmetic_side_effects)] pub mod cache_block_meta_service; -mod cluster_tpu_info; +pub mod cluster_tpu_info; pub mod filter; pub mod max_slots; pub mod optimistically_confirmed_bank_tracker; diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index dfe856ee0b6a1a..c0e3f63ca7acc6 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -115,15 +115,14 @@ use { }; #[cfg(test)] use { - solana_client::connection_cache::ConnectionCache, solana_gossip::contact_info::ContactInfo, solana_ledger::get_tmp_ledger_path, solana_runtime::commitment::CommitmentSlots, solana_send_transaction_service::{ - send_transaction_service::SendTransactionService, tpu_info::NullTpuInfo, - transaction_client::ConnectionCacheClient, + send_transaction_service::SendTransactionService, test_utils::ClientWithCreator, }, 
solana_streamer::socket::SocketAddrSpace, + tokio::runtime::Handle, }; pub mod account_resolver; @@ -357,10 +356,10 @@ impl JsonRpcRequestProcessor { } #[cfg(test)] - pub fn new_from_bank( + pub fn new_from_bank( + runtime: Option, bank: Bank, socket_addr_space: SocketAddrSpace, - connection_cache: Arc, ) -> Self { let genesis_hash = bank.hash(); let bank_forks = BankForks::new_rw_arc(bank); @@ -375,19 +374,11 @@ impl JsonRpcRequestProcessor { ); ClusterInfo::new(contact_info, keypair, socket_addr_space) }); - let tpu_address = cluster_info - .my_contact_info() - .tpu(connection_cache.protocol()) - .unwrap(); + + let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); let (transaction_sender, transaction_receiver) = unbounded(); - let client = ConnectionCacheClient::::new( - connection_cache.clone(), - tpu_address, - None, - None, - 1, - ); + let client = Client::create_client(runtime, my_tpu_address, None, 1); SendTransactionService::new( &bank_forks, transaction_receiver, @@ -4391,7 +4382,8 @@ pub mod tests { vote::state::VoteState, }, solana_send_transaction_service::{ - tpu_info::NullTpuInfo, transaction_client::ConnectionCacheClient, + tpu_info::NullTpuInfo, + transaction_client::{ConnectionCacheClient, TpuClientNextClient}, }, solana_transaction_status::{ EncodedConfirmedBlock, EncodedTransaction, EncodedTransactionWithStatusMeta, @@ -4412,6 +4404,7 @@ pub mod tests { state::{AccountState as TokenAccountState, Mint}, }, std::{borrow::Cow, collections::HashMap, net::Ipv4Addr}, + tokio::runtime::Handle, }; const TEST_MINT_LAMPORTS: u64 = 1_000_000_000; @@ -4772,16 +4765,14 @@ pub mod tests { } } - #[test] - fn test_rpc_request_processor_new() { + fn rpc_request_processor_new(maybe_runtime: Option) { let bob_pubkey = solana_sdk::pubkey::new_rand(); let genesis = create_genesis_config(100); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::( + maybe_runtime, bank, SocketAddrSpace::Unspecified, - connection_cache, ); let bank = meta.bank_forks.read().unwrap().root_bank(); @@ -4796,15 +4787,23 @@ pub mod tests { } #[test] - fn test_rpc_get_balance() { + fn test_rpc_request_processor_new_connection_cache() { + rpc_request_processor_new::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_request_processor_new_tpu_client_next() { + rpc_request_processor_new::>(Some(Handle::current())); + } + + fn rpc_get_balance(maybe_runtime: Option) { let genesis = create_genesis_config(20); let mint_pubkey = genesis.mint_keypair.pubkey(); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::( + maybe_runtime, bank, SocketAddrSpace::Unspecified, - connection_cache, ); let mut io = MetaIoHandler::default(); @@ -4828,15 +4827,23 @@ pub mod tests { } #[test] - fn test_rpc_get_balance_via_client() { + fn test_rpc_get_balance_new_connection_cache() { + rpc_get_balance::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_get_balance_new_tpu_client_next() { + rpc_get_balance::>(Some(Handle::current())); + } + + fn rpc_get_balance_via_client(maybe_runtime: Option) { let genesis = create_genesis_config(20); let mint_pubkey = genesis.mint_keypair.pubkey(); let 
bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::( + maybe_runtime, bank, SocketAddrSpace::Unspecified, - connection_cache, ); let mut io = MetaIoHandler::default(); @@ -4861,6 +4868,16 @@ pub mod tests { assert_eq!(response, 20); } + #[test] + fn test_rpc_get_balance_via_client_connection_cache() { + rpc_get_balance_via_client::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_get_balance_via_client_tpu_client_next() { + rpc_get_balance_via_client::>(Some(Handle::current())); + } + #[test] fn test_rpc_get_cluster_nodes() { let rpc = RpcHandler::start(); @@ -4955,16 +4972,14 @@ pub mod tests { assert_eq!(result, expected); } - #[test] - fn test_rpc_get_tx_count() { + fn rpc_get_tx_count(maybe_runtime: Option) { let bob_pubkey = solana_sdk::pubkey::new_rand(); let genesis = create_genesis_config(10); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::( + maybe_runtime, bank, SocketAddrSpace::Unspecified, - connection_cache, ); let mut io = MetaIoHandler::default(); @@ -4991,6 +5006,17 @@ pub mod tests { assert_eq!(result, expected); } + #[test] + fn test_rpc_get_tx_count_connection_cache() { + rpc_get_tx_count::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + + async fn test_rpc_get_tx_count_tpu_client_next() { + rpc_get_tx_count::>(Some(Handle::current())); + } + #[test] fn test_rpc_minimum_ledger_slot() { let rpc = RpcHandler::start(); @@ -6426,15 +6452,13 @@ pub mod tests { assert_eq!(result, expected); } - #[test] - fn test_rpc_send_bad_tx() { + fn rpc_send_bad_tx(maybe_runtime: Option) { let genesis = create_genesis_config(100); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::( + maybe_runtime, bank, SocketAddrSpace::Unspecified, - connection_cache, ); let mut io = MetaIoHandler::default(); @@ -6448,7 +6472,16 @@ pub mod tests { } #[test] - fn test_rpc_send_transaction_preflight() { + fn test_rpc_send_bad_tx_connection_cache() { + rpc_send_bad_tx::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_send_bad_tx_tpu_client_next() { + rpc_send_bad_tx::>(Some(Handle::current())); + } + + fn rpc_send_transaction_preflight(maybe_runtime: Option) { let exit = Arc::new(AtomicBool::new(false)); let validator_exit = create_validator_exit(exit.clone()); let ledger_path = get_tmp_ledger_path!(); @@ -6474,11 +6507,7 @@ pub mod tests { ); ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified) }); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let tpu_address = cluster_info - .my_contact_info() - .tpu(connection_cache.protocol()) - .unwrap(); + let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); let (meta, receiver) = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), None, @@ -6498,13 +6527,7 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); - let client = ConnectionCacheClient::::new( - connection_cache.clone(), - tpu_address, - 
None, - None, - 1, - ); + let client = C::create_client(maybe_runtime, my_tpu_address, None, 1); SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone()); let mut bad_transaction = system_transaction::transfer( @@ -6608,6 +6631,16 @@ pub mod tests { ); } + #[test] + fn test_rpc_send_transaction_preflight_with_connection_cache() { + rpc_send_transaction_preflight::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_send_transaction_preflight_with_tpu_client_next() { + rpc_send_transaction_preflight::>(Some(Handle::current())); + } + #[test] fn test_rpc_verify_filter() { let filter = RpcFilterType::Memcmp(Memcmp::new( @@ -6720,8 +6753,7 @@ pub mod tests { assert_eq!(result, expected); } - #[test] - fn test_rpc_processor_get_block_commitment() { + fn rpc_processor_get_block_commitment(maybe_runtime: Option) { let exit = Arc::new(AtomicBool::new(false)); let validator_exit = create_validator_exit(exit.clone()); let bank_forks = new_bank_forks().0; @@ -6744,11 +6776,7 @@ pub mod tests { ))); let cluster_info = Arc::new(new_test_cluster_info()); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let tpu_address = cluster_info - .my_contact_info() - .tpu(connection_cache.protocol()) - .unwrap(); + let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); let (request_processor, receiver) = JsonRpcRequestProcessor::new( @@ -6770,14 +6798,8 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); - let client = ConnectionCacheClient::::new( - connection_cache.clone(), - tpu_address, - None, - None, - 1, - ); - SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone()); + let client = C::create_client(maybe_runtime, my_tpu_address, None, 1); + SendTransactionService::new(&bank_forks, receiver, client, 1000, exit); assert_eq!( request_processor.get_block_commitment(0), @@ -6802,6 +6824,18 @@ pub mod tests { ); } + #[test] + fn test_rpc_processor_get_block_commitment_with_connection_cache() { + rpc_processor_get_block_commitment::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_processor_get_block_commitment_with_tpu_client_next() { + rpc_processor_get_block_commitment::>(Some( + Handle::current(), + )); + } + #[test] fn test_rpc_get_block_commitment() { let rpc = RpcHandler::start(); diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 2c582be72ea5a6..2d09765e08c784 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -2,7 +2,6 @@ use { crate::{ - cluster_tpu_info::ClusterTpuInfo, max_slots::MaxSlots, optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, rpc::{rpc_accounts::*, rpc_accounts_scan::*, rpc_bank::*, rpc_full::*, rpc_minimal::*, *}, @@ -16,7 +15,6 @@ use { RequestMiddlewareAction, ServerBuilder, }, regex::Regex, - solana_client::connection_cache::ConnectionCache, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ bigtable_upload::ConfirmedBlockUploadConfig, @@ -25,7 +23,6 @@ use { }, solana_metrics::inc_new_counter_info, solana_perf::thread::renice_this_thread, - solana_poh::poh_recorder::PohRecorder, solana_runtime::{ bank_forks::BankForks, commitment::BlockCommitmentCache, prioritization_fee_cache::PrioritizationFeeCache, @@ -38,7 +35,7 @@ use { }, solana_send_transaction_service::{ send_transaction_service::{self, SendTransactionService}, - 
transaction_client::ConnectionCacheClient, + transaction_client::TransactionClient, }, solana_storage_bigtable::CredentialType, std::{ @@ -335,7 +332,7 @@ fn process_rest(bank_forks: &Arc>, path: &str) -> Option( rpc_addr: SocketAddr, config: JsonRpcConfig, snapshot_config: Option, @@ -343,7 +340,6 @@ impl JsonRpcService { block_commitment_cache: Arc>, blockstore: Arc, cluster_info: Arc, - poh_recorder: Option>>, genesis_hash: Hash, ledger_path: &Path, validator_exit: Arc>, @@ -354,7 +350,7 @@ impl JsonRpcService { send_transaction_service_config: send_transaction_service::Config, max_slots: Arc, leader_schedule_cache: Arc, - connection_cache: Arc, + client: Client, max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, @@ -376,11 +372,6 @@ impl JsonRpcService { LARGEST_ACCOUNTS_CACHE_DURATION, ))); - let tpu_address = cluster_info - .my_contact_info() - .tpu(connection_cache.protocol()) - .map_err(|err| format!("{err}"))?; - // sadly, some parts of our current rpc implemention block the jsonrpc's // _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU, // causing no further processing of incoming requests and ultimatily innocent clients timing-out. @@ -475,15 +466,6 @@ impl JsonRpcService { prioritization_fee_cache, ); - let leader_info = - poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder)); - let client = ConnectionCacheClient::new( - connection_cache, - tpu_address, - send_transaction_service_config.tpu_peers.clone(), - leader_info, - send_transaction_service_config.leader_forward_count, - ); let _send_transaction_service = SendTransactionService::new_with_config( &bank_forks, receiver, @@ -591,6 +573,7 @@ mod tests { use { super::*, crate::rpc::{create_validator_exit, tests::new_test_cluster_info}, + solana_client::connection_cache::Protocol, solana_ledger::{ genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path_auto_delete, @@ -601,15 +584,20 @@ mod tests { genesis_config::{ClusterType, DEFAULT_GENESIS_ARCHIVE}, signature::Signer, }, + solana_send_transaction_service::{ + send_transaction_service::{self}, + test_utils::ClientWithCreator, + tpu_info::NullTpuInfo, + transaction_client::{ConnectionCacheClient, TpuClientNextClient}, + }, std::{ io::Write, net::{IpAddr, Ipv4Addr}, }, - tokio::runtime::Runtime, + tokio::runtime::{Handle, Runtime}, }; - #[test] - fn test_rpc_new() { + fn rpc_new(maybe_runtime: Option) { let GenesisConfigInfo { genesis_config, mint_keypair, @@ -630,7 +618,20 @@ mod tests { let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + + let send_transaction_service_config = send_transaction_service::Config { + retry_rate_ms: 1000, + leader_forward_count: 1, + ..send_transaction_service::Config::default() + }; + + let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); + let client = C::create_client( + maybe_runtime, + my_tpu_address, + send_transaction_service_config.tpu_peers.clone(), + send_transaction_service_config.leader_forward_count, + ); let mut rpc_service = JsonRpcService::new( rpc_addr, JsonRpcConfig::default(), @@ -639,7 +640,6 @@ mod tests { block_commitment_cache, blockstore, cluster_info, - None, Hash::default(), &PathBuf::from("farf"), validator_exit, @@ -647,14 +647,10 
@@ mod tests { Arc::new(AtomicBool::new(false)), Arc::new(AtomicBool::new(true)), optimistically_confirmed_bank, - send_transaction_service::Config { - retry_rate_ms: 1000, - leader_forward_count: 1, - ..send_transaction_service::Config::default() - }, + send_transaction_service_config, Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), - connection_cache, + client, Arc::new(AtomicU64::default()), Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), @@ -675,6 +671,16 @@ mod tests { rpc_service.join().unwrap(); } + #[test] + fn test_rpc_new_with_connection_cache() { + rpc_new::>(None); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_rpc_new_with_tpu_client_next() { + rpc_new::>(Some(Handle::current())); + } + fn create_bank_forks() -> Arc> { let GenesisConfigInfo { mut genesis_config, .. diff --git a/validator/src/cli.rs b/validator/src/cli.rs index aed7a3bffc1d9f..352aa558d1c7dc 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1579,6 +1579,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { was created before we started creating ours.", ), ) + .arg( + Arg::with_name("use_tpu_client_next") + .long("use-tpu-client-next") + .takes_value(false) + .help( + "Use tpu-client-next crate to send transactions over TPU ports. If not set,\ + ConnectionCache is used instead." + ), + ) .arg( Arg::with_name("block_verification_method") .long("block-verification-method") diff --git a/validator/src/main.rs b/validator/src/main.rs index a7de615b3be9ac..79d49584fee125 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1604,6 +1604,7 @@ pub fn main() { tvu_shred_sigverify_threads: tvu_sigverify_threads, delay_leader_block_for_pending_fork: matches .is_present("delay_leader_block_for_pending_fork"), + use_tpu_client_next: matches.is_present("use_tpu_client_next"), wen_restart_proto_path: value_t!(matches, "wen_restart", PathBuf).ok(), wen_restart_coordinator: value_t!(matches, "wen_restart_coordinator", Pubkey).ok(), ..ValidatorConfig::default() From 55b80707a82a6ba7ec1e98b0899eaf4ab5182742 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 10 Dec 2024 17:48:24 +0100 Subject: [PATCH 3/7] Rename runtime --- core/src/validator.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index 5a36158f341147..f21bea3e6403e8 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -154,18 +154,13 @@ use { tokio::runtime::{self, Runtime as TokioRuntime}, }; -static GLOBAL_RUNTIME: LazyLock = LazyLock::new(|| { +static STS_CLIENT_RUNTIME: LazyLock = LazyLock::new(|| { runtime::Builder::new_multi_thread() .enable_all() .build() .expect("Failed to create Tokio runtime") }); -// Function to get a handle to the runtime -fn get_runtime_handle() -> tokio::runtime::Handle { - GLOBAL_RUNTIME.handle().clone() -} - const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000; const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80; // Right now since we reuse the wait for supermajority code, the @@ -1116,7 +1111,7 @@ impl Validator { .map_err(|err| ValidatorError::Other(format!("{err}")))?; let client = TpuClientNextClient::new( - get_runtime_handle(), + STS_CLIENT_RUNTIME.handle().clone(), my_tpu_address, config.send_transaction_service_config.tpu_peers.clone(), Some(leader_info), From 13efce472907a1fb074e06472624c527e5cf0688 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 10 Dec 2024 18:25:40 +0100 Subject: [PATCH 4/7] 
Address PR comments --- core/src/validator.rs | 7 +++---- rpc/src/rpc.rs | 12 ++++++++++++ 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index f21bea3e6403e8..e2850fe55123d4 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1024,7 +1024,7 @@ impl Validator { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); // ConnectionCache might be used for JsonRpc and for Forwarding. Since - // the later is not migrated yet to the tpu-client-next, create + // the latter is not migrated yet to the tpu-client-next, create // ConnectionCache regardless of config.use_tpu_client_next for now. let connection_cache = if use_quic { let connection_cache = ConnectionCache::new_with_client_options( @@ -1635,10 +1635,9 @@ impl Validator { *start_progress.write().unwrap() = ValidatorStartProgress::Running; if let Some(client_updater) = client_updater { key_notifies.push(client_updater); - } else { - // add connection_cache because it is still used in Forwarder. - key_notifies.push(connection_cache); } + // add connection_cache because it is still used in Forwarder. + key_notifies.push(connection_cache); *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit { bank_forks: bank_forks.clone(), diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index c0e3f63ca7acc6..f7637539f5b531 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -379,6 +379,10 @@ impl JsonRpcRequestProcessor { let (transaction_sender, transaction_receiver) = unbounded(); let client = Client::create_client(runtime, my_tpu_address, None, 1); + assert!( + client.protocol() == Protocol::QUIC, + "UDP is not supported by this test." + ); SendTransactionService::new( &bank_forks, transaction_receiver, @@ -6528,6 +6532,10 @@ pub mod tests { Arc::new(PrioritizationFeeCache::default()), ); let client = C::create_client(maybe_runtime, my_tpu_address, None, 1); + assert!( + client.protocol() == Protocol::QUIC, + "UDP is not supported by this test." + ); SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone()); let mut bad_transaction = system_transaction::transfer( @@ -6799,6 +6807,10 @@ pub mod tests { Arc::new(PrioritizationFeeCache::default()), ); let client = C::create_client(maybe_runtime, my_tpu_address, None, 1); + assert!( + client.protocol() == Protocol::QUIC, + "UDP is not supported by this test." 
+        );
         SendTransactionService::new(&bank_forks, receiver, client, 1000, exit);
 
         assert_eq!(

From 20d5615ae00ce61be92e791d0078a15b3bf319e3 Mon Sep 17 00:00:00 2001
From: Kirill Lykov
Date: Fri, 13 Dec 2024 19:10:40 +0100
Subject: [PATCH 5/7] address PR comments

---
 core/src/validator.rs         | 28 ++++++++++---------
 rpc/src/rpc.rs                |  1 -
 .../src/transaction_client.rs |  5 ++--
 3 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/core/src/validator.rs b/core/src/validator.rs
index e2850fe55123d4..167f35f662482a 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -143,7 +143,7 @@ use {
         path::{Path, PathBuf},
         sync::{
             atomic::{AtomicBool, AtomicU64, Ordering},
-            Arc, LazyLock, Mutex, RwLock,
+            Arc, Mutex, RwLock,
         },
         thread::{sleep, Builder, JoinHandle},
         time::{Duration, Instant},
@@ -154,13 +154,6 @@ use {
     tokio::runtime::{self, Runtime as TokioRuntime},
 };
 
-static STS_CLIENT_RUNTIME: LazyLock<TokioRuntime> = LazyLock::new(|| {
-    runtime::Builder::new_multi_thread()
-        .enable_all()
-        .build()
-        .expect("Failed to create Tokio runtime")
-});
-
 const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
 const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
 // Right now since we reuse the wait for supermajority code, the
@@ -1105,13 +1098,20 @@ impl Validator {
 
             let leader_info = ClusterTpuInfo::new(cluster_info.clone(), poh_recorder.clone());
             let (json_rpc_service, client_updater) = if config.use_tpu_client_next {
+                let tpu_client_next_runtime = runtime::Builder::new_multi_thread()
+                    .enable_all()
+                    .thread_name("solSTSQuic")
+                    .build()
+                    .expect(
+                        "Should be possible to create tokio runtime for SendTransactionService.",
+                    );
                 let my_tpu_address = cluster_info
                     .my_contact_info()
                     .tpu(Protocol::QUIC)
                     .map_err(|err| ValidatorError::Other(format!("{err}")))?;
 
                 let client = TpuClientNextClient::new(
-                    STS_CLIENT_RUNTIME.handle().clone(),
+                    tpu_client_next_runtime.handle().clone(),
                     my_tpu_address,
                     config.send_transaction_service_config.tpu_peers.clone(),
                     Some(leader_info),
@@ -1488,6 +1488,11 @@ impl Validator {
         let cluster_slots =
             Arc::new(crate::cluster_slots_service::cluster_slots::ClusterSlots::default());
 
+        // If RPC is supported and ConnectionCache is used, pass ConnectionCache to be warmed up inside Tvu.
+ let connection_cache_for_warmup = (json_rpc_service.is_some() + && !config.use_tpu_client_next) + .then_some(&connection_cache); + let tvu = Tvu::new( vote_account, authorized_voter_keypairs, @@ -1536,10 +1541,7 @@ impl Validator { config.wait_to_vote_slot, accounts_background_request_sender.clone(), config.runtime_config.log_messages_bytes_limit, - // for the cache warmer only used for STS for RPC service when - // tpu-client-next is not used - (json_rpc_service.is_some() && !config.use_tpu_client_next) - .then_some(&connection_cache), + connection_cache_for_warmup, &prioritization_fee_cache, banking_tracer.clone(), turbine_quic_endpoint_sender.clone(), diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index f7637539f5b531..34ac35519b37ef 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -5016,7 +5016,6 @@ pub mod tests { } #[tokio::test(flavor = "multi_thread")] - async fn test_rpc_get_tx_count_tpu_client_next() { rpc_get_tx_count::>(Some(Handle::current())); } diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs index f098c95643320b..45a6234a66a4d1 100644 --- a/send-transaction-service/src/transaction_client.rs +++ b/send-transaction-service/src/transaction_client.rs @@ -302,8 +302,9 @@ where let Ok(mut lock) = handle.lock() else { return Err("TpuClientNext task panicked.".into()); }; - lock.1.cancel(); - lock.0.take() // Take the `join_handle` out of the critical section + let (handle, token) = std::mem::take(&mut *lock); + token.cancel(); + handle }; if let Some(join_handle) = join_handle { From fc53bea9d57a79254d4bf6f2da7d54f98ba6003c Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 13 Dec 2024 19:42:01 +0100 Subject: [PATCH 6/7] add exit to TransactionClient --- .../src/send_transaction_service.rs | 26 ++++++++++++------- .../src/transaction_client.rs | 26 ++++++++++++++++++- 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 928a67dc4d84ca..523c39adbbe9df 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -52,10 +52,11 @@ const DEFAULT_BATCH_SEND_RATE_MS: u64 = 1; // The maximum transaction batch send rate in MS pub const MAX_BATCH_SEND_RATE_MS: usize = 100_000; -pub struct SendTransactionService { +pub struct SendTransactionService { receive_txn_thread: JoinHandle<()>, retry_thread: JoinHandle<()>, exit: Arc, + client: Client, } pub struct TransactionInfo { @@ -145,8 +146,10 @@ impl Default for Config { /// processing the transactions that need to be retried. 
pub const MAX_RETRY_SLEEP_MS: u64 = 1_000; -impl SendTransactionService { - pub fn new( +impl + SendTransactionService +{ + pub fn new( bank_forks: &Arc>, receiver: Receiver, client: Client, @@ -157,10 +160,10 @@ impl SendTransactionService { retry_rate_ms, ..Config::default() }; - Self::new_with_config::(bank_forks, receiver, client, config, exit) + Self::new_with_config(bank_forks, receiver, client, config, exit) } - pub fn new_with_config( + pub fn new_with_config( bank_forks: &Arc>, receiver: Receiver, client: Client, @@ -186,7 +189,7 @@ impl SendTransactionService { let retry_thread = Self::retry_thread( bank_forks.clone(), - client, + client.clone(), retry_transactions, config.retry_rate_ms, config.service_max_retries, @@ -199,12 +202,13 @@ impl SendTransactionService { receive_txn_thread, retry_thread, exit, + client, } } /// Thread responsible for receiving transactions from RPC clients. #[allow(clippy::too_many_arguments)] - fn receive_txn_thread( + fn receive_txn_thread( receiver: Receiver, client: Client, retry_transactions: Arc>>, @@ -314,7 +318,7 @@ impl SendTransactionService { } /// Thread responsible for retrying transactions - fn retry_thread( + fn retry_thread( bank_forks: Arc>, client: Client, retry_transactions: Arc>>, @@ -377,7 +381,7 @@ impl SendTransactionService { } /// Retry transactions sent before. - fn process_transactions( + fn process_transactions( working_bank: &Bank, root_bank: &Bank, transactions: &mut HashMap, @@ -528,7 +532,9 @@ impl SendTransactionService { pub fn join(self) -> thread::Result<()> { self.receive_txn_thread.join()?; self.exit.store(true, Ordering::Relaxed); - self.retry_thread.join() + let retry_result = self.retry_thread.join(); + self.client.exit(); + retry_result } } diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs index 45a6234a66a4d1..a73efe8fc2687b 100644 --- a/send-transaction-service/src/transaction_client.rs +++ b/send-transaction-service/src/transaction_client.rs @@ -1,7 +1,7 @@ use { crate::{send_transaction_service_stats::SendTransactionServiceStats, tpu_info::TpuInfo}, async_trait::async_trait, - log::warn, + log::{debug, error, warn}, solana_client::connection_cache::{ConnectionCache, Protocol}, solana_connection_cache::client_connection::ClientConnection as TpuConnection, solana_measure::measure::Measure, @@ -39,6 +39,8 @@ pub trait TransactionClient { ); fn protocol(&self) -> Protocol; + + fn exit(&self); } pub struct ConnectionCacheClient { @@ -150,6 +152,8 @@ where fn protocol(&self) -> Protocol { self.connection_cache.protocol() } + + fn exit(&self) {} } impl NotifyKeyUpdate for ConnectionCacheClient @@ -373,6 +377,26 @@ where fn protocol(&self) -> Protocol { Protocol::QUIC } + + fn exit(&self) { + let Ok(mut lock) = self.join_and_cancel.lock() else { + error!("Failed to stop scheduler: TpuClientNext task panicked."); + return; + }; + let (handle, token) = std::mem::take(&mut *lock); + token.cancel(); + if let Some(handle) = handle { + match self.runtime_handle.block_on(handle) { + Ok(result) => match result { + Ok(stats) => { + debug!("tpu-client-next statistics over all the connections: {stats:?}"); + } + Err(error) => error!("tpu-client-next exits with error {error}."), + }, + Err(error) => error!("Failed to join task {error}."), + } + } + } } /// The leader info refresh rate. 
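
A note on the pattern shared by `do_update_key` (patch 1, reworked in patch 5) and `exit` (patch 6): both take the scheduler's join handle and cancellation token out of the mutex, cancel, and await the old task with the lock released; `do_update_key` then respawns the scheduler and publishes a fresh handle/token pair. Below is a minimal self-contained sketch of that sequence, assuming only `tokio` and `tokio-util` as dependencies; the names (`TaskState`, `restart_task`) are illustrative, not this crate's API:

    use std::sync::{Arc, Mutex};
    use tokio::task::JoinHandle;
    use tokio_util::sync::CancellationToken;

    // Mirrors the role of `join_and_cancel` in TpuClientNextClient.
    type TaskState = (Option<JoinHandle<()>>, CancellationToken);

    async fn restart_task(state: Arc<Mutex<TaskState>>) {
        // Take the handle and token out of the lock so the mutex is never
        // held across an `.await`.
        let (old_handle, old_token) = {
            let mut lock = state.lock().expect("state mutex poisoned");
            std::mem::take(&mut *lock)
        };
        // Ask the old task to stop and wait until it actually has.
        old_token.cancel();
        if let Some(handle) = old_handle {
            let _ = handle.await;
        }
        // Spawn the replacement (with the new identity, in the real code)
        // and publish the fresh handle/token pair.
        let cancel = CancellationToken::new();
        let handle = tokio::spawn({
            let cancel = cancel.clone();
            async move { cancel.cancelled().await }
        });
        *state.lock().expect("state mutex poisoned") = (Some(handle), cancel);
    }

    #[tokio::main]
    async fn main() {
        let state: Arc<Mutex<TaskState>> = Arc::new(Mutex::new(Default::default()));
        restart_task(state.clone()).await;
        restart_task(state).await; // cancels and replaces the first task
    }

Taking the whole `(handle, token)` pair with `std::mem::take`, rather than cancelling in place and taking only the handle as patch 1 did, also guarantees that a stale token is never left behind; that is exactly the change patch 5 makes inside `do_update_key`.
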
From 50fd073f764569c215148041972a752afb43c66c Mon Sep 17 00:00:00 2001
From: Kirill Lykov
Date: Mon, 16 Dec 2024 16:03:50 +0100
Subject: [PATCH 7/7] parametrize test for admin set_identity with client

---
 validator/src/admin_rpc_service.rs | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs
index 851c2959e97c0b..1a4232e6bc3456 100644
--- a/validator/src/admin_rpc_service.rs
+++ b/validator/src/admin_rpc_service.rs
@@ -1350,7 +1350,7 @@ mod tests {
     }
 
     impl TestValidatorWithAdminRpc {
-        fn new() -> Self {
+        fn new(use_tpu_client_next: bool) -> Self {
             let leader_keypair = Keypair::new();
             let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
 
@@ -1369,6 +1369,7 @@ mod tests {
                     validator_node.info.rpc().unwrap(),
                     validator_node.info.rpc_pubsub().unwrap(),
                 )),
+                use_tpu_client_next,
                 ..ValidatorConfig::default_for_test()
             };
             let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
@@ -1433,9 +1434,8 @@ mod tests {
     }
 
     // This test checks that `set_identity` call works with working validator and client.
-    #[test]
-    fn test_set_identity_with_validator() {
-        let test_validator = TestValidatorWithAdminRpc::new();
+    fn set_identity_with_validator(use_tpu_client_next: bool) {
+        let test_validator = TestValidatorWithAdminRpc::new(use_tpu_client_next);
         let expected_validator_id = Keypair::new();
         let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
 
@@ -1478,4 +1478,14 @@ mod tests {
             .expect("actual response deserialization");
         assert_eq!(actual_parsed_response, expected_parsed_response);
     }
+
+    #[test]
+    fn test_set_identity_with_validator_connection_cache() {
+        set_identity_with_validator(false);
+    }
+
+    #[test]
+    fn test_set_identity_with_validator_tpu_client_next() {
+        set_identity_with_validator(true);
+    }
 }
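
Patches 2 and 7 rely on the same testing idiom: the test body is written once, parametrized over the client (generically via `ClientWithCreator` in rpc.rs and rpc_service.rs, via a plain `bool` here), and each client gets a thin wrapper, a synchronous `#[test]` for the ConnectionCache path and a `#[tokio::test]` that passes `Handle::current()` for tpu-client-next. A condensed sketch of the idiom follows; the `TestClient` trait, the `Fake*` types, and the simplified one-argument `create_client` are stand-ins, not the real `ClientWithCreator` API from send-transaction-service's `test_utils` (which also takes the TPU address, peers, and leader forward count):

    use tokio::runtime::Handle;

    trait TestClient: Sized {
        fn create_client(maybe_runtime: Option<Handle>) -> Self;
    }

    struct FakeConnectionCacheClient;
    struct FakeTpuClientNextClient;

    impl TestClient for FakeConnectionCacheClient {
        fn create_client(_maybe_runtime: Option<Handle>) -> Self {
            // The connection-cache client is synchronous: no runtime needed.
            FakeConnectionCacheClient
        }
    }

    impl TestClient for FakeTpuClientNextClient {
        fn create_client(maybe_runtime: Option<Handle>) -> Self {
            // tpu-client-next drives its QUIC workers on a tokio runtime.
            assert!(maybe_runtime.is_some(), "tpu-client-next needs a runtime");
            FakeTpuClientNextClient
        }
    }

    // The test body, written once, generic over the client implementation.
    fn check_client<C: TestClient>(maybe_runtime: Option<Handle>) {
        let _client = C::create_client(maybe_runtime);
        // ... exercise SendTransactionService here, as the rpc.rs tests do ...
    }

    #[test]
    fn test_with_connection_cache() {
        check_client::<FakeConnectionCacheClient>(None);
    }

    // A multi-threaded runtime is required so that service threads can make
    // progress through the Handle while the test task itself is running.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_with_tpu_client_next() {
        check_client::<FakeTpuClientNextClient>(Some(Handle::current()));
    }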