Skip to content

Commit

Permalink
quic tpu: lower stream timeout from 10s to 2s
Browse files Browse the repository at this point in the history
Don't wait 10s for one piece of a transaction to come before timing out.
Lower the timeout to 2s, after which we assume a stream is dead. 2s is
enough round trips to account for non catastrophic packet loss. If
packet loss is causing >2s stream latency, the connection is hosed and
the best thing we can do is save server resources for peers with better
connectivity.
  • Loading branch information
alessandrod committed Oct 16, 2024
1 parent a302c43 commit ce3cc60
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 26 deletions.
38 changes: 14 additions & 24 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use {
tokio_util::sync::CancellationToken,
};

pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(10);
pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(2);

pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";

Expand Down Expand Up @@ -1103,19 +1103,14 @@ async fn handle_connection(
let stream_load_ema = stream_load_ema.clone();
tokio::spawn(async move {
let mut maybe_batch = None;
// The min is to guard against a value too small which can wake up unnecessarily
// frequently and wasting CPU cycles. The max guard against waiting for too long
// which delay exit and cause some test failures when the timeout value is large.
// Within this value, the heuristic is to wake up 10 times to check for exit
// for the set timeout if there are no data.
let exit_check_interval = (wait_for_chunk_timeout / 10)
.clamp(Duration::from_millis(10), Duration::from_secs(1));
let mut start = Instant::now();
loop {
// Read the next chunk
// Read the next chunk, waiting up to `wait_for_chunk_timeout`. If we don't get a
// chunk before then, we assume the stream is dead and stop the stream task. This
// can only happen if there's severe packet loss or the peer stop sending for
// whatever reason.
let chunk = match tokio::select! {
chunk = tokio::time::timeout(
exit_check_interval,
wait_for_chunk_timeout,
stream.read_chunk(PACKET_DATA_SIZE, true)) => chunk,

// If the peer gets disconnected stop the task right away.
Expand All @@ -1133,15 +1128,11 @@ async fn handle_connection(
}
// timeout elapsed
Err(_) => {
if start.elapsed() >= wait_for_chunk_timeout {
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
break;
} else {
continue;
}
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
break;
}
};

Expand All @@ -1158,7 +1149,6 @@ async fn handle_connection(
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
}
start = Instant::now();
}

stats.total_streams.fetch_sub(1, Ordering::Relaxed);
Expand Down Expand Up @@ -1791,7 +1781,7 @@ pub mod test {
s1.write_all(&[0u8]).await.unwrap_or_default();

// Wait long enough for the stream to timeout in receiving chunks
let sleep_time = Duration::from_secs(3).min(WAIT_FOR_STREAM_TIMEOUT * 1000);
let sleep_time = DEFAULT_WAIT_FOR_CHUNK_TIMEOUT * 2;
sleep(sleep_time).await;

// Test that the stream was created, but timed out in read
Expand Down Expand Up @@ -1871,7 +1861,7 @@ pub mod test {
CONNECTION_CLOSE_REASON_DROPPED_ENTRY,
);
// Wait long enough for the stream to timeout in receiving chunks
let sleep_time = Duration::from_secs(1).min(WAIT_FOR_STREAM_TIMEOUT * 1000);
let sleep_time = DEFAULT_WAIT_FOR_CHUNK_TIMEOUT * 2;
sleep(sleep_time).await;

assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1);
Expand All @@ -1884,7 +1874,7 @@ pub mod test {
CONNECTION_CLOSE_REASON_DROPPED_ENTRY,
);
// Wait long enough for the stream to timeout in receiving chunks
let sleep_time = Duration::from_secs(1).min(WAIT_FOR_STREAM_TIMEOUT * 1000);
let sleep_time = DEFAULT_WAIT_FOR_CHUNK_TIMEOUT * 2;
sleep(sleep_time).await;

assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 2);
Expand Down
4 changes: 2 additions & 2 deletions streamer/src/nonblocking/testing_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use {
super::quic::{
spawn_server_multi, SpawnNonBlockingServerResult, ALPN_TPU_PROTOCOL_ID,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
crate::{
quic::{StreamerStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
Expand All @@ -23,7 +24,6 @@ use {
std::{
net::{SocketAddr, UdpSocket},
sync::{atomic::AtomicBool, Arc, RwLock},
time::Duration,
},
tokio::task::JoinHandle,
};
Expand Down Expand Up @@ -201,7 +201,7 @@ pub fn setup_quic_server_with_sockets(
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_minute,
Duration::from_secs(2),
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
.unwrap();
Expand Down

0 comments on commit ce3cc60

Please sign in to comment.