Skip to content

Commit

Permalink
Use Runtime and RuntimeManager instead of sleeping in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
inahga committed Jan 24, 2024
1 parent 9a12677 commit 37a3593
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 54 deletions.
153 changes: 115 additions & 38 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use janus_core::{
http::HttpErrorResponse,
time::{Clock, DurationExt, IntervalExt, TimeExt},
vdaf::{VdafInstance, VERIFY_KEY_LENGTH},
Runtime,
};
use janus_messages::{
query_type::{FixedSize, TimeInterval},
Expand Down Expand Up @@ -229,14 +230,16 @@ impl Default for Config {
}

impl<C: Clock> Aggregator<C> {
async fn new(
async fn new<R: Runtime + Send + Sync + 'static>(
datastore: Arc<Datastore<C>>,
clock: C,
runtime: R,
meter: &Meter,
cfg: Config,
) -> Result<Self, Error> {
let report_writer = Arc::new(ReportWriteBatcher::new(
Arc::clone(&datastore),
runtime,
cfg.task_counter_shard_count,
cfg.max_upload_batch_size,
cfg.max_upload_batch_write_delay,
Expand Down Expand Up @@ -3290,9 +3293,13 @@ mod tests {
self, test_util::generate_test_hpke_config_and_private_key_with_id,
HpkeApplicationInfo, HpkeKeypair, Label,
},
test_util::install_test_trace_subscriber,
test_util::{
install_test_trace_subscriber,
runtime::{TestRuntime, TestRuntimeManager},
},
time::{Clock, MockClock, TimeExt},
vdaf::{VdafInstance, VERIFY_KEY_LENGTH},
Runtime,
};
use janus_messages::{
query_type::TimeInterval, Duration, Extension, HpkeCiphertext, HpkeConfig, HpkeConfigId,
Expand All @@ -3305,7 +3312,6 @@ mod tests {
};
use rand::random;
use std::{collections::HashSet, iter, sync::Arc, time::Duration as StdDuration};
use tokio::time::sleep;

pub(super) fn create_report_custom(
task: &AggregatorTask,
Expand Down Expand Up @@ -3367,6 +3373,23 @@ mod tests {
Arc<Datastore<MockClock>>,
EphemeralDatastore,
) {
setup_upload_test_with_runtime(TestRuntime::default(), cfg).await
}

async fn setup_upload_test_with_runtime<R>(
runtime: R,
cfg: Config,
) -> (
Prio3Count,
Aggregator<MockClock>,
MockClock,
Task,
Arc<Datastore<MockClock>>,
EphemeralDatastore,
)
where
R: Runtime + Send + Sync + 'static,
{
let clock = MockClock::default();
let vdaf = Prio3Count::new_count(2).unwrap();
let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count).build();
Expand All @@ -3378,9 +3401,15 @@ mod tests {

datastore.put_aggregator_task(&leader_task).await.unwrap();

let aggregator = Aggregator::new(Arc::clone(&datastore), clock.clone(), &noop_meter(), cfg)
.await
.unwrap();
let aggregator = Aggregator::new(
Arc::clone(&datastore),
clock.clone(),
runtime,
&noop_meter(),
cfg,
)
.await
.unwrap();

(
vdaf,
Expand Down Expand Up @@ -3527,9 +3556,13 @@ mod tests {
async fn upload_wrong_hpke_config_id() {
install_test_trace_subscriber();

let config = default_aggregator_config();
let mut runtime_manager = TestRuntimeManager::new();
let (_, aggregator, clock, task, datastore, _ephemeral_datastore) =
setup_upload_test(config).await;
setup_upload_test_with_runtime(
runtime_manager.with_label("aggregator"),
default_aggregator_config(),
)
.await;
let leader_task = task.leader_view().unwrap();
let report = create_report(&leader_task, clock.now());

Expand Down Expand Up @@ -3565,8 +3598,10 @@ mod tests {
})
});

// Wait out the batch write delay so the report status can be flushed to the DB.
sleep(config.max_upload_batch_write_delay * 2).await;
// Wait for the report writer to have completed one write task.
runtime_manager
.wait_for_completed_tasks("aggregator", 1)
.await;

let got_counters = datastore
.run_unnamed_tx(|tx| {
Expand Down Expand Up @@ -3625,9 +3660,13 @@ mod tests {
#[tokio::test]
async fn upload_report_in_the_future_past_clock_skew() {
install_test_trace_subscriber();
let config = default_aggregator_config();
let mut runtime_manager = TestRuntimeManager::new();
let (_, aggregator, clock, task, datastore, _ephemeral_datastore) =
setup_upload_test(config).await;
setup_upload_test_with_runtime(
runtime_manager.with_label("aggregator"),
default_aggregator_config(),
)
.await;
let report = create_report(
&task.leader_view().unwrap(),
clock
Expand All @@ -3649,8 +3688,10 @@ mod tests {
assert_matches!(rejection.reason(), ReportRejectionReason::TooEarly);
});

// Wait out the batch write delay so the report status can be flushed to the DB.
sleep(config.max_upload_batch_write_delay * 2).await;
// Wait for the report writer to have completed one write task.
runtime_manager
.wait_for_completed_tasks("aggregator", 1)
.await;

let got_counters = datastore
.run_unnamed_tx(|tx| {
Expand All @@ -3669,9 +3710,13 @@ mod tests {
async fn upload_report_for_collected_batch() {
install_test_trace_subscriber();

let config = default_aggregator_config();
let mut runtime_manager = TestRuntimeManager::new();
let (_, aggregator, clock, task, datastore, _ephemeral_datastore) =
setup_upload_test(config).await;
setup_upload_test_with_runtime(
runtime_manager.with_label("aggregator"),
default_aggregator_config(),
)
.await;
let report = create_report(&task.leader_view().unwrap(), clock.now());

// Insert a collection job for the batch interval including our report.
Expand Down Expand Up @@ -3721,8 +3766,10 @@ mod tests {
}
);

// Wait out the batch write delay so the report status can be flushed to the DB.
sleep(config.max_upload_batch_write_delay * 2).await;
// Wait for the report writer to have completed one write task.
runtime_manager
.wait_for_completed_tasks("aggregator", 1)
.await;

let got_counters = datastore
.run_unnamed_tx(|tx| {
Expand Down Expand Up @@ -3817,9 +3864,13 @@ mod tests {
#[tokio::test]
async fn upload_report_task_expired() {
install_test_trace_subscriber();
let config = default_aggregator_config();
let mut runtime_manager = TestRuntimeManager::new();
let (_, aggregator, clock, _, datastore, _ephemeral_datastore) =
setup_upload_test(config).await;
setup_upload_test_with_runtime(
runtime_manager.with_label("aggregator"),
default_aggregator_config(),
)
.await;

let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count)
.with_task_expiration(Some(clock.now()))
Expand Down Expand Up @@ -3847,8 +3898,10 @@ mod tests {
}
);

// Wait out the batch write delay so the report status can be flushed to the DB.
sleep(config.max_upload_batch_write_delay * 2).await;
// Wait for the report writer to have completed one write task.
runtime_manager
.wait_for_completed_tasks("aggregator", 1)
.await;

let got_counters = datastore
.run_unnamed_tx(|tx| {
Expand All @@ -3866,9 +3919,13 @@ mod tests {
#[tokio::test]
async fn upload_report_report_expired() {
install_test_trace_subscriber();
let config = default_aggregator_config();
let mut runtime_manager = TestRuntimeManager::new();
let (_, aggregator, clock, _, datastore, _ephemeral_datastore) =
setup_upload_test(config).await;
setup_upload_test_with_runtime(
runtime_manager.with_label("aggregator"),
default_aggregator_config(),
)
.await;

let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count)
.with_report_expiry_age(Some(Duration::from_seconds(60)))
Expand Down Expand Up @@ -3897,8 +3954,10 @@ mod tests {
}
);

// Wait out the batch write delay so the report status can be flushed to the DB.
sleep(config.max_upload_batch_write_delay * 2).await;
// Wait for the report writer to have completed one write task.
runtime_manager
.wait_for_completed_tasks("aggregator", 1)
.await;

let got_counters = datastore
.run_unnamed_tx(|tx| {
Expand All @@ -3916,9 +3975,13 @@ mod tests {
#[tokio::test]
async fn upload_report_faulty_encryption() {
install_test_trace_subscriber();
let config = default_aggregator_config();
let mut runtime_manager = TestRuntimeManager::new();
let (_, aggregator, clock, task, datastore, _ephemeral_datastore) =
setup_upload_test(config).await;
setup_upload_test_with_runtime(
runtime_manager.with_label("aggregator"),
default_aggregator_config(),
)
.await;

let task = task.leader_view().unwrap();

Expand Down Expand Up @@ -3947,8 +4010,10 @@ mod tests {
}
);

// Wait out the batch write delay so the report status can be flushed to the DB.
sleep(config.max_upload_batch_write_delay * 2).await;
// Wait for the report writer to have completed one write task.
runtime_manager
.wait_for_completed_tasks("aggregator", 1)
.await;

let got_counters = datastore
.run_unnamed_tx(|tx| {
Expand All @@ -3966,9 +4031,13 @@ mod tests {
#[tokio::test]
async fn upload_report_public_share_decode_failure() {
install_test_trace_subscriber();
let config = default_aggregator_config();
let mut runtime_manager = TestRuntimeManager::new();
let (_, aggregator, clock, task, datastore, _ephemeral_datastore) =
setup_upload_test(config).await;
setup_upload_test_with_runtime(
runtime_manager.with_label("aggregator"),
default_aggregator_config(),
)
.await;

let task = task.leader_view().unwrap();

Expand Down Expand Up @@ -3996,8 +4065,10 @@ mod tests {
}
);

// Wait out the batch write delay so the report status can be flushed to the DB.
sleep(config.max_upload_batch_write_delay * 2).await;
// Wait for the report writer to have completed one write task.
runtime_manager
.wait_for_completed_tasks("aggregator", 1)
.await;

let got_counters = datastore
.run_unnamed_tx(|tx| {
Expand All @@ -4015,9 +4086,13 @@ mod tests {
#[tokio::test]
async fn upload_report_leader_input_share_decode_failure() {
install_test_trace_subscriber();
let config = default_aggregator_config();
let mut runtime_manager = TestRuntimeManager::new();
let (_, aggregator, clock, task, datastore, _ephemeral_datastore) =
setup_upload_test(config).await;
setup_upload_test_with_runtime(
runtime_manager.with_label("aggregator"),
default_aggregator_config(),
)
.await;

let task = task.leader_view().unwrap();

Expand Down Expand Up @@ -4059,8 +4134,10 @@ mod tests {
}
);

// Wait out the batch write delay so the report status can be flushed to the DB.
sleep(config.max_upload_batch_write_delay * 2).await;
// Wait for the report writer to have completed one write task.
runtime_manager
.wait_for_completed_tasks("aggregator", 1)
.await;

let got_counters = datastore
.run_unnamed_tx(|tx| {
Expand Down
5 changes: 4 additions & 1 deletion aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use janus_aggregator_core::{
};
use janus_core::{
auth_tokens::{AuthenticationToken, DAP_AUTH_HEADER},
test_util::{dummy_vdaf, install_test_trace_subscriber, run_vdaf, VdafTranscript},
test_util::{
dummy_vdaf, install_test_trace_subscriber, run_vdaf, runtime::TestRuntime, VdafTranscript,
},
time::{Clock, MockClock, TimeExt as _},
vdaf::VdafInstance,
};
Expand Down Expand Up @@ -259,6 +261,7 @@ async fn setup_aggregate_init_test_without_sending_request<
let handler = aggregator_handler(
Arc::clone(&datastore),
clock.clone(),
TestRuntime::default(),
&noop_meter(),
Config::default(),
)
Expand Down
3 changes: 2 additions & 1 deletion aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ mod tests {
test_util::noop_meter,
};
use janus_core::{
test_util::install_test_trace_subscriber,
test_util::{install_test_trace_subscriber, runtime::TestRuntime},
time::{IntervalExt, MockClock},
vdaf::{VdafInstance, VERIFY_KEY_LENGTH},
};
Expand Down Expand Up @@ -530,6 +530,7 @@ mod tests {
let handler = aggregator_handler(
Arc::clone(&datastore),
clock,
TestRuntime::default(),
&meter,
default_aggregator_config(),
)
Expand Down
2 changes: 2 additions & 0 deletions aggregator/src/aggregator/collection_job_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use janus_core::{
test_util::{
dummy_vdaf::{self, AggregationParam},
install_test_trace_subscriber,
runtime::TestRuntime,
},
time::{Clock, IntervalExt, MockClock},
vdaf::VdafInstance,
Expand Down Expand Up @@ -143,6 +144,7 @@ pub(crate) async fn setup_collection_job_test_case(
let handler = aggregator_handler(
Arc::clone(&datastore),
clock.clone(),
TestRuntime::default(),
&noop_meter(),
Config {
batch_aggregation_shard_count: 32,
Expand Down
Loading

0 comments on commit 37a3593

Please sign in to comment.