Skip to content

Commit

Permalink
Add per-task report upload metrics (#2508)
Browse files Browse the repository at this point in the history
* Add per task report upload metrics.

* Change default to 32, add documentation on option

* Fix test

* Build query instead of brute forcing each possible one

* Don't wait on bad reports

* Use Runtime and RuntimeManager instead of sleeping in tests

* Clippy

* Cargo doc

* Don't use macro needlessly

Co-authored-by: Brandon Pitman <[email protected]>

---------

Co-authored-by: Brandon Pitman <[email protected]>
  • Loading branch information
inahga and branlwyd authored Jan 24, 2024
1 parent bdbe7ac commit 57b3f90
Show file tree
Hide file tree
Showing 22 changed files with 1,107 additions and 226 deletions.
552 changes: 489 additions & 63 deletions aggregator/src/aggregator.rs

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use janus_aggregator_core::{
};
use janus_core::{
auth_tokens::{AuthenticationToken, DAP_AUTH_HEADER},
test_util::{dummy_vdaf, install_test_trace_subscriber, run_vdaf, VdafTranscript},
test_util::{
dummy_vdaf, install_test_trace_subscriber, run_vdaf, runtime::TestRuntime, VdafTranscript,
},
time::{Clock, MockClock, TimeExt as _},
vdaf::VdafInstance,
};
Expand Down Expand Up @@ -259,6 +261,7 @@ async fn setup_aggregate_init_test_without_sending_request<
let handler = aggregator_handler(
Arc::clone(&datastore),
clock.clone(),
TestRuntime::default(),
&noop_meter(),
Config::default(),
)
Expand Down
3 changes: 2 additions & 1 deletion aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ mod tests {
test_util::noop_meter,
};
use janus_core::{
test_util::install_test_trace_subscriber,
test_util::{install_test_trace_subscriber, runtime::TestRuntime},
time::{IntervalExt, MockClock},
vdaf::{VdafInstance, VERIFY_KEY_LENGTH},
};
Expand Down Expand Up @@ -530,6 +530,7 @@ mod tests {
let handler = aggregator_handler(
Arc::clone(&datastore),
clock,
TestRuntime::default(),
&meter,
default_aggregator_config(),
)
Expand Down
2 changes: 2 additions & 0 deletions aggregator/src/aggregator/collection_job_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use janus_core::{
test_util::{
dummy_vdaf::{self, AggregationParam},
install_test_trace_subscriber,
runtime::TestRuntime,
},
time::{Clock, IntervalExt, MockClock},
vdaf::VdafInstance,
Expand Down Expand Up @@ -143,6 +144,7 @@ pub(crate) async fn setup_collection_job_test_case(
let handler = aggregator_handler(
Arc::clone(&datastore),
clock.clone(),
TestRuntime::default(),
&noop_meter(),
Config {
batch_aggregation_shard_count: 32,
Expand Down
135 changes: 102 additions & 33 deletions aggregator/src/aggregator/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use janus_aggregator_core::{datastore, task};
use janus_aggregator_core::{
datastore::{self, models::TaskUploadIncrementor},
task,
};
use janus_core::http::HttpErrorResponse;
use janus_messages::{
AggregationJobId, AggregationJobStep, CollectionJobId, HpkeConfigId, Interval, PrepareError,
Expand Down Expand Up @@ -32,14 +35,9 @@ pub enum Error {
/// Error handling a message.
#[error("invalid message: {0}")]
Message(#[from] janus_messages::Error),
/// Corresponds to `reportRejected` in DAP. A report was rejected for some reason that is not
/// specified in DAP.
#[error("task {0}: report {1} rejected: {2}")]
ReportRejected(TaskId, ReportId, Time, ReportRejectedReason),
/// Corresponds to `reportTooEarly` in DAP. A report was rejected because the timestamp is too
/// far in the future.
#[error("task {0}: report {1} too early: {2}")]
ReportTooEarly(TaskId, ReportId, Time),
/// Catch-all error for invalid reports.
#[error("{0}")]
ReportRejected(ReportRejection),
/// Corresponds to `invalidMessage` in DAP.
#[error("task {0:?}: invalid message: {1}")]
InvalidMessage(Option<TaskId>, &'static str),
Expand Down Expand Up @@ -72,9 +70,6 @@ pub enum Error {
/// An attempt was made to act on a collection job that has been abandoned by the aggregator.
#[error("abandoned collection job: {0}")]
AbandonedCollectionJob(CollectionJobId),
/// Corresponds to `outdatedHpkeConfig` in DAP.
#[error("task {0}: outdated HPKE config: {1}")]
OutdatedHpkeConfig(TaskId, HpkeConfigId),
/// Corresponds to `unauthorizedRequest` in DAP.
#[error("task {0}: unauthorized request")]
UnauthorizedRequest(TaskId),
Expand Down Expand Up @@ -148,37 +143,109 @@ pub enum Error {
DifferentialPrivacy(VdafError),
}

#[derive(Debug)]
pub enum ReportRejectedReason {
IntervalAlreadyCollected,
LeaderDecryptFailure,
LeaderInputShareDecodeFailure,
PublicShareDecodeFailure,
/// Contains details that describe the report and why it was rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReportRejection {
task_id: TaskId,
report_id: ReportId,
time: Time,
reason: ReportRejectionReason,
}

impl ReportRejection {
pub fn new(
task_id: TaskId,
report_id: ReportId,
time: Time,
reason: ReportRejectionReason,
) -> Self {
Self {
task_id,
report_id,
time,
reason,
}
}

pub fn task_id(&self) -> &TaskId {
&self.task_id
}

pub fn report_id(&self) -> &ReportId {
&self.report_id
}

pub fn time(&self) -> &Time {
&self.time
}

pub fn reason(&self) -> &ReportRejectionReason {
&self.reason
}
}

impl Display for ReportRejection {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"task {}, report {}, time {}, rejected {}",
self.task_id, self.report_id, self.time, self.reason
)
}
}

/// Indicates why a report was rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReportRejectionReason {
IntervalCollected,
DecryptFailure,
DecodeFailure,
TaskExpired,
TooOld,
Expired,
TooEarly,
OutdatedHpkeConfig(HpkeConfigId),
}

impl ReportRejectedReason {
impl ReportRejectionReason {
pub fn detail(&self) -> &'static str {
match self {
ReportRejectedReason::IntervalAlreadyCollected => {
ReportRejectionReason::IntervalCollected => {
"Report falls into a time interval that has already been collected."
}
ReportRejectedReason::LeaderDecryptFailure => {
"Leader's report share could not be decrypted."
}
ReportRejectedReason::LeaderInputShareDecodeFailure => {
"Leader's input share could not be decoded."
ReportRejectionReason::DecryptFailure => "Report share could not be decrypted.",
ReportRejectionReason::DecodeFailure => "Report could not be decoded.",
ReportRejectionReason::TaskExpired => "Task has expired.",
ReportRejectionReason::Expired => "Report timestamp is too old.",
ReportRejectionReason::TooEarly => "Report timestamp is too far in the future.",
ReportRejectionReason::OutdatedHpkeConfig(_) => {
"Report is using an outdated HPKE configuration."
}
ReportRejectedReason::PublicShareDecodeFailure => {
"Report public share could not be decoded."
}
}
}

impl From<&ReportRejectionReason> for TaskUploadIncrementor {
fn from(value: &ReportRejectionReason) -> Self {
match value {
ReportRejectionReason::IntervalCollected => TaskUploadIncrementor::IntervalCollected,
ReportRejectionReason::DecryptFailure => TaskUploadIncrementor::ReportDecryptFailure,
ReportRejectionReason::DecodeFailure => TaskUploadIncrementor::ReportDecodeFailure,
ReportRejectionReason::TaskExpired => TaskUploadIncrementor::TaskExpired,
ReportRejectionReason::Expired => TaskUploadIncrementor::ReportExpired,
ReportRejectionReason::TooEarly => TaskUploadIncrementor::ReportTooEarly,
ReportRejectionReason::OutdatedHpkeConfig(_) => {
TaskUploadIncrementor::ReportOutdatedKey
}
ReportRejectedReason::TaskExpired => "Task has expired.",
ReportRejectedReason::TooOld => "Report timestamp is too old.",
}
}
}

impl Display for ReportRejectionReason {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}

/// Errors that cause the aggregator to opt-out of a taskprov task.
#[derive(Debug, thiserror::Error)]
pub enum OptOutReason {
Expand All @@ -203,8 +270,11 @@ impl Error {
Error::MessageDecode(_) => "message_decode",
Error::ResponseEncode(_) => "response_encode",
Error::Message(_) => "message",
Error::ReportRejected(_, _, _, _) => "report_rejected",
Error::ReportTooEarly(_, _, _) => "report_too_early",
Error::ReportRejected(rejection) => match rejection.reason {
ReportRejectionReason::TooEarly => "report_too_early",
ReportRejectionReason::OutdatedHpkeConfig(_) => "outdated_hpke_config",
_ => "report_rejected",
},
Error::InvalidMessage(_, _) => "unrecognized_message",
Error::StepMismatch { .. } => "step_mismatch",
Error::UnrecognizedTask(_) => "unrecognized_task",
Expand All @@ -213,7 +283,6 @@ impl Error {
Error::DeletedCollectionJob(_) => "deleted_collection_job",
Error::AbandonedCollectionJob(_) => "abandoned_collection_job",
Error::UnrecognizedCollectionJob(_) => "unrecognized_collection_job",
Error::OutdatedHpkeConfig(_, _) => "outdated_hpke_config",
Error::UnauthorizedRequest(_) => "unauthorized_request",
Error::Datastore(_) => "datastore",
Error::Vdaf(_) => "vdaf",
Expand Down
Loading

0 comments on commit 57b3f90

Please sign in to comment.