diff --git a/src/components/proxy/io_uring_shared.rs b/src/components/proxy/io_uring_shared.rs index 40c1c8f68..342a92b4d 100644 --- a/src/components/proxy/io_uring_shared.rs +++ b/src/components/proxy/io_uring_shared.rs @@ -228,6 +228,7 @@ fn process_packet( ctx: &mut PacketProcessorCtx, packet: RecvPacket, last_received_at: &mut Option, + processing_time: &prometheus::local::LocalHistogram, ) { match ctx { PacketProcessorCtx::Router { @@ -256,6 +257,7 @@ fn process_packet( sessions, error_acc, destinations, + processing_time, ); } PacketProcessorCtx::SessionPool { pool, port, .. } => { @@ -453,6 +455,8 @@ impl IoUringLoop { // Just double buffer the pending writes for simplicity let mut double_pending_sends = Vec::with_capacity(pending_sends.capacity()); + let processing_metrics = ProcessingMetrics::new(); + // When sending packets, this is the direction used when updating metrics let send_dir = if matches!(ctx, PacketProcessorCtx::Router { .. }) { metrics::WRITE @@ -478,6 +482,8 @@ impl IoUringLoop { // onto the submission queue for the loop to actually function (ie, similar to await on futures) loop_ctx.sync(); + const FLUSH_DURATION: std::time::Duration = std::time::Duration::from_secs(15); + let mut time_since_flush = std::time::Duration::default(); let mut last_received_at = None; // The core io uring loop @@ -520,7 +526,24 @@ impl IoUringLoop { } let packet = packet.finalize_recv(ret as usize); - process_packet(&mut ctx, packet, &mut last_received_at); + let old_received_at = last_received_at.clone(); + process_packet( + &mut ctx, + packet, + &mut last_received_at, + &processing_metrics.processing_time, + ); + + if let (Some(old_received_at), Some(last_received_at)) = + (&old_received_at, &last_received_at) + { + time_since_flush += last_received_at - old_received_at; + + if time_since_flush >= FLUSH_DURATION { + time_since_flush = <_>::default(); + processing_metrics.flush(); + } + } loop_ctx.enqueue_recv(buffer_pool.clone().alloc()); } diff --git a/src/components/proxy/packet_router.rs b/src/components/proxy/packet_router.rs index 5d54c99e2..29fc475cb 100644 --- a/src/components/proxy/packet_router.rs +++ b/src/components/proxy/packet_router.rs @@ -57,6 +57,7 @@ impl DownstreamReceiveWorkerConfig { sessions: &Arc, error_acc: &mut super::error::ErrorAccumulator, destinations: &mut Vec, + processing_time: &prometheus::local::LocalHistogram, ) { tracing::trace!( id = worker_id, @@ -65,7 +66,7 @@ impl DownstreamReceiveWorkerConfig { "received packet from downstream" ); - let timer = metrics::processing_time(metrics::READ).start_timer(); + let timer = processing_time.start_timer(); match Self::process_downstream_received_packet(packet, config, sessions, destinations) { Ok(()) => { error_acc.maybe_send(); diff --git a/src/components/proxy/packet_router/reference.rs b/src/components/proxy/packet_router/reference.rs index 694d5eae6..94422297e 100644 --- a/src/components/proxy/packet_router/reference.rs +++ b/src/components/proxy/packet_router/reference.rs @@ -102,6 +102,7 @@ impl super::DownstreamReceiveWorkerConfig { let mut error_acc = crate::components::proxy::error::ErrorAccumulator::new(error_sender); let mut destinations = Vec::with_capacity(1); + let processing_metrics = crate::metrics::ProcessingMetrics::new(); loop { // Initialize a buffer for the UDP packet. We use the maximum size of a UDP @@ -134,7 +135,10 @@ impl super::DownstreamReceiveWorkerConfig { &sessions, &mut error_acc, &mut destinations, + &processing_metrics.read_processing_time, ); + + processing_metrics.flush(); } Err(error) => { tracing::error!(%error, "error receiving packet"); diff --git a/src/metrics.rs b/src/metrics.rs index 75019a8a6..ef2d0509e 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -17,8 +17,8 @@ use crate::net::maxmind_db::MetricsIpNetEntry; use once_cell::sync::Lazy; use prometheus::{ - core::Collector, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, - IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS, + core::Collector, local::LocalHistogram, Histogram, HistogramOpts, HistogramVec, IntCounter, + IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS, }; pub use prometheus::Result; @@ -282,6 +282,23 @@ pub trait CollectorExt: Collector + Clone + Sized + 'static { impl CollectorExt for C {} +/// A local instance of all of the metrics related to packet processing. +pub struct ProcessingMetrics { + pub read_processing_time: LocalHistogram, +} + +impl ProcessingMetrics { + pub fn new() -> Self { + Self { + read_processing_time: processing_time(READ).local(), + } + } + + pub fn flush(&self) { + self.read_processing_time.flush(); + } +} + #[cfg(test)] mod test { fn check(num: u64, exp: &str) {