From a63a4a3bca18fb2bba80ed51d562a92a47d55468 Mon Sep 17 00:00:00 2001 From: Tim Geoghegan Date: Fri, 29 Sep 2023 10:05:00 -0700 Subject: [PATCH] Task rewrite: `TaskBuilder` in `aggregator` pt. 1 Adopts `NewTaskBuilder` and `AggregatorTask` across portions of the test utilities and tests in the `janus_aggregator` module. Part of #1524 --- .../src/aggregator/aggregate_init_tests.rs | 40 +- .../aggregator/aggregation_job_continue.rs | 27 +- .../src/aggregator/collection_job_tests.rs | 34 +- aggregator/src/aggregator/http_handlers.rs | 384 ++++++++---------- 4 files changed, 208 insertions(+), 277 deletions(-) diff --git a/aggregator/src/aggregator/aggregate_init_tests.rs b/aggregator/src/aggregator/aggregate_init_tests.rs index 6c6a182c7..48e44b58a 100644 --- a/aggregator/src/aggregator/aggregate_init_tests.rs +++ b/aggregator/src/aggregator/aggregate_init_tests.rs @@ -13,7 +13,10 @@ use janus_aggregator_core::{ test_util::{ephemeral_datastore, EphemeralDatastore}, Datastore, }, - task::{test_util::TaskBuilder, QueryType, Task}, + task::{ + test_util::{NewTaskBuilder as TaskBuilder, Task}, + AggregatorTask, QueryType, + }, test_util::noop_meter, }; use janus_core::{ @@ -24,7 +27,7 @@ use janus_core::{ }; use janus_messages::{ query_type::TimeInterval, AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, - PartialBatchSelector, PrepareInit, PrepareStepResult, ReportMetadata, Role, + PartialBatchSelector, PrepareInit, PrepareStepResult, ReportMetadata, }; use prio::{ codec::Encode, @@ -46,7 +49,7 @@ where V: vdaf::Vdaf, { clock: MockClock, - task: Task, + task: AggregatorTask, vdaf: V, aggregation_param: V::AggregationParam, } @@ -57,7 +60,7 @@ where { pub(super) fn new( clock: MockClock, - task: Task, + task: AggregatorTask, vdaf: V, aggregation_param: V::AggregationParam, ) -> Self { @@ -209,14 +212,15 @@ async fn setup_aggregate_init_test_without_sending_request< ) -> AggregationJobInitTestCase { install_test_trace_subscriber(); - let task = TaskBuilder::new(QueryType::TimeInterval, vdaf_instance, Role::Helper) - .with_aggregator_auth_token(Some(auth_token)) + let task = TaskBuilder::new(QueryType::TimeInterval, vdaf_instance) + .with_aggregator_auth_token(auth_token) .build(); + let helper_task = task.helper_view().unwrap(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); - datastore.put_task(&task).await.unwrap(); + datastore.put_aggregator_task(&helper_task).await.unwrap(); let handler = aggregator_handler( Arc::clone(&datastore), @@ -227,8 +231,12 @@ async fn setup_aggregate_init_test_without_sending_request< .await .unwrap(); - let prepare_init_generator = - PrepareInitGenerator::new(clock.clone(), task.clone(), vdaf, aggregation_param.clone()); + let prepare_init_generator = PrepareInitGenerator::new( + clock.clone(), + helper_task.clone(), + vdaf, + aggregation_param.clone(), + ); let prepare_inits = Vec::from([ prepare_init_generator.next(&measurement).0, @@ -263,10 +271,7 @@ pub(crate) async fn put_aggregation_job( aggregation_job: &AggregationJobInitializeReq, handler: &impl Handler, ) -> TestConn { - let (header, value) = task - .aggregator_auth_token() - .unwrap() - .request_authentication(); + let (header, value) = task.aggregator_auth_token().request_authentication(); put(task.aggregation_job_uri(aggregation_job_id).unwrap().path()) .with_request_header(header, value) .with_request_header( @@ -292,7 +297,6 @@ async fn aggregation_job_init_authorization_dap_auth_token() { let (auth_header, auth_value) = test_case .task .aggregator_auth_token() - .unwrap() .request_authentication(); let response = put(test_case @@ -337,12 +341,7 @@ async fn aggregation_job_init_malformed_authorization_header(#[case] header_valu .with_request_header(KnownHeaderName::Authorization, header_value.to_string()) .with_request_header( DAP_AUTH_HEADER, - test_case - .task - .aggregator_auth_token() - .unwrap() - .as_ref() - .to_owned(), + test_case.task.aggregator_auth_token().as_ref().to_owned(), ) .with_request_header( KnownHeaderName::ContentType, @@ -490,7 +489,6 @@ async fn aggregation_job_init_wrong_query() { let (header, value) = test_case .task .aggregator_auth_token() - .unwrap() .request_authentication(); let mut response = put(test_case diff --git a/aggregator/src/aggregator/aggregation_job_continue.rs b/aggregator/src/aggregator/aggregation_job_continue.rs index d529db38d..49bc441b6 100644 --- a/aggregator/src/aggregator/aggregation_job_continue.rs +++ b/aggregator/src/aggregator/aggregation_job_continue.rs @@ -289,7 +289,7 @@ impl VdafOps { #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] pub mod test_util { use crate::aggregator::http_handlers::test_util::{decode_response_body, take_problem_details}; - use janus_aggregator_core::task::Task; + use janus_aggregator_core::task::test_util::Task; use janus_messages::{AggregationJobContinueReq, AggregationJobId, AggregationJobResp}; use prio::codec::Encode; use serde_json::json; @@ -302,10 +302,7 @@ pub mod test_util { request: &AggregationJobContinueReq, handler: &impl Handler, ) -> TestConn { - let (header, value) = task - .aggregator_auth_token() - .unwrap() - .request_authentication(); + let (header, value) = task.aggregator_auth_token().request_authentication(); post(task.aggregation_job_uri(aggregation_job_id).unwrap().path()) .with_request_header(header, value) .with_request_header( @@ -393,7 +390,10 @@ mod tests { test_util::{ephemeral_datastore, EphemeralDatastore}, Datastore, }, - task::{test_util::TaskBuilder, QueryType, Task}, + task::{ + test_util::{NewTaskBuilder as TaskBuilder, Task}, + QueryType, + }, test_util::noop_meter, }; use janus_core::{ @@ -439,12 +439,9 @@ mod tests { install_test_trace_subscriber(); let aggregation_job_id = random(); - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Poplar1 { bits: 1 }, - Role::Helper, - ) - .build(); + let task = + TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build(); + let helper_task = task.helper_view().unwrap(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; let meter = noop_meter(); @@ -456,7 +453,7 @@ mod tests { .unwrap(); let prepare_init_generator = PrepareInitGenerator::new( clock.clone(), - task.clone(), + helper_task.clone(), Poplar1::new_shake128(1), aggregation_param.clone(), ); @@ -467,14 +464,14 @@ mod tests { datastore .run_tx(|tx| { let (task, aggregation_param, prepare_init, transcript) = ( - task.clone(), + helper_task.clone(), aggregation_param.clone(), prepare_init.clone(), transcript.clone(), ); Box::pin(async move { - tx.put_task(&task).await.unwrap(); + tx.put_aggregator_task(&task).await.unwrap(); tx.put_report_share(task.id(), prepare_init.report_share()) .await .unwrap(); diff --git a/aggregator/src/aggregator/collection_job_tests.rs b/aggregator/src/aggregator/collection_job_tests.rs index 960f79293..7d3fbcb46 100644 --- a/aggregator/src/aggregator/collection_job_tests.rs +++ b/aggregator/src/aggregator/collection_job_tests.rs @@ -16,15 +16,15 @@ use janus_aggregator_core::{ test_util::{ephemeral_datastore, EphemeralDatastore}, Datastore, }, - task::{test_util::TaskBuilder, QueryType, Task}, + task::{ + test_util::{NewTaskBuilder as TaskBuilder, Task}, + QueryType, + }, test_util::noop_meter, }; use janus_core::{ auth_tokens::AuthenticationToken, - hpke::{ - self, test_util::generate_test_hpke_config_and_private_key, HpkeApplicationInfo, - HpkeKeypair, Label, - }, + hpke::{self, HpkeApplicationInfo, Label}, test_util::{ dummy_vdaf::{self, AggregationParam}, install_test_trace_subscriber, @@ -51,7 +51,6 @@ use trillium_testing::{ pub(crate) struct CollectionJobTestCase { pub(super) task: Task, clock: MockClock, - pub(super) collector_hpke_keypair: HpkeKeypair, pub(super) handler: Box, pub(super) datastore: Arc>, _ephemeral_datastore: EphemeralDatastore, @@ -92,7 +91,7 @@ impl CollectionJobTestCase { self.put_collection_job_with_auth_token( collection_job_id, request, - self.task.collector_auth_token(), + Some(self.task.collector_auth_token()), ) .await } @@ -121,7 +120,7 @@ impl CollectionJobTestCase { ) -> TestConn { self.post_collection_job_with_auth_token( collection_job_id, - self.task.collector_auth_token(), + Some(self.task.collector_auth_token()), ) .await } @@ -133,15 +132,13 @@ pub(crate) async fn setup_collection_job_test_case( ) -> CollectionJobTestCase { install_test_trace_subscriber(); - let collector_hpke_keypair = generate_test_hpke_config_and_private_key(); - let task = TaskBuilder::new(query_type, VdafInstance::Fake, role) - .with_collector_hpke_config(collector_hpke_keypair.config().clone()) - .build(); + let task = TaskBuilder::new(query_type, VdafInstance::Fake).build(); + let role_task = task.view_for_role(role).unwrap(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; let datastore = Arc::new(ephemeral_datastore.datastore(clock.clone()).await); - datastore.put_task(&task).await.unwrap(); + datastore.put_aggregator_task(&role_task).await.unwrap(); let handler = aggregator_handler( Arc::clone(&datastore), @@ -158,7 +155,6 @@ pub(crate) async fn setup_collection_job_test_case( CollectionJobTestCase { task, clock, - collector_hpke_keypair, handler: Box::new(handler), datastore, _ephemeral_datastore: ephemeral_datastore, @@ -316,7 +312,7 @@ async fn collection_job_success_fixed_size() { let batch_id = *collection_job.batch_identifier(); let encrypted_helper_aggregate_share = hpke::seal( - task.collector_hpke_config().unwrap(), + task.collector_hpke_keypair().config(), &HpkeApplicationInfo::new( &Label::AggregateShare, &Role::Helper, @@ -371,8 +367,8 @@ async fn collection_job_success_fixed_size() { ); let decrypted_leader_aggregate_share = hpke::open( - test_case.task.collector_hpke_config().unwrap(), - test_case.collector_hpke_keypair.private_key(), + test_case.task.collector_hpke_keypair().config(), + test_case.task.collector_hpke_keypair().private_key(), &HpkeApplicationInfo::new(&Label::AggregateShare, &Role::Leader, &Role::Collector), collect_resp.leader_encrypted_aggregate_share(), &AggregateShareAad::new( @@ -390,8 +386,8 @@ async fn collection_job_success_fixed_size() { ); let decrypted_helper_aggregate_share = hpke::open( - test_case.task.collector_hpke_config().unwrap(), - test_case.collector_hpke_keypair.private_key(), + test_case.task.collector_hpke_keypair().config(), + test_case.task.collector_hpke_keypair().private_key(), &HpkeApplicationInfo::new(&Label::AggregateShare, &Role::Helper, &Role::Collector), collect_resp.helper_encrypted_aggregate_share(), &AggregateShareAad::new( diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index 07b5fe247..bf162d8f5 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -674,10 +674,7 @@ mod tests { Datastore, }, query_type::{AccumulableQueryType, CollectableQueryType}, - task::{ - test_util::{NewTaskBuilder, TaskBuilder}, - QueryType, VerifyKey, - }, + task::{test_util::NewTaskBuilder as TaskBuilder, QueryType, Task, VerifyKey}, test_util::noop_meter, }; use janus_core::{ @@ -754,13 +751,11 @@ mod tests { async fn hpke_config() { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Prio3Count, - Role::Leader, - ) - .build(); - datastore.put_task(&task).await.unwrap(); + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count) + .build() + .leader_view() + .unwrap(); + datastore.put_aggregator_task(&task).await.unwrap(); let unknown_task_id: TaskId = random(); let want_hpke_key = task.current_hpke_key().clone(); @@ -970,7 +965,7 @@ mod tests { .unwrap(); // Insert a taskprov task. This task won't have its task-specific HPKE key. - let task = NewTaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count).build(); + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count).build(); let taskprov_helper_task = task.taskprov_helper_view().unwrap(); datastore .put_aggregator_task(&taskprov_helper_task) @@ -1036,13 +1031,11 @@ mod tests { async fn hpke_config_cors_headers() { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Prio3Count, - Role::Leader, - ) - .build(); - datastore.put_task(&task).await.unwrap(); + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count) + .build() + .leader_view() + .unwrap(); + datastore.put_aggregator_task(&task).await.unwrap(); // Check for appropriate CORS headers in response to a preflight request. let test_conn = TestConn::build( @@ -1098,16 +1091,14 @@ mod tests { let (clock, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; const REPORT_EXPIRY_AGE: u64 = 1_000_000; - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Prio3Count, - Role::Leader, - ) - .with_report_expiry_age(Some(Duration::from_seconds(REPORT_EXPIRY_AGE))) - .build(); - datastore.put_task(&task).await.unwrap(); + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count) + .with_report_expiry_age(Some(Duration::from_seconds(REPORT_EXPIRY_AGE))) + .build(); - let report = create_report(&task, clock.now()); + let leader_task = task.leader_view().unwrap(); + datastore.put_aggregator_task(&leader_task).await.unwrap(); + + let report = create_report(&Task::from(leader_task.clone()), clock.now()); // Upload a report. Do this twice to prove that PUT is idempotent. for _ in 0..2 { @@ -1125,10 +1116,10 @@ mod tests { // Verify that new reports using an existing report ID are rejected with reportRejected let duplicate_id_report = create_report_custom( - &task, + &Task::from(leader_task.clone()), clock.now(), *accepted_report_id, - task.current_hpke_key(), + leader_task.current_hpke_key(), ); let mut test_conn = put(task.report_upload_uri().unwrap().path()) .with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE) @@ -1176,7 +1167,7 @@ mod tests { // the error type outdatedConfig. let unused_hpke_config_id = (0..) .map(HpkeConfigId::from) - .find(|id| !task.hpke_keys().contains_key(id)) + .find(|id| !leader_task.hpke_keys().contains_key(id)) .unwrap(); let bad_report = Report::new( report.metadata().clone(), @@ -1233,16 +1224,16 @@ mod tests { .await; // Reports with timestamps past the task's expiration should be rejected. - let task_expire_soon = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Prio3Count, - Role::Leader, - ) - .with_task_expiration(Some(clock.now().add(&Duration::from_seconds(60)).unwrap())) - .build(); - datastore.put_task(&task_expire_soon).await.unwrap(); + let task_expire_soon = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count) + .with_task_expiration(Some(clock.now().add(&Duration::from_seconds(60)).unwrap())) + .build(); + let leader_task_expire_soon = task_expire_soon.leader_view().unwrap(); + datastore + .put_aggregator_task(&leader_task_expire_soon) + .await + .unwrap(); let report_2 = create_report( - &task_expire_soon, + &Task::from(leader_task_expire_soon), clock.now().add(&Duration::from_seconds(120)).unwrap(), ); let mut test_conn = put(task_expire_soon.report_upload_uri().unwrap().path()) @@ -1312,14 +1303,10 @@ mod tests { async fn upload_handler_helper() { let (clock, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Prio3Count, - Role::Helper, - ) - .build(); - datastore.put_task(&task).await.unwrap(); - let report = create_report(&task, clock.now()); + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count).build(); + let helper_task = task.helper_view().unwrap(); + datastore.put_aggregator_task(&helper_task).await.unwrap(); + let report = create_report(&Task::from(helper_task), clock.now()); let mut test_conn = put(task.report_upload_uri().unwrap().path()) .with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE) @@ -1354,13 +1341,11 @@ mod tests { async fn aggregate_leader() { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Prio3Count, - Role::Leader, - ) - .build(); - datastore.put_task(&task).await.unwrap(); + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count).build(); + datastore + .put_aggregator_task(&task.leader_view().unwrap()) + .await + .unwrap(); let request = AggregationJobInitializeReq::new( Vec::new(), @@ -1422,14 +1407,14 @@ mod tests { let dap_auth_token = AuthenticationToken::DapAuth(random()); - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Prio3Count, - Role::Helper, - ) - .with_aggregator_auth_token(Some(dap_auth_token.clone())) - .build(); - datastore.put_task(&task).await.unwrap(); + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count) + .with_aggregator_auth_token(dap_auth_token.clone()) + .build(); + + datastore + .put_aggregator_task(&task.helper_view().unwrap()) + .await + .unwrap(); let request = AggregationJobInitializeReq::new( Vec::new(), @@ -1484,16 +1469,17 @@ mod tests { async fn aggregate_init() { let (clock, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; - let task = - TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Helper).build(); + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake).build(); + + let helper_task = task.helper_view().unwrap(); let vdaf = dummy_vdaf::Vdaf::new(); let verify_key: VerifyKey<0> = task.vdaf_verify_key().unwrap(); - let hpke_key = task.current_hpke_key(); + let hpke_key = helper_task.current_hpke_key(); let measurement = (); let prep_init_generator = PrepareInitGenerator::new( clock.clone(), - task.clone(), + helper_task.clone(), vdaf.clone(), dummy_vdaf::AggregationParam(0), ); @@ -1547,7 +1533,7 @@ mod tests { let wrong_hpke_config = loop { let hpke_config = generate_test_hpke_config_and_private_key().config().clone(); - if task.hpke_keys().contains_key(hpke_config.id()) { + if helper_task.hpke_keys().contains_key(hpke_config.id()) { continue; } break hpke_config; @@ -1667,12 +1653,12 @@ mod tests { let (conflicting_aggregation_job, non_conflicting_aggregation_job) = datastore .run_tx(|tx| { - let task = task.clone(); + let task = helper_task.clone(); let report_share_4 = prepare_init_4.report_share().clone(); let report_share_5 = prepare_init_5.report_share().clone(); let report_share_8 = prepare_init_8.report_share().clone(); Box::pin(async move { - tx.put_task(&task).await?; + tx.put_aggregator_task(&task).await?; // report_share_4 and report_share_8 are already in the datastore as they were // referenced by existing aggregation jobs. @@ -1928,24 +1914,29 @@ mod tests { async fn aggregate_init_with_reports_encrypted_by_global_key() { let (clock, _ephemeral_datastore, datastore, _) = setup_http_handler_test().await; - let task = - TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Helper).build(); - datastore.put_task(&task).await.unwrap(); + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake).build(); + + let helper_task = task.helper_view().unwrap(); + datastore.put_aggregator_task(&helper_task).await.unwrap(); let vdaf = dummy_vdaf::Vdaf::new(); let aggregation_param = dummy_vdaf::AggregationParam(0); - let prep_init_generator = - PrepareInitGenerator::new(clock.clone(), task.clone(), vdaf.clone(), aggregation_param); + let prep_init_generator = PrepareInitGenerator::new( + clock.clone(), + helper_task.clone(), + vdaf.clone(), + aggregation_param, + ); // Insert some global HPKE keys. // Same ID as the task to test having both keys to choose from. let global_hpke_keypair_same_id = generate_test_hpke_config_and_private_key_with_id( - (*task.current_hpke_key().config().id()).into(), + (*helper_task.current_hpke_key().config().id()).into(), ); // Different ID to test misses on the task key. let global_hpke_keypair_different_id = generate_test_hpke_config_and_private_key_with_id( (0..) .map(HpkeConfigId::from) - .find(|id| !task.hpke_keys().contains_key(id)) + .find(|id| !helper_task.hpke_keys().contains_key(id)) .unwrap() .into(), ); @@ -2232,20 +2223,17 @@ mod tests { async fn aggregate_init_prep_init_failed() { let (clock, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::FakeFailsPrepInit, - Role::Helper, - ) - .build(); + let task = + TaskBuilder::new(QueryType::TimeInterval, VdafInstance::FakeFailsPrepInit).build(); + let helper_task = task.helper_view().unwrap(); let prep_init_generator = PrepareInitGenerator::new( clock.clone(), - task.clone(), + helper_task.clone(), dummy_vdaf::Vdaf::new(), dummy_vdaf::AggregationParam(0), ); - datastore.put_task(&task).await.unwrap(); + datastore.put_aggregator_task(&helper_task).await.unwrap(); let (prepare_init, _) = prep_init_generator.next(&()); let request = AggregationJobInitializeReq::new( @@ -2283,20 +2271,17 @@ mod tests { async fn aggregate_init_prep_step_failed() { let (clock, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::FakeFailsPrepStep, - Role::Helper, - ) - .build(); + let task = + TaskBuilder::new(QueryType::TimeInterval, VdafInstance::FakeFailsPrepStep).build(); + let helper_task = task.helper_view().unwrap(); let prep_init_generator = PrepareInitGenerator::new( clock.clone(), - task.clone(), + helper_task.clone(), dummy_vdaf::Vdaf::new(), dummy_vdaf::AggregationParam(0), ); - datastore.put_task(&task).await.unwrap(); + datastore.put_aggregator_task(&helper_task).await.unwrap(); let (prepare_init, _) = prep_init_generator.next(&()); let request = AggregationJobInitializeReq::new( @@ -2333,16 +2318,17 @@ mod tests { async fn aggregate_init_duplicated_report_id() { let (clock, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; - let task = - TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Helper).build(); + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake).build(); + + let helper_task = task.helper_view().unwrap(); let prep_init_generator = PrepareInitGenerator::new( clock.clone(), - task.clone(), + helper_task.clone(), dummy_vdaf::Vdaf::new(), dummy_vdaf::AggregationParam(0), ); - datastore.put_task(&task).await.unwrap(); + datastore.put_aggregator_task(&helper_task).await.unwrap(); let (prepare_init, _) = prep_init_generator.next(&()); @@ -2374,16 +2360,13 @@ mod tests { let (clock, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; let aggregation_job_id = random(); - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Poplar1 { bits: 1 }, - Role::Helper, - ) - .build(); + let task = + TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build(); + let helper_task = task.helper_view().unwrap(); let vdaf = Arc::new(Poplar1::::new(1)); let verify_key: VerifyKey = task.vdaf_verify_key().unwrap(); - let hpke_key = task.current_hpke_key(); + let hpke_key = helper_task.current_hpke_key(); let measurement = IdpfInput::from_bools(&[true]); let aggregation_param = Poplar1AggregationParam::try_from_prefixes(vec![measurement.clone()]).unwrap(); @@ -2471,7 +2454,7 @@ mod tests { datastore .run_tx(|tx| { - let task = task.clone(); + let task = helper_task.clone(); let (report_share_0, report_share_1, report_share_2) = ( report_share_0.clone(), report_share_1.clone(), @@ -2491,7 +2474,7 @@ mod tests { let helper_aggregate_share = transcript_0.helper_aggregate_share.clone(); Box::pin(async move { - tx.put_task(&task).await?; + tx.put_aggregator_task(&task).await?; tx.put_report_share(task.id(), &report_share_0).await?; tx.put_report_share(task.id(), &report_share_1).await?; @@ -2680,12 +2663,9 @@ mod tests { async fn aggregate_continue_accumulate_batch_aggregation() { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Poplar1 { bits: 1 }, - Role::Helper, - ) - .build(); + let task = + TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build(); + let helper_task = task.helper_view().unwrap(); let aggregation_job_id_0 = random(); let aggregation_job_id_1 = random(); let first_batch_interval_clock = MockClock::default(); @@ -2698,7 +2678,7 @@ mod tests { let vdaf = Poplar1::new(1); let verify_key: VerifyKey = task.vdaf_verify_key().unwrap(); - let hpke_key = task.current_hpke_key(); + let hpke_key = helper_task.current_hpke_key(); let measurement = IdpfInput::from_bools(&[true]); let aggregation_param = Poplar1AggregationParam::try_from_prefixes(vec![measurement.clone()]).unwrap(); @@ -2801,7 +2781,7 @@ mod tests { .unwrap(); let second_batch_want_batch_aggregations = empty_batch_aggregations::>( - &task.view_for_role().unwrap(), + &helper_task, BATCH_AGGREGATION_SHARD_COUNT, &second_batch_identifier, &aggregation_param, @@ -2810,7 +2790,7 @@ mod tests { datastore .run_tx(|tx| { - let task = task.clone(); + let task = helper_task.clone(); let (report_share_0, report_share_1, report_share_2) = ( report_share_0.clone(), report_share_1.clone(), @@ -2831,7 +2811,7 @@ mod tests { second_batch_want_batch_aggregations.clone(); Box::pin(async move { - tx.put_task(&task).await?; + tx.put_aggregator_task(&task).await?; tx.put_report_share(task.id(), &report_share_0).await?; tx.put_report_share(task.id(), &report_share_1).await?; @@ -2941,7 +2921,7 @@ mod tests { let first_batch_got_batch_aggregations: Vec<_> = datastore .run_tx(|tx| { let (task, vdaf, report_metadata_0, aggregation_param) = ( - task.clone(), + helper_task.clone(), vdaf.clone(), report_metadata_0.clone(), aggregation_param.clone(), @@ -2953,7 +2933,7 @@ mod tests { _, >( tx, - &task.view_for_role().unwrap(), + &task, &vdaf, &Interval::new( report_metadata_0 @@ -3023,7 +3003,7 @@ mod tests { let second_batch_got_batch_aggregations = datastore .run_tx(|tx| { let (task, vdaf, report_metadata_2, aggregation_param) = ( - task.clone(), + helper_task.clone(), vdaf.clone(), report_metadata_2.clone(), aggregation_param.clone(), @@ -3035,7 +3015,7 @@ mod tests { _, >( tx, - &task.view_for_role().unwrap(), + &task, &vdaf, &Interval::new( report_metadata_2 @@ -3141,7 +3121,7 @@ mod tests { datastore .run_tx(|tx| { - let task = task.clone(); + let task = helper_task.clone(); let (report_share_3, report_share_4, report_share_5) = ( report_share_3.clone(), report_share_4.clone(), @@ -3244,7 +3224,7 @@ mod tests { let merged_first_batch_aggregation = datastore .run_tx(|tx| { let (task, vdaf, report_metadata_0, aggregation_param) = ( - task.clone(), + helper_task.clone(), vdaf.clone(), report_metadata_0.clone(), aggregation_param.clone(), @@ -3256,7 +3236,7 @@ mod tests { _, >( tx, - &task.view_for_role().unwrap(), + &task, &vdaf, &Interval::new( report_metadata_0 @@ -3331,7 +3311,7 @@ mod tests { let second_batch_got_batch_aggregations = datastore .run_tx(|tx| { let (task, vdaf, report_metadata_2, aggregation_param) = ( - task.clone(), + helper_task.clone(), vdaf.clone(), report_metadata_2.clone(), aggregation_param.clone(), @@ -3343,7 +3323,7 @@ mod tests { _, >( tx, - &task.view_for_role().unwrap(), + &task, &vdaf, &Interval::new( report_metadata_2 @@ -3371,12 +3351,9 @@ mod tests { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; // Prepare parameters. - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Poplar1 { bits: 1 }, - Role::Helper, - ) - .build(); + let task = + TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build(); + let helper_task = task.helper_view().unwrap(); let report_id = random(); let aggregation_param = Poplar1AggregationParam::try_from_prefixes(Vec::from([ IdpfInput::from_bools(&[false]), @@ -3399,13 +3376,13 @@ mod tests { datastore .run_tx(|tx| { let (task, aggregation_param, report_metadata, transcript) = ( - task.clone(), + helper_task.clone(), aggregation_param.clone(), report_metadata.clone(), transcript.clone(), ); Box::pin(async move { - tx.put_task(&task).await?; + tx.put_aggregator_task(&task).await?; tx.put_report_share( task.id(), &ReportShare::new( @@ -3485,12 +3462,9 @@ mod tests { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; // Prepare parameters. - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Poplar1 { bits: 1 }, - Role::Helper, - ) - .build(); + let task = + TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build(); + let helper_task = task.helper_view().unwrap(); let vdaf = Poplar1::new_shake128(1); let report_id = random(); let aggregation_param = Poplar1AggregationParam::try_from_prefixes(Vec::from([ @@ -3509,7 +3483,7 @@ mod tests { let helper_report_share = generate_helper_report_share::>( *task.id(), report_metadata.clone(), - task.current_hpke_key().config(), + helper_task.current_hpke_key().config(), &transcript.public_share, Vec::new(), &transcript.helper_input_share, @@ -3519,7 +3493,7 @@ mod tests { datastore .run_tx(|tx| { let (task, aggregation_param, report_metadata, transcript, helper_report_share) = ( - task.clone(), + helper_task.clone(), aggregation_param.clone(), report_metadata.clone(), transcript.clone(), @@ -3527,7 +3501,7 @@ mod tests { ); Box::pin(async move { - tx.put_task(&task).await?; + tx.put_aggregator_task(&task).await?; tx.put_report_share(task.id(), &helper_report_share).await?; tx.put_aggregation_job(&AggregationJob::< 16, @@ -3655,12 +3629,9 @@ mod tests { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; // Prepare parameters. - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Poplar1 { bits: 1 }, - Role::Helper, - ) - .build(); + let task = + TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build(); + let helper_task = task.helper_view().unwrap(); let report_id = random(); let aggregation_param = Poplar1AggregationParam::try_from_prefixes(Vec::from([ IdpfInput::from_bools(&[false]), @@ -3680,14 +3651,14 @@ mod tests { datastore .run_tx(|tx| { let (task, aggregation_param, report_metadata, transcript) = ( - task.clone(), + helper_task.clone(), aggregation_param.clone(), report_metadata.clone(), transcript.clone(), ); Box::pin(async move { - tx.put_task(&task).await?; + tx.put_aggregator_task(&task).await?; tx.put_report_share( task.id(), &ReportShare::new( @@ -3768,12 +3739,9 @@ mod tests { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; // Prepare parameters. - let task = TaskBuilder::new( - QueryType::TimeInterval, - VdafInstance::Poplar1 { bits: 1 }, - Role::Helper, - ) - .build(); + let task = + TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Poplar1 { bits: 1 }).build(); + let helper_task = task.helper_view().unwrap(); let report_id_0 = random(); let aggregation_param = Poplar1AggregationParam::try_from_prefixes(Vec::from([ IdpfInput::from_bools(&[false]), @@ -3811,7 +3779,7 @@ mod tests { transcript_0, transcript_1, ) = ( - task.clone(), + helper_task.clone(), aggregation_param.clone(), report_metadata_0.clone(), report_metadata_1.clone(), @@ -3820,7 +3788,7 @@ mod tests { ); Box::pin(async move { - tx.put_task(&task).await?; + tx.put_aggregator_task(&task).await?; tx.put_report_share( task.id(), @@ -3940,8 +3908,8 @@ mod tests { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; // Prepare parameters. - let task = - TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Helper).build(); + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake).build(); + let helper_task = task.helper_view().unwrap(); let aggregation_job_id = random(); let report_metadata = ReportMetadata::new( ReportId::from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]), @@ -3951,9 +3919,9 @@ mod tests { // Setup datastore. datastore .run_tx(|tx| { - let (task, report_metadata) = (task.clone(), report_metadata.clone()); + let (task, report_metadata) = (helper_task.clone(), report_metadata.clone()); Box::pin(async move { - tx.put_task(&task).await?; + tx.put_aggregator_task(&task).await?; tx.put_report_share( task.id(), &ReportShare::new( @@ -4125,10 +4093,11 @@ mod tests { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; // Prepare parameters. - let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Leader) + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake) .with_min_batch_size(1) .build(); - datastore.put_task(&task).await.unwrap(); + let leader_task = task.leader_view().unwrap(); + datastore.put_aggregator_task(&leader_task).await.unwrap(); let collection_job_id: CollectionJobId = random(); let request = CollectionReq::new( @@ -4142,10 +4111,7 @@ mod tests { dummy_vdaf::AggregationParam::default().get_encoded(), ); - let (header, value) = task - .collector_auth_token() - .unwrap() - .request_authentication(); + let (header, value) = task.collector_auth_token().request_authentication(); let mut test_conn = put(task.collection_job_uri(&collection_job_id).unwrap().path()) .with_request_header(header, value) .with_request_header( @@ -4206,7 +4172,7 @@ mod tests { .put_collection_job_with_auth_token( &collection_job_id, &req, - test_case.task.aggregator_auth_token(), + Some(test_case.task.aggregator_auth_token()), ) .await; @@ -4283,7 +4249,7 @@ mod tests { let mut test_conn = test_case .post_collection_job_with_auth_token( &collection_job_id, - test_case.task.aggregator_auth_token(), + Some(test_case.task.aggregator_auth_token()), ) .await; @@ -4322,7 +4288,7 @@ mod tests { let test_case = setup_collection_job_test_case(Role::Leader, QueryType::TimeInterval).await; let batch_interval = TimeInterval::to_batch_identifier( - &test_case.task.view_for_role().unwrap(), + &test_case.task.leader_view().unwrap(), &(), &Time::from_seconds_since_epoch(0), ) @@ -4414,7 +4380,7 @@ mod tests { let helper_aggregate_share_bytes = helper_aggregate_share.get_encoded(); Box::pin(async move { let encrypted_helper_aggregate_share = hpke::seal( - task.collector_hpke_config().unwrap(), + task.collector_hpke_keypair().config(), &HpkeApplicationInfo::new( &Label::AggregateShare, &Role::Helper, @@ -4467,8 +4433,8 @@ mod tests { assert_eq!(collect_resp.interval(), &batch_interval); let decrypted_leader_aggregate_share = hpke::open( - test_case.task.collector_hpke_config().unwrap(), - test_case.collector_hpke_keypair.private_key(), + test_case.task.collector_hpke_keypair().config(), + test_case.task.collector_hpke_keypair().private_key(), &HpkeApplicationInfo::new(&Label::AggregateShare, &Role::Leader, &Role::Collector), collect_resp.leader_encrypted_aggregate_share(), &AggregateShareAad::new( @@ -4486,8 +4452,8 @@ mod tests { ); let decrypted_helper_aggregate_share = hpke::open( - test_case.task.collector_hpke_config().unwrap(), - test_case.collector_hpke_keypair.private_key(), + test_case.task.collector_hpke_keypair().config(), + test_case.task.collector_hpke_keypair().private_key(), &HpkeApplicationInfo::new(&Label::AggregateShare, &Role::Helper, &Role::Collector), collect_resp.helper_encrypted_aggregate_share(), &AggregateShareAad::new( @@ -4514,7 +4480,6 @@ mod tests { let (header, value) = test_case .task .collector_auth_token() - .unwrap() .request_authentication(); let test_conn = post(&format!( "/tasks/{}/collection_jobs/{no_such_collection_job_id}", @@ -4680,7 +4645,6 @@ mod tests { let (header, value) = test_case .task .collector_auth_token() - .unwrap() .request_authentication(); // Try to delete a collection job that doesn't exist @@ -4731,9 +4695,9 @@ mod tests { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; // Prepare parameters. - let task = - TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Leader).build(); - datastore.put_task(&task).await.unwrap(); + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake).build(); + let leader_task = task.leader_view().unwrap(); + datastore.put_aggregator_task(&leader_task).await.unwrap(); let request = AggregateShareReq::new( BatchSelector::new_time_interval( @@ -4744,10 +4708,7 @@ mod tests { ReportIdChecksum::default(), ); - let (header, value) = task - .aggregator_auth_token() - .unwrap() - .request_authentication(); + let (header, value) = task.aggregator_auth_token().request_authentication(); let mut test_conn = post(task.aggregate_shares_uri().unwrap().path()) .with_request_header(header, value) @@ -4777,10 +4738,11 @@ mod tests { // Prepare parameters. const REPORT_EXPIRY_AGE: Duration = Duration::from_seconds(3600); - let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Helper) + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake) .with_report_expiry_age(Some(REPORT_EXPIRY_AGE)) .build(); - datastore.put_task(&task).await.unwrap(); + let helper_task = task.helper_view().unwrap(); + datastore.put_aggregator_task(&helper_task).await.unwrap(); let request = AggregateShareReq::new( BatchSelector::new_time_interval( @@ -4796,10 +4758,7 @@ mod tests { ReportIdChecksum::default(), ); - let (header, value) = task - .aggregator_auth_token() - .unwrap() - .request_authentication(); + let (header, value) = task.aggregator_auth_token().request_authentication(); // Test that a request for an invalid batch fails. (Specifically, the batch interval is too // small.) @@ -4853,14 +4812,13 @@ mod tests { async fn aggregate_share_request() { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; - let collector_hpke_keypair = generate_test_hpke_config_and_private_key(); - let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake, Role::Helper) + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake) .with_max_batch_query_count(1) .with_time_precision(Duration::from_seconds(500)) .with_min_batch_size(10) - .with_collector_hpke_config(collector_hpke_keypair.config().clone()) .build(); - datastore.put_task(&task).await.unwrap(); + let helper_task = task.helper_view().unwrap(); + datastore.put_aggregator_task(&helper_task).await.unwrap(); // There are no batch aggregations in the datastore yet let request = AggregateShareReq::new( @@ -4872,10 +4830,7 @@ mod tests { ReportIdChecksum::default(), ); - let (header, value) = task - .aggregator_auth_token() - .unwrap() - .request_authentication(); + let (header, value) = task.aggregator_auth_token().request_authentication(); let mut test_conn = post(task.aggregate_shares_uri().unwrap().path()) .with_request_header(header, value) @@ -4901,7 +4856,7 @@ mod tests { // Put some batch aggregations in the DB. datastore .run_tx(|tx| { - let task = task.clone(); + let task = helper_task.clone(); Box::pin(async move { for aggregation_param in [ dummy_vdaf::AggregationParam(0), @@ -5059,10 +5014,7 @@ mod tests { 5, ReportIdChecksum::default(), ); - let (header, value) = task - .aggregator_auth_token() - .unwrap() - .request_authentication(); + let (header, value) = task.aggregator_auth_token().request_authentication(); let mut test_conn = post(task.aggregate_shares_uri().unwrap().path()) .with_request_header(header, value) .with_request_header( @@ -5113,10 +5065,7 @@ mod tests { ReportIdChecksum::get_decoded(&[4 ^ 8; 32]).unwrap(), ), ] { - let (header, value) = task - .aggregator_auth_token() - .unwrap() - .request_authentication(); + let (header, value) = task.aggregator_auth_token().request_authentication(); let mut test_conn = post(task.aggregate_shares_uri().unwrap().path()) .with_request_header(header, value) .with_request_header( @@ -5179,10 +5128,7 @@ mod tests { // Request the aggregate share multiple times. If the request parameters don't change, // then there is no query count violation and all requests should succeed. for iteration in 0..3 { - let (header, value) = task - .aggregator_auth_token() - .unwrap() - .request_authentication(); + let (header, value) = task.aggregator_auth_token().request_authentication(); let mut test_conn = post(task.aggregate_shares_uri().unwrap().path()) .with_request_header(header, value) .with_request_header( @@ -5206,8 +5152,8 @@ mod tests { decode_response_body(&mut test_conn).await; let aggregate_share = hpke::open( - collector_hpke_keypair.config(), - collector_hpke_keypair.private_key(), + task.collector_hpke_keypair().config(), + task.collector_hpke_keypair().private_key(), &HpkeApplicationInfo::new( &Label::AggregateShare, &Role::Helper, @@ -5247,10 +5193,7 @@ mod tests { 20, ReportIdChecksum::get_decoded(&[8 ^ 4 ^ 3 ^ 2; 32]).unwrap(), ); - let (header, value) = task - .aggregator_auth_token() - .unwrap() - .request_authentication(); + let (header, value) = task.aggregator_auth_token().request_authentication(); let mut test_conn = post(task.aggregate_shares_uri().unwrap().path()) .with_request_header(header, value) .with_request_header( @@ -5299,10 +5242,7 @@ mod tests { ReportIdChecksum::get_decoded(&[4 ^ 8; 32]).unwrap(), ), ] { - let (header, value) = task - .aggregator_auth_token() - .unwrap() - .request_authentication(); + let (header, value) = task.aggregator_auth_token().request_authentication(); let mut test_conn = post(task.aggregate_shares_uri().unwrap().path()) .with_request_header(header, value) .with_request_header(