Skip to content

Commit

Permalink
Fix broken parallelism in quic-client
Browse files Browse the repository at this point in the history
Fixes excessive fragmentation by TPU clients leading to a large
number of streams per conn in 'sending' state simultaneously.
This, in turn, requires excessive in-memory buffering server-side
to reassemble fragmented transactions.

- Simplifies QuicClient::send_batch to enqueue send operations
  in sequential order
- Removes the "max_parallel_streams" config option

The quic-client now produces an ordered fragment stream when
scheduling send operations from a single-thread.
  • Loading branch information
riptl committed Aug 9, 2024
1 parent e769f3a commit 12e9caf
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 97 deletions.
58 changes: 3 additions & 55 deletions quic-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ use {
pubkey::Pubkey,
signature::{Keypair, Signer},
},
solana_streamer::{
nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType},
streamer::StakedNodes,
tls_certificates::new_dummy_x509_certificate,
},
solana_streamer::{streamer::StakedNodes, tls_certificates::new_dummy_x509_certificate},
std::{
net::{IpAddr, SocketAddr},
sync::{Arc, RwLock},
Expand Down Expand Up @@ -65,13 +61,12 @@ impl ConnectionPool for QuicPool {

fn create_pool_entry(
&self,
config: &Self::NewConnectionConfig,
_config: &Self::NewConnectionConfig,
addr: &SocketAddr,
) -> Arc<Self::BaseClientConnection> {
Arc::new(Quic(Arc::new(QuicClient::new(
self.endpoint.clone(),
*addr,
config.compute_max_parallel_streams(),
))))
}
}
Expand Down Expand Up @@ -120,24 +115,6 @@ impl QuicConfig {
QuicLazyInitializedEndpoint::new(cert_guard.clone(), self.client_endpoint.as_ref().cloned())
}

fn compute_max_parallel_streams(&self) -> usize {
let (client_type, total_stake) =
self.maybe_client_pubkey
.map_or((ConnectionPeerType::Unstaked, 0), |pubkey| {
self.maybe_staked_nodes.as_ref().map_or(
(ConnectionPeerType::Unstaked, 0),
|stakes| {
let rstakes = stakes.read().unwrap();
rstakes.get_node_stake(&pubkey).map_or(
(ConnectionPeerType::Unstaked, rstakes.total_stake()),
|stake| (ConnectionPeerType::Staked(stake), rstakes.total_stake()),
)
},
)
});
compute_max_allowed_uni_streams(client_type, total_stake)
}

pub fn update_client_certificate(&mut self, keypair: &Keypair, _ipaddr: IpAddr) {
let (cert, priv_key) = new_dummy_x509_certificate(keypair);

Expand Down Expand Up @@ -253,56 +230,27 @@ pub fn new_quic_connection_cache(

#[cfg(test)]
mod tests {
use {
super::*,
solana_sdk::quic::{
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
QUIC_TOTAL_STAKED_CONCURRENT_STREAMS,
},
std::collections::HashMap,
};
use {super::*, std::collections::HashMap};

#[test]
fn test_connection_cache_max_parallel_chunks() {
solana_logger::setup();

let mut connection_config = QuicConfig::new().unwrap();
assert_eq!(
connection_config.compute_max_parallel_streams(),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);

let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let pubkey = Pubkey::new_unique();
connection_config.set_staked_nodes(&staked_nodes, &pubkey);
assert_eq!(
connection_config.compute_max_parallel_streams(),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
let overrides = HashMap::<Pubkey, u64>::default();
let mut stakes = HashMap::from([(Pubkey::new_unique(), 10_000)]);
*staked_nodes.write().unwrap() =
StakedNodes::new(Arc::new(stakes.clone()), overrides.clone());
assert_eq!(
connection_config.compute_max_parallel_streams(),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);

stakes.insert(pubkey, 1);
*staked_nodes.write().unwrap() =
StakedNodes::new(Arc::new(stakes.clone()), overrides.clone());
let delta =
(QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;

assert_eq!(
connection_config.compute_max_parallel_streams(),
(QUIC_MIN_STAKED_CONCURRENT_STREAMS as f64 + (1f64 / 10000f64) * delta) as usize
);
stakes.insert(pubkey, 1_000);
*staked_nodes.write().unwrap() = StakedNodes::new(Arc::new(stakes.clone()), overrides);
assert_ne!(
connection_config.compute_max_parallel_streams(),
QUIC_MIN_STAKED_CONCURRENT_STREAMS
);
}
}
50 changes: 8 additions & 42 deletions quic-client/src/nonblocking/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
use {
async_mutex::Mutex,
async_trait::async_trait,
futures::future::{join_all, TryFutureExt},
itertools::Itertools,
futures::future::TryFutureExt,
log::*,
quinn::{
ClientConfig, ConnectError, Connection, ConnectionError, Endpoint, EndpointConfig,
Expand All @@ -19,10 +18,7 @@ use {
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_rpc_client_api::client_error::ErrorKind as ClientErrorKind,
solana_sdk::{
quic::{
QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT,
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
},
quic::{QUIC_CONNECTION_HANDSHAKE_TIMEOUT, QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT},
signature::Keypair,
transport::Result as TransportResult,
},
Expand Down Expand Up @@ -249,21 +245,15 @@ pub struct QuicClient {
connection: Arc<Mutex<Option<QuicNewConnection>>>,
addr: SocketAddr,
stats: Arc<ClientStats>,
chunk_size: usize,
}

impl QuicClient {
pub fn new(
endpoint: Arc<QuicLazyInitializedEndpoint>,
addr: SocketAddr,
chunk_size: usize,
) -> Self {
pub fn new(endpoint: Arc<QuicLazyInitializedEndpoint>, addr: SocketAddr) -> Self {
Self {
endpoint,
connection: Arc::new(Mutex::new(None)),
addr,
stats: Arc::new(ClientStats::default()),
chunk_size,
}
}

Expand All @@ -272,9 +262,9 @@ impl QuicClient {
connection: &Connection,
) -> Result<(), QuicError> {
let mut send_stream = connection.open_uni().await?;

send_stream.write_all(data).await?;
send_stream.finish().await?;
// Finish sending this stream before sending any new stream
_ = send_stream.set_priority(1000);
Ok(())
}

Expand Down Expand Up @@ -486,28 +476,8 @@ impl QuicClient {
.await
.map_err(Into::<ClientErrorKind>::into)?;

// Used to avoid dereferencing the Arc multiple times below
// by just getting a reference to the NewConnection once
let connection_ref: &Connection = &connection;

let chunks = buffers[1..buffers.len()].iter().chunks(self.chunk_size);

let futures: Vec<_> = chunks
.into_iter()
.map(|buffs| {
join_all(
buffs
.into_iter()
.map(|buf| Self::_send_buffer_using_conn(buf.as_ref(), connection_ref)),
)
})
.collect();

for f in futures {
f.await
.into_iter()
.try_for_each(|res| res)
.map_err(Into::<ClientErrorKind>::into)?;
for data in buffers[1..buffers.len()].iter() {
Self::_send_buffer_using_conn(data.as_ref(), &connection).await?;
}
Ok(())
}
Expand Down Expand Up @@ -540,11 +510,7 @@ impl QuicClientConnection {
addr: SocketAddr,
connection_stats: Arc<ConnectionCacheStats>,
) -> Self {
let client = Arc::new(QuicClient::new(
endpoint,
addr,
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
));
let client = Arc::new(QuicClient::new(endpoint, addr));
Self::new_with_client(client, connection_stats)
}

Expand Down

0 comments on commit 12e9caf

Please sign in to comment.