Skip to content

Commit

Permalink
Ensure all production code provides transaction names (#2114)
Browse files Browse the repository at this point in the history
  • Loading branch information
inahga authored Oct 11, 2023
1 parent 5683bc7 commit f1dc65f
Show file tree
Hide file tree
Showing 17 changed files with 320 additions and 307 deletions.
28 changes: 14 additions & 14 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ impl<C: Clock> Aggregator<C> {
// Slow path: retrieve task, create a task aggregator, store it to the cache, then return it.
match self
.datastore
.run_tx_with_name("task_aggregator_get_task", |tx| {
.run_tx("task_aggregator_get_task", |tx| {
let task_id = *task_id;
Box::pin(async move { tx.get_aggregator_task(&task_id).await })
})
Expand Down Expand Up @@ -689,7 +689,7 @@ impl<C: Clock> Aggregator<C> {
)
.map_err(|err| Error::InvalidTask(*task_id, OptOutReason::TaskParameters(err)))?;
self.datastore
.run_tx_with_name("taskprov_put_task", |tx| {
.run_tx("taskprov_put_task", |tx| {
let task = task.clone();
Box::pin(async move { tx.put_aggregator_task(&task).await })
})
Expand Down Expand Up @@ -1883,7 +1883,7 @@ impl VdafOps {
let accumulator = Arc::new(accumulator);

Ok(datastore
.run_tx_with_name("aggregate_init", |tx| {
.run_tx("aggregate_init", |tx| {
let vdaf = vdaf.clone();
let task = Arc::clone(&task);
let req = Arc::clone(&req);
Expand Down Expand Up @@ -2074,7 +2074,7 @@ impl VdafOps {
// TODO(#224): don't hold DB transaction open while computing VDAF updates?
// TODO(#224): don't do O(n) network round-trips (where n is the number of prepare steps)
Ok(datastore
.run_tx_with_name("aggregate_continue", |tx| {
.run_tx("aggregate_continue", |tx| {
let (
vdaf,
aggregate_step_failure_counter,
Expand Down Expand Up @@ -2233,7 +2233,7 @@ impl VdafOps {
)?);

Ok(datastore
.run_tx_with_name("collect", move |tx| {
.run_tx("collect", move |tx| {
let (task, vdaf, collection_job_id, req, aggregation_param) = (
Arc::clone(&task),
Arc::clone(&vdaf),
Expand Down Expand Up @@ -2528,7 +2528,7 @@ impl VdafOps {
A::AggregateShare: Send + Sync,
{
let (collection_job, spanned_interval) = datastore
.run_tx_with_name("get_collection_job", |tx| {
.run_tx("get_collection_job", |tx| {
let (task, vdaf, collection_job_id) =
(Arc::clone(&task), Arc::clone(&vdaf), *collection_job_id);
Box::pin(async move {
Expand Down Expand Up @@ -2703,7 +2703,7 @@ impl VdafOps {
A::AggregateShare: Send + Sync + PartialEq + Eq,
{
datastore
.run_tx_with_name("delete_collection_job", move |tx| {
.run_tx("delete_collection_job", move |tx| {
let (task, vdaf, collection_job_id) =
(Arc::clone(&task), Arc::clone(&vdaf), *collection_job_id);
Box::pin(async move {
Expand Down Expand Up @@ -2828,7 +2828,7 @@ impl VdafOps {
}

let aggregate_share_job = datastore
.run_tx_with_name("aggregate_share", |tx| {
.run_tx("aggregate_share", |tx| {
let (task, vdaf, aggregate_share_req) = (
Arc::clone(&task),
Arc::clone(&vdaf),
Expand Down Expand Up @@ -3256,7 +3256,7 @@ mod tests {
.unwrap();

let got_report = datastore
.run_tx(|tx| {
.run_unnamed_tx(|tx| {
let (vdaf, task_id, report_id) =
(vdaf.clone(), *task.id(), *report.metadata().id());
Box::pin(async move { tx.get_client_report(&vdaf, &task_id, &report_id).await })
Expand Down Expand Up @@ -3321,7 +3321,7 @@ mod tests {
.unwrap();

let got_report_ids = datastore
.run_tx(|tx| {
.run_unnamed_tx(|tx| {
let vdaf = vdaf.clone();
let task = task.clone();
Box::pin(async move { tx.get_client_reports_for_task(&vdaf, task.id()).await })
Expand Down Expand Up @@ -3386,7 +3386,7 @@ mod tests {
.unwrap();

let got_report = datastore
.run_tx(|tx| {
.run_unnamed_tx(|tx| {
let (vdaf, task_id, report_id) =
(vdaf.clone(), *task.id(), *report.metadata().id());
Box::pin(async move { tx.get_client_report(&vdaf, &task_id, &report_id).await })
Expand Down Expand Up @@ -3445,7 +3445,7 @@ mod tests {
)
.unwrap();
datastore
.run_tx(|tx| {
.run_unnamed_tx(|tx| {
let task = task.clone();
Box::pin(async move {
tx.put_collection_job(&CollectionJob::<
Expand Down Expand Up @@ -3501,7 +3501,7 @@ mod tests {
);

datastore
.run_tx(|tx| {
.run_unnamed_tx(|tx| {
let global_hpke_keypair_same_id = global_hpke_keypair_same_id.clone();
let global_hpke_keypair_different_id = global_hpke_keypair_different_id.clone();
Box::pin(async move {
Expand Down Expand Up @@ -3538,7 +3538,7 @@ mod tests {
.unwrap();

let got_report = datastore
.run_tx(|tx| {
.run_unnamed_tx(|tx| {
let (vdaf, task_id, report_id) =
(vdaf.clone(), *task.id(), *report.metadata().id());
Box::pin(async move { tx.get_client_report(&vdaf, &task_id, &report_id).await })
Expand Down
8 changes: 4 additions & 4 deletions aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ mod tests {
prepare_init_generator.next(&IdpfInput::from_bools(&[true]));

datastore
.run_tx(|tx| {
.run_unnamed_tx(|tx| {
let (task, aggregation_param, prepare_init, transcript) = (
helper_task.clone(),
aggregation_param.clone(),
Expand Down Expand Up @@ -630,7 +630,7 @@ mod tests {

let (before_aggregation_job, before_report_aggregations) = test_case
.datastore
.run_tx(|tx| {
.run_unnamed_tx(|tx| {
let (task_id, unrelated_prepare_init, aggregation_job_id) = (
*test_case.task.id(),
unrelated_prepare_init.clone(),
Expand Down Expand Up @@ -689,7 +689,7 @@ mod tests {
// Make sure the state of the aggregation job and report aggregations has not changed
let (after_aggregation_job, after_report_aggregations) = test_case
.datastore
.run_tx(|tx| {
.run_unnamed_tx(|tx| {
let (task_id, aggregation_job_id) =
(*test_case.task.id(), test_case.aggregation_job_id);
Box::pin(async move {
Expand Down Expand Up @@ -727,7 +727,7 @@ mod tests {

test_case
.datastore
.run_tx(|tx| {
.run_unnamed_tx(|tx| {
let (task_id, aggregation_job_id) =
(*test_case.task.id(), test_case.aggregation_job_id);
Box::pin(async move {
Expand Down
Loading

0 comments on commit f1dc65f

Please sign in to comment.