Skip to content

Commit

Permalink
Comment cleanup (#2440)
Browse files Browse the repository at this point in the history
  • Loading branch information
divergentdave authored Jan 5, 2024
1 parent 313fa72 commit 74fb29b
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 49 deletions.
50 changes: 29 additions & 21 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,11 @@ pub struct Aggregator<C: Clock> {
task_aggregators: Mutex<HashMap<TaskId, Arc<TaskAggregator<C>>>>,

// Metrics.
/// Counter tracking the number of failed decryptions while handling the /upload endpoint.
/// Counter tracking the number of failed decryptions while handling the
/// `tasks/{task-id}/reports` endpoint.
upload_decrypt_failure_counter: Counter<u64>,
/// Counter tracking the number of failed message decodes while handling the /upload endpoint.
/// Counter tracking the number of failed message decodes while handling the
/// `tasks/{task-id}/reports` endpoint.
upload_decode_failure_counter: Counter<u64>,
/// Counters tracking the number of failures to step client reports through the aggregation
/// process.
Expand All @@ -192,8 +194,8 @@ pub struct Config {
pub max_upload_batch_size: usize,

/// Defines the maximum delay before writing a batch of uploaded reports, even if it has not yet
/// reached `max_batch_upload_size`. This is the maximum delay added to the /upload endpoint due
/// to write-batching.
/// reached `max_batch_upload_size`. This is the maximum delay added to the
/// `tasks/{task-id}/reports` endpoint due to write-batching.
pub max_upload_batch_write_delay: StdDuration,

/// Defines the number of shards to break each batch aggregation into. Increasing this value
Expand Down Expand Up @@ -235,14 +237,18 @@ impl<C: Clock> Aggregator<C> {

let upload_decrypt_failure_counter = meter
.u64_counter("janus_upload_decrypt_failures")
.with_description("Number of decryption failures in the /upload endpoint.")
.with_description(
"Number of decryption failures in the tasks/{task-id}/reports endpoint.",
)
.with_unit(Unit::new("{error}"))
.init();
upload_decrypt_failure_counter.add(0, &[]);

let upload_decode_failure_counter = meter
.u64_counter("janus_upload_decode_failures")
.with_description("Number of message decode failures in the /upload endpoint.")
.with_description(
"Number of message decode failures in the tasks/{task-id}/reports endpoint.",
)
.with_unit(Unit::new("{error}"))
.init();
upload_decode_failure_counter.add(0, &[]);
Expand Down Expand Up @@ -781,8 +787,8 @@ impl<C: Clock> Aggregator<C> {
}

/// TaskAggregator provides aggregation functionality for a single task.
// TODO(#224): refactor Aggregator to perform indepedent batched operations (e.g. report handling in
// Aggregate requests) using a parallelized library like Rayon.
// TODO(#1307): refactor Aggregator to perform indepedent batched operations (e.g. report handling
// in Aggregate requests) using a parallelized library like Rayon.
pub struct TaskAggregator<C: Clock> {
/// The task being aggregated.
task: Arc<AggregatorTask>,
Expand Down Expand Up @@ -1266,8 +1272,9 @@ impl VdafOps {
}
}

/// Implements the `/aggregate` endpoint for initialization requests for the helper, described
/// in §4.4.4.1 & §4.4.4.2 of draft-gpew-priv-ppm.
/// Implements [helper aggregate initialization][1].
///
/// [1]: https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-helper-initialization
#[tracing::instrument(
skip(self, datastore, global_hpke_keypairs, aggregate_step_failure_counter, task, req_bytes),
fields(task_id = ?task.id()),
Expand Down Expand Up @@ -1433,10 +1440,10 @@ impl VdafOps {
}

// Decode (and in the case of the leader input share, decrypt) the remaining fields of the
// report before storing them in the datastore. The spec does not require the /upload
// handler to do this, but it exercises HPKE decryption, saves us the trouble of storing
// reports we can't use, and lets the aggregation job handler assume the values it reads
// from the datastore are valid.
// report before storing them in the datastore. The spec does not require the
// `tasks/{task-id}/reports` handler to do this, but it exercises HPKE decryption, saves us
// the trouble of storing reports we can't use, and lets the aggregation job handler assume
// the values it reads from the datastore are valid.
let public_share =
match A::PublicShare::get_decoded_with_param(vdaf.as_ref(), report.public_share()) {
Ok(public_share) => public_share,
Expand Down Expand Up @@ -1613,8 +1620,9 @@ impl VdafOps {
Ok(existing_aggregation_job.eq(incoming_aggregation_job))
}

/// Implements the aggregate initialization request portion of the `/aggregate` endpoint for the
/// helper, described in §4.4.4.1 of draft-gpew-priv-ppm.
/// Implements [helper aggregate initialization][1].
///
/// [1]: https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-helper-initialization
async fn handle_aggregate_init_generic<const SEED_SIZE: usize, Q, A, C>(
datastore: &Datastore<C>,
global_hpke_keypairs: &GlobalHpkeKeypairCache,
Expand Down Expand Up @@ -2178,7 +2186,7 @@ impl VdafOps {
}

// TODO(#224): don't hold DB transaction open while computing VDAF updates?
// TODO(#224): don't do O(n) network round-trips (where n is the number of prepare steps)
// TODO(#1035): don't do O(n) network round-trips (where n is the number of prepare steps)
Ok(datastore
.run_tx("aggregate_continue", |tx| {
let (
Expand Down Expand Up @@ -2581,9 +2589,9 @@ impl VdafOps {
.await?)
}

/// Handle GET requests to a collection job URI obtained from the leader's `/collect` endpoint.
/// The return value is an encoded `CollectResp<Q>`.
/// https://www.ietf.org/archive/id/draft-ietf-ppm-dap-02.html#section-4.5.1
/// Handle GET requests to the leader's `tasks/{task-id}/collection_jobs/{collection-job-id}`
/// endpoint. The return value is an encoded `CollectResp<Q>`.
/// <https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-collecting-results>
#[tracing::instrument(skip(self, datastore, task), fields(task_id = ?task.id()), err)]
async fn handle_get_collection_job<C: Clock>(
&self,
Expand Down Expand Up @@ -2830,7 +2838,7 @@ impl VdafOps {
Ok(())
}

/// Implements the `/aggregate_share` endpoint for the helper, described in §4.4.4.3
/// Implements the `tasks/{task-id}/aggregate_shares` endpoint for the helper.
#[tracing::instrument(
skip(self, datastore, clock, task, req_bytes),
fields(task_id = ?task.id()),
Expand Down
3 changes: 1 addition & 2 deletions aggregator/src/aggregator/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use trillium_tokio::{CloneCounterObserver, Stopper};

use super::batch_creator::BatchCreator;

// TODO(#680): add metrics to aggregation job creator.
pub struct AggregationJobCreator<C: Clock> {
// Dependencies.
datastore: Datastore<C>,
Expand Down Expand Up @@ -87,7 +86,7 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
}

pub async fn run(self: Arc<Self>, stopper: Stopper) {
// TODO(#224): add support for handling only a subset of tasks in a single job (i.e. sharding).
// TODO(#1393): add support for handling only a subset of tasks in a single job (i.e. sharding).

// Create metric instruments.
let task_update_time_histogram = self
Expand Down
4 changes: 2 additions & 2 deletions aggregator/src/aggregator/http_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl Handler for Error {
/// long clients will cache the results of CORS preflight requests. Of popular browsers, Mozilla
/// Firefox has the highest Max-Age cap, at 24 hours, so we use that. Our CORS preflight handlers
/// are tightly scoped to relevant endpoints, and our CORS settings are unlikely to change.
/// See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Max-Age
/// See: <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Max-Age>.
const CORS_PREFLIGHT_CACHE_AGE: u32 = 24 * 60 * 60;

/// Wrapper around a type that implements [`Encode`]. It acts as a Trillium handler, encoding the
Expand Down Expand Up @@ -1414,7 +1414,7 @@ mod tests {
);
}

// Helper should not expose /upload endpoint
// Helper should not expose `tasks/{task-id}/reports` endpoint.
#[tokio::test]
async fn upload_handler_helper() {
let (clock, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await;
Expand Down
1 change: 0 additions & 1 deletion aggregator/src/binaries/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,6 @@ pub struct Config {

/// Address on which this server should listen for connections to the DAP aggregator API and
/// serve its API endpoints.
// TODO(#232): options for terminating TLS, unless that gets handled in a load balancer?
pub listen_address: SocketAddr,

/// How to serve the Janus aggregator API. If not set, the aggregator API is not served.
Expand Down
5 changes: 2 additions & 3 deletions aggregator/src/binary_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,8 @@ async fn get_traceconfigz(
})
}

/// Allows modifying the runtime tracing filter. Accepts a request with a body corresponding to
/// [`TraceconfigzBody`]. If the `filter` field is empty, the filter will fallback to `error`.
/// See [`EnvFilter::try_new`] for details.
/// Allows modifying the runtime tracing filter. Accepts a request with a body containing a filter
/// expression. See [`EnvFilter::try_new`] for details.
async fn put_traceconfigz(
conn: &mut trillium::Conn,
(State(trace_reload_handle), request): (State<Arc<TraceReloadHandle>>, String),
Expand Down
5 changes: 1 addition & 4 deletions aggregator_core/src/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ pub mod test_util;
#[cfg(test)]
mod tests;

// TODO(#196): retry network-related & other transient failures once we know what they look like

/// This macro stamps out an array of schema versions supported by this version of Janus and an
/// [`rstest_reuse`][1] template that can be applied to tests to have them run against all supported
/// schema versions.
Expand Down Expand Up @@ -790,7 +788,7 @@ impl<C: Clock> Transaction<'_, C> {
.collect::<Result<_, _>>()
}

/// Construct a [`Task`] from the contents of the provided (tasks) `Row` and
/// Construct an [`AggregatorTask`] from the contents of the provided (tasks) `Row` and
/// `task_hpke_keys` rows.
fn task_from_rows(
&self,
Expand Down Expand Up @@ -3837,7 +3835,6 @@ impl<C: Clock> Transaction<'_, C> {
task_id: &TaskId,
min_report_count: u64,
) -> Result<Option<BatchId>, Error> {
// TODO(#1467): fix this to work in presence of GC.
let stmt = self
.prepare_cached(
"WITH batches AS (
Expand Down
24 changes: 15 additions & 9 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ impl ClientParameters {
}
}

/// URL from which the HPKE configuration for the server filling `role` may be fetched per
/// draft-gpew-priv-ppm §4.3.1
/// URL from which the HPKE configuration for the server filling `role` may be fetched, per
/// the [DAP specification][1].
///
/// [1]: https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-hpke-configuration-request
fn hpke_config_endpoint(&self, role: &Role) -> Result<Url, Error> {
Ok(self.aggregator_endpoint(role)?.join("hpke_config")?)
}
Expand Down Expand Up @@ -346,8 +348,8 @@ impl<V: vdaf::Client<16>> Client<V> {
)
}

/// Shard a measurement, encrypt its shares, and construct a [`janus_core::message::Report`]
/// to be uploaded.
/// Shard a measurement, encrypt its shares, and construct a [`janus_messages::Report`] to be
/// uploaded.
fn prepare_report(&self, measurement: &V::Measurement, time: &Time) -> Result<Report, Error> {
let report_id: ReportId = random();
let (public_share, input_shares) = self.vdaf.shard(measurement, report_id.as_ref())?;
Expand Down Expand Up @@ -393,17 +395,21 @@ impl<V: vdaf::Client<16>> Client<V> {
))
}

/// Upload a [`Report`] to the leader, per §4.3.2 of draft-gpew-priv-ppm. The provided
/// measurement is sharded into two shares and then uploaded to the leader.
/// Upload a [`Report`] to the leader, per the [DAP specification][1]. The provided measurement
/// is sharded into two shares and then uploaded to the leader.
///
/// [1]: https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-uploading-reports
#[tracing::instrument(skip(measurement), err)]
pub async fn upload(&self, measurement: &V::Measurement) -> Result<(), Error> {
self.upload_with_time(measurement, Clock::now(&RealClock::default()))
.await
}

/// Upload a [`Report`] to the leader, per §4.3.2 of draft-gpew-priv-ppm, and override the
/// report's timestamp. The provided measurement is sharded into two shares and then uploaded to
/// the leader.
/// Upload a [`Report`] to the leader, per the [DAP specification][1], and override the report's
/// timestamp. The provided measurement is sharded into two shares and then uploaded to the
/// leader.
///
/// [1]: https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#name-uploading-reports
///
/// ```no_run
/// # use janus_client::{Client, Error};
Expand Down
5 changes: 3 additions & 2 deletions core/src/auth_tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,9 @@ impl BearerToken {
&self.0
}

/// Validate that a bearer token value matches the format in
/// https://datatracker.ietf.org/doc/html/rfc6750#section-2.1.
/// Validate that a bearer token value matches the format for [OAuth 2.0 bearer tokens][1].
///
/// [1]: https://datatracker.ietf.org/doc/html/rfc6750#section-2.1
fn validate(value: &str) -> Result<(), anyhow::Error> {
static REGEX: OnceLock<Regex> = OnceLock::new();

Expand Down
16 changes: 15 additions & 1 deletion integration_tests/tests/integration/divviup_ts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,18 @@ async fn janus_divviup_ts_histogram() {
.await;
}

// TODO(https://github.com/divviup/divviup-ts/issues/100): Test CountVec once it is implemented.
#[tokio::test(flavor = "multi_thread")]
async fn janus_divviup_ts_sumvec() {
install_test_trace_subscriber();

run_divviup_ts_integration_test(
"janus_divviup_ts_sumvec",
&container_client(),
VdafInstance::Prio3SumVec {
bits: 16,
length: 15,
chunk_length: 16,
},
)
.await;
}
9 changes: 5 additions & 4 deletions messages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,12 +512,13 @@ impl Role {
matches!(self, Role::Leader | Role::Helper)
}

/// If this [`Role`] is one of the aggregators, returns the index at which
/// that aggregator's message or data can be found in various lists, or
/// `None` if the role is not an aggregator.
/// Returns a VDAF aggregator ID if this [`Role`] is one of the aggregators, or `None` if the
/// role is not an aggregator. This is also used in [draft-wang-ppm-dap-taskprov-04][1] and earlier
/// to index into the `aggregator_endpoints` array.
///
/// [1]: https://www.ietf.org/archive/id/draft-wang-ppm-dap-taskprov-04.html#section-3-4
pub fn index(&self) -> Option<usize> {
match self {
// draft-gpew-priv-ppm §4.2: the leader's endpoint MUST be the first
Role::Leader => Some(0),
Role::Helper => Some(1),
_ => None,
Expand Down

0 comments on commit 74fb29b

Please sign in to comment.