diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index 274f99299..a80fe83f7 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -49,7 +49,7 @@ use janus_messages::{ query_type::{FixedSize, TimeInterval}, taskprov::TaskConfig, AggregateShare, AggregateShareAad, AggregateShareReq, AggregationJobContinueReq, - AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobRound, + AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep, BatchSelector, Collection, CollectionJobId, CollectionReq, Duration, HpkeConfig, HpkeConfigList, InputShareAad, Interval, PartialBatchSelector, PlaintextInputShare, PrepareError, PrepareResp, PrepareStepResult, Report, ReportIdChecksum, ReportShare, Role, @@ -1799,7 +1799,7 @@ impl VdafOps { let (report_aggregation_state, prepare_step_result) = match init_rslt { Ok((PingPongState::Continued(prep_state), outgoing_message)) => { // Helper is not finished. Await the next message from the Leader to advance to - // the next round. + // the next step. saw_continue = true; ( ReportAggregationState::WaitingHelper(prep_state), @@ -1879,7 +1879,7 @@ impl VdafOps { } else { AggregationJobState::Finished }, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ) .with_last_request_hash(request_hash), ); @@ -2068,10 +2068,10 @@ impl VdafOps { A::PrepareMessage: Send + Sync, A::OutputShare: Send + Sync, { - if leader_aggregation_job.round() == AggregationJobRound::from(0) { + if leader_aggregation_job.step() == AggregationJobStep::from(0) { return Err(Error::InvalidMessage( Some(*task.id()), - "aggregation job cannot be advanced to round 0", + "aggregation job cannot be advanced to step 0", )); } @@ -2112,18 +2112,18 @@ impl VdafOps { ) })?; - // If the leader's request is on the same round as our stored aggregation job, - // then we probably have already received this message and computed this round, + // If the leader's request is on the same step as our stored aggregation job, + // then we probably have already received this message and computed this step, // but the leader never got our response and so retried stepping the job. // TODO(issue #1087): measure how often this happens with a Prometheus metric - if helper_aggregation_job.round() == leader_aggregation_job.round() { + if helper_aggregation_job.step() == leader_aggregation_job.step() { match helper_aggregation_job.last_request_hash() { None => { return Err(datastore::Error::User( Error::Internal(format!( - "aggregation job {aggregation_job_id} is in round {} but \ + "aggregation job {aggregation_job_id} is on step {} but \ has no last request hash", - helper_aggregation_job.round(), + helper_aggregation_job.step(), )) .into(), )); @@ -2141,23 +2141,23 @@ impl VdafOps { } } return Ok(Self::aggregation_job_resp_for(report_aggregations)); - } else if helper_aggregation_job.round().increment() - != leader_aggregation_job.round() + } else if helper_aggregation_job.step().increment() + != leader_aggregation_job.step() { // If this is not a replay, the leader should be advancing our state to the next - // round and no further. + // step and no further. return Err(datastore::Error::User( Error::StepMismatch { task_id: *task.id(), aggregation_job_id, - expected_step: helper_aggregation_job.round().increment(), - got_step: leader_aggregation_job.round(), + expected_step: helper_aggregation_job.step().increment(), + got_step: leader_aggregation_job.step(), } .into(), )); } - // The leader is advancing us to the next round. Step the aggregation job to + // The leader is advancing us to the next step. Step the aggregation job to // compute the next round of prepare messages and state. Self::step_aggregation_job( tx, diff --git a/aggregator/src/aggregator/aggregate_init_tests.rs b/aggregator/src/aggregator/aggregate_init_tests.rs index aa9936d60..c24520e94 100644 --- a/aggregator/src/aggregator/aggregate_init_tests.rs +++ b/aggregator/src/aggregator/aggregate_init_tests.rs @@ -448,7 +448,7 @@ async fn aggregation_job_mutation_report_aggregations() { } #[tokio::test] -async fn aggregation_job_init_two_round_vdaf_idempotence() { +async fn aggregation_job_init_two_step_vdaf_idempotence() { // We must run Poplar1 in this test so that the aggregation job won't finish on the first step let test_case = setup_poplar1_aggregate_init_test().await; diff --git a/aggregator/src/aggregator/aggregation_job_continue.rs b/aggregator/src/aggregator/aggregation_job_continue.rs index 6a8fc4c41..3fb8b53ac 100644 --- a/aggregator/src/aggregator/aggregation_job_continue.rs +++ b/aggregator/src/aggregator/aggregation_job_continue.rs @@ -28,8 +28,8 @@ use tokio::try_join; use tracing::trace_span; impl VdafOps { - /// Step the helper's aggregation job to the next round of VDAF preparation using the round `n` - /// prepare state in `report_aggregations` with the round `n+1` broadcast prepare messages in + /// Step the helper's aggregation job to the next step of VDAF preparation using the step `n` + /// prepare state in `report_aggregations` with the step `n+1` broadcast prepare messages in /// `leader_aggregation_job`. pub(super) async fn step_aggregation_job( tx: &Transaction<'_, C>, @@ -240,8 +240,8 @@ impl VdafOps { ) }); let helper_aggregation_job = helper_aggregation_job - // Advance the job to the leader's round - .with_round(leader_aggregation_job.round()) + // Advance the job to the leader's step + .with_step(leader_aggregation_job.step()) .with_state(match (saw_continue, saw_finish) { (false, false) => AggregationJobState::Finished, // everything failed, or there were no reports (true, false) => AggregationJobState::InProgress, @@ -249,7 +249,7 @@ impl VdafOps { (true, true) => { return Err(datastore::Error::User( Error::Internal( - "VDAF took an inconsistent number of rounds to reach Finish state" + "VDAF took an inconsistent number of steps to reach Finish state" .to_string(), ) .into(), @@ -403,7 +403,7 @@ mod tests { }; use janus_messages::{ query_type::TimeInterval, AggregationJobContinueReq, AggregationJobId, AggregationJobResp, - AggregationJobRound, Interval, PrepareContinue, PrepareResp, PrepareStepResult, Role, + AggregationJobStep, Interval, PrepareContinue, PrepareResp, PrepareStepResult, Role, }; use prio::{ idpf::IdpfInput, @@ -431,7 +431,7 @@ mod tests { _ephemeral_datastore: EphemeralDatastore, } - /// Set up a helper with an aggregation job in round 0 + /// Set up a helper with an aggregation job in step 0 #[allow(clippy::unit_arg)] async fn setup_aggregation_job_continue_test( ) -> AggregationJobContinueTestCase> { @@ -490,7 +490,7 @@ mod tests { (), Interval::from_time(prepare_init.report_share().metadata().time()).unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await .unwrap(); @@ -520,7 +520,7 @@ mod tests { .unwrap(); let first_continue_request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + AggregationJobStep::from(1), Vec::from([PrepareContinue::new( *prepare_init.report_share().metadata().id(), transcript.leader_prepare_transitions[1].message.clone(), @@ -549,9 +549,9 @@ mod tests { } } - /// Set up a helper with an aggregation job in round 1. + /// Set up a helper with an aggregation job in step 1. #[allow(clippy::unit_arg)] - async fn setup_aggregation_job_continue_round_recovery_test( + async fn setup_aggregation_job_continue_step_recovery_test( ) -> AggregationJobContinueTestCase> { let mut test_case = setup_aggregation_job_continue_test().await; @@ -581,20 +581,20 @@ mod tests { } #[tokio::test] - async fn aggregation_job_continue_round_zero() { + async fn aggregation_job_continue_step_zero() { let test_case = setup_aggregation_job_continue_test().await; - // The job is initialized into round 0 but has never been continued. Send a continue request - // to advance to round 0. Should be rejected because that is an illegal transition. - let round_zero_request = AggregationJobContinueReq::new( - AggregationJobRound::from(0), + // The job is initialized into step 0 but has never been continued. Send a continue request + // to advance to step 0. Should be rejected because that is an illegal transition. + let step_zero_request = AggregationJobContinueReq::new( + AggregationJobStep::from(0), test_case.first_continue_request.prepare_steps().to_vec(), ); post_aggregation_job_expecting_error( &test_case.task, &test_case.aggregation_job_id, - &round_zero_request, + &step_zero_request, &test_case.handler, Status::BadRequest, "urn:ietf:params:ppm:dap:error:invalidMessage", @@ -604,8 +604,8 @@ mod tests { } #[tokio::test] - async fn aggregation_job_continue_round_recovery_replay_request() { - let test_case = setup_aggregation_job_continue_round_recovery_test().await; + async fn aggregation_job_continue_step_recovery_replay_request() { + let test_case = setup_aggregation_job_continue_step_recovery_test().await; // Re-send the request, simulating the leader crashing and losing the helper's response. The // helper should send back the exact same response. @@ -624,8 +624,8 @@ mod tests { #[tokio::test] #[allow(clippy::unit_arg)] - async fn aggregation_job_continue_round_recovery_mutate_continue_request() { - let test_case = setup_aggregation_job_continue_round_recovery_test().await; + async fn aggregation_job_continue_step_recovery_mutate_continue_request() { + let test_case = setup_aggregation_job_continue_step_recovery_test().await; let (unrelated_prepare_init, unrelated_transcript) = test_case .prepare_init_generator @@ -671,7 +671,7 @@ mod tests { // Send another continue request for the same aggregation job, but with a different report // ID. let modified_request = AggregationJobContinueReq::new( - test_case.first_continue_request.round(), + test_case.first_continue_request.step(), Vec::from([PrepareContinue::new( *unrelated_prepare_init.report_share().metadata().id(), unrelated_transcript.leader_prepare_transitions[1] @@ -725,8 +725,8 @@ mod tests { } #[tokio::test] - async fn aggregation_job_continue_round_recovery_past_round() { - let test_case = setup_aggregation_job_continue_round_recovery_test().await; + async fn aggregation_job_continue_step_recovery_past_step() { + let test_case = setup_aggregation_job_continue_step_recovery_test().await; test_case .datastore @@ -734,10 +734,10 @@ mod tests { let (task_id, aggregation_job_id) = (*test_case.task.id(), test_case.aggregation_job_id); Box::pin(async move { - // This is a cheat: dummy_vdaf only has a single round, so we artificially force - // this job into round 2 so that we can send a request for round 1 and force a - // round mismatch error instead of tripping the check for a request to continue - // to round 0. + // This is a cheat: dummy_vdaf only has a single step, so we artificially force + // this job into step 2 so that we can send a request for step 1 and force a + // step mismatch error instead of tripping the check for a request to continue + // to step 0. let aggregation_job = tx .get_aggregation_job::>( &task_id, @@ -746,7 +746,7 @@ mod tests { .await .unwrap() .unwrap() - .with_round(AggregationJobRound::from(2)); + .with_step(AggregationJobStep::from(2)); tx.update_aggregation_job(&aggregation_job).await.unwrap(); @@ -756,16 +756,16 @@ mod tests { .await .unwrap(); - // Send another request for a round that the helper is past. Should fail. - let past_round_request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + // Send another request for a step that the helper is past. Should fail. + let past_step_request = AggregationJobContinueReq::new( + AggregationJobStep::from(1), test_case.first_continue_request.prepare_steps().to_vec(), ); post_aggregation_job_expecting_error( &test_case.task, &test_case.aggregation_job_id, - &past_round_request, + &past_step_request, &test_case.handler, Status::BadRequest, "urn:ietf:params:ppm:dap:error:stepMismatch", @@ -775,20 +775,20 @@ mod tests { } #[tokio::test] - async fn aggregation_job_continue_round_recovery_future_round() { - let test_case = setup_aggregation_job_continue_round_recovery_test().await; + async fn aggregation_job_continue_step_recovery_future_step() { + let test_case = setup_aggregation_job_continue_step_recovery_test().await; - // Send another request for a round too far past the helper's round. Should fail because the - // helper isn't on that round. - let future_round_request = AggregationJobContinueReq::new( - AggregationJobRound::from(17), + // Send another request for a step too far past the helper's step. Should fail because the + // helper isn't on that step. + let future_step_request = AggregationJobContinueReq::new( + AggregationJobStep::from(17), test_case.first_continue_request.prepare_steps().to_vec(), ); post_aggregation_job_expecting_error( &test_case.task, &test_case.aggregation_job_id, - &future_round_request, + &future_step_request, &test_case.handler, Status::BadRequest, "urn:ietf:params:ppm:dap:error:stepMismatch", diff --git a/aggregator/src/aggregator/aggregation_job_creator.rs b/aggregator/src/aggregator/aggregation_job_creator.rs index 28ecc5a2b..c763cb6fe 100644 --- a/aggregator/src/aggregator/aggregation_job_creator.rs +++ b/aggregator/src/aggregator/aggregation_job_creator.rs @@ -17,7 +17,7 @@ use janus_core::{ }; use janus_messages::{ query_type::{FixedSize, TimeInterval}, - AggregationJobRound, Duration as DurationMsg, Interval, Role, TaskId, + AggregationJobStep, Duration as DurationMsg, Interval, Role, TaskId, }; use opentelemetry::{ metrics::{Histogram, Meter, Unit}, @@ -579,7 +579,7 @@ impl AggregationJobCreator { (), client_timestamp_interval, AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); let report_aggregations = agg_job_reports @@ -692,7 +692,7 @@ mod tests { use janus_messages::{ codec::ParameterizedDecode, query_type::{FixedSize, TimeInterval}, - AggregationJobRound, Interval, ReportId, Role, TaskId, Time, + AggregationJobStep, Interval, ReportId, Role, TaskId, Time, }; use prio::vdaf::{self, prio3::Prio3Count}; use std::{collections::HashSet, iter, sync::Arc, time::Duration}; @@ -807,7 +807,7 @@ mod tests { assert_eq!(leader_aggregations.len(), 1); let leader_aggregation = leader_aggregations.into_iter().next().unwrap(); assert_eq!(leader_aggregation.0.partial_batch_identifier(), &()); - assert_eq!(leader_aggregation.0.round(), AggregationJobRound::from(0)); + assert_eq!(leader_aggregation.0.step(), AggregationJobStep::from(0)); assert_eq!( leader_aggregation.1, Vec::from([*leader_report.metadata().id()]) @@ -910,8 +910,8 @@ mod tests { .unwrap(); let mut seen_report_ids = HashSet::new(); for (agg_job, report_ids) in &agg_jobs { - // Jobs are created in round 0 - assert_eq!(agg_job.round(), AggregationJobRound::from(0)); + // Jobs are created in step 0 + assert_eq!(agg_job.step(), AggregationJobStep::from(0)); // The batch is at most MAX_AGGREGATION_JOB_SIZE in size. assert!(report_ids.len() <= MAX_AGGREGATION_JOB_SIZE); @@ -1163,8 +1163,8 @@ mod tests { // Job immediately finished since all reports are in a closed batch. assert_eq!(agg_job.state(), &AggregationJobState::Finished); - // Jobs are created in round 0. - assert_eq!(agg_job.round(), AggregationJobRound::from(0)); + // Jobs are created in step 0. + assert_eq!(agg_job.step(), AggregationJobStep::from(0)); // The batch is at most MAX_AGGREGATION_JOB_SIZE in size. assert!(report_ids.len() <= MAX_AGGREGATION_JOB_SIZE); @@ -1314,8 +1314,8 @@ mod tests { let mut seen_report_ids = HashSet::new(); let mut batches_with_small_agg_jobs = HashSet::new(); for (agg_job, report_ids) in agg_jobs { - // Aggregation jobs are created in round 0. - assert_eq!(agg_job.round(), AggregationJobRound::from(0)); + // Aggregation jobs are created in step 0. + assert_eq!(agg_job.step(), AggregationJobStep::from(0)); // Every batch corresponds to one of the outstanding batches. assert!(batch_ids.contains(agg_job.batch_id())); @@ -1648,7 +1648,7 @@ mod tests { // Verify consistency of batches and aggregation jobs. let mut seen_report_ids = HashSet::new(); for (agg_job, report_ids) in agg_jobs { - assert_eq!(agg_job.round(), AggregationJobRound::from(0)); + assert_eq!(agg_job.step(), AggregationJobStep::from(0)); assert!(batch_ids.contains(agg_job.batch_id())); assert!(report_ids.len() <= MAX_AGGREGATION_JOB_SIZE); @@ -1840,7 +1840,7 @@ mod tests { // Verify consistency of batches and aggregation jobs. let mut seen_report_ids = HashSet::new(); for (agg_job, report_ids) in agg_jobs { - assert_eq!(agg_job.round(), AggregationJobRound::from(0)); + assert_eq!(agg_job.step(), AggregationJobStep::from(0)); assert!(batch_ids.contains(agg_job.batch_id())); assert!(report_ids.len() <= MAX_AGGREGATION_JOB_SIZE); @@ -1996,7 +1996,7 @@ mod tests { let mut seen_report_ids = HashSet::new(); let mut batches_with_small_agg_jobs = HashSet::new(); for (agg_job, report_ids) in agg_jobs { - assert_eq!(agg_job.round(), AggregationJobRound::from(0)); + assert_eq!(agg_job.step(), AggregationJobStep::from(0)); assert!(batch_ids.contains(agg_job.batch_id())); assert!(report_ids.len() <= MAX_AGGREGATION_JOB_SIZE); diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index 2b2d7609a..24cc59721 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -498,7 +498,7 @@ impl AggregationJobDriver { // Construct request, send it to the helper, and process the response. // TODO(#235): abandon work immediately on "terminal" failures from helper, or other // unexpected cases such as unknown/unexpected content type. - let req = AggregationJobContinueReq::new(aggregation_job.round(), prepare_continues); + let req = AggregationJobContinueReq::new(aggregation_job.step(), prepare_continues); let resp_bytes = send_request_to_helper( &self.http_client, @@ -595,7 +595,7 @@ impl AggregationJobDriver { match state_and_message { Ok(PingPongContinuedValue::WithMessage { transition }) => { // Leader did not finish. Store our state and outgoing message for the - // next round. + // next step. // n.b. it's possible we finished and recovered an output share at the // VDAF level (i.e., state may be PingPongState::Finished) but we cannot // finish at the DAP layer and commit the output share until we get @@ -687,9 +687,9 @@ impl AggregationJobDriver { // Write everything back to storage. let mut aggregation_job_writer = AggregationJobWriter::new(Arc::clone(&task)); - let new_round = aggregation_job.round().increment(); + let new_step = aggregation_job.step().increment(); aggregation_job_writer.update( - aggregation_job.with_round(new_round), + aggregation_job.with_step(new_step), report_aggregations_to_write, )?; let aggregation_job_writer = Arc::new(aggregation_job_writer); @@ -933,7 +933,7 @@ mod tests { use janus_messages::{ query_type::{FixedSize, TimeInterval}, AggregationJobContinueReq, AggregationJobInitializeReq, AggregationJobResp, - AggregationJobRound, Duration, Extension, ExtensionType, FixedSizeQuery, HpkeConfig, + AggregationJobStep, Duration, Extension, ExtensionType, FixedSizeQuery, HpkeConfig, InputShareAad, Interval, PartialBatchSelector, PlaintextInputShare, PrepareContinue, PrepareError, PrepareInit, PrepareResp, PrepareStepResult, Query, ReportIdChecksum, ReportMetadata, ReportShare, Role, TaskId, Time, @@ -1037,7 +1037,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; tx.put_report_aggregation(&ReportAggregation::< @@ -1187,7 +1187,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(2), + AggregationJobStep::from(2), ); let want_report_aggregation = ReportAggregation::>::new( @@ -1260,7 +1260,7 @@ mod tests { } #[tokio::test] - async fn step_time_interval_aggregation_job_init_single_round() { + async fn step_time_interval_aggregation_job_init_single_step() { // Setup: insert a client report and add it to a new aggregation job. install_test_trace_subscriber(); let mut server = mockito::Server::new_async().await; @@ -1345,7 +1345,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; tx.put_report_aggregation( @@ -1497,7 +1497,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ); let want_report_aggregation = ReportAggregation::::new( *task.id(), @@ -1622,7 +1622,7 @@ mod tests { } #[tokio::test] - async fn step_time_interval_aggregation_job_init_two_rounds() { + async fn step_time_interval_aggregation_job_init_two_steps() { // Setup: insert a client report and add it to a new aggregation job. install_test_trace_subscriber(); let mut server = mockito::Server::new_async().await; @@ -1696,7 +1696,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; @@ -1806,7 +1806,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ); let want_report_aggregation = ReportAggregation::>::new( @@ -1875,7 +1875,7 @@ mod tests { } #[tokio::test] - async fn step_fixed_size_aggregation_job_init_single_round() { + async fn step_fixed_size_aggregation_job_init_single_step() { // Setup: insert a client report and add it to a new aggregation job. install_test_trace_subscriber(); let mut server = mockito::Server::new_async().await; @@ -1945,7 +1945,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; @@ -2072,7 +2072,7 @@ mod tests { batch_id, Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ); let want_report_aggregation = ReportAggregation::::new( *task.id(), @@ -2128,7 +2128,7 @@ mod tests { } #[tokio::test] - async fn step_fixed_size_aggregation_job_init_two_rounds() { + async fn step_fixed_size_aggregation_job_init_two_steps() { // Setup: insert a client report and add it to a new aggregation job. install_test_trace_subscriber(); let mut server = mockito::Server::new_async().await; @@ -2207,7 +2207,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; @@ -2317,7 +2317,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ); let want_report_aggregation = ReportAggregation::>::new( @@ -2480,7 +2480,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(1), + AggregationJobStep::from(1), )) .await?; @@ -2562,7 +2562,7 @@ mod tests { // It would be nicer to retrieve the request bytes from the mock, then do our own parsing & // verification -- but mockito does not expose this functionality at time of writing.) let leader_request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + AggregationJobStep::from(1), Vec::from([PrepareContinue::new( *report.metadata().id(), transcript.leader_prepare_transitions[1].message.clone(), @@ -2638,7 +2638,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(2), + AggregationJobStep::from(2), ); let want_report_aggregation = ReportAggregation::>::new( @@ -2882,7 +2882,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(1), + AggregationJobStep::from(1), )) .await?; @@ -2951,7 +2951,7 @@ mod tests { // It would be nicer to retrieve the request bytes from the mock, then do our own parsing & // verification -- but mockito does not expose this functionality at time of writing.) let leader_request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + AggregationJobStep::from(1), Vec::from([PrepareContinue::new( *report.metadata().id(), transcript.leader_prepare_transitions[1].message.clone(), @@ -3027,7 +3027,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(2), + AggregationJobStep::from(2), ); let want_report_aggregation = ReportAggregation::>::new( @@ -3200,7 +3200,7 @@ mod tests { (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); let report_aggregation = ReportAggregation::::new( *task.id(), @@ -3414,7 +3414,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; @@ -3564,7 +3564,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::Abandoned, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ), ); assert_eq!( diff --git a/aggregator/src/aggregator/batch_creator.rs b/aggregator/src/aggregator/batch_creator.rs index 02dfa8203..f439e1f68 100644 --- a/aggregator/src/aggregator/batch_creator.rs +++ b/aggregator/src/aggregator/batch_creator.rs @@ -10,7 +10,7 @@ use janus_aggregator_core::datastore::{ }; use janus_core::time::{Clock, DurationExt, TimeExt}; use janus_messages::{ - query_type::FixedSize, AggregationJobRound, BatchId, Duration, Interval, ReportId, TaskId, Time, + query_type::FixedSize, AggregationJobStep, BatchId, Duration, Interval, ReportId, TaskId, Time, }; use prio::{codec::Encode, vdaf::Aggregator}; use rand::random; @@ -318,7 +318,7 @@ where batch_id, client_timestamp_interval, AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); aggregation_job_writer.put(aggregation_job, report_aggregations)?; diff --git a/aggregator/src/aggregator/collection_job_driver.rs b/aggregator/src/aggregator/collection_job_driver.rs index 606ce74b6..c12a470a6 100644 --- a/aggregator/src/aggregator/collection_job_driver.rs +++ b/aggregator/src/aggregator/collection_job_driver.rs @@ -552,7 +552,7 @@ mod tests { Runtime, }; use janus_messages::{ - query_type::TimeInterval, AggregateShare, AggregateShareReq, AggregationJobRound, + query_type::TimeInterval, AggregateShare, AggregateShareReq, AggregationJobStep, BatchSelector, Duration, HpkeCiphertext, HpkeConfigId, Interval, Query, ReportIdChecksum, Role, }; @@ -612,7 +612,7 @@ mod tests { (), Interval::from_time(&report_timestamp).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ), ) .await?; @@ -770,7 +770,7 @@ mod tests { (), Interval::from_time(&report_timestamp).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ), ) .await?; diff --git a/aggregator/src/aggregator/collection_job_tests.rs b/aggregator/src/aggregator/collection_job_tests.rs index 864f6e476..82bbd5404 100644 --- a/aggregator/src/aggregator/collection_job_tests.rs +++ b/aggregator/src/aggregator/collection_job_tests.rs @@ -33,7 +33,7 @@ use janus_core::{ }; use janus_messages::{ query_type::{FixedSize, QueryType as QueryTypeTrait, TimeInterval}, - AggregateShareAad, AggregationJobRound, BatchId, BatchSelector, Collection, CollectionJobId, + AggregateShareAad, AggregationJobStep, BatchId, BatchSelector, Collection, CollectionJobId, CollectionReq, Duration, FixedSizeQuery, Interval, Query, ReportIdChecksum, Role, Time, }; use prio::codec::{Decode, Encode}; @@ -200,7 +200,7 @@ async fn setup_fixed_size_current_batch_collection_job_test_case( batch_id, interval, AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), )) .await .unwrap(); diff --git a/aggregator/src/aggregator/error.rs b/aggregator/src/aggregator/error.rs index c9c265282..c9c517aac 100644 --- a/aggregator/src/aggregator/error.rs +++ b/aggregator/src/aggregator/error.rs @@ -1,7 +1,7 @@ use http_api_problem::HttpApiProblem; use janus_aggregator_core::{datastore, task}; use janus_messages::{ - problem_type::DapProblemType, AggregationJobId, AggregationJobRound, CollectionJobId, + problem_type::DapProblemType, AggregationJobId, AggregationJobStep, CollectionJobId, HpkeConfigId, Interval, PrepareError, ReportId, ReportIdChecksum, Role, TaskId, Time, }; use opentelemetry::{metrics::Counter, KeyValue}; @@ -42,8 +42,8 @@ pub enum Error { StepMismatch { task_id: TaskId, aggregation_job_id: AggregationJobId, - expected_step: AggregationJobRound, - got_step: AggregationJobRound, + expected_step: AggregationJobStep, + got_step: AggregationJobStep, }, /// Corresponds to `unrecognizedTask`, ยง3.2 #[error("task {0}: unrecognized task")] diff --git a/aggregator/src/aggregator/garbage_collector.rs b/aggregator/src/aggregator/garbage_collector.rs index 56044fe27..82468fd70 100644 --- a/aggregator/src/aggregator/garbage_collector.rs +++ b/aggregator/src/aggregator/garbage_collector.rs @@ -99,7 +99,7 @@ mod tests { }; use janus_messages::{ query_type::{FixedSize, TimeInterval}, - AggregationJobRound, Duration, FixedSizeQuery, HpkeCiphertext, HpkeConfigId, Interval, + AggregationJobStep, Duration, FixedSizeQuery, HpkeCiphertext, HpkeConfigId, Interval, Query, ReportIdChecksum, ReportMetadata, ReportShare, Role, Time, }; use rand::random; @@ -153,7 +153,7 @@ mod tests { (), Interval::from_time(&client_timestamp).unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ), ) .await @@ -343,7 +343,7 @@ mod tests { (), Interval::from_time(&client_timestamp).unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ), ) .await @@ -524,7 +524,7 @@ mod tests { batch_id, Interval::from_time(&client_timestamp).unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); tx.put_aggregation_job(&aggregation_job).await.unwrap(); @@ -717,7 +717,7 @@ mod tests { batch_id, Interval::from_time(&client_timestamp).unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); tx.put_aggregation_job(&aggregation_job).await.unwrap(); diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index b06804457..0c793fc03 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -695,7 +695,7 @@ mod tests { use janus_messages::{ query_type::TimeInterval, AggregateShare as AggregateShareMessage, AggregateShareAad, AggregateShareReq, AggregationJobContinueReq, AggregationJobId, - AggregationJobInitializeReq, AggregationJobResp, AggregationJobRound, BatchSelector, + AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep, BatchSelector, Collection, CollectionJobId, CollectionReq, Duration, Extension, ExtensionType, HpkeCiphertext, HpkeConfigId, HpkeConfigList, InputShareAad, Interval, PartialBatchSelector, PrepareContinue, PrepareError, PrepareInit, PrepareResp, @@ -1711,7 +1711,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); tx.put_aggregation_job::<0, TimeInterval, dummy_vdaf::Vdaf>( &conflicting_aggregation_job, @@ -1742,7 +1742,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); tx.put_aggregation_job::<0, TimeInterval, dummy_vdaf::Vdaf>( &non_conflicting_aggregation_job, @@ -2530,7 +2530,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; @@ -2592,7 +2592,7 @@ mod tests { .unwrap(); let request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + AggregationJobStep::from(1), Vec::from([ PrepareContinue::new(*report_metadata_0.id(), leader_prep_message_0.clone()), PrepareContinue::new(*report_metadata_2.id(), leader_prep_message_2.clone()), @@ -2653,7 +2653,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ) .with_last_request_hash(aggregation_job.last_request_hash().unwrap()) ); @@ -2870,7 +2870,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; @@ -2946,7 +2946,7 @@ mod tests { .unwrap(); let request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + AggregationJobStep::from(1), Vec::from([ PrepareContinue::new(*report_metadata_0.id(), ping_pong_leader_message_0.clone()), PrepareContinue::new(*report_metadata_1.id(), ping_pong_leader_message_1.clone()), @@ -3197,7 +3197,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; @@ -3248,7 +3248,7 @@ mod tests { .unwrap(); let request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + AggregationJobStep::from(1), Vec::from([ PrepareContinue::new(*report_metadata_3.id(), ping_pong_leader_message_3.clone()), PrepareContinue::new(*report_metadata_4.id(), ping_pong_leader_message_4.clone()), @@ -3453,7 +3453,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; tx.put_report_aggregation( @@ -3479,7 +3479,7 @@ mod tests { // Make request. let request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + AggregationJobStep::from(1), Vec::from([PrepareContinue::new( *report_metadata.id(), // An AggregationJobContinueReq should only ever contain Continue or Finished @@ -3562,7 +3562,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; tx.put_report_aggregation( @@ -3588,7 +3588,7 @@ mod tests { // Make request. let request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + AggregationJobStep::from(1), Vec::from([PrepareContinue::new( *report_metadata.id(), PingPongMessage::Continue { @@ -3650,7 +3650,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ) .with_last_request_hash(aggregation_job.last_request_hash().unwrap()) ); @@ -3734,7 +3734,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; tx.put_report_aggregation( @@ -3760,7 +3760,7 @@ mod tests { // Make request. let request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + AggregationJobStep::from(1), Vec::from([PrepareContinue::new( ReportId::from( [16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1], // not the same as above @@ -3882,7 +3882,7 @@ mod tests { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; @@ -3925,7 +3925,7 @@ mod tests { // Make request. let request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + AggregationJobStep::from(1), Vec::from([ // Report IDs are in opposite order to what was stored in the datastore. PrepareContinue::new( @@ -4000,7 +4000,7 @@ mod tests { ) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ), ) .await?; @@ -4021,7 +4021,7 @@ mod tests { // Make request. let request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + AggregationJobStep::from(1), Vec::from([PrepareContinue::new( ReportId::from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]), PingPongMessage::Continue { diff --git a/aggregator/src/aggregator/taskprov_tests.rs b/aggregator/src/aggregator/taskprov_tests.rs index b6d22d0d1..127f8dfc0 100644 --- a/aggregator/src/aggregator/taskprov_tests.rs +++ b/aggregator/src/aggregator/taskprov_tests.rs @@ -44,7 +44,7 @@ use janus_messages::{ }, AggregateShare as AggregateShareMessage, AggregateShareAad, AggregateShareReq, AggregationJobContinueReq, AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, - AggregationJobRound, BatchSelector, Duration, Interval, PartialBatchSelector, PrepareContinue, + AggregationJobStep, BatchSelector, Duration, Interval, PartialBatchSelector, PrepareContinue, PrepareInit, PrepareResp, PrepareStepResult, ReportIdChecksum, ReportMetadata, ReportShare, Role, TaskId, Time, }; @@ -155,7 +155,7 @@ async fn setup_taskprov_test() -> TaskprovTestCase { task_config.encode(&mut task_config_encoded); // We use a real VDAF since taskprov doesn't have any allowance for a test VDAF, and we use - // Poplar1 so that the VDAF wil take more than one round, so we can exercise aggregation + // Poplar1 so that the VDAF wil take more than one step, so we can exercise aggregation // continuation. let vdaf = Poplar1::new(1); @@ -755,7 +755,7 @@ async fn taskprov_aggregate_continue() { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ), ) .await?; @@ -792,7 +792,7 @@ async fn taskprov_aggregate_continue() { .unwrap(); let request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + AggregationJobStep::from(1), Vec::from([PrepareContinue::new( *test.report_metadata.id(), test.transcript.leader_prepare_transitions[1] @@ -1044,7 +1044,7 @@ async fn end_to_end() { ); let aggregation_job_continue_request = AggregationJobContinueReq::new( - AggregationJobRound::from(1), + AggregationJobStep::from(1), Vec::from([PrepareContinue::new( *test.report_metadata.id(), test.transcript.leader_prepare_transitions[1] diff --git a/aggregator_api/src/tests.rs b/aggregator_api/src/tests.rs index 422172771..b1737585f 100644 --- a/aggregator_api/src/tests.rs +++ b/aggregator_api/src/tests.rs @@ -39,7 +39,7 @@ use janus_core::{ time::MockClock, }; use janus_messages::{ - query_type::TimeInterval, AggregationJobRound, Duration, HpkeAeadId, HpkeConfig, HpkeConfigId, + query_type::TimeInterval, AggregationJobStep, Duration, HpkeAeadId, HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId, HpkePublicKey, Interval, Role, TaskId, Time, }; use rand::{distributions::Standard, random, thread_rng, Rng}; @@ -763,7 +763,7 @@ async fn get_task_metrics() { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 6e4d54dea..a846f5b70 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -1733,7 +1733,7 @@ impl Transaction<'_, C> { let stmt = self .prepare_cached( "SELECT - aggregation_param, batch_id, client_timestamp_interval, state, round, + aggregation_param, batch_id, client_timestamp_interval, state, step, last_request_hash FROM aggregation_jobs JOIN tasks ON tasks.id = aggregation_jobs.task_id @@ -1771,7 +1771,7 @@ impl Transaction<'_, C> { .prepare_cached( "SELECT aggregation_job_id, aggregation_param, batch_id, client_timestamp_interval, - state, round, last_request_hash + state, step, last_request_hash FROM aggregation_jobs JOIN tasks ON tasks.id = aggregation_jobs.task_id WHERE tasks.task_id = $1 @@ -1814,7 +1814,7 @@ impl Transaction<'_, C> { row.get::<_, SqlInterval>("client_timestamp_interval") .as_interval(), row.get("state"), - row.get_postgres_integer_and_convert::("round")?, + row.get_postgres_integer_and_convert::("step")?, ); if let Some(hash) = row.try_get::<_, Option>>("last_request_hash")? { @@ -1951,7 +1951,7 @@ impl Transaction<'_, C> { .prepare_cached( "INSERT INTO aggregation_jobs (task_id, aggregation_job_id, aggregation_param, batch_id, - client_timestamp_interval, state, round, last_request_hash) + client_timestamp_interval, state, step, last_request_hash) VALUES ((SELECT id FROM tasks WHERE task_id = $1), $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING RETURNING COALESCE(UPPER(client_timestamp_interval) < COALESCE($9::TIMESTAMP - (SELECT report_expiry_age FROM tasks WHERE task_id = $1) * '1 second'::INTERVAL, '-infinity'::TIMESTAMP), FALSE) AS is_expired", @@ -1970,7 +1970,7 @@ impl Transaction<'_, C> { /* client_timestamp_interval */ &SqlInterval::from(aggregation_job.client_timestamp_interval()), /* state */ &aggregation_job.state(), - /* round */ &(u16::from(aggregation_job.round()) as i32), + /* step */ &(u16::from(aggregation_job.step()) as i32), /* last_request_hash */ &aggregation_job.last_request_hash(), /* now */ &self.clock.now().as_naive_date_time()?, @@ -2021,7 +2021,7 @@ impl Transaction<'_, C> { .prepare_cached( "UPDATE aggregation_jobs SET state = $1, - round = $2, + step = $2, last_request_hash = $3 FROM tasks WHERE tasks.task_id = $4 @@ -2034,7 +2034,7 @@ impl Transaction<'_, C> { &stmt, &[ /* state */ &aggregation_job.state(), - /* round */ &(u16::from(aggregation_job.round()) as i32), + /* step */ &(u16::from(aggregation_job.step()) as i32), /* last_request_hash */ &aggregation_job.last_request_hash(), /* task_id */ &aggregation_job.task_id().as_ref(), diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs index e11f4cf62..a1bb8267d 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -12,7 +12,7 @@ use janus_core::{ }; use janus_messages::{ query_type::{FixedSize, QueryType, TimeInterval}, - AggregationJobId, AggregationJobRound, BatchId, CollectionJobId, Duration, Extension, + AggregationJobId, AggregationJobStep, BatchId, CollectionJobId, Duration, Extension, HpkeCiphertext, Interval, PrepareError, PrepareResp, Query, ReportId, ReportIdChecksum, ReportMetadata, Role, TaskId, Time, }; @@ -233,11 +233,11 @@ pub struct AggregationJob, } @@ -252,7 +252,7 @@ impl> batch_id: Q::PartialBatchIdentifier, client_timestamp_interval: Interval, state: AggregationJobState, - round: AggregationJobRound, + step: AggregationJobStep, ) -> Self { Self { task_id, @@ -261,7 +261,7 @@ impl> batch_id, client_timestamp_interval, state, - round, + step, last_request_hash: None, } } @@ -306,15 +306,15 @@ impl> AggregationJob { state, ..self } } - /// Returns the round of the VDAF preparation protocol the aggregation job is on. - pub fn round(&self) -> AggregationJobRound { - self.round + /// Returns the step of the VDAF preparation protocol the aggregation job is on. + pub fn step(&self) -> AggregationJobStep { + self.step } /// Returns a new [`AggregationJob`] corresponding to this aggregation job updated to be on - /// the given VDAF preparation round. - pub fn with_round(self, round: AggregationJobRound) -> Self { - Self { round, ..self } + /// the given VDAF preparation step. + pub fn with_step(self, step: AggregationJobStep) -> Self { + Self { step, ..self } } /// Returns the SHA-256 digest of the most recent @@ -355,7 +355,7 @@ where && self.batch_id == other.batch_id && self.client_timestamp_interval == other.client_timestamp_interval && self.state == other.state - && self.round == other.round + && self.step == other.step && self.last_request_hash == other.last_request_hash } } diff --git a/aggregator_core/src/datastore/tests.rs b/aggregator_core/src/datastore/tests.rs index f57bc2047..e2229b867 100644 --- a/aggregator_core/src/datastore/tests.rs +++ b/aggregator_core/src/datastore/tests.rs @@ -32,7 +32,7 @@ use janus_core::{ }; use janus_messages::{ query_type::{FixedSize, QueryType, TimeInterval}, - AggregateShareAad, AggregationJobId, AggregationJobRound, BatchId, BatchSelector, + AggregateShareAad, AggregationJobId, AggregationJobStep, BatchId, BatchSelector, CollectionJobId, Duration, Extension, ExtensionType, FixedSizeQuery, HpkeCiphertext, HpkeConfigId, Interval, PrepareError, PrepareResp, PrepareStepResult, Query, ReportId, ReportIdChecksum, ReportMetadata, ReportShare, Role, TaskId, Time, @@ -268,7 +268,7 @@ async fn get_task_metrics(ephemeral_datastore: EphemeralDatastore) { ) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); let expired_aggregation_job = AggregationJob::<0, TimeInterval, dummy_vdaf::Vdaf>::new( @@ -284,7 +284,7 @@ async fn get_task_metrics(ephemeral_datastore: EphemeralDatastore) { ) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); let other_aggregation_job = AggregationJob::<0, TimeInterval, dummy_vdaf::Vdaf>::new( @@ -300,7 +300,7 @@ async fn get_task_metrics(ephemeral_datastore: EphemeralDatastore) { ) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); let report_aggregations: Vec<_> = reports @@ -959,7 +959,7 @@ async fn count_client_reports_for_batch_id(ephemeral_datastore: EphemeralDatasto ) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); let expired_report_aggregation = ReportAggregation::<0, dummy_vdaf::Vdaf>::new( *task.id(), @@ -979,7 +979,7 @@ async fn count_client_reports_for_batch_id(ephemeral_datastore: EphemeralDatasto Interval::new(OLDEST_ALLOWED_REPORT_TIMESTAMP, Duration::from_seconds(2)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); let aggregation_job_0_report_aggregation_0 = ReportAggregation::<0, dummy_vdaf::Vdaf>::new( @@ -1010,7 +1010,7 @@ async fn count_client_reports_for_batch_id(ephemeral_datastore: EphemeralDatasto Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); let aggregation_job_1_report_aggregation_0 = ReportAggregation::<0, dummy_vdaf::Vdaf>::new( @@ -1202,7 +1202,7 @@ async fn roundtrip_aggregation_job(ephemeral_datastore: EphemeralDatastore) { batch_id, Interval::new(OLDEST_ALLOWED_REPORT_TIMESTAMP, Duration::from_seconds(1)).unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); let helper_aggregation_job = AggregationJob::<0, FixedSize, dummy_vdaf::Vdaf>::new( *task.id(), @@ -1211,7 +1211,7 @@ async fn roundtrip_aggregation_job(ephemeral_datastore: EphemeralDatastore) { random(), Interval::new(OLDEST_ALLOWED_REPORT_TIMESTAMP, Duration::from_seconds(1)).unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); ds.run_tx(|tx| { @@ -1341,7 +1341,7 @@ async fn roundtrip_aggregation_job(ephemeral_datastore: EphemeralDatastore) { ) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); ds.run_tx(|tx| { let new_leader_aggregation_job = new_leader_aggregation_job.clone(); @@ -1442,7 +1442,7 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore ) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await } @@ -1459,7 +1459,7 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore Interval::new(OLDEST_ALLOWED_REPORT_TIMESTAMP, Duration::from_seconds(1)) .unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ), ) .await?; @@ -1474,7 +1474,7 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ), ) .await?; @@ -1497,7 +1497,7 @@ async fn aggregation_job_acquire_release(ephemeral_datastore: EphemeralDatastore Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ), ) .await @@ -1761,7 +1761,7 @@ async fn aggregation_job_not_found(ephemeral_datastore: EphemeralDatastore) { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ), ) .await @@ -1796,7 +1796,7 @@ async fn get_aggregation_jobs_for_task(ephemeral_datastore: EphemeralDatastore) random(), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); let second_aggregation_job = AggregationJob::<0, FixedSize, dummy_vdaf::Vdaf>::new( *task.id(), @@ -1805,7 +1805,7 @@ async fn get_aggregation_jobs_for_task(ephemeral_datastore: EphemeralDatastore) random(), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); let aggregation_job_with_request_hash = AggregationJob::<0, FixedSize, dummy_vdaf::Vdaf>::new( *task.id(), @@ -1814,7 +1814,7 @@ async fn get_aggregation_jobs_for_task(ephemeral_datastore: EphemeralDatastore) random(), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ) .with_last_request_hash([3; 32]); @@ -1853,7 +1853,7 @@ async fn get_aggregation_jobs_for_task(ephemeral_datastore: EphemeralDatastore) Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await }) @@ -1960,7 +1960,7 @@ async fn roundtrip_report_aggregation(ephemeral_datastore: EphemeralDatastore) { Interval::new(OLDEST_ALLOWED_REPORT_TIMESTAMP, Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; tx.put_report_share( @@ -2127,7 +2127,7 @@ async fn check_other_report_aggregation_exists(ephemeral_datastore: EphemeralDat (), Interval::new(OLDEST_ALLOWED_REPORT_TIMESTAMP, Duration::from_seconds(1)).unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await?; tx.put_report_share( @@ -2349,7 +2349,7 @@ async fn get_report_aggregations_for_aggregation_job(ephemeral_datastore: Epheme Interval::new(OLDEST_ALLOWED_REPORT_TIMESTAMP, Duration::from_seconds(1)) .unwrap(), AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), )) .await .unwrap(); @@ -3037,7 +3037,7 @@ async fn time_interval_collection_job_acquire_release_happy_path( (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), )]); let report_aggregations = Vec::from([ReportAggregation::<0, dummy_vdaf::Vdaf>::new( task_id, @@ -3162,7 +3162,7 @@ async fn fixed_size_collection_job_acquire_release_happy_path( batch_id, Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), )]); let report_aggregations = Vec::from([ReportAggregation::<0, dummy_vdaf::Vdaf>::new( task_id, @@ -3294,7 +3294,7 @@ async fn collection_job_acquire_no_aggregation_job_with_task_id( (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), )]); let collection_job_test_cases = Vec::from([CollectionJobTestCase:: { @@ -3349,7 +3349,7 @@ async fn collection_job_acquire_no_aggregation_job_with_agg_param( (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), )]); let collection_job_test_cases = Vec::from([CollectionJobTestCase:: { @@ -3404,7 +3404,7 @@ async fn collection_job_acquire_report_shares_outside_interval( ) .unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), )]); let report_aggregations = Vec::from([ReportAggregation::<0, dummy_vdaf::Vdaf>::new( task_id, @@ -3467,7 +3467,7 @@ async fn collection_job_acquire_release_job_finished(ephemeral_datastore: Epheme (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), )]); let report_aggregations = Vec::from([ReportAggregation::<0, dummy_vdaf::Vdaf>::new( @@ -3534,7 +3534,7 @@ async fn collection_job_acquire_release_aggregation_job_in_progress( (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ), AggregationJob::<0, TimeInterval, dummy_vdaf::Vdaf>::new( task_id, @@ -3544,7 +3544,7 @@ async fn collection_job_acquire_release_aggregation_job_in_progress( Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), // Aggregation job included in collect request is in progress AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ), ]); @@ -3619,7 +3619,7 @@ async fn collection_job_acquire_job_max(ephemeral_datastore: EphemeralDatastore) (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ), AggregationJob::<0, TimeInterval, dummy_vdaf::Vdaf>::new( task_id, @@ -3628,7 +3628,7 @@ async fn collection_job_acquire_job_max(ephemeral_datastore: EphemeralDatastore) (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ), ]); let report_aggregations = Vec::from([ @@ -3763,7 +3763,7 @@ async fn collection_job_acquire_state_filtering(ephemeral_datastore: EphemeralDa (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ), AggregationJob::<0, TimeInterval, dummy_vdaf::Vdaf>::new( task_id, @@ -3772,7 +3772,7 @@ async fn collection_job_acquire_state_filtering(ephemeral_datastore: EphemeralDa (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ), AggregationJob::<0, TimeInterval, dummy_vdaf::Vdaf>::new( task_id, @@ -3781,7 +3781,7 @@ async fn collection_job_acquire_state_filtering(ephemeral_datastore: EphemeralDa (), Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)).unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ), ]); let report_aggregations = Vec::from([ @@ -4826,7 +4826,7 @@ async fn roundtrip_outstanding_batch(ephemeral_datastore: EphemeralDatastore) { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ); let report_aggregation_0_0 = ReportAggregation::<0, dummy_vdaf::Vdaf>::new( *task_1.id(), @@ -4877,7 +4877,7 @@ async fn roundtrip_outstanding_batch(ephemeral_datastore: EphemeralDatastore) { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ); let report_aggregation_1_0 = ReportAggregation::<0, dummy_vdaf::Vdaf>::new( *task_1.id(), @@ -4915,7 +4915,7 @@ async fn roundtrip_outstanding_batch(ephemeral_datastore: EphemeralDatastore) { Interval::new(Time::from_seconds_since_epoch(0), Duration::from_seconds(1)) .unwrap(), AggregationJobState::Finished, - AggregationJobRound::from(1), + AggregationJobStep::from(1), ); let report_aggregation_2_0 = ReportAggregation::<0, dummy_vdaf::Vdaf>::new( *task_2.id(), @@ -5364,7 +5364,7 @@ async fn delete_expired_aggregation_artifacts(ephemeral_datastore: EphemeralData Q::partial_batch_identifier(&batch_identifier).clone(), client_timestamp_interval, AggregationJobState::InProgress, - AggregationJobRound::from(0), + AggregationJobStep::from(0), ); tx.put_aggregation_job(&aggregation_job).await.unwrap(); diff --git a/core/src/test_util/mod.rs b/core/src/test_util/mod.rs index 1fff7e7dc..bbb828b0b 100644 --- a/core/src/test_util/mod.rs +++ b/core/src/test_util/mod.rs @@ -60,12 +60,12 @@ pub struct VdafTranscript< pub helper_input_share: V::InputShare, /// The leader's states and messages computed throughout the protocol run. Indexed by the - /// aggregation job round. + /// aggregation job step. #[allow(clippy::type_complexity)] pub leader_prepare_transitions: Vec>, /// The helper's states and messages computed throughout the protocol run. Indexed by the - /// aggregation job round. + /// aggregation job step. #[allow(clippy::type_complexity)] pub helper_prepare_transitions: Vec>, diff --git a/db/00000000000001_initial_schema.up.sql b/db/00000000000001_initial_schema.up.sql index c2b31549a..f744ed8a1 100644 --- a/db/00000000000001_initial_schema.up.sql +++ b/db/00000000000001_initial_schema.up.sql @@ -173,7 +173,7 @@ CREATE TABLE aggregation_jobs( batch_id BYTEA NOT NULL, -- batch ID (fixed-size only; corresponds to identifier in BatchSelector) client_timestamp_interval TSRANGE NOT NULL, -- the minimal interval containing all of client timestamps included in this aggregation job state AGGREGATION_JOB_STATE NOT NULL, -- current state of the aggregation job - round INTEGER NOT NULL, -- current round of the VDAF preparation protocol + step INTEGER NOT NULL, -- current step of the VDAF preparation protocol last_request_hash BYTEA, -- SHA-256 hash of the most recently received AggregationJobContinueReq (helper only) trace_context JSONB, -- distributed tracing metadata diff --git a/messages/src/lib.rs b/messages/src/lib.rs index d65901258..d61492b39 100644 --- a/messages/src/lib.rs +++ b/messages/src/lib.rs @@ -2483,24 +2483,24 @@ impl Decode for AggregationJobInitializeReq { } } -/// Type representing the round of an aggregation job. +/// Type representing the step of an aggregation job. #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] -pub struct AggregationJobRound(u16); +pub struct AggregationJobStep(u16); -impl AggregationJobRound { - /// Construct a new [`AggregationJobRound`] representing the round after this one. +impl AggregationJobStep { + /// Construct a new [`AggregationJobStep`] representing the step after this one. pub fn increment(&self) -> Self { Self(self.0 + 1) } } -impl Display for AggregationJobRound { +impl Display for AggregationJobStep { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) } } -impl Encode for AggregationJobRound { +impl Encode for AggregationJobStep { fn encode(&self, bytes: &mut Vec) { self.0.encode(bytes) } @@ -2510,40 +2510,40 @@ impl Encode for AggregationJobRound { } } -impl Decode for AggregationJobRound { +impl Decode for AggregationJobStep { fn decode(bytes: &mut Cursor<&[u8]>) -> Result { Ok(Self(u16::decode(bytes)?)) } } -impl From for AggregationJobRound { +impl From for AggregationJobStep { fn from(value: u16) -> Self { Self(value) } } -impl From for u16 { - fn from(value: AggregationJobRound) -> Self { +impl From for u16 { + fn from(value: AggregationJobStep) -> Self { value.0 } } -impl TryFrom for AggregationJobRound { - // This implementation is convenient for converting from the representation of a round in +impl TryFrom for AggregationJobStep { + // This implementation is convenient for converting from the representation of a step in // PostgreSQL, where the smallest type that can store a u16 is `integer`, which is represented // as i32 in Rust. type Error = TryFromIntError; fn try_from(value: i32) -> Result { - Ok(AggregationJobRound(u16::try_from(value)?)) + Ok(AggregationJobStep(u16::try_from(value)?)) } } /// DAP protocol message representing a request to continue an aggregation job. #[derive(Clone, Debug, PartialEq, Eq)] pub struct AggregationJobContinueReq { - round: AggregationJobRound, + step: AggregationJobStep, prepare_continues: Vec, } @@ -2552,16 +2552,16 @@ impl AggregationJobContinueReq { pub const MEDIA_TYPE: &'static str = "application/dap-aggregation-job-continue-req"; /// Constructs a new aggregate continuation response from its components. - pub fn new(round: AggregationJobRound, prepare_continues: Vec) -> Self { + pub fn new(step: AggregationJobStep, prepare_continues: Vec) -> Self { Self { - round, + step, prepare_continues, } } - /// Gets the round of VDAF preparation this aggregation job is on. - pub fn round(&self) -> AggregationJobRound { - self.round + /// Gets the step of VDAF preparation this aggregation job is on. + pub fn step(&self) -> AggregationJobStep { + self.step } /// Gets the prepare steps associated with this aggregate continuation response. @@ -2572,12 +2572,12 @@ impl AggregationJobContinueReq { impl Encode for AggregationJobContinueReq { fn encode(&self, bytes: &mut Vec) { - self.round.encode(bytes); + self.step.encode(bytes); encode_u32_items(bytes, &(), &self.prepare_continues); } fn encoded_len(&self) -> Option { - let mut length = self.round.encoded_len()?; + let mut length = self.step.encoded_len()?; length += 4; for prepare_continue in self.prepare_continues.iter() { length += prepare_continue.encoded_len()?; @@ -2588,9 +2588,9 @@ impl Encode for AggregationJobContinueReq { impl Decode for AggregationJobContinueReq { fn decode(bytes: &mut Cursor<&[u8]>) -> Result { - let round = AggregationJobRound::decode(bytes)?; + let step = AggregationJobStep::decode(bytes)?; let prepare_continues = decode_u32_items(&(), bytes)?; - Ok(Self::new(round, prepare_continues)) + Ok(Self::new(step, prepare_continues)) } } @@ -2869,9 +2869,9 @@ mod tests { use crate::{ query_type, roundtrip_encoding, AggregateShare, AggregateShareAad, AggregateShareReq, AggregationJobContinueReq, AggregationJobInitializeReq, AggregationJobResp, - AggregationJobRound, BatchId, BatchSelector, Collection, CollectionReq, Duration, - Extension, ExtensionType, FixedSize, FixedSizeQuery, HpkeAeadId, HpkeCiphertext, - HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId, HpkePublicKey, InputShareAad, Interval, + AggregationJobStep, BatchId, BatchSelector, Collection, CollectionReq, Duration, Extension, + ExtensionType, FixedSize, FixedSizeQuery, HpkeAeadId, HpkeCiphertext, HpkeConfig, + HpkeConfigId, HpkeKdfId, HpkeKemId, HpkePublicKey, InputShareAad, Interval, PartialBatchSelector, PlaintextInputShare, PrepareContinue, PrepareError, PrepareInit, PrepareResp, PrepareStepResult, Query, Report, ReportId, ReportIdChecksum, ReportMetadata, ReportShare, Role, TaskId, Time, TimeInterval, Url, @@ -4541,7 +4541,7 @@ mod tests { fn roundtrip_aggregation_job_continue_req() { roundtrip_encoding(&[( AggregationJobContinueReq { - round: AggregationJobRound(42405), + step: AggregationJobStep(42405), prepare_continues: Vec::from([ PrepareContinue { report_id: ReportId::from([