diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index d7103a335..5feabaca5 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -8,8 +8,10 @@ use crate::{ AggregationJobWriter, AggregationJobWriterMetrics, InitialWrite, ReportAggregationUpdate as _, WritableReportAggregation, }, - error::{handle_ping_pong_error, ReportRejection, ReportRejectionReason}, - error::{BatchMismatch, OptOutReason}, + error::{ + handle_ping_pong_error, BatchMismatch, OptOutReason, ReportRejection, + ReportRejectionReason, + }, query_type::{CollectableQueryType, UploadableQueryType}, report_writer::{ReportWriteBatcher, WritableReport}, }, @@ -19,7 +21,10 @@ use crate::{ }, config::TaskprovConfig, diagnostic::AggregationJobInitForbiddenMutationEvent, - metrics::{aggregate_step_failure_counter, report_aggregation_success_counter}, + metrics::{ + aggregate_step_failure_counter, aggregated_report_share_dimension_histogram, + report_aggregation_success_counter, + }, }; use backoff::{backoff::Backoff, Notify}; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; @@ -163,6 +168,8 @@ struct AggregatorMetrics { /// Counters tracking the number of failures to step client reports through the aggregation /// process. aggregate_step_failure_counter: Counter, + /// Histogram tracking the VDAF type and dimension of successfully-aggregated reports. + aggregated_report_share_dimension_histogram: Histogram, } impl AggregatorMetrics { @@ -170,6 +177,9 @@ impl AggregatorMetrics { AggregationJobWriterMetrics { report_aggregation_success_counter: self.report_aggregation_success_counter.clone(), aggregate_step_failure_counter: self.aggregate_step_failure_counter.clone(), + aggregated_report_share_dimension_histogram: self + .aggregated_report_share_dimension_histogram + .clone(), } } } @@ -283,6 +293,8 @@ impl Aggregator { let report_aggregation_success_counter = report_aggregation_success_counter(meter); let aggregate_step_failure_counter = aggregate_step_failure_counter(meter); + let aggregated_report_share_dimension_histogram = + aggregated_report_share_dimension_histogram(meter); let global_hpke_keypairs = GlobalHpkeKeypairCache::new( datastore.clone(), @@ -302,6 +314,7 @@ impl Aggregator { upload_decode_failure_counter, report_aggregation_success_counter, aggregate_step_failure_counter, + aggregated_report_share_dimension_histogram, }, global_hpke_keypairs, peer_aggregators, diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index daa2830ac..9700e6e59 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -1,12 +1,16 @@ -use crate::aggregator::{ - aggregate_step_failure_counter, - aggregation_job_writer::{ - AggregationJobWriter, AggregationJobWriterMetrics, UpdateWrite, WritableReportAggregation, +use crate::{ + aggregator::{ + aggregate_step_failure_counter, + aggregation_job_writer::{ + AggregationJobWriter, AggregationJobWriterMetrics, UpdateWrite, + WritableReportAggregation, + }, + error::handle_ping_pong_error, + http_handlers::AGGREGATION_JOB_ROUTE, + query_type::CollectableQueryType, + report_aggregation_success_counter, send_request_to_helper, Error, RequestBody, }, - error::handle_ping_pong_error, - http_handlers::AGGREGATION_JOB_ROUTE, - query_type::CollectableQueryType, - report_aggregation_success_counter, send_request_to_helper, Error, RequestBody, + metrics::aggregated_report_share_dimension_histogram, }; use anyhow::{anyhow, Result}; use backoff::backoff::Backoff; @@ -64,6 +68,8 @@ pub struct AggregationJobDriver { #[derivative(Debug = "ignore")] aggregate_step_failure_counter: Counter, #[derivative(Debug = "ignore")] + aggregated_report_share_dimension_histogram: Histogram, + #[derivative(Debug = "ignore")] job_cancel_counter: Counter, #[derivative(Debug = "ignore")] job_retry_counter: Counter, @@ -83,6 +89,8 @@ where ) -> Self { let aggregation_success_counter = report_aggregation_success_counter(meter); let aggregate_step_failure_counter = aggregate_step_failure_counter(meter); + let aggregated_report_share_dimension_histogram = + aggregated_report_share_dimension_histogram(meter); let job_cancel_counter = meter .u64_counter("janus_job_cancellations") @@ -112,6 +120,7 @@ where backoff, aggregation_success_counter, aggregate_step_failure_counter, + aggregated_report_share_dimension_histogram, job_cancel_counter, job_retry_counter, http_request_duration_histogram, @@ -914,6 +923,9 @@ where Some(AggregationJobWriterMetrics { report_aggregation_success_counter: self.aggregation_success_counter.clone(), aggregate_step_failure_counter: self.aggregate_step_failure_counter.clone(), + aggregated_report_share_dimension_histogram: self + .aggregated_report_share_dimension_histogram + .clone(), }), ); let new_step = aggregation_job.step().increment(); diff --git a/aggregator/src/aggregator/aggregation_job_writer.rs b/aggregator/src/aggregator/aggregation_job_writer.rs index d322fbcf4..14192c3e9 100644 --- a/aggregator/src/aggregator/aggregation_job_writer.rs +++ b/aggregator/src/aggregator/aggregation_job_writer.rs @@ -16,15 +16,21 @@ use janus_aggregator_core::{ query_type::AccumulableQueryType, task::AggregatorTask, }; +#[cfg(feature = "fpvec_bounded_l2")] +use janus_core::vdaf::Prio3FixedPointBoundedL2VecSumBitSize; use janus_core::{ report_id::ReportIdChecksumExt as _, time::{Clock, IntervalExt}, + vdaf::VdafInstance, }; use janus_messages::{ AggregationJobId, Interval, PrepareError, PrepareResp, PrepareStepResult, ReportId, ReportIdChecksum, Time, }; -use opentelemetry::{metrics::Counter, KeyValue}; +use opentelemetry::{ + metrics::{Counter, Histogram}, + KeyValue, +}; use prio::{codec::Encode, vdaf}; use rand::{thread_rng, Rng as _}; use std::{borrow::Cow, collections::HashMap, marker::PhantomData, sync::Arc}; @@ -52,6 +58,7 @@ where pub struct AggregationJobWriterMetrics { pub report_aggregation_success_counter: Counter, pub aggregate_step_failure_counter: Counter, + pub aggregated_report_share_dimension_histogram: Histogram, } #[allow(private_bounds)] @@ -673,12 +680,125 @@ where match ra_batch_aggregation.merged_with(batch_aggregation) { Ok(merged_batch_aggregation) => { self.writer.update_metrics(|metrics| { - metrics.report_aggregation_success_counter.add(1, &[]) + metrics.report_aggregation_success_counter.add(1, &[]); + + use VdafInstance::*; + match self.writer.task.vdaf() { + Prio3Count => metrics + .aggregated_report_share_dimension_histogram + .record(1, &[KeyValue::new("type", "Prio3Count")]), + + Prio3Sum { bits } => { + metrics.aggregated_report_share_dimension_histogram.record( + u64::try_from(*bits).unwrap_or(u64::MAX), + &[KeyValue::new("type", "Prio3Sum")], + ) + } + + Prio3SumVec { + bits, + length, + chunk_length: _, + } => { + metrics.aggregated_report_share_dimension_histogram.record( + u64::try_from(*bits) + .unwrap_or(u64::MAX) + .saturating_mul( + u64::try_from(*length).unwrap_or(u64::MAX), + ), + &[KeyValue::new("type", "Prio3SumVec")], + ) + } + + Prio3SumVecField64MultiproofHmacSha256Aes128 { + proofs: _, + bits, + length, + chunk_length: _, + } => { + metrics.aggregated_report_share_dimension_histogram.record( + u64::try_from(*bits) + .unwrap_or(u64::MAX) + .saturating_mul( + u64::try_from(*length).unwrap_or(u64::MAX), + ), + &[KeyValue::new( + "type", + "Prio3SumVecField64MultiproofHmacSha256Aes128", + )], + ) + } + + Prio3Histogram { + length, + chunk_length: _, + } => { + metrics.aggregated_report_share_dimension_histogram.record( + u64::try_from(*length).unwrap_or(u64::MAX), + &[KeyValue::new("type", "Prio3Histogram")], + ) + } + + #[cfg(feature = "fpvec_bounded_l2")] + Prio3FixedPointBoundedL2VecSum { + bitsize: Prio3FixedPointBoundedL2VecSumBitSize::BitSize16, + dp_strategy: _, + length, + } => { + metrics.aggregated_report_share_dimension_histogram.record( + u64::try_from(*length) + .unwrap_or(u64::MAX) + .saturating_mul(16), + &[KeyValue::new( + "type", + "Prio3FixedPointBoundedL2VecSum", + )], + ) + } + + #[cfg(feature = "fpvec_bounded_l2")] + Prio3FixedPointBoundedL2VecSum { + bitsize: Prio3FixedPointBoundedL2VecSumBitSize::BitSize32, + dp_strategy: _, + length, + } => { + metrics.aggregated_report_share_dimension_histogram.record( + u64::try_from(*length) + .unwrap_or(u64::MAX) + .saturating_mul(32), + &[KeyValue::new( + "type", + "Prio3FixedPointBoundedL2VecSum", + )], + ) + } + + Poplar1 { bits } => { + metrics.aggregated_report_share_dimension_histogram.record( + u64::try_from(*bits).unwrap_or(u64::MAX), + &[KeyValue::new("type", "Poplar1")], + ) + } + + #[cfg(feature = "test-util")] + Fake { rounds: _ } | FakeFailsPrepInit | FakeFailsPrepStep => { + metrics + .aggregated_report_share_dimension_histogram + .record(0, &[KeyValue::new("type", "Fake")]) + } + _ => metrics + .aggregated_report_share_dimension_histogram + .record(0, &[KeyValue::new("type", "unknown")]), + } }); *batch_aggregation = merged_batch_aggregation } Err(err) => { - warn!(report_id = %report_aggregation.report_id(), ?err, "Couldn't update batch aggregation"); + warn!( + report_id = %report_aggregation.report_id(), + ?err, + "Couldn't update batch aggregation", + ); self.writer.update_metrics(|metrics| { metrics .aggregate_step_failure_counter diff --git a/aggregator/src/metrics.rs b/aggregator/src/metrics.rs index c0e8c8100..b072fa10a 100644 --- a/aggregator/src/metrics.rs +++ b/aggregator/src/metrics.rs @@ -2,7 +2,7 @@ use anyhow::anyhow; use opentelemetry::{ - metrics::{Counter, Meter, Unit}, + metrics::{Counter, Histogram, Meter, Unit}, KeyValue, }; use serde::{Deserialize, Serialize}; @@ -153,26 +153,31 @@ pub enum MetricsExporterHandle { #[cfg(any(feature = "prometheus", feature = "otlp"))] struct CustomView { - uint_histogram_view: Box, + retries_histogram_view: Box, + vdaf_dimension_histogram_view: Box, bytes_histogram_view: Box, default_histogram_view: Box, } #[cfg(any(feature = "prometheus", feature = "otlp"))] impl CustomView { + /// These boundaries are for the number of times a database transaction was retried. + const RETRIES_HISTOGRAM_BOUNDARIES: &'static [f64] = &[ + 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0, 2048.0, 4096.0, 8192.0, + 16384.0, + ]; + + /// These boundaries are for the dimensions of VDAF measurements. + const VDAF_DIMENSION_HISTOGRAM_VALUES: &'static [f64] = &[ + 1.0, 4.0, 16.0, 64.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, + ]; + /// These boundaries are intended to be used with measurements having the unit of "bytes". const BYTES_HISTOGRAM_BOUNDARIES: &'static [f64] = &[ 1024.0, 2048.0, 4096.0, 8192.0, 16384.0, 32768.0, 65536.0, 131072.0, 262144.0, 524288.0, 1048576.0, 2097152.0, 4194304.0, 8388608.0, 16777216.0, 33554432.0, ]; - /// These boundaries are for measurements of unsigned integers, such as the number of retries - /// that an operation took. - const UINT_HISTOGRAM_BOUNDARIES: &'static [f64] = &[ - 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0, 2048.0, 4096.0, 8192.0, - 16384.0, - ]; - /// These boundaries are intended to be able to capture the length of short-lived operations /// (e.g HTTP requests) as well as longer-running operations. const DEFAULT_HISTOGRAM_BOUNDARIES: &'static [f64] = &[ @@ -180,23 +185,31 @@ impl CustomView { ]; pub fn new() -> Result { + let wildcard_instrument = Instrument::new().name("*"); Ok(Self { - uint_histogram_view: new_view( - Instrument::new().name("*"), + retries_histogram_view: new_view( + wildcard_instrument.clone(), + Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { + boundaries: Vec::from(Self::RETRIES_HISTOGRAM_BOUNDARIES), + record_min_max: true, + }), + )?, + vdaf_dimension_histogram_view: new_view( + wildcard_instrument.clone(), Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { - boundaries: Vec::from(Self::UINT_HISTOGRAM_BOUNDARIES), + boundaries: Vec::from(Self::VDAF_DIMENSION_HISTOGRAM_VALUES), record_min_max: true, }), )?, bytes_histogram_view: new_view( - Instrument::new().name("*"), + wildcard_instrument.clone(), Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { boundaries: Vec::from(Self::BYTES_HISTOGRAM_BOUNDARIES), record_min_max: true, }), )?, default_histogram_view: new_view( - Instrument::new().name("*"), + wildcard_instrument, Stream::new().aggregation(Aggregation::ExplicitBucketHistogram { boundaries: Vec::from(Self::DEFAULT_HISTOGRAM_BOUNDARIES), record_min_max: true, @@ -210,13 +223,16 @@ impl CustomView { impl View for CustomView { fn match_inst(&self, inst: &Instrument) -> Option { match (inst.kind, inst.name.as_ref()) { + (Some(InstrumentKind::Histogram), TRANSACTION_RETRIES_METER_NAME) => { + self.retries_histogram_view.match_inst(inst) + } + (Some(InstrumentKind::Histogram), AGGREGATED_REPORT_SHARE_DIMENSION_METER_NAME) => { + self.vdaf_dimension_histogram_view.match_inst(inst) + } ( Some(InstrumentKind::Histogram), "http.server.request.body.size" | "http.server.response.body.size", ) => self.bytes_histogram_view.match_inst(inst), - (Some(InstrumentKind::Histogram), TRANSACTION_RETRIES_METER_NAME) => { - self.uint_histogram_view.match_inst(inst) - } (Some(InstrumentKind::Histogram), _) => self.default_histogram_view.match_inst(inst), _ => None, } @@ -389,6 +405,8 @@ fn resource() -> Resource { version_info_resource.merge(&default_resource) } +// TODO(#3165): This counter is made obsolete by the histogram below. Remove it once it is no longer +// being used. pub(crate) fn report_aggregation_success_counter(meter: &Meter) -> Counter { let report_aggregation_success_counter = meter .u64_counter("janus_report_aggregation_success_counter") @@ -399,6 +417,16 @@ pub(crate) fn report_aggregation_success_counter(meter: &Meter) -> Counter report_aggregation_success_counter } +pub const AGGREGATED_REPORT_SHARE_DIMENSION_METER_NAME: &str = + "janus_aggregated_report_share_vdaf_dimension"; + +pub(crate) fn aggregated_report_share_dimension_histogram(meter: &Meter) -> Histogram { + meter + .u64_histogram(AGGREGATED_REPORT_SHARE_DIMENSION_METER_NAME) + .with_description("Successfully aggregated report shares") + .init() +} + pub(crate) fn aggregate_step_failure_counter(meter: &Meter) -> Counter { let aggregate_step_failure_counter = meter .u64_counter("janus_step_failures")