Skip to content

Commit

Permalink
Rename StreamStats to StreamerStats (#2028)
Browse files Browse the repository at this point in the history
Name StreamStats is misleading because this structure has metrics not only for streams but also foe connections. Hence, it makes sense to rename it to StreamerStats.
  • Loading branch information
KirillLykov authored Jul 11, 2024
1 parent 0f5263d commit b24f492
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 28 deletions.
32 changes: 16 additions & 16 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
STREAM_THROTTLING_INTERVAL_MS,
},
},
quic::{configure_server, QuicServerError, StreamStats},
quic::{configure_server, QuicServerError, StreamerStats},
streamer::StakedNodes,
tls_certificates::get_pubkey_from_tls_certificate,
},
Expand Down Expand Up @@ -99,7 +99,7 @@ const TOTAL_CONNECTIONS_PER_SECOND: u64 = 2500;
/// The threshold of the size of the connection rate limiter map. When
/// the map size is above this, we will trigger a cleanup of older
/// entries used by past requests.
const CONNECITON_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD: usize = 100_000;
const CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD: usize = 100_000;

// A sequence of bytes that is part of a packet
// along with where in the packet it is
Expand Down Expand Up @@ -140,7 +140,7 @@ impl ConnectionPeerType {

pub struct SpawnNonBlockingServerResult {
pub endpoints: Vec<Endpoint>,
pub stats: Arc<StreamStats>,
pub stats: Arc<StreamerStats>,
pub thread: JoinHandle<()>,
pub max_concurrent_connections: usize,
}
Expand Down Expand Up @@ -212,7 +212,7 @@ pub fn spawn_server_multi(
.map_err(QuicServerError::EndpointFailed)
})
.collect::<Result<Vec<_>, _>>()?;
let stats = Arc::<StreamStats>::default();
let stats = Arc::<StreamerStats>::default();
let handle = tokio::spawn(run_server(
name,
endpoints.clone(),
Expand Down Expand Up @@ -248,7 +248,7 @@ async fn run_server(
max_unstaked_connections: usize,
max_streams_per_ms: u64,
max_connections_per_ipaddr_per_min: u64,
stats: Arc<StreamStats>,
stats: Arc<StreamerStats>,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
) {
Expand Down Expand Up @@ -331,7 +331,7 @@ async fn run_server(
continue;
}

if rate_limiter.len() > CONNECITON_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD {
if rate_limiter.len() > CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD {
rate_limiter.retain_recent();
}
stats
Expand Down Expand Up @@ -374,7 +374,7 @@ async fn run_server(
fn prune_unstaked_connection_table(
unstaked_connection_table: &mut ConnectionTable,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
stats: Arc<StreamerStats>,
) {
if unstaked_connection_table.total_size >= max_unstaked_connections {
const PRUNE_TABLE_TO_PERCENTAGE: u8 = 90;
Expand Down Expand Up @@ -457,7 +457,7 @@ struct NewConnectionHandlerParams {
peer_type: ConnectionPeerType,
total_stake: u64,
max_connections_per_peer: usize,
stats: Arc<StreamStats>,
stats: Arc<StreamerStats>,
max_stake: u64,
min_stake: u64,
}
Expand All @@ -466,7 +466,7 @@ impl NewConnectionHandlerParams {
fn new_unstaked(
packet_sender: AsyncSender<PacketAccumulator>,
max_connections_per_peer: usize,
stats: Arc<StreamStats>,
stats: Arc<StreamerStats>,
) -> NewConnectionHandlerParams {
NewConnectionHandlerParams {
packet_sender,
Expand Down Expand Up @@ -640,7 +640,7 @@ async fn setup_connection(
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
stats: Arc<StreamStats>,
stats: Arc<StreamerStats>,
wait_for_chunk_timeout: Duration,
stream_load_ema: Arc<StakedStreamLoadEMA>,
) {
Expand Down Expand Up @@ -769,7 +769,7 @@ async fn setup_connection(
}
}

fn handle_connection_error(e: quinn::ConnectionError, stats: &StreamStats, from: SocketAddr) {
fn handle_connection_error(e: quinn::ConnectionError, stats: &StreamerStats, from: SocketAddr) {
debug!("error: {:?} from: {:?}", e, from);
stats.connection_setup_error.fetch_add(1, Ordering::Relaxed);
match e {
Expand Down Expand Up @@ -811,7 +811,7 @@ async fn packet_batch_sender(
packet_sender: Sender<PacketBatch>,
packet_receiver: AsyncReceiver<PacketAccumulator>,
exit: Arc<AtomicBool>,
stats: Arc<StreamStats>,
stats: Arc<StreamerStats>,
coalesce: Duration,
) {
trace!("enter packet_batch_sender");
Expand Down Expand Up @@ -902,7 +902,7 @@ async fn packet_batch_sender(

fn track_streamer_fetch_packet_performance(
packet_perf_measure: &[([u8; 64], Instant)],
stats: &StreamStats,
stats: &StreamerStats,
) {
if packet_perf_measure.is_empty() {
return;
Expand Down Expand Up @@ -1075,7 +1075,7 @@ async fn handle_chunk(
packet_accum: &mut Option<PacketAccumulator>,
remote_addr: &SocketAddr,
packet_sender: &AsyncSender<PacketAccumulator>,
stats: Arc<StreamStats>,
stats: Arc<StreamerStats>,
peer_type: ConnectionPeerType,
) -> bool {
match chunk {
Expand Down Expand Up @@ -1493,7 +1493,7 @@ pub mod test {
Arc<AtomicBool>,
crossbeam_channel::Receiver<PacketBatch>,
SocketAddr,
Arc<StreamStats>,
Arc<StreamerStats>,
) {
let sockets = {
#[cfg(not(target_os = "windows"))]
Expand Down Expand Up @@ -1742,7 +1742,7 @@ pub mod test {
let (pkt_batch_sender, pkt_batch_receiver) = unbounded();
let (ptk_sender, pkt_receiver) = async_unbounded();
let exit = Arc::new(AtomicBool::new(false));
let stats = Arc::new(StreamStats::default());
let stats = Arc::new(StreamerStats::default());

let handle = tokio::spawn(packet_batch_sender(
pkt_batch_sender,
Expand Down
20 changes: 10 additions & 10 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::{nonblocking::quic::ConnectionPeerType, quic::StreamStats},
crate::{nonblocking::quic::ConnectionPeerType, quic::StreamerStats},
percentage::Percentage,
std::{
cmp,
Expand All @@ -23,7 +23,7 @@ pub(crate) struct StakedStreamLoadEMA {
current_load_ema: AtomicU64,
load_in_recent_interval: AtomicU64,
last_update: RwLock<Instant>,
stats: Arc<StreamStats>,
stats: Arc<StreamerStats>,
// Maximum number of streams for a staked connection in EMA window
// Note: EMA window can be different than stream throttling window. EMA is being calculated
// specifically for staked connections. Unstaked connections have fixed limit on
Expand All @@ -35,7 +35,7 @@ pub(crate) struct StakedStreamLoadEMA {

impl StakedStreamLoadEMA {
pub(crate) fn new(
stats: Arc<StreamStats>,
stats: Arc<StreamerStats>,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
) -> Self {
Expand Down Expand Up @@ -239,7 +239,7 @@ pub mod test {
nonblocking::{
quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
},
quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS},
quic::{StreamerStats, MAX_UNSTAKED_CONNECTIONS},
},
std::{
sync::{atomic::Ordering, Arc},
Expand All @@ -250,7 +250,7 @@ pub mod test {
#[test]
fn test_max_streams_for_unstaked_connection() {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
Expand All @@ -267,7 +267,7 @@ pub mod test {
#[test]
fn test_max_streams_for_staked_connection() {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
Expand Down Expand Up @@ -359,7 +359,7 @@ pub mod test {
#[test]
fn test_max_streams_for_staked_connection_with_no_unstaked_connections() {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
Arc::new(StreamerStats::default()),
0,
DEFAULT_MAX_STREAMS_PER_MS,
));
Expand Down Expand Up @@ -447,7 +447,7 @@ pub mod test {
#[test]
fn test_update_ema() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
Expand Down Expand Up @@ -476,7 +476,7 @@ pub mod test {
#[test]
fn test_update_ema_missing_interval() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
Expand All @@ -496,7 +496,7 @@ pub mod test {
#[test]
fn test_update_ema_if_needed() {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
Arc::new(StreamerStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
Expand Down
4 changes: 2 additions & 2 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl NotifyKeyUpdate for EndpointKeyUpdater {
}

#[derive(Default)]
pub struct StreamStats {
pub struct StreamerStats {
pub(crate) total_connections: AtomicUsize,
pub(crate) total_new_connections: AtomicUsize,
pub(crate) total_streams: AtomicUsize,
Expand Down Expand Up @@ -199,7 +199,7 @@ pub struct StreamStats {
pub(crate) total_incoming_connection_attempts: AtomicUsize,
}

impl StreamStats {
impl StreamerStats {
pub fn report(&self, name: &'static str) {
let process_sampled_packets_us_hist = {
let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap();
Expand Down

0 comments on commit b24f492

Please sign in to comment.