diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 0aeb306c208108..0cdd15a9a7bc7a 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -7,7 +7,7 @@ use { STREAM_THROTTLING_INTERVAL_MS, }, }, - quic::{configure_server, QuicServerError, StreamStats}, + quic::{configure_server, QuicServerError, StreamerStats}, streamer::StakedNodes, tls_certificates::get_pubkey_from_tls_certificate, }, @@ -99,7 +99,7 @@ const TOTAL_CONNECTIONS_PER_SECOND: u64 = 2500; /// The threshold of the size of the connection rate limiter map. When /// the map size is above this, we will trigger a cleanup of older /// entries used by past requests. -const CONNECITON_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD: usize = 100_000; +const CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD: usize = 100_000; // A sequence of bytes that is part of a packet // along with where in the packet it is @@ -140,7 +140,7 @@ impl ConnectionPeerType { pub struct SpawnNonBlockingServerResult { pub endpoints: Vec, - pub stats: Arc, + pub stats: Arc, pub thread: JoinHandle<()>, pub max_concurrent_connections: usize, } @@ -212,7 +212,7 @@ pub fn spawn_server_multi( .map_err(QuicServerError::EndpointFailed) }) .collect::, _>>()?; - let stats = Arc::::default(); + let stats = Arc::::default(); let handle = tokio::spawn(run_server( name, endpoints.clone(), @@ -248,7 +248,7 @@ async fn run_server( max_unstaked_connections: usize, max_streams_per_ms: u64, max_connections_per_ipaddr_per_min: u64, - stats: Arc, + stats: Arc, wait_for_chunk_timeout: Duration, coalesce: Duration, ) { @@ -331,7 +331,7 @@ async fn run_server( continue; } - if rate_limiter.len() > CONNECITON_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD { + if rate_limiter.len() > CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD { rate_limiter.retain_recent(); } stats @@ -374,7 +374,7 @@ async fn run_server( fn prune_unstaked_connection_table( unstaked_connection_table: &mut ConnectionTable, max_unstaked_connections: usize, - stats: Arc, + stats: Arc, ) { if unstaked_connection_table.total_size >= max_unstaked_connections { const PRUNE_TABLE_TO_PERCENTAGE: u8 = 90; @@ -457,7 +457,7 @@ struct NewConnectionHandlerParams { peer_type: ConnectionPeerType, total_stake: u64, max_connections_per_peer: usize, - stats: Arc, + stats: Arc, max_stake: u64, min_stake: u64, } @@ -466,7 +466,7 @@ impl NewConnectionHandlerParams { fn new_unstaked( packet_sender: AsyncSender, max_connections_per_peer: usize, - stats: Arc, + stats: Arc, ) -> NewConnectionHandlerParams { NewConnectionHandlerParams { packet_sender, @@ -640,7 +640,7 @@ async fn setup_connection( max_staked_connections: usize, max_unstaked_connections: usize, max_streams_per_ms: u64, - stats: Arc, + stats: Arc, wait_for_chunk_timeout: Duration, stream_load_ema: Arc, ) { @@ -769,7 +769,7 @@ async fn setup_connection( } } -fn handle_connection_error(e: quinn::ConnectionError, stats: &StreamStats, from: SocketAddr) { +fn handle_connection_error(e: quinn::ConnectionError, stats: &StreamerStats, from: SocketAddr) { debug!("error: {:?} from: {:?}", e, from); stats.connection_setup_error.fetch_add(1, Ordering::Relaxed); match e { @@ -811,7 +811,7 @@ async fn packet_batch_sender( packet_sender: Sender, packet_receiver: AsyncReceiver, exit: Arc, - stats: Arc, + stats: Arc, coalesce: Duration, ) { trace!("enter packet_batch_sender"); @@ -902,7 +902,7 @@ async fn packet_batch_sender( fn track_streamer_fetch_packet_performance( packet_perf_measure: &[([u8; 64], Instant)], - stats: &StreamStats, + stats: &StreamerStats, ) { if packet_perf_measure.is_empty() { return; @@ -1075,7 +1075,7 @@ async fn handle_chunk( packet_accum: &mut Option, remote_addr: &SocketAddr, packet_sender: &AsyncSender, - stats: Arc, + stats: Arc, peer_type: ConnectionPeerType, ) -> bool { match chunk { @@ -1493,7 +1493,7 @@ pub mod test { Arc, crossbeam_channel::Receiver, SocketAddr, - Arc, + Arc, ) { let sockets = { #[cfg(not(target_os = "windows"))] @@ -1742,7 +1742,7 @@ pub mod test { let (pkt_batch_sender, pkt_batch_receiver) = unbounded(); let (ptk_sender, pkt_receiver) = async_unbounded(); let exit = Arc::new(AtomicBool::new(false)); - let stats = Arc::new(StreamStats::default()); + let stats = Arc::new(StreamerStats::default()); let handle = tokio::spawn(packet_batch_sender( pkt_batch_sender, diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index 699d8d7faf33fb..a9db82874b9c21 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -1,5 +1,5 @@ use { - crate::{nonblocking::quic::ConnectionPeerType, quic::StreamStats}, + crate::{nonblocking::quic::ConnectionPeerType, quic::StreamerStats}, percentage::Percentage, std::{ cmp, @@ -23,7 +23,7 @@ pub(crate) struct StakedStreamLoadEMA { current_load_ema: AtomicU64, load_in_recent_interval: AtomicU64, last_update: RwLock, - stats: Arc, + stats: Arc, // Maximum number of streams for a staked connection in EMA window // Note: EMA window can be different than stream throttling window. EMA is being calculated // specifically for staked connections. Unstaked connections have fixed limit on @@ -35,7 +35,7 @@ pub(crate) struct StakedStreamLoadEMA { impl StakedStreamLoadEMA { pub(crate) fn new( - stats: Arc, + stats: Arc, max_unstaked_connections: usize, max_streams_per_ms: u64, ) -> Self { @@ -239,7 +239,7 @@ pub mod test { nonblocking::{ quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS, }, - quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS}, + quic::{StreamerStats, MAX_UNSTAKED_CONNECTIONS}, }, std::{ sync::{atomic::Ordering, Arc}, @@ -250,7 +250,7 @@ pub mod test { #[test] fn test_max_streams_for_unstaked_connection() { let load_ema = Arc::new(StakedStreamLoadEMA::new( - Arc::new(StreamStats::default()), + Arc::new(StreamerStats::default()), MAX_UNSTAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS, )); @@ -267,7 +267,7 @@ pub mod test { #[test] fn test_max_streams_for_staked_connection() { let load_ema = Arc::new(StakedStreamLoadEMA::new( - Arc::new(StreamStats::default()), + Arc::new(StreamerStats::default()), MAX_UNSTAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS, )); @@ -359,7 +359,7 @@ pub mod test { #[test] fn test_max_streams_for_staked_connection_with_no_unstaked_connections() { let load_ema = Arc::new(StakedStreamLoadEMA::new( - Arc::new(StreamStats::default()), + Arc::new(StreamerStats::default()), 0, DEFAULT_MAX_STREAMS_PER_MS, )); @@ -447,7 +447,7 @@ pub mod test { #[test] fn test_update_ema() { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( - Arc::new(StreamStats::default()), + Arc::new(StreamerStats::default()), MAX_UNSTAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS, )); @@ -476,7 +476,7 @@ pub mod test { #[test] fn test_update_ema_missing_interval() { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( - Arc::new(StreamStats::default()), + Arc::new(StreamerStats::default()), MAX_UNSTAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS, )); @@ -496,7 +496,7 @@ pub mod test { #[test] fn test_update_ema_if_needed() { let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( - Arc::new(StreamStats::default()), + Arc::new(StreamerStats::default()), MAX_UNSTAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS, )); diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 98d1d3bba29f47..a9309cca925829 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -136,7 +136,7 @@ impl NotifyKeyUpdate for EndpointKeyUpdater { } #[derive(Default)] -pub struct StreamStats { +pub struct StreamerStats { pub(crate) total_connections: AtomicUsize, pub(crate) total_new_connections: AtomicUsize, pub(crate) total_streams: AtomicUsize, @@ -199,7 +199,7 @@ pub struct StreamStats { pub(crate) total_incoming_connection_attempts: AtomicUsize, } -impl StreamStats { +impl StreamerStats { pub fn report(&self, name: &'static str) { let process_sampled_packets_us_hist = { let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap();