Skip to content

Commit

Permalink
Set group_imports to StdExternalCrate
Browse files Browse the repository at this point in the history
  • Loading branch information
divergentdave committed May 1, 2024
1 parent f38b066 commit 8bed897
Show file tree
Hide file tree
Showing 92 changed files with 913 additions and 724 deletions.
1 change: 1 addition & 0 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ format_strings = true
wrap_comments = true
comment_width = 100
imports_granularity = "Crate"
group_imports = "StdExternalCrate"
80 changes: 43 additions & 37 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,15 @@
//! Common functionality for DAP aggregators.
pub use crate::aggregator::error::Error;
use crate::{
aggregator::{
aggregate_share::compute_aggregate_share,
aggregation_job_writer::{
AggregationJobWriter, AggregationJobWriterMetrics, InitialWrite,
ReportAggregationUpdate as _, WritableReportAggregation,
},
error::{
handle_ping_pong_error, BatchMismatch, OptOutReason, ReportRejection,
ReportRejectionReason,
},
query_type::{CollectableQueryType, UploadableQueryType},
report_writer::{ReportWriteBatcher, WritableReport},
},
cache::{
GlobalHpkeKeypairCache, PeerAggregatorCache, TaskAggregatorCache,
TASK_AGGREGATOR_CACHE_DEFAULT_CAPACITY, TASK_AGGREGATOR_CACHE_DEFAULT_TTL,
},
config::TaskprovConfig,
metrics::{aggregate_step_failure_counter, report_aggregation_success_counter},
use std::{
any::Any,
collections::{HashMap, HashSet},
fmt::Debug,
hash::Hash,
panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
sync::{Arc, Mutex as SyncMutex},
time::{Duration as StdDuration, Instant},
};

use backoff::{backoff::Backoff, Notify};
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use bytes::Bytes;
Expand Down Expand Up @@ -97,22 +85,36 @@ use ring::{
rand::SystemRandom,
signature::{EcdsaKeyPair, Signature},
};
use std::{
any::Any,
collections::{HashMap, HashSet},
fmt::Debug,
hash::Hash,
panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
sync::{Arc, Mutex as SyncMutex},
time::{Duration as StdDuration, Instant},
};
use tokio::{
sync::oneshot::{self, error::RecvError},
try_join,
};
use tracing::{debug, info, info_span, trace_span, warn, Level, Span};
use url::Url;

pub use crate::aggregator::error::Error;
use crate::{
aggregator::{
aggregate_share::compute_aggregate_share,
aggregation_job_writer::{
AggregationJobWriter, AggregationJobWriterMetrics, InitialWrite,
ReportAggregationUpdate as _, WritableReportAggregation,
},
error::{
handle_ping_pong_error, BatchMismatch, OptOutReason, ReportRejection,
ReportRejectionReason,
},
query_type::{CollectableQueryType, UploadableQueryType},
report_writer::{ReportWriteBatcher, WritableReport},
},
cache::{
GlobalHpkeKeypairCache, PeerAggregatorCache, TaskAggregatorCache,
TASK_AGGREGATOR_CACHE_DEFAULT_CAPACITY, TASK_AGGREGATOR_CACHE_DEFAULT_TTL,
},
config::TaskprovConfig,
metrics::{aggregate_step_failure_counter, report_aggregation_success_counter},
};

#[cfg(test)]
mod aggregate_init_tests;
pub mod aggregate_share;
Expand Down Expand Up @@ -3252,10 +3254,12 @@ async fn send_request_to_helper(
#[cfg(feature = "test-util")]
#[cfg_attr(docsrs, doc(cfg(feature = "test-util")))]
pub(crate) mod test_util {
use crate::{aggregator::Config, binaries::aggregator::parse_pem_ec_private_key};
use ring::signature::EcdsaKeyPair;
use std::time::Duration;

use ring::signature::EcdsaKeyPair;

use crate::{aggregator::Config, binaries::aggregator::parse_pem_ec_private_key};

pub(crate) const BATCH_AGGREGATION_SHARD_COUNT: u64 = 32;

/// HPKE config signing key for use in tests.
Expand Down Expand Up @@ -3300,10 +3304,8 @@ uKFxOelIgsiZJXKZNCX0FBmrfpCkKklCcg==

#[cfg(test)]
mod tests {
use crate::aggregator::{
error::ReportRejectionReason, test_util::default_aggregator_config, Aggregator, Config,
Error,
};
use std::{collections::HashSet, iter, sync::Arc, time::Duration as StdDuration};

use assert_matches::assert_matches;
use futures::future::try_join_all;
use janus_aggregator_core::{
Expand Down Expand Up @@ -3341,7 +3343,11 @@ mod tests {
vdaf::{self, prio3::Prio3Count, Client as _},
};
use rand::random;
use std::{collections::HashSet, iter, sync::Arc, time::Duration as StdDuration};

use crate::aggregator::{
error::ReportRejectionReason, test_util::default_aggregator_config, Aggregator, Config,
Error,
};

pub(super) fn create_report_custom(
task: &AggregatorTask,
Expand Down
20 changes: 11 additions & 9 deletions aggregator/src/aggregator/aggregate_init_tests.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
use crate::aggregator::{
http_handlers::{
aggregator_handler,
test_util::{decode_response_body, take_problem_details},
},
tests::generate_helper_report_share,
Config,
};
use std::sync::Arc;

use assert_matches::assert_matches;
use http::StatusCode;
use janus_aggregator_core::{
Expand Down Expand Up @@ -39,10 +33,18 @@ use prio::{
};
use rand::random;
use serde_json::json;
use std::sync::Arc;
use trillium::{Handler, KnownHeaderName, Status};
use trillium_testing::{prelude::put, TestConn};

use crate::aggregator::{
http_handlers::{
aggregator_handler,
test_util::{decode_response_body, take_problem_details},
},
tests::generate_helper_report_share,
Config,
};

#[derive(Clone)]
pub(super) struct PrepareInitGenerator<const VERIFY_KEY_SIZE: usize, V>
where
Expand Down
3 changes: 2 additions & 1 deletion aggregator/src/aggregator/aggregate_share.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Implements functionality for computing & validating aggregate shares.
use super::Error;
use janus_aggregator_core::{
datastore::{
self,
Expand All @@ -12,6 +11,8 @@ use janus_core::{report_id::ReportIdChecksumExt, time::IntervalExt as _};
use janus_messages::{query_type::QueryType, Interval, ReportIdChecksum};
use prio::vdaf::{self, Aggregatable};

use super::Error;

/// Computes the aggregate share over the provided batch aggregations.
///
/// The assumption is that all aggregation jobs contributing to those batch aggregations have been
Expand Down
51 changes: 28 additions & 23 deletions aggregator/src/aggregator/aggregation_job_continue.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! Implements portions of aggregation job continuation for the helper.
use crate::aggregator::{
aggregation_job_writer::{AggregationJobWriter, UpdateWrite, WritableReportAggregation},
error::handle_ping_pong_error,
AggregatorMetrics, Error, VdafOps,
use std::{
any::Any,
panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
sync::Arc,
};

use janus_aggregator_core::{
datastore::{
self,
Expand All @@ -24,14 +25,15 @@ use prio::{
topology::ping_pong::{PingPongContinuedValue, PingPongState, PingPongTopology},
vdaf,
};
use std::{
any::Any,
panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
sync::Arc,
};
use tokio::sync::oneshot::{self, error::RecvError};
use tracing::{info_span, trace_span, Span};

use crate::aggregator::{
aggregation_job_writer::{AggregationJobWriter, UpdateWrite, WritableReportAggregation},
error::handle_ping_pong_error,
AggregatorMetrics, Error, VdafOps,
};

impl VdafOps {
/// Step the helper's aggregation job to the next step using the step `n` ping pong state in
/// `report_aggregations` with the step `n+1` ping pong messages in `leader_aggregation_job`.
Expand Down Expand Up @@ -276,7 +278,6 @@ impl VdafOps {
#[cfg(feature = "test-util")]
#[cfg_attr(docsrs, doc(cfg(feature = "test-util")))]
pub mod test_util {
use crate::aggregator::http_handlers::test_util::{decode_response_body, take_problem_details};
use assert_matches::assert_matches;
use janus_aggregator_core::task::test_util::Task;
use janus_messages::{AggregationJobContinueReq, AggregationJobId, AggregationJobResp};
Expand All @@ -285,6 +286,8 @@ pub mod test_util {
use trillium::{Handler, KnownHeaderName, Status};
use trillium_testing::{assert_headers, prelude::post, TestConn};

use crate::aggregator::http_handlers::test_util::{decode_response_body, take_problem_details};

async fn post_aggregation_job(
task: &Task,
aggregation_job_id: &AggregationJobId,
Expand Down Expand Up @@ -371,18 +374,8 @@ pub mod test_util {

#[cfg(test)]
mod tests {
use crate::aggregator::{
aggregate_init_tests::{put_aggregation_job, PrepareInitGenerator},
aggregation_job_continue::test_util::{
post_aggregation_job_and_decode, post_aggregation_job_expecting_error,
post_aggregation_job_expecting_status,
},
http_handlers::{
aggregator_handler,
test_util::{setup_http_handler_test, take_problem_details},
},
test_util::default_aggregator_config,
};
use std::sync::Arc;

use janus_aggregator_core::{
datastore::{
models::{
Expand Down Expand Up @@ -418,10 +411,22 @@ mod tests {
};
use rand::random;
use serde_json::json;
use std::sync::Arc;
use trillium::{Handler, Status};
use trillium_testing::prelude::delete;

use crate::aggregator::{
aggregate_init_tests::{put_aggregation_job, PrepareInitGenerator},
aggregation_job_continue::test_util::{
post_aggregation_job_and_decode, post_aggregation_job_expecting_error,
post_aggregation_job_expecting_status,
},
http_handlers::{
aggregator_handler,
test_util::{setup_http_handler_test, take_problem_details},
},
test_util::default_aggregator_config,
};

struct AggregationJobContinueTestCase<
const VERIFY_KEY_LENGTH: usize,
V: Aggregator<VERIFY_KEY_LENGTH, 16>,
Expand Down
41 changes: 22 additions & 19 deletions aggregator/src/aggregator/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::aggregator::{
aggregation_job_writer::{AggregationJobWriter, InitialWrite},
batch_creator::BatchCreator,
use std::{
cmp::min,
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};

#[cfg(feature = "fpvec_bounded_l2")]
use fixed::{
types::extra::{U15, U31},
Expand Down Expand Up @@ -53,19 +56,18 @@ use prio::{
},
};
use rand::{random, thread_rng, Rng};
use std::{
cmp::min,
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use tokio::{
time::{self, sleep_until, Instant, MissedTickBehavior},
try_join,
};
use tracing::{debug, error, info};
use trillium_tokio::{CloneCounterObserver, Stopper};

use crate::aggregator::{
aggregation_job_writer::{AggregationJobWriter, InitialWrite},
batch_creator::BatchCreator,
};

pub struct AggregationJobCreator<C: Clock> {
// Dependencies.
datastore: Datastore<C>,
Expand Down Expand Up @@ -911,9 +913,15 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {

#[cfg(test)]
mod tests {
use crate::aggregator::test_util::BATCH_AGGREGATION_SHARD_COUNT;
use std::{
any::{Any, TypeId},
collections::{HashMap, HashSet},
hash::Hash,
iter,
sync::Arc,
time::Duration,
};

use super::AggregationJobCreator;
use futures::future::try_join_all;
use janus_aggregator_core::{
datastore::{
Expand Down Expand Up @@ -946,17 +954,12 @@ mod tests {
prio3::{Prio3, Prio3Count},
};
use rand::random;
use std::{
any::{Any, TypeId},
collections::{HashMap, HashSet},
hash::Hash,
iter,
sync::Arc,
time::Duration,
};
use tokio::{task, time, try_join};
use trillium_tokio::Stopper;

use super::AggregationJobCreator;
use crate::aggregator::test_util::BATCH_AGGREGATION_SHARD_COUNT;

#[tokio::test]
async fn aggregation_job_creator() {
// This is a minimal test that AggregationJobCreator::run() will successfully find tasks &
Expand Down
Loading

0 comments on commit 8bed897

Please sign in to comment.