Skip to content

Commit

Permalink
DAP-05: Specialize to exactly two aggregators
Browse files Browse the repository at this point in the history
This commit takes changes to various messages so that we handle exactly
one leader and exactly one helper, as specified in [DAP-05][1]. These
changes are voluminous but mostly mechanical.

This commit also adds variants for `Poplar1` to a couple of enums and
some macros, because later changes will need them.

[1]: https://datatracker.ietf.org/doc/draft-ietf-ppm-dap/05/

Part of #1669
  • Loading branch information
tgeoghegan committed Aug 24, 2023
1 parent bb077cb commit 145aa4c
Show file tree
Hide file tree
Showing 34 changed files with 1,574 additions and 1,616 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

157 changes: 67 additions & 90 deletions aggregator/src/aggregator.rs

Large diffs are not rendered by default.

406 changes: 207 additions & 199 deletions aggregator/src/aggregator/aggregation_job_creator.rs

Large diffs are not rendered by default.

531 changes: 238 additions & 293 deletions aggregator/src/aggregator/aggregation_job_driver.rs

Large diffs are not rendered by default.

11 changes: 2 additions & 9 deletions aggregator/src/aggregator/collection_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,6 @@ mod tests {
use rand::random;
use std::{str, sync::Arc, time::Duration as StdDuration};
use trillium_tokio::Stopper;
use url::Url;

async fn setup_collection_job_test_case(
server: &mut mockito::Server,
Expand All @@ -571,10 +570,7 @@ mod tests {
) {
let time_precision = Duration::from_seconds(500);
let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Leader)
.with_aggregator_endpoints(Vec::from([
Url::parse("http://irrelevant").unwrap(), // leader URL doesn't matter
Url::parse(&server.url()).unwrap(),
]))
.with_helper_aggregator_endpoint(server.url().parse().unwrap())
.with_time_precision(time_precision)
.with_min_batch_size(10)
.build();
Expand Down Expand Up @@ -712,10 +708,7 @@ mod tests {

let time_precision = Duration::from_seconds(500);
let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Leader)
.with_aggregator_endpoints(Vec::from([
Url::parse("http://irrelevant").unwrap(), // leader URL doesn't matter
Url::parse(&server.url()).unwrap(),
]))
.with_helper_aggregator_endpoint(server.url().parse().unwrap())
.with_time_precision(time_precision)
.with_min_batch_size(10)
.build();
Expand Down
49 changes: 35 additions & 14 deletions aggregator/src/aggregator/collection_job_tests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
use crate::aggregator::{
http_handlers::{
aggregator_handler,
test_util::{decode_response_body, take_problem_details},
},
Config,
};
use crate::aggregator::{http_handlers::aggregator_handler, Config};
use http::StatusCode;
use janus_aggregator_core::{
datastore::{
Expand Down Expand Up @@ -42,7 +36,6 @@ use serde_json::json;
use std::{collections::HashSet, sync::Arc};
use trillium::{Handler, KnownHeaderName, Status};
use trillium_testing::{
assert_headers,
prelude::{post, put},
TestConn,
};
Expand Down Expand Up @@ -353,10 +346,23 @@ async fn collection_job_success_fixed_size() {
}

let mut test_conn = test_case.post_collection_job(&collection_job_id).await;

assert_eq!(test_conn.status(), Some(Status::Ok));
assert_headers!(&test_conn, "content-type" => (Collection::<FixedSize>::MEDIA_TYPE));
assert_eq!(
test_conn
.response_headers()
.get(KnownHeaderName::ContentType)
.unwrap(),
Collection::<FixedSize>::MEDIA_TYPE
);
let body_bytes = test_conn
.take_response_body()
.unwrap()
.into_bytes()
.await
.unwrap();
let collect_resp = Collection::<FixedSize>::get_decoded(body_bytes.as_ref()).unwrap();

let collect_resp: Collection<FixedSize> = decode_response_body(&mut test_conn).await;
assert_eq!(
collect_resp.report_count(),
test_case.task.min_batch_size() + 1
Expand All @@ -367,13 +373,12 @@ async fn collection_job_success_fixed_size() {
.align_to_time_precision(test_case.task.time_precision())
.unwrap(),
);
assert_eq!(collect_resp.encrypted_aggregate_shares().len(), 2);

let decrypted_leader_aggregate_share = hpke::open(
test_case.task.collector_hpke_config().unwrap(),
test_case.collector_hpke_keypair.private_key(),
&HpkeApplicationInfo::new(&Label::AggregateShare, &Role::Leader, &Role::Collector),
&collect_resp.encrypted_aggregate_shares()[0],
collect_resp.leader_encrypted_aggregate_share(),
&AggregateShareAad::new(
*test_case.task.id(),
BatchSelector::new_fixed_size(batch_id),
Expand All @@ -391,7 +396,7 @@ async fn collection_job_success_fixed_size() {
test_case.task.collector_hpke_config().unwrap(),
test_case.collector_hpke_keypair.private_key(),
&HpkeApplicationInfo::new(&Label::AggregateShare, &Role::Helper, &Role::Collector),
&collect_resp.encrypted_aggregate_shares()[1],
collect_resp.helper_encrypted_aggregate_share(),
&AggregateShareAad::new(
*test_case.task.id(),
BatchSelector::new_fixed_size(batch_id),
Expand All @@ -417,7 +422,23 @@ async fn collection_job_success_fixed_size() {
.await;
assert_eq!(test_conn.status(), Some(Status::BadRequest));
assert_eq!(
take_problem_details(&mut test_conn).await,
test_conn
.response_headers()
.get(KnownHeaderName::ContentType)
.unwrap(),
"application/problem+json"
);
let problem_details: serde_json::Value = serde_json::from_slice(
&test_conn
.take_response_body()
.unwrap()
.into_bytes()
.await
.unwrap(),
)
.unwrap();
assert_eq!(
problem_details,
json!({
"status": StatusCode::BAD_REQUEST.as_u16(),
"type": "urn:ietf:params:ppm:dap:error:batchInvalid",
Expand Down
Loading

0 comments on commit 145aa4c

Please sign in to comment.