diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index feb9bd2db65a3e..ccd6f8a7b46fd4 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -795,6 +795,18 @@ async fn handle_connection( >= max_streams_per_throttling_interval { stats.throttled_streams.fetch_add(1, Ordering::Relaxed); + match params.peer_type { + ConnectionPeerType::Unstaked => { + stats + .throttled_unstaked_streams + .fetch_add(1, Ordering::Relaxed); + } + ConnectionPeerType::Staked(_) => { + stats + .throttled_staked_streams + .fetch_add(1, Ordering::Relaxed); + } + } let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING)); continue; } @@ -963,6 +975,19 @@ async fn handle_chunk( .total_chunks_sent_for_batching .fetch_add(chunks_sent, Ordering::Relaxed); + match peer_type { + ConnectionPeerType::Unstaked => { + stats + .total_unstaked_packets_sent_for_batching + .fetch_add(1, Ordering::Relaxed); + } + ConnectionPeerType::Staked(_) => { + stats + .total_staked_packets_sent_for_batching + .fetch_add(1, Ordering::Relaxed); + } + } + trace!("sent {} byte packet for batching", bytes_sent); } } else { diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 3b1b6b21adf468..9b68ab1eea01ef 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -177,6 +177,10 @@ pub struct StreamStats { pub(crate) stream_load_capacity_overflow: AtomicUsize, pub(crate) process_sampled_packets_us_hist: Mutex, pub(crate) perf_track_overhead_us: AtomicU64, + pub(crate) total_staked_packets_sent_for_batching: AtomicUsize, + pub(crate) total_unstaked_packets_sent_for_batching: AtomicUsize, + pub(crate) throttled_staked_streams: AtomicUsize, + pub(crate) throttled_unstaked_streams: AtomicUsize, } impl StreamStats { @@ -338,6 +342,18 @@ impl StreamStats { .swap(0, Ordering::Relaxed), i64 ), + ( + "staked_packets_sent_for_batching", + self.total_staked_packets_sent_for_batching + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "unstaked_packets_sent_for_batching", + self.total_unstaked_packets_sent_for_batching + .swap(0, Ordering::Relaxed), + i64 + ), ( "bytes_sent_for_batching", self.total_bytes_sent_for_batching @@ -434,6 +450,16 @@ impl StreamStats { self.stream_load_capacity_overflow.load(Ordering::Relaxed), i64 ), + ( + "throttled_unstaked_streams", + self.throttled_unstaked_streams.swap(0, Ordering::Relaxed), + i64 + ), + ( + "throttled_staked_streams", + self.throttled_staked_streams.swap(0, Ordering::Relaxed), + i64 + ), ( "process_sampled_packets_us_90pct", process_sampled_packets_us_hist