diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 20ec39bf8c0715..d3716392c91dcc 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -68,7 +68,7 @@ use { tokio_util::sync::CancellationToken, }; -pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(10); +pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(2); pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu"; @@ -1103,19 +1103,14 @@ async fn handle_connection( let stream_load_ema = stream_load_ema.clone(); tokio::spawn(async move { let mut maybe_batch = None; - // The min is to guard against a value too small which can wake up unnecessarily - // frequently and wasting CPU cycles. The max guard against waiting for too long - // which delay exit and cause some test failures when the timeout value is large. - // Within this value, the heuristic is to wake up 10 times to check for exit - // for the set timeout if there are no data. - let exit_check_interval = (wait_for_chunk_timeout / 10) - .clamp(Duration::from_millis(10), Duration::from_secs(1)); - let mut start = Instant::now(); loop { - // Read the next chunk + // Read the next chunk, waiting up to `wait_for_chunk_timeout`. If we don't get a + // chunk before then, we assume the stream is dead and stop the stream task. This + // can only happen if there's severe packet loss or the peer stop sending for + // whatever reason. let chunk = match tokio::select! { chunk = tokio::time::timeout( - exit_check_interval, + wait_for_chunk_timeout, stream.read_chunk(PACKET_DATA_SIZE, true)) => chunk, // If the peer gets disconnected stop the task right away. @@ -1133,15 +1128,11 @@ async fn handle_connection( } // timeout elapsed Err(_) => { - if start.elapsed() >= wait_for_chunk_timeout { - debug!("Timeout in receiving on stream"); - stats - .total_stream_read_timeouts - .fetch_add(1, Ordering::Relaxed); - break; - } else { - continue; - } + debug!("Timeout in receiving on stream"); + stats + .total_stream_read_timeouts + .fetch_add(1, Ordering::Relaxed); + break; } }; @@ -1158,7 +1149,6 @@ async fn handle_connection( last_update.store(timing::timestamp(), Ordering::Relaxed); break; } - start = Instant::now(); } stats.total_streams.fetch_sub(1, Ordering::Relaxed); @@ -1791,7 +1781,7 @@ pub mod test { s1.write_all(&[0u8]).await.unwrap_or_default(); // Wait long enough for the stream to timeout in receiving chunks - let sleep_time = Duration::from_secs(3).min(WAIT_FOR_STREAM_TIMEOUT * 1000); + let sleep_time = DEFAULT_WAIT_FOR_CHUNK_TIMEOUT * 2; sleep(sleep_time).await; // Test that the stream was created, but timed out in read @@ -1871,7 +1861,7 @@ pub mod test { CONNECTION_CLOSE_REASON_DROPPED_ENTRY, ); // Wait long enough for the stream to timeout in receiving chunks - let sleep_time = Duration::from_secs(1).min(WAIT_FOR_STREAM_TIMEOUT * 1000); + let sleep_time = DEFAULT_WAIT_FOR_CHUNK_TIMEOUT * 2; sleep(sleep_time).await; assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1); @@ -1884,7 +1874,7 @@ pub mod test { CONNECTION_CLOSE_REASON_DROPPED_ENTRY, ); // Wait long enough for the stream to timeout in receiving chunks - let sleep_time = Duration::from_secs(1).min(WAIT_FOR_STREAM_TIMEOUT * 1000); + let sleep_time = DEFAULT_WAIT_FOR_CHUNK_TIMEOUT * 2; sleep(sleep_time).await; assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 2); diff --git a/streamer/src/nonblocking/testing_utilities.rs b/streamer/src/nonblocking/testing_utilities.rs index ab87334c7cc4c9..d569190f10d16d 100644 --- a/streamer/src/nonblocking/testing_utilities.rs +++ b/streamer/src/nonblocking/testing_utilities.rs @@ -3,6 +3,7 @@ use { super::quic::{ spawn_server_multi, SpawnNonBlockingServerResult, ALPN_TPU_PROTOCOL_ID, DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, }, crate::{ quic::{StreamerStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, @@ -23,7 +24,6 @@ use { std::{ net::{SocketAddr, UdpSocket}, sync::{atomic::AtomicBool, Arc, RwLock}, - time::Duration, }, tokio::task::JoinHandle, }; @@ -201,7 +201,7 @@ pub fn setup_quic_server_with_sockets( max_unstaked_connections, max_streams_per_ms, max_connections_per_ipaddr_per_minute, - Duration::from_secs(2), + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) .unwrap();