diff --git a/aggregator/src/metrics.rs b/aggregator/src/metrics.rs index 8a287114b..2ae8a11ae 100644 --- a/aggregator/src/metrics.rs +++ b/aggregator/src/metrics.rs @@ -2,29 +2,42 @@ #[cfg(any(not(feature = "prometheus"), not(feature = "otlp")))] use anyhow::anyhow; -use opentelemetry_sdk::metrics::{reader::AggregationSelector, Aggregation, InstrumentKind}; use serde::{Deserialize, Serialize}; use std::net::AddrParseError; #[cfg(feature = "prometheus")] use { anyhow::Context, - opentelemetry::{global::set_meter_provider, metrics::MetricsError}, + opentelemetry::global::set_meter_provider, prometheus::Registry, - std::net::{IpAddr, Ipv4Addr}, + std::{ + net::{IpAddr, Ipv4Addr}, + str::FromStr, + }, tokio::{sync::oneshot, task::JoinHandle}, trillium::{Info, Init}, }; #[cfg(feature = "otlp")] -use {opentelemetry_otlp::WithExportConfig, opentelemetry_sdk::runtime::Tokio}; +use { + opentelemetry_otlp::WithExportConfig, + opentelemetry_sdk::{ + metrics::{ + reader::{DefaultAggregationSelector, DefaultTemporalitySelector}, + PeriodicReader, + }, + runtime::Tokio, + }, +}; #[cfg(any(feature = "otlp", feature = "prometheus"))] use { git_version::git_version, - opentelemetry::KeyValue, - opentelemetry_sdk::{metrics::MeterProvider, Resource}, - std::str::FromStr, + opentelemetry::{metrics::MetricsError, KeyValue}, + opentelemetry_sdk::{ + metrics::{new_view, Aggregation, Instrument, InstrumentKind, MeterProvider, Stream, View}, + Resource, + }, }; #[cfg(all(test, feature = "prometheus"))] @@ -79,24 +92,56 @@ pub enum MetricsExporterHandle { Noop, } -#[derive(Debug)] -struct CustomAggregationSelector; - -impl AggregationSelector for CustomAggregationSelector { - fn aggregation(&self, kind: InstrumentKind) -> Aggregation { - /// These boundaries are intended to be able to capture the length of short-lived operations - /// (e.g HTTP requests) as well as longer-running operations. - const DEFAULT_HISTOGRAM_BOUNDARIES: &[f64] = &[ - 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 90.0, 300.0, - ]; - - match kind { - InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram { - boundaries: Vec::from(DEFAULT_HISTOGRAM_BOUNDARIES), - record_min_max: true, - }, - InstrumentKind::ObservableGauge => Aggregation::LastValue, - _ => Aggregation::Sum, +#[cfg(any(feature = "prometheus", feature = "otlp"))] +struct CustomView { + bytes_histogram_view: Box, + default_histogram_view: Box, +} + +#[cfg(any(feature = "prometheus", feature = "otlp"))] +impl CustomView { + /// These boundaries are intended to be used with measurements having the unit of "bytes". + const BYTES_HISTOGRAM_BOUNDARIES: &'static [f64] = &[ + 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, 4194304.0, 8388608.0, 16777216.0, + 33554432.0, 67108864.0, + ]; + + /// These boundaries are intended to be able to capture the length of short-lived operations + /// (e.g HTTP requests) as well as longer-running operations. + const DEFAULT_HISTOGRAM_BOUNDARIES: &'static [f64] = &[ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 90.0, 300.0, + ]; + + pub fn new() -> Result { + Ok(Self { + bytes_histogram_view: new_view( + Instrument::new().name("*"), + Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { + boundaries: Vec::from(Self::BYTES_HISTOGRAM_BOUNDARIES), + record_min_max: true, + }), + )?, + default_histogram_view: new_view( + Instrument::new().name("*"), + Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { + boundaries: Vec::from(Self::DEFAULT_HISTOGRAM_BOUNDARIES), + record_min_max: true, + }), + )?, + }) + } +} + +#[cfg(any(feature = "prometheus", feature = "otlp"))] +impl View for CustomView { + fn match_inst(&self, inst: &Instrument) -> Option { + match (inst.kind, inst.name.as_ref()) { + ( + Some(InstrumentKind::Histogram), + "http.server.request.body.size" | "http.server.response.body.size", + ) => self.bytes_histogram_view.match_inst(inst), + (Some(InstrumentKind::Histogram), _) => self.default_histogram_view.match_inst(inst), + _ => None, } } } @@ -106,11 +151,11 @@ fn build_opentelemetry_prometheus_meter_provider( registry: Registry, ) -> Result { let reader = opentelemetry_prometheus::exporter() - .with_aggregation_selector(CustomAggregationSelector) .with_registry(registry) .build()?; let meter_provider = MeterProvider::builder() .with_reader(reader) + .with_view(CustomView::new()?) .with_resource(resource()) .build(); Ok(meter_provider) @@ -184,16 +229,19 @@ pub async fn install_metrics_exporter( #[cfg(feature = "otlp")] Some(MetricsExporterConfiguration::Otlp(otlp_config)) => { - let meter_provider = opentelemetry_otlp::new_pipeline() - .metrics(Tokio) - .with_aggregation_selector(CustomAggregationSelector) - .with_exporter( - opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(otlp_config.endpoint.clone()), - ) + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(otlp_config.endpoint.clone()) + .build_metrics_exporter( + Box::new(DefaultAggregationSelector::new()), + Box::new(DefaultTemporalitySelector::new()), + )?; + let reader = PeriodicReader::builder(exporter, Tokio).build(); + let meter_provider = MeterProvider::builder() + .with_reader(reader) + .with_view(CustomView::new()?) .with_resource(resource()) - .build()?; + .build(); // We can't drop the PushController, as that would stop pushes, so return it to the // caller. Ok(MetricsExporterHandle::Otlp(meter_provider))