diff --git a/Cargo.lock b/Cargo.lock
index fef231bb21e99f..92a95013cd7ee2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8665,6 +8665,7 @@ dependencies = [
 name = "solana-send-transaction-service"
 version = "2.2.0"
 dependencies = [
+ "async-trait",
  "crossbeam-channel",
  "itertools 0.12.1",
  "log",
@@ -8673,9 +8674,12 @@ dependencies = [
  "solana-logger",
  "solana-measure",
  "solana-metrics",
+ "solana-quic-client",
  "solana-runtime",
  "solana-sdk",
- "solana-tpu-client",
+ "solana-tpu-client-next",
+ "tokio",
+ "tokio-util 0.7.12",
 ]

 [[package]]
diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs
index 2fb0a8b7559659..46bdfa6c5f98f0 100644
--- a/banks-server/src/banks_server.rs
+++ b/banks-server/src/banks_server.rs
@@ -29,6 +29,7 @@ use {
     solana_send_transaction_service::{
         send_transaction_service::{SendTransactionService, TransactionInfo},
         tpu_info::NullTpuInfo,
+        transaction_client::ConnectionCacheClient,
     },
     std::{
         io,
@@ -454,17 +455,16 @@ pub async fn start_tcp_server(
         .map(move |chan| {
             let (sender, receiver) = unbounded();

-            SendTransactionService::new::<NullTpuInfo>(
+            let client = ConnectionCacheClient::<NullTpuInfo>::new(
+                connection_cache.clone(),
                 tpu_addr,
-                &bank_forks,
                 None,
-                receiver,
-                connection_cache.clone(),
-                5_000,
+                None,
                 0,
-                exit.clone(),
             );
+            SendTransactionService::new(&bank_forks, receiver, client, 5_000, exit.clone());
+
             let server = BanksServer::new(
                 bank_forks.clone(),
                 block_commitment_cache.clone(),
diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock
index 5d32b4f3d7dca6..bfd0b57baea574 100644
--- a/programs/sbf/Cargo.lock
+++ b/programs/sbf/Cargo.lock
@@ -7333,6 +7333,7 @@ dependencies = [
 name = "solana-send-transaction-service"
 version = "2.2.0"
 dependencies = [
+ "async-trait",
  "crossbeam-channel",
  "itertools 0.12.1",
  "log",
@@ -7340,9 +7341,12 @@ dependencies = [
  "solana-connection-cache",
  "solana-measure",
  "solana-metrics",
+ "solana-quic-client",
  "solana-runtime",
  "solana-sdk",
- "solana-tpu-client",
+ "solana-tpu-client-next",
+ "tokio",
+ "tokio-util 0.7.12",
 ]

 [[package]]
@@ -7737,6 +7741,27 @@ dependencies = [
  "tokio",
 ]

+[[package]]
+name = "solana-tpu-client-next"
+version = "2.2.0"
+dependencies = [
+ "async-trait",
+ "log",
+ "lru",
+ "quinn",
+ "rustls 0.23.19",
+ "solana-connection-cache",
+ "solana-logger",
+ "solana-measure",
+ "solana-rpc-client",
+ "solana-sdk",
+ "solana-streamer",
+ "solana-tpu-client",
+ "thiserror 2.0.3",
+ "tokio",
+ "tokio-util 0.7.12",
+]
+
 [[package]]
 name = "solana-transaction-context"
 version = "2.2.0"
diff --git a/rpc/src/cluster_tpu_info.rs b/rpc/src/cluster_tpu_info.rs
index 777dfd5b8ff868..c57e76ff566649 100644
--- a/rpc/src/cluster_tpu_info.rs
+++ b/rpc/src/cluster_tpu_info.rs
@@ -1,10 +1,7 @@
 use {
     solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
     solana_poh::poh_recorder::PohRecorder,
-    solana_sdk::{
-        clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
-        pubkey::Pubkey,
-    },
+    solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey},
     solana_send_transaction_service::tpu_info::TpuInfo,
     std::{
         collections::HashMap,
@@ -50,7 +47,7 @@ impl TpuInfo for ClusterTpuInfo {
             .collect();
     }

-    fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
+    fn get_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
         let recorder = self.poh_recorder.read().unwrap();
         let leaders: Vec<_> = (0..max_count)
             .filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
             .collect();
         drop(recorder);
@@ -70,37 +67,23 @@ impl TpuInfo for ClusterTpuInfo {
         unique_leaders
     }

-    fn get_leader_tpus_with_slots(
-        &self,
-        max_count: u64,
-        protocol: Protocol,
-    ) -> Vec<(&SocketAddr, Slot)> {
+    fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
         let recorder = self.poh_recorder.read().unwrap();
-        let leaders: Vec<_> = (0..max_count)
-            .rev()
-            .filter_map(|future_slot| {
-                NUM_CONSECUTIVE_LEADER_SLOTS
-                    .checked_mul(future_slot)
-                    .and_then(|slots_in_the_future| {
-                        recorder.leader_and_slot_after_n_slots(slots_in_the_future)
-                    })
-            })
+        let leader_pubkeys: Vec<_> = (0..max_count)
+            .filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
             .collect();
         drop(recorder);
-        let addrs_to_slots = leaders
-            .into_iter()
-            .filter_map(|(leader_id, leader_slot)| {
+        leader_pubkeys
+            .iter()
+            .filter_map(|leader_pubkey| {
                 self.recent_peers
-                    .get(&leader_id)
-                    .map(|(udp_tpu, quic_tpu)| match protocol {
-                        Protocol::UDP => (udp_tpu, leader_slot),
-                        Protocol::QUIC => (quic_tpu, leader_slot),
+                    .get(leader_pubkey)
+                    .map(|addr| match protocol {
+                        Protocol::UDP => &addr.0,
+                        Protocol::QUIC => &addr.1,
                     })
             })
-            .collect::<HashMap<_, _>>();
-        let mut unique_leaders = Vec::from_iter(addrs_to_slots);
-        unique_leaders.sort_by_key(|(_addr, slot)| *slot);
-        unique_leaders
+            .collect()
     }
 }

@@ -275,12 +258,12 @@ mod test {
         let first_leader =
             solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap();
         assert_eq!(
-            leader_info.get_leader_tpus(1, Protocol::UDP),
+            leader_info.get_unique_leader_tpus(1, Protocol::UDP),
             vec![&recent_peers.get(&first_leader).unwrap().0]
         );
         assert_eq!(
-            leader_info.get_leader_tpus_with_slots(1, Protocol::UDP),
-            vec![(&recent_peers.get(&first_leader).unwrap().0, 0)]
+            leader_info.get_leader_tpus(1, Protocol::UDP),
+            vec![&recent_peers.get(&first_leader).unwrap().0]
         );

         let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
@@ -294,15 +277,12 @@ mod test {
         ];
         expected_leader_sockets.dedup();
         assert_eq!(
-            leader_info.get_leader_tpus(2, Protocol::UDP),
+            leader_info.get_unique_leader_tpus(2, Protocol::UDP),
             expected_leader_sockets
         );
         assert_eq!(
-            leader_info.get_leader_tpus_with_slots(2, Protocol::UDP),
+            leader_info.get_leader_tpus(2, Protocol::UDP),
             expected_leader_sockets
-                .into_iter()
-                .zip([0, 4])
-                .collect::<Vec<_>>()
         );

         let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
@@ -317,26 +297,17 @@ mod test {
         ];
         expected_leader_sockets.dedup();
         assert_eq!(
-            leader_info.get_leader_tpus(3, Protocol::UDP),
+            leader_info.get_unique_leader_tpus(3, Protocol::UDP),
             expected_leader_sockets
         );
-        // Only 2 leader tpus are returned always... so [0, 4, 8] isn't right here.
-        // This assumption is safe. After all, leader schedule generation must be deterministic.
-        assert_eq!(
-            leader_info.get_leader_tpus_with_slots(3, Protocol::UDP),
-            expected_leader_sockets
-                .into_iter()
-                .zip([0, 4])
-                .collect::<Vec<_>>()
-        );
         for x in 4..8 {
-            assert!(leader_info.get_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len());
             assert!(
-                leader_info
-                    .get_leader_tpus_with_slots(x, Protocol::UDP)
-                    .len()
-                    <= recent_peers.len()
+                leader_info.get_unique_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len()
+            );
+            assert_eq!(
+                leader_info.get_leader_tpus(x, Protocol::UDP).len(),
+                x as usize
             );
         }
     }
diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs
index 29cce7b1a74680..b479536de0eaf5 100644
--- a/rpc/src/rpc.rs
+++ b/rpc/src/rpc.rs
@@ -121,6 +121,7 @@ use {
     solana_runtime::commitment::CommitmentSlots,
     solana_send_transaction_service::{
         send_transaction_service::SendTransactionService, tpu_info::NullTpuInfo,
+        transaction_client::ConnectionCacheClient,
     },
     solana_streamer::socket::SocketAddrSpace,
 };
@@ -379,16 +380,15 @@ impl JsonRpcRequestProcessor {
             .tpu(connection_cache.protocol())
             .unwrap();
         let (sender, receiver) = unbounded();
-        SendTransactionService::new::<NullTpuInfo>(
+
+        let client = ConnectionCacheClient::<NullTpuInfo>::new(
+            connection_cache.clone(),
             tpu_address,
-            &bank_forks,
             None,
-            receiver,
-            connection_cache,
-            1000,
+            None,
             1,
-            exit.clone(),
         );
+        SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

         let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
         let startup_verification_complete = Arc::clone(bank.get_startup_verification_complete());
@@ -4386,7 +4386,9 @@ pub mod tests {
         },
         vote::state::VoteState,
     },
-    solana_send_transaction_service::tpu_info::NullTpuInfo,
+    solana_send_transaction_service::{
+        tpu_info::NullTpuInfo, transaction_client::ConnectionCacheClient,
+    },
     solana_transaction_status::{
         EncodedConfirmedBlock, EncodedTransaction, EncodedTransactionWithStatusMeta,
         TransactionDetails,
@@ -6492,16 +6494,14 @@ pub mod tests {
             Arc::new(AtomicU64::default()),
             Arc::new(PrioritizationFeeCache::default()),
         );
-        SendTransactionService::new::<NullTpuInfo>(
+        let client = ConnectionCacheClient::<NullTpuInfo>::new(
+            connection_cache.clone(),
             tpu_address,
-            &bank_forks,
             None,
-            receiver,
-            connection_cache,
-            1000,
+            None,
             1,
-            exit,
         );
+        SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

         let mut bad_transaction = system_transaction::transfer(
             &mint_keypair,
@@ -6766,16 +6766,15 @@ pub mod tests {
             Arc::new(AtomicU64::default()),
             Arc::new(PrioritizationFeeCache::default()),
         );
-        SendTransactionService::new::<NullTpuInfo>(
+        let client = ConnectionCacheClient::<NullTpuInfo>::new(
+            connection_cache.clone(),
             tpu_address,
-            &bank_forks,
             None,
-            receiver,
-            connection_cache,
-            1000,
+            None,
             1,
-            exit,
         );
+        SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());
+
         assert_eq!(
             request_processor.get_block_commitment(0),
             RpcBlockCommitment {
diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs
index f388433d611923..2c582be72ea5a6 100644
--- a/rpc/src/rpc_service.rs
+++ b/rpc/src/rpc_service.rs
@@ -36,7 +36,10 @@ use {
         exit::Exit, genesis_config::DEFAULT_GENESIS_DOWNLOAD_PATH, hash::Hash,
         native_token::lamports_to_sol,
     },
-    solana_send_transaction_service::send_transaction_service::{self, SendTransactionService},
+    solana_send_transaction_service::{
+        send_transaction_service::{self, SendTransactionService},
+        transaction_client::ConnectionCacheClient,
+    },
     solana_storage_bigtable::CredentialType,
     std::{
         net::SocketAddr,
@@ -474,15 +477,20 @@ impl JsonRpcService {
         let leader_info =
             poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder));
-        let _send_transaction_service = Arc::new(SendTransactionService::new_with_config(
+        let client = ConnectionCacheClient::new(
+            connection_cache,
             tpu_address,
-            &bank_forks,
+            send_transaction_service_config.tpu_peers.clone(),
             leader_info,
+            send_transaction_service_config.leader_forward_count,
+        );
+        let _send_transaction_service = SendTransactionService::new_with_config(
+            &bank_forks,
             receiver,
-            connection_cache,
+            client,
             send_transaction_service_config,
             exit,
-        ));
+        );

         #[cfg(test)]
         let test_request_processor = request_processor.clone();
diff --git a/send-transaction-service/Cargo.toml b/send-transaction-service/Cargo.toml
index 07ad3f5a5b886c..92a73c33bc5d1a 100644
--- a/send-transaction-service/Cargo.toml
+++ b/send-transaction-service/Cargo.toml
@@ -10,6 +10,7 @@ license = { workspace = true }
 edition = { workspace = true }

 [dependencies]
+async-trait = { workspace = true }
 crossbeam-channel = { workspace = true }
 itertools = { workspace = true }
 log = { workspace = true }
@@ -17,13 +18,19 @@ solana-client = { workspace = true }
 solana-connection-cache = { workspace = true }
 solana-measure = { workspace = true }
 solana-metrics = { workspace = true }
+solana-quic-client = { workspace = true }
 solana-runtime = { workspace = true }
 solana-sdk = { workspace = true }
-solana-tpu-client = { workspace = true }
+solana-tpu-client-next = { workspace = true }
+tokio = { workspace = true, features = ["full"] }
+tokio-util = { workspace = true }

 [dev-dependencies]
 solana-logger = { workspace = true }
 solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }

+[features]
+dev-context-only-utils = []
+
 [package.metadata.docs.rs]
 targets = ["x86_64-unknown-linux-gnu"]
diff --git a/send-transaction-service/src/lib.rs b/send-transaction-service/src/lib.rs
index 960ff1cb3c90e7..5d3bfc9176705c 100644
--- a/send-transaction-service/src/lib.rs
+++ b/send-transaction-service/src/lib.rs
@@ -4,5 +4,8 @@ pub mod send_transaction_service_stats;
 pub mod tpu_info;
 pub mod transaction_client;

+#[cfg(any(test, feature = "dev-context-only-utils"))]
+pub mod test_utils;
+
 #[macro_use]
 extern crate solana_metrics;
diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs
index 0755cfb968b7ef..62a604cc5e1392 100644
--- a/send-transaction-service/src/send_transaction_service.rs
+++ b/send-transaction-service/src/send_transaction_service.rs
@@ -3,13 +3,11 @@ use {
         send_transaction_service_stats::{
             SendTransactionServiceStats, SendTransactionServiceStatsReport,
         },
-        tpu_info::TpuInfo,
-        transaction_client::{ConnectionCacheClient, TransactionClient},
+        transaction_client::TransactionClient,
     },
     crossbeam_channel::{Receiver, RecvTimeoutError},
     itertools::Itertools,
     log::*,
-    solana_client::connection_cache::ConnectionCache,
     solana_runtime::{bank::Bank, bank_forks::BankForks},
     solana_sdk::{
         hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature,
@@ -140,38 +138,24 @@ impl Default for Config {
 pub const MAX_RETRY_SLEEP_MS: u64 = 1000;

 impl SendTransactionService {
-    pub fn new<T: TpuInfo + std::marker::Send + 'static>(
-        tpu_address: SocketAddr,
+    pub fn new<Client: TransactionClient + Clone + std::marker::Send + 'static>(
         bank_forks: &Arc<RwLock<BankForks>>,
-        leader_info: Option<T>,
         receiver: Receiver<TransactionInfo>,
-        connection_cache: Arc<ConnectionCache>,
+        client: Client,
         retry_rate_ms: u64,
-        leader_forward_count: u64,
         exit: Arc<AtomicBool>,
     ) -> Self {
         let config = Config {
             retry_rate_ms,
-            leader_forward_count,
             ..Config::default()
         };
-        Self::new_with_config(
-            tpu_address,
-            bank_forks,
-            leader_info,
-            receiver,
-            connection_cache,
-            config,
-            exit,
-        )
+        Self::new_with_config::<Client>(bank_forks, receiver, client, config, exit)
     }

-    pub fn new_with_config<T: TpuInfo + std::marker::Send + 'static>(
-        tpu_address: SocketAddr,
+    pub fn new_with_config<Client: TransactionClient + Clone + std::marker::Send + 'static>(
         bank_forks: &Arc<RwLock<BankForks>>,
-        leader_info: Option<T>,
         receiver: Receiver<TransactionInfo>,
-        connection_cache: Arc<ConnectionCache>,
+        client: Client,
         config: Config,
         exit: Arc<AtomicBool>,
     ) -> Self {
@@ -179,14 +163,6 @@ impl SendTransactionService {

         let retry_transactions = Arc::new(Mutex::new(HashMap::new()));

-        let client = ConnectionCacheClient::new(
-            connection_cache,
-            tpu_address,
-            config.tpu_peers,
-            leader_info,
-            config.leader_forward_count,
-        );
-
         let receive_txn_thread = Self::receive_txn_thread(
             receiver,
             client.clone(),
@@ -217,9 +193,9 @@ impl SendTransactionService {
     }

     /// Thread responsible for receiving transactions from RPC clients.
-    fn receive_txn_thread<T: TpuInfo + std::marker::Send + 'static>(
+    fn receive_txn_thread<Client: TransactionClient + Clone + std::marker::Send + 'static>(
         receiver: Receiver<TransactionInfo>,
-        client: ConnectionCacheClient<T>,
+        client: Client,
         retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
         stats_report: Arc<SendTransactionServiceStatsReport>,
         batch_send_rate_ms: u64,
@@ -317,9 +293,9 @@ impl SendTransactionService {
     }

     /// Thread responsible for retrying transactions
-    fn retry_thread<T: TpuInfo + std::marker::Send + 'static>(
+    fn retry_thread<Client: TransactionClient + Clone + std::marker::Send + 'static>(
         bank_forks: Arc<RwLock<BankForks>>,
-        client: ConnectionCacheClient<T>,
+        client: Client,
         retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
         retry_rate_ms: u64,
         service_max_retries: usize,
@@ -368,11 +344,11 @@ impl SendTransactionService {
     }

     /// Retry transactions sent before.
-    fn process_transactions<T: TpuInfo + std::marker::Send + 'static>(
+    fn process_transactions<Client: TransactionClient + Clone + std::marker::Send + 'static>(
         working_bank: &Bank,
         root_bank: &Bank,
         transactions: &mut HashMap<Signature, TransactionInfo>,
-        client: &ConnectionCacheClient<T>,
+        client: &Client,
         retry_rate_ms: u64,
         service_max_retries: usize,
         default_max_retries: Option<usize>,
@@ -498,7 +474,11 @@ impl SendTransactionService {
 mod test {
     use {
         super::*,
-        crate::tpu_info::NullTpuInfo,
+        crate::{
+            test_utils::ClientWithCreator,
+            tpu_info::NullTpuInfo,
+            transaction_client::{ConnectionCacheClient, TpuClientNextClient},
+        },
         crossbeam_channel::{bounded, unbounded},
         solana_sdk::{
             account::AccountSharedData,
@@ -509,34 +489,41 @@ mod test {
             system_program, system_transaction,
         },
         std::ops::Sub,
+        tokio::runtime::Handle,
     };

-    #[test]
-    fn service_exit() {
-        let tpu_address = "127.0.0.1:0".parse().unwrap();
+    fn service_exit<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
         let bank = Bank::default_for_tests();
         let bank_forks = BankForks::new_rw_arc(bank);
         let (sender, receiver) = unbounded();
-        let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test"));
-        let send_transaction_service = SendTransactionService::new::<NullTpuInfo>(
-            tpu_address,
+        let client = C::create_client(maybe_runtime, "127.0.0.1:0".parse().unwrap(), None, 1);
+
+        let send_transaction_service = SendTransactionService::new(
             &bank_forks,
-            None,
             receiver,
-            connection_cache,
+            client.clone(),
             1000,
-            1,
             Arc::new(AtomicBool::new(false)),
         );

         drop(sender);
         send_transaction_service.join().unwrap();
+        client.cancel();
     }

     #[test]
-    fn validator_exit() {
-        let tpu_address = "127.0.0.1:0".parse().unwrap();
+    fn service_exit_with_connection_cache() {
+        service_exit::<ConnectionCacheClient<NullTpuInfo>>(None);
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn service_exit_with_tpu_client_next() {
+        service_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
+    }
+
+    fn validator_exit<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
         let bank = Bank::default_for_tests();
         let bank_forks = BankForks::new_rw_arc(bank);
         let (sender, receiver) = bounded(0);
@@ -552,22 +539,15 @@ mod test {
         };
         let exit = Arc::new(AtomicBool::new(false));
-        let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test"));
-        let _send_transaction_service = SendTransactionService::new::<NullTpuInfo>(
-            tpu_address,
-            &bank_forks,
-            None,
-            receiver,
-            connection_cache,
-            1000,
-            1,
-            exit.clone(),
-        );
+        let client = C::create_client(maybe_runtime, "127.0.0.1:0".parse().unwrap(), None, 1);
+        let _send_transaction_service =
+            SendTransactionService::new(&bank_forks, receiver, client.clone(), 1000, exit.clone());

         sender.send(dummy_tx_info()).unwrap();

         thread::spawn(move || {
             exit.store(true, Ordering::Relaxed);
+            client.cancel();
         });

         let mut option = Ok(());
@@ -576,33 +556,25 @@ mod test {
         }
     }

-    fn create_client(
-        tpu_peers: Option<Vec<SocketAddr>>,
-        leader_forward_count: u64,
-    ) -> ConnectionCacheClient<NullTpuInfo> {
-        let tpu_address = "127.0.0.1:0".parse().unwrap();
-        let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test"));
-
-        ConnectionCacheClient::new(
-            connection_cache,
-            tpu_address,
-            tpu_peers,
-            None,
-            leader_forward_count,
-        )
+    #[test]
+    fn validator_exit_with_connection_cache() {
+        validator_exit::<ConnectionCacheClient<NullTpuInfo>>(None);
     }

-    #[test]
-    fn process_transactions() {
+    #[tokio::test(flavor = "multi_thread")]
+    async fn validator_exit_with_tpu_client_next() {
+        validator_exit::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
+    }
+
+    fn process_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
         solana_logger::setup();
         let (mut genesis_config, mint_keypair) = create_genesis_config(4);
         genesis_config.fee_rate_governor = solana_sdk::fee_calculator::FeeRateGovernor::new(0, 0);
         let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config);
-        let config = Config {
-            leader_forward_count: 1,
-            ..Config::default()
-        };
+
+        let leader_forward_count = 1;
+        let config = Config::default();

         let root_bank = Bank::new_from_parent(
             bank_forks.read().unwrap().working_bank(),
@@ -658,8 +630,13 @@ mod test {
             ),
         );

-        let client = create_client(config.tpu_peers, config.leader_forward_count);
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let client = C::create_client(
+            maybe_runtime,
+            "127.0.0.1:0".parse().unwrap(),
+            config.tpu_peers,
+            leader_forward_count,
+        );
+        let result = SendTransactionService::process_transactions(
            &working_bank,
            &root_bank,
            &mut transactions,
@@ -691,7 +668,7 @@ mod test {
                 Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -723,7 +700,7 @@ mod test {
                 Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -755,7 +732,7 @@ mod test {
                 Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -789,7 +766,7 @@ mod test {
             ),
         );

-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -833,7 +810,7 @@ mod test {
                 Some(Instant::now().sub(Duration::from_millis(4000))),
             ),
         );
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -853,7 +830,7 @@ mod test {
                 ..ProcessTransactionsResult::default()
             }
         );
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -872,19 +849,27 @@ mod test {
                 ..ProcessTransactionsResult::default()
             }
         );
+        client.cancel();
     }

     #[test]
-    fn test_retry_durable_nonce_transactions() {
+    fn process_transactions_with_connection_cache() {
+        process_transactions::<ConnectionCacheClient<NullTpuInfo>>(None);
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn process_transactions_with_tpu_client_next() {
+        process_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(Handle::current()));
+    }
+
+    fn retry_durable_nonce_transactions<C: ClientWithCreator>(maybe_runtime: Option<Handle>) {
         solana_logger::setup();
         let (mut genesis_config, mint_keypair) = create_genesis_config(4);
         genesis_config.fee_rate_governor = solana_sdk::fee_calculator::FeeRateGovernor::new(0, 0);
         let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config);
-        let config = Config {
-            leader_forward_count: 1,
-            ..Config::default()
-        };
+        let leader_forward_count = 1;
+        let config = Config::default();

         let root_bank = Bank::new_from_parent(
             bank_forks.read().unwrap().working_bank(),
@@ -949,8 +934,13 @@ mod test {
             ),
         );
         let stats = SendTransactionServiceStats::default();
-        let client = create_client(config.tpu_peers, config.leader_forward_count);
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let client = C::create_client(
+            maybe_runtime,
+            "127.0.0.1:0".parse().unwrap(),
+            config.tpu_peers,
+            leader_forward_count,
+        );
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -981,7 +971,7 @@ mod test {
                 Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -1014,7 +1004,7 @@ mod test {
                 Some(Instant::now().sub(Duration::from_millis(4000))),
             ),
         );
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -1045,7 +1035,7 @@ mod test {
                 Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -1077,7 +1067,7 @@ mod test {
                 Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -1109,7 +1099,7 @@ mod test {
                 Some(Instant::now()),
             ),
         );
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -1143,7 +1133,7 @@ mod test {
                 Some(Instant::now().sub(Duration::from_millis(4000))),
             ),
         );
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -1174,7 +1164,7 @@ mod test {
         let nonce_account =
             AccountSharedData::new_data(43, &new_nonce_state, &system_program::id()).unwrap();
         working_bank.store_account(&nonce_address, &nonce_account);
-        let result = SendTransactionService::process_transactions::<NullTpuInfo>(
+        let result = SendTransactionService::process_transactions(
             &working_bank,
             &root_bank,
             &mut transactions,
@@ -1193,5 +1183,18 @@ mod test {
                 ..ProcessTransactionsResult::default()
             }
         );
+        client.cancel();
+    }
+
+    #[test]
+    fn retry_durable_nonce_transactions_with_connection_cache() {
+        retry_durable_nonce_transactions::<ConnectionCacheClient<NullTpuInfo>>(None);
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn retry_durable_nonce_transactions_with_tpu_client_next() {
+        retry_durable_nonce_transactions::<TpuClientNextClient<NullTpuInfo>>(Some(
+            Handle::current(),
+        ));
     }
 }
diff --git a/send-transaction-service/src/test_utils.rs b/send-transaction-service/src/test_utils.rs
new file mode 100644
index 00000000000000..7df6b3dec95b38
--- /dev/null
+++ b/send-transaction-service/src/test_utils.rs
@@ -0,0 +1,95 @@
+//! This module contains functionality required to create tests parameterized
+//! with the client type.
+
+use {
+    crate::{
+        tpu_info::NullTpuInfo,
+        transaction_client::{
+            ConnectionCacheClient, TpuClientNextClient, TpuInfoWithSendStatic, TransactionClient,
+        },
+    },
+    solana_client::connection_cache::ConnectionCache,
+    std::{net::SocketAddr, sync::Arc},
+    tokio::runtime::Handle,
+};
+
+// The `maybe_runtime` argument is introduced so the test's runtime can be used
+// for TpuClientNext, while ConnectionCache uses a runtime created internally
+// in the quic-client module, so the test runtime cannot be passed there.
+pub trait CreateClient: TransactionClient {
+    fn create_client(
+        maybe_runtime: Option<Handle>,
+        my_tpu_address: SocketAddr,
+        tpu_peers: Option<Vec<SocketAddr>>,
+        leader_forward_count: u64,
+    ) -> Self;
+}
+
+impl CreateClient for ConnectionCacheClient<NullTpuInfo> {
+    fn create_client(
+        maybe_runtime: Option<Handle>,
+        my_tpu_address: SocketAddr,
+        tpu_peers: Option<Vec<SocketAddr>>,
+        leader_forward_count: u64,
+    ) -> Self {
+        assert!(maybe_runtime.is_none());
+        let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test"));
+        ConnectionCacheClient::new(
+            connection_cache,
+            my_tpu_address,
+            tpu_peers,
+            None,
+            leader_forward_count,
+        )
+    }
+}
+
+impl CreateClient for TpuClientNextClient<NullTpuInfo> {
+    fn create_client(
+        maybe_runtime: Option<Handle>,
+        my_tpu_address: SocketAddr,
+        tpu_peers: Option<Vec<SocketAddr>>,
+        leader_forward_count: u64,
+    ) -> Self {
+        let runtime_handle =
+            maybe_runtime.expect("Runtime should be provided for the TpuClientNextClient.");
+        Self::new(
+            runtime_handle,
+            my_tpu_address,
+            tpu_peers,
+            None,
+            leader_forward_count,
+            None,
+        )
+    }
+}
+
+pub trait Cancelable {
+    fn cancel(&self);
+}
+
+impl<T> Cancelable for ConnectionCacheClient<T>
+where
+    T: TpuInfoWithSendStatic,
+{
+    fn cancel(&self) {}
+}
+
+impl<T> Cancelable for TpuClientNextClient<T>
+where
+    T: TpuInfoWithSendStatic + Clone,
+{
+    fn cancel(&self) {
+        self.cancel().unwrap();
+    }
+}
+
+// Define a trait alias to simplify the definition of test functions.
+pub trait ClientWithCreator:
+    CreateClient + TransactionClient + Cancelable + Send + Clone + 'static
+{
+}
+impl<T> ClientWithCreator for T where
+    T: CreateClient + TransactionClient + Cancelable + Send + Clone + 'static
+{
+}
diff --git a/send-transaction-service/src/tpu_info.rs b/send-transaction-service/src/tpu_info.rs
index 2eddfc33891507..faeae7bb041f60 100644
--- a/send-transaction-service/src/tpu_info.rs
+++ b/send-transaction-service/src/tpu_info.rs
@@ -1,17 +1,24 @@
-use {
-    solana_connection_cache::connection_cache::Protocol, solana_sdk::clock::Slot,
-    std::net::SocketAddr,
-};
+use {solana_connection_cache::connection_cache::Protocol, std::net::SocketAddr};

+/// A trait to abstract out the leader estimation for the SendTransactionService.
 pub trait TpuInfo {
     fn refresh_recent_peers(&mut self);

+    /// Takes `max_count`, which specifies how many leaders per
+    /// `NUM_CONSECUTIVE_LEADER_SLOTS` we want to receive, and returns *unique*
+    /// TPU socket addresses for these leaders.
+    ///
+    /// For example, if the leader schedule is `[L1, L1, L1, L1, L2, L2, L2, L2,
+    /// L1, ...]`, it returns `[L1, L2]` (the final L1 is not added to the
+    /// result).
+    fn get_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>;
+
+    /// Takes `max_count`, which specifies how many leaders per
+    /// `NUM_CONSECUTIVE_LEADER_SLOTS` we want to receive, and returns TPU
+    /// socket addresses for these leaders.
+    ///
+    /// For example, if the leader schedule is `[L1, L1, L1, L1, L2, L2, L2, L2,
+    /// L1, ...]`, it returns `[L1, L2, L1]`.
     fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>;
-
-    /// In addition to the tpu address, also return the leader slot
-    fn get_leader_tpus_with_slots(
-        &self,
-        max_count: u64,
-        protocol: Protocol,
-    ) -> Vec<(&SocketAddr, Slot)>;
 }

 #[derive(Clone)]
@@ -19,14 +26,10 @@ pub struct NullTpuInfo;

 impl TpuInfo for NullTpuInfo {
     fn refresh_recent_peers(&mut self) {}
-    fn get_leader_tpus(&self, _max_count: u64, _protocol: Protocol) -> Vec<&SocketAddr> {
+    fn get_unique_leader_tpus(&self, _max_count: u64, _protocol: Protocol) -> Vec<&SocketAddr> {
         vec![]
     }
-    fn get_leader_tpus_with_slots(
-        &self,
-        _max_count: u64,
-        _protocol: Protocol,
-    ) -> Vec<(&SocketAddr, Slot)> {
+    fn get_leader_tpus(&self, _max_count: u64, _protocol: Protocol) -> Vec<&SocketAddr> {
         vec![]
     }
 }
diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs
index d7910b2b8609c3..29d1c3c3411944 100644
--- a/send-transaction-service/src/transaction_client.rs
+++ b/send-transaction-service/src/transaction_client.rs
@@ -1,25 +1,47 @@
 use {
     crate::{send_transaction_service_stats::SendTransactionServiceStats, tpu_info::TpuInfo},
+    async_trait::async_trait,
     log::warn,
-    solana_client::connection_cache::ConnectionCache,
+    solana_client::connection_cache::{ConnectionCache, Protocol},
     solana_connection_cache::client_connection::ClientConnection as TpuConnection,
     solana_measure::measure::Measure,
+    solana_sdk::signature::Keypair,
+    solana_tpu_client_next::{
+        connection_workers_scheduler::{
+            ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
+        },
+        leader_updater::LeaderUpdater,
+        transaction_batch::TransactionBatch,
+        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
+    },
     std::{
-        net::SocketAddr,
+        net::{Ipv4Addr, SocketAddr},
         sync::{atomic::Ordering, Arc, Mutex},
         time::{Duration, Instant},
     },
+    tokio::{
+        runtime::Handle,
+        sync::mpsc::{self},
+        task::JoinHandle,
+    },
+    tokio_util::sync::CancellationToken,
 };

+// Alias trait to shorten function definitions.
+pub trait TpuInfoWithSendStatic: TpuInfo + std::marker::Send + 'static {}
+impl<T> TpuInfoWithSendStatic for T where T: TpuInfo + std::marker::Send + 'static {}
+
 pub trait TransactionClient {
     fn send_transactions_in_batch(
         &self,
         wire_transactions: Vec<Vec<u8>>,
         stats: &SendTransactionServiceStats,
     );
+
+    fn protocol(&self) -> Protocol;
 }

-pub struct ConnectionCacheClient<T: TpuInfo + std::marker::Send + 'static> {
+pub struct ConnectionCacheClient<T: TpuInfoWithSendStatic> {
     connection_cache: Arc<ConnectionCache>,
     tpu_address: SocketAddr,
     tpu_peers: Option<Vec<SocketAddr>>,
@@ -27,10 +49,10 @@
     leader_forward_count: u64,
 }

-// Manual implementation of Clone without requiring T to be Clone
+// Manual implementation of Clone to avoid requiring T to be Clone
 impl<T> Clone for ConnectionCacheClient<T>
 where
-    T: TpuInfo + std::marker::Send + 'static,
+    T: TpuInfoWithSendStatic,
 {
     fn clone(&self) -> Self {
         Self {
@@ -45,7 +67,7 @@ where

 impl<T> ConnectionCacheClient<T>
 where
-    T: TpuInfo + std::marker::Send + 'static,
+    T: TpuInfoWithSendStatic,
 {
     pub fn new(
         connection_cache: Arc<ConnectionCache>,
@@ -64,11 +86,13 @@ where
         }
     }

-    fn get_tpu_addresses<'a>(&'a self, leader_info: Option<&'a T>) -> Vec<&'a SocketAddr> {
+    fn get_unique_tpu_addresses<'a>(&'a self, leader_info: Option<&'a T>) -> Vec<&'a SocketAddr> {
         leader_info
             .map(|leader_info| {
-                leader_info
-                    .get_leader_tpus(self.leader_forward_count, self.connection_cache.protocol())
+                leader_info.get_unique_leader_tpus(
+                    self.leader_forward_count,
+                    self.connection_cache.protocol(),
+                )
             })
             .filter(|addresses| !addresses.is_empty())
             .unwrap_or_else(|| vec![&self.tpu_address])
@@ -100,7 +124,7 @@ where

 impl<T> TransactionClient for ConnectionCacheClient<T>
 where
-    T: TpuInfo + std::marker::Send + 'static,
+    T: TpuInfoWithSendStatic,
 {
     fn send_transactions_in_batch(
         &self,
@@ -115,13 +139,182 @@ where
             .unwrap_or_default();
         let mut leader_info_provider = self.leader_info_provider.lock().unwrap();
         let leader_info = leader_info_provider.get_leader_info();
-        let leader_addresses = self.get_tpu_addresses(leader_info);
+        let leader_addresses = self.get_unique_tpu_addresses(leader_info);
         addresses.extend(leader_addresses);

         for address in &addresses {
             self.send_transactions(address, wire_transactions.clone(), stats);
         }
     }
+
+    fn protocol(&self) -> Protocol {
+        self.connection_cache.protocol()
+    }
+}
+
+#[derive(Clone)]
+pub struct SendTransactionServiceLeaderUpdater<T: TpuInfoWithSendStatic> {
+    leader_info_provider: CurrentLeaderInfo<T>,
+    my_tpu_address: SocketAddr,
+    tpu_peers: Option<Vec<SocketAddr>>,
+}
+
+#[async_trait]
+impl<T> LeaderUpdater for SendTransactionServiceLeaderUpdater<T>
+where
+    T: TpuInfoWithSendStatic,
+{
+    fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr> {
+        let discovered_peers = self
+            .leader_info_provider
+            .get_leader_info()
+            .map(|leader_info| {
+                leader_info.get_leader_tpus(lookahead_leaders as u64, Protocol::QUIC)
+            })
+            .filter(|addresses| !addresses.is_empty())
+            .unwrap_or_else(|| vec![&self.my_tpu_address]);
+        let mut all_peers = self.tpu_peers.clone().unwrap_or_default();
+        all_peers.extend(discovered_peers.into_iter().cloned());
+        all_peers
+    }
+    async fn stop(&mut self) {}
+}
+
+type TpuClientJoinHandle =
+    JoinHandle<Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>>;
+
+/// `TpuClientNextClient` provides an interface for managing the
+/// [`ConnectionWorkersScheduler`].
+///
+/// It allows:
+/// * Creating and initializing the scheduler with runtime configurations,
+/// * Sending transactions to the connection scheduler,
+/// * Updating the validator identity keypair and propagating the changes to
+///   the scheduler. Most of the complexity of this structure arises from this
+///   functionality.
+
+#[allow(
+    dead_code,
+    reason = "Unused fields will be utilized soon,\
+    added in advance to avoid larger changes in the code."
+)]
+#[derive(Clone)]
+pub struct TpuClientNextClient<T>
+where
+    T: TpuInfoWithSendStatic + Clone,
+{
+    runtime_handle: Handle,
+    sender: mpsc::Sender<TransactionBatch>,
+    // This handle is needed to implement the `NotifyKeyUpdate` trait. Its
+    // only method takes &self, so we need to wrap it in a Mutex.
+    join_and_cancel: Arc<Mutex<(Option<TpuClientJoinHandle>, CancellationToken)>>,
+    leader_updater: SendTransactionServiceLeaderUpdater<T>,
+    leader_forward_count: u64,
+}
+
+impl<T> TpuClientNextClient<T>
+where
+    T: TpuInfoWithSendStatic + Clone,
+{
+    pub fn new(
+        runtime_handle: Handle,
+        my_tpu_address: SocketAddr,
+        tpu_peers: Option<Vec<SocketAddr>>,
+        leader_info: Option<T>,
+        leader_forward_count: u64,
+        identity: Option<Keypair>,
+    ) -> Self
+    where
+        T: TpuInfoWithSendStatic + Clone,
+    {
+        // The channel size represents 8s worth of transactions at a rate of
+        // 1000 tps, assuming the batch size is 64.
+        let (sender, receiver) = mpsc::channel(128);
+
+        let cancel = CancellationToken::new();
+
+        let leader_info_provider = CurrentLeaderInfo::new(leader_info);
+        let leader_updater: SendTransactionServiceLeaderUpdater<T> =
+            SendTransactionServiceLeaderUpdater {
+                leader_info_provider,
+                my_tpu_address,
+                tpu_peers,
+            };
+        let config = Self::create_config(identity, leader_forward_count as usize);
+        let handle = runtime_handle.spawn(ConnectionWorkersScheduler::run(
+            config,
+            Box::new(leader_updater.clone()),
+            receiver,
+            cancel.clone(),
+        ));
+
+        Self {
+            runtime_handle,
+            join_and_cancel: Arc::new(Mutex::new((Some(handle), cancel))),
+            sender,
+            leader_updater,
+            leader_forward_count,
+        }
+    }
+
+    fn create_config(
+        identity: Option<Keypair>,
+        leader_forward_count: usize,
+    ) -> ConnectionWorkersSchedulerConfig {
+        ConnectionWorkersSchedulerConfig {
+            bind: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
+            identity,
+            // to match MAX_CONNECTIONS from ConnectionCache
+            num_connections: 1024,
+            skip_check_transaction_age: true,
+            // experimentally found parameter values
+            worker_channel_size: 64,
+            max_reconnect_attempts: 4,
+            leaders_fanout: Fanout {
+                connect: leader_forward_count,
+                send: leader_forward_count,
+            },
+        }
+    }
+
+    #[cfg(any(test, feature = "dev-context-only-utils"))]
+    pub fn cancel(&self) -> Result<(), Box<dyn std::error::Error>> {
+        let Ok(lock) = self.join_and_cancel.lock() else {
+            return Err("Failed to stop scheduler: TpuClientNext task panicked.".into());
+        };
+        lock.1.cancel();
+        Ok(())
+    }
+}
+
+impl<T> TransactionClient for TpuClientNextClient<T>
+where
+    T: TpuInfoWithSendStatic + Clone,
+{
+    fn send_transactions_in_batch(
+        &self,
+        wire_transactions: Vec<Vec<u8>>,
+        stats: &SendTransactionServiceStats,
+    ) {
+        let mut measure = Measure::start("send-us");
+        self.runtime_handle.spawn({
+            let sender = self.sender.clone();
+            async move {
+                let res = sender.send(TransactionBatch::new(wire_transactions)).await;
+                if res.is_err() {
+                    warn!("Failed to send transaction to channel: it is closed.");
+                }
+            }
+        });
+
+        measure.stop();
+        stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed);
+        stats.send_attempt_count.fetch_add(1, Ordering::Relaxed);
+    }
+
+    fn protocol(&self) -> Protocol {
+        Protocol::QUIC
+    }
 }

 /// The leader info refresh rate.
@@ -129,9 +322,10 @@ pub const LEADER_INFO_REFRESH_RATE_MS: u64 = 1000;

 /// A struct responsible for holding up-to-date leader information
 /// used for sending transactions.
+#[derive(Clone)]
 pub(crate) struct CurrentLeaderInfo<T>
 where
-    T: TpuInfo + std::marker::Send + 'static,
+    T: TpuInfoWithSendStatic,
 {
     /// The last time the leader info was refreshed
     last_leader_refresh: Option<Instant>,
@@ -145,7 +339,7 @@ where

 impl<T> CurrentLeaderInfo<T>
 where
-    T: TpuInfo + std::marker::Send + 'static,
+    T: TpuInfoWithSendStatic,
 {
     /// Get the leader info, refresh if expired
     pub fn get_leader_info(&mut self) -> Option<&T> {
diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock
index 8c853d0f446672..676550199602c1 100644
--- a/svm/examples/Cargo.lock
+++ b/svm/examples/Cargo.lock
@@ -6668,6 +6668,7 @@ dependencies = [
 name = "solana-send-transaction-service"
 version = "2.2.0"
 dependencies = [
+ "async-trait",
  "crossbeam-channel",
  "itertools 0.12.1",
  "log",
@@ -6675,9 +6676,12 @@ dependencies = [
  "solana-connection-cache",
  "solana-measure",
  "solana-metrics",
+ "solana-quic-client",
  "solana-runtime",
  "solana-sdk",
- "solana-tpu-client",
+ "solana-tpu-client-next",
+ "tokio",
+ "tokio-util 0.7.12",
 ]

 [[package]]
@@ -7089,6 +7093,27 @@ dependencies = [
  "tokio",
 ]

+[[package]]
+name = "solana-tpu-client-next"
+version = "2.2.0"
+dependencies = [
+ "async-trait",
+ "log",
+ "lru",
+ "quinn",
+ "rustls 0.23.19",
+ "solana-connection-cache",
+ "solana-logger",
+ "solana-measure",
+ "solana-rpc-client",
+ "solana-sdk",
+ "solana-streamer",
+ "solana-tpu-client",
+ "thiserror 2.0.3",
+ "tokio",
+ "tokio-util 0.7.12",
+]
+
 [[package]]
 name = "solana-transaction-context"
 version = "2.2.0"
diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs
index 1f88a1e4593a31..a2903216f0bc9c 100644
--- a/tpu-client-next/src/connection_workers_scheduler.rs
+++ b/tpu-client-next/src/connection_workers_scheduler.rs
@@ -70,7 +70,7 @@ pub struct ConnectionWorkersSchedulerConfig {

     /// Optional stake identity keypair used in the endpoint certificate for
     /// identifying the sender.
-    pub stake_identity: Option<Keypair>,
+    pub identity: Option<Keypair>,

     /// The number of connections to be maintained by the scheduler.
     pub num_connections: usize,
@@ -90,6 +90,11 @@ pub struct ConnectionWorkersSchedulerConfig {
     pub leaders_fanout: Fanout,
 }

+pub type TransactionStatsAndReceiver = (
+    SendTransactionStatsPerAddr,
+    mpsc::Receiver<TransactionBatch>,
+);
+
 impl ConnectionWorkersScheduler {
     /// Starts the scheduler, which manages the distribution of transactions to
     /// the network's upcoming leaders.
@@ -103,7 +108,7 @@ impl ConnectionWorkersScheduler {
     pub async fn run(
         ConnectionWorkersSchedulerConfig {
             bind,
-            stake_identity: validator_identity,
+            identity,
             num_connections,
             skip_check_transaction_age,
             worker_channel_size,
@@ -113,14 +118,14 @@ impl ConnectionWorkersScheduler {
         mut leader_updater: Box<dyn LeaderUpdater>,
         mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
         cancel: CancellationToken,
-    ) -> Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError> {
-        let endpoint = Self::setup_endpoint(bind, validator_identity)?;
+    ) -> Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError> {
+        let endpoint = Self::setup_endpoint(bind, identity.as_ref())?;
         debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
         let mut workers = WorkersCache::new(num_connections, cancel.clone());
         let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();

         loop {
-            let transaction_batch = tokio::select! {
+            let transaction_batch: TransactionBatch = tokio::select! {
                recv_res = transaction_receiver.recv() => match recv_res {
                    Some(txs) => txs,
                    None => {
@@ -184,19 +189,15 @@ impl ConnectionWorkersScheduler {
         endpoint.close(0u32.into(), b"Closing connection");
         leader_updater.stop().await;

-        Ok(send_stats_per_addr)
+        Ok((send_stats_per_addr, transaction_receiver))
     }

     /// Sets up the QUIC endpoint for the scheduler to handle connections.
     fn setup_endpoint(
         bind: SocketAddr,
-        validator_identity: Option<Keypair>,
+        identity: Option<&Keypair>,
     ) -> Result<Endpoint, ConnectionWorkersSchedulerError> {
-        let client_certificate = if let Some(validator_identity) = validator_identity {
-            Arc::new(QuicClientCertificate::new(&validator_identity))
-        } else {
-            Arc::new(QuicClientCertificate::new(&Keypair::new()))
-        };
+        let client_certificate = QuicClientCertificate::new(identity);
         let client_config = create_client_config(client_certificate);
         let endpoint = create_client_endpoint(bind, client_config)?;
         Ok(endpoint)
diff --git a/tpu-client-next/src/quic_networking.rs b/tpu-client-next/src/quic_networking.rs
index b18fa469241da9..4de596ca2e3bdb 100644
--- a/tpu-client-next/src/quic_networking.rs
+++ b/tpu-client-next/src/quic_networking.rs
@@ -20,7 +20,7 @@ pub use {
     quic_client_certificate::QuicClientCertificate,
 };

-pub(crate) fn create_client_config(client_certificate: Arc<QuicClientCertificate>) -> ClientConfig {
+pub(crate) fn create_client_config(client_certificate: QuicClientCertificate) -> ClientConfig {
     // adapted from QuicLazyInitializedEndpoint::create_endpoint
     let mut crypto = rustls::ClientConfig::builder()
         .dangerous()
diff --git a/tpu-client-next/src/quic_networking/quic_client_certificate.rs b/tpu-client-next/src/quic_networking/quic_client_certificate.rs
index b9f0c8d1cf27a6..b9d2eca4a51f4f 100644
--- a/tpu-client-next/src/quic_networking/quic_client_certificate.rs
+++ b/tpu-client-next/src/quic_networking/quic_client_certificate.rs
@@ -10,8 +10,13 @@ pub struct QuicClientCertificate {
 }

 impl QuicClientCertificate {
-    pub fn new(keypair: &Keypair) -> Self {
-        let (certificate, key) = new_dummy_x509_certificate(keypair);
-        Self { certificate, key }
+    pub fn new(keypair: Option<&Keypair>) -> Self {
+        if let Some(keypair) = keypair {
+            let (certificate, key) = new_dummy_x509_certificate(keypair);
+            Self { certificate, key }
+        } else {
+            let (certificate, key) = new_dummy_x509_certificate(&Keypair::new());
+            Self { certificate, key }
+        }
     }
 }
diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs
index 8a8d623ab8da3f..1da2a534701138 100644
--- a/tpu-client-next/tests/connection_workers_scheduler_test.rs
+++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs
@@ -16,11 +16,13 @@ use {
         streamer::StakedNodes,
     },
     solana_tpu_client_next::{
-        connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout},
+        connection_workers_scheduler::{
+            ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
+        },
         leader_updater::create_leader_updater,
         send_transaction_stats::SendTransactionStatsNonAtomic,
         transaction_batch::TransactionBatch,
-        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStatsPerAddr,
+        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
     },
     std::{
         collections::HashMap,
@@ -41,10 +43,10 @@ use {
     tokio_util::sync::CancellationToken,
 };

-fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
+fn test_config(identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
     ConnectionWorkersSchedulerConfig {
         bind: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
-        stake_identity: validator_identity,
+        identity,
         num_connections: 1,
         skip_check_transaction_age: false,
         // At the moment we have only one strategy to send transactions: we try
@@ -64,9 +66,9 @@ fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedule
 async fn setup_connection_worker_scheduler(
     tpu_address: SocketAddr,
     transaction_receiver: Receiver<TransactionBatch>,
-    validator_identity: Option<Keypair>,
+    identity: Option<Keypair>,
 ) -> (
-    JoinHandle<Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>>,
+    JoinHandle<Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>>,
     CancellationToken,
 ) {
     let json_rpc_url = "http://127.0.0.1:8899";
@@ -83,7 +85,7 @@ async fn setup_connection_worker_scheduler(
         .expect("Leader updates was successfully created");

     let cancel = CancellationToken::new();
-    let config = test_config(validator_identity);
+    let config = test_config(identity);
     let scheduler = tokio::spawn(ConnectionWorkersScheduler::run(
         config,
         leader_updater,
@@ -96,10 +98,10 @@ async fn setup_connection_worker_scheduler(

 async fn join_scheduler(
     scheduler_handle: JoinHandle<
-        Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>,
+        Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>,
     >,
 ) -> SendTransactionStatsNonAtomic {
-    let stats_per_ip = scheduler_handle
+    let (stats_per_ip, _) = scheduler_handle
         .await
         .unwrap()
         .expect("Scheduler should stop successfully.");
@@ -401,8 +403,8 @@ async fn test_connection_pruned_and_reopened() {
 /// connection and verify that all the txs has been received.
 #[tokio::test]
 async fn test_staked_connection() {
-    let validator_identity = Keypair::new();
-    let stakes = HashMap::from([(validator_identity.pubkey(), 100_000)]);
+    let identity = Keypair::new();
+    let stakes = HashMap::from([(identity.pubkey(), 100_000)]);
     let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::<Pubkey, u64>::default());

     let SpawnTestServerResult {
@@ -433,8 +435,7 @@ async fn test_staked_connection() {
     } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));

     let (scheduler_handle, _scheduler_cancel) =
-        setup_connection_worker_scheduler(server_address, tx_receiver, Some(validator_identity))
-            .await;
+        setup_connection_worker_scheduler(server_address, tx_receiver, Some(identity)).await;

     // Check results
     let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
@@ -534,7 +535,7 @@ async fn test_no_host() {

     // While attempting to establish a connection with a nonexistent host, we fill the worker's
     // channel.
-    let stats = scheduler_handle
+    let (stats, _) = scheduler_handle
         .await
         .expect("Scheduler should stop successfully")
         .expect("Scheduler execution was successful");
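
// ---------------------------------------------------------------------------
// Post-diff note (not part of the patch): a minimal sketch of how the two
// client types are wired into SendTransactionService after this refactor.
// It mirrors the call sites changed above; `bank_forks`, `receiver`,
// `connection_cache`, `runtime_handle`, `tpu_address`, and `exit` are assumed
// to exist as in the tests.

// Connection-cache-based client (the pre-existing transport):
let client = ConnectionCacheClient::<NullTpuInfo>::new(
    connection_cache, // Arc<ConnectionCache>
    tpu_address,      // SocketAddr used when no leader info is available
    None,             // tpu_peers: optional static peers that always receive sends
    None,             // leader_info: Option<T: TpuInfo>
    1,                // leader_forward_count
);
let _service = SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

// tpu-client-next-based client; unlike ConnectionCacheClient it borrows an
// existing tokio runtime and accepts an optional identity keypair for the
// QUIC endpoint certificate:
let client = TpuClientNextClient::<NullTpuInfo>::new(
    runtime_handle, // tokio::runtime::Handle
    tpu_address,
    None, // tpu_peers
    None, // leader_info
    1,    // leader_forward_count
    None, // identity: Option<Keypair>
);
let _service = SendTransactionService::new(&bank_forks, receiver, client, 1000, exit);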