Skip to content

Commit

Permalink
perf: Use local histogram for processing time
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Dec 2, 2024
1 parent 632ec6a commit a433efa
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 4 deletions.
25 changes: 24 additions & 1 deletion src/components/proxy/io_uring_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ fn process_packet(
ctx: &mut PacketProcessorCtx,
packet: RecvPacket,
last_received_at: &mut Option<UtcTimestamp>,
processing_time: &prometheus::local::LocalHistogram,
) {
match ctx {
PacketProcessorCtx::Router {
Expand Down Expand Up @@ -256,6 +257,7 @@ fn process_packet(
sessions,
error_acc,
destinations,
processing_time,
);
}
PacketProcessorCtx::SessionPool { pool, port, .. } => {
Expand Down Expand Up @@ -453,6 +455,8 @@ impl IoUringLoop {
// Just double buffer the pending writes for simplicity
let mut double_pending_sends = Vec::with_capacity(pending_sends.capacity());

let processing_metrics = ProcessingMetrics::new();

// When sending packets, this is the direction used when updating metrics
let send_dir = if matches!(ctx, PacketProcessorCtx::Router { .. }) {
metrics::WRITE
Expand All @@ -478,6 +482,8 @@ impl IoUringLoop {
// onto the submission queue for the loop to actually function (ie, similar to await on futures)
loop_ctx.sync();

const FLUSH_DURATION: std::time::Duration = std::time::Duration::from_secs(15);
let mut time_since_flush = std::time::Duration::default();
let mut last_received_at = None;

// The core io uring loop
Expand Down Expand Up @@ -520,7 +526,24 @@ impl IoUringLoop {
}

let packet = packet.finalize_recv(ret as usize);
process_packet(&mut ctx, packet, &mut last_received_at);
let old_received_at = last_received_at.clone();
process_packet(
&mut ctx,
packet,
&mut last_received_at,
&processing_metrics.processing_time,
);

if let (Some(old_received_at), Some(last_received_at)) =
(&old_received_at, &last_received_at)
{
time_since_flush += last_received_at - old_received_at;

if time_since_flush >= FLUSH_DURATION {
time_since_flush = <_>::default();
processing_metrics.flush();
}
}

loop_ctx.enqueue_recv(buffer_pool.clone().alloc());
}
Expand Down
3 changes: 2 additions & 1 deletion src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl DownstreamReceiveWorkerConfig {
sessions: &Arc<SessionPool>,
error_acc: &mut super::error::ErrorAccumulator,
destinations: &mut Vec<crate::net::EndpointAddress>,
processing_time: &prometheus::local::LocalHistogram,
) {
tracing::trace!(
id = worker_id,
Expand All @@ -65,7 +66,7 @@ impl DownstreamReceiveWorkerConfig {
"received packet from downstream"
);

let timer = metrics::processing_time(metrics::READ).start_timer();
let timer = processing_time.start_timer();
match Self::process_downstream_received_packet(packet, config, sessions, destinations) {
Ok(()) => {
error_acc.maybe_send();
Expand Down
4 changes: 4 additions & 0 deletions src/components/proxy/packet_router/reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl super::DownstreamReceiveWorkerConfig {
let mut error_acc =
crate::components::proxy::error::ErrorAccumulator::new(error_sender);
let mut destinations = Vec::with_capacity(1);
let processing_metrics = crate::metrics::ProcessingMetrics::new();

loop {
// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
Expand Down Expand Up @@ -134,7 +135,10 @@ impl super::DownstreamReceiveWorkerConfig {
&sessions,
&mut error_acc,
&mut destinations,
&processing_metrics.read_processing_time,
);

processing_metrics.flush();
}
Err(error) => {
tracing::error!(%error, "error receiving packet");
Expand Down
21 changes: 19 additions & 2 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
use crate::net::maxmind_db::MetricsIpNetEntry;
use once_cell::sync::Lazy;
use prometheus::{
core::Collector, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge,
IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS,
core::Collector, local::LocalHistogram, Histogram, HistogramOpts, HistogramVec, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS,
};

pub use prometheus::Result;
Expand Down Expand Up @@ -282,6 +282,23 @@ pub trait CollectorExt: Collector + Clone + Sized + 'static {

impl<C: Collector + Clone + 'static> CollectorExt for C {}

/// A local instance of all of the metrics related to packet processing.
pub struct ProcessingMetrics {
pub read_processing_time: LocalHistogram,
}

impl ProcessingMetrics {
pub fn new() -> Self {
Self {
read_processing_time: processing_time(READ).local(),
}
}

pub fn flush(&self) {
self.read_processing_time.flush();
}
}

#[cfg(test)]
mod test {
fn check(num: u64, exp: &str) {
Expand Down

0 comments on commit a433efa

Please sign in to comment.