diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs
index dda78b60f..25898d692 100644
--- a/aggregator/src/aggregator.rs
+++ b/aggregator/src/aggregator.rs
@@ -409,6 +409,7 @@ impl<C: Clock> Aggregator<C> {
         task_aggregator
             .handle_aggregate_init(
                 &self.datastore,
+                &self.clock,
                 &self.global_hpke_keypairs,
                 &self.aggregate_step_failure_counter,
                 self.cfg.batch_aggregation_shard_count,
@@ -942,6 +943,7 @@ impl<C: Clock> TaskAggregator<C> {
     async fn handle_aggregate_init(
         &self,
         datastore: &Datastore<C>,
+        clock: &C,
         global_hpke_keypairs: &GlobalHpkeKeypairCache,
         aggregate_step_failure_counter: &Counter,
         batch_aggregation_shard_count: u64,
@@ -951,6 +953,7 @@
         self.vdaf_ops
             .handle_aggregate_init(
                 datastore,
+                clock,
                 global_hpke_keypairs,
                 aggregate_step_failure_counter,
                 Arc::clone(&self.task),
@@ -1283,6 +1286,7 @@ impl VdafOps {
     async fn handle_aggregate_init<C: Clock>(
         &self,
         datastore: &Datastore<C>,
+        clock: &C,
         global_hpke_keypairs: &GlobalHpkeKeypairCache,
         aggregate_step_failure_counter: &Counter,
         task: Arc<Task>,
@@ -1295,6 +1299,7 @@
             vdaf_ops_dispatch!(self, (vdaf, verify_key, VdafType, VERIFY_KEY_LENGTH) => {
                 Self::handle_aggregate_init_generic::<VERIFY_KEY_LENGTH, TimeInterval, VdafType, _>(
                     datastore,
+                    clock,
                     global_hpke_keypairs,
                     Arc::clone(vdaf),
                     aggregate_step_failure_counter,
@@ -1311,6 +1316,7 @@
             vdaf_ops_dispatch!(self, (vdaf, verify_key, VdafType, VERIFY_KEY_LENGTH) => {
                 Self::handle_aggregate_init_generic::<VERIFY_KEY_LENGTH, FixedSize, VdafType, _>(
                     datastore,
+                    clock,
                     global_hpke_keypairs,
                     Arc::clone(vdaf),
                     aggregate_step_failure_counter,
@@ -1625,6 +1631,7 @@ impl VdafOps {
     /// [1]: https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-helper-initialization
     async fn handle_aggregate_init_generic<const SEED_SIZE: usize, Q, A, C>(
         datastore: &Datastore<C>,
+        clock: &C,
         global_hpke_keypairs: &GlobalHpkeKeypairCache,
         vdaf: Arc<A>,
         aggregate_step_failure_counter: &Counter,
@@ -1650,6 +1657,11 @@ impl VdafOps {
         let request_hash = digest(&SHA256, req_bytes).as_ref().try_into().unwrap();
         let req = AggregationJobInitializeReq::<Q>::get_decoded(req_bytes)?;
 
+        let report_deadline = clock
+            .now()
+            .add(task.tolerable_clock_skew())
+            .map_err(Error::from)?;
+
         // If two ReportShare messages have the same report ID, then the helper MUST abort with
         // error "invalidMessage". (§4.5.1.2)
         let mut seen_report_ids = HashSet::with_capacity(req.prepare_inits().len());
@@ -1833,6 +1845,19 @@ impl VdafOps {
                     let shares =
                         input_share.and_then(|input_share| Ok((public_share?, input_share)));
 
+                    // Reject reports from too far in the future.
+                    let shares = shares.and_then(|shares| {
+                        if prepare_init
+                            .report_share()
+                            .metadata()
+                            .time()
+                            .is_after(&report_deadline)
+                        {
+                            return Err(PrepareError::ReportTooEarly);
+                        }
+                        Ok(shares)
+                    });
+
                     // Next, the aggregator runs the preparation-state initialization algorithm for the VDAF
                     // associated with the task and computes the first state transition. [...] If either
                     // step fails, then the aggregator MUST fail with error `vdaf-prep-error`. (§4.4.2.2)
diff --git a/aggregator/src/aggregator/aggregate_init_tests.rs b/aggregator/src/aggregator/aggregate_init_tests.rs
index d6d305914..69f89f79f 100644
--- a/aggregator/src/aggregator/aggregate_init_tests.rs
+++ b/aggregator/src/aggregator/aggregate_init_tests.rs
@@ -27,7 +27,8 @@ use janus_core::{
 };
 use janus_messages::{
     query_type::TimeInterval, AggregationJobId, AggregationJobInitializeReq, AggregationJobResp,
-    HpkeConfig, PartialBatchSelector, PrepareInit, PrepareStepResult, ReportMetadata, ReportShare,
+    Duration, HpkeConfig, PartialBatchSelector, PrepareError, PrepareInit, PrepareStepResult,
+    ReportMetadata, ReportShare,
 };
 use prio::{
     codec::Encode,
@@ -159,9 +160,8 @@ pub(super) struct AggregationJobInitTestCase<
     pub(super) clock: MockClock,
     pub(super) task: Task,
     pub(super) prepare_init_generator: PrepareInitGenerator<VERIFY_KEY_SIZE, V>,
-    pub(super) prepare_inits: Vec<PrepareInit>,
     pub(super) aggregation_job_id: AggregationJobId,
-    aggregation_job_init_req: AggregationJobInitializeReq<TimeInterval>,
+    pub(super) aggregation_job_init_req: AggregationJobInitializeReq<TimeInterval>,
     aggregation_job_init_resp: Option<AggregationJobResp>,
     pub(super) aggregation_param: V::AggregationParam,
     pub(super) handler: Box<dyn Handler>,
@@ -287,7 +287,6 @@ async fn setup_aggregate_init_test_without_sending_request<
     AggregationJobInitTestCase {
         clock,
         task,
-        prepare_inits,
         prepare_init_generator,
         aggregation_job_id,
         aggregation_job_init_req,
@@ -396,7 +395,7 @@ async fn aggregation_job_mutation_aggregation_job() {
     let mutated_aggregation_job_init_req = AggregationJobInitializeReq::new(
         dummy_vdaf::AggregationParam(1).get_encoded(),
         PartialBatchSelector::new_time_interval(),
-        test_case.prepare_inits,
+        test_case.aggregation_job_init_req.prepare_inits().to_vec(),
     );
 
     let response = put_aggregation_job(
@@ -413,25 +412,27 @@ async fn aggregation_job_mutation_report_shares() {
     let test_case = setup_aggregate_init_test().await;
 
+    let prepare_inits = test_case.aggregation_job_init_req.prepare_inits();
+
     // Put the aggregation job again, mutating the associated report shares' metadata such that
     // uniqueness constraints on client_reports are violated
     for mutated_prepare_inits in [
         // Omit a report share that was included previously
-        Vec::from(&test_case.prepare_inits[0..test_case.prepare_inits.len() - 1]),
+        Vec::from(&prepare_inits[0..prepare_inits.len() - 1]),
         // Include a different report share than was included previously
         [
-            &test_case.prepare_inits[0..test_case.prepare_inits.len() - 1],
+            &prepare_inits[0..prepare_inits.len() - 1],
             &[test_case.prepare_init_generator.next(&()).0],
         ]
         .concat(),
         // Include an extra report share than was included previously
         [
-            test_case.prepare_inits.as_slice(),
+            prepare_inits,
             &[test_case.prepare_init_generator.next(&()).0],
         ]
         .concat(),
         // Reverse the order of the reports
-        test_case.prepare_inits.into_iter().rev().collect(),
+        prepare_inits.iter().rev().cloned().collect(),
     ] {
         let mutated_aggregation_job_init_req = AggregationJobInitializeReq::new(
             test_case.aggregation_param.get_encoded(),
@@ -458,7 +459,8 @@ async fn aggregation_job_mutation_report_aggregations() {
     // values such that the prepare state computed during aggregation initializaton won't match the
     // first aggregation job.
    let mutated_prepare_inits = test_case
-        .prepare_inits
+        .aggregation_job_init_req
+        .prepare_inits()
        .iter()
        .map(|s| {
            test_case
@@ -487,6 +489,80 @@
     assert_eq!(response.status(), Some(Status::Conflict));
 }
 
+#[tokio::test]
+async fn aggregation_job_intolerable_clock_skew() {
+    let mut test_case = setup_aggregate_init_test_without_sending_request(
+        dummy_vdaf::Vdaf::new(),
+        VdafInstance::Fake,
+        dummy_vdaf::AggregationParam(0),
+        (),
+        AuthenticationToken::Bearer(random()),
+    )
+    .await;
+
+    test_case.aggregation_job_init_req = AggregationJobInitializeReq::new(
+        test_case.aggregation_param.get_encoded(),
+        PartialBatchSelector::new_time_interval(),
+        Vec::from([
+            // Barely tolerable.
+            test_case
+                .prepare_init_generator
+                .next_with_metadata(
+                    ReportMetadata::new(
+                        random(),
+                        test_case
+                            .clock
+                            .now()
+                            .add(test_case.task.tolerable_clock_skew())
+                            .unwrap(),
+                    ),
+                    &(),
+                )
+                .0,
+            // Barely intolerable.
+            test_case
+                .prepare_init_generator
+                .next_with_metadata(
+                    ReportMetadata::new(
+                        random(),
+                        test_case
+                            .clock
+                            .now()
+                            .add(test_case.task.tolerable_clock_skew())
+                            .unwrap()
+                            .add(&Duration::from_seconds(1))
+                            .unwrap(),
+                    ),
+                    &(),
+                )
+                .0,
+        ]),
+    );
+
+    let mut response = put_aggregation_job(
+        &test_case.task,
+        &test_case.aggregation_job_id,
+        &test_case.aggregation_job_init_req,
+        &test_case.handler,
+    )
+    .await;
+    assert_eq!(response.status(), Some(Status::Ok));
+
+    let aggregation_job_init_resp: AggregationJobResp = decode_response_body(&mut response).await;
+    assert_eq!(
+        aggregation_job_init_resp.prepare_resps().len(),
+        test_case.aggregation_job_init_req.prepare_inits().len(),
+    );
+    assert_matches!(
+        aggregation_job_init_resp.prepare_resps()[0].result(),
+        &PrepareStepResult::Continue { .. }
+    );
+    assert_matches!(
+        aggregation_job_init_resp.prepare_resps()[1].result(),
+        &PrepareStepResult::Reject(PrepareError::ReportTooEarly)
+    );
+}
+
 #[tokio::test]
 async fn aggregation_job_init_two_step_vdaf_idempotence() {
     // We must run Poplar1 in this test so that the aggregation job won't finish on the first step
@@ -517,7 +593,7 @@ async fn aggregation_job_init_wrong_query() {
     let wrong_query = AggregationJobInitializeReq::new(
         test_case.aggregation_param.get_encoded(),
         PartialBatchSelector::new_fixed_size(random()),
-        test_case.prepare_inits,
+        test_case.aggregation_job_init_req.prepare_inits().to_vec(),
     );
 
     let (header, value) = test_case
diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs
index a6eeca427..38ce77886 100644
--- a/aggregator/src/aggregator/http_handlers.rs
+++ b/aggregator/src/aggregator/http_handlers.rs
@@ -2398,7 +2398,10 @@ mod tests {
 
         // This report has the same ID as the previous one, but a different timestamp.
         let mutated_timestamp_report_metadata = ReportMetadata::new(
-            *test_case.prepare_inits[0].report_share().metadata().id(),
+            *test_case.aggregation_job_init_req.prepare_inits()[0]
+                .report_share()
+                .metadata()
+                .id(),
             test_case
                 .clock
                 .now()
@@ -2454,11 +2457,15 @@ mod tests {
         assert_eq!(client_reports.len(), 2);
         assert_eq!(
             &client_reports[0],
-            test_case.prepare_inits[0].report_share().metadata()
+            test_case.aggregation_job_init_req.prepare_inits()[0]
+                .report_share()
+                .metadata()
         );
         assert_eq!(
             &client_reports[1],
-            test_case.prepare_inits[1].report_share().metadata()
+            test_case.aggregation_job_init_req.prepare_inits()[1]
+                .report_share()
+                .metadata()
         );
     }
 