From 8bed897b7e8d35d7c20c2f59e002ad9a8a9071fc Mon Sep 17 00:00:00 2001
From: David Cook
Date: Wed, 1 May 2024 17:59:30 -0500
Subject: [PATCH] Set group_imports to StdExternalCrate

---
 .rustfmt.toml | 1 +
 aggregator/src/aggregator.rs | 80 ++++++++++---------
 .../src/aggregator/aggregate_init_tests.rs | 20 ++---
 aggregator/src/aggregator/aggregate_share.rs | 3 +-
 .../aggregator/aggregation_job_continue.rs | 51 ++++++------
 .../src/aggregator/aggregation_job_creator.rs | 41 +++++-----
 .../src/aggregator/aggregation_job_driver.rs | 34 ++++----
 .../aggregation_job_driver/tests.rs | 18 +++--
 .../src/aggregator/aggregation_job_writer.rs | 6 +-
 aggregator/src/aggregator/batch_creator.rs | 16 ++--
 .../src/aggregator/collection_job_driver.rs | 34 ++++----
 .../src/aggregator/collection_job_tests.rs | 20 ++---
 aggregator/src/aggregator/error.rs | 13 +--
 .../src/aggregator/garbage_collector.rs | 15 ++--
 aggregator/src/aggregator/http_handlers.rs | 22 ++---
 .../http_handlers/tests/aggregate_share.rs | 13 +--
 .../tests/aggregation_job_continue.rs | 22 ++---
 .../tests/aggregation_job_init.rs | 21 ++---
 .../http_handlers/tests/collection_job.rs | 13 +--
 .../http_handlers/tests/hpke_config.rs | 28 ++++---
 .../aggregator/http_handlers/tests/report.rs | 22 ++---
 aggregator/src/aggregator/problem_details.rs | 12 +--
 aggregator/src/aggregator/query_type.rs | 12 +--
 aggregator/src/aggregator/report_writer.rs | 26 +++---
 aggregator/src/aggregator/taskprov_tests.rs | 26 +++---
 .../src/binaries/aggregation_job_creator.rs | 20 +++--
 .../src/binaries/aggregation_job_driver.rs | 20 +++--
 aggregator/src/binaries/aggregator.rs | 62 +++++++-------
 .../src/binaries/collection_job_driver.rs | 20 +++--
 aggregator/src/binaries/garbage_collector.rs | 6 +-
 aggregator/src/binaries/janus_cli.rs | 58 +++++++-------
 aggregator/src/binary_utils.rs | 56 +++++++------
 aggregator/src/binary_utils/job_driver.rs | 19 +++--
 aggregator/src/cache.rs | 16 ++--
 aggregator/src/config.rs | 21 +++--
 aggregator/src/metrics.rs | 30 ++++---
 aggregator/src/metrics/tests/prometheus.rs | 12 +--
 aggregator/src/trace.rs | 8 +-
 .../tests/integration/graceful_shutdown.rs | 15 ++--
 aggregator_api/src/lib.rs | 3 +-
 aggregator_api/src/routes.rs | 22 ++---
 aggregator_api/src/tests.rs | 22 ++---
 aggregator_core/src/datastore.rs | 62 +++++++-------
 aggregator_core/src/datastore/models.rs | 14 ++--
 aggregator_core/src/datastore/test_util.rs | 19 ++---
 aggregator_core/src/datastore/tests.rs | 61 +++++++-------
 aggregator_core/src/lib.rs | 9 +--
 aggregator_core/src/query_type.rs | 25 +++---
 aggregator_core/src/task.rs | 39 +++++----
 aggregator_core/src/taskprov.rs | 6 +-
 client/src/lib.rs | 6 +-
 collector/src/credential.rs | 3 +-
 collector/src/lib.rs | 12 +--
 core/src/auth_tokens.rs | 15 ++--
 core/src/cli.rs | 3 +-
 core/src/hpke.rs | 22 ++---
 core/src/http.rs | 6 +-
 core/src/lib.rs | 1 +
 core/src/retries.rs | 21 +++--
 core/src/test_util/kubernetes.rs | 16 ++--
 core/src/test_util/mod.rs | 3 +-
 core/src/test_util/runtime.rs | 10 ++-
 core/src/test_util/testcontainers.rs | 1 +
 core/src/time.rs | 8 +-
 core/src/vdaf.rs | 6 +-
 integration_tests/src/client.rs | 6 +-
 integration_tests/src/daphne.rs | 3 +-
 integration_tests/src/interop_api.rs | 3 +-
 integration_tests/src/janus.rs | 8 +-
 integration_tests/src/lib.rs | 3 +-
 integration_tests/tests/integration/common.rs | 3 +-
 integration_tests/tests/integration/daphne.rs | 12 +--
 .../tests/integration/divviup_ts.rs | 12 +--
 .../tests/integration/in_cluster.rs | 30 ++++---
 integration_tests/tests/integration/janus.rs | 18
+++-- .../src/commands/janus_interop_aggregator.rs | 15 ++-- .../src/commands/janus_interop_client.rs | 14 ++-- .../src/commands/janus_interop_collector.rs | 22 ++--- interop_binaries/src/lib.rs | 30 +++---- interop_binaries/src/testcontainer.rs | 1 + interop_binaries/tests/end_to_end.rs | 8 +- messages/src/lib.rs | 21 ++--- messages/src/query_type.rs | 15 ++-- messages/src/taskprov.rs | 9 ++- messages/src/tests/aggregation.rs | 3 +- messages/src/tests/collection.rs | 3 +- messages/src/tests/common.rs | 3 +- messages/src/tests/hpke.rs | 3 +- tools/src/bin/collect.rs | 15 ++-- tools/src/bin/dap_decode.rs | 14 ++-- tools/src/bin/hpke_keygen.rs | 6 +- xtask/src/main.rs | 10 ++- 92 files changed, 913 insertions(+), 724 deletions(-) diff --git a/.rustfmt.toml b/.rustfmt.toml index dd55b0f42..b4f09bede 100644 --- a/.rustfmt.toml +++ b/.rustfmt.toml @@ -2,3 +2,4 @@ format_strings = true wrap_comments = true comment_width = 100 imports_granularity = "Crate" +group_imports = "StdExternalCrate" diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index e26c7e56d..00d0b3d34 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -1,27 +1,15 @@ //! Common functionality for DAP aggregators. -pub use crate::aggregator::error::Error; -use crate::{ - aggregator::{ - aggregate_share::compute_aggregate_share, - aggregation_job_writer::{ - AggregationJobWriter, AggregationJobWriterMetrics, InitialWrite, - ReportAggregationUpdate as _, WritableReportAggregation, - }, - error::{ - handle_ping_pong_error, BatchMismatch, OptOutReason, ReportRejection, - ReportRejectionReason, - }, - query_type::{CollectableQueryType, UploadableQueryType}, - report_writer::{ReportWriteBatcher, WritableReport}, - }, - cache::{ - GlobalHpkeKeypairCache, PeerAggregatorCache, TaskAggregatorCache, - TASK_AGGREGATOR_CACHE_DEFAULT_CAPACITY, TASK_AGGREGATOR_CACHE_DEFAULT_TTL, - }, - config::TaskprovConfig, - metrics::{aggregate_step_failure_counter, report_aggregation_success_counter}, +use std::{ + any::Any, + collections::{HashMap, HashSet}, + fmt::Debug, + hash::Hash, + panic::{catch_unwind, resume_unwind, AssertUnwindSafe}, + sync::{Arc, Mutex as SyncMutex}, + time::{Duration as StdDuration, Instant}, }; + use backoff::{backoff::Backoff, Notify}; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use bytes::Bytes; @@ -97,15 +85,6 @@ use ring::{ rand::SystemRandom, signature::{EcdsaKeyPair, Signature}, }; -use std::{ - any::Any, - collections::{HashMap, HashSet}, - fmt::Debug, - hash::Hash, - panic::{catch_unwind, resume_unwind, AssertUnwindSafe}, - sync::{Arc, Mutex as SyncMutex}, - time::{Duration as StdDuration, Instant}, -}; use tokio::{ sync::oneshot::{self, error::RecvError}, try_join, @@ -113,6 +92,29 @@ use tokio::{ use tracing::{debug, info, info_span, trace_span, warn, Level, Span}; use url::Url; +pub use crate::aggregator::error::Error; +use crate::{ + aggregator::{ + aggregate_share::compute_aggregate_share, + aggregation_job_writer::{ + AggregationJobWriter, AggregationJobWriterMetrics, InitialWrite, + ReportAggregationUpdate as _, WritableReportAggregation, + }, + error::{ + handle_ping_pong_error, BatchMismatch, OptOutReason, ReportRejection, + ReportRejectionReason, + }, + query_type::{CollectableQueryType, UploadableQueryType}, + report_writer::{ReportWriteBatcher, WritableReport}, + }, + cache::{ + GlobalHpkeKeypairCache, PeerAggregatorCache, TaskAggregatorCache, + TASK_AGGREGATOR_CACHE_DEFAULT_CAPACITY, TASK_AGGREGATOR_CACHE_DEFAULT_TTL, + }, + 
config::TaskprovConfig, + metrics::{aggregate_step_failure_counter, report_aggregation_success_counter}, +}; + #[cfg(test)] mod aggregate_init_tests; pub mod aggregate_share; @@ -3252,10 +3254,12 @@ async fn send_request_to_helper( #[cfg(feature = "test-util")] #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] pub(crate) mod test_util { - use crate::{aggregator::Config, binaries::aggregator::parse_pem_ec_private_key}; - use ring::signature::EcdsaKeyPair; use std::time::Duration; + use ring::signature::EcdsaKeyPair; + + use crate::{aggregator::Config, binaries::aggregator::parse_pem_ec_private_key}; + pub(crate) const BATCH_AGGREGATION_SHARD_COUNT: u64 = 32; /// HPKE config signing key for use in tests. @@ -3300,10 +3304,8 @@ uKFxOelIgsiZJXKZNCX0FBmrfpCkKklCcg== #[cfg(test)] mod tests { - use crate::aggregator::{ - error::ReportRejectionReason, test_util::default_aggregator_config, Aggregator, Config, - Error, - }; + use std::{collections::HashSet, iter, sync::Arc, time::Duration as StdDuration}; + use assert_matches::assert_matches; use futures::future::try_join_all; use janus_aggregator_core::{ @@ -3341,7 +3343,11 @@ mod tests { vdaf::{self, prio3::Prio3Count, Client as _}, }; use rand::random; - use std::{collections::HashSet, iter, sync::Arc, time::Duration as StdDuration}; + + use crate::aggregator::{ + error::ReportRejectionReason, test_util::default_aggregator_config, Aggregator, Config, + Error, + }; pub(super) fn create_report_custom( task: &AggregatorTask, diff --git a/aggregator/src/aggregator/aggregate_init_tests.rs b/aggregator/src/aggregator/aggregate_init_tests.rs index 36e8a884b..0f2cc2e13 100644 --- a/aggregator/src/aggregator/aggregate_init_tests.rs +++ b/aggregator/src/aggregator/aggregate_init_tests.rs @@ -1,11 +1,5 @@ -use crate::aggregator::{ - http_handlers::{ - aggregator_handler, - test_util::{decode_response_body, take_problem_details}, - }, - tests::generate_helper_report_share, - Config, -}; +use std::sync::Arc; + use assert_matches::assert_matches; use http::StatusCode; use janus_aggregator_core::{ @@ -39,10 +33,18 @@ use prio::{ }; use rand::random; use serde_json::json; -use std::sync::Arc; use trillium::{Handler, KnownHeaderName, Status}; use trillium_testing::{prelude::put, TestConn}; +use crate::aggregator::{ + http_handlers::{ + aggregator_handler, + test_util::{decode_response_body, take_problem_details}, + }, + tests::generate_helper_report_share, + Config, +}; + #[derive(Clone)] pub(super) struct PrepareInitGenerator where diff --git a/aggregator/src/aggregator/aggregate_share.rs b/aggregator/src/aggregator/aggregate_share.rs index 3024d3bbc..d094c84f6 100644 --- a/aggregator/src/aggregator/aggregate_share.rs +++ b/aggregator/src/aggregator/aggregate_share.rs @@ -1,6 +1,5 @@ //! Implements functionality for computing & validating aggregate shares. -use super::Error; use janus_aggregator_core::{ datastore::{ self, @@ -12,6 +11,8 @@ use janus_core::{report_id::ReportIdChecksumExt, time::IntervalExt as _}; use janus_messages::{query_type::QueryType, Interval, ReportIdChecksum}; use prio::vdaf::{self, Aggregatable}; +use super::Error; + /// Computes the aggregate share over the provided batch aggregations. 
/// /// The assumption is that all aggregation jobs contributing to those batch aggregations have been diff --git a/aggregator/src/aggregator/aggregation_job_continue.rs b/aggregator/src/aggregator/aggregation_job_continue.rs index de66f4db8..1be1cd010 100644 --- a/aggregator/src/aggregator/aggregation_job_continue.rs +++ b/aggregator/src/aggregator/aggregation_job_continue.rs @@ -1,10 +1,11 @@ //! Implements portions of aggregation job continuation for the helper. -use crate::aggregator::{ - aggregation_job_writer::{AggregationJobWriter, UpdateWrite, WritableReportAggregation}, - error::handle_ping_pong_error, - AggregatorMetrics, Error, VdafOps, +use std::{ + any::Any, + panic::{catch_unwind, resume_unwind, AssertUnwindSafe}, + sync::Arc, }; + use janus_aggregator_core::{ datastore::{ self, @@ -24,14 +25,15 @@ use prio::{ topology::ping_pong::{PingPongContinuedValue, PingPongState, PingPongTopology}, vdaf, }; -use std::{ - any::Any, - panic::{catch_unwind, resume_unwind, AssertUnwindSafe}, - sync::Arc, -}; use tokio::sync::oneshot::{self, error::RecvError}; use tracing::{info_span, trace_span, Span}; +use crate::aggregator::{ + aggregation_job_writer::{AggregationJobWriter, UpdateWrite, WritableReportAggregation}, + error::handle_ping_pong_error, + AggregatorMetrics, Error, VdafOps, +}; + impl VdafOps { /// Step the helper's aggregation job to the next step using the step `n` ping pong state in /// `report_aggregations` with the step `n+1` ping pong messages in `leader_aggregation_job`. @@ -276,7 +278,6 @@ impl VdafOps { #[cfg(feature = "test-util")] #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] pub mod test_util { - use crate::aggregator::http_handlers::test_util::{decode_response_body, take_problem_details}; use assert_matches::assert_matches; use janus_aggregator_core::task::test_util::Task; use janus_messages::{AggregationJobContinueReq, AggregationJobId, AggregationJobResp}; @@ -285,6 +286,8 @@ pub mod test_util { use trillium::{Handler, KnownHeaderName, Status}; use trillium_testing::{assert_headers, prelude::post, TestConn}; + use crate::aggregator::http_handlers::test_util::{decode_response_body, take_problem_details}; + async fn post_aggregation_job( task: &Task, aggregation_job_id: &AggregationJobId, @@ -371,18 +374,8 @@ pub mod test_util { #[cfg(test)] mod tests { - use crate::aggregator::{ - aggregate_init_tests::{put_aggregation_job, PrepareInitGenerator}, - aggregation_job_continue::test_util::{ - post_aggregation_job_and_decode, post_aggregation_job_expecting_error, - post_aggregation_job_expecting_status, - }, - http_handlers::{ - aggregator_handler, - test_util::{setup_http_handler_test, take_problem_details}, - }, - test_util::default_aggregator_config, - }; + use std::sync::Arc; + use janus_aggregator_core::{ datastore::{ models::{ @@ -418,10 +411,22 @@ mod tests { }; use rand::random; use serde_json::json; - use std::sync::Arc; use trillium::{Handler, Status}; use trillium_testing::prelude::delete; + use crate::aggregator::{ + aggregate_init_tests::{put_aggregation_job, PrepareInitGenerator}, + aggregation_job_continue::test_util::{ + post_aggregation_job_and_decode, post_aggregation_job_expecting_error, + post_aggregation_job_expecting_status, + }, + http_handlers::{ + aggregator_handler, + test_util::{setup_http_handler_test, take_problem_details}, + }, + test_util::default_aggregator_config, + }; + struct AggregationJobContinueTestCase< const VERIFY_KEY_LENGTH: usize, V: Aggregator, diff --git a/aggregator/src/aggregator/aggregation_job_creator.rs 
b/aggregator/src/aggregator/aggregation_job_creator.rs index 8c373fad9..263db93c7 100644 --- a/aggregator/src/aggregator/aggregation_job_creator.rs +++ b/aggregator/src/aggregator/aggregation_job_creator.rs @@ -1,7 +1,10 @@ -use crate::aggregator::{ - aggregation_job_writer::{AggregationJobWriter, InitialWrite}, - batch_creator::BatchCreator, +use std::{ + cmp::min, + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, }; + #[cfg(feature = "fpvec_bounded_l2")] use fixed::{ types::extra::{U15, U31}, @@ -53,12 +56,6 @@ use prio::{ }, }; use rand::{random, thread_rng, Rng}; -use std::{ - cmp::min, - collections::{HashMap, HashSet}, - sync::Arc, - time::Duration, -}; use tokio::{ time::{self, sleep_until, Instant, MissedTickBehavior}, try_join, @@ -66,6 +63,11 @@ use tokio::{ use tracing::{debug, error, info}; use trillium_tokio::{CloneCounterObserver, Stopper}; +use crate::aggregator::{ + aggregation_job_writer::{AggregationJobWriter, InitialWrite}, + batch_creator::BatchCreator, +}; + pub struct AggregationJobCreator { // Dependencies. datastore: Datastore, @@ -911,9 +913,15 @@ impl AggregationJobCreator { #[cfg(test)] mod tests { - use crate::aggregator::test_util::BATCH_AGGREGATION_SHARD_COUNT; + use std::{ + any::{Any, TypeId}, + collections::{HashMap, HashSet}, + hash::Hash, + iter, + sync::Arc, + time::Duration, + }; - use super::AggregationJobCreator; use futures::future::try_join_all; use janus_aggregator_core::{ datastore::{ @@ -946,17 +954,12 @@ mod tests { prio3::{Prio3, Prio3Count}, }; use rand::random; - use std::{ - any::{Any, TypeId}, - collections::{HashMap, HashSet}, - hash::Hash, - iter, - sync::Arc, - time::Duration, - }; use tokio::{task, time, try_join}; use trillium_tokio::Stopper; + use super::AggregationJobCreator; + use crate::aggregator::test_util::BATCH_AGGREGATION_SHARD_COUNT; + #[tokio::test] async fn aggregation_job_creator() { // This is a minimal test that AggregationJobCreator::run() will successfully find tasks & diff --git a/aggregator/src/aggregator/aggregation_job_driver.rs b/aggregator/src/aggregator/aggregation_job_driver.rs index 96c5ed591..377815147 100644 --- a/aggregator/src/aggregator/aggregation_job_driver.rs +++ b/aggregator/src/aggregator/aggregation_job_driver.rs @@ -1,13 +1,11 @@ -use crate::aggregator::{ - aggregate_step_failure_counter, - aggregation_job_writer::{ - AggregationJobWriter, AggregationJobWriterMetrics, UpdateWrite, WritableReportAggregation, - }, - error::handle_ping_pong_error, - http_handlers::AGGREGATION_JOB_ROUTE, - query_type::CollectableQueryType, - report_aggregation_success_counter, send_request_to_helper, Error, RequestBody, +use std::{ + any::Any, + collections::HashSet, + panic::{catch_unwind, resume_unwind, AssertUnwindSafe}, + sync::Arc, + time::Duration, }; + use anyhow::{anyhow, Result}; use backoff::backoff::Backoff; use bytes::Bytes; @@ -41,19 +39,23 @@ use prio::{ vdaf, }; use reqwest::Method; -use std::{ - any::Any, - collections::HashSet, - panic::{catch_unwind, resume_unwind, AssertUnwindSafe}, - sync::Arc, - time::Duration, -}; use tokio::{ sync::oneshot::{self, error::RecvError}, try_join, }; use tracing::{debug, error, info, info_span, trace_span, warn, Span}; +use crate::aggregator::{ + aggregate_step_failure_counter, + aggregation_job_writer::{ + AggregationJobWriter, AggregationJobWriterMetrics, UpdateWrite, WritableReportAggregation, + }, + error::handle_ping_pong_error, + http_handlers::AGGREGATION_JOB_ROUTE, + query_type::CollectableQueryType, + report_aggregation_success_counter, 
send_request_to_helper, Error, RequestBody, +}; + #[cfg(test)] mod tests; diff --git a/aggregator/src/aggregator/aggregation_job_driver/tests.rs b/aggregator/src/aggregator/aggregation_job_driver/tests.rs index ca144bc67..a99ce1389 100644 --- a/aggregator/src/aggregator/aggregation_job_driver/tests.rs +++ b/aggregator/src/aggregator/aggregation_job_driver/tests.rs @@ -1,10 +1,5 @@ -use crate::{ - aggregator::{ - aggregation_job_driver::AggregationJobDriver, test_util::BATCH_AGGREGATION_SHARD_COUNT, - Error, - }, - binary_utils::job_driver::JobDriver, -}; +use std::{sync::Arc, time::Duration as StdDuration}; + use assert_matches::assert_matches; use futures::future::join_all; use http::{header::CONTENT_TYPE, StatusCode}; @@ -51,9 +46,16 @@ use prio::{ }, }; use rand::random; -use std::{sync::Arc, time::Duration as StdDuration}; use trillium_tokio::Stopper; +use crate::{ + aggregator::{ + aggregation_job_driver::AggregationJobDriver, test_util::BATCH_AGGREGATION_SHARD_COUNT, + Error, + }, + binary_utils::job_driver::JobDriver, +}; + #[tokio::test] async fn aggregation_job_driver() { // This is a minimal test that AggregationJobDriver::run() will successfully find diff --git a/aggregator/src/aggregator/aggregation_job_writer.rs b/aggregator/src/aggregator/aggregation_job_writer.rs index d20827dac..56b79a652 100644 --- a/aggregator/src/aggregator/aggregation_job_writer.rs +++ b/aggregator/src/aggregator/aggregation_job_writer.rs @@ -1,7 +1,8 @@ //! In-memory accumulation of aggregation job (& report aggregation) writes, along with related //! batch aggregation writes. -use crate::Operation; +use std::{borrow::Cow, collections::HashMap, marker::PhantomData, sync::Arc}; + use async_trait::async_trait; use futures::future::try_join_all; use janus_aggregator_core::{ @@ -27,10 +28,11 @@ use janus_messages::{ use opentelemetry::{metrics::Counter, KeyValue}; use prio::{codec::Encode, vdaf}; use rand::{thread_rng, Rng as _}; -use std::{borrow::Cow, collections::HashMap, marker::PhantomData, sync::Arc}; use tokio::try_join; use tracing::{warn, Level}; +use crate::Operation; + /// Buffers pending writes to aggregation jobs and their report aggregations. pub struct AggregationJobWriter where diff --git a/aggregator/src/aggregator/batch_creator.rs b/aggregator/src/aggregator/batch_creator.rs index 5a82e3c67..1e94ac272 100644 --- a/aggregator/src/aggregator/batch_creator.rs +++ b/aggregator/src/aggregator/batch_creator.rs @@ -1,6 +1,12 @@ //! In-memory data structure to incrementally build fixed-size batches. -use crate::aggregator::aggregation_job_writer::{AggregationJobWriter, InitialWrite}; +use std::{ + cmp::{max, min, Ordering}, + collections::{binary_heap::PeekMut, hash_map, BinaryHeap, HashMap, HashSet, VecDeque}, + ops::RangeInclusive, + sync::Arc, +}; + use futures::future::try_join_all; use janus_aggregator_core::datastore::{ models::{ @@ -16,15 +22,11 @@ use janus_messages::{ }; use prio::{codec::Encode, vdaf::Aggregator}; use rand::random; -use std::{ - cmp::{max, min, Ordering}, - collections::{binary_heap::PeekMut, hash_map, BinaryHeap, HashMap, HashSet, VecDeque}, - ops::RangeInclusive, - sync::Arc, -}; use tokio::try_join; use tracing::debug; +use crate::aggregator::aggregation_job_writer::{AggregationJobWriter, InitialWrite}; + /// This data structure loads existing outstanding batches, incrementally assigns new reports to /// outstanding batches and aggregation jobs, and provides unused reports at the end. 
If time /// bucketing is enabled, reports will be separated by timestamp into different sets of outstanding diff --git a/aggregator/src/aggregator/collection_job_driver.rs b/aggregator/src/aggregator/collection_job_driver.rs index bdf8e213c..98dac27b7 100644 --- a/aggregator/src/aggregator/collection_job_driver.rs +++ b/aggregator/src/aggregator/collection_job_driver.rs @@ -1,10 +1,7 @@ //! Implements portions of collect sub-protocol for DAP leader and helper. -use crate::aggregator::{ - aggregate_share::compute_aggregate_share, empty_batch_aggregations, - http_handlers::AGGREGATE_SHARES_ROUTE, query_type::CollectableQueryType, - send_request_to_helper, Error, RequestBody, -}; +use std::{sync::Arc, time::Duration}; + use anyhow::bail; use backoff::backoff::Backoff; use bytes::Bytes; @@ -33,10 +30,15 @@ use prio::{ vdaf, }; use reqwest::Method; -use std::{sync::Arc, time::Duration}; use tokio::try_join; use tracing::{error, info, warn}; +use crate::aggregator::{ + aggregate_share::compute_aggregate_share, empty_batch_aggregations, + http_handlers::AGGREGATE_SHARES_ROUTE, query_type::CollectableQueryType, + send_request_to_helper, Error, RequestBody, +}; + /// Drives a collection job. #[derive(Derivative)] #[derivative(Debug)] @@ -783,14 +785,8 @@ impl RetryStrategy { #[cfg(test)] mod tests { - use crate::{ - aggregator::{ - collection_job_driver::{CollectionJobDriver, RetryStrategy}, - test_util::BATCH_AGGREGATION_SHARD_COUNT, - Error, - }, - binary_utils::job_driver::JobDriver, - }; + use std::{sync::Arc, time::Duration as StdDuration}; + use assert_matches::assert_matches; use http::{header::CONTENT_TYPE, StatusCode}; use janus_aggregator_core::{ @@ -826,9 +822,17 @@ mod tests { vdaf::dummy, }; use rand::random; - use std::{sync::Arc, time::Duration as StdDuration}; use trillium_tokio::Stopper; + use crate::{ + aggregator::{ + collection_job_driver::{CollectionJobDriver, RetryStrategy}, + test_util::BATCH_AGGREGATION_SHARD_COUNT, + Error, + }, + binary_utils::job_driver::JobDriver, + }; + async fn setup_collection_job_test_case( server: &mut mockito::Server, clock: MockClock, diff --git a/aggregator/src/aggregator/collection_job_tests.rs b/aggregator/src/aggregator/collection_job_tests.rs index 206df6d4e..8332d521e 100644 --- a/aggregator/src/aggregator/collection_job_tests.rs +++ b/aggregator/src/aggregator/collection_job_tests.rs @@ -1,11 +1,5 @@ -use crate::aggregator::{ - http_handlers::{ - aggregator_handler, - test_util::{decode_response_body, take_problem_details}, - }, - test_util::BATCH_AGGREGATION_SHARD_COUNT, - Config, -}; +use std::{collections::HashSet, sync::Arc}; + use http::StatusCode; use janus_aggregator_core::{ datastore::{ @@ -40,7 +34,6 @@ use prio::{ }; use rand::random; use serde_json::json; -use std::{collections::HashSet, sync::Arc}; use trillium::{Handler, KnownHeaderName, Status}; use trillium_testing::{ assert_headers, @@ -48,6 +41,15 @@ use trillium_testing::{ TestConn, }; +use crate::aggregator::{ + http_handlers::{ + aggregator_handler, + test_util::{decode_response_body, take_problem_details}, + }, + test_util::BATCH_AGGREGATION_SHARD_COUNT, + Config, +}; + pub(crate) struct CollectionJobTestCase { pub(super) task: Task, clock: MockClock, diff --git a/aggregator/src/aggregator/error.rs b/aggregator/src/aggregator/error.rs index 7f95c7aec..6e3156b0d 100644 --- a/aggregator/src/aggregator/error.rs +++ b/aggregator/src/aggregator/error.rs @@ -1,3 +1,10 @@ +use std::{ + fmt::{self, Display, Formatter}, + num::TryFromIntError, + ops::Deref, + sync::Arc, +}; + 
use janus_aggregator_core::{datastore, task}; use janus_core::http::HttpErrorResponse; use janus_messages::{ @@ -6,12 +13,6 @@ use janus_messages::{ }; use opentelemetry::{metrics::Counter, KeyValue}; use prio::{topology::ping_pong::PingPongError, vdaf::VdafError}; -use std::{ - fmt::{self, Display, Formatter}, - num::TryFromIntError, - ops::Deref, - sync::Arc, -}; use tracing::info; /// Errors returned by functions and methods in this module. diff --git a/aggregator/src/aggregator/garbage_collector.rs b/aggregator/src/aggregator/garbage_collector.rs index e3a4d55d3..ec2a29ec7 100644 --- a/aggregator/src/aggregator/garbage_collector.rs +++ b/aggregator/src/aggregator/garbage_collector.rs @@ -1,13 +1,14 @@ +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + use anyhow::{Context, Error, Result}; use futures::future::{join_all, try_join_all, OptionFuture}; use janus_aggregator_core::datastore::{self, Datastore}; use janus_core::time::Clock; use janus_messages::TaskId; use opentelemetry::metrics::{Counter, Meter, Unit}; -use std::sync::{ - atomic::{AtomicU64, Ordering}, - Arc, -}; use tokio::{sync::Semaphore, try_join}; use tracing::error; @@ -173,7 +174,8 @@ impl GarbageCollector { #[cfg(test)] mod tests { - use crate::aggregator::garbage_collector::GarbageCollector; + use std::sync::Arc; + use janus_aggregator_core::{ datastore::{ models::{ @@ -198,7 +200,8 @@ mod tests { }; use prio::vdaf::dummy; use rand::random; - use std::sync::Arc; + + use crate::aggregator::garbage_collector::GarbageCollector; const OLDEST_ALLOWED_REPORT_TIMESTAMP: Time = Time::from_seconds_since_epoch(1000); const REPORT_EXPIRY_AGE: Duration = Duration::from_seconds(500); diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index d244ab667..23449afaf 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -1,8 +1,5 @@ -use super::{ - error::{ArcError, ReportRejectionReason}, - Aggregator, Config, Error, -}; -use crate::aggregator::problem_details::{ProblemDetailsConnExt, ProblemDocument}; +use std::{borrow::Cow, io::Cursor, sync::Arc, time::Duration as StdDuration}; + use async_trait::async_trait; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use janus_aggregator_core::{datastore::Datastore, instrumented}; @@ -26,7 +23,6 @@ use opentelemetry::{ use prio::codec::Encode; use ring::digest::{digest, SHA256}; use serde::Deserialize; -use std::{borrow::Cow, io::Cursor, sync::Arc, time::Duration as StdDuration}; use tracing::warn; use trillium::{Conn, Handler, KnownHeaderName, Status}; use trillium_api::{api, State}; @@ -34,6 +30,12 @@ use trillium_caching_headers::{CacheControlDirective, CachingHeadersExt as _}; use trillium_opentelemetry::metrics; use trillium_router::{Router, RouterConnExt}; +use super::{ + error::{ArcError, ReportRejectionReason}, + Aggregator, Config, Error, +}; +use crate::aggregator::problem_details::{ProblemDetailsConnExt, ProblemDocument}; + #[cfg(test)] mod tests; @@ -708,8 +710,8 @@ fn parse_taskprov_header( #[cfg(feature = "test-util")] #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] pub mod test_util { - use super::aggregator_handler; - use crate::aggregator::test_util::default_aggregator_config; + use std::sync::Arc; + use janus_aggregator_core::{ datastore::{ test_util::{ephemeral_datastore, EphemeralDatastore}, @@ -722,10 +724,12 @@ pub mod test_util { time::MockClock, }; use janus_messages::codec::Decode; - use std::sync::Arc; use trillium::Handler; use 
trillium_testing::{assert_headers, TestConn}; + use super::aggregator_handler; + use crate::aggregator::test_util::default_aggregator_config; + pub async fn take_response_body(test_conn: &mut TestConn) -> Vec { test_conn .take_response_body() diff --git a/aggregator/src/aggregator/http_handlers/tests/aggregate_share.rs b/aggregator/src/aggregator/http_handlers/tests/aggregate_share.rs index 50d2d03b8..639b115af 100644 --- a/aggregator/src/aggregator/http_handlers/tests/aggregate_share.rs +++ b/aggregator/src/aggregator/http_handlers/tests/aggregate_share.rs @@ -1,9 +1,3 @@ -use crate::aggregator::{ - error::BatchMismatch, - http_handlers::test_util::{ - decode_response_body, setup_http_handler_test, take_problem_details, - }, -}; use assert_matches::assert_matches; use futures::future::try_join_all; use janus_aggregator_core::{ @@ -33,6 +27,13 @@ use serde_json::json; use trillium::{Handler, KnownHeaderName, Status}; use trillium_testing::{assert_headers, prelude::post, TestConn}; +use crate::aggregator::{ + error::BatchMismatch, + http_handlers::test_util::{ + decode_response_body, setup_http_handler_test, take_problem_details, + }, +}; + pub(crate) async fn post_aggregate_share_request( task: &Task, request: &AggregateShareReq, diff --git a/aggregator/src/aggregator/http_handlers/tests/aggregation_job_continue.rs b/aggregator/src/aggregator/http_handlers/tests/aggregation_job_continue.rs index d3d47a1bd..b6efb681e 100644 --- a/aggregator/src/aggregator/http_handlers/tests/aggregation_job_continue.rs +++ b/aggregator/src/aggregator/http_handlers/tests/aggregation_job_continue.rs @@ -1,12 +1,5 @@ -use crate::aggregator::{ - aggregation_job_continue::test_util::{ - post_aggregation_job_and_decode, post_aggregation_job_expecting_error, - }, - empty_batch_aggregations, - http_handlers::test_util::setup_http_handler_test, - test_util::BATCH_AGGREGATION_SHARD_COUNT, - tests::generate_helper_report_share, -}; +use std::sync::Arc; + use futures::future::try_join_all; use janus_aggregator_core::{ datastore::models::{ @@ -38,9 +31,18 @@ use prio::{ }, }; use rand::random; -use std::sync::Arc; use trillium::Status; +use crate::aggregator::{ + aggregation_job_continue::test_util::{ + post_aggregation_job_and_decode, post_aggregation_job_expecting_error, + }, + empty_batch_aggregations, + http_handlers::test_util::setup_http_handler_test, + test_util::BATCH_AGGREGATION_SHARD_COUNT, + tests::generate_helper_report_share, +}; + #[tokio::test] async fn aggregate_continue() { let (clock, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; diff --git a/aggregator/src/aggregator/http_handlers/tests/aggregation_job_init.rs b/aggregator/src/aggregator/http_handlers/tests/aggregation_job_init.rs index 62b3e84fd..10b3820cb 100644 --- a/aggregator/src/aggregator/http_handlers/tests/aggregation_job_init.rs +++ b/aggregator/src/aggregator/http_handlers/tests/aggregation_job_init.rs @@ -1,13 +1,3 @@ -use crate::aggregator::{ - aggregate_init_tests::{put_aggregation_job, PrepareInitGenerator}, - empty_batch_aggregations, - http_handlers::{ - aggregator_handler, - test_util::{decode_response_body, setup_http_handler_test, take_problem_details}, - }, - test_util::{default_aggregator_config, BATCH_AGGREGATION_SHARD_COUNT}, - tests::{generate_helper_report_share, generate_helper_report_share_for_plaintext}, -}; use assert_matches::assert_matches; use futures::future::try_join_all; use janus_aggregator_core::{ @@ -42,6 +32,17 @@ use serde_json::json; use trillium::{KnownHeaderName, Status}; use 
trillium_testing::{assert_headers, prelude::put, TestConn}; +use crate::aggregator::{ + aggregate_init_tests::{put_aggregation_job, PrepareInitGenerator}, + empty_batch_aggregations, + http_handlers::{ + aggregator_handler, + test_util::{decode_response_body, setup_http_handler_test, take_problem_details}, + }, + test_util::{default_aggregator_config, BATCH_AGGREGATION_SHARD_COUNT}, + tests::{generate_helper_report_share, generate_helper_report_share_for_plaintext}, +}; + #[tokio::test] async fn aggregate_leader() { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; diff --git a/aggregator/src/aggregator/http_handlers/tests/collection_job.rs b/aggregator/src/aggregator/http_handlers/tests/collection_job.rs index 9cc223610..483c401ff 100644 --- a/aggregator/src/aggregator/http_handlers/tests/collection_job.rs +++ b/aggregator/src/aggregator/http_handlers/tests/collection_job.rs @@ -1,9 +1,3 @@ -use crate::aggregator::{ - collection_job_tests::setup_collection_job_test_case, - http_handlers::test_util::{ - decode_response_body, setup_http_handler_test, take_problem_details, - }, -}; use janus_aggregator_core::{ datastore::models::{ BatchAggregation, BatchAggregationState, CollectionJob, CollectionJobState, @@ -31,6 +25,13 @@ use trillium_testing::{ prelude::{delete, post, put}, }; +use crate::aggregator::{ + collection_job_tests::setup_collection_job_test_case, + http_handlers::test_util::{ + decode_response_body, setup_http_handler_test, take_problem_details, + }, +}; + #[tokio::test] async fn collection_job_put_request_to_helper() { let test_case = setup_collection_job_test_case(Role::Helper, QueryType::TimeInterval).await; diff --git a/aggregator/src/aggregator/http_handlers/tests/hpke_config.rs b/aggregator/src/aggregator/http_handlers/tests/hpke_config.rs index e6d38fa50..751cb3c0b 100644 --- a/aggregator/src/aggregator/http_handlers/tests/hpke_config.rs +++ b/aggregator/src/aggregator/http_handlers/tests/hpke_config.rs @@ -1,15 +1,5 @@ -use crate::{ - aggregator::{ - http_handlers::{ - aggregator_handler_with_aggregator, - test_util::{setup_http_handler_test, take_problem_details, take_response_body}, - HPKE_CONFIG_SIGNATURE_HEADER, - }, - test_util::{hpke_config_signing_key, hpke_config_verification_key}, - Config, - }, - config::TaskprovConfig, -}; +use std::{collections::HashMap, sync::Arc}; + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; use janus_aggregator_core::{ datastore::models::HpkeKeyState, @@ -28,10 +18,22 @@ use janus_messages::{HpkeConfigList, Role, TaskId}; use prio::codec::Decode as _; use rand::random; use serde_json::json; -use std::{collections::HashMap, sync::Arc}; use trillium::{KnownHeaderName, Status}; use trillium_testing::{assert_headers, prelude::get, TestConn}; +use crate::{ + aggregator::{ + http_handlers::{ + aggregator_handler_with_aggregator, + test_util::{setup_http_handler_test, take_problem_details, take_response_body}, + HPKE_CONFIG_SIGNATURE_HEADER, + }, + test_util::{hpke_config_signing_key, hpke_config_verification_key}, + Config, + }, + config::TaskprovConfig, +}; + #[tokio::test] async fn hpke_config() { let (_, _ephemeral_datastore, datastore, handler) = setup_http_handler_test().await; diff --git a/aggregator/src/aggregator/http_handlers/tests/report.rs b/aggregator/src/aggregator/http_handlers/tests/report.rs index cf5d9c885..127e586f5 100644 --- a/aggregator/src/aggregator/http_handlers/tests/report.rs +++ b/aggregator/src/aggregator/http_handlers/tests/report.rs @@ -1,12 +1,5 @@ -use 
crate::aggregator::{ - error::ReportRejectionReason, - http_handlers::{ - aggregator_handler, - test_util::{setup_http_handler_test, take_problem_details}, - }, - test_util::default_aggregator_config, - tests::{create_report, create_report_custom}, -}; +use std::{sync::Arc, time::Duration as StdDuration}; + use janus_aggregator_core::{ datastore::test_util::EphemeralDatastoreBuilder, task::{test_util::TaskBuilder, QueryType}, @@ -28,11 +21,20 @@ use janus_messages::{ use prio::codec::Encode; use rand::random; use serde_json::json; -use std::{sync::Arc, time::Duration as StdDuration}; use tokio::time::sleep; use trillium::{KnownHeaderName, Status}; use trillium_testing::{assert_headers, prelude::put, TestConn}; +use crate::aggregator::{ + error::ReportRejectionReason, + http_handlers::{ + aggregator_handler, + test_util::{setup_http_handler_test, take_problem_details}, + }, + test_util::default_aggregator_config, + tests::{create_report, create_report_custom}, +}; + #[tokio::test] async fn upload_handler() { async fn check_response( diff --git a/aggregator/src/aggregator/problem_details.rs b/aggregator/src/aggregator/problem_details.rs index 7996ad91b..5e253582a 100644 --- a/aggregator/src/aggregator/problem_details.rs +++ b/aggregator/src/aggregator/problem_details.rs @@ -118,10 +118,8 @@ impl ProblemDetailsConnExt for Conn { #[cfg(test)] mod tests { - use crate::aggregator::{ - error::{BatchMismatch, ReportRejection, ReportRejectionReason}, - send_request_to_helper, Error, RequestBody, - }; + use std::{borrow::Cow, sync::Arc}; + use assert_matches::assert_matches; use bytes::Bytes; use futures::future::join_all; @@ -138,9 +136,13 @@ mod tests { use opentelemetry::metrics::Unit; use rand::random; use reqwest::Client; - use std::{borrow::Cow, sync::Arc}; use trillium_testing::prelude::post; + use crate::aggregator::{ + error::{BatchMismatch, ReportRejection, ReportRejectionReason}, + send_request_to_helper, Error, RequestBody, + }; + #[test] fn dap_problem_type_round_trip() { for problem_type in [ diff --git a/aggregator/src/aggregator/query_type.rs b/aggregator/src/aggregator/query_type.rs index 365c7e0af..1574cfeda 100644 --- a/aggregator/src/aggregator/query_type.rs +++ b/aggregator/src/aggregator/query_type.rs @@ -1,7 +1,5 @@ -use super::{ - error::{ReportRejection, ReportRejectionReason}, - Error, -}; +use std::{collections::HashSet, hash::Hash}; + use async_trait::async_trait; use janus_aggregator_core::{ datastore::{self, models::LeaderStoredReport, Transaction}, @@ -14,7 +12,11 @@ use janus_messages::{ Role, }; use prio::vdaf; -use std::{collections::HashSet, hash::Hash}; + +use super::{ + error::{ReportRejection, ReportRejectionReason}, + Error, +}; #[async_trait] pub trait UploadableQueryType: QueryType { diff --git a/aggregator/src/aggregator/report_writer.rs b/aggregator/src/aggregator/report_writer.rs index 0db0e3b2c..6b8f80ac2 100644 --- a/aggregator/src/aggregator/report_writer.rs +++ b/aggregator/src/aggregator/report_writer.rs @@ -1,8 +1,12 @@ -use crate::aggregator::{ - error::{ReportRejection, ReportRejectionReason}, - query_type::UploadableQueryType, - Error, +use std::{ + collections::BTreeMap, + fmt::Debug, + marker::PhantomData, + mem::{replace, take}, + sync::{Arc, Mutex as StdMutex}, + time::Duration, }; + use async_trait::async_trait; use futures::future::{join_all, try_join_all}; use janus_aggregator_core::datastore::{ @@ -14,14 +18,6 @@ use janus_core::{time::Clock, Runtime}; use janus_messages::TaskId; use prio::vdaf; use rand::{thread_rng, Rng}; -use std::{ - 
collections::BTreeMap, - fmt::Debug, - marker::PhantomData, - mem::{replace, take}, - sync::{Arc, Mutex as StdMutex}, - time::Duration, -}; use tokio::{ select, sync::{mpsc, oneshot}, @@ -29,6 +25,12 @@ use tokio::{ }; use tracing::{debug, error}; +use crate::aggregator::{ + error::{ReportRejection, ReportRejectionReason}, + query_type::UploadableQueryType, + Error, +}; + type ReportResult = Result>, ReportRejection>; type ResultSender = oneshot::Sender>>; diff --git a/aggregator/src/aggregator/taskprov_tests.rs b/aggregator/src/aggregator/taskprov_tests.rs index 597d13a24..a32b7bb71 100644 --- a/aggregator/src/aggregator/taskprov_tests.rs +++ b/aggregator/src/aggregator/taskprov_tests.rs @@ -1,14 +1,5 @@ -use crate::{ - aggregator::{ - aggregate_init_tests::PrepareInitGenerator, - http_handlers::{ - aggregator_handler, - test_util::{decode_response_body, take_problem_details}, - }, - Config, - }, - config::TaskprovConfig, -}; +use std::sync::Arc; + use assert_matches::assert_matches; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use janus_aggregator_core::{ @@ -63,7 +54,6 @@ use prio::{ use rand::random; use ring::digest::{digest, SHA256}; use serde_json::json; -use std::sync::Arc; use trillium::{Handler, KnownHeaderName, Status}; use trillium_testing::{ assert_headers, @@ -71,6 +61,18 @@ use trillium_testing::{ }; use url::Url; +use crate::{ + aggregator::{ + aggregate_init_tests::PrepareInitGenerator, + http_handlers::{ + aggregator_handler, + test_util::{decode_response_body, take_problem_details}, + }, + Config, + }, + config::TaskprovConfig, +}; + type TestVdaf = Poplar1; pub struct TaskprovTestCase { diff --git a/aggregator/src/binaries/aggregation_job_creator.rs b/aggregator/src/binaries/aggregation_job_creator.rs index da7fe39b4..b99dc5a91 100644 --- a/aggregator/src/binaries/aggregation_job_creator.rs +++ b/aggregator/src/binaries/aggregation_job_creator.rs @@ -1,13 +1,15 @@ +use std::{sync::Arc, time::Duration}; + +use anyhow::Result; +use clap::Parser; +use janus_core::time::RealClock; +use serde::{Deserialize, Serialize}; + use crate::{ aggregator::aggregation_job_creator::AggregationJobCreator, binary_utils::{BinaryContext, BinaryOptions, CommonBinaryOptions}, config::{BinaryConfig, CommonConfig}, }; -use anyhow::Result; -use clap::Parser; -use janus_core::time::RealClock; -use serde::{Deserialize, Serialize}; -use std::{sync::Arc, time::Duration}; pub async fn main_callback(ctx: BinaryContext) -> Result<()> { // Start creating aggregation jobs. 
@@ -105,15 +107,17 @@ impl BinaryConfig for Config { #[cfg(test)] mod tests { + use std::net::{Ipv4Addr, SocketAddr}; + + use clap::CommandFactory; + use janus_core::test_util::roundtrip_encoding; + use super::{Config, Options}; use crate::config::{ default_max_transaction_retries, test_util::{generate_db_config, generate_metrics_config, generate_trace_config}, CommonConfig, }; - use clap::CommandFactory; - use janus_core::test_util::roundtrip_encoding; - use std::net::{Ipv4Addr, SocketAddr}; #[test] fn verify_app() { diff --git a/aggregator/src/binaries/aggregation_job_driver.rs b/aggregator/src/binaries/aggregation_job_driver.rs index a6efe0e59..9b37454aa 100644 --- a/aggregator/src/binaries/aggregation_job_driver.rs +++ b/aggregator/src/binaries/aggregation_job_driver.rs @@ -1,13 +1,15 @@ +use std::{fmt::Debug, sync::Arc, time::Duration}; + +use anyhow::{Context, Result}; +use clap::Parser; +use janus_core::{time::RealClock, TokioRuntime}; +use serde::{Deserialize, Serialize}; + use crate::{ aggregator::aggregation_job_driver::AggregationJobDriver, binary_utils::{job_driver::JobDriver, BinaryContext, BinaryOptions, CommonBinaryOptions}, config::{BinaryConfig, CommonConfig, JobDriverConfig, TaskprovConfig}, }; -use anyhow::{Context, Result}; -use clap::Parser; -use janus_core::{time::RealClock, TokioRuntime}; -use serde::{Deserialize, Serialize}; -use std::{fmt::Debug, sync::Arc, time::Duration}; pub async fn main_callback(ctx: BinaryContext) -> Result<()> { const CLIENT_USER_AGENT: &str = concat!( @@ -135,15 +137,17 @@ impl BinaryConfig for Config { #[cfg(test)] mod tests { + use std::net::{Ipv4Addr, SocketAddr}; + + use clap::CommandFactory; + use janus_core::test_util::roundtrip_encoding; + use super::{Config, Options}; use crate::config::{ default_max_transaction_retries, test_util::{generate_db_config, generate_metrics_config, generate_trace_config}, CommonConfig, JobDriverConfig, TaskprovConfig, }; - use clap::CommandFactory; - use janus_core::test_util::roundtrip_encoding; - use std::net::{Ipv4Addr, SocketAddr}; #[test] fn verify_app() { diff --git a/aggregator/src/binaries/aggregator.rs b/aggregator/src/binaries/aggregator.rs index cfa555f6e..3eab7b9c3 100644 --- a/aggregator/src/binaries/aggregator.rs +++ b/aggregator/src/binaries/aggregator.rs @@ -1,13 +1,12 @@ -use crate::{ - aggregator::{self, http_handlers::aggregator_handler}, - binaries::garbage_collector::run_garbage_collector, - binary_utils::{setup_server, BinaryContext, BinaryOptions, CommonBinaryOptions}, - cache::{ - GlobalHpkeKeypairCache, TASK_AGGREGATOR_CACHE_DEFAULT_CAPACITY, - TASK_AGGREGATOR_CACHE_DEFAULT_TTL, - }, - config::{BinaryConfig, CommonConfig, TaskprovConfig}, +use std::{ + future::{ready, Future}, + iter::Iterator, + net::SocketAddr, + pin::Pin, + sync::Arc, + time::Duration, }; + use anyhow::{anyhow, Context, Result}; use clap::Parser; use derivative::Derivative; @@ -21,20 +20,23 @@ use ring::{ }; use sec1::EcPrivateKey; use serde::{de, Deserialize, Deserializer, Serialize}; -use std::{ - future::{ready, Future}, - iter::Iterator, - net::SocketAddr, - pin::Pin, - sync::Arc, - time::Duration, -}; use tokio::{join, sync::watch}; use tracing::info; use trillium::Handler; use trillium_router::router; use url::Url; +use crate::{ + aggregator::{self, http_handlers::aggregator_handler}, + binaries::garbage_collector::run_garbage_collector, + binary_utils::{setup_server, BinaryContext, BinaryOptions, CommonBinaryOptions}, + cache::{ + GlobalHpkeKeypairCache, TASK_AGGREGATOR_CACHE_DEFAULT_CAPACITY, + 
TASK_AGGREGATOR_CACHE_DEFAULT_TTL, + }, + config::{BinaryConfig, CommonConfig, TaskprovConfig}, +}; + pub async fn main_callback(ctx: BinaryContext) -> Result<()> { let (sender, _) = watch::channel(None); run_aggregator(ctx, sender).await @@ -456,6 +458,20 @@ pub(crate) fn parse_pem_ec_private_key(ec_private_key_pem: &str) -> Result) -> Result<()> { const CLIENT_USER_AGENT: &str = concat!( @@ -170,15 +172,17 @@ impl BinaryConfig for Config { #[cfg(test)] mod tests { + use std::net::{Ipv4Addr, SocketAddr}; + + use clap::CommandFactory; + use janus_core::test_util::roundtrip_encoding; + use super::{Config, Options}; use crate::config::{ default_max_transaction_retries, test_util::{generate_db_config, generate_metrics_config, generate_trace_config}, CommonConfig, JobDriverConfig, }; - use clap::CommandFactory; - use janus_core::test_util::roundtrip_encoding; - use std::net::{Ipv4Addr, SocketAddr}; #[test] fn verify_app() { diff --git a/aggregator/src/binaries/garbage_collector.rs b/aggregator/src/binaries/garbage_collector.rs index 58f209434..d4d3c4a69 100644 --- a/aggregator/src/binaries/garbage_collector.rs +++ b/aggregator/src/binaries/garbage_collector.rs @@ -10,14 +10,13 @@ use tokio::time::interval; use tracing::error; use trillium_tokio::Stopper; +use super::aggregator::GarbageCollectorConfig; use crate::{ aggregator::garbage_collector::GarbageCollector, binary_utils::{BinaryContext, BinaryOptions, CommonBinaryOptions}, config::{BinaryConfig, CommonConfig}, }; -use super::aggregator::GarbageCollectorConfig; - pub async fn main_callback(ctx: BinaryContext) -> Result<()> { let BinaryContext { config, @@ -121,6 +120,7 @@ mod tests { use clap::CommandFactory; use janus_core::test_util::roundtrip_encoding; + use super::{Config, Options}; use crate::{ binaries::aggregator::GarbageCollectorConfig, config::{ @@ -130,8 +130,6 @@ mod tests { }, }; - use super::{Config, Options}; - #[test] fn verify_app() { Options::command().debug_assert(); diff --git a/aggregator/src/binaries/janus_cli.rs b/aggregator/src/binaries/janus_cli.rs index 8ac0c656a..1ae9d9544 100644 --- a/aggregator/src/binaries/janus_cli.rs +++ b/aggregator/src/binaries/janus_cli.rs @@ -1,10 +1,9 @@ -use crate::{ - binary_utils::{database_pool, datastore, initialize_rustls, read_config, CommonBinaryOptions}, - config::{BinaryConfig, CommonConfig}, - git_revision, - metrics::{install_metrics_exporter, MetricsExporterHandle}, - trace::{install_trace_subscriber, TraceGuards}, +use std::{ + collections::BTreeMap, + path::{Path, PathBuf}, + sync::{Arc, OnceLock}, }; + use anyhow::{anyhow, Context, Result}; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use clap::Parser; @@ -29,11 +28,6 @@ use prio::codec::Decode as _; use rand::{distributions::Standard, thread_rng, Rng}; use ring::aead::AES_128_GCM; use serde::{Deserialize, Serialize}; -use std::{ - collections::BTreeMap, - path::{Path, PathBuf}, - sync::{Arc, OnceLock}, -}; use tokio::{ fs, runtime::{self, Runtime}, @@ -41,6 +35,14 @@ use tokio::{ use tracing::{debug, info}; use url::Url; +use crate::{ + binary_utils::{database_pool, datastore, initialize_rustls, read_config, CommonBinaryOptions}, + config::{BinaryConfig, CommonConfig}, + git_revision, + metrics::{install_metrics_exporter, MetricsExporterHandle}, + trace::{install_trace_subscriber, TraceGuards}, +}; + pub fn run(command_line_options: CommandLineOptions) -> Result<()> { initialize_rustls(); @@ -735,18 +737,12 @@ impl From for LazyKubeClient { #[cfg(test)] mod tests { - use crate::{ - 
binaries::janus_cli::{ - fetch_datastore_keys, CommandLineOptions, ConfigFile, KubernetesSecretOptions, - LazyKubeClient, - }, - binary_utils::{initialize_rustls, CommonBinaryOptions}, - config::{ - default_max_transaction_retries, - test_util::{generate_db_config, generate_metrics_config, generate_trace_config}, - CommonConfig, - }, + use std::{ + collections::HashMap, + io::Write, + net::{Ipv4Addr, SocketAddr}, }; + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use clap::CommandFactory; use janus_aggregator_core::{ @@ -768,15 +764,23 @@ mod tests { use prio::codec::Decode; use rand::random; use ring::aead::{UnboundKey, AES_128_GCM}; - use std::{ - collections::HashMap, - io::Write, - net::{Ipv4Addr, SocketAddr}, - }; use tempfile::{tempdir, NamedTempFile}; use tokio::fs; use url::Url; + use crate::{ + binaries::janus_cli::{ + fetch_datastore_keys, CommandLineOptions, ConfigFile, KubernetesSecretOptions, + LazyKubeClient, + }, + binary_utils::{initialize_rustls, CommonBinaryOptions}, + config::{ + default_max_transaction_retries, + test_util::{generate_db_config, generate_metrics_config, generate_trace_config}, + CommonConfig, + }, + }; + #[test] fn verify_app() { CommandLineOptions::command().debug_assert() diff --git a/aggregator/src/binary_utils.rs b/aggregator/src/binary_utils.rs index 338f33ec9..d703594cf 100644 --- a/aggregator/src/binary_utils.rs +++ b/aggregator/src/binary_utils.rs @@ -2,12 +2,19 @@ pub mod job_driver; -use crate::{ - config::{BinaryConfig, DbConfig}, - git_revision, - metrics::install_metrics_exporter, - trace::{install_trace_subscriber, TraceReloadHandle}, +use std::{ + fmt::{self, Debug, Formatter}, + fs::{self, File}, + future::Future, + io::{self, BufReader}, + net::SocketAddr, + panic, + path::{Path, PathBuf}, + str::FromStr, + sync::Arc, + time::Duration, }; + use anyhow::{anyhow, Context as _, Result}; use backoff::{future::retry, ExponentialBackoff}; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; @@ -21,18 +28,6 @@ use opentelemetry::metrics::{Meter, MetricsError}; use rayon::{ThreadPoolBuildError, ThreadPoolBuilder}; use ring::aead::{LessSafeKey, UnboundKey, AES_128_GCM}; use rustls::RootCertStore; -use std::{ - fmt::{self, Debug, Formatter}, - fs::{self, File}, - future::Future, - io::{self, BufReader}, - net::SocketAddr, - panic, - path::{Path, PathBuf}, - str::FromStr, - sync::Arc, - time::Duration, -}; use tokio::{runtime, sync::oneshot}; use tokio_postgres::NoTls; use tokio_postgres_rustls::MakeRustlsConnect; @@ -44,6 +39,13 @@ use trillium_head::Head; use trillium_router::Router; use trillium_tokio::Stopper; +use crate::{ + config::{BinaryConfig, DbConfig}, + git_revision, + metrics::install_metrics_exporter, + trace::{install_trace_subscriber, TraceReloadHandle}, +}; + /// Reads, parses, and returns the config referenced by the given options, or None if no config file /// path was set. 
pub fn read_config(options: &CommonBinaryOptions) -> Result { @@ -553,14 +555,8 @@ pub(crate) fn initialize_rustls() { #[cfg(test)] mod tests { - use crate::{ - aggregator::http_handlers::test_util::take_response_body, - binary_utils::{ - database_pool, initialize_rustls, register_database_pool_status_metrics, - zpages_handler, CommonBinaryOptions, - }, - config::DbConfig, - }; + use std::{collections::HashMap, fs}; + use clap::CommandFactory; use janus_aggregator_core::datastore::test_util::ephemeral_datastore; use janus_core::test_util::{ @@ -573,13 +569,21 @@ mod tests { runtime::Tokio, testing::metrics::InMemoryMetricsExporter, }; - use std::{collections::HashMap, fs}; use testcontainers::{core::Mount, runners::AsyncRunner, RunnableImage}; use tokio::task::spawn_blocking; use tracing_subscriber::{reload, EnvFilter}; use trillium::Status; use trillium_testing::prelude::*; + use crate::{ + aggregator::http_handlers::test_util::take_response_body, + binary_utils::{ + database_pool, initialize_rustls, register_database_pool_status_metrics, + zpages_handler, CommonBinaryOptions, + }, + config::DbConfig, + }; + #[test] fn verify_app() { CommonBinaryOptions::command().debug_assert() diff --git a/aggregator/src/binary_utils/job_driver.rs b/aggregator/src/binary_utils/job_driver.rs index a39422fa0..dae479f9d 100644 --- a/aggregator/src/binary_utils/job_driver.rs +++ b/aggregator/src/binary_utils/job_driver.rs @@ -1,5 +1,12 @@ //! Discovery and driving of jobs scheduled elsewhere. +use std::{ + fmt::{Debug, Display}, + future::Future, + sync::Arc, + time::Duration, +}; + use anyhow::Context as _; use chrono::NaiveDateTime; use janus_aggregator_core::datastore::{self, models::Lease}; @@ -9,12 +16,6 @@ use opentelemetry::{ KeyValue, }; use rand::{thread_rng, Rng}; -use std::{ - fmt::{Debug, Display}, - future::Future, - sync::Arc, - time::Duration, -}; use tokio::{ sync::{Semaphore, SemaphorePermit}, time::{self, Instant}, @@ -268,7 +269,8 @@ where #[cfg(test)] mod tests { - use super::JobDriver; + use std::{sync::Arc, time::Duration}; + use chrono::{DateTime, NaiveDateTime, Utc}; use janus_aggregator_core::{ datastore::{self, models::Lease}, @@ -282,10 +284,11 @@ mod tests { }; use janus_messages::{AggregationJobId, TaskId}; use rand::random; - use std::{sync::Arc, time::Duration}; use tokio::sync::Mutex; use trillium_tokio::Stopper; + use super::JobDriver; + #[tokio::test] async fn job_driver() { // This is a minimal test that JobDriver::run() will successfully find jobs & step them to diff --git a/aggregator/src/cache.rs b/aggregator/src/cache.rs index 92d927ca7..d28554903 100644 --- a/aggregator/src/cache.rs +++ b/aggregator/src/cache.rs @@ -1,6 +1,12 @@ //! Various in-memory caches that can be used by an aggregator. 
-use crate::aggregator::{report_writer::ReportWriteBatcher, Error, TaskAggregator}; +use std::{ + collections::HashMap, + fmt::Debug, + sync::{Arc, Mutex as StdMutex}, + time::{Duration, Instant}, +}; + use janus_aggregator_core::{ datastore::{models::HpkeKeyState, Datastore}, taskprov::PeerAggregator, @@ -12,16 +18,12 @@ use moka::{ ops::compute::Op, Entry, }; -use std::{ - collections::HashMap, - fmt::Debug, - sync::{Arc, Mutex as StdMutex}, - time::{Duration, Instant}, -}; use tokio::{spawn, task::JoinHandle, time::sleep}; use tracing::{debug, error}; use url::Url; +use crate::aggregator::{report_writer::ReportWriteBatcher, Error, TaskAggregator}; + type HpkeConfigs = Arc>; type HpkeKeypairs = HashMap>; diff --git a/aggregator/src/config.rs b/aggregator/src/config.rs index d8a6795e9..68194a815 100644 --- a/aggregator/src/config.rs +++ b/aggregator/src/config.rs @@ -1,17 +1,19 @@ //! Configuration for various Janus binaries. -use crate::{metrics::MetricsConfiguration, trace::TraceConfiguration}; -use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; -use derivative::Derivative; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{ fmt::Debug, net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, time::Duration, }; + +use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; +use derivative::Derivative; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use url::Url; +use crate::{metrics::MetricsConfiguration, trace::TraceConfiguration}; + /// Configuration options common to all Janus binaries. /// /// # Examples @@ -244,6 +246,8 @@ impl JobDriverConfig { #[cfg(feature = "test-util")] #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] pub mod test_util { + use reqwest::Url; + use super::DbConfig; use crate::{ metrics::{MetricsConfiguration, MetricsExporterConfiguration}, @@ -252,7 +256,6 @@ pub mod test_util { TraceConfiguration, }, }; - use reqwest::Url; pub fn generate_db_config() -> DbConfig { DbConfig { @@ -295,6 +298,11 @@ pub mod test_util { #[cfg(test)] mod tests { + use std::net::{Ipv4Addr, SocketAddr}; + + use assert_matches::assert_matches; + use janus_core::test_util::roundtrip_encoding; + use crate::{ config::{ default_max_transaction_retries, @@ -304,9 +312,6 @@ mod tests { metrics::{HistogramScale, MetricsExporterConfiguration}, trace::OpenTelemetryTraceConfiguration, }; - use assert_matches::assert_matches; - use janus_core::test_util::roundtrip_encoding; - use std::net::{Ipv4Addr, SocketAddr}; #[test] fn roundtrip_db_config() { diff --git a/aggregator/src/metrics.rs b/aggregator/src/metrics.rs index 68bfa6863..b4b76bc64 100644 --- a/aggregator/src/metrics.rs +++ b/aggregator/src/metrics.rs @@ -1,14 +1,26 @@ //! Collection and exporting of application-level metrics for Janus. 
+use std::net::AddrParseError; + use anyhow::anyhow; use opentelemetry::{ metrics::{Counter, Meter, Unit}, KeyValue, }; use serde::{Deserialize, Serialize}; -use std::net::AddrParseError; use tokio::runtime::Runtime; - +#[cfg(any(feature = "otlp", feature = "prometheus"))] +use { + crate::git_revision, + janus_aggregator_core::datastore::TRANSACTION_RETRIES_METER_NAME, + opentelemetry::metrics::MetricsError, + opentelemetry_sdk::{ + metrics::{ + new_view, Aggregation, Instrument, InstrumentKind, SdkMeterProvider, Stream, View, + }, + Resource, + }, +}; #[cfg(feature = "prometheus")] use { anyhow::Context, @@ -21,7 +33,6 @@ use { tokio::{sync::oneshot, task::JoinHandle}, trillium::{Info, Init}, }; - #[cfg(feature = "otlp")] use { opentelemetry_otlp::WithExportConfig, @@ -34,19 +45,6 @@ use { }, }; -#[cfg(any(feature = "otlp", feature = "prometheus"))] -use { - crate::git_revision, - janus_aggregator_core::datastore::TRANSACTION_RETRIES_METER_NAME, - opentelemetry::metrics::MetricsError, - opentelemetry_sdk::{ - metrics::{ - new_view, Aggregation, Instrument, InstrumentKind, SdkMeterProvider, Stream, View, - }, - Resource, - }, -}; - #[cfg(all(tokio_unstable, feature = "prometheus"))] pub(crate) mod tokio_runtime; diff --git a/aggregator/src/metrics/tests/prometheus.rs b/aggregator/src/metrics/tests/prometheus.rs index 8845c373a..3787badb9 100644 --- a/aggregator/src/metrics/tests/prometheus.rs +++ b/aggregator/src/metrics/tests/prometheus.rs @@ -1,7 +1,5 @@ -use crate::{ - aggregator::{http_handlers::aggregator_handler, test_util::default_aggregator_config}, - metrics::{build_opentelemetry_prometheus_meter_provider, prometheus_metrics_server}, -}; +use std::{collections::HashMap, net::Ipv4Addr, sync::Arc}; + use http::StatusCode; use janus_aggregator_core::datastore::test_util::ephemeral_datastore; use janus_core::{ @@ -14,9 +12,13 @@ use prometheus::{ proto::{Metric, MetricType}, Registry, }; -use std::{collections::HashMap, net::Ipv4Addr, sync::Arc}; use trillium_testing::prelude::get; +use crate::{ + aggregator::{http_handlers::aggregator_handler, test_util::default_aggregator_config}, + metrics::{build_opentelemetry_prometheus_meter_provider, prometheus_metrics_server}, +}; + #[tokio::test] async fn prometheus_metrics_pull() { let registry = Registry::new(); diff --git a/aggregator/src/trace.rs b/aggregator/src/trace.rs index c6435db64..980723eca 100644 --- a/aggregator/src/trace.rs +++ b/aggregator/src/trace.rs @@ -1,10 +1,13 @@ //! Configures a tracing subscriber for Janus. -use serde::{Deserialize, Serialize}; use std::{ io::{stdout, IsTerminal}, net::SocketAddr, }; + +#[cfg(feature = "otlp")] +use opentelemetry_otlp::WithExportConfig; +use serde::{Deserialize, Serialize}; use tracing::Level; use tracing_chrome::{ChromeLayerBuilder, TraceStyle}; use tracing_log::LogTracer; @@ -12,9 +15,6 @@ use tracing_subscriber::{ filter::FromEnvError, layer::SubscriberExt, reload, EnvFilter, Layer, Registry, }; -#[cfg(feature = "otlp")] -use opentelemetry_otlp::WithExportConfig; - /// Errors from initializing trace subscriber. #[derive(Debug, thiserror::Error)] pub enum Error { diff --git a/aggregator/tests/integration/graceful_shutdown.rs b/aggregator/tests/integration/graceful_shutdown.rs index 65e1a612c..8bc6d716f 100644 --- a/aggregator/tests/integration/graceful_shutdown.rs +++ b/aggregator/tests/integration/graceful_shutdown.rs @@ -3,6 +3,14 @@ //! process. The process should promptly shut down, and this test will fail if //! it times out waiting for the process to do so. 
+use std::{ + future::Future, + io::{ErrorKind, Write}, + net::{Ipv4Addr, SocketAddr}, + process::{Child, Command, Stdio}, + time::Instant, +}; + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use janus_aggregator::{ binaries::{ @@ -26,13 +34,6 @@ use janus_aggregator_core::{ use janus_core::{test_util::install_test_trace_subscriber, time::RealClock, vdaf::VdafInstance}; use reqwest::Url; use serde::Serialize; -use std::{ - future::Future, - io::{ErrorKind, Write}, - net::{Ipv4Addr, SocketAddr}, - process::{Child, Command, Stdio}, - time::Instant, -}; use tokio::{ io::{AsyncBufReadExt, BufReader}, join, diff --git a/aggregator_api/src/lib.rs b/aggregator_api/src/lib.rs index 33d65e7d3..cc6d1f523 100644 --- a/aggregator_api/src/lib.rs +++ b/aggregator_api/src/lib.rs @@ -4,6 +4,8 @@ mod routes; #[cfg(test)] mod tests; +use std::{borrow::Cow, str::FromStr, sync::Arc}; + use async_trait::async_trait; use janus_aggregator_core::{ datastore::{self, Datastore}, @@ -13,7 +15,6 @@ use janus_core::{auth_tokens::AuthenticationToken, hpke, http::extract_bearer_to use janus_messages::{HpkeConfigId, RoleParseError, TaskId}; use opentelemetry::metrics::Meter; use routes::*; -use std::{borrow::Cow, str::FromStr, sync::Arc}; use tracing::error; use trillium::{ Conn, Handler, diff --git a/aggregator_api/src/routes.rs b/aggregator_api/src/routes.rs index 0ea23ede3..c83bd909a 100644 --- a/aggregator_api/src/routes.rs +++ b/aggregator_api/src/routes.rs @@ -1,12 +1,5 @@ -use crate::{ - models::{ - AggregatorApiConfig, AggregatorRole, DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp, - GetTaskUploadMetricsResp, GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, PatchTaskReq, - PostTaskReq, PostTaskprovPeerAggregatorReq, PutGlobalHpkeConfigReq, SupportedVdaf, - TaskResp, TaskprovPeerAggregatorResp, - }, - Config, ConnExt, Error, -}; +use std::{str::FromStr, sync::Arc, unreachable}; + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use janus_aggregator_core::{ datastore::{self, Datastore}, @@ -24,10 +17,19 @@ use janus_messages::{ use querystring::querify; use rand::random; use ring::digest::{digest, SHA256}; -use std::{str::FromStr, sync::Arc, unreachable}; use trillium::{Conn, Status}; use trillium_api::{Json, State}; +use crate::{ + models::{ + AggregatorApiConfig, AggregatorRole, DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp, + GetTaskUploadMetricsResp, GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, PatchTaskReq, + PostTaskReq, PostTaskprovPeerAggregatorReq, PutGlobalHpkeConfigReq, SupportedVdaf, + TaskResp, TaskprovPeerAggregatorResp, + }, + Config, ConnExt, Error, +}; + pub(super) async fn get_config( _: &mut Conn, State(config): State>, diff --git a/aggregator_api/src/tests.rs b/aggregator_api/src/tests.rs index 2aef115b7..74b8529f6 100644 --- a/aggregator_api/src/tests.rs +++ b/aggregator_api/src/tests.rs @@ -1,12 +1,5 @@ -use crate::{ - aggregator_api_handler, - models::{ - DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp, GetTaskUploadMetricsResp, - GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, PostTaskReq, PostTaskprovPeerAggregatorReq, - PutGlobalHpkeConfigReq, TaskResp, TaskprovPeerAggregatorResp, - }, - Config, CONTENT_TYPE, -}; +use std::{iter, sync::Arc}; + use assert_matches::assert_matches; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use futures::future::try_join_all; @@ -41,7 +34,6 @@ use janus_messages::{ }; use rand::{distributions::Standard, random, thread_rng, Rng}; use serde_test::{assert_ser_tokens, assert_tokens, Token}; -use 
std::{iter, sync::Arc}; use trillium::{Handler, Status}; use trillium_testing::{ assert_response, assert_status, @@ -49,6 +41,16 @@ use trillium_testing::{ Url, }; +use crate::{ + aggregator_api_handler, + models::{ + DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp, GetTaskUploadMetricsResp, + GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, PostTaskReq, PostTaskprovPeerAggregatorReq, + PutGlobalHpkeConfigReq, TaskResp, TaskprovPeerAggregatorResp, + }, + Config, CONTENT_TYPE, +}; + const AUTH_TOKEN: &str = "Y29sbGVjdG9yLWFiY2RlZjAw"; async fn setup_api_test() -> (impl Handler, EphemeralDatastore, Arc>) { diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 6c1e7642d..78937bfe9 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -1,21 +1,21 @@ //! Janus datastore (durable storage) implementation. -use self::models::{ - AcquiredAggregationJob, AcquiredCollectionJob, AggregateShareJob, AggregationJob, - AggregatorRole, AuthenticationTokenType, BatchAggregation, BatchAggregationState, - BatchAggregationStateCode, CollectionJob, CollectionJobState, CollectionJobStateCode, - GlobalHpkeKeypair, HpkeKeyState, LeaderStoredReport, Lease, LeaseToken, OutstandingBatch, - ReportAggregation, ReportAggregationMetadata, ReportAggregationMetadataState, - ReportAggregationState, ReportAggregationStateCode, SqlInterval, TaskUploadCounter, -}; -#[cfg(feature = "test-util")] -use crate::VdafHasAggregationParameter; -use crate::{ - query_type::{AccumulableQueryType, CollectableQueryType}, - task::{self, AggregatorTask, AggregatorTaskParameters}, - taskprov::PeerAggregator, - SecretBytes, +use std::{ + collections::HashMap, + convert::TryFrom, + fmt::{Debug, Display}, + future::Future, + io::Cursor, + mem::size_of, + ops::RangeInclusive, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, + time::{Duration as StdDuration, Instant}, }; + use chrono::NaiveDateTime; use futures::future::try_join_all; use janus_core::{ @@ -42,26 +42,28 @@ use prio::{ }; use rand::random; use ring::aead::{self, LessSafeKey, AES_128_GCM}; -use std::{ - collections::HashMap, - convert::TryFrom, - fmt::{Debug, Display}, - future::Future, - io::Cursor, - mem::size_of, - ops::RangeInclusive, - pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, - }, - time::{Duration as StdDuration, Instant}, -}; use tokio::{sync::Barrier, try_join}; use tokio_postgres::{error::SqlState, row::RowIndex, IsolationLevel, Row, Statement, ToStatement}; use tracing::{error, Level}; use url::Url; +use self::models::{ + AcquiredAggregationJob, AcquiredCollectionJob, AggregateShareJob, AggregationJob, + AggregatorRole, AuthenticationTokenType, BatchAggregation, BatchAggregationState, + BatchAggregationStateCode, CollectionJob, CollectionJobState, CollectionJobStateCode, + GlobalHpkeKeypair, HpkeKeyState, LeaderStoredReport, Lease, LeaseToken, OutstandingBatch, + ReportAggregation, ReportAggregationMetadata, ReportAggregationMetadataState, + ReportAggregationState, ReportAggregationStateCode, SqlInterval, TaskUploadCounter, +}; +#[cfg(feature = "test-util")] +use crate::VdafHasAggregationParameter; +use crate::{ + query_type::{AccumulableQueryType, CollectableQueryType}, + task::{self, AggregatorTask, AggregatorTaskParameters}, + taskprov::PeerAggregator, + SecretBytes, +}; + pub mod models; #[cfg(feature = "test-util")] #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] diff --git a/aggregator_core/src/datastore/models.rs 
b/aggregator_core/src/datastore/models.rs index 61a7aa2df..182b98481 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -1,6 +1,11 @@ //! This module contains models used by the datastore that are not DAP messages. -use crate::{datastore::Error, task}; +use std::{ + fmt::{Debug, Display, Formatter}, + hash::Hash, + ops::RangeInclusive, +}; + use base64::{display::Base64Display, engine::general_purpose::URL_SAFE_NO_PAD}; use chrono::NaiveDateTime; use clap::ValueEnum; @@ -29,11 +34,8 @@ use prio::{ }; use rand::{distributions::Standard, prelude::Distribution}; use serde::{Deserialize, Serialize}; -use std::{ - fmt::{Debug, Display, Formatter}, - hash::Hash, - ops::RangeInclusive, -}; + +use crate::{datastore::Error, task}; // We have to manually implement [Partial]Eq for a number of types because the derived // implementations don't play nice with generic fields, even if those fields are constrained to diff --git a/aggregator_core/src/datastore/test_util.rs b/aggregator_core/src/datastore/test_util.rs index 4760ca2ed..eb962aee2 100644 --- a/aggregator_core/src/datastore/test_util.rs +++ b/aggregator_core/src/datastore/test_util.rs @@ -1,7 +1,10 @@ -use crate::{ - datastore::{Crypter, Datastore, Transaction}, - test_util::noop_meter, +use std::{ + path::PathBuf, + str::FromStr, + sync::{Arc, Weak}, + time::Duration, }; + use backoff::{future::retry, ExponentialBackoffBuilder}; use chrono::NaiveDateTime; use deadpool_postgres::{Manager, Pool, Timeouts}; @@ -16,18 +19,16 @@ use sqlx::{ migrate::{Migrate, Migrator}, Connection, PgConnection, }; -use std::{ - path::PathBuf, - str::FromStr, - sync::{Arc, Weak}, - time::Duration, -}; use testcontainers::{runners::AsyncRunner, ContainerAsync, RunnableImage}; use tokio::sync::Mutex; use tokio_postgres::{connect, Config, NoTls}; use tracing::trace; use super::SUPPORTED_SCHEMA_VERSIONS; +use crate::{ + datastore::{Crypter, Datastore, Transaction}, + test_util::noop_meter, +}; struct EphemeralDatabase { _db_container: ContainerAsync, diff --git a/aggregator_core/src/datastore/tests.rs b/aggregator_core/src/datastore/tests.rs index a380034f0..22aef6526 100644 --- a/aggregator_core/src/datastore/tests.rs +++ b/aggregator_core/src/datastore/tests.rs @@ -1,25 +1,14 @@ -use crate::{ - datastore::{ - models::{ - AcquiredAggregationJob, AcquiredCollectionJob, AggregateShareJob, AggregationJob, - AggregationJobState, BatchAggregation, BatchAggregationState, CollectionJob, - CollectionJobState, CollectionJobStateCode, GlobalHpkeKeypair, HpkeKeyState, - LeaderStoredReport, Lease, OutstandingBatch, ReportAggregation, - ReportAggregationMetadata, ReportAggregationMetadataState, ReportAggregationState, - SqlInterval, TaskUploadCounter, - }, - schema_versions_template, - test_util::{ - ephemeral_datastore_schema_version, generate_aead_key, EphemeralDatastore, - EphemeralDatastoreBuilder, TEST_DATASTORE_MAX_TRANSACTION_RETRIES, - }, - Crypter, Datastore, Error, RowExt, Transaction, SUPPORTED_SCHEMA_VERSIONS, +use std::{ + collections::{HashMap, HashSet}, + iter, + ops::RangeInclusive, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, }, - query_type::CollectableQueryType, - task::{self, test_util::TaskBuilder, AggregatorTask}, - taskprov::test_util::PeerAggregatorBuilder, - test_util::noop_meter, + time::Duration as StdDuration, }; + use assert_matches::assert_matches; use async_trait::async_trait; use chrono::NaiveDate; @@ -51,22 +40,34 @@ use prio::{ }, }; use rand::{distributions::Standard, random, thread_rng, 
Rng}; -use std::{ - collections::{HashMap, HashSet}, - iter, - ops::RangeInclusive, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, - time::Duration as StdDuration, -}; use tokio::{time::timeout, try_join}; use url::Url; // This function is only used when there are multiple supported versions. #[allow(unused_imports)] use crate::datastore::test_util::ephemeral_datastore_schema_version_by_downgrade; +use crate::{ + datastore::{ + models::{ + AcquiredAggregationJob, AcquiredCollectionJob, AggregateShareJob, AggregationJob, + AggregationJobState, BatchAggregation, BatchAggregationState, CollectionJob, + CollectionJobState, CollectionJobStateCode, GlobalHpkeKeypair, HpkeKeyState, + LeaderStoredReport, Lease, OutstandingBatch, ReportAggregation, + ReportAggregationMetadata, ReportAggregationMetadataState, ReportAggregationState, + SqlInterval, TaskUploadCounter, + }, + schema_versions_template, + test_util::{ + ephemeral_datastore_schema_version, generate_aead_key, EphemeralDatastore, + EphemeralDatastoreBuilder, TEST_DATASTORE_MAX_TRANSACTION_RETRIES, + }, + Crypter, Datastore, Error, RowExt, Transaction, SUPPORTED_SCHEMA_VERSIONS, + }, + query_type::CollectableQueryType, + task::{self, test_util::TaskBuilder, AggregatorTask}, + taskprov::test_util::PeerAggregatorBuilder, + test_util::noop_meter, +}; const OLDEST_ALLOWED_REPORT_TIMESTAMP: Time = Time::from_seconds_since_epoch(1000); const REPORT_EXPIRY_AGE: Duration = Duration::from_seconds(1000); diff --git a/aggregator_core/src/lib.rs b/aggregator_core/src/lib.rs index ad155697e..169de95cd 100644 --- a/aggregator_core/src/lib.rs +++ b/aggregator_core/src/lib.rs @@ -6,15 +6,14 @@ #![allow(clippy::single_component_path_imports)] use derivative::Derivative; -use tracing::{debug, info_span, Instrument, Span}; -use trillium::{Conn, Handler, Status}; -use trillium_macros::Handler; -use trillium_router::RouterConnExt; - // We must import `rstest_reuse` at the top of the crate // https://docs.rs/rstest_reuse/0.5.0/rstest_reuse/#use-rstest_reuse-at-the-top-of-your-crate #[cfg(test)] use rstest_reuse; +use tracing::{debug, info_span, Instrument, Span}; +use trillium::{Conn, Handler, Status}; +use trillium_macros::Handler; +use trillium_router::RouterConnExt; pub mod datastore; pub mod query_type; diff --git a/aggregator_core/src/query_type.rs b/aggregator_core/src/query_type.rs index a1e3b5f1d..24315c157 100644 --- a/aggregator_core/src/query_type.rs +++ b/aggregator_core/src/query_type.rs @@ -1,11 +1,5 @@ -use crate::{ - datastore::{ - self, - models::{BatchAggregation, CollectionJob}, - Transaction, - }, - task::AggregatorTask, -}; +use std::iter; + use async_trait::async_trait; use futures::future::try_join_all; use janus_core::time::{Clock, IntervalExt as _, TimeExt as _}; @@ -14,7 +8,15 @@ use janus_messages::{ Duration, FixedSizeQuery, Interval, Query, TaskId, Time, }; use prio::vdaf; -use std::iter; + +use crate::{ + datastore::{ + self, + models::{BatchAggregation, CollectionJob}, + Transaction, + }, + task::AggregatorTask, +}; #[async_trait] pub trait AccumulableQueryType: QueryType { @@ -432,12 +434,13 @@ impl CollectableQueryType for FixedSize { #[cfg(test)] mod tests { + use janus_core::vdaf::VdafInstance; + use janus_messages::{query_type::TimeInterval, Duration, Interval, Time}; + use crate::{ query_type::CollectableQueryType, task::{test_util::TaskBuilder, QueryType}, }; - use janus_core::vdaf::VdafInstance; - use janus_messages::{query_type::TimeInterval, Duration, Interval, Time}; #[test] fn validate_collect_identifier() { diff 
--git a/aggregator_core/src/task.rs b/aggregator_core/src/task.rs index 1891c42c4..3b3f4e7dd 100644 --- a/aggregator_core/src/task.rs +++ b/aggregator_core/src/task.rs @@ -1,6 +1,7 @@ //! Shared parameters for a DAP task. -use crate::SecretBytes; +use std::{array::TryFromSliceError, collections::HashMap}; + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use derivative::Derivative; use janus_core::{ @@ -15,9 +16,10 @@ use janus_messages::{ }; use rand::{distributions::Standard, random, thread_rng, Rng}; use serde::{de::Error as _, Deserialize, Deserializer, Serialize, Serializer}; -use std::{array::TryFromSliceError, collections::HashMap}; use url::Url; +use crate::SecretBytes; + /// Errors that methods and functions in this module may return. #[derive(Debug, thiserror::Error)] pub enum Error { @@ -777,13 +779,8 @@ impl<'de> Deserialize<'de> for AggregatorTask { #[cfg(feature = "test-util")] #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] pub mod test_util { - use crate::{ - task::{ - AggregatorTask, AggregatorTaskParameters, CommonTaskParameters, Error, QueryType, - VerifyKey, - }, - SecretBytes, - }; + use std::collections::HashMap; + use derivative::Derivative; use janus_core::{ auth_tokens::{AuthenticationToken, AuthenticationTokenHash}, @@ -802,9 +799,16 @@ pub mod test_util { AggregationJobId, CollectionJobId, Duration, HpkeConfigId, Role, TaskId, Time, }; use rand::{distributions::Standard, random, thread_rng, Rng}; - use std::collections::HashMap; use url::Url; + use crate::{ + task::{ + AggregatorTask, AggregatorTaskParameters, CommonTaskParameters, Error, QueryType, + VerifyKey, + }, + SecretBytes, + }; + /// All parameters and secrets for a task, for all participants. #[derive(Clone, Derivative, PartialEq, Eq)] #[derivative(Debug)] @@ -1332,13 +1336,6 @@ pub mod test_util { #[cfg(test)] mod tests { - use crate::{ - task::{ - test_util::TaskBuilder, AggregatorTask, AggregatorTaskParameters, QueryType, - VdafInstance, - }, - SecretBytes, - }; use assert_matches::assert_matches; use janus_core::{ auth_tokens::{AuthenticationToken, AuthenticationTokenHash}, @@ -1353,6 +1350,14 @@ mod tests { use serde_json::json; use serde_test::{assert_de_tokens, assert_tokens, Token}; + use crate::{ + task::{ + test_util::TaskBuilder, AggregatorTask, AggregatorTaskParameters, QueryType, + VdafInstance, + }, + SecretBytes, + }; + #[test] fn leader_task_serialization() { roundtrip_encoding( diff --git a/aggregator_core/src/taskprov.rs b/aggregator_core/src/taskprov.rs index e15a28276..73c3a99be 100644 --- a/aggregator_core/src/taskprov.rs +++ b/aggregator_core/src/taskprov.rs @@ -1,4 +1,5 @@ -use crate::{task::Error, SecretBytes}; +use std::{fmt, str::FromStr, sync::OnceLock}; + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use derivative::Derivative; use janus_core::{auth_tokens::AuthenticationToken, vdaf::VdafInstance}; @@ -9,9 +10,10 @@ use serde::{ de::{self, Visitor}, Deserialize, Serialize, Serializer, }; -use std::{fmt, str::FromStr, sync::OnceLock}; use url::Url; +use crate::{task::Error, SecretBytes}; + #[derive(Derivative, Clone, Copy, PartialEq, Eq)] #[derivative(Debug)] pub struct VerifyKeyInit(#[derivative(Debug = "ignore")] [u8; Self::LEN]); diff --git a/client/src/lib.rs b/client/src/lib.rs index 6cab15de0..89cb7d310 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -37,6 +37,8 @@ //! } //! 
``` +use std::{convert::Infallible, fmt::Debug, time::SystemTimeError}; + use backoff::ExponentialBackoff; use derivative::Derivative; use http::header::CONTENT_TYPE; @@ -57,7 +59,6 @@ use prio::{ vdaf, }; use rand::random; -use std::{convert::Infallible, fmt::Debug, time::SystemTimeError}; use tokio::try_join; use url::Url; @@ -501,7 +502,6 @@ impl> Client { #[cfg(test)] mod tests { - use crate::{aggregator_hpke_config, default_http_client, Client, ClientParameters, Error}; use assert_matches::assert_matches; use hex_literal::hex; use http::{header::CONTENT_TYPE, StatusCode}; @@ -518,6 +518,8 @@ mod tests { use rand::random; use url::Url; + use crate::{aggregator_hpke_config, default_http_client, Client, ClientParameters, Error}; + fn setup_client>(server: &mockito::Server, vdaf: V) -> Client { let server_url = Url::parse(&server.url()).unwrap(); Client::builder( diff --git a/collector/src/credential.rs b/collector/src/credential.rs index a589959dc..6fe0ea082 100644 --- a/collector/src/credential.rs +++ b/collector/src/credential.rs @@ -45,7 +45,6 @@ impl PrivateCollectorCredential { #[cfg(test)] mod tests { - use crate::credential::PrivateCollectorCredential; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use janus_core::{ auth_tokens::AuthenticationToken, @@ -55,6 +54,8 @@ mod tests { HpkeAeadId, HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId, HpkePublicKey, }; + use crate::credential::PrivateCollectorCredential; + const SAMPLE_COLLECTOR_CREDENTIAL: &str = r#"{ "aead": "AesGcm128", "id": 66, diff --git a/collector/src/lib.rs b/collector/src/lib.rs index 2e6889a49..e6478a955 100644 --- a/collector/src/lib.rs +++ b/collector/src/lib.rs @@ -53,6 +53,11 @@ mod credential; +use std::{ + convert::TryFrom, + time::{Duration as StdDuration, SystemTime}, +}; + use backoff::backoff::Backoff; pub use backoff::ExponentialBackoff; use chrono::{DateTime, Duration, TimeZone, Utc}; @@ -82,10 +87,6 @@ use reqwest::{ }; pub use retry_after; use retry_after::{FromHeaderValueError, RetryAfter}; -use std::{ - convert::TryFrom, - time::{Duration as StdDuration, SystemTime}, -}; use tokio::time::{sleep, Instant}; use tracing::debug; use url::Url; @@ -747,7 +748,6 @@ impl Collector { #[cfg(test)] mod tests { - use crate::{Collection, CollectionJob, Collector, Error, PollResult}; use assert_matches::assert_matches; use chrono::{DateTime, TimeZone, Utc}; #[cfg(feature = "fpvec_bounded_l2")] @@ -780,6 +780,8 @@ mod tests { }; use retry_after::RetryAfter; + use crate::{Collection, CollectionJob, Collector, Error, PollResult}; + fn setup_collector(server: &mut mockito::Server, vdaf: V) -> Collector { let server_url = Url::parse(&server.url()).unwrap(); let hpke_keypair = generate_test_hpke_config_and_private_key(); diff --git a/core/src/auth_tokens.rs b/core/src/auth_tokens.rs index 2381db7e9..166fbbe5c 100644 --- a/core/src/auth_tokens.rs +++ b/core/src/auth_tokens.rs @@ -1,3 +1,8 @@ +use std::{ + str::{self, FromStr}, + sync::OnceLock, +}; + use anyhow::anyhow; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use derivative::Derivative; @@ -9,10 +14,6 @@ use ring::{ digest::{digest, SHA256, SHA256_OUTPUT_LEN}, }; use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; -use std::{ - str::{self, FromStr}, - sync::OnceLock, -}; /// HTTP header where auth tokens are provided in messages between participants. 
pub const DAP_AUTH_HEADER: &str = "DAP-Auth-Token"; @@ -433,10 +434,12 @@ impl AsRef<[u8]> for AuthenticationTokenHash { #[cfg(test)] mod tests { - use crate::auth_tokens::{AuthenticationToken, AuthenticationTokenHash}; - use rand::random; use std::str::FromStr as _; + use rand::random; + + use crate::auth_tokens::{AuthenticationToken, AuthenticationTokenHash}; + #[test] fn valid_dap_auth_token() { serde_yaml::from_str::( diff --git a/core/src/cli.rs b/core/src/cli.rs index 9625cfcaf..22d5f3ec4 100644 --- a/core/src/cli.rs +++ b/core/src/cli.rs @@ -1,7 +1,8 @@ //! Types useful for creating CLI tools. +use std::fmt::Display; + use clap::ValueEnum; use janus_messages::{HpkeAeadId, HpkeKdfId, HpkeKemId}; -use std::fmt::Display; #[derive(Debug, Copy, Clone, ValueEnum)] #[value()] diff --git a/core/src/hpke.rs b/core/src/hpke.rs index 57d660a10..25798c4ac 100644 --- a/core/src/hpke.rs +++ b/core/src/hpke.rs @@ -1,4 +1,9 @@ //! Encryption and decryption of messages using HPKE (RFC 9180). +use std::{ + fmt::{self, Debug}, + str::FromStr, +}; + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use derivative::Derivative; use hpke_dispatch::{HpkeError, Kem, Keypair}; @@ -9,10 +14,6 @@ use serde::{ de::{self, Visitor}, Deserialize, Serialize, Serializer, }; -use std::{ - fmt::{self, Debug}, - str::FromStr, -}; #[derive(Debug, thiserror::Error)] pub enum Error { @@ -265,10 +266,11 @@ impl HpkeKeypair { #[cfg(feature = "test-util")] #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] pub mod test_util { - use super::{generate_hpke_config_and_private_key, HpkeKeypair}; use janus_messages::{HpkeAeadId, HpkeConfigId, HpkeKdfId, HpkeKemId}; use rand::random; + use super::{generate_hpke_config_and_private_key, HpkeKeypair}; + pub fn generate_test_hpke_config_and_private_key() -> HpkeKeypair { generate_hpke_config_and_private_key( HpkeConfigId::from(random::()), @@ -292,16 +294,18 @@ pub mod test_util { #[cfg(test)] mod tests { - use super::{test_util::generate_test_hpke_config_and_private_key, HpkeApplicationInfo, Label}; - #[allow(deprecated)] - use crate::hpke::{open, seal, HpkeKeypair, HpkePrivateKey}; + use std::collections::HashSet; + use hpke_dispatch::{Kem, Keypair}; use janus_messages::{ HpkeAeadId, HpkeCiphertext, HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId, HpkePublicKey, Role, }; use serde::Deserialize; - use std::collections::HashSet; + + use super::{test_util::generate_test_hpke_config_and_private_key, HpkeApplicationInfo, Label}; + #[allow(deprecated)] + use crate::hpke::{open, seal, HpkeKeypair, HpkePrivateKey}; #[test] fn exchange_message() { diff --git a/core/src/http.rs b/core/src/http.rs index becb501a5..92eb31d93 100644 --- a/core/src/http.rs +++ b/core/src/http.rs @@ -1,13 +1,15 @@ -use crate::auth_tokens::AuthenticationToken; +use std::fmt::{self, Display, Formatter}; + use anyhow::{anyhow, Context}; use http::StatusCode; use http_api_problem::{HttpApiProblem, PROBLEM_JSON_MEDIA_TYPE}; use janus_messages::problem_type::DapProblemType; use reqwest::{header::CONTENT_TYPE, Response}; -use std::fmt::{self, Display, Formatter}; use tracing::warn; use trillium::Conn; +use crate::auth_tokens::AuthenticationToken; + /// This captures an HTTP status code and parsed problem details document from an HTTP response. 
#[derive(Debug)] pub struct HttpErrorResponse { diff --git a/core/src/lib.rs b/core/src/lib.rs index cf6b97182..c3b652ca3 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,6 +1,7 @@ #![cfg_attr(docsrs, feature(doc_cfg))] use std::future::Future; + use tokio::task::JoinHandle; use url::Url; diff --git a/core/src/retries.rs b/core/src/retries.rs index 1ec4c2448..9914f5da1 100644 --- a/core/src/retries.rs +++ b/core/src/retries.rs @@ -1,14 +1,16 @@ //! Provides a simple interface for retrying fallible HTTP requests. -use crate::http::HttpErrorResponse; +use std::{error::Error as StdError, time::Duration}; + use backoff::{backoff::Backoff, future::retry_notify, ExponentialBackoff, Notify}; use bytes::Bytes; use futures::Future; use http::HeaderMap; use reqwest::StatusCode; -use std::{error::Error as StdError, time::Duration}; use tracing::{debug, warn}; +use crate::http::HttpErrorResponse; + /// Traverse chain of source errors looking for an `std::io::Error`. fn find_io_error(original_error: &reqwest::Error) -> Option<&std::io::Error> { let mut cause = original_error.source(); @@ -210,9 +212,10 @@ pub fn is_retryable_http_status(status: StatusCode) -> bool { #[cfg(feature = "test-util")] #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] pub mod test_util { - use backoff::{backoff::Backoff, ExponentialBackoff}; use std::time::Duration; + use backoff::{backoff::Backoff, ExponentialBackoff}; + /// An [`ExponentialBackoff`] with parameters tuned for tests where we don't want to be retrying /// for 10 minutes. pub fn test_http_request_exponential_backoff() -> ExponentialBackoff { @@ -258,16 +261,18 @@ pub mod test_util { #[cfg(test)] mod tests { - use crate::{ - retries::{retry_http_request, retry_http_request_notify, test_util::LimitedRetryer}, - test_util::install_test_trace_subscriber, - }; + use std::time::Duration; + use backoff::Notify; use reqwest::StatusCode; - use std::time::Duration; use tokio::net::TcpListener; use url::Url; + use crate::{ + retries::{retry_http_request, retry_http_request_notify, test_util::LimitedRetryer}, + test_util::install_test_trace_subscriber, + }; + #[derive(Default)] struct NotifyCounter { count: u64, diff --git a/core/src/test_util/kubernetes.rs b/core/src/test_util/kubernetes.rs index e69b80f5a..a479b63b7 100644 --- a/core/src/test_util/kubernetes.rs +++ b/core/src/test_util/kubernetes.rs @@ -1,5 +1,11 @@ //! Testing framework for functionality that interacts with Kubernetes. 
+use std::{ + net::{Ipv4Addr, SocketAddrV4}, + path::{Path, PathBuf}, + process::{Command, Stdio}, +}; + use anyhow::Context; use futures::TryStreamExt; use k8s_openapi::api::core::v1::{Pod, Service}; @@ -9,11 +15,6 @@ use kube::{ Api, ResourceExt, }; use rand::random; -use std::{ - net::{Ipv4Addr, SocketAddrV4}, - path::{Path, PathBuf}, - process::{Command, Stdio}, -}; use stopper::Stopper; use tempfile::NamedTempFile; use tokio::{ @@ -261,11 +262,12 @@ impl Drop for PortForward { #[cfg(test)] mod tests { - use super::EphemeralCluster; - use crate::test_util::install_test_trace_subscriber; use k8s_openapi::api::core::v1::Node; use kube::{api::ListParams, Api}; + use super::EphemeralCluster; + use crate::test_util::install_test_trace_subscriber; + #[tokio::test] async fn create_clusters() { // Create a couple of clusters, check communication, then drop them, to test that creating diff --git a/core/src/test_util/mod.rs b/core/src/test_util/mod.rs index 8427d316e..950ef4396 100644 --- a/core/src/test_util/mod.rs +++ b/core/src/test_util/mod.rs @@ -1,3 +1,5 @@ +use std::{fmt::Debug, sync::Once}; + use assert_matches::assert_matches; use janus_messages::{ReportId, Role}; use prio::{ @@ -8,7 +10,6 @@ use prio::{ vdaf, }; use serde::{de::DeserializeOwned, Serialize}; -use std::{fmt::Debug, sync::Once}; use tracing_log::LogTracer; use tracing_subscriber::{prelude::*, EnvFilter, Registry}; diff --git a/core/src/test_util/runtime.rs b/core/src/test_util/runtime.rs index 0a144b338..ced20a7de 100644 --- a/core/src/test_util/runtime.rs +++ b/core/src/test_util/runtime.rs @@ -1,5 +1,3 @@ -use crate::Runtime; -use futures::FutureExt; use std::{ collections::HashMap, future::Future, @@ -10,11 +8,15 @@ use std::{ Arc, }, }; + +use futures::FutureExt; use tokio::{ sync::watch::{self, Sender}, task::JoinHandle, }; +use crate::Runtime; + /// Tracks multiple instrumented [`Runtime`] objects, for use in tests. Each /// [`TestRuntime`] keeps track of how many of its tasks have been completed, /// and tests can wait until a given number of tasks finish. If any task @@ -136,10 +138,12 @@ impl Runtime for TestRuntime { #[cfg(test)] mod tests { - use super::{Runtime, TestRuntimeManager}; use std::sync::Arc; + use tokio::{sync::Barrier, task::JoinHandle}; + use super::{Runtime, TestRuntimeManager}; + #[tokio::test] async fn mock_runtime() { #[derive(PartialEq, Eq, Hash)] diff --git a/core/src/test_util/testcontainers.rs b/core/src/test_util/testcontainers.rs index 7f9add493..d6f29f335 100644 --- a/core/src/test_util/testcontainers.rs +++ b/core/src/test_util/testcontainers.rs @@ -1,6 +1,7 @@ //! Testing functionality that interacts with the testcontainers library. use std::{collections::HashMap, process::Command}; + use testcontainers::{core::WaitFor, Image}; /// A [`testcontainers::Image`] that provides a Postgres server. diff --git a/core/src/time.rs b/core/src/time.rs index 2842aa415..dd8526818 100644 --- a/core/src/time.rs +++ b/core/src/time.rs @@ -1,12 +1,13 @@ //! Utilities for timestamps and durations. -use chrono::{DateTime, NaiveDateTime, Utc}; -use janus_messages::{Duration, Error, Interval, Time}; use std::{ fmt::{Debug, Formatter}, sync::{Arc, Mutex}, }; +use chrono::{DateTime, NaiveDateTime, Utc}; +use janus_messages::{Duration, Error, Interval, Time}; + /// A clock knows what time it currently is. pub trait Clock: 'static + Clone + Debug + Sync + Send { /// Get the current time. 
@@ -337,9 +338,10 @@ impl IntervalExt for Interval { #[cfg(test)] mod tests { - use crate::time::{DurationExt, IntervalExt}; use janus_messages::{Duration, Interval, Time}; + use crate::time::{DurationExt, IntervalExt}; + #[test] fn round_up_duration() { for (label, duration, time_precision, expected) in [ diff --git a/core/src/vdaf.rs b/core/src/vdaf.rs index 2881fdbe2..7190b390d 100644 --- a/core/src/vdaf.rs +++ b/core/src/vdaf.rs @@ -1,3 +1,5 @@ +use std::str; + use derivative::Derivative; use janus_messages::taskprov; use prio::{ @@ -9,7 +11,6 @@ use prio::{ vdaf::{prio3::Prio3, xof::XofHmacSha256Aes128, VdafError}, }; use serde::{Deserialize, Serialize}; -use std::str; /// The length of the verify key parameter for Prio3 and Poplar1 VDAF instantiations using /// [`XofTurboShake128`][prio::vdaf::xof::XofTurboShake128]. @@ -536,10 +537,11 @@ macro_rules! vdaf_dispatch { #[cfg(test)] mod tests { - use super::VdafInstance; use assert_matches::assert_matches; use serde_test::{assert_tokens, Token}; + use super::VdafInstance; + #[test] fn vdaf_serialization() { // The `Vdaf` type must have a stable serialization, as it gets stored in a JSON database diff --git a/integration_tests/src/client.rs b/integration_tests/src/client.rs index 36fc7731e..9d7ab0210 100644 --- a/integration_tests/src/client.rs +++ b/integration_tests/src/client.rs @@ -1,4 +1,5 @@ -use crate::TaskParameters; +use std::env; + use anyhow::anyhow; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use janus_client::Client; @@ -16,10 +17,11 @@ use prio::{ }; use rand::random; use serde_json::{json, Value}; -use std::env; use testcontainers::{core::WaitFor, runners::AsyncRunner, Image, RunnableImage}; use url::Url; +use crate::TaskParameters; + /// Extension trait to encode measurements for VDAFs as JSON objects, according to /// draft-dcook-ppm-dap-interop-test-design. pub trait InteropClientEncoding: vdaf::Client<16> { diff --git a/integration_tests/src/daphne.rs b/integration_tests/src/daphne.rs index e169ea3ad..06e943cc9 100644 --- a/integration_tests/src/daphne.rs +++ b/integration_tests/src/daphne.rs @@ -1,6 +1,5 @@ //! Functionality for tests interacting with Daphne (). -use crate::interop_api; use janus_aggregator_core::task::test_util::{Task, TaskBuilder}; use janus_interop_binaries::{ get_rust_log_level, test_util::await_ready_ok, ContainerLogsDropGuard, ContainerLogsSource, @@ -10,6 +9,8 @@ use serde_json::json; use testcontainers::{runners::AsyncRunner, GenericImage, RunnableImage}; use url::Url; +use crate::interop_api; + const DAPHNE_HELPER_IMAGE_NAME_AND_TAG: &str = "cloudflare/daphne-worker-helper:sha-f6b3ef1"; /// Represents a running Daphne test instance. diff --git a/integration_tests/src/interop_api.rs b/integration_tests/src/interop_api.rs index bcfb0a746..6e05030f1 100644 --- a/integration_tests/src/interop_api.rs +++ b/integration_tests/src/interop_api.rs @@ -1,7 +1,8 @@ +use std::collections::HashMap; + use janus_aggregator_core::task::test_util::Task; use janus_interop_binaries::AggregatorAddTaskRequest; use janus_messages::Role; -use std::collections::HashMap; use url::Url; /// Send an interop test API request to add a DAP task. This assumes the server is available on diff --git a/integration_tests/src/janus.rs b/integration_tests/src/janus.rs index 23d5cd34a..ae07f4a4b 100644 --- a/integration_tests/src/janus.rs +++ b/integration_tests/src/janus.rs @@ -1,7 +1,7 @@ //! Functionality for tests interacting with Janus (). 
-#[cfg(feature = "testcontainer")] -use crate::interop_api; +use std::net::{Ipv4Addr, SocketAddr}; + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use janus_aggregator::{ binaries::{ @@ -35,11 +35,13 @@ use janus_interop_binaries::{ ContainerLogsDropGuard, }; use janus_messages::Role; -use std::net::{Ipv4Addr, SocketAddr}; #[cfg(feature = "testcontainer")] use testcontainers::{runners::AsyncRunner, RunnableImage}; use trillium_tokio::Stopper; +#[cfg(feature = "testcontainer")] +use crate::interop_api; + /// Represents a running Janus test instance in a container. #[cfg(feature = "testcontainer")] pub struct JanusContainer { diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 2c998de55..3067b27ed 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -1,10 +1,11 @@ //! This crate contains functionality useful for Janus integration tests. +use std::time; + use janus_aggregator_core::task::QueryType; use janus_collector::AuthenticationToken; use janus_core::{hpke::HpkeKeypair, vdaf::VdafInstance}; use janus_messages::{Duration, TaskId}; -use std::time; use url::Url; pub mod client; diff --git a/integration_tests/tests/integration/common.rs b/integration_tests/tests/integration/common.rs index 3b52b240c..45a9abc71 100644 --- a/integration_tests/tests/integration/common.rs +++ b/integration_tests/tests/integration/common.rs @@ -1,3 +1,5 @@ +use std::{iter, time::Duration as StdDuration}; + use backoff::{future::retry, ExponentialBackoffBuilder}; use itertools::Itertools; use janus_aggregator_core::task::{test_util::TaskBuilder, QueryType}; @@ -21,7 +23,6 @@ use prio::{ vdaf::{self, dummy, prio3::Prio3}, }; use rand::{random, thread_rng, Rng}; -use std::{iter, time::Duration as StdDuration}; use tokio::time::{self, sleep}; use url::Url; diff --git a/integration_tests/tests/integration/daphne.rs b/integration_tests/tests/integration/daphne.rs index 48724921f..3c7097434 100644 --- a/integration_tests/tests/integration/daphne.rs +++ b/integration_tests/tests/integration/daphne.rs @@ -1,7 +1,5 @@ -use crate::{ - common::{build_test_task, submit_measurements_and_verify_aggregate, TestContext}, - initialize_rustls, -}; +use std::time::Duration; + use janus_aggregator_core::task::{test_util::TaskBuilder, QueryType}; use janus_core::{test_util::install_test_trace_subscriber, vdaf::VdafInstance}; #[cfg(feature = "testcontainer")] @@ -11,7 +9,11 @@ use janus_integration_tests::{ }; use janus_interop_binaries::test_util::generate_network_name; use janus_messages::Role; -use std::time::Duration; + +use crate::{ + common::{build_test_task, submit_measurements_and_verify_aggregate, TestContext}, + initialize_rustls, +}; const VERSION_PATH: &str = "/v09/"; diff --git a/integration_tests/tests/integration/divviup_ts.rs b/integration_tests/tests/integration/divviup_ts.rs index 8aac8b0db..6a439df6e 100644 --- a/integration_tests/tests/integration/divviup_ts.rs +++ b/integration_tests/tests/integration/divviup_ts.rs @@ -1,10 +1,8 @@ #![cfg(feature = "testcontainer")] //! These tests check interoperation between the divviup-ts client and Janus aggregators. 
-use crate::{ - common::{build_test_task, submit_measurements_and_verify_aggregate, TestContext}, - initialize_rustls, -}; +use std::time::Duration; + use janus_aggregator_core::task::{test_util::TaskBuilder, QueryType}; use janus_core::{test_util::install_test_trace_subscriber, vdaf::VdafInstance}; use janus_integration_tests::{ @@ -13,7 +11,11 @@ use janus_integration_tests::{ }; use janus_interop_binaries::test_util::generate_network_name; use janus_messages::Role; -use std::time::Duration; + +use crate::{ + common::{build_test_task, submit_measurements_and_verify_aggregate, TestContext}, + initialize_rustls, +}; async fn run_divviup_ts_integration_test(test_name: &str, vdaf: VdafInstance) { let (task_parameters, task_builder) = build_test_task( diff --git a/integration_tests/tests/integration/in_cluster.rs b/integration_tests/tests/integration/in_cluster.rs index 3b52de652..c974e564c 100644 --- a/integration_tests/tests/integration/in_cluster.rs +++ b/integration_tests/tests/integration/in_cluster.rs @@ -1,9 +1,7 @@ #![cfg(feature = "in-cluster")] -use crate::{ - common::{build_test_task, submit_measurements_and_verify_aggregate, TestContext}, - initialize_rustls, -}; +use std::{env, str::FromStr, time::Duration}; + use chrono::prelude::*; use clap::{CommandFactory, FromArgMatches, Parser}; use divviup_client::{ @@ -23,12 +21,16 @@ use janus_core::{ }; use janus_integration_tests::{client::ClientBackend, TaskParameters}; use janus_messages::{Duration as JanusDuration, TaskId}; -use std::{env, str::FromStr, time::Duration}; use trillium_rustls::RustlsConfig; use trillium_tokio::ClientConfig; use url::Url; use uuid::Uuid; +use crate::{ + common::{build_test_task, submit_measurements_and_verify_aggregate, TestContext}, + initialize_rustls, +}; + /// Options for running tests. #[derive(Debug, Parser)] #[clap( @@ -595,8 +597,13 @@ async fn in_cluster_time_bucketed_fixed_size() { #[cfg(feature = "in-cluster-rate-limits")] mod rate_limits { - use super::InClusterJanusPair; - use crate::initialize_rustls; + use std::{ + env, + fs::File, + sync::{Arc, OnceLock}, + time::Duration, + }; + use assert_matches::assert_matches; use http::Method; use janus_aggregator_core::task::QueryType; @@ -605,15 +612,12 @@ mod rate_limits { use rand::random; use reqwest::StatusCode; use serde::Deserialize; - use std::{ - env, - fs::File, - sync::{Arc, OnceLock}, - time::Duration, - }; use tokio::sync::Semaphore; use url::Url; + use super::InClusterJanusPair; + use crate::initialize_rustls; + /// Configuration for the rate limit test. We need to know the QPS and the window over which /// it is enforced so that we can send the appropriate number of requests. 
We load this config /// from a file so that an integration test harness that knows what rate limits have been diff --git a/integration_tests/tests/integration/janus.rs b/integration_tests/tests/integration/janus.rs index 806744826..2899585a1 100644 --- a/integration_tests/tests/integration/janus.rs +++ b/integration_tests/tests/integration/janus.rs @@ -1,10 +1,5 @@ -use crate::{ - common::{ - build_test_task, submit_measurements_and_verify_aggregate, - submit_measurements_and_verify_aggregate_varying_aggregation_parameter, TestContext, - }, - initialize_rustls, -}; +use std::time::Duration; + use janus_aggregator_core::task::{test_util::TaskBuilder, QueryType}; use janus_core::{test_util::install_test_trace_subscriber, vdaf::VdafInstance}; #[cfg(feature = "testcontainer")] @@ -14,7 +9,14 @@ use janus_integration_tests::{client::ClientBackend, janus::JanusInProcess, Task use janus_interop_binaries::test_util::generate_network_name; use janus_messages::Role; use prio::vdaf::dummy; -use std::time::Duration; + +use crate::{ + common::{ + build_test_task, submit_measurements_and_verify_aggregate, + submit_measurements_and_verify_aggregate_varying_aggregation_parameter, TestContext, + }, + initialize_rustls, +}; /// A pair of Janus instances, running in containers, against which integration tests may be run. #[cfg(feature = "testcontainer")] diff --git a/interop_binaries/src/commands/janus_interop_aggregator.rs b/interop_binaries/src/commands/janus_interop_aggregator.rs index 3bc2f0eb3..ddb2f1305 100644 --- a/interop_binaries/src/commands/janus_interop_aggregator.rs +++ b/interop_binaries/src/commands/janus_interop_aggregator.rs @@ -1,7 +1,5 @@ -use crate::{ - status::{ERROR, SUCCESS}, - AddTaskResponse, AggregatorAddTaskRequest, AggregatorRole, HpkeConfigRegistry, Keyring, -}; +use std::{net::SocketAddr, sync::Arc}; + use anyhow::Context; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use clap::Parser; @@ -23,7 +21,6 @@ use janus_messages::{Duration, HpkeConfig, Time}; use prio::codec::Decode; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::{net::SocketAddr, sync::Arc}; use tokio::sync::Mutex; use trillium::{Conn, Handler, Status}; use trillium_api::{api, ApiConnExt, Json}; @@ -32,6 +29,11 @@ use trillium_router::Router; use trillium_tokio::ClientConfig; use url::Url; +use crate::{ + status::{ERROR, SUCCESS}, + AddTaskResponse, AggregatorAddTaskRequest, AggregatorRole, HpkeConfigRegistry, Keyring, +}; + #[derive(Debug, Serialize)] struct EndpointResponse { status: &'static str, @@ -280,9 +282,10 @@ impl Options { #[cfg(test)] mod tests { - use super::Options; use clap::CommandFactory; + use super::Options; + #[test] fn verify_clap_app() { Options::command().debug_assert(); diff --git a/interop_binaries/src/commands/janus_interop_client.rs b/interop_binaries/src/commands/janus_interop_client.rs index a16bea42e..91689eae3 100644 --- a/interop_binaries/src/commands/janus_interop_client.rs +++ b/interop_binaries/src/commands/janus_interop_client.rs @@ -1,8 +1,5 @@ -use crate::{ - install_tracing_subscriber, - status::{ERROR, SUCCESS}, - ErrorHandler, NumberAsString, VdafObject, -}; +use std::{fmt::Display, net::Ipv4Addr, str::FromStr}; + use anyhow::Context; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use clap::Parser; @@ -25,12 +22,17 @@ use prio::{ vdaf::prio3::Prio3, }; use serde::{Deserialize, Serialize}; -use std::{fmt::Display, net::Ipv4Addr, str::FromStr}; use trillium::{Conn, Handler}; use trillium_api::{api, Json, State}; use 
trillium_router::Router; use url::Url; +use crate::{ + install_tracing_subscriber, + status::{ERROR, SUCCESS}, + ErrorHandler, NumberAsString, VdafObject, +}; + /// Parse a numeric measurement from its intermediate JSON representation. fn parse_primitive_measurement(value: serde_json::Value) -> anyhow::Result where diff --git a/interop_binaries/src/commands/janus_interop_collector.rs b/interop_binaries/src/commands/janus_interop_collector.rs index 0cd713725..efd800b01 100644 --- a/interop_binaries/src/commands/janus_interop_collector.rs +++ b/interop_binaries/src/commands/janus_interop_collector.rs @@ -1,8 +1,10 @@ -use crate::{ - install_tracing_subscriber, - status::{COMPLETE, ERROR, IN_PROGRESS, SUCCESS}, - ErrorHandler, HpkeConfigRegistry, Keyring, NumberAsString, VdafObject, +use std::{ + collections::{hash_map::Entry, HashMap}, + net::Ipv4Addr, + sync::Arc, + time::Duration as StdDuration, }; + use anyhow::Context; use backoff::ExponentialBackoffBuilder; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; @@ -34,17 +36,17 @@ use prio::{ use rand::{distributions::Standard, prelude::Distribution, random}; use reqwest::Url; use serde::{Deserialize, Serialize}; -use std::{ - collections::{hash_map::Entry, HashMap}, - net::Ipv4Addr, - sync::Arc, - time::Duration as StdDuration, -}; use tokio::{sync::Mutex, task::JoinHandle}; use trillium::{Conn, Handler}; use trillium_api::{api, Json, State}; use trillium_router::Router; +use crate::{ + install_tracing_subscriber, + status::{COMPLETE, ERROR, IN_PROGRESS, SUCCESS}, + ErrorHandler, HpkeConfigRegistry, Keyring, NumberAsString, VdafObject, +}; + #[derive(Derivative, Deserialize)] #[derivative(Debug)] struct AddTaskRequest { diff --git a/interop_binaries/src/lib.rs b/interop_binaries/src/lib.rs index 0e1256afa..a801a24e6 100644 --- a/interop_binaries/src/lib.rs +++ b/interop_binaries/src/lib.rs @@ -1,3 +1,17 @@ +use std::{ + collections::HashMap, + env::{self, VarError}, + fmt::Display, + fs::{create_dir_all, File}, + io::{stderr, Write}, + marker::PhantomData, + ops::Deref, + path::PathBuf, + process::{Command, Stdio}, + str::FromStr, + sync::Arc, +}; + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use derivative::Derivative; use janus_aggregator_core::task::{test_util::Task, QueryType}; @@ -14,19 +28,6 @@ use janus_messages::{ use prio::codec::Encode; use rand::random; use serde::{de::Visitor, Deserialize, Serialize}; -use std::{ - collections::HashMap, - env::{self, VarError}, - fmt::Display, - fs::{create_dir_all, File}, - io::{stderr, Write}, - marker::PhantomData, - ops::Deref, - path::PathBuf, - process::{Command, Stdio}, - str::FromStr, - sync::Arc, -}; use testcontainers::{ContainerAsync, Image}; use tokio::sync::Mutex; use tracing_log::LogTracer; @@ -566,10 +567,11 @@ pub fn get_rust_log_level() -> (&'static str, String) { #[cfg(feature = "test-util")] pub mod test_util { + use std::{fmt::Debug, sync::OnceLock, time::Duration}; + use backoff::{future::retry, ExponentialBackoff}; use futures::{Future, TryFutureExt}; use rand::random; - use std::{fmt::Debug, sync::OnceLock, time::Duration}; use url::Url; async fn await_readiness_condition< diff --git a/interop_binaries/src/testcontainer.rs b/interop_binaries/src/testcontainer.rs index 7af6bfa59..9903307b9 100644 --- a/interop_binaries/src/testcontainer.rs +++ b/interop_binaries/src/testcontainer.rs @@ -4,6 +4,7 @@ //! and provide them. 
use std::env; + use testcontainers::{core::WaitFor, Image}; // Note that testcontainers always assembles image names in the format "$NAME:$TAG". Images will diff --git a/interop_binaries/tests/end_to_end.rs b/interop_binaries/tests/end_to_end.rs index 2a76012d7..d242aca7d 100644 --- a/interop_binaries/tests/end_to_end.rs +++ b/interop_binaries/tests/end_to_end.rs @@ -1,7 +1,11 @@ #![cfg(feature = "testcontainer")] +use std::time::Duration as StdDuration; + use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; +#[cfg(feature = "fpvec_bounded_l2")] +use fixed_macro::fixed; use futures::future::join_all; use janus_core::{ test_util::install_test_trace_subscriber, @@ -22,13 +26,9 @@ use prio::codec::Encode; use rand::random; use reqwest::{header::CONTENT_TYPE, StatusCode, Url}; use serde_json::{json, Value}; -use std::time::Duration as StdDuration; use testcontainers::{runners::AsyncRunner, RunnableImage}; use tokio::time::sleep; -#[cfg(feature = "fpvec_bounded_l2")] -use fixed_macro::fixed; - const JSON_MEDIA_TYPE: &str = "application/json"; const TIME_PRECISION: u64 = 3600; diff --git a/messages/src/lib.rs b/messages/src/lib.rs index ff3357072..aa42f7a8c 100644 --- a/messages/src/lib.rs +++ b/messages/src/lib.rs @@ -3,11 +3,20 @@ //! //! [dap]: https://datatracker.ietf.org/doc/draft-ietf-ppm-dap/ -use self::query_type::{FixedSize, QueryType, TimeInterval}; +use std::{ + fmt::{self, Debug, Display, Formatter}, + io::{Cursor, Read}, + num::TryFromIntError, + str, + str::FromStr, + time::{SystemTime, SystemTimeError}, +}; + use anyhow::anyhow; use base64::{display::Base64Display, engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use derivative::Derivative; use num_enum::{FromPrimitive, IntoPrimitive, TryFromPrimitive}; +pub use prio::codec; use prio::{ codec::{ decode_u16_items, decode_u32_items, encode_u16_items, encode_u32_items, CodecError, Decode, @@ -20,16 +29,8 @@ use serde::{ de::{self, Visitor}, Deserialize, Serialize, Serializer, }; -use std::{ - fmt::{self, Debug, Display, Formatter}, - io::{Cursor, Read}, - num::TryFromIntError, - str, - str::FromStr, - time::{SystemTime, SystemTimeError}, -}; -pub use prio::codec; +use self::query_type::{FixedSize, QueryType, TimeInterval}; pub mod problem_type; pub mod query_type; diff --git a/messages/src/query_type.rs b/messages/src/query_type.rs index a36995ef4..ffe313783 100644 --- a/messages/src/query_type.rs +++ b/messages/src/query_type.rs @@ -1,16 +1,17 @@ -use crate::{Collection, FixedSizeQuery, Query}; - -use super::{BatchId, Interval}; -use anyhow::anyhow; -use num_enum::TryFromPrimitive; -use prio::codec::{CodecError, Decode, Encode}; -use serde::{Deserialize, Serialize}; use std::{ fmt::{Debug, Display}, hash::Hash, io::Cursor, }; +use anyhow::anyhow; +use num_enum::TryFromPrimitive; +use prio::codec::{CodecError, Decode, Encode}; +use serde::{Deserialize, Serialize}; + +use super::{BatchId, Interval}; +use crate::{Collection, FixedSizeQuery, Query}; + /// QueryType represents a DAP query type. This is a task-level configuration setting which /// determines how individual client reports are grouped together into batches for collection. pub trait QueryType: Clone + Debug + PartialEq + Eq + Send + Sync + 'static { diff --git a/messages/src/taskprov.rs b/messages/src/taskprov.rs index 01846df6d..9fa9e1bf4 100644 --- a/messages/src/taskprov.rs +++ b/messages/src/taskprov.rs @@ -2,14 +2,16 @@ //! //! 
[1]: https://datatracker.ietf.org/doc/draft-wang-ppm-dap-taskprov/ -use crate::{Duration, Error, Time, Url}; +use std::{fmt::Debug, io::Cursor}; + use anyhow::anyhow; use derivative::Derivative; use prio::codec::{ decode_u16_items, decode_u8_items, encode_u16_items, encode_u8_items, CodecError, Decode, Encode, }; -use std::{fmt::Debug, io::Cursor}; + +use crate::{Duration, Error, Time, Url}; /// Defines all parameters necessary to configure an aggregator with a new task. /// Provided by taskprov participants in all requests incident to task execution. @@ -571,9 +573,10 @@ impl Decode for DpMechanism { #[cfg(test)] mod tests { + use assert_matches::assert_matches; + use super::*; use crate::roundtrip_encoding; - use assert_matches::assert_matches; #[test] fn roundtrip_dp_config() { diff --git a/messages/src/tests/aggregation.rs b/messages/src/tests/aggregation.rs index 4172b0fdd..c85fb7f03 100644 --- a/messages/src/tests/aggregation.rs +++ b/messages/src/tests/aggregation.rs @@ -1,10 +1,11 @@ +use prio::topology::ping_pong::PingPongMessage; + use crate::{ roundtrip_encoding, AggregationJobContinueReq, AggregationJobInitializeReq, AggregationJobResp, AggregationJobStep, BatchId, FixedSize, HpkeCiphertext, HpkeConfigId, PartialBatchSelector, PrepareContinue, PrepareError, PrepareInit, PrepareResp, PrepareStepResult, ReportId, ReportMetadata, ReportShare, Time, }; -use prio::topology::ping_pong::PingPongMessage; #[test] fn roundtrip_report_share() { diff --git a/messages/src/tests/collection.rs b/messages/src/tests/collection.rs index 98172f15f..cbf656e73 100644 --- a/messages/src/tests/collection.rs +++ b/messages/src/tests/collection.rs @@ -1,10 +1,11 @@ +use prio::codec::Decode; + use crate::{ roundtrip_encoding, AggregateShare, AggregateShareAad, AggregateShareReq, BatchId, BatchSelector, Collection, CollectionReq, Duration, FixedSize, FixedSizeQuery, HpkeCiphertext, HpkeConfigId, Interval, PartialBatchSelector, Query, ReportIdChecksum, TaskId, Time, TimeInterval, }; -use prio::codec::Decode; #[test] fn roundtrip_collection_req() { diff --git a/messages/src/tests/common.rs b/messages/src/tests/common.rs index ba3dc767d..dca154562 100644 --- a/messages/src/tests/common.rs +++ b/messages/src/tests/common.rs @@ -1,8 +1,9 @@ -use crate::{roundtrip_encoding, Duration, Interval, Role, TaskId, Time, Url}; use assert_matches::assert_matches; use prio::codec::{CodecError, Decode, Encode}; use serde_test::{assert_de_tokens_error, assert_tokens, Token}; +use crate::{roundtrip_encoding, Duration, Interval, Role, TaskId, Time, Url}; + #[test] fn roundtrip_url() { for (test, len) in [ diff --git a/messages/src/tests/hpke.rs b/messages/src/tests/hpke.rs index 28f4b20ba..d9ba31aae 100644 --- a/messages/src/tests/hpke.rs +++ b/messages/src/tests/hpke.rs @@ -1,8 +1,9 @@ +use serde_test::{assert_de_tokens_error, assert_tokens, Token}; + use crate::{ roundtrip_encoding, HpkeAeadId, HpkeCiphertext, HpkeConfig, HpkeConfigId, HpkeConfigList, HpkeKdfId, HpkeKemId, HpkePublicKey, }; -use serde_test::{assert_de_tokens_error, assert_tokens, Token}; #[test] fn roundtrip_hpke_config_id() { diff --git a/tools/src/bin/collect.rs b/tools/src/bin/collect.rs index 5f9f9ffad..fb7136b9c 100644 --- a/tools/src/bin/collect.rs +++ b/tools/src/bin/collect.rs @@ -1,3 +1,5 @@ +use std::{fmt::Debug, fs::File, path::PathBuf, process::exit, time::Duration as StdDuration}; + use anyhow::Context; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use clap::{ @@ -26,7 +28,6 @@ use prio::{ vdaf::{self, prio3::Prio3, Vdaf}, 
}; use rand::random; -use std::{fmt::Debug, fs::File, path::PathBuf, process::exit, time::Duration as StdDuration}; use tracing_log::LogTracer; use tracing_subscriber::{prelude::*, EnvFilter, Registry}; use url::Url; @@ -720,10 +721,8 @@ impl QueryTypeExt for FixedSize { #[cfg(test)] mod tests { - use crate::{ - run, AuthenticationOptions, AuthenticationToken, Error, HpkeConfigOptions, Options, - QueryOptions, Subcommands, VdafType, - }; + use std::io::Write; + use assert_matches::assert_matches; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use clap::{error::ErrorKind, CommandFactory, Parser}; @@ -736,9 +735,13 @@ mod tests { use prio::codec::Encode; use rand::random; use reqwest::Url; - use std::io::Write; use tempfile::NamedTempFile; + use crate::{ + run, AuthenticationOptions, AuthenticationToken, Error, HpkeConfigOptions, Options, + QueryOptions, Subcommands, VdafType, + }; + const SAMPLE_COLLECTOR_CREDENTIAL: &str = r#"{ "aead": "AesGcm128", "id": 66, diff --git a/tools/src/bin/dap_decode.rs b/tools/src/bin/dap_decode.rs index 0ab9ad6a6..5840a6f3e 100644 --- a/tools/src/bin/dap_decode.rs +++ b/tools/src/bin/dap_decode.rs @@ -1,3 +1,9 @@ +use std::{ + fmt::Debug, + fs::File, + io::{stdin, Read}, +}; + use anyhow::Result; use clap::{Parser, ValueEnum}; use janus_messages::{ @@ -6,11 +12,6 @@ use janus_messages::{ AggregationJobResp, Collection, CollectionReq, HpkeConfig, HpkeConfigList, Report, }; use prio::codec::Decode; -use std::{ - fmt::Debug, - fs::File, - io::{stdin, Read}, -}; fn main() -> Result<()> { let options = Options::parse(); @@ -151,9 +152,10 @@ struct Options { #[cfg(test)] mod tests { - use crate::Options; use clap::CommandFactory; + use crate::Options; + #[test] fn verify_clap_app() { Options::command().debug_assert(); diff --git a/tools/src/bin/hpke_keygen.rs b/tools/src/bin/hpke_keygen.rs index 27ab398f7..7d66bfa88 100644 --- a/tools/src/bin/hpke_keygen.rs +++ b/tools/src/bin/hpke_keygen.rs @@ -1,3 +1,5 @@ +use std::io::{stdout, Write}; + use anyhow::Result; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use clap::Parser; @@ -8,7 +10,6 @@ use janus_core::{ use janus_messages::HpkeConfigId; use prio::codec::Encode; use serde_yaml::to_writer; -use std::io::{stdout, Write}; fn main() -> Result<()> { let options = Options::parse(); @@ -73,9 +74,10 @@ struct Options { #[cfg(test)] mod tests { - use crate::Options; use clap::CommandFactory; + use crate::Options; + #[test] fn verify_clap_app() { Options::command().debug_assert(); diff --git a/xtask/src/main.rs b/xtask/src/main.rs index 6cb560a3f..e8837d184 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -1,12 +1,13 @@ -use anyhow::{anyhow, Context, Result}; -use clap::{Args, Parser}; -use serde::Deserialize; use std::{ collections::HashMap, env::{self}, fs::File, process::Command, }; + +use anyhow::{anyhow, Context, Result}; +use clap::{Args, Parser}; +use serde::Deserialize; use tempfile::tempdir; /// Command line arguments that will get passed through to Cargo. @@ -154,9 +155,10 @@ fn run_docker_tests(images: ContainerImages, cargo_args: CargoArgs) -> Result<() #[cfg(test)] mod tests { - use crate::Subcommand; use clap::CommandFactory; + use crate::Subcommand; + #[test] fn verify_app() { Subcommand::command().debug_assert();