Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adopt DAP-05 error types #1853

Merged
merged 1 commit into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 62 additions & 28 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,9 @@ impl<C: Clock> Aggregator<C> {
Some(task_id_base64) => {
let task_id_bytes = URL_SAFE_NO_PAD
.decode(task_id_base64)
.map_err(|_| Error::UnrecognizedMessage(None, "task_id"))?;
.map_err(|_| Error::InvalidMessage(None, "task_id"))?;
let task_id = TaskId::get_decoded(&task_id_bytes)
.map_err(|_| Error::UnrecognizedMessage(None, "task_id"))?;
.map_err(|_| Error::InvalidMessage(None, "task_id"))?;
let task_aggregator = self
.task_aggregator_for(&task_id)
.await?
Expand Down Expand Up @@ -734,7 +734,7 @@ impl<C: Clock> Aggregator<C> {
.map(|url| url.try_into())
.collect::<Result<Vec<Url>, _>>()?;
if aggregator_urls.len() != 2 {
return Err(Error::UnrecognizedMessage(
return Err(Error::InvalidMessage(
Some(*task_id),
"taskprov configuration is missing one or both aggregators",
));
Expand Down Expand Up @@ -1586,11 +1586,11 @@ impl VdafOps {
let req = AggregationJobInitializeReq::<Q>::get_decoded(req_bytes)?;

// If two ReportShare messages have the same report ID, then the helper MUST abort with
// error "unrecognizedMessage". (§4.4.4.1)
// error "invalidMessage". (§4.5.1.2)
let mut seen_report_ids = HashSet::with_capacity(req.prepare_inits().len());
for prepare_init in req.prepare_inits() {
if !seen_report_ids.insert(*prepare_init.report_share().metadata().id()) {
return Err(Error::UnrecognizedMessage(
return Err(Error::InvalidMessage(
Some(*task.id()),
"aggregate request contains duplicate report IDs",
));
Expand Down Expand Up @@ -1700,37 +1700,71 @@ impl VdafOps {
});

let plaintext_input_share = plaintext.and_then(|plaintext| {
let plaintext_input_share = PlaintextInputShare::get_decoded(&plaintext).map_err(|error| {
info!(task_id = %task.id(), metadata = ?prepare_init.report_share().metadata(), ?error, "Couldn't decode helper's plaintext input share");
aggregate_step_failure_counter.add(1, &[KeyValue::new("type", "plaintext_input_share_decode_failure")]);
PrepareError::UnrecognizedMessage
})?;
let plaintext_input_share =
PlaintextInputShare::get_decoded(&plaintext).map_err(|error| {
info!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
?error, "Couldn't decode helper's plaintext input share",
);
aggregate_step_failure_counter.add(
1,
&[KeyValue::new(
"type",
"plaintext_input_share_decode_failure",
)],
);
PrepareError::InvalidMessage
})?;
// Check for repeated extensions.
let mut extension_types = HashSet::new();
if !plaintext_input_share
.extensions()
.iter()
.all(|extension| extension_types.insert(extension.extension_type())) {
info!(task_id = %task.id(), metadata = ?prepare_init.report_share().metadata(), "Received report share with duplicate extensions");
aggregate_step_failure_counter.add(1, &[KeyValue::new("type", "duplicate_extension")]);
return Err(PrepareError::UnrecognizedMessage)
.all(|extension| extension_types.insert(extension.extension_type()))
{
info!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
"Received report share with duplicate extensions",
);
aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "duplicate_extension")]);
return Err(PrepareError::InvalidMessage);
}
Ok(plaintext_input_share)
});

let input_share = plaintext_input_share.and_then(|plaintext_input_share| {
A::InputShare::get_decoded_with_param(&(vdaf, Role::Helper.index().unwrap()), plaintext_input_share.payload())
.map_err(|error| {
info!(task_id = %task.id(), metadata = ?prepare_init.report_share().metadata(), ?error, "Couldn't decode helper's input share");
aggregate_step_failure_counter.add(1, &[KeyValue::new("type", "input_share_decode_failure")]);
PrepareError::UnrecognizedMessage
})
A::InputShare::get_decoded_with_param(
&(vdaf, Role::Helper.index().unwrap()),
plaintext_input_share.payload(),
)
.map_err(|error| {
info!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
?error, "Couldn't decode helper's input share",
);
aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "input_share_decode_failure")]);
PrepareError::InvalidMessage
})
});

let public_share = A::PublicShare::get_decoded_with_param(vdaf, prepare_init.report_share().public_share()).map_err(|error|{
info!(task_id = %task.id(), metadata = ?prepare_init.report_share().metadata(), ?error, "Couldn't decode public share");
aggregate_step_failure_counter.add(1, &[KeyValue::new("type", "public_share_decode_failure")]);
PrepareError::UnrecognizedMessage
let public_share = A::PublicShare::get_decoded_with_param(
vdaf,
prepare_init.report_share().public_share(),
)
.map_err(|error| {
info!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
?error, "Couldn't decode public share",
);
aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "public_share_decode_failure")]);
PrepareError::InvalidMessage
});

let shares = input_share.and_then(|input_share| Ok((public_share?, input_share)));
Expand Down Expand Up @@ -2035,7 +2069,7 @@ impl VdafOps {
A::OutputShare: Send + Sync,
{
if leader_aggregation_job.round() == AggregationJobRound::from(0) {
return Err(Error::UnrecognizedMessage(
return Err(Error::InvalidMessage(
Some(*task.id()),
"aggregation job cannot be advanced to round 0",
));
Expand Down Expand Up @@ -2113,11 +2147,11 @@ impl VdafOps {
// If this is not a replay, the leader should be advancing our state to the next
// round and no further.
return Err(datastore::Error::User(
Error::RoundMismatch {
Error::StepMismatch {
task_id: *task.id(),
aggregation_job_id,
expected_round: helper_aggregation_job.round().increment(),
got_round: leader_aggregation_job.round(),
expected_step: helper_aggregation_job.round().increment(),
got_step: leader_aggregation_job.round(),
}
.into(),
));
Expand Down
51 changes: 49 additions & 2 deletions aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use crate::aggregator::{
http_handlers::{aggregator_handler, test_util::decode_response_body},
http_handlers::{
aggregator_handler,
test_util::{decode_response_body, take_problem_details},
},
tests::generate_helper_report_share,
Config,
};
use assert_matches::assert_matches;
use http::StatusCode;
use janus_aggregator_core::{
datastore::{
test_util::{ephemeral_datastore, EphemeralDatastore},
Expand Down Expand Up @@ -31,6 +35,7 @@ use prio::{
},
};
use rand::random;
use serde_json::json;
use std::sync::Arc;
use trillium::{Handler, KnownHeaderName, Status};
use trillium_testing::{prelude::put, TestConn};
Expand Down Expand Up @@ -459,6 +464,48 @@ async fn aggregation_job_init_two_round_vdaf_idempotence() {
let aggregation_job_resp: AggregationJobResp = decode_response_body(&mut response).await;
assert_eq!(
aggregation_job_resp,
test_case.aggregation_job_init_resp.unwrap()
test_case.aggregation_job_init_resp.unwrap(),
);
}

#[tokio::test]
async fn aggregation_job_init_wrong_query() {
let test_case = setup_aggregate_init_test().await;

// setup_aggregate_init_test sets up a task with a time interval query. We send a fixed size
// query which should yield an error.
let wrong_query = AggregationJobInitializeReq::new(
test_case.aggregation_param.get_encoded(),
PartialBatchSelector::new_fixed_size(random()),
test_case.prepare_inits,
);

let mut response = put(test_case
.task
.aggregation_job_uri(&random())
.unwrap()
.path())
.with_request_header(
DAP_AUTH_HEADER,
test_case
.task
.primary_aggregator_auth_token()
.as_ref()
.to_owned(),
)
.with_request_header(
KnownHeaderName::ContentType,
AggregationJobInitializeReq::<TimeInterval>::MEDIA_TYPE,
)
.with_request_body(wrong_query.get_encoded())
.run_async(&test_case.handler)
.await;
assert_eq!(
take_problem_details(&mut response).await,
json!({
"status": StatusCode::BAD_REQUEST.as_u16(),
"type": "urn:ietf:params:ppm:dap:error:invalidMessage",
"title": "The message type for a response was incorrect or the payload was malformed.",
}),
);
}
14 changes: 7 additions & 7 deletions aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl VdafOps {
let report_aggregation = loop {
let report_agg = report_aggregations_iter.next().ok_or_else(|| {
datastore::Error::User(
Error::UnrecognizedMessage(
Error::InvalidMessage(
Some(*task.id()),
"leader sent unexpected, duplicate, or out-of-order prepare steps",
)
Expand Down Expand Up @@ -116,7 +116,7 @@ impl VdafOps {
}
_ => {
return Err(datastore::Error::User(
Error::UnrecognizedMessage(
Error::InvalidMessage(
Some(*task.id()),
"leader sent prepare step for non-WAITING report aggregation",
)
Expand Down Expand Up @@ -597,7 +597,7 @@ mod tests {
&round_zero_request,
&test_case.handler,
Status::BadRequest,
"urn:ietf:params:ppm:dap:error:unrecognizedMessage",
"urn:ietf:params:ppm:dap:error:invalidMessage",
"The message type for a response was incorrect or the payload was malformed.",
)
.await;
Expand Down Expand Up @@ -768,8 +768,8 @@ mod tests {
&past_round_request,
&test_case.handler,
Status::BadRequest,
"urn:ietf:params:ppm:dap:error:roundMismatch",
"The leader and helper are not on the same round of VDAF preparation.",
"urn:ietf:params:ppm:dap:error:stepMismatch",
"The leader and helper are not on the same step of VDAF preparation.",
)
.await;
}
Expand All @@ -791,8 +791,8 @@ mod tests {
&future_round_request,
&test_case.handler,
Status::BadRequest,
"urn:ietf:params:ppm:dap:error:roundMismatch",
"The leader and helper are not on the same round of VDAF preparation.",
"urn:ietf:params:ppm:dap:error:stepMismatch",
"The leader and helper are not on the same step of VDAF preparation.",
)
.await;
}
Expand Down
9 changes: 5 additions & 4 deletions aggregator/src/aggregator/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,10 @@ impl AggregationJobDriver {
info!(report_id = %report_aggregation.report_id(), "Received report with duplicate extensions");
self.aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "duplicate_extension")]);
report_aggregations_to_write.push(report_aggregation.with_state(
ReportAggregationState::Failed(PrepareError::UnrecognizedMessage),
));
report_aggregations_to_write.push(
report_aggregation
.with_state(ReportAggregationState::Failed(PrepareError::InvalidMessage)),
);
continue;
}

Expand Down Expand Up @@ -1515,7 +1516,7 @@ mod tests {
*repeated_extension_report.metadata().time(),
1,
None,
ReportAggregationState::Failed(PrepareError::UnrecognizedMessage),
ReportAggregationState::Failed(PrepareError::InvalidMessage),
);
let want_missing_report_report_aggregation =
ReportAggregation::<VERIFY_KEY_LENGTH, Prio3Count>::new(
Expand Down
22 changes: 11 additions & 11 deletions aggregator/src/aggregator/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ pub enum Error {
/// far in the future, §4.3.2.
#[error("task {0}: report {1} too early: {2}")]
ReportTooEarly(TaskId, ReportId, Time),
/// Corresponds to `unrecognizedMessage`, §3.2
#[error("task {0:?}: unrecognized message: {1}")]
UnrecognizedMessage(Option<TaskId>, &'static str),
/// Corresponds to `roundMismatch`
/// Corresponds to `invalidMessage`, §3.2
#[error("task {0:?}: invalid message: {1}")]
InvalidMessage(Option<TaskId>, &'static str),
/// Corresponds to `stepMismatch`
#[error(
"task {task_id}: unexpected round in aggregation job {aggregation_job_id} (expected \
{expected_round}, got {got_round})"
"task {task_id}: unexpected step in aggregation job {aggregation_job_id} (expected \
{expected_step}, got {got_step})"
)]
RoundMismatch {
StepMismatch {
task_id: TaskId,
aggregation_job_id: AggregationJobId,
expected_round: AggregationJobRound,
got_round: AggregationJobRound,
expected_step: AggregationJobRound,
got_step: AggregationJobRound,
},
/// Corresponds to `unrecognizedTask`, §3.2
#[error("task {0}: unrecognized task")]
Expand Down Expand Up @@ -157,8 +157,8 @@ impl Error {
Error::Message(_) => "message",
Error::ReportRejected(_, _, _) => "report_rejected",
Error::ReportTooEarly(_, _, _) => "report_too_early",
Error::UnrecognizedMessage(_, _) => "unrecognized_message",
Error::RoundMismatch { .. } => "round_mismatch",
Error::InvalidMessage(_, _) => "unrecognized_message",
Error::StepMismatch { .. } => "step_mismatch",
Error::UnrecognizedTask(_) => "unrecognized_task",
Error::MissingTaskId => "missing_task_id",
Error::UnrecognizedAggregationJob(_, _) => "unrecognized_aggregation_job",
Expand Down
Loading
Loading