From 2a618b5e01d898af63e0922454b6b0f0eed93528 Mon Sep 17 00:00:00 2001 From: kirill lykov Date: Thu, 7 Nov 2024 17:46:14 +0100 Subject: [PATCH] add fanout to tpu-client-next (#3478) * Add tpu-client-next to the root Cargo.toml * Change LeaderUpdater trait to accept mut self * add fanout to the tpu-client-next * Shutdown in separate task * Use try_send instead, minor impromenets * fix LeaderUpdaterError traits * improve lifetimes in split_leaders Co-authored-by: Illia Bobyr * address PR comments * create connections in advance * removed lookahead_slots --------- Co-authored-by: Illia Bobyr --- Cargo.toml | 1 + tpu-client-next/src/connection_worker.rs | 26 +-- .../src/connection_workers_scheduler.rs | 122 ++++++++---- tpu-client-next/src/leader_updater.rs | 15 +- tpu-client-next/src/send_transaction_stats.rs | 186 +++++++++++------- tpu-client-next/src/workers_cache.rs | 168 ++++++++-------- .../connection_workers_scheduler_test.rs | 42 ++-- 7 files changed, 336 insertions(+), 224 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8778a641a570dd..b7f09cf260278a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -539,6 +539,7 @@ solana-test-validator = { path = "test-validator", version = "=2.2.0" } solana-thin-client = { path = "thin-client", version = "=2.2.0" } solana-transaction-error = { path = "sdk/transaction-error", version = "=2.2.0" } solana-tpu-client = { path = "tpu-client", version = "=2.2.0", default-features = false } +solana-tpu-client-next = { path = "tpu-client-next", version = "=2.2.0" } solana-transaction-status = { path = "transaction-status", version = "=2.2.0" } solana-transaction-status-client-types = { path = "transaction-status-client-types", version = "=2.2.0" } solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=2.2.0" } diff --git a/tpu-client-next/src/connection_worker.rs b/tpu-client-next/src/connection_worker.rs index 7d77bc3f6ed2a2..25d2a087c0f8e6 100644 --- a/tpu-client-next/src/connection_worker.rs +++ b/tpu-client-next/src/connection_worker.rs @@ -14,7 +14,10 @@ use { clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE, NUM_CONSECUTIVE_LEADER_SLOTS}, timing::timestamp, }, - std::net::SocketAddr, + std::{ + net::SocketAddr, + sync::{atomic::Ordering, Arc}, + }, tokio::{ sync::mpsc, time::{sleep, Duration}, @@ -72,7 +75,7 @@ pub(crate) struct ConnectionWorker { connection: ConnectionState, skip_check_transaction_age: bool, max_reconnect_attempts: usize, - send_txs_stats: SendTransactionStats, + send_txs_stats: Arc, cancel: CancellationToken, } @@ -93,6 +96,7 @@ impl ConnectionWorker { transactions_receiver: mpsc::Receiver, skip_check_transaction_age: bool, max_reconnect_attempts: usize, + send_txs_stats: Arc, ) -> (Self, CancellationToken) { let cancel = CancellationToken::new(); @@ -103,7 +107,7 @@ impl ConnectionWorker { connection: ConnectionState::NotSetup, skip_check_transaction_age, max_reconnect_attempts, - send_txs_stats: SendTransactionStats::default(), + send_txs_stats, cancel: cancel.clone(), }; @@ -155,11 +159,6 @@ impl ConnectionWorker { } } - /// Retrieves the statistics for transactions sent by this worker. - pub fn transaction_stats(&self) -> &SendTransactionStats { - &self.send_txs_stats - } - /// Sends a batch of transactions using the provided `connection`. 
 ///
 /// Each transaction in the batch is sent over the QUIC streams one at the
@@ -183,11 +182,12 @@ impl ConnectionWorker {
             if let Err(error) = result {
                 trace!("Failed to send transaction over stream with error: {error}.");
-                record_error(error, &mut self.send_txs_stats);
+                record_error(error, &self.send_txs_stats);
                 self.connection = ConnectionState::Retry(0);
             } else {
-                self.send_txs_stats.successfully_sent =
-                    self.send_txs_stats.successfully_sent.saturating_add(1);
+                self.send_txs_stats
+                    .successfully_sent
+                    .fetch_add(1, Ordering::Relaxed);
             }
         }
         measure_send.stop();
@@ -221,14 +221,14 @@ impl ConnectionWorker {
                     }
                     Err(err) => {
                         warn!("Connection error {}: {}", self.peer, err);
-                        record_error(err.into(), &mut self.send_txs_stats);
+                        record_error(err.into(), &self.send_txs_stats);
                         self.connection =
                             ConnectionState::Retry(max_retries_attempt.saturating_add(1));
                     }
                 }
             }
             Err(connecting_error) => {
-                record_error(connecting_error.clone().into(), &mut self.send_txs_stats);
+                record_error(connecting_error.clone().into(), &self.send_txs_stats);
                 match connecting_error {
                     ConnectError::EndpointStopping => {
                         debug!("Endpoint stopping, exit connection worker.");
diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs
index 82b038827b48eb..e42040316c139a 100644
--- a/tpu-client-next/src/connection_workers_scheduler.rs
+++ b/tpu-client-next/src/connection_workers_scheduler.rs
@@ -9,7 +9,8 @@ use {
             create_client_config, create_client_endpoint, QuicClientCertificate, QuicError,
         },
         transaction_batch::TransactionBatch,
-        workers_cache::{WorkerInfo, WorkersCache, WorkersCacheError},
+        workers_cache::{maybe_shutdown_worker, WorkerInfo, WorkersCache, WorkersCacheError},
+        SendTransactionStats,
     },
     log::*,
     quinn::Endpoint,
@@ -39,6 +40,25 @@ pub enum ConnectionWorkersSchedulerError {
     LeaderReceiverDropped,
 }
 
+/// [`Fanout`] is a configuration struct that specifies how many leaders should
+/// be targeted when sending transactions and connecting.
+///
+/// Note that the unit is the number of leaders per
+/// [`NUM_CONSECUTIVE_LEADER_SLOTS`]. It means that if the leader schedule is
+/// [L1, L1, L1, L1, L1, L1, L1, L1, L2, L2, L2, L2], the leaders per
+/// consecutive leader slots are [L1, L1, L2], so there are 3 of them.
+///
+/// The idea of having a separate `connect` parameter is to create a set of
+/// nodes to connect to in advance in order to hide the latency of opening new
+/// connections. Hence, `connect` must be greater than or equal to `send`.
+pub struct Fanout {
+    /// The number of leaders to target for sending transactions.
+    pub send: usize,
+
+    /// The number of leaders to target for establishing connections.
+    pub connect: usize,
+}
+
 /// Configuration for the [`ConnectionWorkersScheduler`].
 ///
 /// This struct holds the necessary settings to initialize and manage connection
@@ -66,10 +86,8 @@ pub struct ConnectionWorkersSchedulerConfig {
     /// connection failure.
     pub max_reconnect_attempts: usize,
 
-    /// The number of slots to look ahead during the leader estimation
-    /// procedure. Determines how far into the future leaders are estimated,
-    /// allowing connections to be established with those leaders in advance.
-    pub lookahead_slots: u64,
+    /// Configures the number of leaders to connect to and send transactions to.
+ pub leaders_fanout: Fanout, } impl ConnectionWorkersScheduler { @@ -90,7 +108,7 @@ impl ConnectionWorkersScheduler { skip_check_transaction_age, worker_channel_size, max_reconnect_attempts, - lookahead_slots, + leaders_fanout, }: ConnectionWorkersSchedulerConfig, mut leader_updater: Box, mut transaction_receiver: mpsc::Receiver, @@ -99,6 +117,7 @@ impl ConnectionWorkersScheduler { let endpoint = Self::setup_endpoint(bind, validator_identity)?; debug!("Client endpoint bind address: {:?}", endpoint.local_addr()); let mut workers = WorkersCache::new(num_connections, cancel.clone()); + let mut send_stats_per_addr = SendTransactionStatsPerAddr::new(); loop { let transaction_batch = tokio::select! { @@ -114,50 +133,49 @@ impl ConnectionWorkersScheduler { break; } }; - let updated_leaders = leader_updater.next_leaders(lookahead_slots); - let new_leader = &updated_leaders[0]; - let future_leaders = &updated_leaders[1..]; - if !workers.contains(new_leader) { - debug!("No existing workers for {new_leader:?}, starting a new one."); - let worker = Self::spawn_worker( - &endpoint, - new_leader, - worker_channel_size, - skip_check_transaction_age, - max_reconnect_attempts, - ); - workers.push(*new_leader, worker).await; - } - tokio::select! { - send_res = workers.send_transactions_to_address(new_leader, transaction_batch) => match send_res { - Ok(()) => (), - Err(WorkersCacheError::ShutdownError) => { - debug!("Connection to {new_leader} was closed, worker cache shutdown"); - } - Err(err) => { - warn!("Connection to {new_leader} was closed, worker error: {err}"); - // If we has failed to send batch, it will be dropped. - } - }, - () = cancel.cancelled() => { - debug!("Cancelled: Shutting down"); - break; - } - }; + let updated_leaders = leader_updater.next_leaders(leaders_fanout.connect); - // Regardless of who is leader, add future leaders to the cache to - // hide the latency of opening the connection. - for peer in future_leaders { + let (fanout_leaders, connect_leaders) = + split_leaders(&updated_leaders, &leaders_fanout); + // add future leaders to the cache to hide the latency of opening + // the connection. + for peer in connect_leaders { if !workers.contains(peer) { + let stats = send_stats_per_addr.entry(peer.ip()).or_default(); let worker = Self::spawn_worker( &endpoint, peer, worker_channel_size, skip_check_transaction_age, max_reconnect_attempts, + stats.clone(), ); - workers.push(*peer, worker).await; + maybe_shutdown_worker(workers.push(*peer, worker)); + } + } + + for new_leader in fanout_leaders { + if !workers.contains(new_leader) { + warn!("No existing worker for {new_leader:?}, skip sending to this leader."); + continue; + } + + let send_res = + workers.try_send_transactions_to_address(new_leader, transaction_batch.clone()); + match send_res { + Ok(()) => (), + Err(WorkersCacheError::ShutdownError) => { + debug!("Connection to {new_leader} was closed, worker cache shutdown"); + } + Err(WorkersCacheError::ReceiverDropped) => { + // Remove the worker from the cache, if the peer has disconnected. + maybe_shutdown_worker(workers.pop(*new_leader)); + } + Err(err) => { + warn!("Connection to {new_leader} was closed, worker error: {err}"); + // If we has failed to send batch, it will be dropped. + } } } } @@ -166,7 +184,7 @@ impl ConnectionWorkersScheduler { endpoint.close(0u32.into(), b"Closing connection"); leader_updater.stop().await; - Ok(workers.transaction_stats().clone()) + Ok(send_stats_per_addr) } /// Sets up the QUIC endpoint for the scheduler to handle connections. 
@@ -191,6 +209,7 @@ impl ConnectionWorkersScheduler {
         worker_channel_size: usize,
         skip_check_transaction_age: bool,
         max_reconnect_attempts: usize,
+        stats: Arc<SendTransactionStats>,
     ) -> WorkerInfo {
         let (txs_sender, txs_receiver) = mpsc::channel(worker_channel_size);
         let endpoint = endpoint.clone();
@@ -202,12 +221,31 @@ impl ConnectionWorkersScheduler {
             txs_receiver,
             skip_check_transaction_age,
             max_reconnect_attempts,
+            stats,
         );
         let handle = tokio::spawn(async move {
             worker.run().await;
-            worker.transaction_stats().clone()
         });
 
         WorkerInfo::new(txs_sender, handle, cancel)
     }
 }
+
+/// Splits `leaders` into two slices based on the `fanout` configuration:
+/// * the first slice contains the leaders to which transactions will be sent,
+/// * the second slice contains the leaders used to warm up connections. This
+///   slice includes the first set.
+fn split_leaders<'leaders>(
+    leaders: &'leaders [SocketAddr],
+    fanout: &Fanout,
+) -> (&'leaders [SocketAddr], &'leaders [SocketAddr]) {
+    let Fanout { send, connect } = fanout;
+    assert!(send <= connect);
+    let send_count = (*send).min(leaders.len());
+    let connect_count = (*connect).min(leaders.len());
+
+    let send_slice = &leaders[..send_count];
+    let connect_slice = &leaders[..connect_count];
+
+    (send_slice, connect_slice)
+}
diff --git a/tpu-client-next/src/leader_updater.rs b/tpu-client-next/src/leader_updater.rs
index 5e07b9b0bfe612..1c7d16cd2acb2d 100644
--- a/tpu-client-next/src/leader_updater.rs
+++ b/tpu-client-next/src/leader_updater.rs
@@ -13,6 +13,7 @@ use {
     log::*,
     solana_connection_cache::connection_cache::Protocol,
     solana_rpc_client::nonblocking::rpc_client::RpcClient,
+    solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS,
     solana_tpu_client::nonblocking::tpu_client::LeaderTpuService,
     std::{
         fmt,
@@ -22,6 +23,7 @@ use {
             Arc,
         },
     },
+    thiserror::Error,
 };
 
 /// [`LeaderUpdater`] trait abstracts out functionality required for the
@@ -29,19 +31,22 @@ use {
 /// identify next leaders to send transactions to.
 #[async_trait]
 pub trait LeaderUpdater: Send {
-    /// Returns next unique leaders for the next `lookahead_slots` starting from
+    /// Returns next leaders for the next `lookahead_leaders` starting from
     /// current estimated slot.
     ///
+    /// Leaders are returned per [`NUM_CONSECUTIVE_LEADER_SLOTS`] to avoid unnecessary repetition.
+    ///
     /// If the current leader estimation is incorrect and transactions are sent to
     /// only one estimated leader, there is a risk of losing all the transactions,
     /// depending on the forwarding policy.
-    fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr>;
+    fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec<SocketAddr>;
 
     /// Stop [`LeaderUpdater`] and releases all associated resources.
     async fn stop(&mut self);
 }
 
 /// Error type for [`LeaderUpdater`].
+#[derive(Error, PartialEq)] pub struct LeaderUpdaterError; impl fmt::Display for LeaderUpdaterError { @@ -98,7 +103,9 @@ struct LeaderUpdaterService { #[async_trait] impl LeaderUpdater for LeaderUpdaterService { - fn next_leaders(&self, lookahead_slots: u64) -> Vec { + fn next_leaders(&mut self, lookahead_leaders: usize) -> Vec { + let lookahead_slots = + (lookahead_leaders as u64).saturating_mul(NUM_CONSECUTIVE_LEADER_SLOTS); self.leader_tpu_service.leader_tpu_sockets(lookahead_slots) } @@ -116,7 +123,7 @@ struct PinnedLeaderUpdater { #[async_trait] impl LeaderUpdater for PinnedLeaderUpdater { - fn next_leaders(&self, _lookahead_slots: u64) -> Vec { + fn next_leaders(&mut self, _lookahead_leaders: usize) -> Vec { self.address.clone() } diff --git a/tpu-client-next/src/send_transaction_stats.rs b/tpu-client-next/src/send_transaction_stats.rs index abe68b8bf60213..fe16e2546ea411 100644 --- a/tpu-client-next/src/send_transaction_stats.rs +++ b/tpu-client-next/src/send_transaction_stats.rs @@ -4,86 +4,118 @@ use { super::QuicError, quinn::{ConnectError, ConnectionError, WriteError}, - std::{collections::HashMap, fmt, net::IpAddr}, + std::{ + collections::HashMap, + fmt, + net::IpAddr, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + }, }; /// [`SendTransactionStats`] aggregates counters related to sending transactions. -#[derive(Debug, Default, Clone, PartialEq)] +#[derive(Debug, Default)] pub struct SendTransactionStats { - pub successfully_sent: u64, - pub connect_error_cids_exhausted: u64, - pub connect_error_invalid_remote_address: u64, - pub connect_error_other: u64, - pub connection_error_application_closed: u64, - pub connection_error_cids_exhausted: u64, - pub connection_error_connection_closed: u64, - pub connection_error_locally_closed: u64, - pub connection_error_reset: u64, - pub connection_error_timed_out: u64, - pub connection_error_transport_error: u64, - pub connection_error_version_mismatch: u64, - pub write_error_closed_stream: u64, - pub write_error_connection_lost: u64, - pub write_error_stopped: u64, - pub write_error_zero_rtt_rejected: u64, + pub successfully_sent: AtomicU64, + pub connect_error_cids_exhausted: AtomicU64, + pub connect_error_invalid_remote_address: AtomicU64, + pub connect_error_other: AtomicU64, + pub connection_error_application_closed: AtomicU64, + pub connection_error_cids_exhausted: AtomicU64, + pub connection_error_connection_closed: AtomicU64, + pub connection_error_locally_closed: AtomicU64, + pub connection_error_reset: AtomicU64, + pub connection_error_timed_out: AtomicU64, + pub connection_error_transport_error: AtomicU64, + pub connection_error_version_mismatch: AtomicU64, + pub write_error_closed_stream: AtomicU64, + pub write_error_connection_lost: AtomicU64, + pub write_error_stopped: AtomicU64, + pub write_error_zero_rtt_rejected: AtomicU64, } #[allow(clippy::arithmetic_side_effects)] -pub fn record_error(err: QuicError, stats: &mut SendTransactionStats) { +pub fn record_error(err: QuicError, stats: &SendTransactionStats) { match err { QuicError::Connect(ConnectError::EndpointStopping) => { - stats.connect_error_other += 1; + stats.connect_error_other.fetch_add(1, Ordering::Relaxed); } QuicError::Connect(ConnectError::CidsExhausted) => { - stats.connect_error_cids_exhausted += 1; + stats + .connect_error_cids_exhausted + .fetch_add(1, Ordering::Relaxed); } QuicError::Connect(ConnectError::InvalidServerName(_)) => { - stats.connect_error_other += 1; + stats.connect_error_other.fetch_add(1, Ordering::Relaxed); } 
QuicError::Connect(ConnectError::InvalidRemoteAddress(_)) => { - stats.connect_error_invalid_remote_address += 1; + stats + .connect_error_invalid_remote_address + .fetch_add(1, Ordering::Relaxed); } QuicError::Connect(ConnectError::NoDefaultClientConfig) => { - stats.connect_error_other += 1; + stats.connect_error_other.fetch_add(1, Ordering::Relaxed); } QuicError::Connect(ConnectError::UnsupportedVersion) => { - stats.connect_error_other += 1; + stats.connect_error_other.fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::VersionMismatch) => { - stats.connection_error_version_mismatch += 1; + stats + .connection_error_version_mismatch + .fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::TransportError(_)) => { - stats.connection_error_transport_error += 1; + stats + .connection_error_transport_error + .fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::ConnectionClosed(_)) => { - stats.connection_error_connection_closed += 1; + stats + .connection_error_connection_closed + .fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::ApplicationClosed(_)) => { - stats.connection_error_application_closed += 1; + stats + .connection_error_application_closed + .fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::Reset) => { - stats.connection_error_reset += 1; + stats.connection_error_reset.fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::TimedOut) => { - stats.connection_error_timed_out += 1; + stats + .connection_error_timed_out + .fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::LocallyClosed) => { - stats.connection_error_locally_closed += 1; + stats + .connection_error_locally_closed + .fetch_add(1, Ordering::Relaxed); } QuicError::Connection(ConnectionError::CidsExhausted) => { - stats.connection_error_cids_exhausted += 1; + stats + .connection_error_cids_exhausted + .fetch_add(1, Ordering::Relaxed); } QuicError::StreamWrite(WriteError::Stopped(_)) => { - stats.write_error_stopped += 1; + stats.write_error_stopped.fetch_add(1, Ordering::Relaxed); } QuicError::StreamWrite(WriteError::ConnectionLost(_)) => { - stats.write_error_connection_lost += 1; + stats + .write_error_connection_lost + .fetch_add(1, Ordering::Relaxed); } QuicError::StreamWrite(WriteError::ClosedStream) => { - stats.write_error_closed_stream += 1; + stats + .write_error_closed_stream + .fetch_add(1, Ordering::Relaxed); } QuicError::StreamWrite(WriteError::ZeroRttRejected) => { - stats.write_error_zero_rtt_rejected += 1; + stats + .write_error_zero_rtt_rejected + .fetch_add(1, Ordering::Relaxed); } // Endpoint is created on the scheduler level and handled separately // No counters are used for this case. @@ -91,39 +123,7 @@ pub fn record_error(err: QuicError, stats: &mut SendTransactionStats) { } } -pub type SendTransactionStatsPerAddr = HashMap; - -macro_rules! add_fields { - ($self:ident += $other:ident for: $( $field:ident ),* $(,)? 
) => { - $( - $self.$field = $self.$field.saturating_add($other.$field); - )* - }; -} - -impl SendTransactionStats { - pub fn add(&mut self, other: &SendTransactionStats) { - add_fields!( - self += other for: - successfully_sent, - connect_error_cids_exhausted, - connect_error_invalid_remote_address, - connect_error_other, - connection_error_application_closed, - connection_error_cids_exhausted, - connection_error_connection_closed, - connection_error_locally_closed, - connection_error_reset, - connection_error_timed_out, - connection_error_transport_error, - connection_error_version_mismatch, - write_error_closed_stream, - write_error_connection_lost, - write_error_stopped, - write_error_zero_rtt_rejected, - ); - } -} +pub type SendTransactionStatsPerAddr = HashMap>; macro_rules! display_send_transaction_stats_body { ($self:ident, $f:ident, $($field:ident),* $(,)?) => { @@ -135,7 +135,7 @@ macro_rules! display_send_transaction_stats_body { "\x20 ", stringify!($field), ": {},\n", )* ), - $($self.$field),* + $($self.$field.load(Ordering::Relaxed)),* ) }; } @@ -164,3 +164,47 @@ impl fmt::Display for SendTransactionStats { ) } } + +/// For tests it is useful to be have PartialEq but we cannot have it on top of +/// atomics. This macro creates a structure with the same attributes but of type +/// u64. +macro_rules! define_non_atomic_struct_for { + ($name:ident, $atomic_name:ident, {$($field:ident),* $(,)?}) => { + #[derive(Debug, Default, PartialEq)] + pub struct $name { + $(pub $field: u64),* + } + + impl $atomic_name { + pub fn to_non_atomic(&self) -> $name { + $name { + $($field: self.$field.load(Ordering::Relaxed)),* + } + } + } + }; +} + +// Define the non-atomic struct and the `to_non_atomic` conversion method +define_non_atomic_struct_for!( + SendTransactionStatsNonAtomic, + SendTransactionStats, + { + successfully_sent, + connect_error_cids_exhausted, + connect_error_invalid_remote_address, + connect_error_other, + connection_error_application_closed, + connection_error_cids_exhausted, + connection_error_connection_closed, + connection_error_locally_closed, + connection_error_reset, + connection_error_timed_out, + connection_error_transport_error, + connection_error_version_mismatch, + write_error_closed_stream, + write_error_connection_lost, + write_error_stopped, + write_error_zero_rtt_rejected + } +); diff --git a/tpu-client-next/src/workers_cache.rs b/tpu-client-next/src/workers_cache.rs index 90d2954b669d7f..d3d25223dfcbfd 100644 --- a/tpu-client-next/src/workers_cache.rs +++ b/tpu-client-next/src/workers_cache.rs @@ -3,31 +3,30 @@ //! batches, and gathering send transaction statistics. use { - super::SendTransactionStats, crate::transaction_batch::TransactionBatch, log::*, lru::LruCache, - std::{ - collections::HashMap, - net::{IpAddr, SocketAddr}, - }, + std::net::SocketAddr, thiserror::Error, - tokio::{sync::mpsc, task::JoinHandle}, + tokio::{ + sync::mpsc::{self, error::TrySendError}, + task::JoinHandle, + }, tokio_util::sync::CancellationToken, }; /// [`WorkerInfo`] holds information about a worker responsible for sending /// transaction batches. 
pub(crate) struct WorkerInfo { - pub sender: mpsc::Sender, - pub handle: JoinHandle, - pub cancel: CancellationToken, + sender: mpsc::Sender, + handle: JoinHandle<()>, + cancel: CancellationToken, } impl WorkerInfo { pub fn new( sender: mpsc::Sender, - handle: JoinHandle, + handle: JoinHandle<()>, cancel: CancellationToken, ) -> Self { Self { @@ -37,27 +36,23 @@ impl WorkerInfo { } } - async fn send_transactions( - &self, - txs_batch: TransactionBatch, - ) -> Result<(), WorkersCacheError> { - self.sender - .send(txs_batch) - .await - .map_err(|_| WorkersCacheError::ReceiverDropped)?; + fn try_send_transactions(&self, txs_batch: TransactionBatch) -> Result<(), WorkersCacheError> { + self.sender.try_send(txs_batch).map_err(|err| match err { + TrySendError::Full(_) => WorkersCacheError::FullChannel, + TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped, + })?; Ok(()) } /// Closes the worker by dropping the sender and awaiting the worker's /// statistics. - async fn shutdown(self) -> Result { + async fn shutdown(self) -> Result<(), WorkersCacheError> { self.cancel.cancel(); drop(self.sender); - let stats = self - .handle + self.handle .await .map_err(|_| WorkersCacheError::TaskJoinFailure)?; - Ok(stats) + Ok(()) } } @@ -65,7 +60,6 @@ impl WorkerInfo { /// manage workers. It also tracks transaction statistics for each peer. pub(crate) struct WorkersCache { workers: LruCache, - send_stats_per_addr: HashMap, /// Indicates that the `WorkersCache` is been `shutdown()`, interrupting any outstanding /// `send_txs()` invocations. @@ -78,6 +72,9 @@ pub enum WorkersCacheError { #[error("Work receiver has been dropped unexpectedly.")] ReceiverDropped, + #[error("Worker's channel is full.")] + FullChannel, + #[error("Task failed to join.")] TaskJoinFailure, @@ -86,32 +83,45 @@ pub enum WorkersCacheError { } impl WorkersCache { - pub fn new(capacity: usize, cancel: CancellationToken) -> Self { + pub(crate) fn new(capacity: usize, cancel: CancellationToken) -> Self { Self { workers: LruCache::new(capacity), - send_stats_per_addr: HashMap::new(), cancel, } } - pub fn contains(&self, peer: &SocketAddr) -> bool { + pub(crate) fn contains(&self, peer: &SocketAddr) -> bool { self.workers.contains(peer) } - pub async fn push(&mut self, peer: SocketAddr, peer_worker: WorkerInfo) { - // Although there might be concerns about the performance implications - // of waiting for the worker to be closed when trying to add a new - // worker, the idea is that these workers are almost always created in - // advance so the latency is hidden. - if let Some((leader, popped_worker)) = self.workers.push(peer, peer_worker) { - self.shutdown_worker(leader, popped_worker).await; + pub(crate) fn push( + &mut self, + leader: SocketAddr, + peer_worker: WorkerInfo, + ) -> Option { + if let Some((leader, popped_worker)) = self.workers.push(leader, peer_worker) { + return Some(ShutdownWorker { + leader, + worker: popped_worker, + }); + } + None + } + + pub(crate) fn pop(&mut self, leader: SocketAddr) -> Option { + if let Some(popped_worker) = self.workers.pop(&leader) { + return Some(ShutdownWorker { + leader, + worker: popped_worker, + }); } + None } /// Sends a batch of transactions to the worker for a given peer. If the /// worker for the peer is disconnected or fails, it is removed from the /// cache. - pub async fn send_transactions_to_address( + pub(crate) fn try_send_transactions_to_address( &mut self, peer: &SocketAddr, txs_batch: TransactionBatch, @@ -119,66 +129,68 @@ impl WorkersCache { let Self { workers, cancel, .. 
} = self; + if cancel.is_cancelled() { + return Err(WorkersCacheError::ShutdownError); + } - let body = async move { - let current_worker = workers.get(peer).expect( - "Failed to fetch worker for peer {peer}.\n\ + let current_worker = workers.get(peer).expect( + "Failed to fetch worker for peer {peer}.\n\ Peer existence must be checked before this call using `contains` method.", - ); - let send_res = current_worker.send_transactions(txs_batch).await; - - if let Err(WorkersCacheError::ReceiverDropped) = send_res { - // Remove the worker from the cache, if the peer has disconnected. - if let Some(current_worker) = workers.pop(peer) { - // To avoid obscuring the error from send, ignore a possible - // `TaskJoinFailure`. - let close_result = current_worker.shutdown().await; - if let Err(error) = close_result { - error!("Error while closing worker: {error}."); - } - } - } + ); + let send_res = current_worker.try_send_transactions(txs_batch); - send_res - }; - - tokio::select! { - send_res = body => send_res, - () = cancel.cancelled() => Err(WorkersCacheError::ShutdownError), + if let Err(WorkersCacheError::ReceiverDropped) = send_res { + warn!( + "Failed to deliver transaction batch for leader {}, drop batch.", + peer.ip() + ); } - } - pub fn transaction_stats(&self) -> &HashMap { - &self.send_stats_per_addr + send_res } /// Closes and removes all workers in the cache. This is typically done when /// shutting down the system. - pub async fn shutdown(&mut self) { - // Interrupt any outstanding `send_txs()` calls. + pub(crate) async fn shutdown(&mut self) { + // Interrupt any outstanding `send_transactions()` calls. self.cancel.cancel(); while let Some((leader, worker)) = self.workers.pop_lru() { - self.shutdown_worker(leader, worker).await; + let res = worker.shutdown().await; + if let Err(err) = res { + debug!("Error while shutting down worker for {leader}: {err}"); + } } } +} - /// Shuts down a worker for a given peer by closing the worker and gathering - /// its transaction statistics. - async fn shutdown_worker(&mut self, leader: SocketAddr, worker: WorkerInfo) { - let res = worker.shutdown().await; +/// [`ShutdownWorker`] takes care of stopping the worker. It's method +/// `shutdown()` should be executed in a separate task to hide the latency of +/// finishing worker gracefully. 
+pub(crate) struct ShutdownWorker { + leader: SocketAddr, + worker: WorkerInfo, +} - let stats = match res { - Ok(stats) => stats, - Err(err) => { - debug!("Error while shutting down worker for {leader}: {err}"); - return; - } - }; +impl ShutdownWorker { + pub(crate) fn leader(&self) -> SocketAddr { + self.leader + } - self.send_stats_per_addr - .entry(leader.ip()) - .and_modify(|e| e.add(&stats)) - .or_insert(stats); + pub(crate) async fn shutdown(self) -> Result<(), WorkersCacheError> { + self.worker.shutdown().await } } + +pub(crate) fn maybe_shutdown_worker(worker: Option) { + let Some(worker) = worker else { + return; + }; + tokio::spawn(async move { + let leader = worker.leader(); + let res = worker.shutdown().await; + if let Err(err) = res { + debug!("Error while shutting down worker for {leader}: {err}"); + } + }); +} diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs index 4944a80542ce87..8a8d623ab8da3f 100644 --- a/tpu-client-next/tests/connection_workers_scheduler_test.rs +++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs @@ -16,10 +16,11 @@ use { streamer::StakedNodes, }, solana_tpu_client_next::{ - connection_workers_scheduler::ConnectionWorkersSchedulerConfig, - leader_updater::create_leader_updater, transaction_batch::TransactionBatch, - ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStats, - SendTransactionStatsPerAddr, + connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout}, + leader_updater::create_leader_updater, + send_transaction_stats::SendTransactionStatsNonAtomic, + transaction_batch::TransactionBatch, + ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStatsPerAddr, }, std::{ collections::HashMap, @@ -46,9 +47,17 @@ fn test_config(validator_identity: Option) -> ConnectionWorkersSchedule stake_identity: validator_identity, num_connections: 1, skip_check_transaction_age: false, - worker_channel_size: 2, + // At the moment we have only one strategy to send transactions: we try + // to put to worker channel transaction batch and in case of failure + // just drop it. This requires to use large channels here. In the + // future, we are planning to add an option to send with backpressure at + // the speed of fastest leader. + worker_channel_size: 100, max_reconnect_attempts: 4, - lookahead_slots: 1, + leaders_fanout: Fanout { + send: 1, + connect: 1, + }, } } @@ -89,7 +98,7 @@ async fn join_scheduler( scheduler_handle: JoinHandle< Result, >, -) -> SendTransactionStats { +) -> SendTransactionStatsNonAtomic { let stats_per_ip = scheduler_handle .await .unwrap() @@ -97,7 +106,7 @@ async fn join_scheduler( stats_per_ip .get(&IpAddr::from_str("127.0.0.1").unwrap()) .expect("setup_connection_worker_scheduler() connected to a leader at 127.0.0.1") - .clone() + .to_non_atomic() } // Specify the pessimistic time to finish generation and result checks. 
@@ -239,7 +248,7 @@ async fn test_basic_transactions_sending() { let localhost_stats = join_scheduler(scheduler_handle).await; assert_eq!( localhost_stats, - SendTransactionStats { + SendTransactionStatsNonAtomic { successfully_sent: expected_num_txs as u64, ..Default::default() } @@ -436,7 +445,7 @@ async fn test_staked_connection() { let localhost_stats = join_scheduler(scheduler_handle).await; assert_eq!( localhost_stats, - SendTransactionStats { + SendTransactionStatsNonAtomic { successfully_sent: expected_num_txs as u64, ..Default::default() } @@ -483,7 +492,7 @@ async fn test_connection_throttling() { let localhost_stats = join_scheduler(scheduler_handle).await; assert_eq!( localhost_stats, - SendTransactionStats { + SendTransactionStatsNonAtomic { successfully_sent: expected_num_txs as u64, ..Default::default() } @@ -524,13 +533,14 @@ async fn test_no_host() { tx_sender_shutdown.await; // While attempting to establish a connection with a nonexistent host, we fill the worker's - // channel. Transactions from this channel will never be sent and will eventually be dropped - // without increasing the `SendTransactionStats` counters. + // channel. let stats = scheduler_handle .await .expect("Scheduler should stop successfully") .expect("Scheduler execution was successful"); - assert_eq!(stats, HashMap::new()); + let stats = stats.get(&server_ip).unwrap().to_non_atomic(); + // `5` because `config.max_reconnect_attempts` is 4 + assert_eq!(stats.connect_error_invalid_remote_address, 5); } // Check that when the client is rate-limited by server, we update counters @@ -586,7 +596,7 @@ async fn test_rate_limiting() { // do the shutdown. If we increase the time we wait in `count_received_packets_for`, we would // start seeing a `connection_error_timed_out` incremented to 1. Potentially, we may want to // accept both 0 and 1 as valid values for it. - assert_eq!(localhost_stats, SendTransactionStats::default()); + assert_eq!(localhost_stats, SendTransactionStatsNonAtomic::default()); // Stop the server. exit.store(true, Ordering::Relaxed); @@ -663,7 +673,7 @@ async fn test_rate_limiting_establish_connection() { // All the rest of the error counters should be 0. localhost_stats.connection_error_timed_out = 0; localhost_stats.successfully_sent = 0; - assert_eq!(localhost_stats, SendTransactionStats::default()); + assert_eq!(localhost_stats, SendTransactionStatsNonAtomic::default()); // Stop the server. exit.store(true, Ordering::Relaxed);
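
Editor's note (not part of the patch): the sketch below restates the fanout semantics described in the `Fanout` doc comment by duplicating the logic of the private `split_leaders` helper introduced above, so the relationship between `send` and `connect` can be tried in isolation. The addresses and fanout values are illustrative assumptions, not taken from the patch.

// Standalone illustration of the fanout split used by the scheduler:
// transactions go to the first `send` leaders, while connections are warmed up
// to the first `connect` leaders (a superset of the send targets).
use std::net::SocketAddr;

struct Fanout {
    send: usize,
    connect: usize,
}

fn split_leaders<'a>(
    leaders: &'a [SocketAddr],
    fanout: &Fanout,
) -> (&'a [SocketAddr], &'a [SocketAddr]) {
    assert!(fanout.send <= fanout.connect);
    let send_count = fanout.send.min(leaders.len());
    let connect_count = fanout.connect.min(leaders.len());
    (&leaders[..send_count], &leaders[..connect_count])
}

fn main() {
    // One address per leader rotation (per NUM_CONSECUTIVE_LEADER_SLOTS), as
    // returned by `LeaderUpdater::next_leaders`; the values here are made up.
    let leaders: Vec<SocketAddr> = vec![
        "127.0.0.1:8001".parse().unwrap(),
        "127.0.0.1:8002".parse().unwrap(),
        "127.0.0.1:8003".parse().unwrap(),
    ];
    let fanout = Fanout { send: 1, connect: 2 };
    let (send_to, connect_to) = split_leaders(&leaders, &fanout);
    // The batch is sent only to the current leader, while a connection to the
    // next leader is opened in advance to hide connection-setup latency.
    assert_eq!(send_to, &leaders[..1]);
    assert_eq!(connect_to, &leaders[..2]);
    println!("send to {send_to:?}, connect to {connect_to:?}");
}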