Skip to content

Commit

Permalink
Adopt DAP-05 error types
Browse files Browse the repository at this point in the history
DAP-05 changes a few error types.

`unrecongizedMessage -> invalidMessage`
`queryMismatch` is gone (now we use `invalidMessage` in that case)
`roundMismatch -> stepMismatch`

We still use the word "round" in several places where "step" would be
more appropriate given DAP-05 text. Renaming those variables, etc., will
arrive in a later commit to avoid adding unnecessary noise here.

Part of #1669
  • Loading branch information
tgeoghegan committed Sep 13, 2023
1 parent e1ffcf0 commit 8ce3d7c
Show file tree
Hide file tree
Showing 12 changed files with 173 additions and 95 deletions.
90 changes: 62 additions & 28 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,9 @@ impl<C: Clock> Aggregator<C> {
Some(task_id_base64) => {
let task_id_bytes = URL_SAFE_NO_PAD
.decode(task_id_base64)
.map_err(|_| Error::UnrecognizedMessage(None, "task_id"))?;
.map_err(|_| Error::InvalidMessage(None, "task_id"))?;
let task_id = TaskId::get_decoded(&task_id_bytes)
.map_err(|_| Error::UnrecognizedMessage(None, "task_id"))?;
.map_err(|_| Error::InvalidMessage(None, "task_id"))?;
let task_aggregator = self
.task_aggregator_for(&task_id)
.await?
Expand Down Expand Up @@ -734,7 +734,7 @@ impl<C: Clock> Aggregator<C> {
.map(|url| url.try_into())
.collect::<Result<Vec<Url>, _>>()?;
if aggregator_urls.len() != 2 {
return Err(Error::UnrecognizedMessage(
return Err(Error::InvalidMessage(
Some(*task_id),
"taskprov configuration is missing one or both aggregators",
));
Expand Down Expand Up @@ -1586,11 +1586,11 @@ impl VdafOps {
let req = AggregationJobInitializeReq::<Q>::get_decoded(req_bytes)?;

// If two ReportShare messages have the same report ID, then the helper MUST abort with
// error "unrecognizedMessage". (§4.4.4.1)
// error "invalidMessage". (§4.5.1.2)
let mut seen_report_ids = HashSet::with_capacity(req.prepare_inits().len());
for prepare_init in req.prepare_inits() {
if !seen_report_ids.insert(*prepare_init.report_share().metadata().id()) {
return Err(Error::UnrecognizedMessage(
return Err(Error::InvalidMessage(
Some(*task.id()),
"aggregate request contains duplicate report IDs",
));
Expand Down Expand Up @@ -1700,37 +1700,71 @@ impl VdafOps {
});

let plaintext_input_share = plaintext.and_then(|plaintext| {
let plaintext_input_share = PlaintextInputShare::get_decoded(&plaintext).map_err(|error| {
info!(task_id = %task.id(), metadata = ?prepare_init.report_share().metadata(), ?error, "Couldn't decode helper's plaintext input share");
aggregate_step_failure_counter.add(1, &[KeyValue::new("type", "plaintext_input_share_decode_failure")]);
PrepareError::UnrecognizedMessage
})?;
let plaintext_input_share =
PlaintextInputShare::get_decoded(&plaintext).map_err(|error| {
info!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
?error, "Couldn't decode helper's plaintext input share",
);
aggregate_step_failure_counter.add(
1,
&[KeyValue::new(
"type",
"plaintext_input_share_decode_failure",
)],
);
PrepareError::InvalidMessage
})?;
// Check for repeated extensions.
let mut extension_types = HashSet::new();
if !plaintext_input_share
.extensions()
.iter()
.all(|extension| extension_types.insert(extension.extension_type())) {
info!(task_id = %task.id(), metadata = ?prepare_init.report_share().metadata(), "Received report share with duplicate extensions");
aggregate_step_failure_counter.add(1, &[KeyValue::new("type", "duplicate_extension")]);
return Err(PrepareError::UnrecognizedMessage)
.all(|extension| extension_types.insert(extension.extension_type()))
{
info!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
"Received report share with duplicate extensions",
);
aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "duplicate_extension")]);
return Err(PrepareError::InvalidMessage);
}
Ok(plaintext_input_share)
});

let input_share = plaintext_input_share.and_then(|plaintext_input_share| {
A::InputShare::get_decoded_with_param(&(vdaf, Role::Helper.index().unwrap()), plaintext_input_share.payload())
.map_err(|error| {
info!(task_id = %task.id(), metadata = ?prepare_init.report_share().metadata(), ?error, "Couldn't decode helper's input share");
aggregate_step_failure_counter.add(1, &[KeyValue::new("type", "input_share_decode_failure")]);
PrepareError::UnrecognizedMessage
})
A::InputShare::get_decoded_with_param(
&(vdaf, Role::Helper.index().unwrap()),
plaintext_input_share.payload(),
)
.map_err(|error| {
info!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
?error, "Couldn't decode helper's input share",
);
aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "input_share_decode_failure")]);
PrepareError::InvalidMessage
})
});

let public_share = A::PublicShare::get_decoded_with_param(vdaf, prepare_init.report_share().public_share()).map_err(|error|{
info!(task_id = %task.id(), metadata = ?prepare_init.report_share().metadata(), ?error, "Couldn't decode public share");
aggregate_step_failure_counter.add(1, &[KeyValue::new("type", "public_share_decode_failure")]);
PrepareError::UnrecognizedMessage
let public_share = A::PublicShare::get_decoded_with_param(
vdaf,
prepare_init.report_share().public_share(),
)
.map_err(|error| {
info!(
task_id = %task.id(),
metadata = ?prepare_init.report_share().metadata(),
?error, "Couldn't decode public share",
);
aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "public_share_decode_failure")]);
PrepareError::InvalidMessage
});

let shares = input_share.and_then(|input_share| Ok((public_share?, input_share)));
Expand Down Expand Up @@ -2035,7 +2069,7 @@ impl VdafOps {
A::OutputShare: Send + Sync,
{
if leader_aggregation_job.round() == AggregationJobRound::from(0) {
return Err(Error::UnrecognizedMessage(
return Err(Error::InvalidMessage(
Some(*task.id()),
"aggregation job cannot be advanced to round 0",
));
Expand Down Expand Up @@ -2113,11 +2147,11 @@ impl VdafOps {
// If this is not a replay, the leader should be advancing our state to the next
// round and no further.
return Err(datastore::Error::User(
Error::RoundMismatch {
Error::StepMismatch {
task_id: *task.id(),
aggregation_job_id,
expected_round: helper_aggregation_job.round().increment(),
got_round: leader_aggregation_job.round(),
expected_step: helper_aggregation_job.round().increment(),
got_step: leader_aggregation_job.round(),
}
.into(),
));
Expand Down
51 changes: 49 additions & 2 deletions aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use crate::aggregator::{
http_handlers::{aggregator_handler, test_util::decode_response_body},
http_handlers::{
aggregator_handler,
test_util::{decode_response_body, take_problem_details},
},
tests::generate_helper_report_share,
Config,
};
use assert_matches::assert_matches;
use http::StatusCode;
use janus_aggregator_core::{
datastore::{
test_util::{ephemeral_datastore, EphemeralDatastore},
Expand Down Expand Up @@ -31,6 +35,7 @@ use prio::{
},
};
use rand::random;
use serde_json::json;
use std::sync::Arc;
use trillium::{Handler, KnownHeaderName, Status};
use trillium_testing::{prelude::put, TestConn};
Expand Down Expand Up @@ -459,6 +464,48 @@ async fn aggregation_job_init_two_round_vdaf_idempotence() {
let aggregation_job_resp: AggregationJobResp = decode_response_body(&mut response).await;
assert_eq!(
aggregation_job_resp,
test_case.aggregation_job_init_resp.unwrap()
test_case.aggregation_job_init_resp.unwrap(),
);
}

#[tokio::test]
async fn aggregation_job_init_wrong_query() {
let test_case = setup_aggregate_init_test().await;

// setup_aggregate_init_test sets up a task with a time interval query. We send a fixed size
// query which should yield an error.
let wrong_query = AggregationJobInitializeReq::new(
test_case.aggregation_param.get_encoded(),
PartialBatchSelector::new_fixed_size(random()),
test_case.prepare_inits,
);

let mut response = put(test_case
.task
.aggregation_job_uri(&random())
.unwrap()
.path())
.with_request_header(
DAP_AUTH_HEADER,
test_case
.task
.primary_aggregator_auth_token()
.as_ref()
.to_owned(),
)
.with_request_header(
KnownHeaderName::ContentType,
AggregationJobInitializeReq::<TimeInterval>::MEDIA_TYPE,
)
.with_request_body(wrong_query.get_encoded())
.run_async(&test_case.handler)
.await;
assert_eq!(
take_problem_details(&mut response).await,
json!({
"status": StatusCode::BAD_REQUEST.as_u16(),
"type": "urn:ietf:params:ppm:dap:error:invalidMessage",
"title": "The message type for a response was incorrect or the payload was malformed.",
}),
);
}
14 changes: 7 additions & 7 deletions aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl VdafOps {
let report_aggregation = loop {
let report_agg = report_aggregations_iter.next().ok_or_else(|| {
datastore::Error::User(
Error::UnrecognizedMessage(
Error::InvalidMessage(
Some(*task.id()),
"leader sent unexpected, duplicate, or out-of-order prepare steps",
)
Expand Down Expand Up @@ -116,7 +116,7 @@ impl VdafOps {
}
_ => {
return Err(datastore::Error::User(
Error::UnrecognizedMessage(
Error::InvalidMessage(
Some(*task.id()),
"leader sent prepare step for non-WAITING report aggregation",
)
Expand Down Expand Up @@ -597,7 +597,7 @@ mod tests {
&round_zero_request,
&test_case.handler,
Status::BadRequest,
"urn:ietf:params:ppm:dap:error:unrecognizedMessage",
"urn:ietf:params:ppm:dap:error:invalidMessage",
"The message type for a response was incorrect or the payload was malformed.",
)
.await;
Expand Down Expand Up @@ -768,8 +768,8 @@ mod tests {
&past_round_request,
&test_case.handler,
Status::BadRequest,
"urn:ietf:params:ppm:dap:error:roundMismatch",
"The leader and helper are not on the same round of VDAF preparation.",
"urn:ietf:params:ppm:dap:error:stepMismatch",
"The leader and helper are not on the same step of VDAF preparation.",
)
.await;
}
Expand All @@ -791,8 +791,8 @@ mod tests {
&future_round_request,
&test_case.handler,
Status::BadRequest,
"urn:ietf:params:ppm:dap:error:roundMismatch",
"The leader and helper are not on the same round of VDAF preparation.",
"urn:ietf:params:ppm:dap:error:stepMismatch",
"The leader and helper are not on the same step of VDAF preparation.",
)
.await;
}
Expand Down
9 changes: 5 additions & 4 deletions aggregator/src/aggregator/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,10 @@ impl AggregationJobDriver {
info!(report_id = %report_aggregation.report_id(), "Received report with duplicate extensions");
self.aggregate_step_failure_counter
.add(1, &[KeyValue::new("type", "duplicate_extension")]);
report_aggregations_to_write.push(report_aggregation.with_state(
ReportAggregationState::Failed(PrepareError::UnrecognizedMessage),
));
report_aggregations_to_write.push(
report_aggregation
.with_state(ReportAggregationState::Failed(PrepareError::InvalidMessage)),
);
continue;
}

Expand Down Expand Up @@ -1515,7 +1516,7 @@ mod tests {
*repeated_extension_report.metadata().time(),
1,
None,
ReportAggregationState::Failed(PrepareError::UnrecognizedMessage),
ReportAggregationState::Failed(PrepareError::InvalidMessage),
);
let want_missing_report_report_aggregation =
ReportAggregation::<VERIFY_KEY_LENGTH, Prio3Count>::new(
Expand Down
22 changes: 11 additions & 11 deletions aggregator/src/aggregator/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ pub enum Error {
/// far in the future, §4.3.2.
#[error("task {0}: report {1} too early: {2}")]
ReportTooEarly(TaskId, ReportId, Time),
/// Corresponds to `unrecognizedMessage`, §3.2
#[error("task {0:?}: unrecognized message: {1}")]
UnrecognizedMessage(Option<TaskId>, &'static str),
/// Corresponds to `roundMismatch`
/// Corresponds to `invalidMessage`, §3.2
#[error("task {0:?}: invalid message: {1}")]
InvalidMessage(Option<TaskId>, &'static str),
/// Corresponds to `stepMismatch`
#[error(
"task {task_id}: unexpected round in aggregation job {aggregation_job_id} (expected \
{expected_round}, got {got_round})"
"task {task_id}: unexpected step in aggregation job {aggregation_job_id} (expected \
{expected_step}, got {got_step})"
)]
RoundMismatch {
StepMismatch {
task_id: TaskId,
aggregation_job_id: AggregationJobId,
expected_round: AggregationJobRound,
got_round: AggregationJobRound,
expected_step: AggregationJobRound,
got_step: AggregationJobRound,
},
/// Corresponds to `unrecognizedTask`, §3.2
#[error("task {0}: unrecognized task")]
Expand Down Expand Up @@ -157,8 +157,8 @@ impl Error {
Error::Message(_) => "message",
Error::ReportRejected(_, _, _) => "report_rejected",
Error::ReportTooEarly(_, _, _) => "report_too_early",
Error::UnrecognizedMessage(_, _) => "unrecognized_message",
Error::RoundMismatch { .. } => "round_mismatch",
Error::InvalidMessage(_, _) => "unrecognized_message",
Error::StepMismatch { .. } => "step_mismatch",
Error::UnrecognizedTask(_) => "unrecognized_task",
Error::MissingTaskId => "missing_task_id",
Error::UnrecognizedAggregationJob(_, _) => "unrecognized_aggregation_job",
Expand Down
Loading

0 comments on commit 8ce3d7c

Please sign in to comment.