Skip to content

Commit

Permalink
Replace "round" with "step"
Browse files Browse the repository at this point in the history
When DAP-05 introduced the ping-pong topology, it also started talking
about aggregation protocol "steps" instead of "rounds". This commit
corrects the word usage across Janus to line up with the specification.

Relevant to #1669
  • Loading branch information
tgeoghegan committed Sep 13, 2023
1 parent 8ce3d7c commit a79d187
Show file tree
Hide file tree
Showing 19 changed files with 230 additions and 230 deletions.
32 changes: 16 additions & 16 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use janus_messages::{
query_type::{FixedSize, TimeInterval},
taskprov::TaskConfig,
AggregateShare, AggregateShareAad, AggregateShareReq, AggregationJobContinueReq,
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobRound,
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep,
BatchSelector, Collection, CollectionJobId, CollectionReq, Duration, HpkeConfig,
HpkeConfigList, InputShareAad, Interval, PartialBatchSelector, PlaintextInputShare,
PrepareError, PrepareResp, PrepareStepResult, Report, ReportIdChecksum, ReportShare, Role,
Expand Down Expand Up @@ -1799,7 +1799,7 @@ impl VdafOps {
let (report_aggregation_state, prepare_step_result) = match init_rslt {
Ok((PingPongState::Continued(prep_state), outgoing_message)) => {
// Helper is not finished. Await the next message from the Leader to advance to
// the next round.
// the next step.
saw_continue = true;
(
ReportAggregationState::WaitingHelper(prep_state),
Expand Down Expand Up @@ -1879,7 +1879,7 @@ impl VdafOps {
} else {
AggregationJobState::Finished
},
AggregationJobRound::from(0),
AggregationJobStep::from(0),
)
.with_last_request_hash(request_hash),
);
Expand Down Expand Up @@ -2068,10 +2068,10 @@ impl VdafOps {
A::PrepareMessage: Send + Sync,
A::OutputShare: Send + Sync,
{
if leader_aggregation_job.round() == AggregationJobRound::from(0) {
if leader_aggregation_job.step() == AggregationJobStep::from(0) {
return Err(Error::InvalidMessage(
Some(*task.id()),
"aggregation job cannot be advanced to round 0",
"aggregation job cannot be advanced to step 0",
));
}

Expand Down Expand Up @@ -2112,18 +2112,18 @@ impl VdafOps {
)
})?;

// If the leader's request is on the same round as our stored aggregation job,
// then we probably have already received this message and computed this round,
// If the leader's request is on the same step as our stored aggregation job,
// then we probably have already received this message and computed this step,
// but the leader never got our response and so retried stepping the job.
// TODO(issue #1087): measure how often this happens with a Prometheus metric
if helper_aggregation_job.round() == leader_aggregation_job.round() {
if helper_aggregation_job.step() == leader_aggregation_job.step() {
match helper_aggregation_job.last_request_hash() {
None => {
return Err(datastore::Error::User(
Error::Internal(format!(
"aggregation job {aggregation_job_id} is in round {} but \
"aggregation job {aggregation_job_id} is on step {} but \
has no last request hash",
helper_aggregation_job.round(),
helper_aggregation_job.step(),
))
.into(),
));
Expand All @@ -2141,23 +2141,23 @@ impl VdafOps {
}
}
return Ok(Self::aggregation_job_resp_for(report_aggregations));
} else if helper_aggregation_job.round().increment()
!= leader_aggregation_job.round()
} else if helper_aggregation_job.step().increment()
!= leader_aggregation_job.step()
{
// If this is not a replay, the leader should be advancing our state to the next
// round and no further.
// step and no further.
return Err(datastore::Error::User(
Error::StepMismatch {
task_id: *task.id(),
aggregation_job_id,
expected_step: helper_aggregation_job.round().increment(),
got_step: leader_aggregation_job.round(),
expected_step: helper_aggregation_job.step().increment(),
got_step: leader_aggregation_job.step(),
}
.into(),
));
}

// The leader is advancing us to the next round. Step the aggregation job to
// The leader is advancing us to the next step. Step the aggregation job to
// compute the next round of prepare messages and state.
Self::step_aggregation_job(
tx,
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ async fn aggregation_job_mutation_report_aggregations() {
}

#[tokio::test]
async fn aggregation_job_init_two_round_vdaf_idempotence() {
async fn aggregation_job_init_two_step_vdaf_idempotence() {
// We must run Poplar1 in this test so that the aggregation job won't finish on the first step
let test_case = setup_poplar1_aggregate_init_test().await;

Expand Down
80 changes: 40 additions & 40 deletions aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use tokio::try_join;
use tracing::trace_span;

impl VdafOps {
/// Step the helper's aggregation job to the next round of VDAF preparation using the round `n`
/// prepare state in `report_aggregations` with the round `n+1` broadcast prepare messages in
/// Step the helper's aggregation job to the next step of VDAF preparation using the step `n`
/// prepare state in `report_aggregations` with the step `n+1` broadcast prepare messages in
/// `leader_aggregation_job`.
pub(super) async fn step_aggregation_job<const SEED_SIZE: usize, C, Q, A>(
tx: &Transaction<'_, C>,
Expand Down Expand Up @@ -240,16 +240,16 @@ impl VdafOps {
)
});
let helper_aggregation_job = helper_aggregation_job
// Advance the job to the leader's round
.with_round(leader_aggregation_job.round())
// Advance the job to the leader's step
.with_step(leader_aggregation_job.step())
.with_state(match (saw_continue, saw_finish) {
(false, false) => AggregationJobState::Finished, // everything failed, or there were no reports
(true, false) => AggregationJobState::InProgress,
(false, true) => AggregationJobState::Finished,
(true, true) => {
return Err(datastore::Error::User(
Error::Internal(
"VDAF took an inconsistent number of rounds to reach Finish state"
"VDAF took an inconsistent number of steps to reach Finish state"
.to_string(),
)
.into(),
Expand Down Expand Up @@ -403,7 +403,7 @@ mod tests {
};
use janus_messages::{
query_type::TimeInterval, AggregationJobContinueReq, AggregationJobId, AggregationJobResp,
AggregationJobRound, Interval, PrepareContinue, PrepareResp, PrepareStepResult, Role,
AggregationJobStep, Interval, PrepareContinue, PrepareResp, PrepareStepResult, Role,
};
use prio::{
idpf::IdpfInput,
Expand Down Expand Up @@ -431,7 +431,7 @@ mod tests {
_ephemeral_datastore: EphemeralDatastore,
}

/// Set up a helper with an aggregation job in round 0
/// Set up a helper with an aggregation job in step 0
#[allow(clippy::unit_arg)]
async fn setup_aggregation_job_continue_test(
) -> AggregationJobContinueTestCase<VERIFY_KEY_LENGTH, Poplar1<XofShake128, 16>> {
Expand Down Expand Up @@ -490,7 +490,7 @@ mod tests {
(),
Interval::from_time(prepare_init.report_share().metadata().time()).unwrap(),
AggregationJobState::InProgress,
AggregationJobRound::from(0),
AggregationJobStep::from(0),
))
.await
.unwrap();
Expand Down Expand Up @@ -520,7 +520,7 @@ mod tests {
.unwrap();

let first_continue_request = AggregationJobContinueReq::new(
AggregationJobRound::from(1),
AggregationJobStep::from(1),
Vec::from([PrepareContinue::new(
*prepare_init.report_share().metadata().id(),
transcript.leader_prepare_transitions[1].message.clone(),
Expand Down Expand Up @@ -549,9 +549,9 @@ mod tests {
}
}

/// Set up a helper with an aggregation job in round 1.
/// Set up a helper with an aggregation job in step 1.
#[allow(clippy::unit_arg)]
async fn setup_aggregation_job_continue_round_recovery_test(
async fn setup_aggregation_job_continue_step_recovery_test(
) -> AggregationJobContinueTestCase<VERIFY_KEY_LENGTH, Poplar1<XofShake128, 16>> {
let mut test_case = setup_aggregation_job_continue_test().await;

Expand Down Expand Up @@ -581,20 +581,20 @@ mod tests {
}

#[tokio::test]
async fn aggregation_job_continue_round_zero() {
async fn aggregation_job_continue_step_zero() {
let test_case = setup_aggregation_job_continue_test().await;

// The job is initialized into round 0 but has never been continued. Send a continue request
// to advance to round 0. Should be rejected because that is an illegal transition.
let round_zero_request = AggregationJobContinueReq::new(
AggregationJobRound::from(0),
// The job is initialized into step 0 but has never been continued. Send a continue request
// to advance to step 0. Should be rejected because that is an illegal transition.
let step_zero_request = AggregationJobContinueReq::new(
AggregationJobStep::from(0),
test_case.first_continue_request.prepare_steps().to_vec(),
);

post_aggregation_job_expecting_error(
&test_case.task,
&test_case.aggregation_job_id,
&round_zero_request,
&step_zero_request,
&test_case.handler,
Status::BadRequest,
"urn:ietf:params:ppm:dap:error:invalidMessage",
Expand All @@ -604,8 +604,8 @@ mod tests {
}

#[tokio::test]
async fn aggregation_job_continue_round_recovery_replay_request() {
let test_case = setup_aggregation_job_continue_round_recovery_test().await;
async fn aggregation_job_continue_step_recovery_replay_request() {
let test_case = setup_aggregation_job_continue_step_recovery_test().await;

// Re-send the request, simulating the leader crashing and losing the helper's response. The
// helper should send back the exact same response.
Expand All @@ -624,8 +624,8 @@ mod tests {

#[tokio::test]
#[allow(clippy::unit_arg)]
async fn aggregation_job_continue_round_recovery_mutate_continue_request() {
let test_case = setup_aggregation_job_continue_round_recovery_test().await;
async fn aggregation_job_continue_step_recovery_mutate_continue_request() {
let test_case = setup_aggregation_job_continue_step_recovery_test().await;

let (unrelated_prepare_init, unrelated_transcript) = test_case
.prepare_init_generator
Expand Down Expand Up @@ -671,7 +671,7 @@ mod tests {
// Send another continue request for the same aggregation job, but with a different report
// ID.
let modified_request = AggregationJobContinueReq::new(
test_case.first_continue_request.round(),
test_case.first_continue_request.step(),
Vec::from([PrepareContinue::new(
*unrelated_prepare_init.report_share().metadata().id(),
unrelated_transcript.leader_prepare_transitions[1]
Expand Down Expand Up @@ -725,19 +725,19 @@ mod tests {
}

#[tokio::test]
async fn aggregation_job_continue_round_recovery_past_round() {
let test_case = setup_aggregation_job_continue_round_recovery_test().await;
async fn aggregation_job_continue_step_recovery_past_step() {
let test_case = setup_aggregation_job_continue_step_recovery_test().await;

test_case
.datastore
.run_tx(|tx| {
let (task_id, aggregation_job_id) =
(*test_case.task.id(), test_case.aggregation_job_id);
Box::pin(async move {
// This is a cheat: dummy_vdaf only has a single round, so we artificially force
// this job into round 2 so that we can send a request for round 1 and force a
// round mismatch error instead of tripping the check for a request to continue
// to round 0.
// This is a cheat: dummy_vdaf only has a single step, so we artificially force
// this job into step 2 so that we can send a request for step 1 and force a
// step mismatch error instead of tripping the check for a request to continue
// to step 0.
let aggregation_job = tx
.get_aggregation_job::<VERIFY_KEY_LENGTH, TimeInterval, Poplar1<XofShake128, 16>>(
&task_id,
Expand All @@ -746,7 +746,7 @@ mod tests {
.await
.unwrap()
.unwrap()
.with_round(AggregationJobRound::from(2));
.with_step(AggregationJobStep::from(2));

tx.update_aggregation_job(&aggregation_job).await.unwrap();

Expand All @@ -756,16 +756,16 @@ mod tests {
.await
.unwrap();

// Send another request for a round that the helper is past. Should fail.
let past_round_request = AggregationJobContinueReq::new(
AggregationJobRound::from(1),
// Send another request for a step that the helper is past. Should fail.
let past_step_request = AggregationJobContinueReq::new(
AggregationJobStep::from(1),
test_case.first_continue_request.prepare_steps().to_vec(),
);

post_aggregation_job_expecting_error(
&test_case.task,
&test_case.aggregation_job_id,
&past_round_request,
&past_step_request,
&test_case.handler,
Status::BadRequest,
"urn:ietf:params:ppm:dap:error:stepMismatch",
Expand All @@ -775,20 +775,20 @@ mod tests {
}

#[tokio::test]
async fn aggregation_job_continue_round_recovery_future_round() {
let test_case = setup_aggregation_job_continue_round_recovery_test().await;
async fn aggregation_job_continue_step_recovery_future_step() {
let test_case = setup_aggregation_job_continue_step_recovery_test().await;

// Send another request for a round too far past the helper's round. Should fail because the
// helper isn't on that round.
let future_round_request = AggregationJobContinueReq::new(
AggregationJobRound::from(17),
// Send another request for a step too far past the helper's step. Should fail because the
// helper isn't on that step.
let future_step_request = AggregationJobContinueReq::new(
AggregationJobStep::from(17),
test_case.first_continue_request.prepare_steps().to_vec(),
);

post_aggregation_job_expecting_error(
&test_case.task,
&test_case.aggregation_job_id,
&future_round_request,
&future_step_request,
&test_case.handler,
Status::BadRequest,
"urn:ietf:params:ppm:dap:error:stepMismatch",
Expand Down
Loading

0 comments on commit a79d187

Please sign in to comment.