diff --git a/aggregator/src/aggregator/aggregation_job_creator.rs b/aggregator/src/aggregator/aggregation_job_creator.rs index 2827d9428..7f83acb4d 100644 --- a/aggregator/src/aggregator/aggregation_job_creator.rs +++ b/aggregator/src/aggregator/aggregation_job_creator.rs @@ -72,6 +72,8 @@ pub struct AggregationJobCreator { min_aggregation_job_size: usize, /// The maximum number of client reports to include in an aggregation job. max_aggregation_job_size: usize, + /// Maximum number of reports to load at a time when creating aggregation jobs. + aggregation_job_creation_report_window: usize, } impl AggregationJobCreator { @@ -82,6 +84,7 @@ impl AggregationJobCreator { aggregation_job_creation_interval: Duration, min_aggregation_job_size: usize, max_aggregation_job_size: usize, + aggregation_job_creation_report_window: usize, ) -> AggregationJobCreator { assert!( max_aggregation_job_size > 0, @@ -94,6 +97,7 @@ impl AggregationJobCreator { aggregation_job_creation_interval, min_aggregation_job_size, max_aggregation_job_size, + aggregation_job_creation_report_window, } } @@ -545,11 +549,17 @@ impl AggregationJobCreator { let this = Arc::clone(&self); let task = Arc::clone(&task); let vdaf = Arc::clone(&vdaf); + let aggregation_job_creation_report_window = + self.aggregation_job_creation_report_window; Box::pin(async move { // Find some unaggregated client reports. let mut reports = tx - .get_unaggregated_client_reports_for_task(vdaf.as_ref(), task.id()) + .get_unaggregated_client_reports_for_task( + vdaf.as_ref(), + task.id(), + aggregation_job_creation_report_window, + ) .await?; reports.sort_by_key(|report| *report.metadata().time()); @@ -706,11 +716,17 @@ impl AggregationJobCreator { let this = Arc::clone(&self); let task = Arc::clone(&task); let vdaf = Arc::clone(&vdaf); + let aggregation_job_creation_report_window = + self.aggregation_job_creation_report_window; Box::pin(async move { // Find unaggregated client reports. let unaggregated_reports = tx - .get_unaggregated_client_reports_for_task(vdaf.as_ref(), task.id()) + .get_unaggregated_client_reports_for_task( + vdaf.as_ref(), + task.id(), + aggregation_job_creation_report_window, + ) .await?; let mut aggregation_job_writer = @@ -868,6 +884,7 @@ mod tests { AGGREGATION_JOB_CREATION_INTERVAL, 1, 100, + 5000, )); let stopper = Stopper::new(); let task_handle = task::spawn(Arc::clone(&job_creator).run(stopper.clone())); @@ -1043,6 +1060,7 @@ mod tests { Duration::from_secs(1), MIN_AGGREGATION_JOB_SIZE, MAX_AGGREGATION_JOB_SIZE, + 5000, )); Arc::clone(&job_creator) .create_aggregation_jobs_for_task(Arc::clone(&task)) @@ -1217,6 +1235,7 @@ mod tests { Duration::from_secs(1), 2, 100, + 5000, )); Arc::clone(&job_creator) .create_aggregation_jobs_for_task(Arc::clone(&task)) @@ -1429,6 +1448,7 @@ mod tests { Duration::from_secs(1), MIN_AGGREGATION_JOB_SIZE, MAX_AGGREGATION_JOB_SIZE, + 5000, )); Arc::clone(&job_creator) .create_aggregation_jobs_for_task(Arc::clone(&task)) @@ -1589,6 +1609,7 @@ mod tests { Duration::from_secs(1), MIN_AGGREGATION_JOB_SIZE, MAX_AGGREGATION_JOB_SIZE, + 5000, )); Arc::clone(&job_creator) .create_aggregation_jobs_for_task(Arc::clone(&task)) @@ -1791,6 +1812,7 @@ mod tests { Duration::from_secs(1), MIN_AGGREGATION_JOB_SIZE, MAX_AGGREGATION_JOB_SIZE, + 5000, )); Arc::clone(&job_creator) .create_aggregation_jobs_for_task(Arc::clone(&task)) @@ -1852,7 +1874,7 @@ mod tests { Box::pin(async move { let report_ids = tx - .get_unaggregated_client_reports_for_task(vdaf.as_ref(), task.id()) + .get_unaggregated_client_reports_for_task(vdaf.as_ref(), task.id(), 5000) .await .unwrap() .into_iter() @@ -1958,6 +1980,7 @@ mod tests { Duration::from_secs(1), MIN_AGGREGATION_JOB_SIZE, MAX_AGGREGATION_JOB_SIZE, + 5000, )); Arc::clone(&job_creator) .create_aggregation_jobs_for_task(Arc::clone(&task)) @@ -2220,6 +2243,7 @@ mod tests { Duration::from_secs(1), MIN_AGGREGATION_JOB_SIZE, MAX_AGGREGATION_JOB_SIZE, + 5000, )); Arc::clone(&job_creator) .create_aggregation_jobs_for_task(Arc::clone(&task)) @@ -2511,6 +2535,7 @@ mod tests { Duration::from_secs(1), MIN_AGGREGATION_JOB_SIZE, MAX_AGGREGATION_JOB_SIZE, + 5000, )); Arc::clone(&job_creator) .create_aggregation_jobs_for_task(Arc::clone(&task)) @@ -2768,6 +2793,7 @@ mod tests { Duration::from_secs(1), MIN_AGGREGATION_JOB_SIZE, MAX_AGGREGATION_JOB_SIZE, + 5000, )); Arc::clone(&job_creator) .create_aggregation_jobs_for_task(Arc::clone(&task)) diff --git a/aggregator/src/binaries/aggregation_job_creator.rs b/aggregator/src/binaries/aggregation_job_creator.rs index 01aeac1b5..1306257cf 100644 --- a/aggregator/src/binaries/aggregation_job_creator.rs +++ b/aggregator/src/binaries/aggregation_job_creator.rs @@ -19,6 +19,7 @@ pub async fn main_callback(ctx: BinaryContext) -> Re Duration::from_secs(ctx.config.aggregation_job_creation_interval_secs), ctx.config.min_aggregation_job_size, ctx.config.max_aggregation_job_size, + ctx.config.aggregation_job_creation_report_window, )); aggregation_job_creator.run(ctx.stopper).await; @@ -78,6 +79,13 @@ pub struct Config { pub min_aggregation_job_size: usize, /// The maximum number of client reports to include in an aggregation job. pub max_aggregation_job_size: usize, + /// Maximum number of reports to load at a time when creating aggregation jobs. + #[serde(default = "default_aggregation_job_creation_report_window")] + pub aggregation_job_creation_report_window: usize, +} + +fn default_aggregation_job_creation_report_window() -> usize { + 5000 } impl BinaryConfig for Config { @@ -119,6 +127,7 @@ mod tests { aggregation_job_creation_interval_secs: 60, min_aggregation_job_size: 100, max_aggregation_job_size: 500, + aggregation_job_creation_report_window: 5000, }) } diff --git a/aggregator/tests/integration/graceful_shutdown.rs b/aggregator/tests/integration/graceful_shutdown.rs index 40d7c6673..2a8320a03 100644 --- a/aggregator/tests/integration/graceful_shutdown.rs +++ b/aggregator/tests/integration/graceful_shutdown.rs @@ -291,6 +291,7 @@ async fn aggregation_job_creator_shutdown() { aggregation_job_creation_interval_secs: 60, min_aggregation_job_size: 100, max_aggregation_job_size: 100, + aggregation_job_creation_report_window: 5000, }; graceful_shutdown(trycmd::cargo::cargo_bin!("aggregation_job_creator"), config).await; diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 5b57b1b35..fd020cec8 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -1148,12 +1148,12 @@ impl Transaction<'_, C> { &self, vdaf: &A, task_id: &TaskId, + limit: usize, ) -> Result>, Error> where A::InputShare: PartialEq, A::PublicShare: PartialEq, { - // TODO(#269): allow the number of returned results to be controlled? let stmt = self .prepare_cached( "WITH unaggregated_reports AS ( @@ -1164,7 +1164,7 @@ impl Transaction<'_, C> { AND client_reports.client_timestamp >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP) ORDER BY client_timestamp DESC FOR UPDATE OF client_reports SKIP LOCKED - LIMIT 5000 + LIMIT $5::BIGINT ) UPDATE client_reports SET aggregation_started = TRUE, updated_at = $3, updated_by = $4 @@ -1181,6 +1181,7 @@ impl Transaction<'_, C> { /* now */ &self.clock.now().as_naive_date_time()?, /* updated_at */ &self.clock.now().as_naive_date_time()?, /* updated_by */ &self.name, + /* limit */ &i64::try_from(limit)?, ], ) .await?; diff --git a/aggregator_core/src/datastore/tests.rs b/aggregator_core/src/datastore/tests.rs index 8253cf39f..4fcd1b32d 100644 --- a/aggregator_core/src/datastore/tests.rs +++ b/aggregator_core/src/datastore/tests.rs @@ -620,7 +620,11 @@ async fn get_unaggregated_client_reports_for_task(ephemeral_datastore: Ephemeral .unwrap()); Ok(tx - .get_unaggregated_client_reports_for_task(&dummy_vdaf::Vdaf::new(), task.id()) + .get_unaggregated_client_reports_for_task( + &dummy_vdaf::Vdaf::new(), + task.id(), + 5000, + ) .await .unwrap()) }) @@ -649,7 +653,11 @@ async fn get_unaggregated_client_reports_for_task(ephemeral_datastore: Ephemeral .unwrap()); Ok(tx - .get_unaggregated_client_reports_for_task(&dummy_vdaf::Vdaf::new(), task.id()) + .get_unaggregated_client_reports_for_task( + &dummy_vdaf::Vdaf::new(), + task.id(), + 5000, + ) .await .unwrap()) }) @@ -682,7 +690,11 @@ async fn get_unaggregated_client_reports_for_task(ephemeral_datastore: Ephemeral ); Ok(tx - .get_unaggregated_client_reports_for_task(&dummy_vdaf::Vdaf::new(), task.id()) + .get_unaggregated_client_reports_for_task( + &dummy_vdaf::Vdaf::new(), + task.id(), + 5000, + ) .await .unwrap()) }) diff --git a/docs/samples/advanced_config/aggregation_job_creator.yaml b/docs/samples/advanced_config/aggregation_job_creator.yaml index a21bf4a0d..337f1f527 100644 --- a/docs/samples/advanced_config/aggregation_job_creator.yaml +++ b/docs/samples/advanced_config/aggregation_job_creator.yaml @@ -81,3 +81,7 @@ min_aggregation_job_size: 10 # Maximum aggregation job size, in reports. (required) max_aggregation_job_size: 100 + +# Maximum number of reports to load at a time when creating aggregation jobs. +# (optional, defaults to 5000) +aggregation_job_creation_report_window: 5000 diff --git a/integration_tests/src/janus.rs b/integration_tests/src/janus.rs index 38c32258f..b073864e4 100644 --- a/integration_tests/src/janus.rs +++ b/integration_tests/src/janus.rs @@ -171,6 +171,7 @@ impl JanusInProcess { aggregation_job_creation_interval_secs: 1, min_aggregation_job_size: 1, max_aggregation_job_size: 100, + aggregation_job_creation_report_window: 5000, }; let aggregation_job_driver_options = AggregationJobDriverOptions { common: common_binary_options.clone(),