Replace "round" with "step" #1922

Merged · 2 commits · Sep 14, 2023

Changes from 1 commit
32 changes: 16 additions & 16 deletions aggregator/src/aggregator.rs
@@ -49,7 +49,7 @@ use janus_messages::{
query_type::{FixedSize, TimeInterval},
taskprov::TaskConfig,
AggregateShare, AggregateShareAad, AggregateShareReq, AggregationJobContinueReq,
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobRound,
AggregationJobId, AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep,
BatchSelector, Collection, CollectionJobId, CollectionReq, Duration, HpkeConfig,
HpkeConfigList, InputShareAad, Interval, PartialBatchSelector, PlaintextInputShare,
PrepareError, PrepareResp, PrepareStepResult, Report, ReportIdChecksum, ReportShare, Role,
@@ -1799,7 +1799,7 @@ impl VdafOps {
let (report_aggregation_state, prepare_step_result) = match init_rslt {
Ok((PingPongState::Continued(prep_state), outgoing_message)) => {
// Helper is not finished. Await the next message from the Leader to advance to
// the next round.
// the next step.
saw_continue = true;
(
ReportAggregationState::WaitingHelper(prep_state),
@@ -1879,7 +1879,7 @@ impl VdafOps {
} else {
AggregationJobState::Finished
},
AggregationJobRound::from(0),
AggregationJobStep::from(0),
)
.with_last_request_hash(request_hash),
);
@@ -2068,10 +2068,10 @@ impl VdafOps {
A::PrepareMessage: Send + Sync,
A::OutputShare: Send + Sync,
{
if leader_aggregation_job.round() == AggregationJobRound::from(0) {
if leader_aggregation_job.step() == AggregationJobStep::from(0) {
return Err(Error::InvalidMessage(
Some(*task.id()),
"aggregation job cannot be advanced to round 0",
"aggregation job cannot be advanced to step 0",
));
}

@@ -2112,18 +2112,18 @@ impl VdafOps {
)
})?;

// If the leader's request is on the same round as our stored aggregation job,
// then we probably have already received this message and computed this round,
// If the leader's request is on the same step as our stored aggregation job,
// then we probably have already received this message and computed this step,
// but the leader never got our response and so retried stepping the job.
// TODO(issue #1087): measure how often this happens with a Prometheus metric
if helper_aggregation_job.round() == leader_aggregation_job.round() {
if helper_aggregation_job.step() == leader_aggregation_job.step() {
match helper_aggregation_job.last_request_hash() {
None => {
return Err(datastore::Error::User(
Error::Internal(format!(
"aggregation job {aggregation_job_id} is in round {} but \
"aggregation job {aggregation_job_id} is on step {} but \
has no last request hash",
helper_aggregation_job.round(),
helper_aggregation_job.step(),
))
.into(),
));
@@ -2141,23 +2141,23 @@
}
}
return Ok(Self::aggregation_job_resp_for(report_aggregations));
} else if helper_aggregation_job.round().increment()
!= leader_aggregation_job.round()
} else if helper_aggregation_job.step().increment()
!= leader_aggregation_job.step()
{
// If this is not a replay, the leader should be advancing our state to the next
// round and no further.
// step and no further.
return Err(datastore::Error::User(
Error::StepMismatch {
task_id: *task.id(),
aggregation_job_id,
expected_step: helper_aggregation_job.round().increment(),
got_step: leader_aggregation_job.round(),
expected_step: helper_aggregation_job.step().increment(),
got_step: leader_aggregation_job.step(),
}
.into(),
));
}

// The leader is advancing us to the next round. Step the aggregation job to
// The leader is advancing us to the next step. Step the aggregation job to
// compute the next round of prepare messages and state.
Self::step_aggregation_job(
tx,
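The two aggregator.rs hunks above implement the helper's step-validation logic for a continue request: a request that repeats the helper's current step is treated as a retried message (verified against the stored request hash and answered with the stored response), a request for exactly the next step advances the job, and anything else is rejected as a step mismatch. Below is a minimal standalone sketch of that decision; `AggregationJobStep`, the hash handling, and the error strings here are simplified stand-ins for illustration, not the real janus_messages or datastore types.

```rust
// Simplified stand-in for janus_messages::AggregationJobStep.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct AggregationJobStep(u16);

impl AggregationJobStep {
    fn increment(self) -> Self {
        AggregationJobStep(self.0 + 1)
    }
}

#[derive(Debug, PartialEq)]
enum ContinueOutcome {
    /// The request repeats the step we already computed: replay the stored response.
    Replay,
    /// The request advances us exactly one step: run the next step of preparation.
    StepForward,
    /// The request skips ahead or goes backwards: reject with a step mismatch.
    Mismatch {
        expected: AggregationJobStep,
        got: AggregationJobStep,
    },
}

fn classify_continue_request(
    helper_step: AggregationJobStep,
    leader_step: AggregationJobStep,
    last_request_hash: Option<[u8; 32]>,
    request_hash: [u8; 32],
) -> Result<ContinueOutcome, String> {
    if leader_step == AggregationJobStep(0) {
        // A continue request can never target step 0; that is what the init request does.
        return Err("aggregation job cannot be advanced to step 0".into());
    }
    if helper_step == leader_step {
        // Same step as our stored job: probably a retry of a request we already handled,
        // so the stored hash must match before we replay our stored response.
        return match last_request_hash {
            Some(hash) if hash == request_hash => Ok(ContinueOutcome::Replay),
            Some(_) => Err("continue request for the same step does not match".into()),
            None => Err("aggregation job has no last request hash".into()),
        };
    }
    if helper_step.increment() == leader_step {
        return Ok(ContinueOutcome::StepForward);
    }
    Ok(ContinueOutcome::Mismatch {
        expected: helper_step.increment(),
        got: leader_step,
    })
}

fn main() {
    let hash = [0u8; 32];
    // A leader retrying step 1 with the same request body is treated as a replay.
    assert_eq!(
        classify_continue_request(AggregationJobStep(1), AggregationJobStep(1), Some(hash), hash),
        Ok(ContinueOutcome::Replay),
    );
    // A request jumping to step 3 while the helper is on step 1 is a step mismatch.
    assert!(matches!(
        classify_continue_request(AggregationJobStep(1), AggregationJobStep(3), Some(hash), hash),
        Ok(ContinueOutcome::Mismatch { .. }),
    ));
}
```

In the handler above, the replay case returns the previously computed prepare responses rather than recomputing them, which is why the request-hash comparison matters.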
2 changes: 1 addition & 1 deletion aggregator/src/aggregator/aggregate_init_tests.rs
@@ -448,7 +448,7 @@ async fn aggregation_job_mutation_report_aggregations() {
}

#[tokio::test]
async fn aggregation_job_init_two_round_vdaf_idempotence() {
async fn aggregation_job_init_two_step_vdaf_idempotence() {
@divergentdave (Collaborator) commented on Sep 14, 2023:

We should probably keep this as-is. Poplar1 is two rounds, so this will still take one DAP step, just testing different code paths.

Suggested change:
- async fn aggregation_job_init_two_step_vdaf_idempotence() {
+ async fn aggregation_job_init_two_round_vdaf_idempotence() {

Edit: nevermind, I got the conversion from rounds to steps wrong.

// We must run Poplar1 in this test so that the aggregation job won't finish on the first step
let test_case = setup_poplar1_aggregate_init_test().await;

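As the review discussion above notes, this test leans on the fact that Poplar1 does not finish on the first step: a VDAF that completes during aggregation-job initialization never leaves the helper waiting, so there would be no continue request whose idempotence could be exercised. A rough sketch of that distinction, using a simplified stand-in for the ping-pong preparation state rather than the real prio type:

```rust
// Simplified stand-in for the ping-pong preparation state, not the real prio type.
enum PingPongState {
    /// The helper must wait for another message from the leader (another DAP step).
    Continued(Vec<u8>), // stand-in for the prepare state
    /// The helper produced its output share during this step.
    Finished(u64), // stand-in for the output share
}

/// Returns true when an aggregation job continue request (and hence the idempotence
/// path under test) will be exercised for this report.
fn needs_continue_request(state: &PingPongState) -> bool {
    matches!(state, PingPongState::Continued(_))
}

fn main() {
    // A one-step VDAF finishes on the helper during initialization, so there is
    // nothing left to continue.
    assert!(!needs_continue_request(&PingPongState::Finished(42)));
    // A Poplar1-like VDAF leaves the helper waiting, so the test can replay the
    // continue request and check idempotence.
    assert!(needs_continue_request(&PingPongState::Continued(Vec::new())));
}
```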
80 changes: 40 additions & 40 deletions aggregator/src/aggregator/aggregation_job_continue.rs
@@ -28,8 +28,8 @@ use tokio::try_join;
use tracing::trace_span;

impl VdafOps {
/// Step the helper's aggregation job to the next round of VDAF preparation using the round `n`
/// prepare state in `report_aggregations` with the round `n+1` broadcast prepare messages in
/// Step the helper's aggregation job to the next step of VDAF preparation using the step `n`
/// prepare state in `report_aggregations` with the step `n+1` broadcast prepare messages in
/// `leader_aggregation_job`.
pub(super) async fn step_aggregation_job<const SEED_SIZE: usize, C, Q, A>(
tx: &Transaction<'_, C>,
@@ -240,16 +240,16 @@ impl VdafOps {
)
});
let helper_aggregation_job = helper_aggregation_job
// Advance the job to the leader's round
.with_round(leader_aggregation_job.round())
// Advance the job to the leader's step
.with_step(leader_aggregation_job.step())
.with_state(match (saw_continue, saw_finish) {
(false, false) => AggregationJobState::Finished, // everything failed, or there were no reports
(true, false) => AggregationJobState::InProgress,
(false, true) => AggregationJobState::Finished,
(true, true) => {
return Err(datastore::Error::User(
Error::Internal(
"VDAF took an inconsistent number of rounds to reach Finish state"
"VDAF took an inconsistent number of steps to reach Finish state"
.to_string(),
)
.into(),
@@ -403,7 +403,7 @@ mod tests {
};
use janus_messages::{
query_type::TimeInterval, AggregationJobContinueReq, AggregationJobId, AggregationJobResp,
AggregationJobRound, Interval, PrepareContinue, PrepareResp, PrepareStepResult, Role,
AggregationJobStep, Interval, PrepareContinue, PrepareResp, PrepareStepResult, Role,
};
use prio::{
idpf::IdpfInput,
@@ -431,7 +431,7 @@ mod tests {
_ephemeral_datastore: EphemeralDatastore,
}

/// Set up a helper with an aggregation job in round 0
/// Set up a helper with an aggregation job in step 0
#[allow(clippy::unit_arg)]
async fn setup_aggregation_job_continue_test(
) -> AggregationJobContinueTestCase<VERIFY_KEY_LENGTH, Poplar1<XofShake128, 16>> {
@@ -490,7 +490,7 @@ mod tests {
(),
Interval::from_time(prepare_init.report_share().metadata().time()).unwrap(),
AggregationJobState::InProgress,
AggregationJobRound::from(0),
AggregationJobStep::from(0),
))
.await
.unwrap();
@@ -520,7 +520,7 @@ mod tests {
.unwrap();

let first_continue_request = AggregationJobContinueReq::new(
AggregationJobRound::from(1),
AggregationJobStep::from(1),
Vec::from([PrepareContinue::new(
*prepare_init.report_share().metadata().id(),
transcript.leader_prepare_transitions[1].message.clone(),
@@ -549,9 +549,9 @@ mod tests {
}
}

/// Set up a helper with an aggregation job in round 1.
/// Set up a helper with an aggregation job in step 1.
#[allow(clippy::unit_arg)]
async fn setup_aggregation_job_continue_round_recovery_test(
async fn setup_aggregation_job_continue_step_recovery_test(
) -> AggregationJobContinueTestCase<VERIFY_KEY_LENGTH, Poplar1<XofShake128, 16>> {
let mut test_case = setup_aggregation_job_continue_test().await;

@@ -581,20 +581,20 @@ }
}

#[tokio::test]
async fn aggregation_job_continue_round_zero() {
async fn aggregation_job_continue_step_zero() {
let test_case = setup_aggregation_job_continue_test().await;

// The job is initialized into round 0 but has never been continued. Send a continue request
// to advance to round 0. Should be rejected because that is an illegal transition.
let round_zero_request = AggregationJobContinueReq::new(
AggregationJobRound::from(0),
// The job is initialized into step 0 but has never been continued. Send a continue request
// to advance to step 0. Should be rejected because that is an illegal transition.
let step_zero_request = AggregationJobContinueReq::new(
AggregationJobStep::from(0),
test_case.first_continue_request.prepare_steps().to_vec(),
);

post_aggregation_job_expecting_error(
&test_case.task,
&test_case.aggregation_job_id,
&round_zero_request,
&step_zero_request,
&test_case.handler,
Status::BadRequest,
"urn:ietf:params:ppm:dap:error:invalidMessage",
@@ -604,8 +604,8 @@ mod tests {
}

#[tokio::test]
async fn aggregation_job_continue_round_recovery_replay_request() {
let test_case = setup_aggregation_job_continue_round_recovery_test().await;
async fn aggregation_job_continue_step_recovery_replay_request() {
let test_case = setup_aggregation_job_continue_step_recovery_test().await;

// Re-send the request, simulating the leader crashing and losing the helper's response. The
// helper should send back the exact same response.
@@ -624,8 +624,8 @@

#[tokio::test]
#[allow(clippy::unit_arg)]
async fn aggregation_job_continue_round_recovery_mutate_continue_request() {
let test_case = setup_aggregation_job_continue_round_recovery_test().await;
async fn aggregation_job_continue_step_recovery_mutate_continue_request() {
let test_case = setup_aggregation_job_continue_step_recovery_test().await;

let (unrelated_prepare_init, unrelated_transcript) = test_case
.prepare_init_generator
@@ -671,7 +671,7 @@ mod tests {
// Send another continue request for the same aggregation job, but with a different report
// ID.
let modified_request = AggregationJobContinueReq::new(
test_case.first_continue_request.round(),
test_case.first_continue_request.step(),
Vec::from([PrepareContinue::new(
*unrelated_prepare_init.report_share().metadata().id(),
unrelated_transcript.leader_prepare_transitions[1]
@@ -725,19 +725,19 @@ mod tests {
}

#[tokio::test]
async fn aggregation_job_continue_round_recovery_past_round() {
let test_case = setup_aggregation_job_continue_round_recovery_test().await;
async fn aggregation_job_continue_step_recovery_past_step() {
let test_case = setup_aggregation_job_continue_step_recovery_test().await;

test_case
.datastore
.run_tx(|tx| {
let (task_id, aggregation_job_id) =
(*test_case.task.id(), test_case.aggregation_job_id);
Box::pin(async move {
// This is a cheat: dummy_vdaf only has a single round, so we artificially force
// this job into round 2 so that we can send a request for round 1 and force a
// round mismatch error instead of tripping the check for a request to continue
// to round 0.
// This is a cheat: dummy_vdaf only has a single step, so we artificially force
// this job into step 2 so that we can send a request for step 1 and force a
// step mismatch error instead of tripping the check for a request to continue
// to step 0.
let aggregation_job = tx
.get_aggregation_job::<VERIFY_KEY_LENGTH, TimeInterval, Poplar1<XofShake128, 16>>(
&task_id,
@@ -746,7 +746,7 @@ mod tests {
.await
.unwrap()
.unwrap()
.with_round(AggregationJobRound::from(2));
.with_step(AggregationJobStep::from(2));

tx.update_aggregation_job(&aggregation_job).await.unwrap();

@@ -756,16 +756,16 @@ mod tests {
.await
.unwrap();

// Send another request for a round that the helper is past. Should fail.
let past_round_request = AggregationJobContinueReq::new(
AggregationJobRound::from(1),
// Send another request for a step that the helper is past. Should fail.
let past_step_request = AggregationJobContinueReq::new(
AggregationJobStep::from(1),
test_case.first_continue_request.prepare_steps().to_vec(),
);

post_aggregation_job_expecting_error(
&test_case.task,
&test_case.aggregation_job_id,
&past_round_request,
&past_step_request,
&test_case.handler,
Status::BadRequest,
"urn:ietf:params:ppm:dap:error:stepMismatch",
@@ -775,20 +775,20 @@ mod tests {
}

#[tokio::test]
async fn aggregation_job_continue_round_recovery_future_round() {
let test_case = setup_aggregation_job_continue_round_recovery_test().await;
async fn aggregation_job_continue_step_recovery_future_step() {
let test_case = setup_aggregation_job_continue_step_recovery_test().await;

// Send another request for a round too far past the helper's round. Should fail because the
// helper isn't on that round.
let future_round_request = AggregationJobContinueReq::new(
AggregationJobRound::from(17),
// Send another request for a step too far past the helper's step. Should fail because the
// helper isn't on that step.
let future_step_request = AggregationJobContinueReq::new(
AggregationJobStep::from(17),
test_case.first_continue_request.prepare_steps().to_vec(),
);

post_aggregation_job_expecting_error(
&test_case.task,
&test_case.aggregation_job_id,
&future_round_request,
&future_step_request,
&test_case.handler,
Status::BadRequest,
"urn:ietf:params:ppm:dap:error:stepMismatch",
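The step_aggregation_job hunk above also derives the job's next state from whether any report aggregation still needs another step (saw_continue) or has finished (saw_finish), and treats a mixture of the two as an internal error, since every report in a job runs the same VDAF and should finish after the same number of steps. A minimal standalone sketch of that state computation, with simplified stand-ins for the janus aggregator types:

```rust
// Simplified stand-in for the janus AggregationJobState.
#[derive(Debug, PartialEq)]
enum AggregationJobState {
    InProgress,
    Finished,
}

/// Derive the helper's job state after a step from what happened to its reports.
fn next_job_state(saw_continue: bool, saw_finish: bool) -> Result<AggregationJobState, String> {
    match (saw_continue, saw_finish) {
        // Everything failed, or there were no reports: nothing left to do.
        (false, false) => Ok(AggregationJobState::Finished),
        // At least one report still needs another step.
        (true, false) => Ok(AggregationJobState::InProgress),
        // Every surviving report finished on this step.
        (false, true) => Ok(AggregationJobState::Finished),
        // Reports disagreeing on when they finish indicates a bug.
        (true, true) => {
            Err("VDAF took an inconsistent number of steps to reach Finish state".to_string())
        }
    }
}

fn main() {
    assert_eq!(next_job_state(true, false), Ok(AggregationJobState::InProgress));
    assert_eq!(next_job_state(false, false), Ok(AggregationJobState::Finished));
    assert!(next_job_state(true, true).is_err());
}
```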