From f1dc65f2fa23f4a033595415a02d1bf8d2c2228e Mon Sep 17 00:00:00 2001
From: Ameer Ghani
Date: Wed, 11 Oct 2023 15:53:17 -0400
Subject: [PATCH] Ensure all production code provides transaction names (#2114)

---
 aggregator/src/aggregator.rs                  |  28 +--
 .../aggregator/aggregation_job_continue.rs    |   8 +-
 .../src/aggregator/aggregation_job_creator.rs |  56 ++---
 .../src/aggregator/aggregation_job_driver.rs  |  44 ++--
 .../src/aggregator/collection_job_driver.rs   |  26 +-
 .../src/aggregator/collection_job_tests.rs    |  18 +-
 .../src/aggregator/garbage_collector.rs       |  20 +-
 aggregator/src/aggregator/http_handlers.rs    |  60 ++---
 aggregator/src/aggregator/report_writer.rs    |   2 +-
 aggregator/src/aggregator/taskprov_tests.rs   |   8 +-
 aggregator/src/bin/janus_cli.rs               |  10 +-
 aggregator/src/cache.rs                       |   4 +-
 aggregator_api/src/routes.rs                  |  28 +--
 aggregator_api/src/tests.rs                   |  50 ++--
 aggregator_core/src/datastore.rs              |  35 +--
 aggregator_core/src/datastore/tests.rs        | 228 +++++++++---------
 .../src/bin/janus_interop_aggregator.rs       |   2 +-
 17 files changed, 320 insertions(+), 307 deletions(-)

diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs
index b0b77c167..b49b530fb 100644
--- a/aggregator/src/aggregator.rs
+++ b/aggregator/src/aggregator.rs
@@ -621,7 +621,7 @@ impl Aggregator {
         // Slow path: retrieve task, create a task aggregator, store it to the cache, then return it.
         match self
             .datastore
-            .run_tx_with_name("task_aggregator_get_task", |tx| {
+            .run_tx("task_aggregator_get_task", |tx| {
                 let task_id = *task_id;
                 Box::pin(async move { tx.get_aggregator_task(&task_id).await })
             })
@@ -689,7 +689,7 @@ impl Aggregator {
         )
         .map_err(|err| Error::InvalidTask(*task_id, OptOutReason::TaskParameters(err)))?;
         self.datastore
-            .run_tx_with_name("taskprov_put_task", |tx| {
+            .run_tx("taskprov_put_task", |tx| {
                 let task = task.clone();
                 Box::pin(async move { tx.put_aggregator_task(&task).await })
             })
@@ -1883,7 +1883,7 @@ impl VdafOps {
         let accumulator = Arc::new(accumulator);

         Ok(datastore
-            .run_tx_with_name("aggregate_init", |tx| {
+            .run_tx("aggregate_init", |tx| {
                 let vdaf = vdaf.clone();
                 let task = Arc::clone(&task);
                 let req = Arc::clone(&req);
@@ -2074,7 +2074,7 @@ impl VdafOps {
         // TODO(#224): don't hold DB transaction open while computing VDAF updates?
// TODO(#224): don't do O(n) network round-trips (where n is the number of prepare steps) Ok(datastore - .run_tx_with_name("aggregate_continue", |tx| { + .run_tx("aggregate_continue", |tx| { let ( vdaf, aggregate_step_failure_counter, @@ -2233,7 +2233,7 @@ impl VdafOps { )?); Ok(datastore - .run_tx_with_name("collect", move |tx| { + .run_tx("collect", move |tx| { let (task, vdaf, collection_job_id, req, aggregation_param) = ( Arc::clone(&task), Arc::clone(&vdaf), @@ -2528,7 +2528,7 @@ impl VdafOps { A::AggregateShare: Send + Sync, { let (collection_job, spanned_interval) = datastore - .run_tx_with_name("get_collection_job", |tx| { + .run_tx("get_collection_job", |tx| { let (task, vdaf, collection_job_id) = (Arc::clone(&task), Arc::clone(&vdaf), *collection_job_id); Box::pin(async move { @@ -2703,7 +2703,7 @@ impl VdafOps { A::AggregateShare: Send + Sync + PartialEq + Eq, { datastore - .run_tx_with_name("delete_collection_job", move |tx| { + .run_tx("delete_collection_job", move |tx| { let (task, vdaf, collection_job_id) = (Arc::clone(&task), Arc::clone(&vdaf), *collection_job_id); Box::pin(async move { @@ -2828,7 +2828,7 @@ impl VdafOps { } let aggregate_share_job = datastore - .run_tx_with_name("aggregate_share", |tx| { + .run_tx("aggregate_share", |tx| { let (task, vdaf, aggregate_share_req) = ( Arc::clone(&task), Arc::clone(&vdaf), @@ -3256,7 +3256,7 @@ mod tests { .unwrap(); let got_report = datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task_id, report_id) = (vdaf.clone(), *task.id(), *report.metadata().id()); Box::pin(async move { tx.get_client_report(&vdaf, &task_id, &report_id).await }) @@ -3321,7 +3321,7 @@ mod tests { .unwrap(); let got_report_ids = datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let vdaf = vdaf.clone(); let task = task.clone(); Box::pin(async move { tx.get_client_reports_for_task(&vdaf, task.id()).await }) @@ -3386,7 +3386,7 @@ mod tests { .unwrap(); let got_report = datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task_id, report_id) = (vdaf.clone(), *task.id(), *report.metadata().id()); Box::pin(async move { tx.get_client_report(&vdaf, &task_id, &report_id).await }) @@ -3445,7 +3445,7 @@ mod tests { ) .unwrap(); datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { tx.put_collection_job(&CollectionJob::< @@ -3501,7 +3501,7 @@ mod tests { ); datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let global_hpke_keypair_same_id = global_hpke_keypair_same_id.clone(); let global_hpke_keypair_different_id = global_hpke_keypair_different_id.clone(); Box::pin(async move { @@ -3538,7 +3538,7 @@ mod tests { .unwrap(); let got_report = datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task_id, report_id) = (vdaf.clone(), *task.id(), *report.metadata().id()); Box::pin(async move { tx.get_client_report(&vdaf, &task_id, &report_id).await }) diff --git a/aggregator/src/aggregator/aggregation_job_continue.rs b/aggregator/src/aggregator/aggregation_job_continue.rs index 903df0e8e..435af6705 100644 --- a/aggregator/src/aggregator/aggregation_job_continue.rs +++ b/aggregator/src/aggregator/aggregation_job_continue.rs @@ -462,7 +462,7 @@ mod tests { prepare_init_generator.next(&IdpfInput::from_bools(&[true])); datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task, aggregation_param, prepare_init, transcript) = ( helper_task.clone(), aggregation_param.clone(), @@ -630,7 +630,7 @@ mod tests { let (before_aggregation_job, before_report_aggregations) = test_case .datastore - .run_tx(|tx| 
{ + .run_unnamed_tx(|tx| { let (task_id, unrelated_prepare_init, aggregation_job_id) = ( *test_case.task.id(), unrelated_prepare_init.clone(), @@ -689,7 +689,7 @@ mod tests { // Make sure the state of the aggregation job and report aggregations has not changed let (after_aggregation_job, after_report_aggregations) = test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task_id, aggregation_job_id) = (*test_case.task.id(), test_case.aggregation_job_id); Box::pin(async move { @@ -727,7 +727,7 @@ mod tests { test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task_id, aggregation_job_id) = (*test_case.task.id(), test_case.aggregation_job_id); Box::pin(async move { diff --git a/aggregator/src/aggregator/aggregation_job_creator.rs b/aggregator/src/aggregator/aggregation_job_creator.rs index 3bf2890a9..597aa7bff 100644 --- a/aggregator/src/aggregator/aggregation_job_creator.rs +++ b/aggregator/src/aggregator/aggregation_job_creator.rs @@ -160,7 +160,7 @@ impl AggregationJobCreator { debug!("Updating tasks"); let tasks = self .datastore - .run_tx_with_name("aggregation_job_creator_get_tasks", |tx| { + .run_tx("aggregation_job_creator_get_tasks", |tx| { Box::pin(async move { tx.get_aggregator_tasks().await }) }) .await?; @@ -547,7 +547,7 @@ impl AggregationJobCreator { { Ok(self .datastore - .run_tx_with_name("aggregation_job_creator_time_no_param", |tx| { + .run_tx("aggregation_job_creator_time_no_param", |tx| { let this = Arc::clone(&self); let task = Arc::clone(&task); let vdaf = Arc::clone(&vdaf); @@ -654,7 +654,7 @@ impl AggregationJobCreator { ); Ok(self .datastore - .run_tx_with_name("aggregation_job_creator_fixed_no_param", |tx| { + .run_tx("aggregation_job_creator_fixed_no_param", |tx| { let this = Arc::clone(&self); let task = Arc::clone(&task); let vdaf = Arc::clone(&vdaf); @@ -753,7 +753,7 @@ mod tests { .unwrap(); let helper_report = LeaderStoredReport::new_dummy(*helper_task.id(), report_time); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (leader_task, helper_task) = (leader_task.clone(), helper_task.clone()); let (leader_report, helper_report) = (leader_report.clone(), helper_report.clone()); Box::pin(async move { @@ -790,7 +790,7 @@ mod tests { let (leader_aggregations, leader_batches, helper_aggregations, helper_batches) = job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (leader_task, helper_task) = (leader_task.clone(), helper_task.clone()); Box::pin(async move { let vdaf = Prio3Count::new_count(2).unwrap(); @@ -876,7 +876,7 @@ mod tests { .map(|report| *report.metadata().id()) .collect(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (task, reports) = (Arc::clone(&task), reports.clone()); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -907,7 +907,7 @@ mod tests { // Verify. let (agg_jobs, batches) = job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { Ok( @@ -976,7 +976,7 @@ mod tests { let first_report = LeaderStoredReport::new_dummy(*task.id(), report_time); let second_report = LeaderStoredReport::new_dummy(*task.id(), report_time); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (task, first_report) = (Arc::clone(&task), first_report.clone()); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -1004,7 +1004,7 @@ mod tests { // Verify -- we haven't received enough reports yet, so we don't create anything. 
let (agg_jobs, batches) = job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = Arc::clone(&task); Box::pin(async move { Ok( @@ -1026,7 +1026,7 @@ mod tests { // Setup again -- add another report. job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let second_report = second_report.clone(); Box::pin(async move { tx.put_client_report(&dummy_vdaf::Vdaf::new(), &second_report) @@ -1045,7 +1045,7 @@ mod tests { // Verify -- the additional report we wrote allows an aggregation job to be created. let (agg_jobs, batches) = job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = Arc::clone(&task); Box::pin(async move { Ok( @@ -1113,7 +1113,7 @@ mod tests { .map(|report| *report.metadata().id()) .collect(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (task, reports) = (Arc::clone(&task), reports.clone()); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -1153,7 +1153,7 @@ mod tests { // Verify. let (agg_jobs, batches) = job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { Ok( @@ -1247,7 +1247,7 @@ mod tests { .map(|report| *report.metadata().id()) .collect(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (task, reports) = (task.clone(), reports.clone()); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -1279,7 +1279,7 @@ mod tests { let (outstanding_batches, (agg_jobs, batches)) = job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = Arc::clone(&task); Box::pin(async move { Ok(( @@ -1411,7 +1411,7 @@ mod tests { .take(5) .collect(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (task, reports) = (task.clone(), reports.clone()); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -1443,7 +1443,7 @@ mod tests { let (outstanding_batches, (agg_jobs, batches)) = job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = Arc::clone(&task); Box::pin(async move { Ok(( @@ -1471,7 +1471,7 @@ mod tests { // Confirm the reports are still available. 
let report_count = job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = Arc::clone(&task); Box::pin(async move { let report_ids = tx @@ -1530,7 +1530,7 @@ mod tests { .map(|report| *report.metadata().id()) .collect(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (task, reports) = (task.clone(), reports.clone()); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -1562,7 +1562,7 @@ mod tests { let (outstanding_batches, (agg_jobs, _batches)) = job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = Arc::clone(&task); Box::pin(async move { Ok(( @@ -1601,7 +1601,7 @@ mod tests { report_ids.insert(*last_report.metadata().id()); job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let last_report = last_report.clone(); Box::pin(async move { let vdaf = dummy_vdaf::Vdaf::new(); @@ -1621,7 +1621,7 @@ mod tests { let (outstanding_batches, (agg_jobs, _batches)) = job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = Arc::clone(&task); Box::pin(async move { Ok(( @@ -1716,7 +1716,7 @@ mod tests { .map(|report| *report.metadata().id()) .collect(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (task, reports) = (task.clone(), reports.clone()); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -1748,7 +1748,7 @@ mod tests { let (outstanding_batches, (agg_jobs, _batches)) = job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = Arc::clone(&task); Box::pin(async move { Ok(( @@ -1791,7 +1791,7 @@ mod tests { report_ids.extend(new_reports.iter().map(|report| *report.metadata().id())); job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let new_reports = new_reports.clone(); Box::pin(async move { let vdaf = dummy_vdaf::Vdaf::new(); @@ -1814,7 +1814,7 @@ mod tests { let (outstanding_batches, (agg_jobs, _batches)) = job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = Arc::clone(&task); Box::pin(async move { Ok(( @@ -1915,7 +1915,7 @@ mod tests { .map(|report| *report.metadata().id()) .collect(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (task, reports) = (task.clone(), reports.clone()); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -1953,7 +1953,7 @@ mod tests { let (outstanding_batches_bucket_1, outstanding_batches_bucket_2, (agg_jobs, batches)) = job_creator .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = Arc::clone(&task); Box::pin(async move { Ok(( diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index f18461f13..1766e0aa2 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -143,7 +143,7 @@ impl AggregationJobDriver { { // Read all information about the aggregation job. 
let (task, aggregation_job, report_aggregations, client_reports, verify_key) = datastore - .run_tx_with_name("step_aggregation_job_1", |tx| { + .run_tx("step_aggregation_job_1", |tx| { let (lease, vdaf) = (Arc::clone(&lease), Arc::clone(&vdaf)); Box::pin(async move { let task = tx @@ -704,7 +704,7 @@ impl AggregationJobDriver { let accumulator = Arc::new(accumulator); datastore - .run_tx_with_name("step_aggregation_job_2", |tx| { + .run_tx("step_aggregation_job_2", |tx| { let vdaf = Arc::clone(&vdaf); let aggregation_job_writer = Arc::clone(&aggregation_job_writer); let accumulator = Arc::clone(&accumulator); @@ -786,7 +786,7 @@ impl AggregationJobDriver { let vdaf = Arc::new(vdaf); let lease = Arc::new(lease); datastore - .run_tx_with_name("cancel_aggregation_job", |tx| { + .run_tx("cancel_aggregation_job", |tx| { let vdaf = Arc::clone(&vdaf); let lease = Arc::clone(&lease); @@ -855,7 +855,7 @@ impl AggregationJobDriver { let datastore = Arc::clone(&datastore); Box::pin(async move { datastore - .run_tx_with_name("acquire_aggregation_jobs", |tx| { + .run_tx("acquire_aggregation_jobs", |tx| { Box::pin(async move { tx.acquire_incomplete_aggregation_jobs( &lease_duration, @@ -1019,7 +1019,7 @@ mod tests { let aggregation_job_id = random(); let collection_job = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report, aggregation_param) = ( vdaf.clone(), leader_task.clone(), @@ -1215,7 +1215,7 @@ mod tests { let want_collection_job = collection_job.with_state(CollectionJobState::Collectable); let (got_aggregation_job, got_report_aggregation, got_batch, got_collection_job) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let vdaf = Arc::clone(&vdaf); let task = task.clone(); let report_id = *report.metadata().id(); @@ -1323,7 +1323,7 @@ mod tests { let aggregation_job_id = random(); let lease = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report, repeated_extension_report) = ( vdaf.clone(), leader_task.clone(), @@ -1545,7 +1545,7 @@ mod tests { got_missing_report_report_aggregation, got_batch, ) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report_id, repeated_extension_report_id) = ( Arc::clone(&vdaf), task.clone(), @@ -1672,7 +1672,7 @@ mod tests { let aggregation_job_id = random(); let lease = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report, aggregation_param) = ( vdaf.clone(), leader_task.clone(), @@ -1830,7 +1830,7 @@ mod tests { ); let (got_aggregation_job, got_report_aggregation, got_batch) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report_id) = (Arc::clone(&vdaf), task.clone(), *report.metadata().id()); Box::pin(async move { @@ -1925,7 +1925,7 @@ mod tests { let aggregation_job_id = random(); let lease = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report) = (vdaf.clone(), leader_task.clone(), report.clone()); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -2089,7 +2089,7 @@ mod tests { ); let (got_aggregation_job, got_report_aggregation, got_batch) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report_id) = (Arc::clone(&vdaf), task.clone(), *report.metadata().id()); Box::pin(async move { @@ -2181,7 +2181,7 @@ mod tests { let aggregation_job_id = random(); let lease = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report, aggregation_param) = ( vdaf.clone(), leader_task.clone(), @@ -2339,7 +2339,7 @@ mod tests { ); let (got_aggregation_job, got_report_aggregation, got_batch) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, 
report_id) = (Arc::clone(&vdaf), task.clone(), *report.metadata().id()); Box::pin(async move { @@ -2447,7 +2447,7 @@ mod tests { .unwrap(); let (lease, want_collection_job) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, aggregation_param, report, transcript) = ( vdaf.clone(), leader_task.clone(), @@ -2689,7 +2689,7 @@ mod tests { got_other_batch, got_collection_job, ) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report_metadata, aggregation_param, collection_job_id) = ( Arc::clone(&vdaf), leader_task.clone(), @@ -2850,7 +2850,7 @@ mod tests { .unwrap(); let (lease, collection_job) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report, aggregation_param, transcript) = ( vdaf.clone(), leader_task.clone(), @@ -3061,7 +3061,7 @@ mod tests { got_batch, got_collection_job, ) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report_metadata, aggregation_param, collection_job_id) = ( Arc::clone(&vdaf), leader_task.clone(), @@ -3201,7 +3201,7 @@ mod tests { ); let lease = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report, aggregation_job, report_aggregation) = ( vdaf.clone(), task.clone(), @@ -3262,7 +3262,7 @@ mod tests { ); let (got_aggregation_job, got_report_aggregation, got_batch, got_leases) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report_id) = (Arc::clone(&vdaf), task.clone(), *report.metadata().id()); Box::pin(async move { @@ -3377,7 +3377,7 @@ mod tests { ); // Set up fixtures in the database. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let vdaf = vdaf.clone(); let task = leader_task.clone(); let report = report.clone(); @@ -3519,7 +3519,7 @@ mod tests { // Confirm in the database that the job was abandoned. let (got_aggregation_job, got_batch) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { let got_aggregation_job = tx diff --git a/aggregator/src/aggregator/collection_job_driver.rs b/aggregator/src/aggregator/collection_job_driver.rs index 024fb896b..c749af740 100644 --- a/aggregator/src/aggregator/collection_job_driver.rs +++ b/aggregator/src/aggregator/collection_job_driver.rs @@ -123,7 +123,7 @@ impl CollectionJobDriver { A::OutputShare: PartialEq + Eq + Send + Sync, { let (task, collection_job, batch_aggregations) = datastore - .run_tx_with_name("step_collection_job_1", |tx| { + .run_tx("step_collection_job_1", |tx| { let vdaf = Arc::clone(&vdaf); let lease = Arc::clone(&lease); let batch_aggregation_shard_count = self.batch_aggregation_shard_count; @@ -251,7 +251,7 @@ impl CollectionJobDriver { ); datastore - .run_tx_with_name("step_collection_job_2", |tx| { + .run_tx("step_collection_job_2", |tx| { let vdaf = Arc::clone(&vdaf); let lease = Arc::clone(&lease); let collection_job = Arc::clone(&collection_job); @@ -355,7 +355,7 @@ impl CollectionJobDriver { { let lease = Arc::new(lease); datastore - .run_tx_with_name("abandon_collection_job", |tx| { + .run_tx("abandon_collection_job", |tx| { let (vdaf, lease) = (Arc::clone(&vdaf), Arc::clone(&lease)); Box::pin(async move { let collection_job = tx @@ -393,7 +393,7 @@ impl CollectionJobDriver { let datastore = Arc::clone(&datastore); Box::pin(async move { datastore - .run_tx_with_name("acquire_collection_jobs", |tx| { + .run_tx("acquire_collection_jobs", |tx| { Box::pin(async move { tx.acquire_incomplete_collection_jobs( &lease_duration, @@ -600,7 +600,7 @@ mod tests { ); let lease = datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (clock, task, collection_job) = 
(clock.clone(), leader_task.clone(), collection_job.clone()); Box::pin(async move { @@ -737,7 +737,7 @@ mod tests { .unwrap(); let (collection_job_id, lease) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = leader_task.clone(); let clock = clock.clone(); Box::pin(async move { @@ -833,7 +833,7 @@ mod tests { }); // Put some batch aggregations in the DB. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (clock, task) = (clock.clone(), task.clone()); Box::pin(async move { tx.update_batch_aggregation( @@ -916,7 +916,7 @@ mod tests { mocked_failed_aggregate_share.assert_async().await; // collection job in datastore should be unchanged. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task_id = *task.id(); Box::pin(async move { @@ -966,7 +966,7 @@ mod tests { mocked_aggregate_share.assert_async().await; // Should now have recorded helper encrypted aggregate share, too. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task_id = *task.id(); let helper_aggregate_share = helper_response.encrypted_aggregate_share().clone(); @@ -1024,7 +1024,7 @@ mod tests { // Verify: check that the collection job was abandoned, and that it can no longer be acquired. let (abandoned_collection_job, leases) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let collection_job = collection_job.clone(); Box::pin(async move { let abandoned_collection_job = tx @@ -1132,7 +1132,7 @@ mod tests { // Confirm that the collection job was abandoned. let collection_job_after = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let collection_job = collection_job.clone(); Box::pin(async move { tx.get_collection_job::<0, TimeInterval, dummy_vdaf::Vdaf>( @@ -1167,7 +1167,7 @@ mod tests { // Delete the collection job let collection_job = collection_job.with_state(CollectionJobState::Deleted); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let collection_job = collection_job.clone(); Box::pin(async move { tx.update_collection_job::<0, TimeInterval, dummy_vdaf::Vdaf>(&collection_job) @@ -1205,7 +1205,7 @@ mod tests { mocked_aggregate_share.assert_async().await; // Verify: check that the collection job was abandoned, and that it can no longer be acquired. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let collection_job = collection_job.clone(); Box::pin(async move { let collection_job = tx diff --git a/aggregator/src/aggregator/collection_job_tests.rs b/aggregator/src/aggregator/collection_job_tests.rs index e078e63d7..befc165a0 100644 --- a/aggregator/src/aggregator/collection_job_tests.rs +++ b/aggregator/src/aggregator/collection_job_tests.rs @@ -185,7 +185,7 @@ async fn setup_fixed_size_current_batch_collection_job_test_case( test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = test_case.task.clone(); Box::pin(async move { for batch_id in [batch_id_1, batch_id_2] { @@ -295,7 +295,7 @@ async fn collection_job_success_fixed_size() { // Update the collection job with the aggregate shares. collection job should now be complete. 
let batch_id = test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = test_case.task.clone(); let vdaf = vdaf.clone(); let helper_aggregate_share_bytes = helper_aggregate_share.get_encoded(); @@ -450,7 +450,7 @@ async fn collection_job_put_idempotence_time_interval() { // There should only be a single collection job despite two successful PUTs test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *test_case.task.id(); let vdaf = dummy_vdaf::Vdaf::new(); Box::pin(async move { @@ -500,7 +500,7 @@ async fn collection_job_put_idempotence_time_interval_varied_collection_id() { test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *test_case.task.id(); let collection_job_ids = collection_job_ids.clone(); @@ -554,7 +554,7 @@ async fn collection_job_put_idempotence_fixed_size_varied_collection_id() { test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *test_case.task.id(); let collection_job_ids = collection_job_ids.clone(); @@ -682,7 +682,7 @@ async fn collection_job_put_idempotence_fixed_size_current_batch() { // batch ID after each PUT let batch_id = test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *test_case.task.id(); Box::pin(async move { let vdaf = dummy_vdaf::Vdaf::new(); @@ -798,7 +798,7 @@ async fn collection_job_put_idempotence_fixed_size_by_batch_id() { test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *test_case.task.id(); Box::pin(async move { tx.put_batch(&Batch::<0, FixedSize, dummy_vdaf::Vdaf>::new( @@ -852,7 +852,7 @@ async fn collection_job_put_idempotence_fixed_size_by_batch_id_mutate_batch_id() test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *test_case.task.id(); Box::pin(async move { for batch_id in [first_batch_id, second_batch_id] { @@ -922,7 +922,7 @@ async fn collection_job_put_idempotence_fixed_size_by_batch_id_mutate_aggregatio test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *test_case.task.id(); Box::pin(async move { for aggregation_param in [first_aggregation_param, second_aggregation_param] { diff --git a/aggregator/src/aggregator/garbage_collector.rs b/aggregator/src/aggregator/garbage_collector.rs index f3cef5b9a..8d0f9900a 100644 --- a/aggregator/src/aggregator/garbage_collector.rs +++ b/aggregator/src/aggregator/garbage_collector.rs @@ -38,7 +38,7 @@ impl GarbageCollector { // Retrieve tasks. let tasks = self .datastore - .run_tx_with_name("garbage_collector_get_tasks", |tx| { + .run_tx("garbage_collector_get_tasks", |tx| { Box::pin(async move { tx.get_aggregator_tasks().await }) }) .await @@ -57,7 +57,7 @@ impl GarbageCollector { async fn gc_task(&self, task: Arc) -> Result<()> { self.datastore - .run_tx_with_name("garbage_collector", |tx| { + .run_tx("garbage_collector", |tx| { let task = Arc::clone(&task); let report_limit = self.report_limit; let aggregation_limit = self.aggregation_limit; @@ -121,7 +121,7 @@ mod tests { // Setup. let task = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (clock, vdaf) = (clock.clone(), vdaf.clone()); Box::pin(async move { let task = TaskBuilder::new(task::QueryType::TimeInterval, VdafInstance::Fake) @@ -240,7 +240,7 @@ mod tests { clock.set(OLDEST_ALLOWED_REPORT_TIMESTAMP); // Verify. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (vdaf, task) = (vdaf.clone(), Arc::clone(&task)); Box::pin(async move { assert!(tx @@ -301,7 +301,7 @@ mod tests { // Setup. 
let task = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let clock = clock.clone(); Box::pin(async move { let task = TaskBuilder::new(task::QueryType::TimeInterval, VdafInstance::Fake) @@ -427,7 +427,7 @@ mod tests { clock.set(OLDEST_ALLOWED_REPORT_TIMESTAMP); // Verify. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (vdaf, task) = (vdaf.clone(), Arc::clone(&task)); Box::pin(async move { assert!(tx @@ -488,7 +488,7 @@ mod tests { // Setup. let task = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (clock, vdaf) = (clock.clone(), vdaf.clone()); Box::pin(async move { let task = TaskBuilder::new( @@ -608,7 +608,7 @@ mod tests { clock.set(OLDEST_ALLOWED_REPORT_TIMESTAMP); // Verify. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (vdaf, task) = (vdaf.clone(), Arc::clone(&task)); Box::pin(async move { assert!(tx @@ -674,7 +674,7 @@ mod tests { // Setup. let task = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let clock = clock.clone(); Box::pin(async move { let task = TaskBuilder::new( @@ -807,7 +807,7 @@ mod tests { clock.set(OLDEST_ALLOWED_REPORT_TIMESTAMP); // Verify. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (vdaf, task) = (vdaf.clone(), Arc::clone(&task)); Box::pin(async move { assert!(tx diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index cbc3651f9..eda196189 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -817,7 +817,7 @@ mod tests { // in the database. let first_hpke_keypair = generate_test_hpke_config_and_private_key_with_id(1); datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let keypair = first_hpke_keypair.clone(); Box::pin(async move { tx.put_global_hpke_keypair(&keypair).await?; @@ -861,7 +861,7 @@ mod tests { // Insert an inactive HPKE config. let second_hpke_keypair = generate_test_hpke_config_and_private_key_with_id(2); datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let keypair = second_hpke_keypair.clone(); Box::pin(async move { tx.put_global_hpke_keypair(&keypair).await }) }) @@ -878,7 +878,7 @@ mod tests { // Set key active. datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let keypair = second_hpke_keypair.clone(); Box::pin(async move { tx.set_global_hpke_keypair_state(keypair.config().id(), &HpkeKeyState::Active) @@ -913,7 +913,7 @@ mod tests { // Expire a key. datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let keypair = second_hpke_keypair.clone(); Box::pin(async move { tx.set_global_hpke_keypair_state(keypair.config().id(), &HpkeKeyState::Expired) @@ -933,7 +933,7 @@ mod tests { // Delete a key, no keys left. datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let keypair = first_hpke_keypair.clone(); Box::pin(async move { tx.delete_global_hpke_keypair(keypair.config().id()).await }) }) @@ -952,7 +952,7 @@ mod tests { // in the database. let first_hpke_keypair = generate_test_hpke_config_and_private_key_with_id(1); datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let keypair = first_hpke_keypair.clone(); Box::pin(async move { tx.put_global_hpke_keypair(&keypair).await?; @@ -1651,7 +1651,7 @@ mod tests { let (prepare_init_8, transcript_8) = prep_init_generator.next(&measurement); let (conflicting_aggregation_job, non_conflicting_aggregation_job) = datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = helper_task.clone(); let report_share_4 = prepare_init_4.report_share().clone(); let report_share_5 = prepare_init_5.report_share().clone(); @@ -1870,7 +1870,7 @@ mod tests { // Check aggregation job in datastore. 
let aggregation_jobs = datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { tx.get_aggregation_jobs_for_task::<0, TimeInterval, dummy_vdaf::Vdaf>( @@ -1940,7 +1940,7 @@ mod tests { .into(), ); datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let global_hpke_keypair_same_id = global_hpke_keypair_same_id.clone(); let global_hpke_keypair_different_id = global_hpke_keypair_different_id.clone(); Box::pin(async move { @@ -2197,7 +2197,7 @@ mod tests { // datastore. let client_reports = test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *test_case.task.id(); Box::pin(async move { let reports = tx.get_report_metadatas_for_task(&task_id).await.unwrap(); @@ -2452,7 +2452,7 @@ mod tests { ); datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = helper_task.clone(); let (report_share_0, report_share_1, report_share_2) = ( report_share_0.clone(), @@ -2578,7 +2578,7 @@ mod tests { // Validate datastore. let (aggregation_job, report_aggregations) = datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task) = (Arc::clone(&vdaf), task.clone()); Box::pin(async move { let aggregation_job = tx @@ -2788,7 +2788,7 @@ mod tests { ); datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = helper_task.clone(); let (report_share_0, report_share_1, report_share_2) = ( report_share_0.clone(), @@ -2918,7 +2918,7 @@ mod tests { // Map the batch aggregation ordinal value to 0, as it may vary due to sharding. let first_batch_got_batch_aggregations: Vec<_> = datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task, vdaf, report_metadata_0, aggregation_param) = ( helper_task.clone(), vdaf.clone(), @@ -3000,7 +3000,7 @@ mod tests { ); let second_batch_got_batch_aggregations = datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task, vdaf, report_metadata_2, aggregation_param) = ( helper_task.clone(), vdaf.clone(), @@ -3119,7 +3119,7 @@ mod tests { ); datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = helper_task.clone(); let (report_share_3, report_share_4, report_share_5) = ( report_share_3.clone(), @@ -3221,7 +3221,7 @@ mod tests { // batch aggregations over the same interval. (the task & aggregation parameter will always // be the same) let merged_first_batch_aggregation = datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task, vdaf, report_metadata_0, aggregation_param) = ( helper_task.clone(), vdaf.clone(), @@ -3308,7 +3308,7 @@ mod tests { ); let second_batch_got_batch_aggregations = datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task, vdaf, report_metadata_2, aggregation_param) = ( helper_task.clone(), vdaf.clone(), @@ -3373,7 +3373,7 @@ mod tests { // Setup datastore. datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task, aggregation_param, report_metadata, transcript) = ( helper_task.clone(), aggregation_param.clone(), @@ -3490,7 +3490,7 @@ mod tests { // Setup datastore. datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task, aggregation_param, report_metadata, transcript, helper_report_share) = ( helper_task.clone(), aggregation_param.clone(), @@ -3562,7 +3562,7 @@ mod tests { // Check datastore state. let (aggregation_job, report_aggregation) = datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, report_metadata) = (vdaf.clone(), task.clone(), report_metadata.clone()); Box::pin(async move { @@ -3648,7 +3648,7 @@ mod tests { // Setup datastore. 
datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task, aggregation_param, report_metadata, transcript) = ( helper_task.clone(), aggregation_param.clone(), @@ -3769,7 +3769,7 @@ mod tests { // Setup datastore. datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let ( task, aggregation_param, @@ -3917,7 +3917,7 @@ mod tests { // Setup datastore. datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task, report_metadata) = (helper_task.clone(), report_metadata.clone()); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -4305,7 +4305,7 @@ mod tests { test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *test_case.task.id(); Box::pin(async move { @@ -4347,7 +4347,7 @@ mod tests { let (got_collection_job, got_batches) = test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *test_case.task.id(); Box::pin(async move { @@ -4374,7 +4374,7 @@ mod tests { // job should now be complete. test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = test_case.task.clone(); let helper_aggregate_share_bytes = helper_aggregate_share.get_encoded(); Box::pin(async move { @@ -4499,7 +4499,7 @@ mod tests { test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = test_case.task.clone(); Box::pin(async move { tx.put_batch_aggregation( @@ -4567,7 +4567,7 @@ mod tests { test_case .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = test_case.task.clone(); Box::pin(async move { tx.put_batch_aggregation( @@ -4852,7 +4852,7 @@ mod tests { // Put some batch aggregations in the DB. datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = helper_task.clone(); Box::pin(async move { for aggregation_param in [ diff --git a/aggregator/src/aggregator/report_writer.rs b/aggregator/src/aggregator/report_writer.rs index 370e9e5c2..0485089d4 100644 --- a/aggregator/src/aggregator/report_writer.rs +++ b/aggregator/src/aggregator/report_writer.rs @@ -123,7 +123,7 @@ impl ReportWriteBatcher { // Run all report writes concurrently. 
let report_writers = Arc::new(report_writers); let rslts = ds - .run_tx_with_name("upload", |tx| { + .run_tx("upload", |tx| { let report_writers = Arc::clone(&report_writers); Box::pin(async move { Ok(join_all(report_writers.iter().map(|rw| rw.write_report(tx))).await) diff --git a/aggregator/src/aggregator/taskprov_tests.rs b/aggregator/src/aggregator/taskprov_tests.rs index a52fe85e7..60d4c585d 100644 --- a/aggregator/src/aggregator/taskprov_tests.rs +++ b/aggregator/src/aggregator/taskprov_tests.rs @@ -101,7 +101,7 @@ impl TaskprovTestCase { .build(); datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let global_hpke_key = global_hpke_key.clone(); let peer_aggregator = peer_aggregator.clone(); Box::pin(async move { @@ -330,7 +330,7 @@ async fn taskprov_aggregate_init() { let (aggregation_jobs, got_task) = test .datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = test.task_id; Box::pin(async move { Ok(( @@ -760,7 +760,7 @@ async fn taskprov_aggregate_continue() { let (transcript, report_share, aggregation_param) = test.next_report_share(); test.datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = test.task.clone(); let report_share = report_share.clone(); let transcript = transcript.clone(); @@ -902,7 +902,7 @@ async fn taskprov_aggregate_share() { let (transcript, _, aggregation_param) = test.next_report_share(); let batch_id = random(); test.datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = test.task.clone(); let interval = Interval::new(Time::from_seconds_since_epoch(6000), *task.time_precision()) diff --git a/aggregator/src/bin/janus_cli.rs b/aggregator/src/bin/janus_cli.rs index ee6c89668..6d4bcace7 100644 --- a/aggregator/src/bin/janus_cli.rs +++ b/aggregator/src/bin/janus_cli.rs @@ -186,7 +186,7 @@ async fn provision_tasks( // Write all tasks requested. info!(task_count = %tasks.len(), "Writing tasks"); let written_tasks = datastore - .run_tx(|tx| { + .run_tx("provision-tasks", |tx| { let tasks = Arc::clone(&tasks); Box::pin(async move { let mut written_tasks = Vec::new(); @@ -599,7 +599,7 @@ mod tests { let want_tasks = task_hashmap_from_slice(tasks); let written_tasks = task_hashmap_from_slice(written_tasks); let got_tasks = task_hashmap_from_slice( - ds.run_tx(|tx| Box::pin(async move { tx.get_aggregator_tasks().await })) + ds.run_unnamed_tx(|tx| Box::pin(async move { tx.get_aggregator_tasks().await })) .await .unwrap(), ); @@ -626,7 +626,7 @@ mod tests { let written_tasks = task_hashmap_from_slice(written_tasks); assert_eq!(want_tasks, written_tasks); let got_tasks = task_hashmap_from_slice( - ds.run_tx(|tx| Box::pin(async move { tx.get_aggregator_tasks().await })) + ds.run_unnamed_tx(|tx| Box::pin(async move { tx.get_aggregator_tasks().await })) .await .unwrap(), ); @@ -692,7 +692,7 @@ mod tests { // Verify that the expected tasks were written. let got_tasks = task_hashmap_from_slice( - ds.run_tx(|tx| Box::pin(async move { tx.get_aggregator_tasks().await })) + ds.run_unnamed_tx(|tx| Box::pin(async move { tx.get_aggregator_tasks().await })) .await .unwrap(), ); @@ -793,7 +793,7 @@ mod tests { // Verify that the expected tasks were written. 
let got_tasks = ds - .run_tx(|tx| Box::pin(async move { tx.get_aggregator_tasks().await })) + .run_unnamed_tx(|tx| Box::pin(async move { tx.get_aggregator_tasks().await })) .await .unwrap(); diff --git a/aggregator/src/cache.rs b/aggregator/src/cache.rs index c4cd4c5da..a184bbd7a 100644 --- a/aggregator/src/cache.rs +++ b/aggregator/src/cache.rs @@ -82,7 +82,7 @@ impl GlobalHpkeKeypairCache { keypairs: &StdMutex, ) -> Result<(), Error> { let global_keypairs = datastore - .run_tx_with_name("refresh_global_hpke_keypairs_cache", |tx| { + .run_tx("refresh_global_hpke_keypairs_cache", |tx| { Box::pin(async move { tx.get_global_hpke_keypairs().await }) }) .await?; @@ -153,7 +153,7 @@ impl PeerAggregatorCache { pub async fn new(datastore: &Datastore) -> Result { Ok(Self { peers: datastore - .run_tx_with_name("refresh_peer_aggregators_cache", |tx| { + .run_tx("refresh_peer_aggregators_cache", |tx| { Box::pin(async move { tx.get_taskprov_peer_aggregators().await }) }) .await? diff --git a/aggregator_api/src/routes.rs b/aggregator_api/src/routes.rs index b5c72574c..387ef75a1 100644 --- a/aggregator_api/src/routes.rs +++ b/aggregator_api/src/routes.rs @@ -66,7 +66,7 @@ pub(super) async fn get_task_ids( .map_err(|err| Error::BadRequest(format!("Couldn't parse pagination_token: {:?}", err)))?; let task_ids = ds - .run_tx_with_name("get_task_ids", |tx| { + .run_tx("get_task_ids", |tx| { Box::pin(async move { tx.get_task_ids(lower_bound).await }) }) .await?; @@ -182,7 +182,7 @@ pub(super) async fn post_task( .map_err(|err| Error::BadRequest(format!("Error constructing task: {err}")))?, ); - ds.run_tx_with_name("post_task", |tx| { + ds.run_tx("post_task", |tx| { let task = Arc::clone(&task); Box::pin(async move { if let Some(existing_task) = tx.get_aggregator_task(task.id()).await? { @@ -230,7 +230,7 @@ pub(super) async fn get_task( let task_id = conn.task_id_param()?; let task = ds - .run_tx_with_name("get_task", |tx| { + .run_tx("get_task", |tx| { Box::pin(async move { tx.get_aggregator_task(&task_id).await }) }) .await? @@ -247,7 +247,7 @@ pub(super) async fn delete_task( ) -> Result { let task_id = conn.task_id_param()?; match ds - .run_tx_with_name("delete_task", |tx| { + .run_tx("delete_task", |tx| { Box::pin(async move { tx.delete_task(&task_id).await }) }) .await @@ -264,7 +264,7 @@ pub(super) async fn get_task_metrics( let task_id = conn.task_id_param()?; let (reports, report_aggregations) = ds - .run_tx_with_name("get_task_metrics", |tx| { + .run_tx("get_task_metrics", |tx| { Box::pin(async move { tx.get_task_metrics(&task_id).await }) }) .await? @@ -281,7 +281,7 @@ pub(super) async fn get_global_hpke_configs( State(ds): State>>, ) -> Result>, Error> { Ok(Json( - ds.run_tx_with_name("get_global_hpke_configs", |tx| { + ds.run_tx("get_global_hpke_configs", |tx| { Box::pin(async move { tx.get_global_hpke_keypairs().await }) }) .await? @@ -297,7 +297,7 @@ pub(super) async fn get_global_hpke_config( ) -> Result, Error> { let config_id = conn.hpke_config_id_param()?; Ok(Json(GlobalHpkeConfigResp::from( - ds.run_tx_with_name("get_global_hpke_config", |tx| { + ds.run_tx("get_global_hpke_config", |tx| { Box::pin(async move { tx.get_global_hpke_keypair(&config_id).await }) }) .await? 
@@ -310,7 +310,7 @@ pub(super) async fn put_global_hpke_config( (State(ds), Json(req)): (State>>, Json), ) -> Result<(Status, Json), Error> { let existing_keypairs = ds - .run_tx_with_name("put_global_hpke_config_determine_id", |tx| { + .run_tx("put_global_hpke_config_determine_id", |tx| { Box::pin(async move { tx.get_global_hpke_keypairs().await }) }) .await? @@ -333,7 +333,7 @@ pub(super) async fn put_global_hpke_config( )?; let inserted_keypair = ds - .run_tx_with_name("put_global_hpke_config", |tx| { + .run_tx("put_global_hpke_config", |tx| { let keypair = keypair.clone(); Box::pin(async move { tx.put_global_hpke_keypair(&keypair).await?; @@ -355,7 +355,7 @@ pub(super) async fn patch_global_hpke_config( ) -> Result { let config_id = conn.hpke_config_id_param()?; - ds.run_tx_with_name("patch_hpke_global_keypair", |tx| { + ds.run_tx("patch_hpke_global_keypair", |tx| { Box::pin(async move { tx.set_global_hpke_keypair_state(&config_id, &req.state) .await @@ -372,7 +372,7 @@ pub(super) async fn delete_global_hpke_config( ) -> Result { let config_id = conn.hpke_config_id_param()?; match ds - .run_tx_with_name("delete_global_hpke_config", |tx| { + .run_tx("delete_global_hpke_config", |tx| { Box::pin(async move { tx.delete_global_hpke_keypair(&config_id).await }) }) .await @@ -387,7 +387,7 @@ pub(super) async fn get_taskprov_peer_aggregators( State(ds): State>>, ) -> Result>, Error> { Ok(Json( - ds.run_tx_with_name("get_taskprov_peer_aggregators", |tx| { + ds.run_tx("get_taskprov_peer_aggregators", |tx| { Box::pin(async move { tx.get_taskprov_peer_aggregators().await }) }) .await? @@ -422,7 +422,7 @@ pub(super) async fn post_taskprov_peer_aggregator( ); let inserted = ds - .run_tx_with_name("post_taskprov_peer_aggregator", |tx| { + .run_tx("post_taskprov_peer_aggregator", |tx| { let to_insert = to_insert.clone(); Box::pin(async move { tx.put_taskprov_peer_aggregator(&to_insert).await?; @@ -445,7 +445,7 @@ pub(super) async fn delete_taskprov_peer_aggregator( ), ) -> Result { match ds - .run_tx_with_name("delete_taskprov_peer_aggregator", |tx| { + .run_tx("delete_taskprov_peer_aggregator", |tx| { let req = req.clone(); Box::pin(async move { tx.delete_taskprov_peer_aggregator(&req.endpoint, &req.role) diff --git a/aggregator_api/src/tests.rs b/aggregator_api/src/tests.rs index 0283ea74b..3fc70f1f3 100644 --- a/aggregator_api/src/tests.rs +++ b/aggregator_api/src/tests.rs @@ -98,7 +98,7 @@ async fn get_task_ids() { let (handler, _ephemeral_datastore, ds) = setup_api_test().await; let mut task_ids: Vec<_> = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { let tasks: Vec<_> = iter::repeat_with(|| { TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake) @@ -328,7 +328,7 @@ async fn post_task_helper_no_optional_fields() { ); let got_task = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let got_task_resp = got_task_resp.clone(); Box::pin(async move { tx.get_aggregator_task(&got_task_resp.task_id).await }) }) @@ -468,7 +468,7 @@ async fn post_task_idempotence() { ); let got_tasks = ds - .run_tx(|tx| Box::pin(async move { tx.get_aggregator_tasks().await })) + .run_unnamed_tx(|tx| Box::pin(async move { tx.get_aggregator_tasks().await })) .await .unwrap(); @@ -538,7 +538,7 @@ async fn post_task_leader_all_optional_fields() { .unwrap(); let got_task = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let got_task_resp = got_task_resp.clone(); Box::pin(async move { tx.get_aggregator_task(&got_task_resp.task_id).await }) }) @@ -633,7 +633,7 @@ async fn get_task(#[case] role: Role) { 
.view_for_role(role) .unwrap(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -690,7 +690,7 @@ async fn delete_task() { let (handler, _ephemeral_datastore, ds) = setup_api_test().await; let task_id = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake) .build() @@ -716,7 +716,7 @@ async fn delete_task() { "", ); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { Box::pin(async move { assert_eq!(tx.get_aggregator_task(&task_id).await.unwrap(), None); Ok(()) @@ -766,7 +766,7 @@ async fn get_task_metrics() { let (handler, _ephemeral_datastore, ds) = setup_api_test().await; let task_id = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake) .build() @@ -894,7 +894,7 @@ async fn get_global_hpke_configs() { HpkeAeadId::Aes128Gcm, ) .unwrap(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let keypair1 = keypair1.clone(); let keypair2 = keypair2.clone(); Box::pin(async move { @@ -996,7 +996,7 @@ async fn get_global_hpke_config() { HpkeAeadId::Aes128Gcm, ) .unwrap(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let keypair1 = keypair1.clone(); let keypair2 = keypair2.clone(); Box::pin(async move { @@ -1090,7 +1090,7 @@ async fn put_global_hpke_config() { .unwrap(); let (got_key1, got_key2) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let key1 = key1.config.clone(); let key2 = key2.config.clone(); Box::pin(async move { @@ -1186,7 +1186,7 @@ async fn patch_global_hpke_config() { ); let keypair = generate_test_hpke_config_and_private_key(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let keypair = keypair.clone(); Box::pin(async move { tx.put_global_hpke_keypair(&keypair).await }) }) @@ -1203,7 +1203,7 @@ async fn patch_global_hpke_config() { assert_response!(conn, Status::Ok); let got_key = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let keypair = keypair.clone(); Box::pin(async move { tx.get_global_hpke_keypair(keypair.config().id()).await }) }) @@ -1257,7 +1257,7 @@ async fn delete_global_hpke_config() { ); let keypair = generate_test_hpke_config_and_private_key(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let keypair = keypair.clone(); Box::pin(async move { tx.put_global_hpke_keypair(&keypair).await }) }) @@ -1273,7 +1273,7 @@ async fn delete_global_hpke_config() { assert_response!(conn, Status::NoContent); assert_eq!( - ds.run_tx(|tx| Box::pin(async move { tx.get_global_hpke_keypairs().await })) + ds.run_unnamed_tx(|tx| Box::pin(async move { tx.get_global_hpke_keypairs().await })) .await .unwrap(), vec![] @@ -1293,7 +1293,7 @@ async fn get_taskprov_peer_aggregator() { .with_role(Role::Helper) .build(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let leader = leader.clone(); let helper = helper.clone(); Box::pin(async move { @@ -1398,9 +1398,11 @@ async fn post_taskprov_peer_aggregator() { ); assert_eq!( - ds.run_tx(|tx| { Box::pin(async move { tx.get_taskprov_peer_aggregators().await }) }) - .await - .unwrap(), + ds.run_unnamed_tx(|tx| { + Box::pin(async move { tx.get_taskprov_peer_aggregators().await }) + }) + .await + .unwrap(), vec![leader] ); @@ -1438,7 +1440,7 @@ async fn delete_taskprov_peer_aggregator() { .with_role(Role::Leader) .build(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let leader = leader.clone(); Box::pin(async move { tx.put_taskprov_peer_aggregator(&leader).await }) }) @@ -1463,9 +1465,11 @@ async fn 
delete_taskprov_peer_aggregator() { ); assert_eq!( - ds.run_tx(|tx| { Box::pin(async move { tx.get_taskprov_peer_aggregators().await }) }) - .await - .unwrap(), + ds.run_unnamed_tx(|tx| { + Box::pin(async move { tx.get_taskprov_peer_aggregators().await }) + }) + .await + .unwrap(), vec![] ); diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index fa17adf42..29c681c1e 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -143,7 +143,7 @@ impl Datastore { let datastore = Self::new_without_supported_versions(pool, crypter, clock, meter).await; let (current_version, migration_description) = datastore - .run_tx_with_name("check schema version", |tx| { + .run_tx("check schema version", |tx| { Box::pin(async move { tx.get_current_schema_migration_version().await }) }) .await?; @@ -201,20 +201,10 @@ impl Datastore { /// rolling back & retrying with a new transaction, so the given function should support being /// called multiple times. Values read from the transaction should not be considered as /// "finalized" until the transaction is committed, i.e. after `run_tx` is run to completion. - pub fn run_tx<'s, F, T>(&'s self, f: F) -> impl Future> + 's - where - F: 's, - T: 's, - for<'a> F: - Fn(&'a Transaction) -> Pin> + Send + 'a>>, - { - self.run_tx_with_name("default", f) - } - - /// See [`Datastore::run_tx`]. This method additionally allows specifying a name for the - /// transaction, for use in database-related metrics. + /// + /// This method requires a transaction name for use in database metrics. #[tracing::instrument(level = "trace", skip(self, f))] - pub async fn run_tx_with_name(&self, name: &'static str, f: F) -> Result + pub async fn run_tx(&self, name: &'static str, f: F) -> Result where for<'a> F: Fn(&'a Transaction) -> Pin> + Send + 'a>>, @@ -306,11 +296,26 @@ impl Datastore { (rslt, retry.load(Ordering::Relaxed)) } + /// See [`Datastore::run_tx`]. This method provides a placeholder transaction name. It is useful + /// for tests where the transaction name is not important. + #[cfg(feature = "test-util")] + #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] + #[tracing::instrument(level = "trace", skip(self, f))] + pub fn run_unnamed_tx<'s, F, T>(&'s self, f: F) -> impl Future> + 's + where + F: 's, + T: 's, + for<'a> F: + Fn(&'a Transaction) -> Pin> + Send + 'a>>, + { + self.run_tx("default", f) + } + /// Write a task into the datastore. 
#[cfg(feature = "test-util")] #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] pub async fn put_aggregator_task(&self, task: &AggregatorTask) -> Result<(), Error> { - self.run_tx_with_name("test-put-task", |tx| { + self.run_tx("test-put-task", |tx| { let task = task.clone(); Box::pin(async move { tx.put_aggregator_task(&task).await }) }) diff --git a/aggregator_core/src/datastore/tests.rs b/aggregator_core/src/datastore/tests.rs index 384723533..ba282cc29 100644 --- a/aggregator_core/src/datastore/tests.rs +++ b/aggregator_core/src/datastore/tests.rs @@ -151,7 +151,7 @@ async fn roundtrip_task(ephemeral_datastore: EphemeralDatastore) { want_tasks.insert(*task.id(), task.clone()); let err = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { tx.delete_task(task.id()).await }) }) @@ -160,7 +160,7 @@ async fn roundtrip_task(ephemeral_datastore: EphemeralDatastore) { assert_matches!(err, Error::MutationTargetNotFound); let retrieved_task = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { tx.get_aggregator_task(task.id()).await }) }) @@ -171,7 +171,7 @@ async fn roundtrip_task(ephemeral_datastore: EphemeralDatastore) { ds.put_aggregator_task(&task).await.unwrap(); let retrieved_task = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { tx.get_aggregator_task(task.id()).await }) }) @@ -179,7 +179,7 @@ async fn roundtrip_task(ephemeral_datastore: EphemeralDatastore) { .unwrap(); assert_eq!(Some(&task), retrieved_task.as_ref()); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { tx.delete_task(task.id()).await }) }) @@ -187,7 +187,7 @@ async fn roundtrip_task(ephemeral_datastore: EphemeralDatastore) { .unwrap(); let retrieved_task = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { tx.get_aggregator_task(task.id()).await }) }) @@ -196,7 +196,7 @@ async fn roundtrip_task(ephemeral_datastore: EphemeralDatastore) { assert_eq!(None, retrieved_task); let err = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { tx.delete_task(task.id()).await }) }) @@ -210,7 +210,7 @@ async fn roundtrip_task(ephemeral_datastore: EphemeralDatastore) { ds.put_aggregator_task(&task).await.unwrap(); let retrieved_task = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { tx.get_aggregator_task(task.id()).await }) }) @@ -220,7 +220,7 @@ async fn roundtrip_task(ephemeral_datastore: EphemeralDatastore) { } let got_tasks: HashMap = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { tx.check_timestamp_columns("tasks", "test-put-task", false) .await; @@ -251,7 +251,7 @@ async fn put_task_invalid_aggregator_auth_tokens(ephemeral_datastore: EphemeralD ds.put_aggregator_task(&task).await.unwrap(); for (auth_token, auth_token_type) in [("NULL", "'BEARER'"), ("'\\xDEADBEEF'::bytea", "NULL")] { - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { Box::pin(async move { let err = tx .query_one( @@ -290,7 +290,7 @@ async fn put_task_invalid_collector_auth_tokens(ephemeral_datastore: EphemeralDa ds.put_aggregator_task(&task).await.unwrap(); for (auth_token, auth_token_type) in [("NULL", "'BEARER'"), ("'\\xDEADBEEF'::bytea", "NULL")] { - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { Box::pin(async move { let err = tx .query_one( @@ -327,7 +327,7 @@ async fn get_task_metrics(ephemeral_datastore: EphemeralDatastore) { let ds = 
ephemeral_datastore.datastore(clock.clone()).await; let task_id = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { let task = TaskBuilder::new(task::QueryType::TimeInterval, VdafInstance::Fake) .with_report_expiry_age(Some(REPORT_EXPIRY_AGE)) @@ -496,7 +496,7 @@ async fn get_task_metrics(ephemeral_datastore: EphemeralDatastore) { // Advance the clock to "enable" report expiry. clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { Box::pin(async move { // Verify we get the correct results when we check metrics on our target task. assert_eq!( @@ -523,7 +523,7 @@ async fn get_task_ids(ephemeral_datastore: EphemeralDatastore) { install_test_trace_subscriber(); let ds = ephemeral_datastore.datastore(MockClock::default()).await; - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { Box::pin(async move { const TOTAL_TASK_ID_COUNT: usize = 20; let tasks: Vec<_> = iter::repeat_with(|| { @@ -572,7 +572,7 @@ async fn roundtrip_report(ephemeral_datastore: EphemeralDatastore) { .leader_view() .unwrap(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { tx.put_aggregator_task(&task).await }) }) @@ -599,7 +599,7 @@ async fn roundtrip_report(ephemeral_datastore: EphemeralDatastore) { // Write a report twice to prove it is idempotent for _ in 0..2 { - ds.run_tx_with_name("test-put-client-report", |tx| { + ds.run_tx("test-put-client-report", |tx| { let report = report.clone(); Box::pin(async move { tx.put_client_report(&dummy_vdaf::Vdaf::new(), &report) @@ -610,7 +610,7 @@ async fn roundtrip_report(ephemeral_datastore: EphemeralDatastore) { .unwrap(); let retrieved_report = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *report.task_id(); Box::pin(async move { tx.get_client_report::<0, dummy_vdaf::Vdaf>( @@ -630,7 +630,7 @@ async fn roundtrip_report(ephemeral_datastore: EphemeralDatastore) { // Try to write a different report with the same ID, and verify we get the expected error. let result = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *report.task_id(); Box::pin(async move { tx.put_client_report( @@ -658,7 +658,7 @@ async fn roundtrip_report(ephemeral_datastore: EphemeralDatastore) { .await; assert_matches!(result, Err(Error::MutationTargetAlreadyExists)); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { Box::pin(async move { tx.check_timestamp_columns("client_reports", "test-put-client-report", true) .await; @@ -671,7 +671,7 @@ async fn roundtrip_report(ephemeral_datastore: EphemeralDatastore) { // Advance the clock so that the report is expired, and verify that it does not exist. clock.advance(&Duration::from_seconds(1)); let retrieved_report = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *report.task_id(); Box::pin(async move { tx.get_client_report::<0, dummy_vdaf::Vdaf>( @@ -694,7 +694,7 @@ async fn report_not_found(ephemeral_datastore: EphemeralDatastore) { let ds = ephemeral_datastore.datastore(MockClock::default()).await; let rslt = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { tx.get_client_report(&dummy_vdaf::Vdaf::new(), &random(), &random()) .await @@ -746,7 +746,7 @@ async fn get_unaggregated_client_report_ids_for_task(ephemeral_datastore: Epheme LeaderStoredReport::new_dummy(*unrelated_task.id(), OLDEST_ALLOWED_REPORT_TIMESTAMP); // Set up state. 
- ds.run_tx_with_name("test-unaggregated-reports", |tx| { + ds.run_tx("test-unaggregated-reports", |tx| { let task = task.clone(); let unrelated_task = unrelated_task.clone(); let first_unaggregated_report = first_unaggregated_report.clone(); @@ -784,7 +784,7 @@ async fn get_unaggregated_client_report_ids_for_task(ephemeral_datastore: Epheme // Verify that we can acquire both unaggregated reports. let got_reports = HashSet::from_iter( - ds.run_tx_with_name("test-unaggregated-reports", |tx| { + ds.run_tx("test-unaggregated-reports", |tx| { let task = task.clone(); Box::pin(async move { // At this point, first_unaggregated_report and second_unaggregated_report are @@ -818,7 +818,7 @@ async fn get_unaggregated_client_report_ids_for_task(ephemeral_datastore: Epheme // Verify that attempting to acquire again does not return the reports. let got_reports = HashSet::<(ReportId, Time)>::from_iter( - ds.run_tx_with_name("test-unaggregated-reports", |tx| { + ds.run_tx("test-unaggregated-reports", |tx| { let task = task.clone(); Box::pin(async move { // At this point, all reports have started aggregation. @@ -838,7 +838,7 @@ async fn get_unaggregated_client_report_ids_for_task(ephemeral_datastore: Epheme assert!(got_reports.is_empty()); // Mark one report un-aggregated. - ds.run_tx_with_name("test-unaggregated-reports", |tx| { + ds.run_tx("test-unaggregated-reports", |tx| { let (task, first_unaggregated_report) = (task.clone(), first_unaggregated_report.clone()); Box::pin(async move { tx.mark_reports_unaggregated(task.id(), &[*first_unaggregated_report.metadata().id()]) @@ -850,7 +850,7 @@ async fn get_unaggregated_client_report_ids_for_task(ephemeral_datastore: Epheme // Verify that we can retrieve the un-aggregated report again. let got_reports = HashSet::from_iter( - ds.run_tx_with_name("test-unaggregated-reports", |tx| { + ds.run_tx("test-unaggregated-reports", |tx| { let task = task.clone(); Box::pin(async move { // At this point, first_unaggregated_report is unaggregated. @@ -875,7 +875,7 @@ async fn get_unaggregated_client_report_ids_for_task(ephemeral_datastore: Epheme ),]), ); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (first_unaggregated_report, second_unaggregated_report) = ( first_unaggregated_report.clone(), second_unaggregated_report.clone(), @@ -968,7 +968,7 @@ async fn count_client_reports_for_interval(ephemeral_datastore: EphemeralDatasto LeaderStoredReport::new_dummy(*unrelated_task.id(), OLDEST_ALLOWED_REPORT_TIMESTAMP); // Set up state. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task = task.clone(); let unrelated_task = unrelated_task.clone(); let no_reports_task = no_reports_task.clone(); @@ -1004,7 +1004,7 @@ async fn count_client_reports_for_interval(ephemeral_datastore: EphemeralDatasto clock.advance(&REPORT_EXPIRY_AGE); let (report_count, no_reports_task_report_count) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task, no_reports_task) = (task.clone(), no_reports_task.clone()); Box::pin(async move { let report_count = tx @@ -1074,7 +1074,7 @@ async fn count_client_reports_for_batch_id(ephemeral_datastore: EphemeralDatasto // Set up state. 
let batch_id = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task, unrelated_task) = (task.clone(), unrelated_task.clone()); Box::pin(async move { @@ -1219,7 +1219,7 @@ async fn count_client_reports_for_batch_id(ephemeral_datastore: EphemeralDatasto clock.advance(&REPORT_EXPIRY_AGE); let report_count = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task_id = *task.id(); Box::pin(async move { tx.count_client_reports_for_batch_id(&task_id, &batch_id) @@ -1254,7 +1254,7 @@ async fn roundtrip_report_share(ephemeral_datastore: EphemeralDatastore) { ), ); - ds.run_tx_with_name("test-put-report-share", |tx| { + ds.run_tx("test-put-report-share", |tx| { let (task, report_share) = (task.clone(), report_share.clone()); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -1270,7 +1270,7 @@ async fn roundtrip_report_share(ephemeral_datastore: EphemeralDatastore) { .unwrap(); let (got_task_id, got_extensions, got_leader_input_share, got_helper_input_share) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let report_share_metadata = report_share.metadata().clone(); Box::pin(async move { let row = tx @@ -1316,7 +1316,7 @@ async fn roundtrip_report_share(ephemeral_datastore: EphemeralDatastore) { assert!(got_helper_input_share.is_none()); // Put the same report share again. This should not cause an error. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (task_id, report_share) = (*task.id(), report_share.clone()); Box::pin(async move { tx.put_report_share(&task_id, &report_share).await.unwrap(); @@ -1369,7 +1369,7 @@ async fn roundtrip_aggregation_job(ephemeral_datastore: EphemeralDatastore) { AggregationJobStep::from(0), ); - ds.run_tx_with_name("test-put-aggregation-jobs", |tx| { + ds.run_tx("test-put-aggregation-jobs", |tx| { let (task, leader_aggregation_job, helper_aggregation_job) = ( task.clone(), leader_aggregation_job.clone(), @@ -1397,7 +1397,7 @@ async fn roundtrip_aggregation_job(ephemeral_datastore: EphemeralDatastore) { clock.advance(&REPORT_EXPIRY_AGE); let (got_leader_aggregation_job, got_helper_aggregation_job) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (leader_aggregation_job, helper_aggregation_job) = ( leader_aggregation_job.clone(), helper_aggregation_job.clone(), @@ -1434,7 +1434,7 @@ async fn roundtrip_aggregation_job(ephemeral_datastore: EphemeralDatastore) { .clone() .with_state(AggregationJobState::Finished); let new_helper_aggregation_job = helper_aggregation_job.with_last_request_hash([3; 32]); - ds.run_tx_with_name("test-update-aggregation-jobs", |tx| { + ds.run_tx("test-update-aggregation-jobs", |tx| { let (new_leader_aggregation_job, new_helper_aggregation_job) = ( new_leader_aggregation_job.clone(), new_helper_aggregation_job.clone(), @@ -1462,7 +1462,7 @@ async fn roundtrip_aggregation_job(ephemeral_datastore: EphemeralDatastore) { .unwrap(); let (got_leader_aggregation_job, got_helper_aggregation_job) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (new_leader_aggregation_job, new_helper_aggregation_job) = ( new_leader_aggregation_job.clone(), new_helper_aggregation_job.clone(), @@ -1509,7 +1509,7 @@ async fn roundtrip_aggregation_job(ephemeral_datastore: EphemeralDatastore) { AggregationJobState::InProgress, AggregationJobStep::from(0), ); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let new_leader_aggregation_job = new_leader_aggregation_job.clone(); Box::pin(async move { let error = tx @@ -1528,7 +1528,7 @@ async fn roundtrip_aggregation_job(ephemeral_datastore: EphemeralDatastore) { // returned. 
clock.advance(&Duration::from_seconds(2)); let (got_leader_aggregation_job, got_helper_aggregation_job) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (new_leader_aggregation_job, new_helper_aggregation_job) = ( new_leader_aggregation_job.clone(), new_helper_aggregation_job.clone(), @@ -1578,7 +1578,7 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore .collect(); aggregation_job_ids.sort(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (task, aggregation_job_ids) = (task.clone(), aggregation_job_ids.clone()); Box::pin(async move { // Write a few aggregation jobs we expect to be able to retrieve with @@ -1720,7 +1720,7 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore // for a little while to keep this from affecting test outcome. let results = try_join_all( iter::repeat_with(|| { - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { Box::pin(async move { tx.acquire_incomplete_aggregation_jobs( &LEASE_DURATION, @@ -1779,7 +1779,7 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore .map(|lease| (lease.leased().clone(), *lease.lease_expiry_time())) .collect(); jobs_to_release.sort(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let leases_to_release = leases_to_release.clone(); Box::pin(async move { for lease in leases_to_release { @@ -1792,7 +1792,7 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore .unwrap(); let mut got_aggregation_jobs: Vec<_> = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { tx.acquire_incomplete_aggregation_jobs(&LEASE_DURATION, MAXIMUM_ACQUIRE_COUNT) .await @@ -1831,7 +1831,7 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore }) .collect(); let mut got_aggregation_jobs: Vec<_> = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { // This time, we just acquire all jobs in a single go for simplicity -- we've // already tested the maximum acquire count functionality above. @@ -1859,7 +1859,7 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore // fails. clock.advance(&Duration::from_seconds(LEASE_DURATION.as_secs())); let lease = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { Ok(tx .acquire_incomplete_aggregation_jobs(&LEASE_DURATION, 1) @@ -1875,7 +1875,7 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore random(), lease.lease_attempts(), ); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let lease_with_random_token = lease_with_random_token.clone(); Box::pin(async move { tx.release_aggregation_job(&lease_with_random_token).await }) }) @@ -1884,7 +1884,7 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore // Replace the original lease token and verify that we can release successfully with it in // place. 
- ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let lease = lease.clone(); Box::pin(async move { tx.release_aggregation_job(&lease).await }) }) @@ -1899,7 +1899,7 @@ async fn aggregation_job_not_found(ephemeral_datastore: EphemeralDatastore) { let ds = ephemeral_datastore.datastore(MockClock::default()).await; let rslt = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { tx.get_aggregation_job::( &random(), @@ -1913,7 +1913,7 @@ async fn aggregation_job_not_found(ephemeral_datastore: EphemeralDatastore) { assert_eq!(rslt, None); let rslt = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { tx.update_aggregation_job::( &AggregationJob::new( @@ -1988,7 +1988,7 @@ async fn get_aggregation_jobs_for_task(ephemeral_datastore: EphemeralDatastore) aggregation_job_with_request_hash, ]); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let (task, want_agg_jobs) = (task.clone(), want_agg_jobs.clone()); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -2029,7 +2029,7 @@ async fn get_aggregation_jobs_for_task(ephemeral_datastore: EphemeralDatastore) // Run. want_agg_jobs.sort_by_key(|agg_job| *agg_job.id()); let mut got_agg_jobs = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { tx.get_aggregation_jobs_for_task(task.id()).await }) }) @@ -2109,7 +2109,7 @@ async fn roundtrip_report_aggregation(ephemeral_datastore: EphemeralDatastore) { let report_id = random(); let want_report_aggregation = ds - .run_tx_with_name("test-put-report-aggregations", |tx| { + .run_tx("test-put-report-aggregations", |tx| { let (task, state, aggregation_param) = (task.clone(), state.clone(), aggregation_param.clone()); Box::pin(async move { @@ -2186,7 +2186,7 @@ async fn roundtrip_report_aggregation(ephemeral_datastore: EphemeralDatastore) { clock.advance(&REPORT_EXPIRY_AGE); let got_report_aggregation = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, aggregation_param) = (Arc::clone(&vdaf), task.clone(), aggregation_param.clone()); Box::pin(async move { @@ -2225,7 +2225,7 @@ async fn roundtrip_report_aggregation(ephemeral_datastore: EphemeralDatastore) { want_report_aggregation.state().clone(), ); - ds.run_tx_with_name("test-update-report-aggregation", |tx| { + ds.run_tx("test-update-report-aggregation", |tx| { let want_report_aggregation = want_report_aggregation.clone(); Box::pin(async move { tx.update_report_aggregation(&want_report_aggregation) @@ -2253,7 +2253,7 @@ async fn roundtrip_report_aggregation(ephemeral_datastore: EphemeralDatastore) { .unwrap(); let got_report_aggregation = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, aggregation_param) = (Arc::clone(&vdaf), task.clone(), aggregation_param.clone()); Box::pin(async move { @@ -2276,7 +2276,7 @@ async fn roundtrip_report_aggregation(ephemeral_datastore: EphemeralDatastore) { clock.advance(&REPORT_EXPIRY_AGE); let got_report_aggregation = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task, aggregation_param) = (Arc::clone(&vdaf), task.clone(), aggregation_param.clone()); Box::pin(async move { @@ -2316,7 +2316,7 @@ async fn check_other_report_aggregation_exists(ephemeral_datastore: EphemeralDat let aggregation_job_id = random(); let report_id = random(); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task_id = *task.id(); Box::pin(async move { tx.put_aggregation_job(&AggregationJob::<0, TimeInterval, dummy_vdaf::Vdaf>::new( @@ -2362,7 +2362,7 @@ async fn check_other_report_aggregation_exists(ephemeral_datastore: EphemeralDat // Advance the 
clock to "enable" report expiry. clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task_id = *task.id(); Box::pin(async move { assert!(tx @@ -2428,7 +2428,7 @@ async fn check_other_report_aggregation_exists(ephemeral_datastore: EphemeralDat // Advance the clock again to expire all relevant datastore items. clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task_id = *task.id(); Box::pin(async move { assert!(!tx @@ -2457,7 +2457,7 @@ async fn report_aggregation_not_found(ephemeral_datastore: EphemeralDatastore) { let vdaf = Arc::new(dummy_vdaf::Vdaf::default()); let rslt = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let vdaf = Arc::clone(&vdaf); Box::pin(async move { tx.get_report_aggregation( @@ -2476,7 +2476,7 @@ async fn report_aggregation_not_found(ephemeral_datastore: EphemeralDatastore) { assert_eq!(rslt, None); let rslt = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { tx.update_report_aggregation::<0, dummy_vdaf::Vdaf>(&ReportAggregation::new( random(), @@ -2528,7 +2528,7 @@ async fn get_report_aggregations_for_aggregation_job(ephemeral_datastore: Epheme let aggregation_job_id = random(); let want_report_aggregations = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (task, vdaf_transcript, aggregation_param) = ( task.clone(), vdaf_transcript.clone(), @@ -2608,7 +2608,7 @@ async fn get_report_aggregations_for_aggregation_job(ephemeral_datastore: Epheme clock.advance(&REPORT_EXPIRY_AGE); let got_report_aggregations = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task) = (Arc::clone(&vdaf), task.clone()); Box::pin(async move { tx.get_report_aggregations_for_aggregation_job( @@ -2628,7 +2628,7 @@ async fn get_report_aggregations_for_aggregation_job(ephemeral_datastore: Epheme clock.advance(&REPORT_EXPIRY_AGE); let got_report_aggregations = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let (vdaf, task) = (Arc::clone(&vdaf), task.clone()); Box::pin(async move { tx.get_report_aggregations_for_aggregation_job( @@ -2709,7 +2709,7 @@ async fn get_collection_job(ephemeral_datastore: EphemeralDatastore) { let aggregation_param = AggregationParam(13); let (first_collection_job, second_collection_job) = ds - .run_tx_with_name("test-put-collection-job", |tx| { + .run_tx("test-put-collection-job", |tx| { let task = task.clone(); Box::pin(async move { tx.put_aggregator_task(&task).await.unwrap(); @@ -2746,7 +2746,7 @@ async fn get_collection_job(ephemeral_datastore: EphemeralDatastore) { // Advance the clock to "enable" report expiry. clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task = task.clone(); let first_collection_job = first_collection_job.clone(); let second_collection_job = second_collection_job.clone(); @@ -2819,7 +2819,7 @@ async fn get_collection_job(ephemeral_datastore: EphemeralDatastore) { // Advance the clock again to expire everything that has been written. 
clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task = task.clone(); let first_collection_job = first_collection_job.clone(); let second_collection_job = second_collection_job.clone(); @@ -2877,7 +2877,7 @@ async fn update_collection_jobs(ephemeral_datastore: EphemeralDatastore) { ) .unwrap(); - ds.run_tx_with_name("test-update-collection-jobs", |tx| { + ds.run_tx("test-update-collection-jobs", |tx| { let task = task.clone(); Box::pin(async move { tx.put_aggregator_task(&task).await?; @@ -3095,7 +3095,7 @@ async fn setup_collection_job_acquire_test_case( ds: &Datastore, test_case: CollectionJobAcquireTestCase, ) -> CollectionJobAcquireTestCase { - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let mut test_case = test_case.clone(); Box::pin(async move { for task_id in &test_case.task_ids { @@ -3174,7 +3174,7 @@ async fn run_collection_job_acquire_test_case( let test_case = setup_collection_job_acquire_test_case(ds, test_case).await; let clock = &ds.clock; - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let test_case = test_case.clone(); let clock = clock.clone(); Box::pin(async move { @@ -3279,7 +3279,7 @@ async fn time_interval_collection_job_acquire_release_happy_path( .await; let reacquired_jobs = ds - .run_tx_with_name("test-acquire-leases", |tx| { + .run_tx("test-acquire-leases", |tx| { let collection_job_leases = collection_job_leases.clone(); Box::pin(async move { // Try to re-acquire collection jobs. Nothing should happen because the lease is still @@ -3324,7 +3324,7 @@ async fn time_interval_collection_job_acquire_release_happy_path( // Advance time by the lease duration clock.advance(&Duration::from_seconds(100)); - ds.run_tx_with_name("test-reacquire-leases", |tx| { + ds.run_tx("test-reacquire-leases", |tx| { let reacquired_jobs = reacquired_jobs.clone(); Box::pin(async move { // Re-acquire the jobs whose lease should have lapsed. @@ -3420,7 +3420,7 @@ async fn fixed_size_collection_job_acquire_release_happy_path( .await; let reacquired_jobs = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let collection_job_leases = collection_job_leases.clone(); Box::pin(async move { // Try to re-acquire collection jobs. Nothing should happen because the lease is still @@ -3462,7 +3462,7 @@ async fn fixed_size_collection_job_acquire_release_happy_path( // Advance time by the lease duration clock.advance(&Duration::from_seconds(100)); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let reacquired_jobs = reacquired_jobs.clone(); Box::pin(async move { // Re-acquire the jobs whose lease should have lapsed. 
@@ -3903,7 +3903,7 @@ async fn collection_job_acquire_job_max(ephemeral_datastore: EphemeralDatastore) ) .await; - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let test_case = test_case.clone(); let clock = clock.clone(); Box::pin(async move { @@ -4074,7 +4074,7 @@ async fn collection_job_acquire_state_filtering(ephemeral_datastore: EphemeralDa ) .await; - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { Box::pin(async move { // No collection jobs should be acquired because none of them are in the START state let acquired_collection_jobs = tx @@ -4112,7 +4112,7 @@ async fn roundtrip_batch_aggregation_time_interval(ephemeral_datastore: Ephemera let aggregation_param = AggregationParam(12); let (first_batch_aggregation, second_batch_aggregation, third_batch_aggregation) = ds - .run_tx_with_name("test-put-batch-aggregations", |tx| { + .run_tx("test-put-batch-aggregations", |tx| { let task = task.clone(); let other_task = other_task.clone(); @@ -4300,7 +4300,7 @@ async fn roundtrip_batch_aggregation_time_interval(ephemeral_datastore: Ephemera // Advance the clock to "enable" report expiry. clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task = task.clone(); let first_batch_aggregation = first_batch_aggregation.clone(); let second_batch_aggregation = second_batch_aggregation.clone(); @@ -4393,7 +4393,7 @@ async fn roundtrip_batch_aggregation_time_interval(ephemeral_datastore: Ephemera // Advance the clock again to expire all written entities. clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { let vdaf = dummy_vdaf::Vdaf::new(); @@ -4448,7 +4448,7 @@ async fn roundtrip_batch_aggregation_fixed_size(ephemeral_datastore: EphemeralDa let aggregate_share = AggregateShare(23); let aggregation_param = AggregationParam(12); let batch_aggregation = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { let other_task = TaskBuilder::new( @@ -4574,7 +4574,7 @@ async fn roundtrip_batch_aggregation_fixed_size(ephemeral_datastore: EphemeralDa // Advance the clock to "enable" report expiry. clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task = task.clone(); let batch_aggregation = batch_aggregation.clone(); Box::pin(async move { @@ -4623,7 +4623,7 @@ async fn roundtrip_batch_aggregation_fixed_size(ephemeral_datastore: EphemeralDa // Advance the clock again to expire all written entities. clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let task = task.clone(); Box::pin(async move { let vdaf = dummy_vdaf::Vdaf::new(); @@ -4655,7 +4655,7 @@ async fn roundtrip_aggregate_share_job_time_interval(ephemeral_datastore: Epheme let ds = ephemeral_datastore.datastore(clock.clone()).await; let aggregate_share_job = ds - .run_tx_with_name("test-roundtrip-aggregate-share-job", |tx| { + .run_tx("test-roundtrip-aggregate-share-job", |tx| { Box::pin(async move { let task = TaskBuilder::new(task::QueryType::TimeInterval, VdafInstance::Fake) .with_report_expiry_age(Some(REPORT_EXPIRY_AGE)) @@ -4709,7 +4709,7 @@ async fn roundtrip_aggregate_share_job_time_interval(ephemeral_datastore: Epheme // Advance the clock to "enable" report expiry. 
clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let want_aggregate_share_job = aggregate_share_job.clone(); Box::pin(async move { let vdaf = dummy_vdaf::Vdaf::new(); @@ -4779,7 +4779,7 @@ async fn roundtrip_aggregate_share_job_time_interval(ephemeral_datastore: Epheme // Advance the clock to expire all written entities. clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let want_aggregate_share_job = aggregate_share_job.clone(); Box::pin(async move { let vdaf = dummy_vdaf::Vdaf::new(); @@ -4841,7 +4841,7 @@ async fn roundtrip_aggregate_share_job_fixed_size(ephemeral_datastore: Ephemeral let ds = ephemeral_datastore.datastore(clock.clone()).await; let aggregate_share_job = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { let task = TaskBuilder::new( task::QueryType::FixedSize { @@ -4891,7 +4891,7 @@ async fn roundtrip_aggregate_share_job_fixed_size(ephemeral_datastore: Ephemeral // Advance the clock to "enable" report expiry. clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let want_aggregate_share_job = aggregate_share_job.clone(); Box::pin(async move { let vdaf = dummy_vdaf::Vdaf::new(); @@ -4939,7 +4939,7 @@ async fn roundtrip_aggregate_share_job_fixed_size(ephemeral_datastore: Ephemeral // Advance the clock to expire all written entities. clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let want_aggregate_share_job = aggregate_share_job.clone(); Box::pin(async move { let vdaf = dummy_vdaf::Vdaf::new(); @@ -4989,7 +4989,7 @@ async fn roundtrip_outstanding_batch(ephemeral_datastore: EphemeralDatastore) { .unwrap(); let (task_id_1, batch_id_1, task_id_2, batch_id_2) = ds - .run_tx_with_name("test-put-outstanding-batches", |tx| { + .run_tx("test-put-outstanding-batches", |tx| { let clock = clock.clone(); Box::pin(async move { let task_1 = TaskBuilder::new( @@ -5243,7 +5243,7 @@ async fn roundtrip_outstanding_batch(ephemeral_datastore: EphemeralDatastore) { outstanding_batches_task_2, outstanding_batches_empty_time_bucket, ) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { let outstanding_batches_task_1 = tx.get_outstanding_batches(&task_id_1, &None).await?; @@ -5297,7 +5297,9 @@ async fn roundtrip_outstanding_batch(ephemeral_datastore: EphemeralDatastore) { // Verify that the batch is no longer available. let outstanding_batches = ds - .run_tx(|tx| Box::pin(async move { tx.get_outstanding_batches(&task_id_1, &None).await })) + .run_unnamed_tx(|tx| { + Box::pin(async move { tx.get_outstanding_batches(&task_id_1, &None).await }) + }) .await .unwrap(); assert!(outstanding_batches.is_empty()); @@ -5306,14 +5308,16 @@ async fn roundtrip_outstanding_batch(ephemeral_datastore: EphemeralDatastore) { clock.set(OLDEST_ALLOWED_REPORT_TIMESTAMP); // Delete the outstanding batch, then check that it is no longer available. 
- ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { Box::pin(async move { tx.delete_outstanding_batch(&task_id_1, &batch_id_1).await }) }) .await .unwrap(); let outstanding_batches = ds - .run_tx(|tx| Box::pin(async move { tx.get_outstanding_batches(&task_id_1, &None).await })) + .run_unnamed_tx(|tx| { + Box::pin(async move { tx.get_outstanding_batches(&task_id_1, &None).await }) + }) .await .unwrap(); assert!(outstanding_batches.is_empty()); @@ -5336,7 +5340,7 @@ async fn roundtrip_batch(ephemeral_datastore: EphemeralDatastore) { Interval::new(OLDEST_ALLOWED_REPORT_TIMESTAMP, Duration::from_seconds(1)).unwrap(), ); - ds.run_tx_with_name("test-put-batch", |tx| { + ds.run_tx("test-put-batch", |tx| { let want_batch = want_batch.clone(); Box::pin(async move { tx.put_aggregator_task( @@ -5368,7 +5372,7 @@ async fn roundtrip_batch(ephemeral_datastore: EphemeralDatastore) { // Advance the clock to "enable" report expiry. clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx_with_name("test-update-batch", |tx| { + ds.run_tx("test-update-batch", |tx| { let want_batch = want_batch.clone(); Box::pin(async move { // Try reading the batch back, and verify that modifying any of the primary key @@ -5445,7 +5449,7 @@ async fn roundtrip_batch(ephemeral_datastore: EphemeralDatastore) { // Advance the clock to expire the batch. clock.advance(&REPORT_EXPIRY_AGE); - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { let want_batch = want_batch.clone(); Box::pin(async move { // Try reading the batch back, and verify it is expired. @@ -5481,7 +5485,7 @@ async fn delete_expired_client_reports(ephemeral_datastore: EphemeralDatastore) .difference(&OLDEST_ALLOWED_REPORT_TIMESTAMP) .unwrap(); let (task_id, new_report_id, other_task_id, other_task_report_id) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { let task = TaskBuilder::new(task::QueryType::TimeInterval, VdafInstance::Fake) .with_report_expiry_age(Some(report_expiry_age)) @@ -5529,7 +5533,7 @@ async fn delete_expired_client_reports(ephemeral_datastore: EphemeralDatastore) .unwrap(); // Run. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { Box::pin(async move { tx.delete_expired_client_reports(&task_id, u64::try_from(i64::MAX)?) .await @@ -5541,7 +5545,7 @@ async fn delete_expired_client_reports(ephemeral_datastore: EphemeralDatastore) // Verify. let want_report_ids = HashSet::from([new_report_id, other_task_report_id]); let got_report_ids = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let vdaf = vdaf.clone(); Box::pin(async move { let task_client_reports = tx.get_client_reports_for_task(&vdaf, &task_id).await?; @@ -5647,7 +5651,7 @@ async fn delete_expired_aggregation_artifacts(ephemeral_datastore: EphemeralData want_aggregation_job_ids, want_report_ids, ) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { let leader_time_interval_task = TaskBuilder::new(task::QueryType::TimeInterval, VdafInstance::Fake) @@ -5908,7 +5912,7 @@ async fn delete_expired_aggregation_artifacts(ephemeral_datastore: EphemeralData clock.advance(&REPORT_EXPIRY_AGE); // Run. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { Box::pin(async move { tx.delete_expired_aggregation_artifacts( &leader_time_interval_task_id, @@ -5938,7 +5942,7 @@ async fn delete_expired_aggregation_artifacts(ephemeral_datastore: EphemeralData // Verify. 
let (got_aggregation_job_ids, got_report_ids) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let vdaf = vdaf.clone(); Box::pin(async move { let leader_time_interval_aggregation_job_ids = tx @@ -6165,7 +6169,7 @@ async fn delete_expired_collection_artifacts(ephemeral_datastore: EphemeralDatas want_batch_aggregation_ids, time_bucket_starts, ) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { let leader_time_interval_task = TaskBuilder::new(task::QueryType::TimeInterval, VdafInstance::Fake) @@ -6628,7 +6632,7 @@ async fn delete_expired_collection_artifacts(ephemeral_datastore: EphemeralDatas clock.advance(&REPORT_EXPIRY_AGE); // Run. - ds.run_tx(|tx| { + ds.run_unnamed_tx(|tx| { Box::pin(async move { tx.delete_expired_collection_artifacts( &leader_time_interval_task_id, @@ -6677,7 +6681,7 @@ async fn delete_expired_collection_artifacts(ephemeral_datastore: EphemeralDatas got_outstanding_batch_ids, got_batch_aggregation_ids, ) = ds - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let time_bucket_starts = time_bucket_starts.clone(); Box::pin(async move { let vdaf = dummy_vdaf::Vdaf::new(); @@ -7000,7 +7004,7 @@ async fn roundtrip_interval_sql(ephemeral_datastore: EphemeralDatastore) { let datastore = ephemeral_datastore.datastore(MockClock::default()).await; datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { let interval = tx .query_one( @@ -7112,7 +7116,7 @@ async fn roundtrip_global_hpke_keypair(ephemeral_datastore: EphemeralDatastore) let keypair = generate_test_hpke_config_and_private_key(); datastore - .run_tx_with_name("test-put-keys", |tx| { + .run_tx("test-put-keys", |tx| { let keypair = keypair.clone(); let clock = clock.clone(); Box::pin(async move { @@ -7162,7 +7166,7 @@ async fn roundtrip_global_hpke_keypair(ephemeral_datastore: EphemeralDatastore) // Should not be able to set keypair with the same id. assert_matches!( datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let keypair = keypair.clone(); Box::pin(async move { tx.put_global_hpke_keypair(&keypair).await }) }) @@ -7171,7 +7175,7 @@ async fn roundtrip_global_hpke_keypair(ephemeral_datastore: EphemeralDatastore) ); datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let keypair = keypair.clone(); Box::pin(async move { tx.delete_global_hpke_keypair(keypair.config().id()).await?; @@ -7212,7 +7216,7 @@ async fn roundtrip_taskprov_peer_aggregator(ephemeral_datastore: EphemeralDatast .build(); datastore - .run_tx_with_name("test-put-peer-aggregator", |tx| { + .run_tx("test-put-peer-aggregator", |tx| { let example_leader_peer_aggregator = example_leader_peer_aggregator.clone(); let example_helper_peer_aggregator = example_helper_peer_aggregator.clone(); let another_example_leader_peer_aggregator = @@ -7236,7 +7240,7 @@ async fn roundtrip_taskprov_peer_aggregator(ephemeral_datastore: EphemeralDatast // Should not be able to put an aggregator with the same endpoint and role. 
assert_matches!( datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { Box::pin(async move { let colliding_peer_aggregator = PeerAggregatorBuilder::new().build(); tx.put_taskprov_peer_aggregator(&colliding_peer_aggregator) @@ -7248,7 +7252,7 @@ async fn roundtrip_taskprov_peer_aggregator(ephemeral_datastore: EphemeralDatast ); datastore - .run_tx(|tx| { + .run_unnamed_tx(|tx| { let example_leader_peer_aggregator = example_leader_peer_aggregator.clone(); let example_helper_peer_aggregator = example_helper_peer_aggregator.clone(); let another_example_leader_peer_aggregator = diff --git a/interop_binaries/src/bin/janus_interop_aggregator.rs b/interop_binaries/src/bin/janus_interop_aggregator.rs index 1e683de2f..62cc30947 100644 --- a/interop_binaries/src/bin/janus_interop_aggregator.rs +++ b/interop_binaries/src/bin/janus_interop_aggregator.rs @@ -121,7 +121,7 @@ async fn handle_add_task( .context("error constructing task")?; datastore - .run_tx(move |tx| { + .run_unnamed_tx(move |tx| { let task = task.clone(); Box::pin(async move { tx.put_aggregator_task(&task).await }) })