Skip to content

Commit

Permalink
PR review.
Browse files Browse the repository at this point in the history
  • Loading branch information
branlwyd committed Dec 13, 2024
1 parent 52fd456 commit 314c2eb
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ regex = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
retry-after.workspace = true
sec1.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
40 changes: 29 additions & 11 deletions aggregator/src/aggregator/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ use prio::{
};
use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator as _};
use reqwest::Method;
use std::{collections::HashSet, panic, str::FromStr, sync::Arc, time::Duration};
use retry_after::RetryAfter;
use std::{
collections::HashSet,
panic,
sync::Arc,
time::{Duration, UNIX_EPOCH},
};
use tokio::{join, sync::mpsc, try_join};
use tracing::{debug, error, info, info_span, trace_span, warn, Span};

Expand Down Expand Up @@ -885,7 +891,7 @@ where
aggregation_job: AggregationJob<SEED_SIZE, B, A>,
stepped_aggregations: Vec<SteppedAggregation<SEED_SIZE, A>>,
report_aggregations_to_write: Vec<WritableReportAggregation<SEED_SIZE, A>>,
retry_after: Option<&Duration>,
retry_after: Option<&RetryAfter>,
helper_resp: AggregationJobResp,
) -> Result<(), Error>
where
Expand All @@ -899,7 +905,6 @@ where
A::PublicShare: Send + Sync,
{
match helper_resp {
// TODO(#3436): implement asynchronous aggregation
AggregationJobResp::Processing => {
self.process_response_from_helper_pending(
datastore,
Expand Down Expand Up @@ -944,7 +949,7 @@ where
aggregation_job: AggregationJob<SEED_SIZE, B, A>,
stepped_aggregations: Vec<SteppedAggregation<SEED_SIZE, A>>,
mut report_aggregations_to_write: Vec<WritableReportAggregation<SEED_SIZE, A>>,
retry_after: Option<&Duration>,
retry_after: Option<&RetryAfter>,
) -> Result<(), Error>
where
A::AggregationParam: Send + Sync + Eq + PartialEq,
Expand Down Expand Up @@ -993,7 +998,8 @@ where
let aggregation_job_writer = Arc::new(aggregation_job_writer);

let retry_after = retry_after
.copied()
.map(|ra| retry_after_to_duration(datastore.clock(), ra))
.transpose()?
.or_else(|| Some(Duration::from_secs(60)));
let counters = datastore
.run_tx("process_response_from_helper_pending", |tx| {
Expand Down Expand Up @@ -1514,10 +1520,22 @@ struct SteppedAggregation<const SEED_SIZE: usize, A: vdaf::Aggregator<SEED_SIZE,
leader_state: PingPongState<SEED_SIZE, 16, A>,
}

fn parse_retry_after(header_value: &HeaderValue) -> Result<Duration, Error> {
let val = header_value
.to_str()
.map_err(|err| Error::BadRequest(err.to_string()))?;
let val = u64::from_str(val).map_err(|err| Error::BadRequest(err.to_string()))?;
Ok(Duration::from_secs(val))
fn parse_retry_after(header_value: &HeaderValue) -> Result<RetryAfter, Error> {
RetryAfter::try_from(header_value)
.map_err(|err| Error::BadRequest(format!("couldn't parse retry-after header: {err}")))
}

fn retry_after_to_duration<C: Clock>(clock: &C, retry_after: &RetryAfter) -> Result<Duration, Error> {
match retry_after {
RetryAfter::Delay(duration) => Ok(*duration),
RetryAfter::DateTime(next_retry_time) => {
let now = UNIX_EPOCH + Duration::from_secs(clock.now().as_seconds_since_epoch());
if &now > next_retry_time {
return Ok(Duration::ZERO);
}
next_retry_time
.duration_since(now)
.map_err(|err| Error::Internal(format!("computing retry-after duration: {err}")))
}
}
}
5 changes: 5 additions & 0 deletions aggregator_core/src/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,11 @@ impl<C: Clock> Datastore<C> {
(rslt, retry.load(Ordering::Relaxed))
}

/// Returns the clock in use by this datastore.
pub fn clock(&self) -> &C {
&self.clock
}

/// See [`Datastore::run_tx`]. This method provides a placeholder transaction name. It is useful
/// for tests where the transaction name is not important.
#[cfg(feature = "test-util")]
Expand Down
27 changes: 27 additions & 0 deletions aggregator_core/src/datastore/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,7 @@ impl<const SEED_SIZE: usize, A: vdaf::Aggregator<SEED_SIZE, 16>> PartialEq
where
A::InputShare: PartialEq,
A::PrepareShare: PartialEq,
A::PrepareState: PartialEq,
A::PublicShare: PartialEq,
A::OutputShare: PartialEq,
{
Expand All @@ -909,6 +910,7 @@ impl<const SEED_SIZE: usize, A: vdaf::Aggregator<SEED_SIZE, 16>> Eq
where
A::InputShare: Eq,
A::PrepareShare: Eq,
A::PrepareState: Eq,
A::PublicShare: Eq,
A::OutputShare: Eq,
{
Expand Down Expand Up @@ -1112,6 +1114,7 @@ impl<const SEED_SIZE: usize, A: vdaf::Aggregator<SEED_SIZE, 16>> PartialEq
where
A::InputShare: PartialEq,
A::PrepareShare: PartialEq,
A::PrepareState: PartialEq,
A::PublicShare: PartialEq,
A::OutputShare: PartialEq,
{
Expand Down Expand Up @@ -1139,6 +1142,7 @@ where
&& lhs_leader_input_share == rhs_leader_input_share
&& lhs_helper_encrypted_input_share == rhs_helper_encrypted_input_share
}

(
Self::LeaderContinue {
transition: lhs_transition,
Expand All @@ -1147,6 +1151,26 @@ where
transition: rhs_transition,
},
) => lhs_transition == rhs_transition,

(
Self::LeaderPoll {
leader_state: lhs_leader_state,
},
Self::LeaderPoll {
leader_state: rhs_leader_state,
},
) => match (lhs_leader_state, rhs_leader_state) {
(
PingPongState::Continued(lhs_prepare_state),
PingPongState::Continued(rhs_prepare_state),
) => lhs_prepare_state == rhs_prepare_state,
(
PingPongState::Finished(lhs_output_share),
PingPongState::Finished(rhs_output_share),
) => lhs_output_share == rhs_output_share,
_ => false,
},

(
Self::HelperContinue {
prepare_state: lhs_state,
Expand All @@ -1155,6 +1179,7 @@ where
prepare_state: rhs_state,
},
) => lhs_state == rhs_state,

(
Self::Failed {
report_error: lhs_report_error,
Expand All @@ -1163,6 +1188,7 @@ where
report_error: rhs_report_error,
},
) => lhs_report_error == rhs_report_error,

_ => core::mem::discriminant(self) == core::mem::discriminant(other),
}
}
Expand All @@ -1177,6 +1203,7 @@ impl<const SEED_SIZE: usize, A: vdaf::Aggregator<SEED_SIZE, 16>> Eq
where
A::InputShare: Eq,
A::PrepareShare: Eq,
A::PrepareState: Eq,
A::PublicShare: Eq,
A::OutputShare: Eq,
{
Expand Down

0 comments on commit 314c2eb

Please sign in to comment.