Skip to content

Commit

Permalink
feat(telemeter): calculate stats for snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Apr 10, 2024
1 parent f1c28bf commit 3d9bfe2
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 17 deletions.
7 changes: 6 additions & 1 deletion elfo-telemeter/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,17 @@ impl Telemeter {

// Run the preemptive merge process.
self.storage.merge(snapshot, only_compact).await;

if !only_compact {
snapshot.emit_stats();
}
}

fn reset_distributions(&mut self) {
// Reuse the latest snapshot if possible.
let snapshot = Arc::make_mut(&mut self.snapshot);
snapshot.histograms_mut().for_each(|d| d.reset());

snapshot.reset_distributions();
}

fn start_server(&mut self) {
Expand Down
5 changes: 2 additions & 3 deletions elfo-telemeter/src/metrics/gauge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ mod tests {
}

fn action_strategy() -> impl Strategy<Value = Action> {
// Use integers here to avoid floating point errors.
prop_oneof![
1 => (1..=SHARDS).prop_map(Action::Merge),
10 => update_strategy().prop_map(Action::Update),
Expand Down Expand Up @@ -174,15 +173,15 @@ mod tests {
assert_eq!(shards.len(), SHARDS);
}

// Check eventually consistency.
// Check eventual consistency.
if limit == SHARDS {
prop_assert_eq!(actual.0, expected);
}
}
}
}

// Check eventually consistency.
// Check eventual consistency.
for shard in shards {
shard.merge(&mut actual);
}
Expand Down
1 change: 0 additions & 1 deletion elfo-telemeter/src/metrics/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::mem;
use super::MetricKind;
use crate::protocol::Distribution;

// TODO: *lazy* reservior sampling to improve memory usage?
pub(crate) struct Histogram(SegVec<f64>);

impl MetricKind for Histogram {
Expand Down
55 changes: 43 additions & 12 deletions elfo-telemeter/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use tracing::warn;

use elfo_core::{message, ActorMeta, Local};

use crate::stats::SnapshotStats;

#[message(ret = Rendered)]
pub(crate) struct Render;

Expand Down Expand Up @@ -43,20 +45,44 @@ pub struct Snapshot {
}

impl Snapshot {
pub(crate) fn histograms_mut(&mut self) -> impl Iterator<Item = &mut Distribution> {
pub(crate) fn reset_distributions(&mut self) {
let global = self.global.histograms.values_mut();

let per_group = self
let groupwise = self
.groupwise
.values_mut()
.flat_map(|m| m.histograms.values_mut());

let per_actor = self
let actorwise = self
.actorwise
.values_mut()
.flat_map(|m| m.histograms.values_mut());

global.chain(per_group).chain(per_actor)
for d in global.chain(groupwise).chain(actorwise) {
d.reset();
}
}

/// Estimates how much memory this snapshot occupies and emits the result.
///
/// The estimate accounts for the group-wise and actor-wise registries
/// themselves, then for the counter, gauge and histogram registries of every
/// metrics set (global, per group, per actor), plus the sketch size of each
/// stored distribution.
pub(crate) fn emit_stats(&self) {
    let mut stats = SnapshotStats::new::<Self>();

    // The top-level registries.
    stats.add_registry(&self.groupwise);
    stats.add_registry(&self.actorwise);

    // Every metrics set, starting with the global one.
    let all_metrics = std::iter::once(&self.global)
        .chain(self.groupwise.values())
        .chain(self.actorwise.values());

    for metrics in all_metrics {
        stats.add_registry(&metrics.counters);
        stats.add_registry(&metrics.gauges);
        stats.add_registry(&metrics.histograms);

        // Sketches own extra memory that the registry estimate doesn't see.
        for distribution in metrics.histograms.values() {
            stats.add_additional_size(distribution.sketch_size());
        }
    }

    stats.emit();
}
}

Expand Down Expand Up @@ -135,14 +161,6 @@ impl Distribution {
self.cumulative_sum + self.sketch.sum().unwrap_or_default()
}

/// Resets the distribution. It doesn't reset cumulative values.
pub(crate) fn reset(&mut self) {
self.cumulative_sum += self.sketch.sum().unwrap_or_default();
self.cumulative_count += self.sketch.count();

self.sketch = make_ddsketch();
}

/// Adds samples to the distribution. Ignores all non-finite samples.
pub(crate) fn add(&mut self, samples: &[f64]) {
let sketch = Arc::make_mut(&mut self.sketch);
Expand All @@ -155,6 +173,19 @@ impl Distribution {
.filter(|v| f64::is_finite(**v))
.for_each(|v| sketch.add(*v));
}

/// Resets the distribution. It doesn't reset cumulative values.
///
/// The sum and count of the current sketch are folded into the cumulative
/// totals, and the sketch is replaced with a fresh one.
fn reset(&mut self) {
    // Swap in a fresh sketch first, then account for the retired one.
    let retired = std::mem::replace(&mut self.sketch, make_ddsketch());

    self.cumulative_sum += retired.sum().unwrap_or_default();
    self.cumulative_count += retired.count();
}

/// Returns the estimated size of the sketch in bytes: the `DDSketch` value
/// itself plus its buckets.
fn sketch_size(&self) -> usize {
    // `DDSketch::length()` returns the number of u64 buckets.
    let buckets = self.sketch.length();
    let inline_size = std::mem::size_of::<DDSketch>();

    inline_size + std::mem::size_of::<u64>() * buckets
}
}

fn make_ddsketch() -> Arc<DDSketch> {
Expand Down
30 changes: 30 additions & 0 deletions elfo-telemeter/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub(crate) fn register() {
);
}

// === Storage ===

// The total size estimator.
pub(crate) struct StorageStats {
shards_total: u32,
Expand Down Expand Up @@ -73,6 +75,34 @@ impl ShardStats {
}
}

// === Snapshot ===

/// Accumulates a size estimate that is emitted as the
/// `elfo_metrics_usage_bytes` gauge for the `Snapshot` object.
pub(crate) struct SnapshotStats {
    /// Running total of the estimate, in bytes.
    total_size: usize,
}

impl SnapshotStats {
    /// Starts a new estimate, seeded with the inline size of `T`.
    pub(crate) fn new<T>() -> Self {
        let total_size = mem::size_of::<T>();
        Self { total_size }
    }

    /// Adds the estimated size of the registry's hash table, derived from
    /// its current capacity.
    pub(crate) fn add_registry<K, V>(&mut self, registry: &FxHashMap<K, V>) {
        let capacity = registry.capacity();
        self.add_additional_size(estimate_hashbrown_size::<(K, V)>(capacity));
    }

    /// Adds `size` bytes to the estimate.
    pub(crate) fn add_additional_size(&mut self, size: usize) {
        self.total_size += size;
    }

    /// Emits the accumulated estimate as a gauge.
    pub(crate) fn emit(&self) {
        let bytes = self.total_size as f64;
        gauge!("elfo_metrics_usage_bytes", bytes, "object" => "Snapshot");
    }
}

// === Helpers ===

// From the `datasize` crate.
fn estimate_hashbrown_size<T>(capacity: usize) -> usize {
// https://github.com/rust-lang/hashbrown/blob/v0.12.3/src/raw/mod.rs#L185
Expand Down

0 comments on commit 3d9bfe2

Please sign in to comment.