Skip to content

Commit

Permalink
Add per task report upload metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
inahga committed Jan 22, 2024
1 parent 11bb8f6 commit aa2c23e
Show file tree
Hide file tree
Showing 14 changed files with 932 additions and 186 deletions.
416 changes: 370 additions & 46 deletions aggregator/src/aggregator.rs

Large diffs are not rendered by default.

135 changes: 102 additions & 33 deletions aggregator/src/aggregator/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use janus_aggregator_core::{datastore, task};
use janus_aggregator_core::{
datastore::{self, models::TaskUploadIncrementor},
task,
};
use janus_core::http::HttpErrorResponse;
use janus_messages::{
AggregationJobId, AggregationJobStep, CollectionJobId, HpkeConfigId, Interval, PrepareError,
Expand Down Expand Up @@ -29,14 +32,9 @@ pub enum Error {
/// Error handling a message.
#[error("invalid message: {0}")]
Message(#[from] janus_messages::Error),
/// Corresponds to `reportRejected` in DAP. A report was rejected for some reason that is not
/// specified in DAP.
#[error("task {0}: report {1} rejected: {2}")]
ReportRejected(TaskId, ReportId, Time, ReportRejectedReason),
/// Corresponds to `reportTooEarly` in DAP. A report was rejected because the timestamp is too
/// far in the future.
#[error("task {0}: report {1} too early: {2}")]
ReportTooEarly(TaskId, ReportId, Time),
/// Catch-all error for invalid reports.
#[error("{0}")]
ReportRejected(ReportRejection),
/// Corresponds to `invalidMessage` in DAP.
#[error("task {0:?}: invalid message: {1}")]
InvalidMessage(Option<TaskId>, &'static str),
Expand Down Expand Up @@ -69,9 +67,6 @@ pub enum Error {
/// An attempt was made to act on a collection job that has been abandoned by the aggregator.
#[error("abandoned collection job: {0}")]
AbandonedCollectionJob(CollectionJobId),
/// Corresponds to `outdatedHpkeConfig` in DAP.
#[error("task {0}: outdated HPKE config: {1}")]
OutdatedHpkeConfig(TaskId, HpkeConfigId),
/// Corresponds to `unauthorizedRequest` in DAP.
#[error("task {0}: unauthorized request")]
UnauthorizedRequest(TaskId),
Expand Down Expand Up @@ -145,37 +140,109 @@ pub enum Error {
DifferentialPrivacy(VdafError),
}

#[derive(Debug)]
pub enum ReportRejectedReason {
IntervalAlreadyCollected,
LeaderDecryptFailure,
LeaderInputShareDecodeFailure,
PublicShareDecodeFailure,
/// Contains details that describe the report and why it was rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReportRejection {
task_id: TaskId,
report_id: ReportId,
time: Time,
reason: ReportRejectionReason,
}

impl ReportRejection {
pub fn new(
task_id: TaskId,
report_id: ReportId,
time: Time,
reason: ReportRejectionReason,
) -> Self {
Self {
task_id,
report_id,
time,
reason,
}
}

pub fn task_id(&self) -> &TaskId {
&self.task_id
}

pub fn report_id(&self) -> &ReportId {
&self.report_id
}

pub fn time(&self) -> &Time {
&self.time
}

pub fn reason(&self) -> &ReportRejectionReason {
&self.reason
}
}

impl Display for ReportRejection {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"task {}, report {}, time {}, rejected {}",
self.task_id, self.report_id, self.time, self.reason
)
}
}

/// Indicates why a report was rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReportRejectionReason {
IntervalCollected,
DecryptFailure,
DecodeFailure,
TaskExpired,
TooOld,
Expired,
TooEarly,
OutdatedHpkeConfig(HpkeConfigId),
}

impl ReportRejectedReason {
impl ReportRejectionReason {
pub fn detail(&self) -> &'static str {
match self {
ReportRejectedReason::IntervalAlreadyCollected => {
ReportRejectionReason::IntervalCollected => {
"Report falls into a time interval that has already been collected."
}
ReportRejectedReason::LeaderDecryptFailure => {
"Leader's report share could not be decrypted."
}
ReportRejectedReason::LeaderInputShareDecodeFailure => {
"Leader's input share could not be decoded."
ReportRejectionReason::DecryptFailure => "Report share could not be decrypted.",
ReportRejectionReason::DecodeFailure => "Report could not be decoded.",
ReportRejectionReason::TaskExpired => "Task has expired.",
ReportRejectionReason::Expired => "Report timestamp is too old.",
ReportRejectionReason::TooEarly => "Report timestamp is too far in the future.",
ReportRejectionReason::OutdatedHpkeConfig(_) => {
"Report is using an outdated HPKE configuration."
}
ReportRejectedReason::PublicShareDecodeFailure => {
"Report public share could not be decoded."
}
}
}

impl From<&ReportRejectionReason> for TaskUploadIncrementor {
fn from(value: &ReportRejectionReason) -> Self {
match value {
ReportRejectionReason::IntervalCollected => TaskUploadIncrementor::IntervalCollected,
ReportRejectionReason::DecryptFailure => TaskUploadIncrementor::ReportDecryptFailure,
ReportRejectionReason::DecodeFailure => TaskUploadIncrementor::ReportDecodeFailure,
ReportRejectionReason::TaskExpired => TaskUploadIncrementor::TaskExpired,
ReportRejectionReason::Expired => TaskUploadIncrementor::ReportExpired,
ReportRejectionReason::TooEarly => TaskUploadIncrementor::ReportTooEarly,
ReportRejectionReason::OutdatedHpkeConfig(_) => {
TaskUploadIncrementor::ReportOutdatedKey
}
ReportRejectedReason::TaskExpired => "Task has expired.",
ReportRejectedReason::TooOld => "Report timestamp is too old.",
}
}
}

impl Display for ReportRejectionReason {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}

/// Errors that cause the aggregator to opt-out of a taskprov task.
#[derive(Debug, thiserror::Error)]
pub enum OptOutReason {
Expand All @@ -199,8 +266,11 @@ impl Error {
Error::InvalidConfiguration(_) => "invalid_configuration",
Error::MessageDecode(_) => "message_decode",
Error::Message(_) => "message",
Error::ReportRejected(_, _, _, _) => "report_rejected",
Error::ReportTooEarly(_, _, _) => "report_too_early",
Error::ReportRejected(rejection) => match rejection.reason {
ReportRejectionReason::TooEarly => "report_too_early",
ReportRejectionReason::OutdatedHpkeConfig(_) => "outdated_hpke_config",
_ => "report_rejected",
},
Error::InvalidMessage(_, _) => "unrecognized_message",
Error::StepMismatch { .. } => "step_mismatch",
Error::UnrecognizedTask(_) => "unrecognized_task",
Expand All @@ -209,7 +279,6 @@ impl Error {
Error::DeletedCollectionJob(_) => "deleted_collection_job",
Error::AbandonedCollectionJob(_) => "abandoned_collection_job",
Error::UnrecognizedCollectionJob(_) => "unrecognized_collection_job",
Error::OutdatedHpkeConfig(_, _) => "outdated_hpke_config",
Error::UnauthorizedRequest(_) => "unauthorized_request",
Error::Datastore(_) => "datastore",
Error::Vdaf(_) => "vdaf",
Expand Down
40 changes: 22 additions & 18 deletions aggregator/src/aggregator/http_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Aggregator, Config, Error};
use super::{error::ReportRejectionReason, Aggregator, Config, Error};
use crate::aggregator::problem_details::{ProblemDetailsConnExt, ProblemDocument};
use async_trait::async_trait;
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
Expand Down Expand Up @@ -46,11 +46,21 @@ impl Handler for Error {
Error::InvalidConfiguration(_) => conn.with_status(Status::InternalServerError),
Error::MessageDecode(_) => conn
.with_problem_document(&ProblemDocument::new_dap(DapProblemType::InvalidMessage)),
Error::ReportRejected(task_id, _, _, reason) => conn.with_problem_document(
&ProblemDocument::new_dap(DapProblemType::ReportRejected)
.with_task_id(task_id)
.with_detail(reason.detail()),
),
Error::ReportRejected(rejection) => match rejection.reason() {
ReportRejectionReason::OutdatedHpkeConfig(_) => conn.with_problem_document(
&ProblemDocument::new_dap(DapProblemType::OutdatedConfig)
.with_task_id(rejection.task_id()),
),
ReportRejectionReason::TooEarly => conn.with_problem_document(
&ProblemDocument::new_dap(DapProblemType::ReportTooEarly)
.with_task_id(rejection.task_id()),
),
_ => conn.with_problem_document(
&ProblemDocument::new_dap(DapProblemType::ReportRejected)
.with_task_id(rejection.task_id())
.with_detail(rejection.reason().detail()),
),
},
Error::InvalidMessage(task_id, _) => {
let mut doc = ProblemDocument::new_dap(DapProblemType::InvalidMessage);
if let Some(task_id) = task_id {
Expand Down Expand Up @@ -86,12 +96,6 @@ impl Handler for Error {
.with_collection_job_id(collection_job_id),
),
Error::UnrecognizedCollectionJob(_) => conn.with_status(Status::NotFound),
Error::OutdatedHpkeConfig(task_id, _) => conn.with_problem_document(
&ProblemDocument::new_dap(DapProblemType::OutdatedConfig).with_task_id(task_id),
),
Error::ReportTooEarly(task_id, _, _) => conn.with_problem_document(
&ProblemDocument::new_dap(DapProblemType::ReportTooEarly).with_task_id(task_id),
),
Error::UnauthorizedRequest(task_id) => conn.with_problem_document(
&ProblemDocument::new_dap(DapProblemType::UnauthorizedRequest)
.with_task_id(task_id),
Expand Down Expand Up @@ -682,7 +686,7 @@ mod tests {
},
collection_job_tests::setup_collection_job_test_case,
empty_batch_aggregations,
error::ReportRejectedReason,
error::ReportRejectionReason,
http_handlers::{
aggregator_handler, aggregator_handler_with_aggregator,
test_util::{decode_response_body, take_problem_details},
Expand Down Expand Up @@ -1196,7 +1200,7 @@ mod tests {
"reportRejected",
"Report could not be processed.",
task.id(),
Some(ReportRejectedReason::TooOld.detail()),
Some(ReportRejectionReason::Expired.detail()),
)
.await;

Expand Down Expand Up @@ -1286,7 +1290,7 @@ mod tests {
"reportRejected",
"Report could not be processed.",
task_expire_soon.id(),
Some(ReportRejectedReason::TaskExpired.detail()),
Some(ReportRejectionReason::TaskExpired.detail()),
)
.await;

Expand Down Expand Up @@ -1314,7 +1318,7 @@ mod tests {
"reportRejected",
"Report could not be processed.",
leader_task.id(),
Some(ReportRejectedReason::PublicShareDecodeFailure.detail()),
Some(ReportRejectionReason::DecodeFailure.detail()),
)
.await;

Expand All @@ -1339,7 +1343,7 @@ mod tests {
"reportRejected",
"Report could not be processed.",
leader_task.id(),
Some(ReportRejectedReason::LeaderDecryptFailure.detail()),
Some(ReportRejectionReason::DecryptFailure.detail()),
)
.await;

Expand Down Expand Up @@ -1376,7 +1380,7 @@ mod tests {
"reportRejected",
"Report could not be processed.",
leader_task.id(),
Some(ReportRejectedReason::LeaderInputShareDecodeFailure.detail()),
Some(ReportRejectionReason::DecodeFailure.detail()),
)
.await;

Expand Down
42 changes: 27 additions & 15 deletions aggregator/src/aggregator/problem_details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl ProblemDetailsConnExt for Conn {
#[cfg(test)]
mod tests {
use crate::aggregator::{
error::{BatchMismatch, ReportRejectedReason},
error::{BatchMismatch, ReportRejection, ReportRejectionReason},
send_request_to_helper, Error,
};
use assert_matches::assert_matches;
Expand All @@ -119,7 +119,7 @@ mod tests {
use janus_core::time::{Clock, RealClock};
use janus_messages::{
problem_type::{DapProblemType, DapProblemTypeParseError},
Duration, HpkeConfigId, Interval, ReportIdChecksum,
Duration, Interval, ReportIdChecksum,
};
use opentelemetry::metrics::Unit;
use rand::random;
Expand Down Expand Up @@ -179,15 +179,37 @@ mod tests {
TestCase::new(Box::new(|| Error::InvalidConfiguration("test")), None),
TestCase::new(
Box::new(|| {
Error::ReportRejected(
Error::ReportRejected(ReportRejection::new(
random(),
random(),
RealClock::default().now(),
ReportRejectedReason::TaskExpired
)
ReportRejectionReason::TaskExpired
))
}),
Some(DapProblemType::ReportRejected),
),
TestCase::new(
Box::new(|| {
Error::ReportRejected(ReportRejection::new(
random(),
random(),
RealClock::default().now(),
ReportRejectionReason::TooEarly
))
}),
Some(DapProblemType::ReportTooEarly),
),
TestCase::new(
Box::new(|| {
Error::ReportRejected(ReportRejection::new(
random(),
random(),
RealClock::default().now(),
ReportRejectionReason::OutdatedHpkeConfig(random()),
))
}),
Some(DapProblemType::OutdatedConfig),
),
TestCase::new(
Box::new(|| Error::InvalidMessage(Some(random()), "test")),
Some(DapProblemType::InvalidMessage),
Expand All @@ -204,16 +226,6 @@ mod tests {
Box::new(|| Error::UnrecognizedAggregationJob(random(), random())),
Some(DapProblemType::UnrecognizedAggregationJob),
),
TestCase::new(
Box::new(|| Error::OutdatedHpkeConfig(random(), HpkeConfigId::from(0))),
Some(DapProblemType::OutdatedConfig),
),
TestCase::new(
Box::new(|| {
Error::ReportTooEarly(random(), random(), RealClock::default().now())
}),
Some(DapProblemType::ReportTooEarly),
),
TestCase::new(
Box::new(|| Error::UnauthorizedRequest(random())),
Some(DapProblemType::UnauthorizedRequest),
Expand Down
Loading

0 comments on commit aa2c23e

Please sign in to comment.