[0.6] Helper: disallow reports from too far in the future. (#2601)
Like with the Leader, this is based on the `tolerable_clock_skew` task
parameter.
branlwyd authored Feb 2, 2024
1 parent b573b6f commit 1d1eb8c
Showing 3 changed files with 122 additions and 14 deletions.
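In outline, the Helper now computes a report deadline of `now + tolerable_clock_skew` and rejects any report whose timestamp falls after that deadline with a ReportTooEarly prepare error, mirroring the Leader's existing behavior. A minimal standalone sketch of that check (using simplified integer timestamps and a local PrepareError enum as stand-ins, not the actual Janus Clock, Duration, or janus_messages types):

// Sketch of the Helper-side check this commit adds; simplified types for
// illustration only.
#[derive(Debug, PartialEq)]
enum PrepareError {
    ReportTooEarly,
}

// Accept a report only if its timestamp is no later than
// `now + tolerable_clock_skew` (all values in seconds since the epoch).
fn check_report_time(
    now: u64,
    tolerable_clock_skew: u64,
    report_time: u64,
) -> Result<(), PrepareError> {
    let report_deadline = now + tolerable_clock_skew;
    if report_time > report_deadline {
        return Err(PrepareError::ReportTooEarly);
    }
    Ok(())
}

fn main() {
    let (now, skew) = (1_700_000_000, 600);
    // Barely tolerable: exactly at the deadline.
    assert_eq!(check_report_time(now, skew, now + skew), Ok(()));
    // Barely intolerable: one second past the deadline.
    assert_eq!(
        check_report_time(now, skew, now + skew + 1),
        Err(PrepareError::ReportTooEarly)
    );
}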
25 changes: 25 additions & 0 deletions aggregator/src/aggregator.rs
@@ -409,6 +409,7 @@ impl<C: Clock> Aggregator<C> {
task_aggregator
.handle_aggregate_init(
&self.datastore,
&self.clock,
&self.global_hpke_keypairs,
&self.aggregate_step_failure_counter,
self.cfg.batch_aggregation_shard_count,
@@ -942,6 +943,7 @@ impl<C: Clock> TaskAggregator<C> {
async fn handle_aggregate_init(
&self,
datastore: &Datastore<C>,
clock: &C,
global_hpke_keypairs: &GlobalHpkeKeypairCache,
aggregate_step_failure_counter: &Counter<u64>,
batch_aggregation_shard_count: u64,
@@ -951,6 +953,7 @@ impl<C: Clock> TaskAggregator<C> {
self.vdaf_ops
.handle_aggregate_init(
datastore,
clock,
global_hpke_keypairs,
aggregate_step_failure_counter,
Arc::clone(&self.task),
@@ -1283,6 +1286,7 @@ impl VdafOps {
async fn handle_aggregate_init<C: Clock>(
&self,
datastore: &Datastore<C>,
clock: &C,
global_hpke_keypairs: &GlobalHpkeKeypairCache,
aggregate_step_failure_counter: &Counter<u64>,
task: Arc<AggregatorTask>,
@@ -1295,6 +1299,7 @@ impl VdafOps {
vdaf_ops_dispatch!(self, (vdaf, verify_key, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_aggregate_init_generic::<VERIFY_KEY_LENGTH, TimeInterval, VdafType, _>(
datastore,
clock,
global_hpke_keypairs,
Arc::clone(vdaf),
aggregate_step_failure_counter,
@@ -1311,6 +1316,7 @@ impl VdafOps {
vdaf_ops_dispatch!(self, (vdaf, verify_key, VdafType, VERIFY_KEY_LENGTH) => {
Self::handle_aggregate_init_generic::<VERIFY_KEY_LENGTH, FixedSize, VdafType, _>(
datastore,
clock,
global_hpke_keypairs,
Arc::clone(vdaf),
aggregate_step_failure_counter,
@@ -1625,6 +1631,7 @@ impl VdafOps {
/// [1]: https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-helper-initialization
async fn handle_aggregate_init_generic<const SEED_SIZE: usize, Q, A, C>(
datastore: &Datastore<C>,
clock: &C,
global_hpke_keypairs: &GlobalHpkeKeypairCache,
vdaf: Arc<A>,
aggregate_step_failure_counter: &Counter<u64>,
@@ -1650,6 +1657,11 @@ impl VdafOps {
let request_hash = digest(&SHA256, req_bytes).as_ref().try_into().unwrap();
let req = AggregationJobInitializeReq::<Q>::get_decoded(req_bytes)?;

let report_deadline = clock
.now()
.add(task.tolerable_clock_skew())
.map_err(Error::from)?;

// If two ReportShare messages have the same report ID, then the helper MUST abort with
// error "invalidMessage". (§4.5.1.2)
let mut seen_report_ids = HashSet::with_capacity(req.prepare_inits().len());
@@ -1833,6 +1845,19 @@ impl VdafOps {

let shares = input_share.and_then(|input_share| Ok((public_share?, input_share)));

// Reject reports from too far in the future.
let shares = shares.and_then(|shares| {
if prepare_init
.report_share()
.metadata()
.time()
.is_after(&report_deadline)
{
return Err(PrepareError::ReportTooEarly);
}
Ok(shares)
});

// Next, the aggregator runs the preparation-state initialization algorithm for the VDAF
// associated with the task and computes the first state transition. [...] If either
// step fails, then the aggregator MUST fail with error `vdaf-prep-error`. (§4.4.2.2)
98 changes: 87 additions & 11 deletions aggregator/src/aggregator/aggregate_init_tests.rs
@@ -27,7 +27,8 @@ use janus_core::{
};
use janus_messages::{
query_type::TimeInterval, AggregationJobId, AggregationJobInitializeReq, AggregationJobResp,
HpkeConfig, PartialBatchSelector, PrepareInit, PrepareStepResult, ReportMetadata, ReportShare,
Duration, HpkeConfig, PartialBatchSelector, PrepareError, PrepareInit, PrepareStepResult,
ReportMetadata, ReportShare,
};
use prio::{
codec::Encode,
@@ -159,9 +160,8 @@ pub(super) struct AggregationJobInitTestCase<
pub(super) clock: MockClock,
pub(super) task: Task,
pub(super) prepare_init_generator: PrepareInitGenerator<VERIFY_KEY_SIZE, V>,
pub(super) prepare_inits: Vec<PrepareInit>,
pub(super) aggregation_job_id: AggregationJobId,
aggregation_job_init_req: AggregationJobInitializeReq<TimeInterval>,
pub(super) aggregation_job_init_req: AggregationJobInitializeReq<TimeInterval>,
aggregation_job_init_resp: Option<AggregationJobResp>,
pub(super) aggregation_param: V::AggregationParam,
pub(super) handler: Box<dyn Handler>,
@@ -287,7 +287,6 @@ async fn setup_aggregate_init_test_without_sending_request<
AggregationJobInitTestCase {
clock,
task,
prepare_inits,
prepare_init_generator,
aggregation_job_id,
aggregation_job_init_req,
@@ -396,7 +395,7 @@ async fn aggregation_job_mutation_aggregation_job() {
let mutated_aggregation_job_init_req = AggregationJobInitializeReq::new(
dummy_vdaf::AggregationParam(1).get_encoded(),
PartialBatchSelector::new_time_interval(),
test_case.prepare_inits,
test_case.aggregation_job_init_req.prepare_inits().to_vec(),
);

let response = put_aggregation_job(
@@ -413,25 +412,27 @@
async fn aggregation_job_mutation_report_shares() {
let test_case = setup_aggregate_init_test().await;

let prepare_inits = test_case.aggregation_job_init_req.prepare_inits();

// Put the aggregation job again, mutating the associated report shares' metadata such that
// uniqueness constraints on client_reports are violated
for mutated_prepare_inits in [
// Omit a report share that was included previously
Vec::from(&test_case.prepare_inits[0..test_case.prepare_inits.len() - 1]),
Vec::from(&prepare_inits[0..prepare_inits.len() - 1]),
// Include a different report share than was included previously
[
&test_case.prepare_inits[0..test_case.prepare_inits.len() - 1],
&prepare_inits[0..prepare_inits.len() - 1],
&[test_case.prepare_init_generator.next(&()).0],
]
.concat(),
// Include an extra report share than was included previously
[
test_case.prepare_inits.as_slice(),
prepare_inits,
&[test_case.prepare_init_generator.next(&()).0],
]
.concat(),
// Reverse the order of the reports
test_case.prepare_inits.into_iter().rev().collect(),
prepare_inits.iter().rev().cloned().collect(),
] {
let mutated_aggregation_job_init_req = AggregationJobInitializeReq::new(
test_case.aggregation_param.get_encoded(),
@@ -458,7 +459,8 @@ async fn aggregation_job_mutation_report_aggregations() {
// values such that the prepare state computed during aggregation initialization won't match the
// first aggregation job.
let mutated_prepare_inits = test_case
.prepare_inits
.aggregation_job_init_req
.prepare_inits()
.iter()
.map(|s| {
test_case
@@ -487,6 +489,80 @@ async fn aggregation_job_mutation_report_aggregations() {
assert_eq!(response.status(), Some(Status::Conflict));
}

#[tokio::test]
async fn aggregation_job_intolerable_clock_skew() {
let mut test_case = setup_aggregate_init_test_without_sending_request(
dummy_vdaf::Vdaf::new(),
VdafInstance::Fake,
dummy_vdaf::AggregationParam(0),
(),
AuthenticationToken::Bearer(random()),
)
.await;

test_case.aggregation_job_init_req = AggregationJobInitializeReq::new(
test_case.aggregation_param.get_encoded(),
PartialBatchSelector::new_time_interval(),
Vec::from([
// Barely tolerable.
test_case
.prepare_init_generator
.next_with_metadata(
ReportMetadata::new(
random(),
test_case
.clock
.now()
.add(test_case.task.tolerable_clock_skew())
.unwrap(),
),
&(),
)
.0,
// Barely intolerable.
test_case
.prepare_init_generator
.next_with_metadata(
ReportMetadata::new(
random(),
test_case
.clock
.now()
.add(test_case.task.tolerable_clock_skew())
.unwrap()
.add(&Duration::from_seconds(1))
.unwrap(),
),
&(),
)
.0,
]),
);

let mut response = put_aggregation_job(
&test_case.task,
&test_case.aggregation_job_id,
&test_case.aggregation_job_init_req,
&test_case.handler,
)
.await;
assert_eq!(response.status(), Some(Status::Ok));

let aggregation_job_init_resp: AggregationJobResp = decode_response_body(&mut response).await;
assert_eq!(
aggregation_job_init_resp.prepare_resps().len(),
test_case.aggregation_job_init_req.prepare_inits().len(),
);
assert_matches!(
aggregation_job_init_resp.prepare_resps()[0].result(),
&PrepareStepResult::Continue { .. }
);
assert_matches!(
aggregation_job_init_resp.prepare_resps()[1].result(),
&PrepareStepResult::Reject(PrepareError::ReportTooEarly)
);
}

#[tokio::test]
async fn aggregation_job_init_two_step_vdaf_idempotence() {
// We must run Poplar1 in this test so that the aggregation job won't finish on the first step
@@ -517,7 +593,7 @@ async fn aggregation_job_init_wrong_query() {
let wrong_query = AggregationJobInitializeReq::new(
test_case.aggregation_param.get_encoded(),
PartialBatchSelector::new_fixed_size(random()),
test_case.prepare_inits,
test_case.aggregation_job_init_req.prepare_inits().to_vec(),
);

let (header, value) = test_case
13 changes: 10 additions & 3 deletions aggregator/src/aggregator/http_handlers.rs
@@ -2398,7 +2398,10 @@ mod tests {

// This report has the same ID as the previous one, but a different timestamp.
let mutated_timestamp_report_metadata = ReportMetadata::new(
*test_case.prepare_inits[0].report_share().metadata().id(),
*test_case.aggregation_job_init_req.prepare_inits()[0]
.report_share()
.metadata()
.id(),
test_case
.clock
.now()
@@ -2454,11 +2457,15 @@
assert_eq!(client_reports.len(), 2);
assert_eq!(
&client_reports[0],
test_case.prepare_inits[0].report_share().metadata()
test_case.aggregation_job_init_req.prepare_inits()[0]
.report_share()
.metadata()
);
assert_eq!(
&client_reports[1],
test_case.prepare_inits[1].report_share().metadata()
test_case.aggregation_job_init_req.prepare_inits()[1]
.report_share()
.metadata()
);
}

