Skip to content

Commit

Permalink
Copy report shares within database when creating aggregation jobs (#2750)
Browse files Browse the repository at this point in the history
  • Loading branch information
divergentdave authored Feb 28, 2024
1 parent 7bf93ad commit 3d9e71f
Show file tree
Hide file tree
Showing 7 changed files with 1,185 additions and 424 deletions.
52 changes: 33 additions & 19 deletions aggregator/src/aggregator/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::aggregator::aggregation_job_writer::AggregationJobWriter;
use crate::aggregator::aggregation_job_writer::NewAggregationJobWriter;
#[cfg(feature = "fpvec_bounded_l2")]
use fixed::{
types::extra::{U15, U31},
Expand All @@ -7,8 +7,14 @@ use fixed::{
use futures::future::try_join_all;
use itertools::Itertools as _;
use janus_aggregator_core::{
datastore::models::{AggregationJob, AggregationJobState},
datastore::{self, Datastore},
datastore::{
self,
models::{
AggregationJob, AggregationJobState, ReportAggregationMetadata,
ReportAggregationMetadataState,
},
Datastore,
},
task::{self, AggregatorTask},
};
#[cfg(feature = "fpvec_bounded_l2")]
Expand Down Expand Up @@ -545,24 +551,24 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
aggregation_job_creation_report_window,
)
.await?;
reports.sort_by_key(|report| *report.metadata().time());
reports.sort_by_key(|report_metadata| *report_metadata.time());

// Generate aggregation jobs & report aggregations based on the reports we read.
// We attempt to generate reports from touching a minimal number of batches by
// generating as many aggregation jobs in the allowed size range for each batch
// before considering using reports from the next batch.
let mut aggregation_job_writer = AggregationJobWriter::new(Arc::clone(&task));
let mut aggregation_job_writer =
NewAggregationJobWriter::new(Arc::clone(&task));
let mut report_ids_to_scrub = HashSet::new();
let mut outstanding_reports = Vec::new();
{
// We have to place `reports_by_batch` in this block, as some of its
// internal types are not Send/Sync & thus cannot be held across an await
// point.
let reports_by_batch = reports.into_iter().group_by(|report| {
let reports_by_batch = reports.into_iter().group_by(|report_metadata| {
// Unwrap safety: task.time_precision() is nonzero, so
// `to_batch_interval_start` will never return an error.
report
.metadata()
report_metadata
.time()
.to_batch_interval_start(task.time_precision())
.unwrap()
Expand Down Expand Up @@ -608,12 +614,12 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {

let min_client_timestamp = agg_job_reports
.iter()
.map(|report| report.metadata().time())
.map(|report_metadata| report_metadata.time())
.min()
.unwrap(); // unwrap safety: agg_job_reports is non-empty
let max_client_timestamp = agg_job_reports
.iter()
.map(|report| report.metadata().time())
.map(|report_metadata| report_metadata.time())
.max()
.unwrap(); // unwrap safety: agg_job_reports is non-empty
let client_timestamp_interval = Interval::new(
Expand All @@ -636,31 +642,39 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
let report_aggregations = agg_job_reports
.iter()
.enumerate()
.map(|(ord, report)| {
Ok(report.as_start_leader_report_aggregation(
.map(|(ord, report_metadata)| {
Ok(ReportAggregationMetadata::new(
*task.id(),
aggregation_job_id,
*report_metadata.id(),
*report_metadata.time(),
ord.try_into()?,
ReportAggregationMetadataState::Start,
))
})
.collect::<Result<_, datastore::Error>>()?;
report_ids_to_scrub.extend(
agg_job_reports.iter().map(|report| *report.metadata().id()),
agg_job_reports
.iter()
.map(|report_metadata| *report_metadata.id()),
);

aggregation_job_writer.put(aggregation_job, report_aggregations)?;
}
}

// Write the aggregation jobs & report aggregations we created.
// Write the aggregation jobs and report aggregations we created.
aggregation_job_writer.write(tx, vdaf).await?;
// Report scrubbing must wait until after report aggregations have been created,
// because they have a write-after-read antidependency on the report shares.
try_join!(
aggregation_job_writer.write(tx, vdaf),
try_join_all(
report_ids_to_scrub
.iter()
.map(|report_id| tx.scrub_client_report(task.id(), report_id))
),
try_join_all(outstanding_reports.iter().map(|report| {
tx.mark_report_unaggregated(task.id(), report.metadata().id())
try_join_all(outstanding_reports.iter().map(|report_metadata| {
tx.mark_report_unaggregated(task.id(), report_metadata.id())
})),
)?;

Expand Down Expand Up @@ -714,7 +728,7 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
.await?;

let mut aggregation_job_writer =
AggregationJobWriter::<SEED_SIZE, FixedSize, A>::new(Arc::clone(&task));
NewAggregationJobWriter::<SEED_SIZE, FixedSize, A>::new(Arc::clone(&task));
let mut batch_creator = BatchCreator::new(
this.min_aggregation_job_size,
this.max_aggregation_job_size,
Expand Down Expand Up @@ -1862,7 +1876,7 @@ mod tests {
.await
.unwrap()
.into_iter()
.map(|report| *report.metadata().id())
.map(|report_metadata| *report_metadata.id())
.collect::<Vec<_>>();

try_join_all(
Expand Down
7 changes: 4 additions & 3 deletions aggregator/src/aggregator/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{error::handle_ping_pong_error, Error};
use crate::aggregator::{
accumulator::Accumulator, aggregate_step_failure_counter,
aggregation_job_writer::AggregationJobWriter, http_handlers::AGGREGATION_JOB_ROUTE,
aggregation_job_writer::UpdatedAggregationJobWriter, http_handlers::AGGREGATION_JOB_ROUTE,
query_type::CollectableQueryType, send_request_to_helper, RequestBody,
};
use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -757,7 +757,7 @@ where
}

// Write everything back to storage.
let mut aggregation_job_writer = AggregationJobWriter::new(Arc::clone(&task));
let mut aggregation_job_writer = UpdatedAggregationJobWriter::new(Arc::clone(&task));
let new_step = aggregation_job.step().increment();
aggregation_job_writer.update(
aggregation_job.with_step(new_step),
Expand Down Expand Up @@ -894,7 +894,8 @@ where
)
.await?;

let mut aggregation_job_writer = AggregationJobWriter::new(Arc::new(task));
let mut aggregation_job_writer =
UpdatedAggregationJobWriter::new(Arc::new(task));
aggregation_job_writer.update(aggregation_job, report_aggregations)?;

try_join!(
Expand Down
Loading

0 comments on commit 3d9e71f

Please sign in to comment.