Skip to content

Commit

Permalink
Add validation that min_batch_size is nonzero (#3357)
Browse files Browse the repository at this point in the history
  • Loading branch information
divergentdave authored Aug 8, 2024
1 parent 42d3609 commit f2ae5c1
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 144 deletions.
8 changes: 7 additions & 1 deletion aggregator/src/aggregator/aggregation_job_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use prio::{
};
use rand::random;
use std::{sync::Arc, time::Duration as StdDuration};
use tokio::time::timeout;
use trillium_tokio::Stopper;

#[tokio::test]
Expand Down Expand Up @@ -255,7 +256,12 @@ async fn aggregation_job_driver() {

tracing::info!("awaiting stepper tasks");
// Wait for all of the aggregation job stepper tasks to complete.
runtime_manager.wait_for_completed_tasks("stepper", 2).await;
timeout(
StdDuration::from_secs(30),
runtime_manager.wait_for_completed_tasks("stepper", 2),
)
.await
.unwrap();
// Stop the aggregation job driver.
stopper.stop();
// Wait for the aggregation job driver task to complete.
Expand Down
231 changes: 159 additions & 72 deletions aggregator/src/aggregator/collection_job_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use janus_core::{
use janus_messages::{
query_type::{FixedSize, QueryType as QueryTypeTrait, TimeInterval},
AggregateShareAad, AggregationJobStep, BatchId, BatchSelector, Collection, CollectionJobId,
CollectionReq, FixedSizeQuery, Interval, Query, ReportIdChecksum, Role, Time,
CollectionReq, FixedSizeQuery, Interval, Query, Role, Time,
};
use prio::{
codec::{Decode, Encode},
Expand Down Expand Up @@ -124,6 +124,133 @@ impl CollectionJobTestCase {
)
.await
}

/// Seed the database with reports and various aggregation artifacts from aggregating them in a
/// fixed size batch.
pub(super) async fn setup_fixed_size_batch(&self, time: Time, report_count: u64) -> BatchId {
let batch_id = random();
self.datastore
.run_unnamed_tx(|tx| {
let task = self.task.clone();
Box::pin(async move {
let client_timestamp_interval = Interval::from_time(&time).unwrap();
let aggregation_job_id = random();
tx.put_aggregation_job(&AggregationJob::<0, FixedSize, dummy::Vdaf>::new(
*task.id(),
aggregation_job_id,
dummy::AggregationParam::default(),
batch_id,
client_timestamp_interval,
AggregationJobState::Finished,
AggregationJobStep::from(1),
))
.await
.unwrap();
for ord in 0..report_count {
let report = LeaderStoredReport::new_dummy(*task.id(), time);
tx.put_client_report(&report).await.unwrap();
tx.scrub_client_report(report.task_id(), report.metadata().id())
.await
.unwrap();
tx.put_report_aggregation(&ReportAggregation::new(
*task.id(),
aggregation_job_id,
*report.metadata().id(),
time,
ord,
None,
ReportAggregationState::<0, dummy::Vdaf>::Finished,
))
.await
.unwrap();
}
let batch_aggregation = BatchAggregation::<0, FixedSize, dummy::Vdaf>::new(
*task.id(),
batch_id,
dummy::AggregationParam::default(),
0,
client_timestamp_interval,
BatchAggregationState::Aggregating {
aggregate_share: Some(dummy::AggregateShare(0)),
report_count,
checksum: Default::default(),
aggregation_jobs_created: 1,
aggregation_jobs_terminated: 1,
},
);
tx.put_batch_aggregation(&batch_aggregation).await.unwrap();
tx.put_outstanding_batch(task.id(), &batch_id, &None)
.await
.unwrap();
Ok(())
})
})
.await
.unwrap();
batch_id
}

/// Seed the database with a report and various aggregation artifacts from aggregating it in a
/// time interval batch.
pub(super) async fn setup_time_interval_batch(&self, time: Time) -> Interval {
self.datastore
.run_unnamed_tx(|tx| {
let task = self.task.clone();
Box::pin(async move {
let report = LeaderStoredReport::new_dummy(*task.id(), time);
let client_timestamp_interval =
Interval::from_time(report.metadata().time()).unwrap();
let batch_interval = client_timestamp_interval
.align_to_time_precision(task.time_precision())
.unwrap();
let aggregation_job_id = random();
tx.put_client_report(&report).await.unwrap();
tx.scrub_client_report(report.task_id(), report.metadata().id())
.await
.unwrap();
tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy::Vdaf>::new(
*task.id(),
aggregation_job_id,
dummy::AggregationParam::default(),
(),
client_timestamp_interval,
AggregationJobState::Finished,
AggregationJobStep::from(1),
))
.await
.unwrap();
tx.put_report_aggregation(&ReportAggregation::new(
*task.id(),
aggregation_job_id,
*report.metadata().id(),
time,
0,
None,
ReportAggregationState::<0, dummy::Vdaf>::Finished,
))
.await
.unwrap();
let batch_aggregation = BatchAggregation::<0, TimeInterval, dummy::Vdaf>::new(
*task.id(),
batch_interval,
dummy::AggregationParam::default(),
0,
client_timestamp_interval,
BatchAggregationState::Aggregating {
aggregate_share: Some(dummy::AggregateShare(0)),
report_count: 1,
checksum: Default::default(),
aggregation_jobs_created: 1,
aggregation_jobs_terminated: 1,
},
);
tx.put_batch_aggregation(&batch_aggregation).await.unwrap();
Ok(batch_interval)
})
})
.await
.unwrap()
}
}

pub(crate) async fn setup_collection_job_test_case(
Expand Down Expand Up @@ -174,77 +301,17 @@ async fn setup_fixed_size_current_batch_collection_job_test_case(
)
.await;

// Fill the datastore with the necessary data so that there is are two outstanding batches to be
// Fill the datastore with the necessary data so that there are two outstanding batches to be
// collected.
let batch_id_1 = random();
let batch_id_2 = random();
let time = test_case.clock.now();
let batch_id_1 = test_case
.setup_fixed_size_batch(time, test_case.task.min_batch_size() + 1)
.await;
let batch_id_2 = test_case
.setup_fixed_size_batch(time, test_case.task.min_batch_size() + 1)
.await;
let client_timestamp_interval = Interval::from_time(&time).unwrap();

test_case
.datastore
.run_unnamed_tx(|tx| {
let task = test_case.task.clone();
Box::pin(async move {
for batch_id in [batch_id_1, batch_id_2] {
let aggregation_job_id = random();
tx.put_aggregation_job::<0, FixedSize, dummy::Vdaf>(&AggregationJob::new(
*task.id(),
aggregation_job_id,
dummy::AggregationParam::default(),
batch_id,
client_timestamp_interval,
AggregationJobState::Finished,
AggregationJobStep::from(1),
))
.await
.unwrap();

for ord in 0..task.min_batch_size() + 1 {
let report = LeaderStoredReport::new_dummy(*task.id(), time);
tx.put_client_report(&report).await.unwrap();

tx.put_report_aggregation::<0, dummy::Vdaf>(&ReportAggregation::new(
*task.id(),
aggregation_job_id,
*report.metadata().id(),
time,
ord,
None,
ReportAggregationState::Finished,
))
.await
.unwrap();
}

tx.put_batch_aggregation::<0, FixedSize, dummy::Vdaf>(&BatchAggregation::new(
*task.id(),
batch_id,
dummy::AggregationParam::default(),
0,
client_timestamp_interval,
BatchAggregationState::Aggregating {
aggregate_share: Some(dummy::AggregateShare(0)),
report_count: task.min_batch_size() + 1,
checksum: ReportIdChecksum::default(),
aggregation_jobs_created: 1,
aggregation_jobs_terminated: 1,
},
))
.await
.unwrap();

tx.put_outstanding_batch(task.id(), &batch_id, &None)
.await
.unwrap();
}

Ok(())
})
})
.await
.unwrap();

(test_case, batch_id_1, batch_id_2, client_timestamp_interval)
}

Expand Down Expand Up @@ -411,6 +478,9 @@ async fn collection_job_success_fixed_size() {
#[tokio::test]
async fn collection_job_put_idempotence_time_interval() {
let test_case = setup_collection_job_test_case(Role::Leader, QueryType::TimeInterval).await;
test_case
.setup_time_interval_batch(Time::from_seconds_since_epoch(0))
.await;

let collection_job_id = random();
let request = CollectionReq::new(
Expand Down Expand Up @@ -460,6 +530,9 @@ async fn collection_job_put_idempotence_time_interval_varied_collection_id() {
// parameters that the batch has been collected against.

let test_case = setup_collection_job_test_case(Role::Leader, QueryType::TimeInterval).await;
test_case
.setup_time_interval_batch(Time::from_seconds_since_epoch(0))
.await;

let collection_job_ids = HashSet::from(random::<[CollectionJobId; 2]>());
let request = CollectionReq::new(
Expand Down Expand Up @@ -568,6 +641,9 @@ async fn collection_job_put_idempotence_fixed_size_varied_collection_id() {
#[tokio::test]
async fn collection_job_put_idempotence_time_interval_mutate_time_interval() {
let test_case = setup_collection_job_test_case(Role::Leader, QueryType::TimeInterval).await;
test_case
.setup_time_interval_batch(Time::from_seconds_since_epoch(0))
.await;

let collection_job_id = random();
let request = CollectionReq::new(
Expand Down Expand Up @@ -606,6 +682,9 @@ async fn collection_job_put_idempotence_time_interval_mutate_time_interval() {
#[tokio::test]
async fn collection_job_put_idempotence_time_interval_mutate_aggregation_param() {
let test_case = setup_collection_job_test_case(Role::Leader, QueryType::TimeInterval).await;
test_case
.setup_time_interval_batch(Time::from_seconds_since_epoch(0))
.await;

let collection_job_id = random();
let request = CollectionReq::new(
Expand Down Expand Up @@ -772,9 +851,11 @@ async fn collection_job_put_idempotence_fixed_size_by_batch_id() {
},
)
.await;
let batch_id = test_case
.setup_fixed_size_batch(test_case.clock.now(), 1)
.await;

let collection_job_id = random();
let batch_id = random();

let request = CollectionReq::new(
Query::new_fixed_size(FixedSizeQuery::ByBatchId { batch_id }),
Expand All @@ -800,10 +881,14 @@ async fn collection_job_put_idempotence_fixed_size_by_batch_id_mutate_batch_id()
},
)
.await;
let first_batch_id = test_case
.setup_fixed_size_batch(test_case.clock.now(), 1)
.await;
let second_batch_id = test_case
.setup_fixed_size_batch(test_case.clock.now(), 1)
.await;

let collection_job_id = random();
let first_batch_id = random();
let second_batch_id = random();

let response = test_case
.put_collection_job(
Expand Down Expand Up @@ -842,9 +927,11 @@ async fn collection_job_put_idempotence_fixed_size_by_batch_id_mutate_aggregatio
},
)
.await;
let batch_id = test_case
.setup_fixed_size_batch(test_case.clock.now(), 1)
.await;

let collection_job_id = random();
let batch_id = random();
let first_aggregation_param = dummy::AggregationParam(0);
let second_aggregation_param = dummy::AggregationParam(1);

Expand Down
Loading

0 comments on commit f2ae5c1

Please sign in to comment.