Skip to content

Commit

Permalink
Configurable LIMIT in unaggregated reports query (#2690)
Browse files Browse the repository at this point in the history
  • Loading branch information
divergentdave authored Feb 16, 2024
1 parent 735a030 commit 68bee62
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 8 deletions.
32 changes: 29 additions & 3 deletions aggregator/src/aggregator/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ pub struct AggregationJobCreator<C: Clock> {
min_aggregation_job_size: usize,
/// The maximum number of client reports to include in an aggregation job.
max_aggregation_job_size: usize,
/// Maximum number of reports to load at a time when creating aggregation jobs.
aggregation_job_creation_report_window: usize,
}

impl<C: Clock + 'static> AggregationJobCreator<C> {
Expand All @@ -82,6 +84,7 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
aggregation_job_creation_interval: Duration,
min_aggregation_job_size: usize,
max_aggregation_job_size: usize,
aggregation_job_creation_report_window: usize,
) -> AggregationJobCreator<C> {
assert!(
max_aggregation_job_size > 0,
Expand All @@ -94,6 +97,7 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
aggregation_job_creation_interval,
min_aggregation_job_size,
max_aggregation_job_size,
aggregation_job_creation_report_window,
}
}

Expand Down Expand Up @@ -545,11 +549,17 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
let this = Arc::clone(&self);
let task = Arc::clone(&task);
let vdaf = Arc::clone(&vdaf);
let aggregation_job_creation_report_window =
self.aggregation_job_creation_report_window;

Box::pin(async move {
// Find some unaggregated client reports.
let mut reports = tx
.get_unaggregated_client_reports_for_task(vdaf.as_ref(), task.id())
.get_unaggregated_client_reports_for_task(
vdaf.as_ref(),
task.id(),
aggregation_job_creation_report_window,
)
.await?;
reports.sort_by_key(|report| *report.metadata().time());

Expand Down Expand Up @@ -706,11 +716,17 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
let this = Arc::clone(&self);
let task = Arc::clone(&task);
let vdaf = Arc::clone(&vdaf);
let aggregation_job_creation_report_window =
self.aggregation_job_creation_report_window;

Box::pin(async move {
// Find unaggregated client reports.
let unaggregated_reports = tx
.get_unaggregated_client_reports_for_task(vdaf.as_ref(), task.id())
.get_unaggregated_client_reports_for_task(
vdaf.as_ref(),
task.id(),
aggregation_job_creation_report_window,
)
.await?;

let mut aggregation_job_writer =
Expand Down Expand Up @@ -868,6 +884,7 @@ mod tests {
AGGREGATION_JOB_CREATION_INTERVAL,
1,
100,
5000,
));
let stopper = Stopper::new();
let task_handle = task::spawn(Arc::clone(&job_creator).run(stopper.clone()));
Expand Down Expand Up @@ -1043,6 +1060,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -1217,6 +1235,7 @@ mod tests {
Duration::from_secs(1),
2,
100,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -1429,6 +1448,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -1589,6 +1609,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -1791,6 +1812,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -1852,7 +1874,7 @@ mod tests {

Box::pin(async move {
let report_ids = tx
.get_unaggregated_client_reports_for_task(vdaf.as_ref(), task.id())
.get_unaggregated_client_reports_for_task(vdaf.as_ref(), task.id(), 5000)
.await
.unwrap()
.into_iter()
Expand Down Expand Up @@ -1958,6 +1980,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -2220,6 +2243,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -2511,6 +2535,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down Expand Up @@ -2768,6 +2793,7 @@ mod tests {
Duration::from_secs(1),
MIN_AGGREGATION_JOB_SIZE,
MAX_AGGREGATION_JOB_SIZE,
5000,
));
Arc::clone(&job_creator)
.create_aggregation_jobs_for_task(Arc::clone(&task))
Expand Down
9 changes: 9 additions & 0 deletions aggregator/src/binaries/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
Duration::from_secs(ctx.config.aggregation_job_creation_interval_secs),
ctx.config.min_aggregation_job_size,
ctx.config.max_aggregation_job_size,
ctx.config.aggregation_job_creation_report_window,
));
aggregation_job_creator.run(ctx.stopper).await;

Expand Down Expand Up @@ -78,6 +79,13 @@ pub struct Config {
pub min_aggregation_job_size: usize,
/// The maximum number of client reports to include in an aggregation job.
pub max_aggregation_job_size: usize,
/// Maximum number of reports to load at a time when creating aggregation jobs.
#[serde(default = "default_aggregation_job_creation_report_window")]
pub aggregation_job_creation_report_window: usize,
}

fn default_aggregation_job_creation_report_window() -> usize {
5000
}

impl BinaryConfig for Config {
Expand Down Expand Up @@ -119,6 +127,7 @@ mod tests {
aggregation_job_creation_interval_secs: 60,
min_aggregation_job_size: 100,
max_aggregation_job_size: 500,
aggregation_job_creation_report_window: 5000,
})
}

Expand Down
1 change: 1 addition & 0 deletions aggregator/tests/integration/graceful_shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ async fn aggregation_job_creator_shutdown() {
aggregation_job_creation_interval_secs: 60,
min_aggregation_job_size: 100,
max_aggregation_job_size: 100,
aggregation_job_creation_report_window: 5000,
};

graceful_shutdown(trycmd::cargo::cargo_bin!("aggregation_job_creator"), config).await;
Expand Down
5 changes: 3 additions & 2 deletions aggregator_core/src/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1148,12 +1148,12 @@ impl<C: Clock> Transaction<'_, C> {
&self,
vdaf: &A,
task_id: &TaskId,
limit: usize,
) -> Result<Vec<LeaderStoredReport<SEED_SIZE, A>>, Error>
where
A::InputShare: PartialEq,
A::PublicShare: PartialEq,
{
// TODO(#269): allow the number of returned results to be controlled?
let stmt = self
.prepare_cached(
"WITH unaggregated_reports AS (
Expand All @@ -1164,7 +1164,7 @@ impl<C: Clock> Transaction<'_, C> {
AND client_reports.client_timestamp >= COALESCE($2::TIMESTAMP - tasks.report_expiry_age * '1 second'::INTERVAL, '-infinity'::TIMESTAMP)
ORDER BY client_timestamp DESC
FOR UPDATE OF client_reports SKIP LOCKED
LIMIT 5000
LIMIT $5::BIGINT
)
UPDATE client_reports SET
aggregation_started = TRUE, updated_at = $3, updated_by = $4
Expand All @@ -1181,6 +1181,7 @@ impl<C: Clock> Transaction<'_, C> {
/* now */ &self.clock.now().as_naive_date_time()?,
/* updated_at */ &self.clock.now().as_naive_date_time()?,
/* updated_by */ &self.name,
/* limit */ &i64::try_from(limit)?,
],
)
.await?;
Expand Down
18 changes: 15 additions & 3 deletions aggregator_core/src/datastore/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,11 @@ async fn get_unaggregated_client_reports_for_task(ephemeral_datastore: Ephemeral
.unwrap());

Ok(tx
.get_unaggregated_client_reports_for_task(&dummy_vdaf::Vdaf::new(), task.id())
.get_unaggregated_client_reports_for_task(
&dummy_vdaf::Vdaf::new(),
task.id(),
5000,
)
.await
.unwrap())
})
Expand Down Expand Up @@ -649,7 +653,11 @@ async fn get_unaggregated_client_reports_for_task(ephemeral_datastore: Ephemeral
.unwrap());

Ok(tx
.get_unaggregated_client_reports_for_task(&dummy_vdaf::Vdaf::new(), task.id())
.get_unaggregated_client_reports_for_task(
&dummy_vdaf::Vdaf::new(),
task.id(),
5000,
)
.await
.unwrap())
})
Expand Down Expand Up @@ -682,7 +690,11 @@ async fn get_unaggregated_client_reports_for_task(ephemeral_datastore: Ephemeral
);

Ok(tx
.get_unaggregated_client_reports_for_task(&dummy_vdaf::Vdaf::new(), task.id())
.get_unaggregated_client_reports_for_task(
&dummy_vdaf::Vdaf::new(),
task.id(),
5000,
)
.await
.unwrap())
})
Expand Down
4 changes: 4 additions & 0 deletions docs/samples/advanced_config/aggregation_job_creator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ min_aggregation_job_size: 10

# Maximum aggregation job size, in reports. (required)
max_aggregation_job_size: 100

# Maximum number of reports to load at a time when creating aggregation jobs.
# (optional, defaults to 5000)
aggregation_job_creation_report_window: 5000
1 change: 1 addition & 0 deletions integration_tests/src/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl JanusInProcess {
aggregation_job_creation_interval_secs: 1,
min_aggregation_job_size: 1,
max_aggregation_job_size: 100,
aggregation_job_creation_report_window: 5000,
};
let aggregation_job_driver_options = AggregationJobDriverOptions {
common: common_binary_options.clone(),
Expand Down

0 comments on commit 68bee62

Please sign in to comment.