Support max concurrent connections #3031

Merged
102 changes: 99 additions & 3 deletions streamer/src/nonblocking/quic.rs
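
The change bounds how many QUIC connections the streamer holds open at once: every accepted connection is wrapped in a small RAII tracker that increments a shared open-connection counter, the counter is decremented when the tracker is dropped, and incoming connections are refused once the counter reaches the configured ceiling (the staked plus unstaked connection limits, plus 25% headroom). A condensed sketch of the pattern follows; the names are hypothetical stand-ins, and the check and the increment are folded into one constructor here, whereas the diff below keeps them as separate steps.

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

/// Hypothetical guard illustrating the RAII counting pattern this PR adds.
struct ConnectionGuard {
    open_connections: Arc<AtomicUsize>,
}

impl ConnectionGuard {
    /// Claim a connection slot, or return None when the ceiling is reached.
    fn try_new(open_connections: Arc<AtomicUsize>, max_concurrent: usize) -> Option<Self> {
        if open_connections.load(Ordering::Relaxed) >= max_concurrent {
            return None; // caller refuses the incoming connection
        }
        open_connections.fetch_add(1, Ordering::Relaxed);
        Some(Self { open_connections })
    }
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        // Release the slot when the entry owning the guard is dropped.
        self.open_connections.fetch_sub(1, Ordering::Relaxed);
    }
}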
@@ -39,6 +39,7 @@ use {
},
solana_transaction_metrics_tracker::signature_if_should_track_packet,
std::{
fmt,
iter::repeat_with,
net::{IpAddr, SocketAddr, UdpSocket},
pin::Pin,
@@ -195,8 +196,7 @@ pub fn spawn_server_multi(
coalesce: Duration,
) -> Result<SpawnNonBlockingServerResult, QuicServerError> {
info!("Start {name} quic server on {sockets:?}");
let concurrent_connections =
(max_staked_connections + max_unstaked_connections) / sockets.len();
let concurrent_connections = max_staked_connections + max_unstaked_connections;
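// Allow 25% headroom over the configured staked + unstaked connection limits.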
let max_concurrent_connections = concurrent_connections + concurrent_connections / 4;
let (config, _) = configure_server(keypair)?;

@@ -227,6 +227,7 @@ pub fn spawn_server_multi(
stats.clone(),
wait_for_chunk_timeout,
coalesce,
max_concurrent_connections,
));
Ok(SpawnNonBlockingServerResult {
endpoints,
@@ -236,6 +237,43 @@
})
}

/// Tracks a client connection through all stages so that we do not have to
/// litter the code with open-connection accounting. It is stored in the
/// connection table as part of the ConnectionEntry; the open-connection count
/// is decremented automatically when the tracker is dropped.
struct ClientConnectionTracker {
stats: Arc<StreamerStats>,
}

/// Required by ConnectionEntry to support the Debug format.
impl fmt::Debug for ClientConnectionTracker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StreamerClientConnection")
.field(
"open_connections:",
&self.stats.open_connections.load(Ordering::Relaxed),
)
.finish()
}
}

impl Drop for ClientConnectionTracker {
/// When this is dropped, reduce the open connection count.
fn drop(&mut self) {
self.stats.open_connections.fetch_sub(1, Ordering::Relaxed);
}
}

impl ClientConnectionTracker {
/// Create a ClientConnectionTracker and increase the open connection count.
fn new(stats: Arc<StreamerStats>) -> Self {
stats.open_connections.fetch_add(1, Ordering::Relaxed);

Self { stats }
}
}
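
// How this is used in this PR: run_server() checks stats.open_connections against
// max_concurrent_connections before accepting, wraps each accepted connection in a
// ClientConnectionTracker, and stores the tracker in the ConnectionEntry so the slot
// is released when the entry is pruned or the connection closes.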

#[allow(clippy::too_many_arguments)]
async fn run_server(
name: &'static str,
@@ -251,6 +289,7 @@ async fn run_server(
stats: Arc<StreamerStats>,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
max_concurrent_connections: usize,
) {
let rate_limiter = ConnectionRateLimiter::new(max_connections_per_ipaddr_per_min);
let overall_connection_rate_limiter =
@@ -290,6 +329,7 @@ async fn run_server(
})
})
.collect::<FuturesUnordered<_>>();

while !exit.load(Ordering::Relaxed) {
let timeout_connection = select! {
ready = accepts.next() => {
@@ -320,6 +360,7 @@ async fn run_server(
stats
.total_incoming_connection_attempts
.fetch_add(1, Ordering::Relaxed);

let remote_address = incoming.remote_address();

// first check overall connection rate limit:
@@ -354,6 +395,21 @@ async fn run_server(
continue;
}

let open_connections = stats.open_connections.load(Ordering::Relaxed);
if open_connections >= max_concurrent_connections {
debug!(
"There are too many concurrent connections opened already: open: {}, max: {}",
open_connections, max_concurrent_connections
);
stats
.refused_connections_too_many_open_connections
.fetch_add(1, Ordering::Relaxed);
incoming.refuse();

continue;
}
let client_connection_tracker = ClientConnectionTracker::new(stats.clone());

stats
.outstanding_incoming_connection_attempts
.fetch_add(1, Ordering::Relaxed);
@@ -362,6 +418,7 @@ async fn run_server(
Ok(connecting) => {
tokio::spawn(setup_connection(
connecting,
client_connection_tracker,
unstaked_connection_table.clone(),
staked_connection_table.clone(),
sender.clone(),
@@ -496,6 +553,7 @@ impl NewConnectionHandlerParams {
}

fn handle_and_cache_new_connection(
client_connection_tracker: ClientConnectionTracker,
connection: Connection,
mut connection_table_l: MutexGuard<ConnectionTable>,
connection_table: Arc<Mutex<ConnectionTable>>,
@@ -525,6 +583,7 @@ fn handle_and_cache_new_connection(
.try_add_connection(
ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
remote_addr.port(),
client_connection_tracker,
Some(connection.clone()),
params.peer_type,
timing::timestamp(),
@@ -571,6 +630,7 @@ fn handle_and_cache_new_connection(
}

async fn prune_unstaked_connections_and_add_new_connection(
client_connection_tracker: ClientConnectionTracker,
connection: Connection,
connection_table: Arc<Mutex<ConnectionTable>>,
max_connections: usize,
@@ -584,6 +644,7 @@ async fn prune_unstaked_connections_and_add_new_connection(
let mut connection_table = connection_table.lock().await;
prune_unstaked_connection_table(&mut connection_table, max_connections, stats);
handle_and_cache_new_connection(
client_connection_tracker,
connection,
connection_table,
connection_table_clone,
@@ -646,6 +707,7 @@ fn compute_recieve_window(
#[allow(clippy::too_many_arguments)]
async fn setup_connection(
connecting: Connecting,
client_connection_tracker: ClientConnectionTracker,
unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
staked_connection_table: Arc<Mutex<ConnectionTable>>,
packet_sender: AsyncSender<PacketAccumulator>,
@@ -712,6 +774,7 @@ async fn setup_connection(

if connection_table_l.total_size < max_staked_connections {
if let Ok(()) = handle_and_cache_new_connection(
client_connection_tracker,
new_connection,
connection_table_l,
staked_connection_table.clone(),
@@ -728,6 +791,7 @@ async fn setup_connection(
// put this connection in the unstaked connection table. If needed, prune a
// connection from the unstaked connection table.
if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
client_connection_tracker,
new_connection,
unstaked_connection_table.clone(),
max_unstaked_connections,
@@ -752,6 +816,7 @@ async fn setup_connection(
}
ConnectionPeerType::Unstaked => {
if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
client_connection_tracker,
new_connection,
unstaked_connection_table.clone(),
max_unstaked_connections,
@@ -1226,6 +1291,9 @@ struct ConnectionEntry {
peer_type: ConnectionPeerType,
last_update: Arc<AtomicU64>,
port: u16,
#[allow(dead_code)]
// We do not use this field directly, but its Drop runs (decrementing
// open_connections) when the ConnectionEntry is dropped.
client_connection_tracker: ClientConnectionTracker,
connection: Option<Connection>,
stream_counter: Arc<ConnectionStreamCounter>,
}
@@ -1236,6 +1304,7 @@ impl ConnectionEntry {
peer_type: ConnectionPeerType,
last_update: Arc<AtomicU64>,
port: u16,
client_connection_tracker: ClientConnectionTracker,
connection: Option<Connection>,
stream_counter: Arc<ConnectionStreamCounter>,
) -> Self {
@@ -1244,6 +1313,7 @@ impl ConnectionEntry {
peer_type,
last_update,
port,
client_connection_tracker,
connection,
stream_counter,
}
@@ -1334,7 +1404,7 @@ impl ConnectionTable {
})
.map(|index| {
let connection = self.table[index].first();
let stake = connection.map(|connection| connection.stake());
let stake = connection.map(|connection: &ConnectionEntry| connection.stake());
(index, stake)
})
.take(sample_size)
@@ -1351,6 +1421,7 @@ impl ConnectionTable {
&mut self,
key: ConnectionTableKey,
port: u16,
client_connection_tracker: ClientConnectionTracker,
connection: Option<Connection>,
peer_type: ConnectionPeerType,
last_update: u64,
@@ -1378,6 +1449,7 @@ impl ConnectionTable {
peer_type,
last_update.clone(),
port,
client_connection_tracker,
connection,
stream_counter.clone(),
));
@@ -2002,11 +2074,13 @@ pub mod test {
let sockets: Vec<_> = (0..num_entries)
.map(|i| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(i, 0, 0, 0)), 0))
.collect();
let stats = Arc::new(StreamerStats::default());
for (i, socket) in sockets.iter().enumerate() {
table
.try_add_connection(
ConnectionTableKey::IP(socket.ip()),
socket.port(),
ClientConnectionTracker::new(stats.clone()),
None,
ConnectionPeerType::Unstaked,
i as u64,
@@ -2019,6 +2093,7 @@ pub mod test {
.try_add_connection(
ConnectionTableKey::IP(sockets[0].ip()),
sockets[0].port(),
ClientConnectionTracker::new(stats.clone()),
None,
ConnectionPeerType::Unstaked,
5,
@@ -2040,6 +2115,7 @@ pub mod test {
table.remove_connection(ConnectionTableKey::IP(socket.ip()), socket.port(), 0);
}
assert_eq!(table.total_size, 0);
assert_eq!(stats.open_connections.load(Ordering::Relaxed), 0);
}

#[test]
@@ -2051,13 +2127,15 @@ pub mod test {
// from a different peer pubkey.
let num_entries = 15;
let max_connections_per_peer = 10;
let stats = Arc::new(StreamerStats::default());

let pubkeys: Vec<_> = (0..num_entries).map(|_| Pubkey::new_unique()).collect();
for (i, pubkey) in pubkeys.iter().enumerate() {
table
.try_add_connection(
ConnectionTableKey::Pubkey(*pubkey),
0,
ClientConnectionTracker::new(stats.clone()),
None,
ConnectionPeerType::Unstaked,
i as u64,
@@ -2075,6 +2153,7 @@ pub mod test {
table.remove_connection(ConnectionTableKey::Pubkey(*pubkey), 0, 0);
}
assert_eq!(table.total_size, 0);
assert_eq!(stats.open_connections.load(Ordering::Relaxed), 0);
}

#[test]
@@ -2084,11 +2163,14 @@ pub mod test {

let max_connections_per_peer = 10;
let pubkey = Pubkey::new_unique();
let stats: Arc<StreamerStats> = Arc::new(StreamerStats::default());

(0..max_connections_per_peer).for_each(|i| {
table
.try_add_connection(
ConnectionTableKey::Pubkey(pubkey),
0,
ClientConnectionTracker::new(stats.clone()),
None,
ConnectionPeerType::Unstaked,
i as u64,
@@ -2103,6 +2185,7 @@ pub mod test {
.try_add_connection(
ConnectionTableKey::Pubkey(pubkey),
0,
ClientConnectionTracker::new(stats.clone()),
None,
ConnectionPeerType::Unstaked,
10,
@@ -2117,6 +2200,7 @@ pub mod test {
.try_add_connection(
ConnectionTableKey::Pubkey(pubkey2),
0,
ClientConnectionTracker::new(stats.clone()),
None,
ConnectionPeerType::Unstaked,
10,
@@ -2134,6 +2218,7 @@ pub mod test {

table.remove_connection(ConnectionTableKey::Pubkey(pubkey2), 0, 0);
assert_eq!(table.total_size, 0);
assert_eq!(stats.open_connections.load(Ordering::Relaxed), 0);
}

#[test]
@@ -2146,11 +2231,14 @@ pub mod test {
let sockets: Vec<_> = (0..num_entries)
.map(|i| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(i, 0, 0, 0)), 0))
.collect();
let stats: Arc<StreamerStats> = Arc::new(StreamerStats::default());

for (i, socket) in sockets.iter().enumerate() {
table
.try_add_connection(
ConnectionTableKey::IP(socket.ip()),
socket.port(),
ClientConnectionTracker::new(stats.clone()),
None,
ConnectionPeerType::Staked((i + 1) as u64),
i as u64,
@@ -2171,6 +2259,8 @@ pub mod test {
num_entries as u64 + 1, // threshold_stake
);
assert_eq!(pruned, 1);
// We had 5 connections and pruned 1, so 4 should remain.
assert_eq!(stats.open_connections.load(Ordering::Relaxed), 4);
}

#[test]
Expand All @@ -2183,11 +2273,14 @@ pub mod test {
let mut sockets: Vec<_> = (0..num_ips)
.map(|i| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(i, 0, 0, 0)), 0))
.collect();
let stats: Arc<StreamerStats> = Arc::new(StreamerStats::default());

for (i, socket) in sockets.iter().enumerate() {
table
.try_add_connection(
ConnectionTableKey::IP(socket.ip()),
socket.port(),
ClientConnectionTracker::new(stats.clone()),
None,
ConnectionPeerType::Unstaked,
(i * 2) as u64,
@@ -2199,6 +2292,7 @@ pub mod test {
.try_add_connection(
ConnectionTableKey::IP(socket.ip()),
socket.port(),
ClientConnectionTracker::new(stats.clone()),
None,
ConnectionPeerType::Unstaked,
(i * 2 + 1) as u64,
@@ -2213,6 +2307,7 @@ pub mod test {
.try_add_connection(
ConnectionTableKey::IP(single_connection_addr.ip()),
single_connection_addr.port(),
ClientConnectionTracker::new(stats.clone()),
None,
ConnectionPeerType::Unstaked,
(num_ips * 2) as u64,
@@ -2230,6 +2325,7 @@ pub mod test {
table.remove_connection(ConnectionTableKey::IP(socket.ip()), socket.port(), 0);
}
assert_eq!(table.total_size, 0);
assert_eq!(stats.open_connections.load(Ordering::Relaxed), 0);
}

#[test]
Expand Down
Loading
Loading