Scrub batch aggregations once we are done with them. (#2694)
Specifically, we scrub batch aggregations whose aggregate shares have
been rolled into collection jobs (for the Leader) or aggregate share
jobs (for the Helper).

To make this work, the Leader also has to be able to borrow
collection job results from a duplicate collection job. (The Helper was
already borrowing results from identical aggregate share jobs.)

Note that we can't simply delete the relevant batch aggregations: doing
so might make us miss a contended write by a concurrent aggregation job
step, and detecting that contention is required to ensure that
aggregation jobs can't concurrently update the batch aggregations. (At
the Repeatable Read isolation level, write contention can only happen
on extant rows.)
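
As a rough illustration of the scrub-instead-of-delete idea, here is a minimal sketch. The types are simplified stand-ins; the real `BatchAggregation`/`BatchAggregationState` models live in `janus_aggregator_core::datastore::models` and carry many more fields.

```rust
// Minimal model only; field types are simplified stand-ins, not the real
// Janus datastore models.
#[derive(Debug, Clone, PartialEq, Eq)]
enum BatchAggregationState {
    // Still accepting contributions from aggregation jobs.
    Aggregating { aggregate_share: Option<Vec<u8>>, report_count: u64 },
    // Rolled into a collection job (Leader) or aggregate share job (Helper).
    Collected { aggregate_share: Option<Vec<u8>>, report_count: u64 },
    // Payload discarded, but the row survives so a concurrent aggregation
    // job step still contends on it under Repeatable Read.
    Scrubbed,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct BatchAggregation {
    ord: u64,
    state: BatchAggregationState,
}

impl BatchAggregation {
    // Scrubbing is an UPDATE, not a DELETE: the row (and its key) stays put.
    fn scrubbed(self) -> Self {
        Self {
            state: BatchAggregationState::Scrubbed,
            ..self
        }
    }
}

fn main() {
    let ba = BatchAggregation {
        ord: 0,
        state: BatchAggregationState::Collected {
            aggregate_share: None,
            report_count: 0,
        },
    };
    assert_eq!(ba.scrubbed().state, BatchAggregationState::Scrubbed);
}
```

Because the scrubbed row still exists, a concurrent aggregation job that tries to update the same batch aggregation hits a write-write conflict and retries; deleting the row would have lost that signal.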
branlwyd authored Feb 21, 2024
1 parent 59377c9 commit 3a882aa
Showing 13 changed files with 1,041 additions and 540 deletions.
205 changes: 95 additions & 110 deletions aggregator/src/aggregator.rs
@@ -3121,7 +3121,7 @@ impl VdafOps {
async fn handle_aggregate_share_generic<
const SEED_SIZE: usize,
Q: CollectableQueryType,
S: DifferentialPrivacyStrategy + Send + Clone + Sync + 'static,
S: DifferentialPrivacyStrategy + Send + Clone + Send + Sync + 'static,
A: vdaf::AggregatorWithNoise<SEED_SIZE, 16, S> + Send + Sync + 'static,
C: Clock,
>(
@@ -3137,7 +3136,6 @@ impl VdafOps {
where
A::AggregationParam: Send + Sync + Eq + Hash,
A::AggregateShare: Send + Sync,
S: Send + Sync,
{
// Decode request, and verify that it is for the current task. We use an assert to check
// that the task IDs match as this should be guaranteed by the caller.
@@ -3187,7 +3186,7 @@ impl VdafOps {
let aggregation_param = A::AggregationParam::get_decoded(
aggregate_share_req.aggregation_parameter(),
)?;
let aggregate_share_job = match tx
if let Some(aggregate_share_job) = tx
.get_aggregate_share_job(
vdaf.as_ref(),
task.id(),
@@ -3196,119 +3195,104 @@
)
.await?
{
Some(aggregate_share_job) => {
debug!(
?aggregate_share_req,
"Serving cached aggregate share job result"
);
aggregate_share_job
}
None => {
debug!(
?aggregate_share_req,
"Cache miss, computing aggregate share job result"
);
let aggregation_param = A::AggregationParam::get_decoded(
aggregate_share_req.aggregation_parameter(),
)?;
let (batch_aggregations, _) = try_join!(
Q::get_batch_aggregations_for_collection_identifier(
tx,
&task,
vdaf.as_ref(),
aggregate_share_req.batch_selector().batch_identifier(),
&aggregation_param
),
Q::validate_query_count::<SEED_SIZE, C, A>(
tx,
vdaf.as_ref(),
&task,
aggregate_share_req.batch_selector().batch_identifier(),
&aggregation_param,
)
)?;
debug!(
?aggregate_share_req,
"Serving cached aggregate share job result"
);
return Ok(aggregate_share_job);
}

// To ensure that concurrent aggregations don't write into a
// currently-nonexistent batch aggregation, we write (empty) batch
// aggregations for any that have not already been written to storage.
let empty_batch_aggregations = empty_batch_aggregations(
&task,
batch_aggregation_shard_count,
aggregate_share_req.batch_selector().batch_identifier(),
&aggregation_param,
&batch_aggregations,
);
// This is a new aggregate share request, compute & validate the response.
debug!(
?aggregate_share_req,
"Cache miss, computing aggregate share job result"
);
let aggregation_param = A::AggregationParam::get_decoded(
aggregate_share_req.aggregation_parameter(),
)?;
let (batch_aggregations, _) = try_join!(
Q::get_batch_aggregations_for_collection_identifier(
tx,
&task,
vdaf.as_ref(),
aggregate_share_req.batch_selector().batch_identifier(),
&aggregation_param
),
Q::validate_query_count::<SEED_SIZE, C, A>(
tx,
vdaf.as_ref(),
&task,
aggregate_share_req.batch_selector().batch_identifier(),
&aggregation_param,
)
)?;

let (mut helper_aggregate_share, report_count, checksum) =
compute_aggregate_share::<SEED_SIZE, Q, A>(
&task,
&batch_aggregations,
)
.await
.map_err(|e| datastore::Error::User(e.into()))?;

vdaf.add_noise_to_agg_share(
&dp_strategy,
&aggregation_param,
&mut helper_aggregate_share,
report_count.try_into()?,
)
.map_err(|e| datastore::Error::User(e.into()))?;
// To ensure that concurrent aggregations don't write into a
// currently-nonexistent batch aggregation, we write (empty) batch
// aggregations for any that have not already been written to storage.
let empty_batch_aggregations = empty_batch_aggregations(
&task,
batch_aggregation_shard_count,
aggregate_share_req.batch_selector().batch_identifier(),
&aggregation_param,
&batch_aggregations,
);

// Now that we are satisfied that the request is serviceable, we consume
// a query by recording the aggregate share request parameters and the
// result.
let aggregate_share_job = AggregateShareJob::<SEED_SIZE, Q, A>::new(
*task.id(),
aggregate_share_req
.batch_selector()
.batch_identifier()
.clone(),
aggregation_param,
helper_aggregate_share,
report_count,
checksum,
);
try_join!(
tx.put_aggregate_share_job(&aggregate_share_job),
try_join_all(batch_aggregations.into_iter().map(|ba| async move {
tx.update_batch_aggregation(
&ba.with_state(BatchAggregationState::Collected),
)
.await
})),
try_join_all(
empty_batch_aggregations
.iter()
.map(|ba| tx.put_batch_aggregation(ba))
),
)?;
aggregate_share_job
}
};
let (mut helper_aggregate_share, report_count, checksum) =
compute_aggregate_share::<SEED_SIZE, Q, A>(&task, &batch_aggregations)
.await
.map_err(|e| datastore::Error::User(e.into()))?;

// §4.4.4.3: verify total report count and the checksum we computed against
// those reported by the leader.
if aggregate_share_job.report_count() != aggregate_share_req.report_count()
|| aggregate_share_job.checksum() != aggregate_share_req.checksum()
{
return Err(datastore::Error::User(
Error::BatchMismatch(Box::new(BatchMismatch {
task_id: *task.id(),
own_checksum: *aggregate_share_job.checksum(),
own_report_count: aggregate_share_job.report_count(),
peer_checksum: *aggregate_share_req.checksum(),
peer_report_count: aggregate_share_req.report_count(),
}))
.into(),
));
}
vdaf.add_noise_to_agg_share(
&dp_strategy,
&aggregation_param,
&mut helper_aggregate_share,
report_count.try_into()?,
)
.map_err(|e| datastore::Error::User(e.into()))?;

// Now that we are satisfied that the request is serviceable, we consume
// a query by recording the aggregate share request parameters and the
// result.
let aggregate_share_job = AggregateShareJob::<SEED_SIZE, Q, A>::new(
*task.id(),
aggregate_share_req
.batch_selector()
.batch_identifier()
.clone(),
aggregation_param,
helper_aggregate_share,
report_count,
checksum,
);
try_join!(
tx.put_aggregate_share_job(&aggregate_share_job),
try_join_all(batch_aggregations.into_iter().map(|ba| async move {
tx.update_batch_aggregation(&ba.scrubbed()).await
})),
try_join_all(empty_batch_aggregations.into_iter().map(|ba| async move {
tx.put_batch_aggregation(&ba.scrubbed()).await
}))
)?;
Ok(aggregate_share_job)
})
})
.await?;

// §4.4.4.3: Verify total report count and the checksum we computed against those reported
// by the leader.
if aggregate_share_job.report_count() != aggregate_share_req.report_count()
|| aggregate_share_job.checksum() != aggregate_share_req.checksum()
{
return Err(Error::BatchMismatch(Box::new(BatchMismatch {
task_id: *task.id(),
own_checksum: *aggregate_share_job.checksum(),
own_report_count: aggregate_share_job.report_count(),
peer_checksum: *aggregate_share_req.checksum(),
peer_report_count: aggregate_share_req.report_count(),
})));
}

// §4.4.4.3: HPKE encrypt aggregate share to the collector. We store *unencrypted* aggregate
// shares in the datastore so that we can encrypt cached results to the collector HPKE
// config valid when the current AggregateShareReq was made, and not whatever was valid at
@@ -3362,10 +3346,11 @@ fn empty_batch_aggregations<
batch_identifier,
aggregation_param.clone(),
ord,
BatchAggregationState::Collected,
None,
0,
ReportIdChecksum::default(),
BatchAggregationState::Collected {
aggregate_share: None,
report_count: 0,
checksum: ReportIdChecksum::default(),
},
))
} else {
None
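
For context on the §4.4.4.3 check retained above, here is a simplified, self-contained sketch of the report count / checksum comparison against the Leader's request. The types are illustrative stand-ins for `AggregateShareJob`, `AggregateShareReq`, and the `BatchMismatch` error; the real ones also carry task IDs, batch selectors, and more.

```rust
// Simplified stand-ins; the real types live in janus_messages and the
// aggregator crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ReportIdChecksum([u8; 32]);

struct AggregateShareJob {
    report_count: u64,
    checksum: ReportIdChecksum,
}

struct AggregateShareReq {
    report_count: u64,
    checksum: ReportIdChecksum,
}

#[derive(Debug)]
struct BatchMismatch {
    own_report_count: u64,
    peer_report_count: u64,
    own_checksum: ReportIdChecksum,
    peer_checksum: ReportIdChecksum,
}

// The Helper refuses to release an aggregate share whose report count or
// checksum disagrees with what the Leader claims for the batch.
fn verify_against_leader(
    job: &AggregateShareJob,
    req: &AggregateShareReq,
) -> Result<(), Box<BatchMismatch>> {
    if job.report_count != req.report_count || job.checksum != req.checksum {
        return Err(Box::new(BatchMismatch {
            own_report_count: job.report_count,
            peer_report_count: req.report_count,
            own_checksum: job.checksum,
            peer_checksum: req.checksum,
        }));
    }
    Ok(())
}

fn main() {
    let checksum = ReportIdChecksum([0u8; 32]);
    let job = AggregateShareJob { report_count: 10, checksum };
    let req = AggregateShareReq { report_count: 11, checksum };
    assert!(verify_against_leader(&job, &req).is_err());
}
```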
9 changes: 5 additions & 4 deletions aggregator/src/aggregator/accumulator.rs
@@ -85,10 +85,11 @@ impl<const SEED_SIZE: usize, Q: AccumulableQueryType, A: vdaf::Aggregator<SEED_S
batch_identifier.clone(),
self.aggregation_parameter.clone(),
thread_rng().gen_range(0..self.shard_count),
BatchAggregationState::Aggregating,
Some(A::AggregateShare::from(output_share.clone())),
1,
ReportIdChecksum::for_report_id(report_id),
BatchAggregationState::Aggregating {
aggregate_share: Some(A::AggregateShare::from(output_share.clone())),
report_count: 1,
checksum: ReportIdChecksum::for_report_id(report_id),
},
)
};

45 changes: 34 additions & 11 deletions aggregator/src/aggregator/aggregate_share.rs
@@ -1,15 +1,22 @@
//! Implements functionality for computing & validating aggregate shares.
use super::Error;
use janus_aggregator_core::{datastore::models::BatchAggregation, task::AggregatorTask};
use janus_aggregator_core::{
datastore::{
self,
models::{BatchAggregation, BatchAggregationState},
},
task::AggregatorTask,
};
use janus_core::report_id::ReportIdChecksumExt;
use janus_messages::{query_type::QueryType, ReportIdChecksum};
use prio::vdaf::{self, Aggregatable};

/// Computes the aggregate share over the provided batch aggregations.
/// The assumption is that all aggregation jobs contributing to those batch aggregations have
/// been driven to completion, and that the query count requirements have been validated for the
/// included batches.
///
/// The assumption is that all aggregation jobs contributing to those batch aggregations have been
/// driven to completion, and that the query count requirements have been validated for the included
/// batches.
#[tracing::instrument(skip(task, batch_aggregations), fields(task_id = ?task.id()), err)]
pub(crate) async fn compute_aggregate_share<
const SEED_SIZE: usize,
@@ -35,8 +42,8 @@ pub(crate) async fn compute_aggregate_share<
// unfinished aggregation jobs were intentionally abandoned by the leader (see issue #104 for
// more discussion).
//
// On the leader side, we know/assume that we would not be stepping a collection job unless we had
// verified that the constituent aggregation jobs were finished.
// On the leader side, we know/assume that we would not be stepping a collection job unless we
// had verified that the constituent aggregation jobs were finished.
//
// In either case, we go ahead and service the aggregate share request with whatever batch
// aggregations are available now.
@@ -45,22 +52,38 @@
let mut total_aggregate_share: Option<A::AggregateShare> = None;

for batch_aggregation in batch_aggregations {
let (aggregate_share, report_count, checksum) = match batch_aggregation.state() {
BatchAggregationState::Aggregating {
aggregate_share,
report_count,
checksum,
} => (aggregate_share, report_count, checksum),
BatchAggregationState::Collected {
aggregate_share,
report_count,
checksum,
} => (aggregate_share, report_count, checksum),
BatchAggregationState::Scrubbed => {
return Err(Error::Datastore(datastore::Error::Scrubbed))
}
};

// XOR this batch interval's checksum into the overall checksum
// https://www.ietf.org/archive/id/draft-ietf-ppm-dap-02.html#section-4.5.2
total_checksum = total_checksum.combined_with(batch_aggregation.checksum());
total_checksum = total_checksum.combined_with(checksum);

// Sum all the report counts
// https://www.ietf.org/archive/id/draft-ietf-ppm-dap-02.html#section-4.5.2
total_report_count += batch_aggregation.report_count();
total_report_count += report_count;

match &mut total_aggregate_share {
Some(share) => {
batch_aggregation
.aggregate_share()
aggregate_share
.as_ref()
.map(|other| share.merge(other))
.transpose()?;
}
None => total_aggregate_share = batch_aggregation.aggregate_share().cloned(),
None => total_aggregate_share = aggregate_share.clone(),
}
}

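
To summarize what the new match in `compute_aggregate_share` does, here is a hedged, self-contained sketch of the combining loop. The checksum XOR and share merging are simplified stand-ins for `ReportIdChecksumExt::combined_with` and prio's `Aggregatable::merge`, and the state enum is a reduced model of the datastore type.

```rust
// Simplified stand-ins for the real Janus/prio types.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
struct Checksum([u8; 32]);

impl Checksum {
    // Bytewise XOR, per draft-ietf-ppm-dap-02 §4.5.2.
    fn combined_with(mut self, other: &Checksum) -> Checksum {
        for (a, b) in self.0.iter_mut().zip(other.0.iter()) {
            *a ^= b;
        }
        self
    }
}

#[derive(Debug, Clone)]
enum BatchAggregationState {
    Aggregating { aggregate_share: Option<Vec<u64>>, report_count: u64, checksum: Checksum },
    Collected { aggregate_share: Option<Vec<u64>>, report_count: u64, checksum: Checksum },
    Scrubbed,
}

#[derive(Debug)]
enum Error {
    Scrubbed,
}

// Mirrors the shape of compute_aggregate_share: every contributing batch
// aggregation must still carry its data; a Scrubbed row is an error rather
// than something to silently skip.
fn combine(
    batch_aggregations: &[BatchAggregationState],
) -> Result<(Option<Vec<u64>>, u64, Checksum), Error> {
    let mut total_share: Option<Vec<u64>> = None;
    let mut total_count = 0u64;
    let mut total_checksum = Checksum::default();

    for ba in batch_aggregations {
        let (share, count, checksum) = match ba {
            BatchAggregationState::Aggregating { aggregate_share, report_count, checksum }
            | BatchAggregationState::Collected { aggregate_share, report_count, checksum } => {
                (aggregate_share, report_count, checksum)
            }
            BatchAggregationState::Scrubbed => return Err(Error::Scrubbed),
        };

        total_checksum = total_checksum.combined_with(checksum); // XOR the checksums
        total_count += count; // sum the report counts

        if let Some(s) = share {
            total_share = Some(match total_share.take() {
                // Stand-in for Aggregatable::merge: elementwise addition.
                Some(mut acc) => {
                    for (a, b) in acc.iter_mut().zip(s.iter()) {
                        *a += b;
                    }
                    acc
                }
                None => s.clone(),
            });
        }
    }

    Ok((total_share, total_count, total_checksum))
}

fn main() {
    let combined = combine(&[BatchAggregationState::Collected {
        aggregate_share: Some(vec![1, 2, 3]),
        report_count: 1,
        checksum: Checksum::default(),
    }])
    .unwrap();
    assert_eq!(combined.1, 1);
}
```

The real implementation works over generic VDAF aggregate shares and datastore models, but the control flow — XOR the checksums, sum the report counts, merge the shares, and refuse to proceed past a scrubbed row — is the same.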