diff --git a/integration_tests/src/daphne.rs b/integration_tests/src/daphne.rs index 8530b0a5f..84ffd9ddf 100644 --- a/integration_tests/src/daphne.rs +++ b/integration_tests/src/daphne.rs @@ -1,7 +1,7 @@ //! Functionality for tests interacting with Daphne (). use crate::interop_api; -use janus_aggregator_core::task::{test_util::TaskBuilder, Task}; +use janus_aggregator_core::task::test_util::{NewTaskBuilder as TaskBuilder, Task}; use janus_interop_binaries::{ get_rust_log_level, test_util::await_http_server, ContainerLogsDropGuard, ContainerLogsSource, }; @@ -25,8 +25,9 @@ impl<'a> Daphne<'a> { container_client: &'a Cli, network: &str, task: &Task, + role: Role, ) -> Daphne<'a> { - let (endpoint, image_name_and_tag) = match task.role() { + let (endpoint, image_name_and_tag) = match role { Role::Leader => panic!("A leader container image for Daphne is not yet available"), Role::Helper => ( task.helper_aggregator_endpoint(), @@ -63,7 +64,7 @@ impl<'a> Daphne<'a> { }; // Write the given task to the Daphne instance we started. - interop_api::aggregator_add_task(port, task).await; + interop_api::aggregator_add_task(port, task, role).await; Self { daphne_container } } diff --git a/integration_tests/src/interop_api.rs b/integration_tests/src/interop_api.rs index a11580fc7..e9a5d35a8 100644 --- a/integration_tests/src/interop_api.rs +++ b/integration_tests/src/interop_api.rs @@ -1,15 +1,16 @@ -use janus_aggregator_core::task::Task; +use janus_aggregator_core::task::test_util::Task; use janus_interop_binaries::AggregatorAddTaskRequest; +use janus_messages::Role; use std::collections::HashMap; use url::Url; /// Send an interop test API request to add a DAP task. This assumes the server is available on /// some localhost port. -pub async fn aggregator_add_task(port: u16, task: Task) { +pub async fn aggregator_add_task(port: u16, task: Task, role: Role) { let http_client = reqwest::Client::default(); let resp = http_client .post(Url::parse(&format!("http://127.0.0.1:{port}/internal/test/add_task")).unwrap()) - .json(&AggregatorAddTaskRequest::from(task)) + .json(&AggregatorAddTaskRequest::from_task(task, role)) .send() .await .unwrap(); diff --git a/integration_tests/src/janus.rs b/integration_tests/src/janus.rs index d57863691..8a8c516c5 100644 --- a/integration_tests/src/janus.rs +++ b/integration_tests/src/janus.rs @@ -1,7 +1,7 @@ //! Functionality for tests interacting with Janus (). use crate::interop_api; -use janus_aggregator_core::task::Task; +use janus_aggregator_core::task::test_util::Task; use janus_interop_binaries::{ get_rust_log_level, test_util::await_http_server, testcontainer::Aggregator, ContainerLogsDropGuard, @@ -22,9 +22,10 @@ impl<'a> Janus<'a> { container_client: &'a Cli, network: &str, task: &Task, + role: Role, ) -> Janus<'a> { // Start the Janus interop aggregator container running. - let endpoint = match task.role() { + let endpoint = match role { Role::Leader => task.leader_aggregator_endpoint(), Role::Helper => task.helper_aggregator_endpoint(), _ => panic!("unexpected task role"), @@ -44,7 +45,7 @@ impl<'a> Janus<'a> { await_http_server(port).await; // Write the given task to the Janus instance we started. - interop_api::aggregator_add_task(port, task.clone()).await; + interop_api::aggregator_add_task(port, task.clone(), role).await; Self { container } } diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 0e6cfad95..8f706e172 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -2,8 +2,8 @@ use janus_aggregator_core::task::QueryType; use janus_collector::AuthenticationToken; -use janus_core::{hpke::HpkePrivateKey, vdaf::VdafInstance}; -use janus_messages::{Duration, HpkeConfig, TaskId}; +use janus_core::{hpke::HpkeKeypair, vdaf::VdafInstance}; +use janus_messages::{Duration, TaskId}; use url::Url; pub mod client; @@ -20,8 +20,7 @@ pub struct TaskParameters { pub vdaf: VdafInstance, pub min_batch_size: u64, pub time_precision: Duration, - pub collector_hpke_config: HpkeConfig, - pub collector_private_key: HpkePrivateKey, + pub collector_hpke_keypair: HpkeKeypair, pub collector_auth_token: AuthenticationToken, } diff --git a/integration_tests/tests/common/mod.rs b/integration_tests/tests/common/mod.rs index 087655b3e..353400a5b 100644 --- a/integration_tests/tests/common/mod.rs +++ b/integration_tests/tests/common/mod.rs @@ -1,9 +1,8 @@ use backoff::{future::retry, ExponentialBackoffBuilder}; use itertools::Itertools; -use janus_aggregator_core::task::{test_util::TaskBuilder, QueryType}; +use janus_aggregator_core::task::{test_util::NewTaskBuilder as TaskBuilder, QueryType}; use janus_collector::{Collection, Collector, CollectorParameters}; use janus_core::{ - hpke::test_util::generate_test_hpke_config_and_private_key, retries::test_http_request_exponential_backoff, time::{Clock, RealClock, TimeExt}, vdaf::VdafInstance, @@ -15,7 +14,7 @@ use janus_integration_tests::{ use janus_messages::{ problem_type::DapProblemType, query_type::{self, FixedSize}, - Duration, FixedSizeQuery, Interval, Query, Role, + Duration, FixedSizeQuery, Interval, Query, }; use prio::vdaf::{self, prio3::Prio3}; use rand::{random, thread_rng, Rng}; @@ -23,12 +22,11 @@ use std::{iter, time::Duration as StdDuration}; use tokio::time::{self, sleep}; use url::Url; -/// Returns a tuple of [`TaskParameters`], a task builder for the leader, and a task builder for the -/// helper. -pub fn test_task_builders( +/// Returns a tuple of [`TaskParameters`] and a task builder. +pub fn test_task_builder( vdaf: VdafInstance, query_type: QueryType, -) -> (TaskParameters, TaskBuilder, TaskBuilder) { +) -> (TaskParameters, TaskBuilder) { let endpoint_random_value = hex::encode(random::<[u8; 4]>()); let endpoint_fragments = EndpointFragments { leader_endpoint_host: format!("leader-{endpoint_random_value}"), @@ -36,8 +34,7 @@ pub fn test_task_builders( helper_endpoint_host: format!("helper-{endpoint_random_value}"), helper_endpoint_path: "/".to_string(), }; - let collector_keypair = generate_test_hpke_config_and_private_key(); - let leader_task = TaskBuilder::new(query_type, vdaf.clone(), Role::Leader) + let task_builder = TaskBuilder::new(query_type, vdaf.clone()) .with_leader_aggregator_endpoint( Url::parse(&format!("http://leader-{endpoint_random_value}:8080/")).unwrap(), ) @@ -47,26 +44,19 @@ pub fn test_task_builders( .with_min_batch_size(46) // Force use of DAP-Auth-Tokens, as required by interop testing standard. .with_dap_auth_aggregator_token() - .with_dap_auth_collector_token() - .with_collector_hpke_config(collector_keypair.config().clone()); - let helper_task = leader_task - .clone() - .with_role(Role::Helper) - .with_collector_auth_token(None); - let temporary_task = leader_task.clone().build(); + .with_dap_auth_collector_token(); let task_parameters = TaskParameters { - task_id: *temporary_task.id(), + task_id: *task_builder.task_id(), endpoint_fragments, query_type, vdaf, - min_batch_size: temporary_task.min_batch_size(), - time_precision: *temporary_task.time_precision(), - collector_hpke_config: collector_keypair.config().clone(), - collector_private_key: collector_keypair.private_key().clone(), - collector_auth_token: temporary_task.collector_auth_token().unwrap().clone(), + min_batch_size: task_builder.min_batch_size(), + time_precision: *task_builder.time_precision(), + collector_hpke_keypair: task_builder.collector_hpke_keypair().clone(), + collector_auth_token: task_builder.collector_auth_token().clone(), }; - (task_parameters, leader_task, helper_task) + (task_parameters, task_builder) } /// A set of inputs and an expected output for a VDAF's aggregation. @@ -145,8 +135,8 @@ pub async fn submit_measurements_and_verify_aggregate_generic( task_parameters.task_id, leader_endpoint, task_parameters.collector_auth_token.clone(), - task_parameters.collector_hpke_config.clone(), - task_parameters.collector_private_key.clone(), + task_parameters.collector_hpke_keypair.config().clone(), + task_parameters.collector_hpke_keypair.private_key().clone(), ) .with_http_request_backoff(test_http_request_exponential_backoff()) .with_collect_poll_backoff( diff --git a/integration_tests/tests/daphne.rs b/integration_tests/tests/daphne.rs index af1cae227..473926ecd 100644 --- a/integration_tests/tests/daphne.rs +++ b/integration_tests/tests/daphne.rs @@ -1,11 +1,12 @@ -use common::{submit_measurements_and_verify_aggregate, test_task_builders}; -use janus_aggregator_core::task::{QueryType, Task}; +use common::{submit_measurements_and_verify_aggregate, test_task_builder}; +use janus_aggregator_core::task::QueryType; use janus_core::{ test_util::{install_test_trace_subscriber, testcontainers::container_client}, vdaf::VdafInstance, }; use janus_integration_tests::{client::ClientBackend, daphne::Daphne, janus::Janus}; use janus_interop_binaries::test_util::generate_network_name; +use janus_messages::Role; mod common; @@ -18,26 +19,20 @@ async fn daphne_janus() { // Start servers. let network = generate_network_name(); - let (mut task_parameters, leader_task, helper_task) = - test_task_builders(VdafInstance::Prio3Count, QueryType::TimeInterval); + let (mut task_parameters, task_builder) = + test_task_builder(VdafInstance::Prio3Count, QueryType::TimeInterval); // Daphne is hardcoded to serve from a path starting with /v04/. task_parameters.endpoint_fragments.leader_endpoint_path = "/v04/".to_string(); - let [leader_task, helper_task]: [Task; 2] = [leader_task, helper_task] - .into_iter() - .map(|task| { - let mut leader_aggregator_endpoint = task.leader_aggregator_endpoint().clone(); - leader_aggregator_endpoint.set_path("/v04/"); - task.with_leader_aggregator_endpoint(leader_aggregator_endpoint) - .build() - }) - .collect::>() - .try_into() - .unwrap(); + let mut leader_aggregator_endpoint = task_builder.leader_aggregator_endpoint().clone(); + leader_aggregator_endpoint.set_path("/v04/"); + let task = task_builder + .with_leader_aggregator_endpoint(leader_aggregator_endpoint) + .build(); let container_client = container_client(); - let leader = Daphne::new(TEST_NAME, &container_client, &network, &leader_task).await; - let helper = Janus::new(TEST_NAME, &container_client, &network, &helper_task).await; + let leader = Daphne::new(TEST_NAME, &container_client, &network, &task, Role::Leader).await; + let helper = Janus::new(TEST_NAME, &container_client, &network, &task, Role::Helper).await; // Run the behavioral test. submit_measurements_and_verify_aggregate( @@ -58,26 +53,20 @@ async fn janus_daphne() { // Start servers. let network = generate_network_name(); - let (mut task_parameters, leader_task, helper_task) = - test_task_builders(VdafInstance::Prio3Count, QueryType::TimeInterval); + let (mut task_parameters, task_builder) = + test_task_builder(VdafInstance::Prio3Count, QueryType::TimeInterval); // Daphne is hardcoded to serve from a path starting with /v04/. - task_parameters.endpoint_fragments.helper_endpoint_path = "/v04/".to_string(); - let [leader_task, helper_task]: [Task; 2] = [leader_task, helper_task] - .into_iter() - .map(|task| { - let mut helper_aggregator_endpoint = task.helper_aggregator_endpoint().clone(); - helper_aggregator_endpoint.set_path("/v04/"); - task.with_helper_aggregator_endpoint(helper_aggregator_endpoint) - .build() - }) - .collect::>() - .try_into() - .unwrap(); + task_parameters.endpoint_fragments.leader_endpoint_path = "/v04/".to_string(); + let mut helper_aggregator_endpoint = task_builder.helper_aggregator_endpoint().clone(); + helper_aggregator_endpoint.set_path("/v04/"); + let task = task_builder + .with_helper_aggregator_endpoint(helper_aggregator_endpoint) + .build(); let container_client = container_client(); - let leader = Janus::new(TEST_NAME, &container_client, &network, &leader_task).await; - let helper = Daphne::new(TEST_NAME, &container_client, &network, &helper_task).await; + let leader = Janus::new(TEST_NAME, &container_client, &network, &task, Role::Leader).await; + let helper = Daphne::new(TEST_NAME, &container_client, &network, &task, Role::Helper).await; // Run the behavioral test. submit_measurements_and_verify_aggregate( diff --git a/integration_tests/tests/divviup_ts.rs b/integration_tests/tests/divviup_ts.rs index 292588c00..24cb9f2ee 100644 --- a/integration_tests/tests/divviup_ts.rs +++ b/integration_tests/tests/divviup_ts.rs @@ -1,6 +1,6 @@ //! These tests check interoperation between the divviup-ts client and Janus aggregators. -use common::{submit_measurements_and_verify_aggregate, test_task_builders}; +use common::{submit_measurements_and_verify_aggregate, test_task_builder}; use janus_aggregator_core::task::QueryType; use janus_core::{ test_util::{install_test_trace_subscriber, testcontainers::container_client}, @@ -11,6 +11,7 @@ use janus_integration_tests::{ janus::Janus, }; use janus_interop_binaries::test_util::generate_network_name; +use janus_messages::Role; use testcontainers::clients::Cli; mod common; @@ -20,11 +21,11 @@ async fn run_divviup_ts_integration_test( container_client: &Cli, vdaf: VdafInstance, ) { - let (task_parameters, leader_task, helper_task) = - test_task_builders(vdaf, QueryType::TimeInterval); + let (task_parameters, task_builder) = test_task_builder(vdaf, QueryType::TimeInterval); + let task = task_builder.build(); let network = generate_network_name(); - let leader = Janus::new(test_name, container_client, &network, &leader_task.build()).await; - let helper = Janus::new(test_name, container_client, &network, &helper_task.build()).await; + let leader = Janus::new(test_name, container_client, &network, &task, Role::Leader).await; + let helper = Janus::new(test_name, container_client, &network, &task, Role::Helper).await; let client_backend = ClientBackend::Container { container_client, diff --git a/integration_tests/tests/in_cluster.rs b/integration_tests/tests/in_cluster.rs index b12024661..a47d5e614 100644 --- a/integration_tests/tests/in_cluster.rs +++ b/integration_tests/tests/in_cluster.rs @@ -1,5 +1,5 @@ #![cfg(feature = "in-cluster")] -use common::{submit_measurements_and_verify_aggregate, test_task_builders}; +use common::{submit_measurements_and_verify_aggregate, test_task_builder}; use divviup_client::{ Client, CollectorAuthenticationToken, DivviupClient, Histogram, HpkeConfigContents, NewAggregator, NewSharedAggregator, NewTask, Vdaf, @@ -88,7 +88,7 @@ impl InClusterJanusPair { let cluster = Cluster::new(&kubeconfig_path, &kubectl_context_name); - let (mut task_parameters, task_builder, _) = test_task_builders(vdaf, query_type); + let (mut task_parameters, task_builder) = test_task_builder(vdaf, query_type); let task = task_builder.with_min_batch_size(100).build(); task_parameters.min_batch_size = 100; @@ -144,7 +144,7 @@ impl InClusterJanusPair { .await .unwrap(); - let hpke_config = task.collector_hpke_config().unwrap(); + let hpke_config = task.collector_hpke_keypair().config(); let collector_hpke_config = divviup_api .create_hpke_config( account.id, diff --git a/integration_tests/tests/janus.rs b/integration_tests/tests/janus.rs index d6180c717..b2cd28caa 100644 --- a/integration_tests/tests/janus.rs +++ b/integration_tests/tests/janus.rs @@ -1,4 +1,4 @@ -use common::{submit_measurements_and_verify_aggregate, test_task_builders}; +use common::{submit_measurements_and_verify_aggregate, test_task_builder}; use janus_aggregator_core::task::QueryType; use janus_core::{ test_util::{install_test_trace_subscriber, testcontainers::container_client}, @@ -6,6 +6,7 @@ use janus_core::{ }; use janus_integration_tests::{client::ClientBackend, janus::Janus, TaskParameters}; use janus_interop_binaries::test_util::generate_network_name; +use janus_messages::Role; use testcontainers::clients::Cli; mod common; @@ -31,11 +32,12 @@ impl<'a> JanusPair<'a> { vdaf: VdafInstance, query_type: QueryType, ) -> JanusPair<'a> { - let (task_parameters, leader_task, helper_task) = test_task_builders(vdaf, query_type); + let (task_parameters, task_builder) = test_task_builder(vdaf, query_type); + let task = task_builder.build(); let network = generate_network_name(); - let leader = Janus::new(test_name, container_client, &network, &leader_task.build()).await; - let helper = Janus::new(test_name, container_client, &network, &helper_task.build()).await; + let leader = Janus::new(test_name, container_client, &network, &task, Role::Leader).await; + let helper = Janus::new(test_name, container_client, &network, &task, Role::Helper).await; Self { task_parameters, diff --git a/interop_binaries/Cargo.toml b/interop_binaries/Cargo.toml index b99c35fb4..d6991f253 100644 --- a/interop_binaries/Cargo.toml +++ b/interop_binaries/Cargo.toml @@ -30,7 +30,7 @@ futures = { version = "0.3.28", optional = true } fixed = { version = "1.23", optional = true } fixed-macro = { version = "1.1.1", optional = true } hex = { version = "0.4", optional = true } -janus_aggregator_core.workspace = true +janus_aggregator_core = { workspace = true, features = ["test-util"] } janus_aggregator.workspace = true janus_client.workspace = true janus_collector.workspace = true diff --git a/interop_binaries/src/bin/janus_interop_aggregator.rs b/interop_binaries/src/bin/janus_interop_aggregator.rs index 2785abfbd..a9353798e 100644 --- a/interop_binaries/src/bin/janus_interop_aggregator.rs +++ b/interop_binaries/src/bin/janus_interop_aggregator.rs @@ -8,7 +8,7 @@ use janus_aggregator::{ }; use janus_aggregator_core::{ datastore::Datastore, - task::{self, Task}, + task::{self, AggregatorTask, AggregatorTaskParameters}, SecretBytes, }; use janus_core::{auth_tokens::AuthenticationToken, time::RealClock}; @@ -38,6 +38,10 @@ async fn handle_add_task( keyring: &Mutex, request: AggregatorAddTaskRequest, ) -> anyhow::Result<()> { + let peer_aggregator_endpoint = match request.role { + AggregatorRole::Leader => request.helper, + AggregatorRole::Helper => request.leader, + }; let vdaf = request.vdaf.into(); let leader_authentication_token = AuthenticationToken::new_dap_auth_token_from_string(request.leader_authentication_token) @@ -54,17 +58,25 @@ async fn handle_add_task( let collector_hpke_config = HpkeConfig::get_decoded(&collector_hpke_config_bytes) .context("could not parse collector HPKE configuration")?; - let collector_authentication_token = - match (request.role, request.collector_authentication_token) { - (AggregatorRole::Leader, None) => { - return Err(anyhow::anyhow!("collector authentication token is missing")) + let aggregator_parameters = match (request.role, request.collector_authentication_token) { + (AggregatorRole::Leader, None) => { + return Err(anyhow::anyhow!("collector authentication token is missing")) + } + (AggregatorRole::Leader, Some(collector_authentication_token)) => { + AggregatorTaskParameters::Leader { + aggregator_auth_token: leader_authentication_token, + collector_auth_token: AuthenticationToken::new_dap_auth_token_from_string( + collector_authentication_token, + ) + .context("invalid header value in \"collector_authentication_token\"")?, + collector_hpke_config, } - (AggregatorRole::Leader, Some(collector_authentication_token)) => Some( - AuthenticationToken::new_dap_auth_token_from_string(collector_authentication_token) - .context("invalid header value in \"collector_authentication_token\"")?, - ), - (AggregatorRole::Helper, _) => None, - }; + } + (AggregatorRole::Helper, _) => AggregatorTaskParameters::Helper { + aggregator_auth_token: leader_authentication_token, + collector_hpke_config, + }, + }; let hpke_keypair = keyring.lock().await.get_random_keypair(); @@ -84,13 +96,11 @@ async fn handle_add_task( } }; - let task = Task::new( + let task = AggregatorTask::new( request.task_id, - request.leader, - request.helper, + peer_aggregator_endpoint, query_type, vdaf, - request.role.into(), vdaf_verify_key, request.max_batch_query_count, request.task_expiration.map(Time::from_seconds_since_epoch), @@ -100,17 +110,15 @@ async fn handle_add_task( // We can be strict about clock skew since this executable is only intended for use with // other aggregators running on the same host. Duration::from_seconds(1), - collector_hpke_config, - Some(leader_authentication_token), - collector_authentication_token, [hpke_keypair], + aggregator_parameters, ) .context("error constructing task")?; datastore .run_tx(move |tx| { let task = task.clone(); - Box::pin(async move { tx.put_task(&task).await }) + Box::pin(async move { tx.put_aggregator_task(&task).await }) }) .await .context("error adding task to database") diff --git a/interop_binaries/src/lib.rs b/interop_binaries/src/lib.rs index c96bab270..9e9e18608 100644 --- a/interop_binaries/src/lib.rs +++ b/interop_binaries/src/lib.rs @@ -1,5 +1,5 @@ use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; -use janus_aggregator_core::task::{QueryType, Task}; +use janus_aggregator_core::task::{test_util::Task, QueryType}; use janus_core::{ hpke::{generate_hpke_config_and_private_key, HpkeKeypair}, vdaf::VdafInstance, @@ -278,15 +278,8 @@ pub struct AggregatorAddTaskRequest { pub task_expiration: Option, // in seconds since the epoch } -#[derive(Debug, Serialize, Deserialize)] -pub struct AddTaskResponse { - pub status: String, - #[serde(default)] - pub error: Option, -} - -impl From for AggregatorAddTaskRequest { - fn from(task: Task) -> Self { +impl AggregatorAddTaskRequest { + pub fn from_task(task: Task, role: Role) -> Self { let (query_type, max_batch_size) = match task.query_type() { QueryType::TimeInterval => (TimeInterval::CODE as u8, None), QueryType::FixedSize { max_batch_size, .. } => { @@ -299,18 +292,15 @@ impl From for AggregatorAddTaskRequest { helper: task.helper_aggregator_endpoint().clone(), vdaf: task.vdaf().clone().into(), leader_authentication_token: String::from_utf8( - task.aggregator_auth_token().unwrap().as_ref().to_vec(), + task.aggregator_auth_token().as_ref().to_vec(), ) .unwrap(), - collector_authentication_token: if task.role() == &Role::Leader { - Some( - String::from_utf8(task.collector_auth_token().unwrap().as_ref().to_vec()) - .unwrap(), - ) + collector_authentication_token: if role == Role::Leader { + Some(String::from_utf8(task.collector_auth_token().as_ref().to_vec()).unwrap()) } else { None }, - role: (*task.role()).try_into().unwrap(), + role: role.try_into().unwrap(), vdaf_verify_key: URL_SAFE_NO_PAD.encode(task.opaque_vdaf_verify_key().as_ref()), max_batch_query_count: task.max_batch_query_count(), query_type, @@ -318,12 +308,19 @@ impl From for AggregatorAddTaskRequest { max_batch_size, time_precision: task.time_precision().as_seconds(), collector_hpke_config: URL_SAFE_NO_PAD - .encode(task.collector_hpke_config().unwrap().get_encoded()), + .encode(task.collector_hpke_keypair().config().get_encoded()), task_expiration: task.task_expiration().map(Time::as_seconds_since_epoch), } } } +#[derive(Debug, Serialize, Deserialize)] +pub struct AddTaskResponse { + pub status: String, + #[serde(default)] + pub error: Option, +} + pub fn install_tracing_subscriber() -> anyhow::Result<()> { let stdout_filter = EnvFilter::builder().from_env()?; let layer = tracing_subscriber::fmt::layer()