Add histogram of report shares by VDAF parameters #3166

Merged · 1 commit · May 28, 2024
19 changes: 16 additions & 3 deletions aggregator/src/aggregator.rs
@@ -8,8 +8,10 @@ use crate::{
             AggregationJobWriter, AggregationJobWriterMetrics, InitialWrite,
             ReportAggregationUpdate as _, WritableReportAggregation,
         },
-        error::{handle_ping_pong_error, ReportRejection, ReportRejectionReason},
-        error::{BatchMismatch, OptOutReason},
+        error::{
+            handle_ping_pong_error, BatchMismatch, OptOutReason, ReportRejection,
+            ReportRejectionReason,
+        },
         query_type::{CollectableQueryType, UploadableQueryType},
         report_writer::{ReportWriteBatcher, WritableReport},
     },
@@ -19,7 +21,10 @@ use crate::{
     },
     config::TaskprovConfig,
     diagnostic::AggregationJobInitForbiddenMutationEvent,
-    metrics::{aggregate_step_failure_counter, report_aggregation_success_counter},
+    metrics::{
+        aggregate_step_failure_counter, aggregated_report_share_dimension_histogram,
+        report_aggregation_success_counter,
+    },
 };
 use backoff::{backoff::Backoff, Notify};
 use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
@@ -163,13 +168,18 @@ struct AggregatorMetrics {
     /// Counters tracking the number of failures to step client reports through the aggregation
     /// process.
     aggregate_step_failure_counter: Counter<u64>,
+    /// Histogram tracking the VDAF type and dimension of successfully-aggregated reports.
+    aggregated_report_share_dimension_histogram: Histogram<u64>,
 }

 impl AggregatorMetrics {
     fn for_aggregation_job_writer(&self) -> AggregationJobWriterMetrics {
         AggregationJobWriterMetrics {
             report_aggregation_success_counter: self.report_aggregation_success_counter.clone(),
             aggregate_step_failure_counter: self.aggregate_step_failure_counter.clone(),
+            aggregated_report_share_dimension_histogram: self
+                .aggregated_report_share_dimension_histogram
+                .clone(),
         }
     }
 }
@@ -283,6 +293,8 @@ impl<C: Clock> Aggregator<C> {

         let report_aggregation_success_counter = report_aggregation_success_counter(meter);
         let aggregate_step_failure_counter = aggregate_step_failure_counter(meter);
+        let aggregated_report_share_dimension_histogram =
+            aggregated_report_share_dimension_histogram(meter);

         let global_hpke_keypairs = GlobalHpkeKeypairCache::new(
             datastore.clone(),
@@ -302,6 +314,7 @@ impl<C: Clock> Aggregator<C> {
                 upload_decode_failure_counter,
                 report_aggregation_success_counter,
                 aggregate_step_failure_counter,
+                aggregated_report_share_dimension_histogram,
             },
             global_hpke_keypairs,
             peer_aggregators,
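The `aggregated_report_share_dimension_histogram(meter)` factory called above lives in the crate's `metrics` module, which is not part of this diff. A minimal sketch of what such a constructor plausibly looks like against the OpenTelemetry Rust metrics API of this era — the instrument name and description below are illustrative assumptions, not the merged code:

```rust
use opentelemetry::metrics::{Histogram, Meter};

// Hypothetical reconstruction: the real definition lives in the `metrics`
// module, outside this diff. The instrument name and description are
// illustrative guesses, not the merged values.
pub fn aggregated_report_share_dimension_histogram(meter: &Meter) -> Histogram<u64> {
    meter
        .u64_histogram("janus_aggregated_report_share_vdaf_dimension")
        .with_description("Dimension of successfully-aggregated report shares, by VDAF type.")
        .init()
}
```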
28 changes: 20 additions & 8 deletions aggregator/src/aggregator/aggregation_job_driver.rs
@@ -1,12 +1,16 @@
-use crate::aggregator::{
-    aggregate_step_failure_counter,
-    aggregation_job_writer::{
-        AggregationJobWriter, AggregationJobWriterMetrics, UpdateWrite, WritableReportAggregation,
+use crate::{
+    aggregator::{
+        aggregate_step_failure_counter,
+        aggregation_job_writer::{
+            AggregationJobWriter, AggregationJobWriterMetrics, UpdateWrite,
+            WritableReportAggregation,
+        },
+        error::handle_ping_pong_error,
+        http_handlers::AGGREGATION_JOB_ROUTE,
+        query_type::CollectableQueryType,
+        report_aggregation_success_counter, send_request_to_helper, Error, RequestBody,
     },
-    error::handle_ping_pong_error,
-    http_handlers::AGGREGATION_JOB_ROUTE,
-    query_type::CollectableQueryType,
-    report_aggregation_success_counter, send_request_to_helper, Error, RequestBody,
+    metrics::aggregated_report_share_dimension_histogram,
 };
 use anyhow::{anyhow, Result};
 use backoff::backoff::Backoff;
@@ -64,6 +68,8 @@ pub struct AggregationJobDriver<B> {
     #[derivative(Debug = "ignore")]
     aggregate_step_failure_counter: Counter<u64>,
     #[derivative(Debug = "ignore")]
+    aggregated_report_share_dimension_histogram: Histogram<u64>,
+    #[derivative(Debug = "ignore")]
     job_cancel_counter: Counter<u64>,
     #[derivative(Debug = "ignore")]
     job_retry_counter: Counter<u64>,
@@ -83,6 +89,8 @@
     ) -> Self {
         let aggregation_success_counter = report_aggregation_success_counter(meter);
         let aggregate_step_failure_counter = aggregate_step_failure_counter(meter);
+        let aggregated_report_share_dimension_histogram =
+            aggregated_report_share_dimension_histogram(meter);

         let job_cancel_counter = meter
             .u64_counter("janus_job_cancellations")
@@ -112,6 +120,7 @@
             backoff,
             aggregation_success_counter,
             aggregate_step_failure_counter,
+            aggregated_report_share_dimension_histogram,
             job_cancel_counter,
             job_retry_counter,
             http_request_duration_histogram,
@@ -914,6 +923,9 @@
             Some(AggregationJobWriterMetrics {
                 report_aggregation_success_counter: self.aggregation_success_counter.clone(),
                 aggregate_step_failure_counter: self.aggregate_step_failure_counter.clone(),
+                aggregated_report_share_dimension_histogram: self
+                    .aggregated_report_share_dimension_histogram
+                    .clone(),
             }),
         );
         let new_step = aggregation_job.step().increment();
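The driver constructs the histogram once in `new()` and then clones the handle into each `AggregationJobWriterMetrics` it builds. A small illustrative sketch of that record/clone pattern — the instrument name is a placeholder, and the cheap-handle behavior is the OpenTelemetry SDK's design, assumed here rather than shown in the diff:

```rust
use opentelemetry::{global, KeyValue};

fn main() {
    // Illustrative only: OpenTelemetry instruments are cheaply cloneable
    // handles onto the same underlying instrument, which is the pattern the
    // driver relies on when it clones the histogram into each
    // AggregationJobWriterMetrics value.
    let meter = global::meter("example");
    let histogram = meter.u64_histogram("report_share_dimension").init();

    // Record a measurement tagged with the VDAF type, as the writer does;
    // e.g. a 16-bit Prio3Sum report share contributes the value 16.
    histogram.record(16, &[KeyValue::new("type", "Prio3Sum")]);

    // A clone records into the same instrument.
    let handle = histogram.clone();
    handle.record(16, &[KeyValue::new("type", "Prio3Sum")]);
}
```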
126 changes: 123 additions & 3 deletions aggregator/src/aggregator/aggregation_job_writer.rs
@@ -16,15 +16,21 @@ use janus_aggregator_core::{
     query_type::AccumulableQueryType,
     task::AggregatorTask,
 };
+#[cfg(feature = "fpvec_bounded_l2")]
+use janus_core::vdaf::Prio3FixedPointBoundedL2VecSumBitSize;
 use janus_core::{
     report_id::ReportIdChecksumExt as _,
     time::{Clock, IntervalExt},
+    vdaf::VdafInstance,
 };
 use janus_messages::{
     AggregationJobId, Interval, PrepareError, PrepareResp, PrepareStepResult, ReportId,
     ReportIdChecksum, Time,
 };
-use opentelemetry::{metrics::Counter, KeyValue};
+use opentelemetry::{
+    metrics::{Counter, Histogram},
+    KeyValue,
+};
 use prio::{codec::Encode, vdaf};
 use rand::{thread_rng, Rng as _};
 use std::{borrow::Cow, collections::HashMap, marker::PhantomData, sync::Arc};
@@ -52,6 +58,7 @@
 pub struct AggregationJobWriterMetrics {
     pub report_aggregation_success_counter: Counter<u64>,
     pub aggregate_step_failure_counter: Counter<u64>,
+    pub aggregated_report_share_dimension_histogram: Histogram<u64>,
 }

 #[allow(private_bounds)]
@@ -673,12 +680,125 @@
                 match ra_batch_aggregation.merged_with(batch_aggregation) {
                     Ok(merged_batch_aggregation) => {
                         self.writer.update_metrics(|metrics| {
-                            metrics.report_aggregation_success_counter.add(1, &[])
+                            metrics.report_aggregation_success_counter.add(1, &[]);
+
+                            use VdafInstance::*;
+                            match self.writer.task.vdaf() {
+                                Prio3Count => metrics
+                                    .aggregated_report_share_dimension_histogram
+                                    .record(1, &[KeyValue::new("type", "Prio3Count")]),
+
+                                Prio3Sum { bits } => {
+                                    metrics.aggregated_report_share_dimension_histogram.record(
+                                        u64::try_from(*bits).unwrap_or(u64::MAX),
+                                        &[KeyValue::new("type", "Prio3Sum")],
+                                    )
+                                }
+
+                                Prio3SumVec {
+                                    bits,
+                                    length,
+                                    chunk_length: _,
+                                } => {
+                                    metrics.aggregated_report_share_dimension_histogram.record(
+                                        u64::try_from(*bits)
+                                            .unwrap_or(u64::MAX)
+                                            .saturating_mul(
+                                                u64::try_from(*length).unwrap_or(u64::MAX),
+                                            ),
+                                        &[KeyValue::new("type", "Prio3SumVec")],
+                                    )
+                                }
+
+                                Prio3SumVecField64MultiproofHmacSha256Aes128 {
+                                    proofs: _,
+                                    bits,
+                                    length,
+                                    chunk_length: _,
+                                } => {
+                                    metrics.aggregated_report_share_dimension_histogram.record(
+                                        u64::try_from(*bits)
+                                            .unwrap_or(u64::MAX)
+                                            .saturating_mul(
+                                                u64::try_from(*length).unwrap_or(u64::MAX),
+                                            ),
+                                        &[KeyValue::new(
+                                            "type",
+                                            "Prio3SumVecField64MultiproofHmacSha256Aes128",
+                                        )],
+                                    )
+                                }
+
+                                Prio3Histogram {
+                                    length,
+                                    chunk_length: _,
+                                } => {
+                                    metrics.aggregated_report_share_dimension_histogram.record(
+                                        u64::try_from(*length).unwrap_or(u64::MAX),
+                                        &[KeyValue::new("type", "Prio3Histogram")],
+                                    )
+                                }
+
+                                #[cfg(feature = "fpvec_bounded_l2")]
+                                Prio3FixedPointBoundedL2VecSum {
+                                    bitsize: Prio3FixedPointBoundedL2VecSumBitSize::BitSize16,
+                                    dp_strategy: _,
+                                    length,
+                                } => {
+                                    metrics.aggregated_report_share_dimension_histogram.record(
+                                        u64::try_from(*length)
+                                            .unwrap_or(u64::MAX)
+                                            .saturating_mul(16),
+                                        &[KeyValue::new(
+                                            "type",
+                                            "Prio3FixedPointBoundedL2VecSum",
+                                        )],
+                                    )
+                                }
+
+                                #[cfg(feature = "fpvec_bounded_l2")]
+                                Prio3FixedPointBoundedL2VecSum {
+                                    bitsize: Prio3FixedPointBoundedL2VecSumBitSize::BitSize32,
+                                    dp_strategy: _,
+                                    length,
+                                } => {
+                                    metrics.aggregated_report_share_dimension_histogram.record(
+                                        u64::try_from(*length)
+                                            .unwrap_or(u64::MAX)
+                                            .saturating_mul(32),
+                                        &[KeyValue::new(
+                                            "type",
+                                            "Prio3FixedPointBoundedL2VecSum",
+                                        )],
+                                    )
+                                }
+
+                                Poplar1 { bits } => {
+                                    metrics.aggregated_report_share_dimension_histogram.record(
+                                        u64::try_from(*bits).unwrap_or(u64::MAX),
+                                        &[KeyValue::new("type", "Poplar1")],
+                                    )
+                                }
+
+                                #[cfg(feature = "test-util")]
+                                Fake { rounds: _ } | FakeFailsPrepInit | FakeFailsPrepStep => {
+                                    metrics
+                                        .aggregated_report_share_dimension_histogram
+                                        .record(0, &[KeyValue::new("type", "Fake")])
+                                }
+                                _ => metrics
+                                    .aggregated_report_share_dimension_histogram
+                                    .record(0, &[KeyValue::new("type", "unknown")]),
+                            }
                         });
                         *batch_aggregation = merged_batch_aggregation
                     }
                     Err(err) => {
-                        warn!(report_id = %report_aggregation.report_id(), ?err, "Couldn't update batch aggregation");
+                        warn!(
+                            report_id = %report_aggregation.report_id(),
+                            ?err,
+                            "Couldn't update batch aggregation",
+                        );
                         self.writer.update_metrics(|metrics| {
                             metrics
                                 .aggregate_step_failure_counter
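The value recorded for each successfully-aggregated report share is the VDAF's measurement dimension: 1 for `Prio3Count`, `bits` for `Prio3Sum` and `Poplar1`, `bits × length` for the sum-vector VDAFs, `length` for `Prio3Histogram`, and `length × {16, 32}` for the fixed-point VDAF, with `u64::try_from(...).unwrap_or(u64::MAX)` and saturating multiplication guarding against overflow. A condensed sketch of that mapping as a standalone helper — the function itself is hypothetical, but the formulas come from the match above:

```rust
use janus_core::vdaf::VdafInstance;

/// Condensed restatement of the match in the diff for a few representative
/// VDAFs; the full version also covers the multiproof, fixed-point, and
/// test-only variants. Returns (dimension, "type" attribute value).
fn report_share_dimension(vdaf: &VdafInstance) -> (u64, &'static str) {
    match vdaf {
        VdafInstance::Prio3Count => (1, "Prio3Count"),
        VdafInstance::Prio3Sum { bits } => {
            (u64::try_from(*bits).unwrap_or(u64::MAX), "Prio3Sum")
        }
        VdafInstance::Prio3SumVec { bits, length, .. } => (
            u64::try_from(*bits)
                .unwrap_or(u64::MAX)
                .saturating_mul(u64::try_from(*length).unwrap_or(u64::MAX)),
            "Prio3SumVec",
        ),
        VdafInstance::Prio3Histogram { length, .. } => {
            (u64::try_from(*length).unwrap_or(u64::MAX), "Prio3Histogram")
        }
        // Unrecognized VDAFs fall through to 0, as in the diff.
        _ => (0, "unknown"),
    }
}

// Worked examples: Prio3SumVec { bits: 8, length: 12, .. } records 96 under
// type = "Prio3SumVec"; Prio3Histogram { length: 354, .. } records 354.
```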