From 3d9bfe20047ca972e0f42b390a4d58860435f8ee Mon Sep 17 00:00:00 2001 From: Paul Loyd Date: Wed, 10 Apr 2024 10:11:12 +0200 Subject: [PATCH] feat(telemeter): calculate stats for snapshots --- elfo-telemeter/src/actor.rs | 7 +++- elfo-telemeter/src/metrics/gauge.rs | 5 +-- elfo-telemeter/src/metrics/histogram.rs | 1 - elfo-telemeter/src/protocol.rs | 55 +++++++++++++++++++------ elfo-telemeter/src/stats.rs | 30 ++++++++++++++ 5 files changed, 81 insertions(+), 17 deletions(-) diff --git a/elfo-telemeter/src/actor.rs b/elfo-telemeter/src/actor.rs index fca26c28..ac057d00 100644 --- a/elfo-telemeter/src/actor.rs +++ b/elfo-telemeter/src/actor.rs @@ -118,12 +118,17 @@ impl Telemeter { // Run the preemtive merge process. self.storage.merge(snapshot, only_compact).await; + + if !only_compact { + snapshot.emit_stats(); + } } fn reset_distributions(&mut self) { // Reuse the latest snapshot if possible. let snapshot = Arc::make_mut(&mut self.snapshot); - snapshot.histograms_mut().for_each(|d| d.reset()); + + snapshot.reset_distributions(); } fn start_server(&mut self) { diff --git a/elfo-telemeter/src/metrics/gauge.rs b/elfo-telemeter/src/metrics/gauge.rs index b4582c0c..f5c39aea 100644 --- a/elfo-telemeter/src/metrics/gauge.rs +++ b/elfo-telemeter/src/metrics/gauge.rs @@ -129,7 +129,6 @@ mod tests { } fn action_strategy() -> impl Strategy { - // Use integers here to avoid floating point errors. prop_oneof![ 1 => (1..=SHARDS).prop_map(Action::Merge), 10 => update_strategy().prop_map(Action::Update), @@ -174,7 +173,7 @@ mod tests { assert_eq!(shards.len(), SHARDS); } - // Check eventually consistency. + // Check eventual consistency. if limit == SHARDS { prop_assert_eq!(actual.0, expected); } @@ -182,7 +181,7 @@ mod tests { } } - // Check eventually consistency. + // Check eventual consistency. for shard in shards { shard.merge(&mut actual); } diff --git a/elfo-telemeter/src/metrics/histogram.rs b/elfo-telemeter/src/metrics/histogram.rs index 8d81f0fa..8e04a17f 100644 --- a/elfo-telemeter/src/metrics/histogram.rs +++ b/elfo-telemeter/src/metrics/histogram.rs @@ -3,7 +3,6 @@ use std::mem; use super::MetricKind; use crate::protocol::Distribution; -// TODO: *lazy* reservior sampling to improve memory usage? pub(crate) struct Histogram(SegVec); impl MetricKind for Histogram { diff --git a/elfo-telemeter/src/protocol.rs b/elfo-telemeter/src/protocol.rs index b01e771a..7380ee3e 100644 --- a/elfo-telemeter/src/protocol.rs +++ b/elfo-telemeter/src/protocol.rs @@ -9,6 +9,8 @@ use tracing::warn; use elfo_core::{message, ActorMeta, Local}; +use crate::stats::SnapshotStats; + #[message(ret = Rendered)] pub(crate) struct Render; @@ -43,20 +45,44 @@ pub struct Snapshot { } impl Snapshot { - pub(crate) fn histograms_mut(&mut self) -> impl Iterator { + pub(crate) fn reset_distributions(&mut self) { let global = self.global.histograms.values_mut(); - let per_group = self + let groupwise = self .groupwise .values_mut() .flat_map(|m| m.histograms.values_mut()); - let per_actor = self + let actorwise = self .actorwise .values_mut() .flat_map(|m| m.histograms.values_mut()); - global.chain(per_group).chain(per_actor) + for d in global.chain(groupwise).chain(actorwise) { + d.reset(); + } + } + + pub(crate) fn emit_stats(&self) { + let mut stats = SnapshotStats::new::(); + + stats.add_registry(&self.groupwise); + stats.add_registry(&self.actorwise); + + std::iter::once(&self.global) + .chain(self.groupwise.values()) + .chain(self.actorwise.values()) + .for_each(|metrics| { + stats.add_registry(&metrics.counters); + stats.add_registry(&metrics.gauges); + stats.add_registry(&metrics.histograms); + + metrics.histograms.values().for_each(|d| { + stats.add_additional_size(d.sketch_size()); + }); + }); + + stats.emit(); } } @@ -135,14 +161,6 @@ impl Distribution { self.cumulative_sum + self.sketch.sum().unwrap_or_default() } - /// Resets the distribution. It doesn't reset cumulative values. - pub(crate) fn reset(&mut self) { - self.cumulative_sum += self.sketch.sum().unwrap_or_default(); - self.cumulative_count += self.sketch.count(); - - self.sketch = make_ddsketch(); - } - /// Adds samples to the distribution. Ignores all non-finite samples. pub(crate) fn add(&mut self, samples: &[f64]) { let sketch = Arc::make_mut(&mut self.sketch); @@ -155,6 +173,19 @@ impl Distribution { .filter(|v| f64::is_finite(**v)) .for_each(|v| sketch.add(*v)); } + + /// Resets the distribution. It doesn't reset cumulative values. + fn reset(&mut self) { + self.cumulative_sum += self.sketch.sum().unwrap_or_default(); + self.cumulative_count += self.sketch.count(); + + self.sketch = make_ddsketch(); + } + + fn sketch_size(&self) -> usize { + // `DDSketch::length()` returns the number of u64 buckets. + std::mem::size_of::() + 8 * self.sketch.length() + } } fn make_ddsketch() -> Arc { diff --git a/elfo-telemeter/src/stats.rs b/elfo-telemeter/src/stats.rs index a8c6b1a3..189aa23b 100644 --- a/elfo-telemeter/src/stats.rs +++ b/elfo-telemeter/src/stats.rs @@ -16,6 +16,8 @@ pub(crate) fn register() { ); } +// === Storage === + // The total size estimator. pub(crate) struct StorageStats { shards_total: u32, @@ -73,6 +75,34 @@ impl ShardStats { } } +// === Snapshot === + +pub(crate) struct SnapshotStats { + total_size: usize, +} + +impl SnapshotStats { + pub(crate) fn new() -> Self { + Self { + total_size: mem::size_of::(), + } + } + + pub(crate) fn add_registry(&mut self, registry: &FxHashMap) { + self.total_size += estimate_hashbrown_size::<(K, V)>(registry.capacity()); + } + + pub(crate) fn add_additional_size(&mut self, size: usize) { + self.total_size += size; + } + + pub(crate) fn emit(&self) { + gauge!("elfo_metrics_usage_bytes", self.total_size as f64, "object" => "Snapshot"); + } +} + +// === Helpers === + // From the `datasize` crate. fn estimate_hashbrown_size(capacity: usize) -> usize { // https://github.com/rust-lang/hashbrown/blob/v0.12.3/src/raw/mod.rs#L185