Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection rate limiting #948

Merged
merged 14 commits into from
May 15, 2024
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions bench-tps/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use {
pubkey::Pubkey,
signature::{read_keypair_file, Keypair},
},
solana_streamer::nonblocking::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
std::{
net::{IpAddr, Ipv4Addr},
Expand Down Expand Up @@ -68,6 +69,7 @@ pub struct Config {
pub external_client_type: ExternalClientType,
pub use_quic: bool,
pub tpu_connection_pool_size: usize,
pub tpu_max_connections_per_ipaddr_per_minute: u64,
pub compute_unit_price: Option<ComputeUnitPrice>,
pub skip_tx_account_data_size: bool,
pub use_durable_nonce: bool,
Expand Down Expand Up @@ -103,6 +105,8 @@ impl Default for Config {
external_client_type: ExternalClientType::default(),
use_quic: DEFAULT_TPU_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_max_connections_per_ipaddr_per_minute:
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
compute_unit_price: None,
skip_tx_account_data_size: false,
use_durable_nonce: false,
Expand Down
6 changes: 5 additions & 1 deletion client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ mod tests {
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_streamer::{
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
nonblocking::quic::{
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
quic::SpawnServerResult,
streamer::StakedNodes,
},
Expand Down Expand Up @@ -272,6 +275,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
3 changes: 3 additions & 0 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl Tpu {
banking_tracer: Arc<BankingTracer>,
tracer_thread_hdl: TracerThread,
tpu_enable_udp: bool,
tpu_max_connections_per_ipaddr_per_minute: u64,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
block_production_method: BlockProductionMethod,
_generator_config: Option<GeneratorConfig>, /* vestigial code for replay invalidator */
Expand Down Expand Up @@ -164,6 +165,7 @@ impl Tpu {
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
tpu_max_connections_per_ipaddr_per_minute,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
Expand All @@ -185,6 +187,7 @@ impl Tpu {
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
DEFAULT_MAX_STREAMS_PER_MS,
tpu_max_connections_per_ipaddr_per_minute,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
Expand Down
4 changes: 4 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ impl Validator {
use_quic: bool,
tpu_connection_pool_size: usize,
tpu_enable_udp: bool,
tpu_max_connections_per_ipaddr_per_minute: u64,
admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
) -> Result<Self, String> {
let start_time = Instant::now();
Expand Down Expand Up @@ -1395,6 +1396,7 @@ impl Validator {
banking_tracer,
tracer_thread,
tpu_enable_udp,
tpu_max_connections_per_ipaddr_per_minute,
&prioritization_fee_cache,
config.block_production_method.clone(),
config.generator_config.clone(),
Expand Down Expand Up @@ -2583,6 +2585,7 @@ mod tests {
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute for test
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -2668,6 +2671,7 @@ mod tests {
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute for test
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start")
Expand Down
3 changes: 3 additions & 0 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ impl LocalCluster {
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -543,6 +544,7 @@ impl LocalCluster {
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per mintute
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -1013,6 +1015,7 @@ impl Cluster for LocalCluster {
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute, use higher value because of tests
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ mod tests {
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
nonblocking::quic::{
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
quic::SpawnServerResult,
streamer::StakedNodes,
tls_certificates::new_dummy_x509_certificate,
Expand Down Expand Up @@ -85,6 +88,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -170,6 +174,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
Duration::from_secs(1), // wait_for_chunk_timeout
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -233,6 +238,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -262,6 +268,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
1 change: 1 addition & 0 deletions streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ edition = { workspace = true }
async-channel = { workspace = true }
bytes = { workspace = true }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true }
futures-util = { workspace = true }
histogram = { workspace = true }
indexmap = { workspace = true }
Expand Down
99 changes: 99 additions & 0 deletions streamer/src/nonblocking/connection_rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use {
crate::nonblocking::{keyed_rate_limiter::KeyedRateLimiter, rate_limiter::RateLimiter},
std::{net::IpAddr, time::Duration},
};

pub struct ConnectionRateLimiter {
limiter: KeyedRateLimiter<IpAddr>,
}

impl ConnectionRateLimiter {
/// Create a new rate limiter per IpAddr. The rate is specified as the count per minute to allow for
/// less frequent connections.
pub fn new(limit_per_minute: u64) -> Self {
Self {
limiter: KeyedRateLimiter::new(limit_per_minute, Duration::from_secs(60)),
}
}

/// Check if the connection from the said `ip` is allowed.
pub fn is_allowed(&self, ip: &IpAddr) -> bool {
// Acquire a permit from the rate limiter for the given IP address
if self.limiter.check_and_update(*ip) {
debug!("Request from IP {:?} allowed", ip);
true // Request allowed
} else {
debug!("Request from IP {:?} blocked", ip);
false // Request blocked
}
}

/// retain only keys whose throttle start date is within the throttle interval.
/// Otherwise drop them as inactive
pub fn retain_recent(&self) {
self.limiter.retain_recent()
}

/// Returns the number of "live" keys in the rate limiter.
pub fn len(&self) -> usize {
self.limiter.len()
}

/// Returns `true` if the rate limiter has no keys in it.
pub fn is_empty(&self) -> bool {
self.limiter.is_empty()
}
}

/// Connection rate limiter for enforcing connection rates from
/// all clients.
pub struct TotalConnectionRateLimiter {
lijunwangs marked this conversation as resolved.
Show resolved Hide resolved
limiter: RateLimiter,
}

impl TotalConnectionRateLimiter {
/// Create a new rate limiter. The rate is specified as the count per second.
pub fn new(limit_per_second: u64) -> Self {
Self {
limiter: RateLimiter::new(limit_per_second, Duration::from_secs(1)),
}
}

/// Check if a connection is allowed.
pub fn is_allowed(&mut self) -> bool {
self.limiter.check_and_update()
}
}

#[cfg(test)]
pub mod test {
use {super::*, std::net::Ipv4Addr};

#[tokio::test]
async fn test_total_connection_rate_limiter() {
let mut limiter = TotalConnectionRateLimiter::new(2);
assert!(limiter.is_allowed());
assert!(limiter.is_allowed());
assert!(!limiter.is_allowed());
}

#[tokio::test]
async fn test_connection_rate_limiter() {
let limiter = ConnectionRateLimiter::new(4);
let ip1 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));
assert!(limiter.is_allowed(&ip1));
assert!(limiter.is_allowed(&ip1));
assert!(limiter.is_allowed(&ip1));
assert!(limiter.is_allowed(&ip1));
assert!(!limiter.is_allowed(&ip1));

assert!(limiter.len() == 1);
let ip2 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2));
assert!(limiter.is_allowed(&ip2));
assert!(limiter.len() == 2);
assert!(limiter.is_allowed(&ip2));
assert!(limiter.is_allowed(&ip2));
assert!(limiter.is_allowed(&ip2));
assert!(!limiter.is_allowed(&ip2));
}
}
104 changes: 104 additions & 0 deletions streamer/src/nonblocking/keyed_rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use {
crate::nonblocking::rate_limiter::RateLimiter,
dashmap::DashMap,
std::{hash::Hash, time::Duration},
};

pub struct KeyedRateLimiter<K> {
limiters: DashMap<K, RateLimiter>,
interval: Duration,
limit: u64,
}

impl<K> KeyedRateLimiter<K>
where
K: Eq + Hash,
{
/// Create a keyed rate limiter with `limit` count with a rate limit `interval`
pub fn new(limit: u64, interval: Duration) -> Self {
Self {
limiters: DashMap::default(),
interval,
limit,
}
}

/// Check if the connection from the said `key` is allowed to pass through the rate limiter.
/// When it is allowed, the rate limiter state is updated to reflect it has been
/// allowed. For a unique request, the caller should call it only once when it is allowed.
pub fn check_and_update(&self, key: K) -> bool {
let allowed = match self.limiters.entry(key) {
dashmap::mapref::entry::Entry::Occupied(mut entry) => {
let limiter = entry.get_mut();
limiter.check_and_update()
}
dashmap::mapref::entry::Entry::Vacant(entry) => entry
.insert(RateLimiter::new(self.limit, self.interval))
.value_mut()
.check_and_update(),
};
allowed
}

/// retain only keys whose throttle start date is within the throttle interval.
/// Otherwise drop them as inactive
pub fn retain_recent(&self) {
let now = tokio::time::Instant::now();
self.limiters.retain(|_key, limiter| {
now.duration_since(*limiter.throttle_start_instant()) <= self.interval
});
}

/// Returns the number of "live" keys in the rate limiter.
pub fn len(&self) -> usize {
self.limiters.len()
}

/// Returns `true` if the rate limiter has no keys in it.
pub fn is_empty(&self) -> bool {
self.limiters.is_empty()
}
}

#[cfg(test)]
pub mod test {
use {super::*, tokio::time::sleep};

#[allow(clippy::len_zero)]
#[tokio::test]
async fn test_rate_limiter() {
let limiter = KeyedRateLimiter::<u64>::new(2, Duration::from_millis(100));
assert!(limiter.len() == 0);
assert!(limiter.is_empty());
assert!(limiter.check_and_update(1));
assert!(limiter.check_and_update(1));
assert!(!limiter.check_and_update(1));
assert!(limiter.len() == 1);
assert!(limiter.check_and_update(2));
assert!(limiter.check_and_update(2));
assert!(!limiter.check_and_update(2));
assert!(limiter.len() == 2);

// sleep 150 ms, the throttle parameters should have been reset.
sleep(Duration::from_millis(150)).await;
assert!(limiter.len() == 2);

assert!(limiter.check_and_update(1));
assert!(limiter.check_and_update(1));
assert!(!limiter.check_and_update(1));

assert!(limiter.check_and_update(2));
assert!(limiter.check_and_update(2));
assert!(!limiter.check_and_update(2));
assert!(limiter.len() == 2);

// sleep another 150 and clean outdatated, key 2 will be removed
sleep(Duration::from_millis(150)).await;
assert!(limiter.check_and_update(1));
assert!(limiter.check_and_update(1));
assert!(!limiter.check_and_update(1));

limiter.retain_recent();
assert!(limiter.len() == 1);
}
}
Loading
Loading