Skip to content

Commit

Permalink
ObservableGauge collect data points since previous collection
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt committed Feb 5, 2025
1 parent 0e751b4 commit c4e6a43
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
10 changes: 7 additions & 3 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,12 @@ impl<T: Number> AggregateBuilder<T> {
}

/// Builds a last-value aggregate function input and output.
pub(crate) fn last_value(&self) -> AggregateFns<T> {
LastValue::new(self.temporality, self.filter.clone()).into()
pub(crate) fn last_value(&self, overwrite_temporality: Option<Temporality>) -> AggregateFns<T> {
LastValue::new(
overwrite_temporality.unwrap_or(self.temporality),
self.filter.clone(),
)
.into()
}

/// Builds a precomputed sum aggregate function input and output.
Expand Down Expand Up @@ -210,7 +214,7 @@ mod tests {
#[test]
fn last_value_aggregation() {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value();
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value(None);
let mut a = Gauge {
data_points: vec![GaugeDataPoint {
attributes: vec![KeyValue::new("a", 1)],
Expand Down
18 changes: 16 additions & 2 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{

use self::internal::AggregateFns;

use super::Aggregation;
use super::{Aggregation, Temporality};

/// Connects all of the instruments created by a meter provider to a [MetricReader].
///
Expand Down Expand Up @@ -488,9 +488,20 @@ fn aggregate_fn<T: Number>(
match agg {
Aggregation::Default => aggregate_fn(b, &default_aggregation_selector(kind), kind),
Aggregation::Drop => Ok(None),
Aggregation::LastValue => Ok(Some(b.last_value())),
Aggregation::LastValue => {
match kind {
InstrumentKind::Gauge => Ok(Some(b.last_value(None))),
// temporality for LastValue only affects how data points are reported, so we can always use
// delta temporality, because observable instruments should report data points only since previous collection
InstrumentKind::ObservableGauge => Ok(Some(b.last_value(Some(Temporality::Delta)))),
_ => Err(MetricError::Other(format!("LastValue aggregation is only available for Gauge or ObservableGauge, but not for {kind:?}")))
}
}
Aggregation::Sum => {
let fns = match kind {
// TODO implement: observable instruments should not report data points on every collect
// from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality,
// MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection
InstrumentKind::ObservableCounter => b.precomputed_sum(true),
InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false),
InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true),
Expand All @@ -508,6 +519,9 @@ fn aggregate_fn<T: Number>(
| InstrumentKind::ObservableUpDownCounter
| InstrumentKind::ObservableGauge
);
// TODO implement: observable instruments should not report data points on every collect
// from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality,
// MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection
Ok(Some(b.explicit_bucket_histogram(
boundaries.to_vec(),
*record_min_max,
Expand Down

0 comments on commit c4e6a43

Please sign in to comment.