diff --git a/crates/shared/src/request_sharing.rs b/crates/shared/src/request_sharing.rs index c2d1b38f60..0adbacf5d7 100644 --- a/crates/shared/src/request_sharing.rs +++ b/crates/shared/src/request_sharing.rs @@ -4,7 +4,7 @@ use { FutureExt, }, prometheus::{ - core::{AtomicU64, GenericGauge}, + core::{AtomicU64, GenericGaugeVec}, IntCounterVec, }, std::{ @@ -47,26 +47,27 @@ where { pub fn labelled(request_label: String) -> Self { let cache: Cache = Default::default(); - Self::spawn_gc(cache.clone()); + Self::spawn_gc(cache.clone(), request_label.clone()); Self { in_flight: cache, request_label, } } - fn collect_garbage(cache: &Cache) { + fn collect_garbage(cache: &Cache, label: &str) { let mut cache = cache.lock().unwrap(); let len_before = cache.len() as u64; cache.retain(|_request, weak| weak.upgrade().is_some()); Metrics::get() .request_sharing_cached_items + .with_label_values(&[label]) .sub(len_before - cache.len() as u64); } - fn spawn_gc(cache: Cache) { + fn spawn_gc(cache: Cache, label: String) { tokio::task::spawn(async move { loop { - Self::collect_garbage(&cache); + Self::collect_garbage(&cache, &label); tokio::time::sleep(Duration::from_millis(500)).await; } }); @@ -78,6 +79,7 @@ impl Drop for RequestSharing { let cache = self.in_flight.lock().unwrap(); Metrics::get() .request_sharing_cached_items + .with_label_values(&[&self.request_label]) .sub(cache.len() as u64); } } @@ -125,7 +127,10 @@ where // unwrap because downgrade only returns None if the Shared has already // completed which cannot be the case because we haven't polled it yet. in_flight.insert(request, shared.downgrade().unwrap()); - Metrics::get().request_sharing_cached_items.inc(); + Metrics::get() + .request_sharing_cached_items + .with_label_values(&[&self.request_label]) + .inc(); shared } } @@ -137,7 +142,8 @@ struct Metrics { request_sharing_access: IntCounterVec, /// Number of all currently cached requests - request_sharing_cached_items: GenericGauge, + #[metric(labels("request_label"))] + request_sharing_cached_items: GenericGaugeVec, } impl Metrics { @@ -155,9 +161,10 @@ mod tests { // Manually create [`RequestSharing`] so we can have fine grained control // over the garbage collection. let cache: Cache> = Default::default(); + let label = "test".to_string(); let sharing = RequestSharing { in_flight: cache, - request_label: Default::default(), + request_label: label.clone(), }; let shared0 = sharing.shared_or_else(0, |_| futures::future::ready(0).boxed()); @@ -174,14 +181,14 @@ mod tests { assert_eq!(shared1.weak_count().unwrap(), 1); // GC does not delete any keys because some tasks still use the future - RequestSharing::collect_garbage(&sharing.in_flight); + RequestSharing::collect_garbage(&sharing.in_flight, &label); assert_eq!(sharing.in_flight.lock().unwrap().len(), 1); assert!(sharing.in_flight.lock().unwrap().get(&0).is_some()); // complete second shared assert_eq!(shared1.now_or_never().unwrap(), 0); - RequestSharing::collect_garbage(&sharing.in_flight); + RequestSharing::collect_garbage(&sharing.in_flight, &label); // GC deleted all now unused futures assert!(sharing.in_flight.lock().unwrap().is_empty());