Skip to content

Commit

Permalink
Add discrete event simulation tests (#3324)
Browse files Browse the repository at this point in the history
* Add discrete event simulation tests

* Inspect HTTP responses

* Add tests for issue #2442

* Add test for issue #2464

* Add a test for abandoned agg job => batch mismatch
  • Loading branch information
divergentdave authored Aug 1, 2024
1 parent 3a90c6e commit 21c4d29
Show file tree
Hide file tree
Showing 13 changed files with 2,326 additions and 15 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 14 additions & 14 deletions aggregator/src/aggregator/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use trillium_tokio::{CloneCounterObserver, Stopper};

pub struct AggregationJobCreator<C: Clock> {
// Dependencies.
datastore: Datastore<C>,
datastore: Arc<Datastore<C>>,
meter: Meter,

// Configuration values.
Expand All @@ -92,7 +92,7 @@ pub struct AggregationJobCreator<C: Clock> {

impl<C: Clock + 'static> AggregationJobCreator<C> {
pub fn new(
datastore: Datastore<C>,
datastore: Arc<Datastore<C>>,
meter: Meter,
batch_aggregation_shard_count: u64,
tasks_update_frequency: Duration,
Expand Down Expand Up @@ -298,7 +298,7 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
fields(task_id = ?task.id()),
err
)]
async fn create_aggregation_jobs_for_task(
pub async fn create_aggregation_jobs_for_task(
self: Arc<Self>,
task: Arc<AggregatorTask>,
) -> anyhow::Result<bool> {
Expand Down Expand Up @@ -1034,7 +1034,7 @@ mod tests {
// kill it.
const AGGREGATION_JOB_CREATION_INTERVAL: Duration = Duration::from_secs(1);
let job_creator = Arc::new(AggregationJobCreator::new(
ds,
Arc::new(ds),
noop_meter(),
BATCH_AGGREGATION_SHARD_COUNT,
Duration::from_secs(3600),
Expand Down Expand Up @@ -1215,7 +1215,7 @@ mod tests {

// Run.
let job_creator = Arc::new(AggregationJobCreator::new(
ds,
Arc::new(ds),
noop_meter(),
BATCH_AGGREGATION_SHARD_COUNT,
Duration::from_secs(3600),
Expand Down Expand Up @@ -1400,7 +1400,7 @@ mod tests {

// Run.
let job_creator = Arc::new(AggregationJobCreator::new(
ds,
Arc::new(ds),
noop_meter(),
BATCH_AGGREGATION_SHARD_COUNT,
Duration::from_secs(3600),
Expand Down Expand Up @@ -1626,7 +1626,7 @@ mod tests {

// Run.
let job_creator = Arc::new(AggregationJobCreator::new(
ds,
Arc::new(ds),
noop_meter(),
1,
Duration::from_secs(3600),
Expand Down Expand Up @@ -1793,7 +1793,7 @@ mod tests {

// Run.
let job_creator = Arc::new(AggregationJobCreator::new(
ds,
Arc::new(ds),
noop_meter(),
BATCH_AGGREGATION_SHARD_COUNT,
Duration::from_secs(3600),
Expand Down Expand Up @@ -2010,7 +2010,7 @@ mod tests {

// Run.
let job_creator = Arc::new(AggregationJobCreator::new(
ds,
Arc::new(ds),
meter,
BATCH_AGGREGATION_SHARD_COUNT,
Duration::from_secs(3600),
Expand Down Expand Up @@ -2179,7 +2179,7 @@ mod tests {

// Run.
let job_creator = Arc::new(AggregationJobCreator::new(
ds,
Arc::new(ds),
meter,
BATCH_AGGREGATION_SHARD_COUNT,
Duration::from_secs(3600),
Expand Down Expand Up @@ -2443,7 +2443,7 @@ mod tests {

// Run.
let job_creator = Arc::new(AggregationJobCreator::new(
ds,
Arc::new(ds),
meter,
BATCH_AGGREGATION_SHARD_COUNT,
Duration::from_secs(3600),
Expand Down Expand Up @@ -2738,7 +2738,7 @@ mod tests {

// Run.
let job_creator = Arc::new(AggregationJobCreator::new(
ds,
Arc::new(ds),
meter,
BATCH_AGGREGATION_SHARD_COUNT,
Duration::from_secs(3600),
Expand Down Expand Up @@ -3030,7 +3030,7 @@ mod tests {

// Run.
let job_creator = Arc::new(AggregationJobCreator::new(
ds,
Arc::new(ds),
noop_meter(),
BATCH_AGGREGATION_SHARD_COUNT,
Duration::from_secs(3600),
Expand Down Expand Up @@ -3235,7 +3235,7 @@ mod tests {
.unwrap();

let job_creator = Arc::new(AggregationJobCreator::new(
ds,
Arc::new(ds),
noop_meter(),
BATCH_AGGREGATION_SHARD_COUNT,
Duration::from_secs(3600),
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/binaries/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::info;
pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Result<()> {
// Start creating aggregation jobs.
let aggregation_job_creator = Arc::new(AggregationJobCreator::new(
ctx.datastore,
Arc::new(ctx.datastore),
ctx.meter,
ctx.config.batch_aggregation_shard_count,
Duration::from_secs(ctx.config.tasks_update_frequency_s),
Expand Down
7 changes: 7 additions & 0 deletions integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,15 @@ uuid.workspace = true

[dev-dependencies]
chrono.workspace = true
derivative.workspace = true
divviup-client = { workspace = true, features = ["admin"] }
janus_collector = { workspace = true, features = ["test-util"] }
opentelemetry.workspace = true
quickcheck.workspace = true
regex.workspace = true
rstest.workspace = true
tempfile = { workspace = true }
tracing.workspace = true
trillium.workspace = true
trillium-macros.workspace = true
trillium-rustls.workspace = true
1 change: 1 addition & 0 deletions integration_tests/tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod daphne;
mod divviup_ts;
mod in_cluster;
mod janus;
mod simulation;

fn initialize_rustls() {
// Choose aws-lc-rs as the default rustls crypto provider. This is what's currently enabled by
Expand Down
Loading

0 comments on commit 21c4d29

Please sign in to comment.