From 137b6af2175b43e1ad0bb173fe1010baf26fa319 Mon Sep 17 00:00:00 2001 From: Alessandro Decina Date: Tue, 22 Oct 2024 09:39:10 +0700 Subject: [PATCH] quic tpu: create fewer timeout futures (#3151) * quic tpu: create fewer timeout futures Use a cancellation token instead of polling an Arc to exit connection tasks. Before this change we used to create a timeout future for each tx, and immediately cancel it since virtually all connections always have incoming tx pending. Cancelling timeout futures is expensive as it goes inside the tokio driver and takes a mutex. On high load (1M tps), cancelling timeout futures takes 8% (!) of run time. With this change we create a cancellation token _once_ when a connection is established and that's it - no overhead after that. * quic tpu: lower stream timeout from 10s to 2s Don't wait 10s for one piece of a transaction to come before timing out. Lower the timeout to 2s, after which we assume a stream is dead. 2s is enough round trips to account for non catastrophic packet loss. If packet loss is causing >2s stream latency, the connection is hosed and the best thing we can do is save server resources for peers with better connectivity. --- Cargo.lock | 1 + programs/sbf/Cargo.lock | 1 + streamer/Cargo.toml | 1 + streamer/src/nonblocking/quic.rs | 424 +++++++++--------- streamer/src/nonblocking/testing_utilities.rs | 4 +- 5 files changed, 217 insertions(+), 214 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ac4a047518b824..82126c50168a6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8192,6 +8192,7 @@ dependencies = [ "solana-transaction-metrics-tracker", "thiserror", "tokio", + "tokio-util 0.7.12", "x509-parser", ] diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 5e359efd6004f6..d7fe223c2e7ccd 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6888,6 +6888,7 @@ dependencies = [ "solana-transaction-metrics-tracker", "thiserror", "tokio", + "tokio-util 0.7.1", "x509-parser", ] diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 873cb4459e3327..da8b33d553fe58 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -38,6 +38,7 @@ solana-sdk = { workspace = true } solana-transaction-metrics-tracker = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } +tokio-util = { workspace = true } x509-parser = { workspace = true } [dev-dependencies] diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 0bc99e04652da9..12773ea821699e 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -65,10 +65,10 @@ use { task::JoinHandle, time::{sleep, timeout}, }, + tokio_util::sync::CancellationToken, }; -const WAIT_FOR_STREAM_TIMEOUT: Duration = Duration::from_millis(100); -pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(10); +pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(2); pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu"; @@ -583,7 +583,7 @@ fn handle_and_cache_new_connection( remote_addr, ); - if let Some((last_update, stream_exit, stream_counter)) = connection_table_l + if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l .try_add_connection( ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), remote_addr.port(), @@ -606,7 +606,7 @@ fn handle_and_cache_new_connection( remote_addr, last_update, connection_table, - stream_exit, + cancel_connection, params.clone(), wait_for_chunk_timeout, stream_load_ema, 
@@ -1031,7 +1031,7 @@ async fn handle_connection( remote_addr: SocketAddr, last_update: Arc, connection_table: Arc>, - stream_exit: Arc, + cancel: CancellationToken, params: NewConnectionHandlerParams, wait_for_chunk_timeout: Duration, stream_load_ema: Arc, @@ -1046,107 +1046,114 @@ async fn handle_connection( ); let stable_id = connection.stable_id(); stats.total_connections.fetch_add(1, Ordering::Relaxed); - while !stream_exit.load(Ordering::Relaxed) { - if let Ok(stream) = - tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await - { - match stream { - Ok(mut stream) => { - let max_streams_per_throttling_interval = stream_load_ema - .available_load_capacity_in_throttling_duration( - params.peer_type, - params.total_stake, - ); - - let throttle_interval_start = - stream_counter.reset_throttling_params_if_needed(); - let streams_read_in_throttle_interval = - stream_counter.stream_count.load(Ordering::Relaxed); - if streams_read_in_throttle_interval >= max_streams_per_throttling_interval { - // The peer is sending faster than we're willing to read. Sleep for what's - // left of this read interval so the peer backs off. - let throttle_duration = STREAM_THROTTLING_INTERVAL - .saturating_sub(throttle_interval_start.elapsed()); - - if !throttle_duration.is_zero() { - debug!("Throttling stream from {remote_addr:?}, peer type: {:?}, total stake: {}, \ + loop { + // Wait for new streams. If the peer is disconnected we get a cancellation signal and stop + // the connection task. + let mut stream = select! { + stream = connection.accept_uni() => match stream { + Ok(stream) => stream, + Err(e) => { + debug!("stream error: {:?}", e); + break; + } + }, + _ = cancel.cancelled() => break, + }; + + let max_streams_per_throttling_interval = stream_load_ema + .available_load_capacity_in_throttling_duration(params.peer_type, params.total_stake); + + let throttle_interval_start = stream_counter.reset_throttling_params_if_needed(); + let streams_read_in_throttle_interval = stream_counter.stream_count.load(Ordering::Relaxed); + if streams_read_in_throttle_interval >= max_streams_per_throttling_interval { + // The peer is sending faster than we're willing to read. Sleep for what's + // left of this read interval so the peer backs off. 
+ let throttle_duration = + STREAM_THROTTLING_INTERVAL.saturating_sub(throttle_interval_start.elapsed()); + + if !throttle_duration.is_zero() { + debug!("Throttling stream from {remote_addr:?}, peer type: {:?}, total stake: {}, \ max_streams_per_interval: {max_streams_per_throttling_interval}, read_interval_streams: {streams_read_in_throttle_interval} \ throttle_duration: {throttle_duration:?}", params.peer_type, params.total_stake); - stats.throttled_streams.fetch_add(1, Ordering::Relaxed); - match params.peer_type { - ConnectionPeerType::Unstaked => { - stats - .throttled_unstaked_streams - .fetch_add(1, Ordering::Relaxed); - } - ConnectionPeerType::Staked(_) => { - stats - .throttled_staked_streams - .fetch_add(1, Ordering::Relaxed); - } - } - sleep(throttle_duration).await; - } + stats.throttled_streams.fetch_add(1, Ordering::Relaxed); + match params.peer_type { + ConnectionPeerType::Unstaked => { + stats + .throttled_unstaked_streams + .fetch_add(1, Ordering::Relaxed); + } + ConnectionPeerType::Staked(_) => { + stats + .throttled_staked_streams + .fetch_add(1, Ordering::Relaxed); } - stream_load_ema.increment_load(params.peer_type); - stream_counter.stream_count.fetch_add(1, Ordering::Relaxed); - stats.total_streams.fetch_add(1, Ordering::Relaxed); - stats.total_new_streams.fetch_add(1, Ordering::Relaxed); - let stream_exit = stream_exit.clone(); - let stats = stats.clone(); - let packet_sender = params.packet_sender.clone(); - let last_update = last_update.clone(); - let stream_load_ema = stream_load_ema.clone(); - tokio::spawn(async move { - let mut maybe_batch = None; - // The min is to guard against a value too small which can wake up unnecessarily - // frequently and wasting CPU cycles. The max guard against waiting for too long - // which delay exit and cause some test failures when the timeout value is large. - // Within this value, the heuristic is to wake up 10 times to check for exit - // for the set timeout if there are no data. - let exit_check_interval = (wait_for_chunk_timeout / 10) - .clamp(Duration::from_millis(10), Duration::from_secs(1)); - let mut start = Instant::now(); - while !stream_exit.load(Ordering::Relaxed) { - if let Ok(chunk) = tokio::time::timeout( - exit_check_interval, - stream.read_chunk(PACKET_DATA_SIZE, true), - ) - .await - { - if handle_chunk( - chunk, - &mut maybe_batch, - &remote_addr, - &packet_sender, - stats.clone(), - params.peer_type, - ) - .await - { - last_update.store(timing::timestamp(), Ordering::Relaxed); - break; - } - start = Instant::now(); - } else if start.elapsed() > wait_for_chunk_timeout { - debug!("Timeout in receiving on stream"); - stats - .total_stream_read_timeouts - .fetch_add(1, Ordering::Relaxed); - break; - } - } - stats.total_streams.fetch_sub(1, Ordering::Relaxed); - stream_load_ema.update_ema_if_needed(); - }); } - Err(e) => { - debug!("stream error: {:?}", e); + sleep(throttle_duration).await; + } + } + stream_load_ema.increment_load(params.peer_type); + stream_counter.stream_count.fetch_add(1, Ordering::Relaxed); + stats.total_streams.fetch_add(1, Ordering::Relaxed); + stats.total_new_streams.fetch_add(1, Ordering::Relaxed); + let cancel = cancel.clone(); + let stats = stats.clone(); + let packet_sender = params.packet_sender.clone(); + let last_update = last_update.clone(); + let stream_load_ema = stream_load_ema.clone(); + tokio::spawn(async move { + let mut maybe_batch = None; + loop { + // Read the next chunk, waiting up to `wait_for_chunk_timeout`. 
If we don't get a + // chunk before then, we assume the stream is dead and stop the stream task. This + // can only happen if there's severe packet loss or the peer stop sending for + // whatever reason. + let chunk = match tokio::select! { + chunk = tokio::time::timeout( + wait_for_chunk_timeout, + stream.read_chunk(PACKET_DATA_SIZE, true)) => chunk, + + // If the peer gets disconnected stop the task right away. + _ = cancel.cancelled() => break, + } { + // read_chunk returned success + Ok(Ok(chunk)) => chunk, + // read_chunk returned error + Ok(Err(e)) => { + debug!("Received stream error: {:?}", e); + stats + .total_stream_read_errors + .fetch_add(1, Ordering::Relaxed); + break; + } + // timeout elapsed + Err(_) => { + debug!("Timeout in receiving on stream"); + stats + .total_stream_read_timeouts + .fetch_add(1, Ordering::Relaxed); + break; + } + }; + + if handle_chunk( + chunk, + &mut maybe_batch, + &remote_addr, + &packet_sender, + stats.clone(), + params.peer_type, + ) + .await + { + last_update.store(timing::timestamp(), Ordering::Relaxed); break; } } - } + + stats.total_streams.fetch_sub(1, Ordering::Relaxed); + stream_load_ema.update_ema_if_needed(); + }); } let removed_connection_count = connection_table.lock().await.remove_connection( @@ -1168,130 +1175,119 @@ async fn handle_connection( // Return true if the server should drop the stream async fn handle_chunk( - chunk: Result, quinn::ReadError>, + maybe_chunk: Option, packet_accum: &mut Option, remote_addr: &SocketAddr, packet_sender: &AsyncSender, stats: Arc, peer_type: ConnectionPeerType, ) -> bool { - match chunk { - Ok(maybe_chunk) => { - if let Some(chunk) = maybe_chunk { - trace!("got chunk: {:?}", chunk); - let chunk_len = chunk.bytes.len() as u64; - - // shouldn't happen, but sanity check the size and offsets - if chunk.offset > PACKET_DATA_SIZE as u64 || chunk_len > PACKET_DATA_SIZE as u64 { - stats.total_invalid_chunks.fetch_add(1, Ordering::Relaxed); - return true; - } - let Some(end_of_chunk) = chunk.offset.checked_add(chunk_len) else { - return true; - }; - if end_of_chunk > PACKET_DATA_SIZE as u64 { - stats - .total_invalid_chunk_size - .fetch_add(1, Ordering::Relaxed); - return true; - } + if let Some(chunk) = maybe_chunk { + trace!("got chunk: {:?}", chunk); + let chunk_len = chunk.bytes.len() as u64; - // chunk looks valid - if packet_accum.is_none() { - let mut meta = Meta::default(); - meta.set_socket_addr(remote_addr); - meta.set_from_staked_node(matches!(peer_type, ConnectionPeerType::Staked(_))); - *packet_accum = Some(PacketAccumulator { - meta, - chunks: SmallVec::new(), - start_time: Instant::now(), - }); - } + // shouldn't happen, but sanity check the size and offsets + if chunk.offset > PACKET_DATA_SIZE as u64 || chunk_len > PACKET_DATA_SIZE as u64 { + stats.total_invalid_chunks.fetch_add(1, Ordering::Relaxed); + return true; + } + let Some(end_of_chunk) = chunk.offset.checked_add(chunk_len) else { + return true; + }; + if end_of_chunk > PACKET_DATA_SIZE as u64 { + stats + .total_invalid_chunk_size + .fetch_add(1, Ordering::Relaxed); + return true; + } - if let Some(accum) = packet_accum.as_mut() { - let offset = chunk.offset; - let Some(end_of_chunk) = (chunk.offset as usize).checked_add(chunk.bytes.len()) - else { - return true; - }; - accum.chunks.push(PacketChunk { - bytes: chunk.bytes, - offset: offset as usize, - end_of_chunk, - }); - - accum.meta.size = std::cmp::max(accum.meta.size, end_of_chunk); - } + // chunk looks valid + if packet_accum.is_none() { + let mut meta = Meta::default(); + 
meta.set_socket_addr(remote_addr); + meta.set_from_staked_node(matches!(peer_type, ConnectionPeerType::Staked(_))); + *packet_accum = Some(PacketAccumulator { + meta, + chunks: SmallVec::new(), + start_time: Instant::now(), + }); + } - if peer_type.is_staked() { - stats - .total_staked_chunks_received - .fetch_add(1, Ordering::Relaxed); - } else { - stats - .total_unstaked_chunks_received - .fetch_add(1, Ordering::Relaxed); - } + if let Some(accum) = packet_accum.as_mut() { + let offset = chunk.offset; + let Some(end_of_chunk) = (chunk.offset as usize).checked_add(chunk.bytes.len()) else { + return true; + }; + accum.chunks.push(PacketChunk { + bytes: chunk.bytes, + offset: offset as usize, + end_of_chunk, + }); + + accum.meta.size = std::cmp::max(accum.meta.size, end_of_chunk); + } + + if peer_type.is_staked() { + stats + .total_staked_chunks_received + .fetch_add(1, Ordering::Relaxed); + } else { + stats + .total_unstaked_chunks_received + .fetch_add(1, Ordering::Relaxed); + } + } else { + // done receiving chunks + trace!("chunk is none"); + if let Some(accum) = packet_accum.take() { + let bytes_sent = accum.meta.size; + let chunks_sent = accum.chunks.len(); + + if let Err(err) = packet_sender.send(accum).await { + stats + .total_handle_chunk_to_packet_batcher_send_err + .fetch_add(1, Ordering::Relaxed); + trace!("packet batch send error {:?}", err); } else { - // done receiving chunks - trace!("chunk is none"); - if let Some(accum) = packet_accum.take() { - let bytes_sent = accum.meta.size; - let chunks_sent = accum.chunks.len(); + stats + .total_packets_sent_for_batching + .fetch_add(1, Ordering::Relaxed); + stats + .total_bytes_sent_for_batching + .fetch_add(bytes_sent, Ordering::Relaxed); + stats + .total_chunks_sent_for_batching + .fetch_add(chunks_sent, Ordering::Relaxed); - if let Err(err) = packet_sender.send(accum).await { + match peer_type { + ConnectionPeerType::Unstaked => { stats - .total_handle_chunk_to_packet_batcher_send_err + .total_unstaked_packets_sent_for_batching .fetch_add(1, Ordering::Relaxed); - trace!("packet batch send error {:?}", err); - } else { + } + ConnectionPeerType::Staked(_) => { stats - .total_packets_sent_for_batching + .total_staked_packets_sent_for_batching .fetch_add(1, Ordering::Relaxed); - stats - .total_bytes_sent_for_batching - .fetch_add(bytes_sent, Ordering::Relaxed); - stats - .total_chunks_sent_for_batching - .fetch_add(chunks_sent, Ordering::Relaxed); - - match peer_type { - ConnectionPeerType::Unstaked => { - stats - .total_unstaked_packets_sent_for_batching - .fetch_add(1, Ordering::Relaxed); - } - ConnectionPeerType::Staked(_) => { - stats - .total_staked_packets_sent_for_batching - .fetch_add(1, Ordering::Relaxed); - } - } - - trace!("sent {} byte packet for batching", bytes_sent); } - } else { - stats - .total_packet_batches_none - .fetch_add(1, Ordering::Relaxed); } - return true; + + trace!("sent {} byte packet for batching", bytes_sent); } - } - Err(e) => { - debug!("Received stream error: {:?}", e); + } else { stats - .total_stream_read_errors + .total_packet_batches_none .fetch_add(1, Ordering::Relaxed); - return true; } + return true; } + false } #[derive(Debug)] struct ConnectionEntry { - exit: Arc, + cancel: CancellationToken, peer_type: ConnectionPeerType, last_update: Arc, port: u16, @@ -1303,7 +1299,7 @@ struct ConnectionEntry { impl ConnectionEntry { fn new( - exit: Arc, + cancel: CancellationToken, peer_type: ConnectionPeerType, last_update: Arc, port: u16, @@ -1312,7 +1308,7 @@ impl ConnectionEntry { stream_counter: 
Arc, ) -> Self { Self { - exit, + cancel, peer_type, last_update, port, @@ -1342,7 +1338,7 @@ impl Drop for ConnectionEntry { CONNECTION_CLOSE_REASON_DROPPED_ENTRY, ); } - self.exit.store(true, Ordering::Relaxed); + self.cancel.cancel(); } } @@ -1431,7 +1427,7 @@ impl ConnectionTable { max_connections_per_peer: usize, ) -> Option<( Arc, - Arc, + CancellationToken, Arc, )> { let connection_entry = self.table.entry(key).or_default(); @@ -1441,14 +1437,14 @@ impl ConnectionTable { .map(|c| c <= max_connections_per_peer) .unwrap_or(false); if has_connection_capacity { - let exit = Arc::new(AtomicBool::new(false)); + let cancel = CancellationToken::new(); let last_update = Arc::new(AtomicU64::new(last_update)); let stream_counter = connection_entry .first() .map(|entry| entry.stream_counter.clone()) .unwrap_or(Arc::new(ConnectionStreamCounter::new())); connection_entry.push(ConnectionEntry::new( - exit.clone(), + cancel.clone(), peer_type, last_update.clone(), port, @@ -1457,7 +1453,7 @@ impl ConnectionTable { stream_counter.clone(), )); self.total_size += 1; - Some((last_update, exit, stream_counter)) + Some((last_update, cancel, stream_counter)) } else { if let Some(connection) = connection { connection.close( @@ -1785,7 +1781,7 @@ pub mod test { s1.write_all(&[0u8]).await.unwrap_or_default(); // Wait long enough for the stream to timeout in receiving chunks - let sleep_time = Duration::from_secs(3).min(WAIT_FOR_STREAM_TIMEOUT * 1000); + let sleep_time = DEFAULT_WAIT_FOR_CHUNK_TIMEOUT * 2; sleep(sleep_time).await; // Test that the stream was created, but timed out in read @@ -1815,7 +1811,7 @@ pub mod test { join_handle.await.unwrap(); } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn test_quic_server_multiple_connections_on_single_client_endpoint() { solana_logger::setup(); @@ -1864,11 +1860,13 @@ pub mod test { CONNECTION_CLOSE_CODE_DROPPED_ENTRY.into(), CONNECTION_CLOSE_REASON_DROPPED_ENTRY, ); - // Wait long enough for the stream to timeout in receiving chunks - let sleep_time = Duration::from_secs(1).min(WAIT_FOR_STREAM_TIMEOUT * 1000); - sleep(sleep_time).await; - assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1); + let start = Instant::now(); + while stats.connection_removed.load(Ordering::Relaxed) != 1 { + debug!("First connection not removed yet"); + sleep(Duration::from_millis(10)).await; + } + assert!(start.elapsed().as_secs() < 1); s2.write_all(&[0u8]).await.unwrap(); s2.finish().unwrap(); @@ -1877,11 +1875,13 @@ pub mod test { CONNECTION_CLOSE_CODE_DROPPED_ENTRY.into(), CONNECTION_CLOSE_REASON_DROPPED_ENTRY, ); - // Wait long enough for the stream to timeout in receiving chunks - let sleep_time = Duration::from_secs(1).min(WAIT_FOR_STREAM_TIMEOUT * 1000); - sleep(sleep_time).await; - assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 2); + let start = Instant::now(); + while stats.connection_removed.load(Ordering::Relaxed) != 2 { + debug!("Second connection not removed yet"); + sleep(Duration::from_millis(10)).await; + } + assert!(start.elapsed().as_secs() < 1); exit.store(true, Ordering::Relaxed); join_handle.await.unwrap(); diff --git a/streamer/src/nonblocking/testing_utilities.rs b/streamer/src/nonblocking/testing_utilities.rs index ab87334c7cc4c9..d569190f10d16d 100644 --- a/streamer/src/nonblocking/testing_utilities.rs +++ b/streamer/src/nonblocking/testing_utilities.rs @@ -3,6 +3,7 @@ use { super::quic::{ spawn_server_multi, SpawnNonBlockingServerResult, ALPN_TPU_PROTOCOL_ID, DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, 
DEFAULT_MAX_STREAMS_PER_MS, + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, }, crate::{ quic::{StreamerStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, @@ -23,7 +24,6 @@ use { std::{ net::{SocketAddr, UdpSocket}, sync::{atomic::AtomicBool, Arc, RwLock}, - time::Duration, }, tokio::task::JoinHandle, }; @@ -201,7 +201,7 @@ pub fn setup_quic_server_with_sockets( max_unstaked_connections, max_streams_per_ms, max_connections_per_ipaddr_per_minute, - Duration::from_secs(2), + DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, DEFAULT_TPU_COALESCE, ) .unwrap();
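
Note for reviewers: the handle_connection hunks above are easier to follow once the target shape of the loop is clear. Below is a minimal sketch of that shape only — it is not lifted from the diff. Stats counters, stream throttling, packet accumulation, and the connection table are all omitted, and accept_streams, WAIT_FOR_CHUNK_TIMEOUT, and PACKET_DATA_SIZE are local stand-ins declared here purely for illustration. The point it shows: one CancellationToken is created per connection, tokio::select! races accept_uni() and read_chunk() against cancelled(), and the only timeout future ever created is the single 2s guard around each chunk read.

use {
    quinn::Connection,
    std::time::Duration,
    tokio::time::timeout,
    tokio_util::sync::CancellationToken,
};

// Local stand-ins for this sketch; the real values live in quic.rs / the sdk.
const WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(2);
const PACKET_DATA_SIZE: usize = 1232;

// Simplified accept loop: no per-iteration timeout future is created. The loop
// exits when the peer disconnects or when the connection's token is cancelled.
async fn accept_streams(connection: Connection, cancel: CancellationToken) {
    loop {
        let mut stream = tokio::select! {
            stream = connection.accept_uni() => match stream {
                Ok(stream) => stream,
                // The peer closed the connection, or another connection error occurred.
                Err(_) => break,
            },
            // Cancelling the token (the patch does this in ConnectionEntry::drop)
            // resolves this future once; nothing is polled in the meantime.
            _ = cancel.cancelled() => break,
        };

        let cancel = cancel.clone();
        tokio::spawn(async move {
            loop {
                let chunk = tokio::select! {
                    // One timeout per read rather than one per exit check: if no
                    // chunk arrives within 2s, assume the stream is dead.
                    res = timeout(
                        WAIT_FOR_CHUNK_TIMEOUT,
                        stream.read_chunk(PACKET_DATA_SIZE, true),
                    ) => match res {
                        Ok(Ok(Some(chunk))) => chunk,
                        // End of stream, read error, or timeout: stop the stream task.
                        _ => break,
                    },
                    _ = cancel.cancelled() => break,
                };
                // Placeholder: the real code accumulates chunks into a
                // PacketAccumulator and forwards complete packets for batching.
                drop(chunk);
            }
        });
    }
}

Because dropping a ConnectionEntry now calls cancel.cancel() instead of storing into an AtomicBool, evicting a connection from the ConnectionTable wakes both loops exactly once; there is no periodic exit polling left to pay for.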