Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add per-task report upload metrics #2508

Merged
merged 9 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
552 changes: 489 additions & 63 deletions aggregator/src/aggregator.rs

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use janus_aggregator_core::{
};
use janus_core::{
auth_tokens::{AuthenticationToken, DAP_AUTH_HEADER},
test_util::{dummy_vdaf, install_test_trace_subscriber, run_vdaf, VdafTranscript},
test_util::{
dummy_vdaf, install_test_trace_subscriber, run_vdaf, runtime::TestRuntime, VdafTranscript,
},
time::{Clock, MockClock, TimeExt as _},
vdaf::VdafInstance,
};
Expand Down Expand Up @@ -259,6 +261,7 @@ async fn setup_aggregate_init_test_without_sending_request<
let handler = aggregator_handler(
Arc::clone(&datastore),
clock.clone(),
TestRuntime::default(),
&noop_meter(),
Config::default(),
)
Expand Down
3 changes: 2 additions & 1 deletion aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ mod tests {
test_util::noop_meter,
};
use janus_core::{
test_util::install_test_trace_subscriber,
test_util::{install_test_trace_subscriber, runtime::TestRuntime},
time::{IntervalExt, MockClock},
vdaf::{VdafInstance, VERIFY_KEY_LENGTH},
};
Expand Down Expand Up @@ -530,6 +530,7 @@ mod tests {
let handler = aggregator_handler(
Arc::clone(&datastore),
clock,
TestRuntime::default(),
&meter,
default_aggregator_config(),
)
Expand Down
2 changes: 2 additions & 0 deletions aggregator/src/aggregator/collection_job_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use janus_core::{
test_util::{
dummy_vdaf::{self, AggregationParam},
install_test_trace_subscriber,
runtime::TestRuntime,
},
time::{Clock, IntervalExt, MockClock},
vdaf::VdafInstance,
Expand Down Expand Up @@ -143,6 +144,7 @@ pub(crate) async fn setup_collection_job_test_case(
let handler = aggregator_handler(
Arc::clone(&datastore),
clock.clone(),
TestRuntime::default(),
&noop_meter(),
Config {
batch_aggregation_shard_count: 32,
Expand Down
135 changes: 102 additions & 33 deletions aggregator/src/aggregator/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use janus_aggregator_core::{datastore, task};
use janus_aggregator_core::{
datastore::{self, models::TaskUploadIncrementor},
task,
};
use janus_core::http::HttpErrorResponse;
use janus_messages::{
AggregationJobId, AggregationJobStep, CollectionJobId, HpkeConfigId, Interval, PrepareError,
Expand Down Expand Up @@ -32,14 +35,9 @@ pub enum Error {
/// Error handling a message.
#[error("invalid message: {0}")]
Message(#[from] janus_messages::Error),
/// Corresponds to `reportRejected` in DAP. A report was rejected for some reason that is not
/// specified in DAP.
#[error("task {0}: report {1} rejected: {2}")]
ReportRejected(TaskId, ReportId, Time, ReportRejectedReason),
/// Corresponds to `reportTooEarly` in DAP. A report was rejected because the timestamp is too
/// far in the future.
#[error("task {0}: report {1} too early: {2}")]
ReportTooEarly(TaskId, ReportId, Time),
/// Catch-all error for invalid reports.
#[error("{0}")]
ReportRejected(ReportRejection),
/// Corresponds to `invalidMessage` in DAP.
#[error("task {0:?}: invalid message: {1}")]
InvalidMessage(Option<TaskId>, &'static str),
Expand Down Expand Up @@ -72,9 +70,6 @@ pub enum Error {
/// An attempt was made to act on a collection job that has been abandoned by the aggregator.
#[error("abandoned collection job: {0}")]
AbandonedCollectionJob(CollectionJobId),
/// Corresponds to `outdatedHpkeConfig` in DAP.
#[error("task {0}: outdated HPKE config: {1}")]
OutdatedHpkeConfig(TaskId, HpkeConfigId),
/// Corresponds to `unauthorizedRequest` in DAP.
#[error("task {0}: unauthorized request")]
UnauthorizedRequest(TaskId),
Expand Down Expand Up @@ -148,37 +143,109 @@ pub enum Error {
DifferentialPrivacy(VdafError),
}

#[derive(Debug)]
pub enum ReportRejectedReason {
IntervalAlreadyCollected,
LeaderDecryptFailure,
LeaderInputShareDecodeFailure,
PublicShareDecodeFailure,
/// Contains details that describe the report and why it was rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReportRejection {
task_id: TaskId,
report_id: ReportId,
time: Time,
reason: ReportRejectionReason,
}

impl ReportRejection {
pub fn new(
task_id: TaskId,
report_id: ReportId,
time: Time,
reason: ReportRejectionReason,
) -> Self {
Self {
task_id,
report_id,
time,
reason,
}
}

pub fn task_id(&self) -> &TaskId {
&self.task_id
}

pub fn report_id(&self) -> &ReportId {
&self.report_id
}

pub fn time(&self) -> &Time {
&self.time
}

pub fn reason(&self) -> &ReportRejectionReason {
&self.reason
}
}

impl Display for ReportRejection {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"task {}, report {}, time {}, rejected {}",
self.task_id, self.report_id, self.time, self.reason
)
}
}

/// Indicates why a report was rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReportRejectionReason {
IntervalCollected,
DecryptFailure,
DecodeFailure,
TaskExpired,
TooOld,
Expired,
TooEarly,
OutdatedHpkeConfig(HpkeConfigId),
}

impl ReportRejectedReason {
impl ReportRejectionReason {
pub fn detail(&self) -> &'static str {
match self {
ReportRejectedReason::IntervalAlreadyCollected => {
ReportRejectionReason::IntervalCollected => {
"Report falls into a time interval that has already been collected."
}
ReportRejectedReason::LeaderDecryptFailure => {
"Leader's report share could not be decrypted."
}
ReportRejectedReason::LeaderInputShareDecodeFailure => {
"Leader's input share could not be decoded."
ReportRejectionReason::DecryptFailure => "Report share could not be decrypted.",
ReportRejectionReason::DecodeFailure => "Report could not be decoded.",
ReportRejectionReason::TaskExpired => "Task has expired.",
ReportRejectionReason::Expired => "Report timestamp is too old.",
ReportRejectionReason::TooEarly => "Report timestamp is too far in the future.",
ReportRejectionReason::OutdatedHpkeConfig(_) => {
"Report is using an outdated HPKE configuration."
}
ReportRejectedReason::PublicShareDecodeFailure => {
"Report public share could not be decoded."
}
}
}

impl From<&ReportRejectionReason> for TaskUploadIncrementor {
fn from(value: &ReportRejectionReason) -> Self {
match value {
ReportRejectionReason::IntervalCollected => TaskUploadIncrementor::IntervalCollected,
ReportRejectionReason::DecryptFailure => TaskUploadIncrementor::ReportDecryptFailure,
ReportRejectionReason::DecodeFailure => TaskUploadIncrementor::ReportDecodeFailure,
ReportRejectionReason::TaskExpired => TaskUploadIncrementor::TaskExpired,
ReportRejectionReason::Expired => TaskUploadIncrementor::ReportExpired,
ReportRejectionReason::TooEarly => TaskUploadIncrementor::ReportTooEarly,
ReportRejectionReason::OutdatedHpkeConfig(_) => {
TaskUploadIncrementor::ReportOutdatedKey
}
ReportRejectedReason::TaskExpired => "Task has expired.",
ReportRejectedReason::TooOld => "Report timestamp is too old.",
}
}
}

impl Display for ReportRejectionReason {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}

/// Errors that cause the aggregator to opt-out of a taskprov task.
#[derive(Debug, thiserror::Error)]
pub enum OptOutReason {
Expand All @@ -203,8 +270,11 @@ impl Error {
Error::MessageDecode(_) => "message_decode",
Error::ResponseEncode(_) => "response_encode",
Error::Message(_) => "message",
Error::ReportRejected(_, _, _, _) => "report_rejected",
Error::ReportTooEarly(_, _, _) => "report_too_early",
Error::ReportRejected(rejection) => match rejection.reason {
ReportRejectionReason::TooEarly => "report_too_early",
ReportRejectionReason::OutdatedHpkeConfig(_) => "outdated_hpke_config",
_ => "report_rejected",
},
Error::InvalidMessage(_, _) => "unrecognized_message",
Error::StepMismatch { .. } => "step_mismatch",
Error::UnrecognizedTask(_) => "unrecognized_task",
Expand All @@ -213,7 +283,6 @@ impl Error {
Error::DeletedCollectionJob(_) => "deleted_collection_job",
Error::AbandonedCollectionJob(_) => "abandoned_collection_job",
Error::UnrecognizedCollectionJob(_) => "unrecognized_collection_job",
Error::OutdatedHpkeConfig(_, _) => "outdated_hpke_config",
Error::UnauthorizedRequest(_) => "unauthorized_request",
Error::Datastore(_) => "datastore",
Error::Vdaf(_) => "vdaf",
Expand Down
Loading
Loading