diff --git a/aggregator/src/aggregator/aggregation_job_creator.rs b/aggregator/src/aggregator/aggregation_job_creator.rs index 08c963d32..fb0c8d7aa 100644 --- a/aggregator/src/aggregator/aggregation_job_creator.rs +++ b/aggregator/src/aggregator/aggregation_job_creator.rs @@ -631,7 +631,7 @@ impl AggregationJobCreator { self: Arc, task: Arc, vdaf: Arc, - task_max_batch_size: u64, + task_max_batch_size: Option, task_batch_time_window_size: Option, ) -> anyhow::Result where @@ -646,7 +646,7 @@ impl AggregationJobCreator { { let (task_min_batch_size, task_max_batch_size) = ( usize::try_from(task.min_batch_size())?, - usize::try_from(task_max_batch_size)?, + task_max_batch_size.map(usize::try_from).transpose()?, ); Ok(self .datastore @@ -1430,7 +1430,7 @@ mod tests { let task = Arc::new( TaskBuilder::new( TaskQueryType::FixedSize { - max_batch_size: MAX_BATCH_SIZE as u64, + max_batch_size: Some(MAX_BATCH_SIZE as u64), batch_time_window_size: None, }, VdafInstance::Prio3Count, @@ -1637,7 +1637,7 @@ mod tests { let task = Arc::new( TaskBuilder::new( TaskQueryType::FixedSize { - max_batch_size: MAX_BATCH_SIZE as u64, + max_batch_size: Some(MAX_BATCH_SIZE as u64), batch_time_window_size: None, }, VdafInstance::Prio3Count, @@ -1794,7 +1794,7 @@ mod tests { let task = Arc::new( TaskBuilder::new( TaskQueryType::FixedSize { - max_batch_size: MAX_BATCH_SIZE as u64, + max_batch_size: Some(MAX_BATCH_SIZE as u64), batch_time_window_size: None, }, VdafInstance::Prio3Count, @@ -2056,7 +2056,7 @@ mod tests { let task = Arc::new( TaskBuilder::new( TaskQueryType::FixedSize { - max_batch_size: MAX_BATCH_SIZE as u64, + max_batch_size: Some(MAX_BATCH_SIZE as u64), batch_time_window_size: None, }, VdafInstance::Prio3Count, @@ -2325,7 +2325,7 @@ mod tests { let task = Arc::new( TaskBuilder::new( TaskQueryType::FixedSize { - max_batch_size: MAX_BATCH_SIZE as u64, + max_batch_size: Some(MAX_BATCH_SIZE as u64), batch_time_window_size: Some(batch_time_window_size), }, VdafInstance::Prio3Count, @@ -2589,6 +2589,187 @@ mod tests { ); } + #[tokio::test] + async fn create_aggregation_jobs_for_fixed_size_task_no_max_batch_size() { + // Setup. + install_test_trace_subscriber(); + let clock: MockClock = MockClock::default(); + let ephemeral_datastore = ephemeral_datastore().await; + let ds = ephemeral_datastore.datastore(clock.clone()).await; + + const MIN_AGGREGATION_JOB_SIZE: usize = 50; + const MAX_AGGREGATION_JOB_SIZE: usize = 60; + const MIN_BATCH_SIZE: usize = 200; + + let task = Arc::new( + TaskBuilder::new( + TaskQueryType::FixedSize { + max_batch_size: None, + batch_time_window_size: None, + }, + VdafInstance::Prio3Count, + ) + .with_min_batch_size(MIN_BATCH_SIZE as u64) + .build() + .leader_view() + .unwrap(), + ); + + // Create MIN_BATCH_SIZE + MIN_BATCH_SIZE + MIN_AGGREGATION_JOB_SIZE reports. We expect + // aggregation jobs to be created containing all these reports, but only two batches. + let report_time = clock.now(); + let vdaf = Arc::new(Prio3::new_count(2).unwrap()); + let helper_hpke_keypair = generate_test_hpke_config_and_private_key(); + let reports: Arc> = Arc::new( + iter::repeat_with(|| { + let report_metadata = ReportMetadata::new(random(), report_time); + let transcript = run_vdaf( + vdaf.as_ref(), + task.vdaf_verify_key().unwrap().as_bytes(), + &(), + report_metadata.id(), + &false, + ); + LeaderStoredReport::generate( + *task.id(), + report_metadata, + helper_hpke_keypair.config(), + Vec::new(), + &transcript, + ) + }) + .take(MIN_BATCH_SIZE + MIN_BATCH_SIZE + MIN_AGGREGATION_JOB_SIZE) + .collect(), + ); + + let report_ids: HashSet = reports + .iter() + .map(|report| *report.metadata().id()) + .collect(); + + ds.run_unnamed_tx(|tx| { + let task = Arc::clone(&task); + let vdaf = Arc::clone(&vdaf); + let reports = Arc::clone(&reports); + + Box::pin(async move { + tx.put_aggregator_task(&task).await.unwrap(); + for report in reports.iter() { + tx.put_client_report(vdaf.as_ref(), report).await.unwrap(); + } + Ok(()) + }) + }) + .await + .unwrap(); + + // Run. + let job_creator = Arc::new(AggregationJobCreator::new( + ds, + noop_meter(), + Duration::from_secs(3600), + Duration::from_secs(1), + MIN_AGGREGATION_JOB_SIZE, + MAX_AGGREGATION_JOB_SIZE, + )); + Arc::clone(&job_creator) + .create_aggregation_jobs_for_task(Arc::clone(&task)) + .await + .unwrap(); + + // Verify. + let want_ra_states: Arc> = Arc::new( + reports + .iter() + .map(|report| { + ( + *report.metadata().id(), + report + .as_start_leader_report_aggregation(random(), 0) + .state() + .clone(), + ) + }) + .collect(), + ); + let (outstanding_batches, (agg_jobs, _)) = + job_creator + .datastore + .run_unnamed_tx(|tx| { + let task = Arc::clone(&task); + let vdaf = Arc::clone(&vdaf); + let want_ra_states = Arc::clone(&want_ra_states); + + Box::pin(async move { + Ok(( + tx.get_outstanding_batches(task.id(), &None).await.unwrap(), + read_and_verify_aggregate_info_for_task::< + VERIFY_KEY_LENGTH, + FixedSize, + _, + _, + >( + tx, vdaf.as_ref(), task.id(), want_ra_states.as_ref() + ) + .await, + )) + }) + }) + .await + .unwrap(); + + // Verify outstanding batches. + let mut total_max_size = 0; + println!("{outstanding_batches:?}"); + + for outstanding_batch in &outstanding_batches { + assert_eq!(outstanding_batch.size().start(), &0); + assert!( + outstanding_batch.size().end() == &MIN_BATCH_SIZE + || outstanding_batch.size().end() == &MIN_AGGREGATION_JOB_SIZE + ); + total_max_size += *outstanding_batch.size().end(); + } + assert_eq!( + total_max_size, + 2 * MIN_BATCH_SIZE + MIN_AGGREGATION_JOB_SIZE + ); + let batch_ids: HashSet<_> = outstanding_batches + .iter() + .map(|outstanding_batch| *outstanding_batch.id()) + .collect(); + + // Verify aggregation jobs. + let mut seen_report_ids = HashSet::new(); + let mut batches_with_small_agg_jobs = HashSet::new(); + for (agg_job, report_ids) in agg_jobs { + // Aggregation jobs are created in step 0. + assert_eq!(agg_job.step(), AggregationJobStep::from(0)); + + // Every batch corresponds to one of the outstanding batches. + assert!(batch_ids.contains(agg_job.batch_id())); + + // At most one aggregation job per batch will be smaller than the normal minimum + // aggregation job size. + if report_ids.len() < MIN_AGGREGATION_JOB_SIZE { + assert!(!batches_with_small_agg_jobs.contains(agg_job.batch_id())); + batches_with_small_agg_jobs.insert(*agg_job.batch_id()); + } + + // The aggregation job is at most MAX_AGGREGATION_JOB_SIZE in size. + assert!(report_ids.len() <= MAX_AGGREGATION_JOB_SIZE); + + // Report IDs are not repeated across or inside aggregation jobs. + for report_id in report_ids { + assert!(!seen_report_ids.contains(&report_id)); + seen_report_ids.insert(report_id); + } + } + + // Every client report was added to some aggregation job. + assert_eq!(report_ids, seen_report_ids); + } + /// Test helper function that reads all aggregation jobs & batches for a given task ID, /// returning the aggregation jobs, the report IDs included in the aggregation job, and the /// batches. Report IDs are returned in the order they are included in the aggregation job, and diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index 654408fd1..c6135513c 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -2246,7 +2246,7 @@ mod tests { let task = TaskBuilder::new( QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Prio3Count, @@ -2501,7 +2501,7 @@ mod tests { let task = TaskBuilder::new( QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Poplar1 { bits: 1 }, @@ -3171,7 +3171,7 @@ mod tests { let task = TaskBuilder::new( QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Poplar1 { bits: 1 }, diff --git a/aggregator/src/aggregator/batch_creator.rs b/aggregator/src/aggregator/batch_creator.rs index f23178a72..57005a19c 100644 --- a/aggregator/src/aggregator/batch_creator.rs +++ b/aggregator/src/aggregator/batch_creator.rs @@ -45,7 +45,7 @@ struct Properties { max_aggregation_job_size: usize, task_id: TaskId, task_min_batch_size: usize, - task_max_batch_size: usize, + effective_task_max_batch_size: usize, task_batch_time_window_size: Option, } @@ -59,7 +59,7 @@ where max_aggregation_job_size: usize, task_id: TaskId, task_min_batch_size: usize, - task_max_batch_size: usize, + task_max_batch_size: Option, task_batch_time_window_size: Option, aggregation_job_writer: &'a mut AggregationJobWriter, ) -> Self { @@ -69,7 +69,13 @@ where max_aggregation_job_size, task_id, task_min_batch_size, - task_max_batch_size, + // If the task has no explicit max_batch_size set, then our goal is to create + // batches of exactly min_batch_size reports, so we use that value as the effective + // maximum batch size, but we may create batches which exceed this size. See + // process_batches, below. + // + // https://datatracker.ietf.org/doc/html/draft-ietf-ppm-dap-09#section-4.1.2-6 + effective_task_max_batch_size: task_max_batch_size.unwrap_or(task_min_batch_size), task_batch_time_window_size, }, aggregation_job_writer, @@ -153,7 +159,8 @@ where return Ok(()); } // Discard any outstanding batches that do not currently have room for more reports. - if largest_outstanding_batch.max_size() >= properties.task_max_batch_size { + if largest_outstanding_batch.max_size() >= properties.effective_task_max_batch_size + { PeekMut::pop(largest_outstanding_batch); continue; } @@ -164,7 +171,8 @@ where bucket.unaggregated_reports.len(), properties.max_aggregation_job_size, ), - properties.task_max_batch_size - largest_outstanding_batch.max_size(), + properties.effective_task_max_batch_size + - largest_outstanding_batch.max_size(), ); if (desired_aggregation_job_size >= properties.min_aggregation_job_size) || (largest_outstanding_batch.max_size() < properties.task_min_batch_size @@ -205,7 +213,8 @@ where // any more. let desired_aggregation_job_size = min( properties.max_aggregation_job_size, - properties.task_max_batch_size - largest_outstanding_batch.max_size(), + properties.effective_task_max_batch_size + - largest_outstanding_batch.max_size(), ); if bucket.unaggregated_reports.len() >= desired_aggregation_job_size { Self::create_aggregation_job( @@ -236,7 +245,7 @@ where bucket.unaggregated_reports.len(), properties.max_aggregation_job_size, ), - properties.task_max_batch_size, + properties.effective_task_max_batch_size, ); if desired_aggregation_job_size >= new_batch_threshold { let batch_id = random(); diff --git a/aggregator/src/aggregator/collection_job_tests.rs b/aggregator/src/aggregator/collection_job_tests.rs index ef6c2bc00..394073013 100644 --- a/aggregator/src/aggregator/collection_job_tests.rs +++ b/aggregator/src/aggregator/collection_job_tests.rs @@ -168,7 +168,7 @@ async fn setup_fixed_size_current_batch_collection_job_test_case( let test_case = setup_collection_job_test_case( Role::Leader, QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, ) @@ -792,7 +792,7 @@ async fn collection_job_put_idempotence_fixed_size_by_batch_id() { let test_case = setup_collection_job_test_case( Role::Leader, QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, ) @@ -845,7 +845,7 @@ async fn collection_job_put_idempotence_fixed_size_by_batch_id_mutate_batch_id() let test_case = setup_collection_job_test_case( Role::Leader, QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, ) @@ -914,7 +914,7 @@ async fn collection_job_put_idempotence_fixed_size_by_batch_id_mutate_aggregatio let test_case = setup_collection_job_test_case( Role::Leader, QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, ) diff --git a/aggregator/src/aggregator/garbage_collector.rs b/aggregator/src/aggregator/garbage_collector.rs index 149b83a3b..6181c80ac 100644 --- a/aggregator/src/aggregator/garbage_collector.rs +++ b/aggregator/src/aggregator/garbage_collector.rs @@ -584,7 +584,7 @@ mod tests { Box::pin(async move { let task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Fake, @@ -766,7 +766,7 @@ mod tests { Box::pin(async move { let task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Fake, diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index d8e70a749..09256d862 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -2127,7 +2127,7 @@ mod tests { let task = TaskBuilder::new( QueryType::FixedSize { - max_batch_size: 100, + max_batch_size: Some(100), batch_time_window_size: None, }, VdafInstance::Fake, diff --git a/aggregator/src/aggregator/taskprov_tests.rs b/aggregator/src/aggregator/taskprov_tests.rs index 49481388e..6978392d7 100644 --- a/aggregator/src/aggregator/taskprov_tests.rs +++ b/aggregator/src/aggregator/taskprov_tests.rs @@ -167,7 +167,7 @@ impl TaskprovTestCase { let task = TaskBuilder::new( QueryType::FixedSize { - max_batch_size: max_batch_size as u64, + max_batch_size: Some(max_batch_size as u64), batch_time_window_size: None, }, vdaf_instance, diff --git a/aggregator/src/bin/janus_cli.rs b/aggregator/src/bin/janus_cli.rs index 228e0eaf4..702bf0a27 100644 --- a/aggregator/src/bin/janus_cli.rs +++ b/aggregator/src/bin/janus_cli.rs @@ -666,7 +666,7 @@ mod tests { // Construct a "new" task with a previously existing ID. let replacement_task = TaskBuilder::new( QueryType::FixedSize { - max_batch_size: 100, + max_batch_size: Some(100), batch_time_window_size: None, }, VdafInstance::Prio3CountVec { diff --git a/aggregator_api/src/tests.rs b/aggregator_api/src/tests.rs index 94e02a045..e7df6d3a7 100644 --- a/aggregator_api/src/tests.rs +++ b/aggregator_api/src/tests.rs @@ -1542,7 +1542,7 @@ fn post_task_req_serialization() { &PostTaskReq { peer_aggregator_endpoint: "https://example.com/".parse().unwrap(), query_type: QueryType::FixedSize { - max_batch_size: 999, + max_batch_size: Some(999), batch_time_window_size: None, }, vdaf: VdafInstance::Prio3CountVec { @@ -1651,7 +1651,7 @@ fn post_task_req_serialization() { &PostTaskReq { peer_aggregator_endpoint: "https://example.com/".parse().unwrap(), query_type: QueryType::FixedSize { - max_batch_size: 999, + max_batch_size: Some(999), batch_time_window_size: None, }, vdaf: VdafInstance::Prio3CountVec { @@ -1792,7 +1792,7 @@ fn task_resp_serialization() { TaskId::from([0u8; 32]), "https://helper.com/".parse().unwrap(), QueryType::FixedSize { - max_batch_size: 999, + max_batch_size: Some(999), batch_time_window_size: None, }, VdafInstance::Prio3CountVec { diff --git a/aggregator_core/src/datastore/tests.rs b/aggregator_core/src/datastore/tests.rs index 843d04e35..9f5235c6e 100644 --- a/aggregator_core/src/datastore/tests.rs +++ b/aggregator_core/src/datastore/tests.rs @@ -1054,7 +1054,7 @@ async fn count_client_reports_for_batch_id(ephemeral_datastore: EphemeralDatasto let task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Fake, @@ -1065,7 +1065,7 @@ async fn count_client_reports_for_batch_id(ephemeral_datastore: EphemeralDatasto .unwrap(); let unrelated_task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: None, batch_time_window_size: None, }, VdafInstance::Fake, @@ -1316,7 +1316,7 @@ async fn roundtrip_aggregation_job(ephemeral_datastore: EphemeralDatastore) { // serialization/deserialization roundtrip of the batch_identifier & aggregation_param. let task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Fake, @@ -1921,7 +1921,7 @@ async fn get_aggregation_jobs_for_task(ephemeral_datastore: EphemeralDatastore) // serialization/deserialization roundtrip of the batch_identifier & aggregation_param. let task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: None, batch_time_window_size: None, }, VdafInstance::Fake, @@ -1977,7 +1977,7 @@ async fn get_aggregation_jobs_for_task(ephemeral_datastore: EphemeralDatastore) // is not returned. let unrelated_task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: None, batch_time_window_size: None, }, VdafInstance::Fake, @@ -3404,7 +3404,7 @@ async fn fixed_size_collection_job_acquire_release_happy_path( CollectionJobAcquireTestCase { task_ids: Vec::from([task_id]), query_type: task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, reports, @@ -4443,7 +4443,7 @@ async fn roundtrip_batch_aggregation_fixed_size(ephemeral_datastore: EphemeralDa let task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Fake, @@ -4461,7 +4461,7 @@ async fn roundtrip_batch_aggregation_fixed_size(ephemeral_datastore: EphemeralDa Box::pin(async move { let other_task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Fake, @@ -4853,7 +4853,7 @@ async fn roundtrip_aggregate_share_job_fixed_size(ephemeral_datastore: Ephemeral Box::pin(async move { let task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: None, batch_time_window_size: None, }, VdafInstance::Fake, @@ -5002,7 +5002,7 @@ async fn roundtrip_outstanding_batch(ephemeral_datastore: EphemeralDatastore) { Box::pin(async move { let task_1 = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Fake, @@ -5031,7 +5031,7 @@ async fn roundtrip_outstanding_batch(ephemeral_datastore: EphemeralDatastore) { let task_2 = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: Some(batch_time_window_size), }, VdafInstance::Fake, @@ -5341,7 +5341,7 @@ async fn roundtrip_batch(ephemeral_datastore: EphemeralDatastore) { tx.put_aggregator_task( &TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: None, batch_time_window_size: None, }, VdafInstance::Fake, @@ -5657,7 +5657,7 @@ async fn delete_expired_aggregation_artifacts(ephemeral_datastore: EphemeralData .unwrap(); let leader_fixed_size_task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Fake, @@ -5668,7 +5668,7 @@ async fn delete_expired_aggregation_artifacts(ephemeral_datastore: EphemeralData .unwrap(); let helper_fixed_size_task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Fake, @@ -6174,7 +6174,7 @@ async fn delete_expired_collection_artifacts(ephemeral_datastore: EphemeralDatas .unwrap(); let leader_fixed_size_task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Fake, @@ -6185,7 +6185,7 @@ async fn delete_expired_collection_artifacts(ephemeral_datastore: EphemeralDatas .unwrap(); let helper_fixed_size_task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Fake, @@ -6196,7 +6196,7 @@ async fn delete_expired_collection_artifacts(ephemeral_datastore: EphemeralDatas .unwrap(); let leader_fixed_size_time_bucketed_task = TaskBuilder::new( task::QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: Some(Duration::from_hours(24)?), }, VdafInstance::Fake, diff --git a/aggregator_core/src/task.rs b/aggregator_core/src/task.rs index 15446c298..2982eccbb 100644 --- a/aggregator_core/src/task.rs +++ b/aggregator_core/src/task.rs @@ -41,8 +41,10 @@ pub enum QueryType { /// latency of waiting for batch time intervals to pass, and with direct control over the number /// of reports per batch. FixedSize { - /// The maximum number of reports in a batch to allow it to be collected. - max_batch_size: u64, + /// If present, the maximum number of reports in a batch to allow it to be collected. If + /// absent, then there is no limit to the number of reports that Janus will include in a + /// batch, but we generally try to make batches close to the task's `min_batch_size`. + max_batch_size: Option, /// If present, reports will be separated into different batches by timestamp, such that /// the client timestamp interval duration will not exceed this value. The minimum and /// maximum allowed report timestamps for each batch will be multiples of this value as @@ -61,7 +63,16 @@ impl TryFrom<&taskprov::Query> for QueryType { match value { taskprov::Query::TimeInterval => Ok(Self::TimeInterval), taskprov::Query::FixedSize { max_batch_size } => Ok(Self::FixedSize { - max_batch_size: *max_batch_size as u64, + // taskprov's QueryConfig always sets a max_batch_size value (if + // query type is fixed size), but in the forthcoming draft 6, a + // value of 0 will mean "no maximum". + // + // https://github.com/wangshan/draft-wang-ppm-dap-taskprov/blob/1ddcb35830923d2a770bb737b95e19033fa44a83/draft-wang-ppm-dap-taskprov.md?plain=1#L197 + max_batch_size: if *max_batch_size == 0 { + None + } else { + Some(*max_batch_size as u64) + }, batch_time_window_size: None, }), _ => Err(Error::InvalidParameter("unknown query type")), @@ -137,7 +148,11 @@ impl CommonTaskParameters { time_precision: Duration, tolerable_clock_skew: Duration, ) -> Result { - if let QueryType::FixedSize { max_batch_size, .. } = query_type { + if let QueryType::FixedSize { + max_batch_size: Some(max_batch_size), + .. + } = query_type + { if max_batch_size < min_batch_size { return Err(Error::InvalidParameter("max_batch_size")); } @@ -346,16 +361,17 @@ impl AggregatorTask { /// Returns true if the `batch_size` is valid given this task's query type and batch size /// parameters, per - /// + /// pub fn validate_batch_size(&self, batch_size: u64) -> bool { match self.common_parameters.query_type { QueryType::TimeInterval => { - // https://www.ietf.org/archive/id/draft-ietf-ppm-dap-02.html#section-4.5.6.1.2 + // https://datatracker.ietf.org/doc/html/draft-ietf-ppm-dap-09#section-4.6.5.1.2 batch_size >= self.common_parameters.min_batch_size } QueryType::FixedSize { max_batch_size, .. } => { - // https://www.ietf.org/archive/id/draft-ietf-ppm-dap-02.html#section-4.5.6.2.2 - batch_size >= self.common_parameters.min_batch_size && batch_size <= max_batch_size + // https://datatracker.ietf.org/doc/html/draft-ietf-ppm-dap-09#section-4.6.5.2.2 + batch_size >= self.common_parameters.min_batch_size + && max_batch_size.map_or(true, |max_batch_size| batch_size <= max_batch_size) } } } @@ -1610,7 +1626,7 @@ mod tests { TaskId::from([255; 32]), "https://example.com/".parse().unwrap(), QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, VdafInstance::Prio3CountVec { @@ -1668,6 +1684,7 @@ mod tests { len: 2, }, Token::Str("max_batch_size"), + Token::Some, Token::U64(10), Token::Str("batch_time_window_size"), Token::None, @@ -1807,7 +1824,7 @@ mod tests { ); assert_tokens( &QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, &[ @@ -1817,6 +1834,7 @@ mod tests { len: 2, }, Token::Str("max_batch_size"), + Token::Some, Token::U64(10), Token::Str("batch_time_window_size"), Token::None, @@ -1825,7 +1843,7 @@ mod tests { ); assert_tokens( &QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: Some(Duration::from_hours(1).unwrap()), }, &[ @@ -1835,6 +1853,7 @@ mod tests { len: 2, }, Token::Str("max_batch_size"), + Token::Some, Token::U64(10), Token::Str("batch_time_window_size"), Token::Some, @@ -1843,11 +1862,29 @@ mod tests { Token::StructVariantEnd, ], ); + assert_tokens( + &QueryType::FixedSize { + max_batch_size: None, + batch_time_window_size: None, + }, + &[ + Token::StructVariant { + name: "QueryType", + variant: "FixedSize", + len: 2, + }, + Token::Str("max_batch_size"), + Token::None, + Token::Str("batch_time_window_size"), + Token::None, + Token::StructVariantEnd, + ], + ); // Backwards compatibility cases: assert_de_tokens( &QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }, &[ @@ -1857,6 +1894,7 @@ mod tests { len: 2, }, Token::Str("max_batch_size"), + Token::Some, Token::U64(10), Token::StructVariantEnd, ], @@ -1864,14 +1902,14 @@ mod tests { assert_matches!( serde_json::from_value(json!({ "FixedSize": { "max_batch_size": 10 } })), Ok(QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }) ); assert_matches!( serde_yaml::from_str("!FixedSize { max_batch_size: 10 }"), Ok(QueryType::FixedSize { - max_batch_size: 10, + max_batch_size: Some(10), batch_time_window_size: None, }) ); diff --git a/integration_tests/tests/integration/in_cluster.rs b/integration_tests/tests/integration/in_cluster.rs index 1ff8f8fe0..4bf24c78e 100644 --- a/integration_tests/tests/integration/in_cluster.rs +++ b/integration_tests/tests/integration/in_cluster.rs @@ -428,7 +428,7 @@ impl InClusterJanusPair { min_batch_size: task.min_batch_size(), max_batch_size: match task.query_type() { QueryType::TimeInterval => None, - QueryType::FixedSize { max_batch_size, .. } => Some(*max_batch_size), + QueryType::FixedSize { max_batch_size, .. } => *max_batch_size, }, time_precision_seconds: task.time_precision().as_seconds(), collector_credential_id, @@ -545,7 +545,7 @@ async fn in_cluster_fixed_size() { let janus_pair = InClusterJanusPair::new( VdafInstance::Prio3Count, QueryType::FixedSize { - max_batch_size: 110, + max_batch_size: Some(110), batch_time_window_size: None, }, ) diff --git a/integration_tests/tests/integration/janus.rs b/integration_tests/tests/integration/janus.rs index 4cf58c45d..c5bb9e1e7 100644 --- a/integration_tests/tests/integration/janus.rs +++ b/integration_tests/tests/integration/janus.rs @@ -260,7 +260,7 @@ async fn janus_janus_fixed_size() { &container_client, VdafInstance::Prio3Count, QueryType::FixedSize { - max_batch_size: 50, + max_batch_size: Some(50), batch_time_window_size: None, }, ) @@ -285,7 +285,7 @@ async fn janus_in_process_fixed_size() { let janus_pair = JanusInProcessPair::new( VdafInstance::Prio3Count, QueryType::FixedSize { - max_batch_size: 50, + max_batch_size: Some(50), batch_time_window_size: None, }, ) diff --git a/interop_binaries/src/bin/janus_interop_aggregator.rs b/interop_binaries/src/bin/janus_interop_aggregator.rs index 005a60ede..f866bcc32 100644 --- a/interop_binaries/src/bin/janus_interop_aggregator.rs +++ b/interop_binaries/src/bin/janus_interop_aggregator.rs @@ -89,9 +89,7 @@ async fn handle_add_task( let query_type = match request.query_type { 1 => task::QueryType::TimeInterval, 2 => task::QueryType::FixedSize { - max_batch_size: request - .max_batch_size - .ok_or_else(|| anyhow::anyhow!("\"max_batch_size\" is missing"))?, + max_batch_size: request.max_batch_size, batch_time_window_size: None, }, _ => { diff --git a/interop_binaries/src/lib.rs b/interop_binaries/src/lib.rs index d9ab4b006..8cfcb31c5 100644 --- a/interop_binaries/src/lib.rs +++ b/interop_binaries/src/lib.rs @@ -270,9 +270,7 @@ impl AggregatorAddTaskRequest { pub fn from_task(task: Task, role: Role) -> Self { let (query_type, max_batch_size) = match task.query_type() { QueryType::TimeInterval => (TimeInterval::CODE as u8, None), - QueryType::FixedSize { max_batch_size, .. } => { - (FixedSize::CODE as u8, Some(*max_batch_size)) - } + QueryType::FixedSize { max_batch_size, .. } => (FixedSize::CODE as u8, *max_batch_size), }; Self { task_id: *task.id(),