Add histogram of report shares by VDAF parameters (#3166)
divergentdave authored May 28, 2024
1 parent 2c59dcf commit 562db4b
Showing 4 changed files with 204 additions and 31 deletions.
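For context: the new histogram is constructed by aggregated_report_share_dimension_histogram(meter) in both the aggregator and the aggregation job driver, and is recorded into whenever a report share is successfully merged into a batch aggregation. The constructor lives in the crate's metrics module (presumably aggregator/src/metrics.rs), whose diff is not expanded in this view. A minimal sketch of what such a constructor could look like; the metric name and description are assumptions, not taken from the commit, and the builder calls assume the opentelemetry instrument-builder API in use around the time of this change:

    // Sketch only: hypothetical metric name and description. The real definition
    // is in the metrics module changed by this commit but not shown here.
    use opentelemetry::metrics::{Histogram, Meter};

    pub(crate) fn aggregated_report_share_dimension_histogram(meter: &Meter) -> Histogram<u64> {
        meter
            .u64_histogram("janus_aggregated_report_share_vdaf_dimension")
            .with_description(
                "Dimension (bits, length, or bits * length, depending on the VDAF) of each \
                 successfully-aggregated report share.",
            )
            .init()
    }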
19 changes: 16 additions & 3 deletions aggregator/src/aggregator.rs
@@ -8,8 +8,10 @@ use crate::{
AggregationJobWriter, AggregationJobWriterMetrics, InitialWrite,
ReportAggregationUpdate as _, WritableReportAggregation,
},
error::{handle_ping_pong_error, ReportRejection, ReportRejectionReason},
error::{BatchMismatch, OptOutReason},
error::{
handle_ping_pong_error, BatchMismatch, OptOutReason, ReportRejection,
ReportRejectionReason,
},
query_type::{CollectableQueryType, UploadableQueryType},
report_writer::{ReportWriteBatcher, WritableReport},
},
@@ -19,7 +21,10 @@ use crate::{
},
config::TaskprovConfig,
diagnostic::AggregationJobInitForbiddenMutationEvent,
metrics::{aggregate_step_failure_counter, report_aggregation_success_counter},
metrics::{
aggregate_step_failure_counter, aggregated_report_share_dimension_histogram,
report_aggregation_success_counter,
},
};
use backoff::{backoff::Backoff, Notify};
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
@@ -163,13 +168,18 @@ struct AggregatorMetrics {
/// Counters tracking the number of failures to step client reports through the aggregation
/// process.
aggregate_step_failure_counter: Counter<u64>,
/// Histogram tracking the VDAF type and dimension of successfully-aggregated reports.
aggregated_report_share_dimension_histogram: Histogram<u64>,
}

impl AggregatorMetrics {
fn for_aggregation_job_writer(&self) -> AggregationJobWriterMetrics {
AggregationJobWriterMetrics {
report_aggregation_success_counter: self.report_aggregation_success_counter.clone(),
aggregate_step_failure_counter: self.aggregate_step_failure_counter.clone(),
aggregated_report_share_dimension_histogram: self
.aggregated_report_share_dimension_histogram
.clone(),
}
}
}
@@ -283,6 +293,8 @@ impl<C: Clock> Aggregator<C> {

let report_aggregation_success_counter = report_aggregation_success_counter(meter);
let aggregate_step_failure_counter = aggregate_step_failure_counter(meter);
let aggregated_report_share_dimension_histogram =
aggregated_report_share_dimension_histogram(meter);

let global_hpke_keypairs = GlobalHpkeKeypairCache::new(
datastore.clone(),
@@ -302,6 +314,7 @@ impl<C: Clock> Aggregator<C> {
upload_decode_failure_counter,
report_aggregation_success_counter,
aggregate_step_failure_counter,
aggregated_report_share_dimension_histogram,
},
global_hpke_keypairs,
peer_aggregators,
28 changes: 20 additions & 8 deletions aggregator/src/aggregator/aggregation_job_driver.rs
@@ -1,12 +1,16 @@
use crate::aggregator::{
aggregate_step_failure_counter,
aggregation_job_writer::{
AggregationJobWriter, AggregationJobWriterMetrics, UpdateWrite, WritableReportAggregation,
use crate::{
aggregator::{
aggregate_step_failure_counter,
aggregation_job_writer::{
AggregationJobWriter, AggregationJobWriterMetrics, UpdateWrite,
WritableReportAggregation,
},
error::handle_ping_pong_error,
http_handlers::AGGREGATION_JOB_ROUTE,
query_type::CollectableQueryType,
report_aggregation_success_counter, send_request_to_helper, Error, RequestBody,
},
error::handle_ping_pong_error,
http_handlers::AGGREGATION_JOB_ROUTE,
query_type::CollectableQueryType,
report_aggregation_success_counter, send_request_to_helper, Error, RequestBody,
metrics::aggregated_report_share_dimension_histogram,
};
use anyhow::{anyhow, Result};
use backoff::backoff::Backoff;
@@ -64,6 +68,8 @@ pub struct AggregationJobDriver<B> {
#[derivative(Debug = "ignore")]
aggregate_step_failure_counter: Counter<u64>,
#[derivative(Debug = "ignore")]
aggregated_report_share_dimension_histogram: Histogram<u64>,
#[derivative(Debug = "ignore")]
job_cancel_counter: Counter<u64>,
#[derivative(Debug = "ignore")]
job_retry_counter: Counter<u64>,
@@ -83,6 +89,8 @@
) -> Self {
let aggregation_success_counter = report_aggregation_success_counter(meter);
let aggregate_step_failure_counter = aggregate_step_failure_counter(meter);
let aggregated_report_share_dimension_histogram =
aggregated_report_share_dimension_histogram(meter);

let job_cancel_counter = meter
.u64_counter("janus_job_cancellations")
@@ -112,6 +120,7 @@ where
backoff,
aggregation_success_counter,
aggregate_step_failure_counter,
aggregated_report_share_dimension_histogram,
job_cancel_counter,
job_retry_counter,
http_request_duration_histogram,
@@ -914,6 +923,9 @@ where
Some(AggregationJobWriterMetrics {
report_aggregation_success_counter: self.aggregation_success_counter.clone(),
aggregate_step_failure_counter: self.aggregate_step_failure_counter.clone(),
aggregated_report_share_dimension_histogram: self
.aggregated_report_share_dimension_histogram
.clone(),
}),
);
let new_step = aggregation_job.step().increment();
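The driver clones this histogram into AggregationJobWriterMetrics when stepping an aggregation job, so the writer below can record each successfully-aggregated report share. As a reminder of the OpenTelemetry API involved, a self-contained sketch with made-up meter and instrument names (assuming the same opentelemetry builder API as above; without a configured meter provider these calls are no-ops):

    use opentelemetry::{global, KeyValue};

    fn main() {
        // Hypothetical meter/instrument names, for illustration only.
        let meter = global::meter("example");
        let histogram = meter.u64_histogram("example_vdaf_dimension").init();
        // Record one report share for a hypothetical Prio3Sum task with bits = 8,
        // tagged with the same "type" attribute this commit uses.
        histogram.record(8, &[KeyValue::new("type", "Prio3Sum")]);
    }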
126 changes: 123 additions & 3 deletions aggregator/src/aggregator/aggregation_job_writer.rs
@@ -16,15 +16,21 @@ use janus_aggregator_core::{
query_type::AccumulableQueryType,
task::AggregatorTask,
};
#[cfg(feature = "fpvec_bounded_l2")]
use janus_core::vdaf::Prio3FixedPointBoundedL2VecSumBitSize;
use janus_core::{
report_id::ReportIdChecksumExt as _,
time::{Clock, IntervalExt},
vdaf::VdafInstance,
};
use janus_messages::{
AggregationJobId, Interval, PrepareError, PrepareResp, PrepareStepResult, ReportId,
ReportIdChecksum, Time,
};
use opentelemetry::{metrics::Counter, KeyValue};
use opentelemetry::{
metrics::{Counter, Histogram},
KeyValue,
};
use prio::{codec::Encode, vdaf};
use rand::{thread_rng, Rng as _};
use std::{borrow::Cow, collections::HashMap, marker::PhantomData, sync::Arc};
@@ -52,6 +58,7 @@
pub struct AggregationJobWriterMetrics {
pub report_aggregation_success_counter: Counter<u64>,
pub aggregate_step_failure_counter: Counter<u64>,
pub aggregated_report_share_dimension_histogram: Histogram<u64>,
}

#[allow(private_bounds)]
@@ -673,12 +680,125 @@ where
match ra_batch_aggregation.merged_with(batch_aggregation) {
Ok(merged_batch_aggregation) => {
self.writer.update_metrics(|metrics| {
metrics.report_aggregation_success_counter.add(1, &[])
metrics.report_aggregation_success_counter.add(1, &[]);

use VdafInstance::*;
match self.writer.task.vdaf() {
Prio3Count => metrics
.aggregated_report_share_dimension_histogram
.record(1, &[KeyValue::new("type", "Prio3Count")]),

Prio3Sum { bits } => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*bits).unwrap_or(u64::MAX),
&[KeyValue::new("type", "Prio3Sum")],
)
}

Prio3SumVec {
bits,
length,
chunk_length: _,
} => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*bits)
.unwrap_or(u64::MAX)
.saturating_mul(
u64::try_from(*length).unwrap_or(u64::MAX),
),
&[KeyValue::new("type", "Prio3SumVec")],
)
}

Prio3SumVecField64MultiproofHmacSha256Aes128 {
proofs: _,
bits,
length,
chunk_length: _,
} => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*bits)
.unwrap_or(u64::MAX)
.saturating_mul(
u64::try_from(*length).unwrap_or(u64::MAX),
),
&[KeyValue::new(
"type",
"Prio3SumVecField64MultiproofHmacSha256Aes128",
)],
)
}

Prio3Histogram {
length,
chunk_length: _,
} => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*length).unwrap_or(u64::MAX),
&[KeyValue::new("type", "Prio3Histogram")],
)
}

#[cfg(feature = "fpvec_bounded_l2")]
Prio3FixedPointBoundedL2VecSum {
bitsize: Prio3FixedPointBoundedL2VecSumBitSize::BitSize16,
dp_strategy: _,
length,
} => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*length)
.unwrap_or(u64::MAX)
.saturating_mul(16),
&[KeyValue::new(
"type",
"Prio3FixedPointBoundedL2VecSum",
)],
)
}

#[cfg(feature = "fpvec_bounded_l2")]
Prio3FixedPointBoundedL2VecSum {
bitsize: Prio3FixedPointBoundedL2VecSumBitSize::BitSize32,
dp_strategy: _,
length,
} => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*length)
.unwrap_or(u64::MAX)
.saturating_mul(32),
&[KeyValue::new(
"type",
"Prio3FixedPointBoundedL2VecSum",
)],
)
}

Poplar1 { bits } => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*bits).unwrap_or(u64::MAX),
&[KeyValue::new("type", "Poplar1")],
)
}

#[cfg(feature = "test-util")]
Fake { rounds: _ } | FakeFailsPrepInit | FakeFailsPrepStep => {
metrics
.aggregated_report_share_dimension_histogram
.record(0, &[KeyValue::new("type", "Fake")])
}
_ => metrics
.aggregated_report_share_dimension_histogram
.record(0, &[KeyValue::new("type", "unknown")]),
}
});
*batch_aggregation = merged_batch_aggregation
}
Err(err) => {
warn!(report_id = %report_aggregation.report_id(), ?err, "Couldn't update batch aggregation");
warn!(
report_id = %report_aggregation.report_id(),
?err,
"Couldn't update batch aggregation",
);
self.writer.update_metrics(|metrics| {
metrics
.aggregate_step_failure_counter
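In prose, the recorded value is: 1 for Prio3Count; bits for Prio3Sum and Poplar1; bits * length for Prio3SumVec and Prio3SumVecField64MultiproofHmacSha256Aes128; length for Prio3Histogram; length * 16 or length * 32 for the two Prio3FixedPointBoundedL2VecSum bit sizes; and 0 for test-only or unrecognized VDAFs, with a "type" attribute naming the VDAF in every case. A compact restatement of that mapping as a pure function over an illustrative enum (not the real janus_core::vdaf::VdafInstance; conversions saturate rather than panic, matching the diff):

    // Illustrative helper, not part of the commit: mirrors the dimension
    // computation in update_metrics above for a subset of VDAFs.
    enum VdafShape {
        Prio3Count,
        Prio3Sum { bits: usize },
        Prio3SumVec { bits: usize, length: usize },
        Prio3Histogram { length: usize },
        Poplar1 { bits: usize },
    }

    fn report_share_dimension(vdaf: &VdafShape) -> u64 {
        // Convert usize parameters to u64, saturating instead of panicking.
        let to_u64 = |x: usize| u64::try_from(x).unwrap_or(u64::MAX);
        match vdaf {
            VdafShape::Prio3Count => 1,
            VdafShape::Prio3Sum { bits } => to_u64(*bits),
            VdafShape::Prio3SumVec { bits, length } => {
                to_u64(*bits).saturating_mul(to_u64(*length))
            }
            VdafShape::Prio3Histogram { length } => to_u64(*length),
            VdafShape::Poplar1 { bits } => to_u64(*bits),
        }
    }

    fn main() {
        // For example, a 1-bit sum vector of length 100_000 has dimension 100_000.
        let dim = report_share_dimension(&VdafShape::Prio3SumVec { bits: 1, length: 100_000 });
        assert_eq!(dim, 100_000);
    }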
(Diff for the remaining changed file in this commit is not expanded in this view.)
