From 153872e696ec0e7f3640d8c63138e73c7febc4f2 Mon Sep 17 00:00:00 2001 From: Tim Geoghegan Date: Thu, 21 Sep 2023 15:22:19 -0700 Subject: [PATCH 1/6] works through janus_aggregator_core EXCEPT datastore --- aggregator/src/aggregator.rs | 3 +- .../src/aggregator/aggregate_init_tests.rs | 5 +- .../src/aggregator/collection_job_tests.rs | 3 +- aggregator/src/aggregator/http_handlers.rs | 7 +- aggregator/src/bin/aggregator.rs | 2 +- aggregator_api/src/lib.rs | 2 +- aggregator_api/src/models.rs | 2 +- aggregator_api/src/tests.rs | 3 +- aggregator_core/src/datastore.rs | 3 +- aggregator_core/src/datastore/models.rs | 3 +- aggregator_core/src/task.rs | 401 +++++++++++++----- aggregator_core/src/taskprov.rs | 6 +- collector/src/lib.rs | 4 +- core/src/auth_tokens.rs | 289 +++++++++++++ core/src/http.rs | 2 +- core/src/lib.rs | 1 + core/src/task.rs | 173 +------- docs/samples/tasks.yaml | 18 +- integration_tests/tests/in_cluster.rs | 2 +- .../src/bin/janus_interop_aggregator.rs | 2 +- .../src/bin/janus_interop_collector.rs | 10 +- tools/src/bin/collect.rs | 2 +- 22 files changed, 630 insertions(+), 313 deletions(-) create mode 100644 core/src/auth_tokens.rs diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index e4f1dd222..0597483ed 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -39,9 +39,10 @@ use janus_aggregator_core::{ #[cfg(feature = "test-util")] use janus_core::test_util::dummy_vdaf; use janus_core::{ + auth_tokens::AuthenticationToken, hpke::{self, HpkeApplicationInfo, HpkeKeypair, Label}, http::response_to_problem_details, - task::{AuthenticationToken, VdafInstance, VERIFY_KEY_LENGTH}, + task::{VdafInstance, VERIFY_KEY_LENGTH}, time::{Clock, DurationExt, IntervalExt, TimeExt}, }; use janus_messages::{ diff --git a/aggregator/src/aggregator/aggregate_init_tests.rs b/aggregator/src/aggregator/aggregate_init_tests.rs index 3487c0ba1..e1a73d5f7 100644 --- a/aggregator/src/aggregator/aggregate_init_tests.rs +++ b/aggregator/src/aggregator/aggregate_init_tests.rs @@ -17,7 +17,8 @@ use janus_aggregator_core::{ test_util::noop_meter, }; use janus_core::{ - task::{AuthenticationToken, VdafInstance, DAP_AUTH_HEADER}, + auth_tokens::{AuthenticationToken, DAP_AUTH_HEADER}, + task::VdafInstance, test_util::{dummy_vdaf, install_test_trace_subscriber, run_vdaf, VdafTranscript}, time::{Clock, MockClock, TimeExt as _}, }; @@ -209,7 +210,7 @@ async fn setup_aggregate_init_test_without_sending_request< install_test_trace_subscriber(); let task = TaskBuilder::new(QueryType::TimeInterval, vdaf_instance, Role::Helper) - .with_aggregator_auth_token(Some(auth_token)) + .with_aggregator_auth_token(auth_token) .build(); let clock = MockClock::default(); let ephemeral_datastore = ephemeral_datastore().await; diff --git a/aggregator/src/aggregator/collection_job_tests.rs b/aggregator/src/aggregator/collection_job_tests.rs index a190200dd..f8264728d 100644 --- a/aggregator/src/aggregator/collection_job_tests.rs +++ b/aggregator/src/aggregator/collection_job_tests.rs @@ -20,11 +20,12 @@ use janus_aggregator_core::{ test_util::noop_meter, }; use janus_core::{ + auth_tokens::AuthenticationToken, hpke::{ self, test_util::generate_test_hpke_config_and_private_key, HpkeApplicationInfo, HpkeKeypair, Label, }, - task::{AuthenticationToken, VdafInstance}, + task::VdafInstance, test_util::{ dummy_vdaf::{self, AggregationParam}, install_test_trace_subscriber, diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index 6581bf7a9..647eac5ef 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -4,8 +4,8 @@ use async_trait::async_trait; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use janus_aggregator_core::{datastore::Datastore, instrumented}; use janus_core::{ + auth_tokens::{AuthenticationToken, DAP_AUTH_HEADER}, http::extract_bearer_token, - task::{AuthenticationToken, DAP_AUTH_HEADER}, taskprov::TASKPROV_HEADER, time::Clock, }; @@ -679,6 +679,7 @@ mod tests { test_util::noop_meter, }; use janus_core::{ + auth_tokens::AuthenticationToken, hpke::{ self, test_util::{ @@ -688,7 +689,7 @@ mod tests { HpkeApplicationInfo, HpkeKeypair, Label, }, report_id::ReportIdChecksumExt, - task::{AuthenticationToken, VdafInstance, VERIFY_KEY_LENGTH}, + task::{VdafInstance, VERIFY_KEY_LENGTH}, test_util::{dummy_vdaf, install_test_trace_subscriber, run_vdaf}, time::{Clock, DurationExt, IntervalExt, MockClock, TimeExt}, }; @@ -1442,7 +1443,7 @@ mod tests { VdafInstance::Prio3Count, Role::Helper, ) - .with_aggregator_auth_token(Some(dap_auth_token.clone())) + .with_aggregator_auth_token(dap_auth_token.clone()) .build(); datastore.put_task(&task).await.unwrap(); diff --git a/aggregator/src/bin/aggregator.rs b/aggregator/src/bin/aggregator.rs index c4428b214..828696431 100644 --- a/aggregator/src/bin/aggregator.rs +++ b/aggregator/src/bin/aggregator.rs @@ -11,7 +11,7 @@ use janus_aggregator::{ }; use janus_aggregator_api::{self, aggregator_api_handler}; use janus_aggregator_core::datastore::Datastore; -use janus_core::{task::AuthenticationToken, time::RealClock}; +use janus_core::{auth_tokens::AuthenticationToken, time::RealClock}; use serde::{de, Deserialize, Deserializer, Serialize}; use std::{ future::{ready, Future}, diff --git a/aggregator_api/src/lib.rs b/aggregator_api/src/lib.rs index f69827ab4..dd11a72ee 100644 --- a/aggregator_api/src/lib.rs +++ b/aggregator_api/src/lib.rs @@ -9,7 +9,7 @@ use janus_aggregator_core::{ datastore::{self, Datastore}, instrumented, }; -use janus_core::{hpke, http::extract_bearer_token, task::AuthenticationToken, time::Clock}; +use janus_core::{auth_tokens::AuthenticationToken, hpke, http::extract_bearer_token, time::Clock}; use janus_messages::{HpkeConfigId, RoleParseError, TaskId}; use routes::*; use std::{str::FromStr, sync::Arc}; diff --git a/aggregator_api/src/models.rs b/aggregator_api/src/models.rs index d7e1625af..c3faf14b9 100644 --- a/aggregator_api/src/models.rs +++ b/aggregator_api/src/models.rs @@ -4,7 +4,7 @@ use janus_aggregator_core::{ task::{QueryType, Task}, taskprov::{PeerAggregator, VerifyKeyInit}, }; -use janus_core::task::{AuthenticationToken, VdafInstance}; +use janus_core::{auth_tokens::AuthenticationToken, task::VdafInstance}; use janus_messages::{ query_type::Code as SupportedQueryType, Duration, HpkeAeadId, HpkeConfig, HpkeKdfId, HpkeKemId, Role, TaskId, Time, diff --git a/aggregator_api/src/tests.rs b/aggregator_api/src/tests.rs index baf5ee0a6..6ecfe88e6 100644 --- a/aggregator_api/src/tests.rs +++ b/aggregator_api/src/tests.rs @@ -23,6 +23,7 @@ use janus_aggregator_core::{ SecretBytes, }; use janus_core::{ + auth_tokens::AuthenticationToken, hpke::{ generate_hpke_config_and_private_key, test_util::{ @@ -31,7 +32,7 @@ use janus_core::{ }, HpkeKeypair, HpkePrivateKey, }, - task::{AuthenticationToken, VdafInstance}, + task::VdafInstance, test_util::{ dummy_vdaf::{self, AggregationParam}, install_test_trace_subscriber, diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 25b9eecf5..315727a6b 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -16,8 +16,9 @@ use crate::{ use chrono::NaiveDateTime; use futures::future::try_join_all; use janus_core::{ + auth_tokens::AuthenticationToken, hpke::{HpkeKeypair, HpkePrivateKey}, - task::{AuthenticationToken, VdafInstance}, + task::VdafInstance, time::{Clock, TimeExt}, }; use janus_messages::{ diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs index 21994c369..f31f0e703 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -5,9 +5,10 @@ use base64::{display::Base64Display, engine::general_purpose::URL_SAFE_NO_PAD}; use chrono::NaiveDateTime; use derivative::Derivative; use janus_core::{ + auth_tokens::AuthenticationToken, hpke::HpkeKeypair, report_id::ReportIdChecksumExt, - task::{AuthenticationToken, VdafInstance}, + task::VdafInstance, time::{DurationExt, IntervalExt, TimeExt}, }; use janus_messages::{ diff --git a/aggregator_core/src/task.rs b/aggregator_core/src/task.rs index ff2230c33..07df2bd42 100644 --- a/aggregator_core/src/task.rs +++ b/aggregator_core/src/task.rs @@ -4,8 +4,9 @@ use crate::SecretBytes; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use derivative::Derivative; use janus_core::{ + auth_tokens::{AuthenticationToken, AuthenticationTokenHash}, hpke::{generate_hpke_config_and_private_key, HpkeKeypair}, - task::{url_ensure_trailing_slash, AuthenticationToken, VdafInstance}, + task::{url_ensure_trailing_slash, VdafInstance}, time::TimeExt, }; use janus_messages::{ @@ -129,8 +130,11 @@ pub struct Task { tolerable_clock_skew: Duration, /// HPKE configuration for the collector. collector_hpke_config: Option, - /// Token used to authenticate messages sent to or received from the other aggregator. Only set - /// if the task was not created via taskprov. + /// Token hash used to authenticate aggregation protocol messages received from the leader. Only + /// set if this aggregator is the leader and the task was not created via taskprov. + aggregator_auth_token_hash: Option, + /// Token used to authenticate messages sent to the helper. Only set if this aggregator is the + /// leader and if the task was not created via taskprov. aggregator_auth_token: Option, /// Token used to authenticate messages sent to or received from the collector. Only set if this /// aggregator is the leader. @@ -157,6 +161,7 @@ impl Task { time_precision: Duration, tolerable_clock_skew: Duration, collector_hpke_config: HpkeConfig, + aggregator_auth_token_hash: Option, aggregator_auth_token: Option, collector_auth_token: Option, hpke_keys: I, @@ -176,6 +181,7 @@ impl Task { time_precision, tolerable_clock_skew, Some(collector_hpke_config), + aggregator_auth_token_hash, aggregator_auth_token, collector_auth_token, hpke_keys, @@ -202,6 +208,7 @@ impl Task { time_precision: Duration, tolerable_clock_skew: Duration, collector_hpke_config: Option, + aggregator_auth_token_hash: Option, aggregator_auth_token: Option, collector_auth_token: Option, hpke_keys: I, @@ -230,6 +237,7 @@ impl Task { time_precision, tolerable_clock_skew, collector_hpke_config, + aggregator_auth_token_hash, aggregator_auth_token, collector_auth_token, hpke_keys, @@ -266,11 +274,17 @@ impl Task { pub(crate) fn validate(&self) -> Result<(), Error> { self.validate_common()?; - if self.aggregator_auth_token.is_none() { + + // Aggregator auth token is allowed and required iff this task is in the leader role + if (self.role == Role::Leader) == (self.aggregator_auth_token.is_none()) { return Err(Error::InvalidParameter("aggregator_auth_token")); } + // Aggregator auth token hash is allowed and required iff this task is in the helper role + if (self.role == Role::Helper) == (self.aggregator_auth_token_hash.is_none()) { + return Err(Error::InvalidParameter("aggregator_auth_token_hash")); + } if (self.role == Role::Leader) == (self.collector_auth_token.is_none()) { - // Collector auth tokens are allowed & required if and only if this task is in the + // Collector auth token is allowed & required if and only if this task is in the // leader role. return Err(Error::InvalidParameter("collector_auth_token")); } @@ -359,7 +373,14 @@ impl Task { self.collector_hpke_config.as_ref() } - /// Retrieves the aggregator authentication token associated with this task. + /// Retrieves the aggregator authentication token hash associatd with this task for the helper, + /// or `None` for the leader. + pub fn aggregator_auth_token_hash(&self) -> Option<&AuthenticationTokenHash> { + self.aggregator_auth_token_hash.as_ref() + } + + /// Retrieves the aggregator authentication token associated with this task for the leader, or + /// `None` for the helper. pub fn aggregator_auth_token(&self) -> Option<&AuthenticationToken> { self.aggregator_auth_token.as_ref() } @@ -399,15 +420,6 @@ impl Task { } } - /// Checks if the given aggregator authentication token is valid (i.e. matches with an - /// authentication token recognized by this task). - pub fn check_aggregator_auth_token(&self, auth_token: &AuthenticationToken) -> bool { - match self.aggregator_auth_token { - Some(ref t) => t == auth_token, - None => false, - } - } - /// Checks if the given collector authentication token is valid (i.e. matches with an /// authentication token recognized by this task). pub fn check_collector_auth_token(&self, auth_token: &AuthenticationToken) -> bool { @@ -484,6 +496,7 @@ pub struct SerializedTask { time_precision: Duration, tolerable_clock_skew: Duration, collector_hpke_config: HpkeConfig, + aggregator_auth_token_hash: Option, aggregator_auth_token: Option, collector_auth_token: Option, hpke_keys: Vec, // uses unpadded base64url @@ -500,7 +513,6 @@ impl SerializedTask { /// /// - Task ID /// - VDAF verify keys (only one key is generated) - /// - Aggregator authentication tokens (only one token is generated) /// - Collector authentication tokens (only one token is generated and only if the task's role /// is leader) /// - The aggregator's HPKE keypair (only one keypair is generated) @@ -521,10 +533,6 @@ impl SerializedTask { self.vdaf_verify_key = Some(URL_SAFE_NO_PAD.encode(vdaf_verify_key.as_ref())); } - if self.aggregator_auth_token.is_none() { - self.aggregator_auth_token = Some(random()); - } - if self.collector_auth_token.is_none() && self.role == Role::Leader { self.collector_auth_token = Some(random()); } @@ -566,6 +574,7 @@ impl Serialize for Task { .collector_hpke_config() .expect("serializable tasks must have collector_hpke_config") .clone(), + aggregator_auth_token_hash: self.aggregator_auth_token_hash.clone(), aggregator_auth_token: self.aggregator_auth_token.clone(), collector_auth_token: self.collector_auth_token.clone(), hpke_keys, @@ -603,6 +612,7 @@ impl TryFrom for Task { serialized_task.time_precision, serialized_task.tolerable_clock_skew, serialized_task.collector_hpke_config, + serialized_task.aggregator_auth_token_hash, serialized_task.aggregator_auth_token, serialized_task.collector_auth_token, serialized_task.hpke_keys, @@ -626,8 +636,9 @@ pub mod test_util { SecretBytes, }; use janus_core::{ + auth_tokens::{AuthenticationToken, AuthenticationTokenHash}, hpke::{test_util::generate_test_hpke_config_and_private_key, HpkeKeypair}, - task::{AuthenticationToken, VdafInstance, VERIFY_KEY_LENGTH}, + task::{VdafInstance, VERIFY_KEY_LENGTH}, time::DurationExt, }; use janus_messages::{Duration, HpkeConfig, HpkeConfigId, Role, TaskId, Time}; @@ -649,7 +660,13 @@ pub mod test_util { /// TaskBuilder is a testing utility allowing tasks to be built based on a template. #[derive(Clone)] - pub struct TaskBuilder(Task); + pub struct TaskBuilder { + /// The task being built. + task: Task, + /// The aggregator auth token for the task. In the case where the task's role is helper, the + /// `Task` will only store an `AuthenticationTokenHash`, so we store the actual token here. + aggregator_auth_token: AuthenticationToken, + } impl TaskBuilder { /// Create a [`TaskBuilder`] from the provided values, with arbitrary values for the other @@ -678,14 +695,26 @@ pub mod test_util { .collect(), ); + // Create an AuthenticationToken::Bearer by default + let aggregator_auth_token: AuthenticationToken = random(); + let (task_aggregator_auth_token, task_aggregator_auth_token_hash) = match role { + Role::Leader => (Some(aggregator_auth_token.clone()), None), + Role::Helper => ( + None, + Some(AuthenticationTokenHash::from(&aggregator_auth_token)), + ), + _ => panic!("illegal role in task"), + }; + let collector_auth_token = if role == Role::Leader { - Some(random()) // Create an AuthenticationToken::Bearer by default + // Create an AuthenticationToken::Bearer by default + Some(random()) } else { None }; - Self( - Task::new( + Self { + task: Task::new( task_id, "https://leader.endpoint".parse().unwrap(), "https://helper.endpoint".parse().unwrap(), @@ -700,108 +729,155 @@ pub mod test_util { Duration::from_hours(8).unwrap(), Duration::from_minutes(10).unwrap(), generate_test_hpke_config_and_private_key().config().clone(), - Some(random()), // Create an AuthenticationToken::Bearer by default + task_aggregator_auth_token_hash, + task_aggregator_auth_token, collector_auth_token, Vec::from([aggregator_keypair_0, aggregator_keypair_1]), ) .unwrap(), - ) + aggregator_auth_token, + } } /// Gets the leader aggregator endpoint for the eventual task. pub fn leader_aggregator_endpoint(&self) -> &Url { - self.0.leader_aggregator_endpoint() + self.task.leader_aggregator_endpoint() } /// Gets the helper aggregator endpoint for the eventual task. pub fn helper_aggregator_endpoint(&self) -> &Url { - self.0.helper_aggregator_endpoint() + self.task.helper_aggregator_endpoint() + } + + /// Gets the aggregator auth token for the eventual task. + pub fn aggregator_auth_token(&self) -> &AuthenticationToken { + &self.aggregator_auth_token } /// Associates the eventual task with the given task ID. pub fn with_id(self, task_id: TaskId) -> Self { - Self(Task { task_id, ..self.0 }) + Self { + task: Task { + task_id, + ..self.task + }, + ..self + } } /// Associates the eventual task with the given aggregator endpoint for the Leader. pub fn with_leader_aggregator_endpoint(self, leader_aggregator_endpoint: Url) -> Self { - Self(Task { - leader_aggregator_endpoint, - ..self.0 - }) + Self { + task: Task { + leader_aggregator_endpoint, + ..self.task + }, + ..self + } } /// Associates the eventual task with the given aggregator endpoint for the Helper. pub fn with_helper_aggregator_endpoint(self, helper_aggregator_endpoint: Url) -> Self { - Self(Task { - helper_aggregator_endpoint, - ..self.0 - }) + Self { + task: Task { + helper_aggregator_endpoint, + ..self.task + }, + ..self + } } /// Associates the eventual task with the given aggregator role. pub fn with_role(self, role: Role) -> Self { - Self(Task { role, ..self.0 }) + Self { + task: Task { role, ..self.task }, + ..self + } } /// Associates the eventual task with the given VDAF verification key. pub fn with_vdaf_verify_key(self, vdaf_verify_key: SecretBytes) -> Self { - Self(Task { - vdaf_verify_key, - ..self.0 - }) + Self { + task: Task { + vdaf_verify_key, + ..self.task + }, + ..self + } } /// Associates the eventual task with the given max batch query count parameter. pub fn with_max_batch_query_count(self, max_batch_query_count: u64) -> Self { - Self(Task { - max_batch_query_count, - ..self.0 - }) + Self { + task: Task { + max_batch_query_count, + ..self.task + }, + ..self + } } /// Associates the eventual task with the given min batch size parameter. pub fn with_min_batch_size(self, min_batch_size: u64) -> Self { - Self(Task { - min_batch_size, - ..self.0 - }) + Self { + task: Task { + min_batch_size, + ..self.task + }, + ..self + } } /// Associates the eventual task with the given time precision parameter. pub fn with_time_precision(self, time_precision: Duration) -> Self { - Self(Task { - time_precision, - ..self.0 - }) + Self { + task: Task { + time_precision, + ..self.task + }, + ..self + } } /// Associates the eventual task with the given collector HPKE config. pub fn with_collector_hpke_config(self, collector_hpke_config: HpkeConfig) -> Self { - Self(Task { - collector_hpke_config: Some(collector_hpke_config), - ..self.0 - }) + Self { + task: Task { + collector_hpke_config: Some(collector_hpke_config), + ..self.task + }, + ..self + } } /// Associates the eventual task with the given aggregator authentication token. pub fn with_aggregator_auth_token( self, - aggregator_auth_token: Option, + aggregator_auth_token: AuthenticationToken, ) -> Self { - Self(Task { + let task = match self.task.role { + Role::Leader => Task { + aggregator_auth_token: Some(aggregator_auth_token.clone()), + ..self.task + }, + Role::Helper => Task { + aggregator_auth_token_hash: Some(AuthenticationTokenHash::from( + &aggregator_auth_token, + )), + ..self.task + }, + _ => panic!("illegal role"), + }; + Self { + task, aggregator_auth_token, - ..self.0 - }) + } } /// Associates the eventual task with a random [`AuthenticationToken::DapAuth`] aggregator /// auth token. pub fn with_dap_auth_aggregator_token(self) -> Self { - Self(Task { - aggregator_auth_token: Some(AuthenticationToken::DapAuth(random())), - ..self.0 - }) + self.with_aggregator_auth_token(AuthenticationToken::DapAuth(random())) } /// Associates the eventual task with the given collector authentication token. @@ -809,35 +885,41 @@ pub mod test_util { self, collector_auth_token: Option, ) -> Self { - Self(Task { - collector_auth_token, - ..self.0 - }) + Self { + task: Task { + collector_auth_token, + ..self.task + }, + ..self + } } /// Associates the eventual task with a random [`AuthenticationToken::DapAuth`] collector /// auth token. pub fn with_dap_auth_collector_token(self) -> Self { - Self(Task { - collector_auth_token: Some(AuthenticationToken::DapAuth(random())), - ..self.0 - }) + self.with_collector_auth_token(Some(AuthenticationToken::DapAuth(random()))) } /// Sets the task expiration time. pub fn with_task_expiration(self, task_expiration: Option