From 3ee1853a487f8f6646f8b3d39284798ae64fb23b Mon Sep 17 00:00:00 2001
From: Kirill Lykov
Date: Thu, 7 Nov 2024 14:20:34 +0100
Subject: [PATCH] address PR comments

---
 tpu-client-next/src/send_transaction_stats.rs |  4 +--
 tpu-client-next/src/workers_cache.rs          | 28 +++++++++++--------
 .../connection_workers_scheduler_test.rs      | 23 +++++++++------
 3 files changed, 32 insertions(+), 23 deletions(-)

diff --git a/tpu-client-next/src/send_transaction_stats.rs b/tpu-client-next/src/send_transaction_stats.rs
index 43711f3e385851..fe16e2546ea411 100644
--- a/tpu-client-next/src/send_transaction_stats.rs
+++ b/tpu-client-next/src/send_transaction_stats.rs
@@ -168,7 +168,7 @@ impl fmt::Display for SendTransactionStats {
 /// For tests it is useful to be have PartialEq but we cannot have it on top of
 /// atomics. This macro creates a structure with the same attributes but of type
 /// u64.
-macro_rules! define_non_atomic_struct {
+macro_rules! define_non_atomic_struct_for {
     ($name:ident, $atomic_name:ident, {$($field:ident),* $(,)?}) => {
         #[derive(Debug, Default, PartialEq)]
         pub struct $name {
@@ -186,7 +186,7 @@ macro_rules! define_non_atomic_struct {
 }
 
 // Define the non-atomic struct and the `to_non_atomic` conversion method
-define_non_atomic_struct!(
+define_non_atomic_struct_for!(
     SendTransactionStatsNonAtomic,
     SendTransactionStats,
     {
diff --git a/tpu-client-next/src/workers_cache.rs b/tpu-client-next/src/workers_cache.rs
index 39965fa7c9a113..d3d25223dfcbfd 100644
--- a/tpu-client-next/src/workers_cache.rs
+++ b/tpu-client-next/src/workers_cache.rs
@@ -8,7 +8,10 @@ use {
     lru::LruCache,
     std::net::SocketAddr,
     thiserror::Error,
-    tokio::{sync::mpsc, task::JoinHandle},
+    tokio::{
+        sync::mpsc::{self, error::TrySendError},
+        task::JoinHandle,
+    },
     tokio_util::sync::CancellationToken,
 };
 
@@ -35,8 +38,8 @@ impl WorkerInfo {
 
     fn try_send_transactions(&self, txs_batch: TransactionBatch) -> Result<(), WorkersCacheError> {
         self.sender.try_send(txs_batch).map_err(|err| match err {
-            mpsc::error::TrySendError::Full(_) => WorkersCacheError::FullChannel,
-            mpsc::error::TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped,
+            TrySendError::Full(_) => WorkersCacheError::FullChannel,
+            TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped,
         })?;
         Ok(())
     }
@@ -180,13 +183,14 @@ impl ShutdownWorker {
 }
 
 pub(crate) fn maybe_shutdown_worker(worker: Option<WorkerInfo>) {
-    if let Some(worker) = worker {
-        tokio::spawn(async move {
-            let leader = worker.leader();
-            let res = worker.shutdown().await;
-            if let Err(err) = res {
-                debug!("Error while shutting down worker for {leader}: {err}");
-            }
-        });
-    }
+    let Some(worker) = worker else {
+        return;
+    };
+    tokio::spawn(async move {
+        let leader = worker.leader();
+        let res = worker.shutdown().await;
+        if let Err(err) = res {
+            debug!("Error while shutting down worker for {leader}: {err}");
+        }
+    });
 }
diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs
index 4448e958999c18..e64eba2c660468 100644
--- a/tpu-client-next/tests/connection_workers_scheduler_test.rs
+++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs
@@ -48,6 +48,11 @@ fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedule
         stake_identity: validator_identity,
         num_connections: 1,
         skip_check_transaction_age: false,
+        // At the moment, the only sending strategy is to try to push the
+        // transaction batch into the worker channel and, if that fails, to
+        // drop the batch. This requires large channels here. In the future,
+        // we plan to add an option to send with backpressure, at the speed
+        // of the fastest leader.
         worker_channel_size: 100,
         max_reconnect_attempts: 4,
         lookahead_slots: 1,
@@ -92,7 +97,7 @@ async fn join_scheduler(
     scheduler_handle: JoinHandle<
         Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>,
     >,
-) -> Arc<SendTransactionStats> {
+) -> SendTransactionStatsNonAtomic {
     let stats_per_ip = scheduler_handle
         .await
         .unwrap()
@@ -100,7 +105,7 @@ async fn join_scheduler(
     stats_per_ip
         .get(&IpAddr::from_str("127.0.0.1").unwrap())
         .expect("setup_connection_worker_scheduler() connected to a leader at 127.0.0.1")
-        .clone()
+        .to_non_atomic()
 }
 
 // Specify the pessimistic time to finish generation and result checks.
@@ -239,7 +244,7 @@ async fn test_basic_transactions_sending() {
     // Stop sending
     tx_sender_shutdown.await;
 
-    let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
+    let localhost_stats = join_scheduler(scheduler_handle).await;
     assert_eq!(
         localhost_stats,
         SendTransactionStatsNonAtomic {
@@ -316,7 +321,7 @@ async fn test_connection_denied_until_allowed() {
     // Wait for the exchange to finish.
     tx_sender_shutdown.await;
 
-    let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
+    let localhost_stats = join_scheduler(scheduler_handle).await;
     // in case of pruning, server closes the connection with code 1 and error
     // message b"dropped". This might lead to connection error
     // (ApplicationClosed::ApplicationClose) or to stream error
@@ -375,7 +380,7 @@ async fn test_connection_pruned_and_reopened() {
     // Wait for the exchange to finish.
     tx_sender_shutdown.await;
 
-    let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
+    let localhost_stats = join_scheduler(scheduler_handle).await;
     // in case of pruning, server closes the connection with code 1 and error
     // message b"dropped". This might lead to connection error
     // (ApplicationClosed::ApplicationClose) or to stream error
@@ -436,7 +441,7 @@ async fn test_staked_connection() {
     // Wait for the exchange to finish.
     tx_sender_shutdown.await;
 
-    let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
+    let localhost_stats = join_scheduler(scheduler_handle).await;
     assert_eq!(
         localhost_stats,
         SendTransactionStatsNonAtomic {
@@ -483,7 +488,7 @@ async fn test_connection_throttling() {
     // Stop sending
     tx_sender_shutdown.await;
 
-    let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
+    let localhost_stats = join_scheduler(scheduler_handle).await;
     assert_eq!(
         localhost_stats,
         SendTransactionStatsNonAtomic {
@@ -584,7 +589,7 @@ async fn test_rate_limiting() {
     // And the scheduler.
     scheduler_cancel.cancel();
 
-    let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
+    let localhost_stats = join_scheduler(scheduler_handle).await;
 
     // We do not expect to see any errors, as the connection is in the pending state still, when we
     // do the shutdown. If we increase the time we wait in `count_received_packets_for`, we would
@@ -648,7 +653,7 @@ async fn test_rate_limiting_establish_connection() {
     // And the scheduler.
     scheduler_cancel.cancel();
 
-    let mut localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();
+    let mut localhost_stats = join_scheduler(scheduler_handle).await;
     assert!(
         localhost_stats.connection_error_timed_out > 0,
         "As the quinn timeout is below 1 minute, a few connections will fail to connect during \