diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index b83eb7adf..041bac671 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -8,7 +8,7 @@ use crate::{ query_type::{CollectableQueryType, UploadableQueryType}, report_writer::{ReportWriteBatcher, WritableReport}, }, - cache::{GlobalHpkeKeypairCache, PeerAggregatorCache}, + cache::GlobalHpkeKeypairCache, Operation, }; use bytes::Bytes; @@ -27,10 +27,10 @@ use janus_aggregator_core::{ }, query_type::AccumulableQueryType, task::{self, Task, VerifyKey}, - taskprov::{self, PeerAggregator}, + taskprov::{self, VerifyKeyInit}, }; #[cfg(feature = "test-util")] -use janus_core::test_util::dummy_vdaf; +use janus_core::{hpke::generate_hpke_config_and_private_key, test_util::dummy_vdaf}; use janus_core::{ hpke::{self, aggregate_share_aad, input_share_aad, HpkeApplicationInfo, HpkeKeypair, Label}, http::response_to_problem_details, @@ -46,6 +46,8 @@ use janus_messages::{ Duration, HpkeCiphertext, HpkeConfig, Interval, PartialBatchSelector, PrepareStep, PrepareStepResult, Report, ReportIdChecksum, ReportShare, ReportShareError, Role, TaskId, }; +#[cfg(feature = "test-util")] +use janus_messages::{HpkeAeadId, HpkeConfigId, HpkeKdfId, HpkeKemId}; use opentelemetry::{ metrics::{Counter, Histogram, Meter, Unit}, KeyValue, @@ -157,13 +159,10 @@ pub struct Aggregator { /// Cache of global HPKE keypairs and configs. global_hpke_keypairs: GlobalHpkeKeypairCache, - - /// Cache of taskprov peer aggregators. - peer_aggregators: PeerAggregatorCache, } /// Config represents a configuration for an Aggregator. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Config { /// Defines the maximum size of a batch of uploaded reports which will be written in a single /// transaction. @@ -182,8 +181,26 @@ pub struct Config { /// Defines how often to refresh the global HPKE configs cache. This affects how often an aggregator /// becomes aware of key state changes. pub global_hpke_configs_refresh_interval: StdDuration, + + /// Collection results will be encrypted to this public key. + pub collector_hpke_config: HpkeConfig, + + /// New tasks will have this report expiration age. + pub report_expiry_age: Option, + + /// New tasks will have this tolerable clock skew. + pub tolerable_clock_skew: Duration, + + /// Defines the key used to derive the VDAF verify key for new tasks. + pub verify_key_init: VerifyKeyInit, + + /// Authentication tokens used for requests from the leader. + pub auth_tokens: Vec, } +// subscriber-01 only: the config now has mandatory fields, so default only makes sense as a helper +// impl inside unit tests. +#[cfg(feature = "test-util")] impl Default for Config { fn default() -> Self { Self { @@ -191,6 +208,18 @@ impl Default for Config { max_upload_batch_write_delay: StdDuration::ZERO, batch_aggregation_shard_count: 1, global_hpke_configs_refresh_interval: GlobalHpkeKeypairCache::DEFAULT_REFRESH_INTERVAL, + collector_hpke_config: generate_hpke_config_and_private_key( + HpkeConfigId::from(1), + HpkeKemId::X25519HkdfSha256, + HpkeKdfId::HkdfSha256, + HpkeAeadId::Aes128Gcm, + ) + .config() + .clone(), + report_expiry_age: None, + tolerable_clock_skew: Duration::from_minutes(60).unwrap(), + verify_key_init: random(), + auth_tokens: Vec::new(), } } } @@ -231,8 +260,6 @@ impl Aggregator { ) .await?; - let peer_aggregators = PeerAggregatorCache::new(&datastore).await?; - Ok(Self { datastore, clock, @@ -243,7 +270,6 @@ impl Aggregator { upload_decode_failure_counter, aggregate_step_failure_counter, global_hpke_keypairs, - peer_aggregators, }) } @@ -478,16 +504,14 @@ impl Aggregator { // Authorize the request and retrieve the collector's HPKE config. If this is a taskprov task, we // have to use the peer aggregator's collector config rather than the main task. let collector_hpke_config = if taskprov_task_config.is_some() { - let (peer_aggregator, _) = self - .taskprov_authorize_request( - &Role::Leader, - task_id, - taskprov_task_config.unwrap(), - auth_token.as_ref(), - ) - .await?; - - peer_aggregator.collector_hpke_config() + self.taskprov_authorize_request( + &Role::Leader, + task_id, + taskprov_task_config.unwrap(), + auth_token.as_ref(), + ) + .await?; + &self.cfg.collector_hpke_config } else { if !auth_token .map(|t| task_aggregator.task.check_aggregator_auth_token(&t)) @@ -561,7 +585,7 @@ impl Aggregator { } /// Opts in or out of a taskprov task. - #[tracing::instrument(skip(self, aggregator_auth_token), err)] + #[tracing::instrument(skip(self), err)] async fn taskprov_opt_in( &self, peer_role: &Role, @@ -569,10 +593,21 @@ impl Aggregator { task_config: &TaskConfig, aggregator_auth_token: Option<&AuthenticationToken>, ) -> Result<(), Error> { - let (peer_aggregator, aggregator_urls) = self - .taskprov_authorize_request(peer_role, task_id, task_config, aggregator_auth_token) + self.taskprov_authorize_request(peer_role, task_id, task_config, aggregator_auth_token) .await?; + let aggregator_urls = task_config + .aggregator_endpoints() + .iter() + .map(|url| url.try_into()) + .collect::, _>>()?; + if aggregator_urls.len() < 2 { + return Err(Error::UnrecognizedMessage( + Some(*task_id), + "taskprov configuration is missing one or both aggregators", + )); + } + // TODO(#1647): Check whether task config parameters are acceptable for privacy and // availability of the system. @@ -595,8 +630,10 @@ impl Aggregator { } }; - let vdaf_verify_keys = - Vec::from([peer_aggregator.derive_vdaf_verify_key(task_id, &vdaf_instance)]); + let vdaf_verify_keys = Vec::from([self + .cfg + .verify_key_init + .derive_vdaf_verify_key(task_id, &vdaf_instance)]); let task = taskprov::Task::new( *task_id, @@ -607,10 +644,10 @@ impl Aggregator { vdaf_verify_keys, task_config.query_config().max_batch_query_count() as u64, Some(*task_config.task_expiration()), - peer_aggregator.report_expiry_age().cloned(), + self.cfg.report_expiry_age, task_config.query_config().min_batch_size() as u64, *task_config.query_config().time_precision(), - *peer_aggregator.tolerable_clock_skew(), + self.cfg.tolerable_clock_skew, ) .map_err(|err| Error::InvalidTask(*task_id, OptOutReason::TaskParameters(err)))?; self.datastore @@ -636,45 +673,27 @@ impl Aggregator { } })?; - info!(?task, ?peer_aggregator, "taskprov: opted into new task"); + info!(?task, "taskprov: opted into new task"); Ok(()) } /// Validate and authorize a taskprov request. Returns values necessary for determining whether /// we can opt into the task. This function might return an opt-out error for conditions that /// are relevant for all DAP workflows (e.g. task expiration). - #[tracing::instrument(skip(self, aggregator_auth_token), err)] + #[tracing::instrument(skip(self), err)] async fn taskprov_authorize_request( &self, peer_role: &Role, task_id: &TaskId, task_config: &TaskConfig, aggregator_auth_token: Option<&AuthenticationToken>, - ) -> Result<(&PeerAggregator, Vec), Error> { - let aggregator_urls = task_config - .aggregator_endpoints() + ) -> Result<(), Error> { + let request_token = aggregator_auth_token.ok_or(Error::UnauthorizedRequest(*task_id))?; + if !self + .cfg + .auth_tokens .iter() - .map(|url| url.try_into()) - .collect::, _>>()?; - if aggregator_urls.len() < 2 { - return Err(Error::UnrecognizedMessage( - Some(*task_id), - "taskprov configuration is missing one or both aggregators", - )); - } - let peer_aggregator_url = &aggregator_urls[peer_role.index().unwrap()]; - - let peer_aggregator = self - .peer_aggregators - .get(peer_aggregator_url, peer_role) - .ok_or(Error::InvalidTask( - *task_id, - OptOutReason::NoSuchPeer(*peer_role), - ))?; - - if !aggregator_auth_token - .map(|t| peer_aggregator.check_aggregator_auth_token(t)) - .unwrap_or(false) + .any(|token| token == request_token) { return Err(Error::UnauthorizedRequest(*task_id)); } @@ -683,13 +702,8 @@ impl Aggregator { return Err(Error::InvalidTask(*task_id, OptOutReason::TaskExpired)); } - debug!( - ?task_id, - ?task_config, - ?peer_aggregator, - "taskprov: authorized request" - ); - Ok((peer_aggregator, aggregator_urls)) + debug!(?task_id, ?task_config, "taskprov: authorized request"); + Ok(()) } #[cfg(feature = "test-util")] diff --git a/aggregator/src/aggregator/taskprov_tests.rs b/aggregator/src/aggregator/taskprov_tests.rs index f46231195..17fd6ad65 100644 --- a/aggregator/src/aggregator/taskprov_tests.rs +++ b/aggregator/src/aggregator/taskprov_tests.rs @@ -18,7 +18,6 @@ use janus_aggregator_core::{ Datastore, }, task::{QueryType, Task}, - taskprov::{test_util::PeerAggregatorBuilder, PeerAggregator}, test_util::noop_meter, }; use janus_core::{ @@ -27,7 +26,7 @@ use janus_core::{ HpkeApplicationInfo, HpkeKeypair, Label, }, report_id::ReportIdChecksumExt, - task::PRIO3_VERIFY_KEY_LENGTH, + task::{AuthenticationToken, PRIO3_VERIFY_KEY_LENGTH}, taskprov::TASKPROV_HEADER, test_util::{install_test_trace_subscriber, run_vdaf, VdafTranscript}, time::{Clock, DurationExt, MockClock, TimeExt}, @@ -66,13 +65,13 @@ pub struct TaskprovTestCase { collector_hpke_keypair: HpkeKeypair, datastore: Arc>, handler: Box, - peer_aggregator: PeerAggregator, report_metadata: ReportMetadata, transcript: VdafTranscript<16, TestVdaf>, report_share: ReportShare, task: Task, task_config: TaskConfig, task_id: TaskId, + aggregator_auth_token: AuthenticationToken, } async fn setup_taskprov_test() -> TaskprovTestCase { @@ -84,32 +83,33 @@ async fn setup_taskprov_test() -> TaskprovTestCase { let global_hpke_key = generate_test_hpke_config_and_private_key(); let collector_hpke_keypair = generate_test_hpke_config_and_private_key(); - let peer_aggregator = PeerAggregatorBuilder::new() - .with_endpoint(url::Url::parse("https://leader.example.com/").unwrap()) - .with_role(Role::Leader) - .with_collector_hpke_config(collector_hpke_keypair.config().clone()) - .build(); + let aggregator_auth_token: AuthenticationToken = random(); datastore .run_tx(|tx| { let global_hpke_key = global_hpke_key.clone(); - let peer_aggregator = peer_aggregator.clone(); Box::pin(async move { tx.put_global_hpke_keypair(&global_hpke_key).await.unwrap(); - tx.put_taskprov_peer_aggregator(&peer_aggregator) - .await - .unwrap(); Ok(()) }) }) .await .unwrap(); + let tolerable_clock_skew = Duration::from_seconds(60); + let config = Config { + collector_hpke_config: collector_hpke_keypair.config().clone(), + verify_key_init: random(), + auth_tokens: vec![aggregator_auth_token.clone()], + tolerable_clock_skew, + ..Default::default() + }; + let handler = aggregator_handler( Arc::clone(&datastore), clock.clone(), &noop_meter(), - Config::default(), + config.clone(), ) .await .unwrap(); @@ -144,7 +144,9 @@ async fn setup_taskprov_test() -> TaskprovTestCase { let task_id = TaskId::try_from(digest(&SHA256, &task_config_encoded).as_ref()).unwrap(); let vdaf_instance = task_config.vdaf_config().vdaf_type().try_into().unwrap(); - let vdaf_verify_key = peer_aggregator.derive_vdaf_verify_key(&task_id, &vdaf_instance); + let vdaf_verify_key = config + .verify_key_init + .derive_vdaf_verify_key(&task_id, &vdaf_instance); let task = janus_aggregator_core::taskprov::Task::new( task_id, @@ -161,10 +163,10 @@ async fn setup_taskprov_test() -> TaskprovTestCase { Vec::from([vdaf_verify_key.clone()]), max_batch_query_count as u64, Some(task_expiration), - peer_aggregator.report_expiry_age().copied(), + config.report_expiry_age, min_batch_size as u64, Duration::from_seconds(1), - Duration::from_seconds(1), + tolerable_clock_skew, ) .unwrap(); @@ -197,13 +199,13 @@ async fn setup_taskprov_test() -> TaskprovTestCase { collector_hpke_keypair, datastore, handler: Box::new(handler), - peer_aggregator, task: task.into(), task_config, task_id, report_metadata, transcript, report_share, + aggregator_auth_token, } } @@ -222,10 +224,7 @@ async fn taskprov_aggregate_init() { Vec::from([test.report_share.clone()]), ); - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let auth = test.aggregator_auth_token.request_authentication(); let mut test_conn = post(test.task.aggregation_job_uri().unwrap().path()) .with_request_header(auth.0, "Bearer invalid_token") @@ -321,10 +320,7 @@ async fn taskprov_opt_out_task_expired() { Vec::from([test.report_share.clone()]), ); - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let auth = test.aggregator_auth_token.request_authentication(); // Advance clock past task expiry. test.clock.advance(&Duration::from_hours(48).unwrap()); @@ -394,10 +390,7 @@ async fn taskprov_opt_out_mismatched_task_id() { ) .unwrap(); - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let auth = test.aggregator_auth_token.request_authentication(); let mut test_conn = post( test @@ -474,10 +467,7 @@ async fn taskprov_opt_out_missing_aggregator() { Vec::from([test.report_share.clone()]), ); - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let auth = test.aggregator_auth_token.request_authentication(); let mut test_conn = post( test @@ -511,172 +501,6 @@ async fn taskprov_opt_out_missing_aggregator() { ); } -#[tokio::test] -async fn taskprov_opt_out_peer_aggregator_wrong_role() { - let test = setup_taskprov_test().await; - - let batch_id = random(); - let aggregation_job_id: AggregationJobId = random(); - - let task_expiration = test - .clock - .now() - .add(&Duration::from_hours(24).unwrap()) - .unwrap(); - let another_task_config = TaskConfig::new( - Vec::from("foobar".as_bytes()), - // Attempt to configure leader as a helper. - Vec::from([ - "https://helper.example.com/".as_bytes().try_into().unwrap(), - "https://leader.example.com/".as_bytes().try_into().unwrap(), - ]), - QueryConfig::new( - Duration::from_seconds(1), - 100, - 100, - TaskprovQuery::FixedSize { - max_batch_size: 100, - }, - ), - task_expiration, - VdafConfig::new(DpConfig::new(DpMechanism::None), VdafType::Prio3Aes128Count).unwrap(), - ) - .unwrap(); - let another_task_config_encoded = another_task_config.get_encoded(); - let another_task_id: TaskId = digest(&SHA256, &another_task_config_encoded) - .as_ref() - .try_into() - .unwrap(); - - let request = AggregateInitializeReq::new( - another_task_id, - aggregation_job_id, - ().get_encoded(), - PartialBatchSelector::new_fixed_size(batch_id), - Vec::from([test.report_share.clone()]), - ); - - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); - - let mut test_conn = post( - test - // Use the test case task's ID. - .task - .aggregation_job_uri() - .unwrap() - .path(), - ) - .with_request_header(auth.0, auth.1) - .with_request_header( - KnownHeaderName::ContentType, - AggregateInitializeReq::::MEDIA_TYPE, - ) - .with_request_header( - TASKPROV_HEADER, - URL_SAFE_NO_PAD.encode(another_task_config_encoded), - ) - .with_request_body(request.get_encoded()) - .run_async(&test.handler) - .await; - assert_eq!(test_conn.status(), Some(Status::BadRequest)); - assert_eq!( - take_problem_details(&mut test_conn).await, - json!({ - "status": Status::BadRequest as u16, - "type": "urn:ietf:params:ppm:dap:error:invalidTask", - "title": "Aggregator has opted out of the indicated task.", - "taskid": format!("{}", another_task_id - ), - }) - ); -} - -#[tokio::test] -async fn taskprov_opt_out_peer_aggregator_does_not_exist() { - let test = setup_taskprov_test().await; - - let batch_id = random(); - let aggregation_job_id: AggregationJobId = random(); - - let task_expiration = test - .clock - .now() - .add(&Duration::from_hours(24).unwrap()) - .unwrap(); - let another_task_config = TaskConfig::new( - Vec::from("foobar".as_bytes()), - Vec::from([ - // Some non-existent aggregator. - "https://foobar.example.com/".as_bytes().try_into().unwrap(), - "https://leader.example.com/".as_bytes().try_into().unwrap(), - ]), - QueryConfig::new( - Duration::from_seconds(1), - 100, - 100, - TaskprovQuery::FixedSize { - max_batch_size: 100, - }, - ), - task_expiration, - VdafConfig::new(DpConfig::new(DpMechanism::None), VdafType::Prio3Aes128Count).unwrap(), - ) - .unwrap(); - let another_task_config_encoded = another_task_config.get_encoded(); - let another_task_id: TaskId = digest(&SHA256, &another_task_config_encoded) - .as_ref() - .try_into() - .unwrap(); - - let request = AggregateInitializeReq::new( - another_task_id, - aggregation_job_id, - ().get_encoded(), - PartialBatchSelector::new_fixed_size(batch_id), - Vec::from([test.report_share.clone()]), - ); - - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); - - let mut test_conn = post( - test - // Use the test case task's ID. - .task - .aggregation_job_uri() - .unwrap() - .path(), - ) - .with_request_header(auth.0, auth.1) - .with_request_header( - KnownHeaderName::ContentType, - AggregateInitializeReq::::MEDIA_TYPE, - ) - .with_request_header( - TASKPROV_HEADER, - URL_SAFE_NO_PAD.encode(another_task_config_encoded), - ) - .with_request_body(request.get_encoded()) - .run_async(&test.handler) - .await; - assert_eq!(test_conn.status(), Some(Status::BadRequest)); - assert_eq!( - take_problem_details(&mut test_conn).await, - json!({ - "status": Status::BadRequest as u16, - "type": "urn:ietf:params:ppm:dap:error:invalidTask", - "title": "Aggregator has opted out of the indicated task.", - "taskid": format!("{}", another_task_id - ), - }) - ); -} - #[tokio::test] async fn taskprov_aggregate_continue() { let test = setup_taskprov_test().await; @@ -752,10 +576,7 @@ async fn taskprov_aggregate_continue() { )]), ); - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let auth = test.aggregator_auth_token.request_authentication(); // Attempt using the wrong credentials, should reject. let mut test_conn = post(test.task.aggregation_job_uri().unwrap().path()) @@ -873,10 +694,7 @@ async fn taskprov_aggregate_share() { ReportIdChecksum::get_decoded(&[3; 32]).unwrap(), ); - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let auth = test.aggregator_auth_token.request_authentication(); // Attempt using the wrong credentials, should reject. let mut test_conn = post(test.task.aggregate_shares_uri().unwrap().path()) @@ -942,10 +760,7 @@ async fn taskprov_aggregate_share() { #[tokio::test] async fn end_to_end() { let test = setup_taskprov_test().await; - let (auth_header_name, auth_header_value) = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let (auth_header_name, auth_header_value) = test.aggregator_auth_token.request_authentication(); let batch_id = random(); let aggregation_job_id = random(); diff --git a/aggregator/src/bin/aggregator.rs b/aggregator/src/bin/aggregator.rs index 2c7b90a90..3fce2208e 100644 --- a/aggregator/src/bin/aggregator.rs +++ b/aggregator/src/bin/aggregator.rs @@ -1,4 +1,5 @@ use anyhow::{Context, Result}; +use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use clap::Parser; use janus_aggregator::{ aggregator::{self, garbage_collector::GarbageCollector, http_handlers::aggregator_handler}, @@ -7,10 +8,10 @@ use janus_aggregator::{ CommonBinaryOptions, }, cache::GlobalHpkeKeypairCache, - config::{BinaryConfig, CommonConfig}, + config::{BinaryConfig, CommonConfig, TaskprovConfig}, }; use janus_aggregator_api::{self, aggregator_api_handler}; -use janus_aggregator_core::datastore::Datastore; +use janus_aggregator_core::{datastore::Datastore, taskprov::VerifyKeyInit}; use janus_core::{task::AuthenticationToken, time::RealClock}; use serde::{Deserialize, Serialize}; use std::{ @@ -44,12 +45,14 @@ async fn main() -> Result<()> { .response_headers() .context("failed to parse response headers")?; + let auth_tokens = parse_auth_tokens(&options.aggregator_auth_tokens)?; + let mut handlers = ( aggregator_handler( Arc::clone(&datastore), clock, &meter, - config.aggregator_config(), + config.aggregator_config(options.verify_key_init, auth_tokens), ) .await?, None, @@ -148,16 +151,7 @@ fn build_aggregator_api_handler<'a>( let Some(aggregator_api) = &config.aggregator_api else { return Ok(None); }; - let aggregator_api_auth_tokens = options - .aggregator_api_auth_tokens - .iter() - .filter(|token| !token.is_empty()) - .map(|token| { - // Aggregator API auth tokens are always bearer tokens - AuthenticationToken::new_bearer_token_from_string(token) - .context("invalid aggregator API auth token") - }) - .collect::>>()?; + let aggregator_api_auth_tokens = parse_auth_tokens(&options.aggregator_api_auth_tokens)?; Ok(Some(( aggregator_api_handler( @@ -171,6 +165,18 @@ fn build_aggregator_api_handler<'a>( ))) } +fn parse_auth_tokens(auth_tokens: &[String]) -> Result> { + auth_tokens + .iter() + .filter(|token| !token.is_empty()) + .map(|token| { + // Auth tokens taken at the CLI are always Bearer tokens. + AuthenticationToken::new_bearer_token_from_string(token) + .context("invalid bearer auth token") + }) + .collect() +} + #[derive(Debug, Parser)] #[clap( name = "janus-aggregator", @@ -192,6 +198,32 @@ struct Options { help = "aggregator API auth tokens, encoded in base64 then comma-separated" )] aggregator_api_auth_tokens: Vec, + + /// Tokens for authenticating DAP operations. + #[clap( + long, + env = "AGGREGATOR_AUTH_TOKENS", + hide_env_values = true, + required = true, + use_value_delimiter = true, + help = "DAP aggregator tokens, encoded in base64 then comma-separated" + )] + aggregator_auth_tokens: Vec, + + /// Taskprov verify key init, used to derive per-task VDAF verify keys. + #[clap( + long, + value_parser = parse_verify_key_init, + env = "VERIFY_KEY_INIT", + hide_env_values = true, + help = "Taskprov verify key init, encoded in base64" + )] + verify_key_init: VerifyKeyInit, +} + +fn parse_verify_key_init(arg: &str) -> Result { + let result = VerifyKeyInit::try_from(URL_SAFE_NO_PAD.decode(arg)?.as_ref())?; + Ok(result) } impl BinaryOptions for Options { @@ -331,6 +363,8 @@ struct Config { /// specify this. #[serde(default)] global_hpke_configs_refresh_interval: Option, + + taskprov_config: TaskprovConfig, } #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -364,7 +398,11 @@ impl Config { .collect() } - fn aggregator_config(&self) -> aggregator::Config { + fn aggregator_config( + &self, + verify_key_init: VerifyKeyInit, + auth_tokens: Vec, + ) -> aggregator::Config { aggregator::Config { max_upload_batch_size: self.max_upload_batch_size, max_upload_batch_write_delay: Duration::from_millis( @@ -375,6 +413,11 @@ impl Config { Some(duration) => Duration::from_millis(duration), None => GlobalHpkeKeypairCache::DEFAULT_REFRESH_INTERVAL, }, + collector_hpke_config: self.taskprov_config.collector_hpke_config.clone(), + report_expiry_age: self.taskprov_config.report_expiry_age, + tolerable_clock_skew: self.taskprov_config.tolerable_clock_skew, + verify_key_init, + auth_tokens, } } } @@ -392,19 +435,27 @@ impl BinaryConfig for Config { #[cfg(test)] mod tests { use super::{AggregatorApi, Config, GarbageCollectorConfig, HeaderEntry, Options}; + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use clap::CommandFactory; use janus_aggregator::{ aggregator, config::{ test_util::{generate_db_config, generate_metrics_config, generate_trace_config}, - BinaryConfig, CommonConfig, + BinaryConfig, CommonConfig, TaskprovConfig, }, metrics::{MetricsExporterConfiguration, OtlpExporterConfiguration}, trace::{ OpenTelemetryTraceConfiguration, OtlpTraceConfiguration, TokioConsoleConfiguration, }, }; - use janus_core::test_util::roundtrip_encoding; + use janus_aggregator_core::taskprov::VerifyKeyInit; + use janus_core::{ + hpke::test_util::generate_test_hpke_config_and_private_key, test_util::roundtrip_encoding, + }; + use janus_messages::{ + HpkeAeadId, HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId, HpkePublicKey, + }; + use rand::random; use std::{ collections::HashMap, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -450,6 +501,11 @@ mod tests { max_upload_batch_write_delay_ms: 250, batch_aggregation_shard_count: 32, global_hpke_configs_refresh_interval: None, + taskprov_config: TaskprovConfig { + collector_hpke_config: generate_test_hpke_config_and_private_key().config().clone(), + report_expiry_age: None, + tolerable_clock_skew: janus_messages::Duration::from_seconds(60), + }, }) } @@ -465,6 +521,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() @@ -485,6 +549,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 garbage_collection: gc_frequency_s: 60 report_limit: 25 @@ -515,6 +587,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 aggregator_api: listen_address: "0.0.0.0:8081" public_dap_url: "https://dap.url" @@ -541,6 +621,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 aggregator_api: path_prefix: "aggregator-api" public_dap_url: "https://dap.url" @@ -574,6 +662,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() @@ -589,6 +685,7 @@ mod tests { }, ); + let verify_key_init: VerifyKeyInit = random(); assert_eq!( serde_yaml::from_str::( r#"--- @@ -604,14 +701,36 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() - .aggregator_config(), + .aggregator_config(verify_key_init, Vec::new()), aggregator::Config { max_upload_batch_size: 100, max_upload_batch_write_delay: Duration::from_millis(250), batch_aggregation_shard_count: 32, + collector_hpke_config: HpkeConfig::new( + HpkeConfigId::from(183), + HpkeKemId::X25519HkdfSha256, + HpkeKdfId::HkdfSha256, + HpkeAeadId::Aes128Gcm, + HpkePublicKey::from( + URL_SAFE_NO_PAD + .decode("4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo") + .unwrap() + ), + ), + report_expiry_age: None, + tolerable_clock_skew: janus_messages::Duration::from_seconds(60), + verify_key_init, ..Default::default() } ); @@ -631,6 +750,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() @@ -662,6 +789,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() @@ -695,6 +830,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() @@ -725,6 +868,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() diff --git a/aggregator/src/cache.rs b/aggregator/src/cache.rs index c4cd4c5da..bdeac444c 100644 --- a/aggregator/src/cache.rs +++ b/aggregator/src/cache.rs @@ -1,12 +1,9 @@ //! Various in-memory caches that can be used by an aggregator. use crate::aggregator::Error; -use janus_aggregator_core::{ - datastore::{models::HpkeKeyState, Datastore}, - taskprov::PeerAggregator, -}; +use janus_aggregator_core::datastore::{models::HpkeKeyState, Datastore}; use janus_core::{hpke::HpkeKeypair, time::Clock}; -use janus_messages::{HpkeConfig, HpkeConfigId, Role}; +use janus_messages::{HpkeConfig, HpkeConfigId}; use std::{ collections::HashMap, fmt::Debug, @@ -15,7 +12,6 @@ use std::{ }; use tokio::{spawn, task::JoinHandle, time::sleep}; use tracing::{debug, error}; -use url::Url; type HpkeConfigs = Arc>; type HpkeKeypairs = HashMap>; @@ -141,32 +137,3 @@ impl Drop for GlobalHpkeKeypairCache { self.refresh_handle.abort() } } - -/// Caches taskprov [`PeerAggregator`]'s. This cache is never invalidated, so the process needs to -/// be restarted if there are any changes to peer aggregators. -#[derive(Debug)] -pub struct PeerAggregatorCache { - peers: Vec, -} - -impl PeerAggregatorCache { - pub async fn new(datastore: &Datastore) -> Result { - Ok(Self { - peers: datastore - .run_tx_with_name("refresh_peer_aggregators_cache", |tx| { - Box::pin(async move { tx.get_taskprov_peer_aggregators().await }) - }) - .await? - .into_iter() - .collect(), - }) - } - - pub fn get(&self, endpoint: &Url, role: &Role) -> Option<&PeerAggregator> { - // The peer aggregator table is unlikely to be more than a few entries long (1-2 entries), - // so a linear search should be fine. - self.peers - .iter() - .find(|peer| peer.endpoint() == endpoint && peer.role() == role) - } -} diff --git a/aggregator/src/config.rs b/aggregator/src/config.rs index 61783a7b6..f965d0b34 100644 --- a/aggregator/src/config.rs +++ b/aggregator/src/config.rs @@ -2,6 +2,7 @@ use crate::{metrics::MetricsConfiguration, trace::TraceConfiguration}; use derivative::Derivative; +use janus_messages::{Duration, HpkeConfig}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{ fmt::Debug, @@ -127,6 +128,18 @@ pub struct JobDriverConfig { pub maximum_attempts_before_failure: usize, } +/// Configuration options for the Taskprov extension. This extension is +/// described in [draft-wang-ppm-dap-taskprov][spec], although its configuration +/// options are implementation-specific. +/// +/// [spec]: https://datatracker.ietf.org/doc/draft-wang-ppm-dap-taskprov/ +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TaskprovConfig { + pub collector_hpke_config: HpkeConfig, + pub report_expiry_age: Option, + pub tolerable_clock_skew: Duration, +} + #[cfg(feature = "test-util")] #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] pub mod test_util { diff --git a/aggregator/tests/cmd/aggregator.trycmd b/aggregator/tests/cmd/aggregator.trycmd index 60d3e1ae4..cc69c0d4b 100644 --- a/aggregator/tests/cmd/aggregator.trycmd +++ b/aggregator/tests/cmd/aggregator.trycmd @@ -2,7 +2,7 @@ $ aggregator --help DAP aggregator server -Usage: aggregator [OPTIONS] --config-file +Usage: aggregator [OPTIONS] --config-file --aggregator-auth-tokens --verify-key-init Options: --config-file @@ -17,6 +17,10 @@ Options: additional OTLP/gRPC metadata key/value pairs for the metrics exporter [env: OTLP_METRICS_METADATA=] --aggregator-api-auth-tokens [] aggregator API auth tokens, encoded in base64 then comma-separated [env: AGGREGATOR_API_AUTH_TOKENS] + --aggregator-auth-tokens + DAP aggregator tokens, encoded in base64 then comma-separated [env: AGGREGATOR_AUTH_TOKENS] + --verify-key-init + Taskprov verify key init, encoded in base64 [env: VERIFY_KEY_INIT] -h, --help Print help -V, --version diff --git a/aggregator/tests/graceful_shutdown.rs b/aggregator/tests/graceful_shutdown.rs index 8a9710a7b..a499796b4 100644 --- a/aggregator/tests/graceful_shutdown.rs +++ b/aggregator/tests/graceful_shutdown.rs @@ -3,15 +3,25 @@ //! process. The process should promptly shut down, and this test will fail if //! it times out waiting for the process to do so. -use base64::{engine::general_purpose::STANDARD_NO_PAD, Engine}; +use base64::{ + engine::general_purpose::{STANDARD_NO_PAD, URL_SAFE_NO_PAD}, + Engine, +}; use janus_aggregator_core::{ datastore::test_util::ephemeral_datastore, task::{test_util::TaskBuilder, QueryType}, + taskprov::VerifyKeyInit, +}; +use janus_core::{ + hpke::test_util::generate_test_hpke_config_and_private_key, + task::{AuthenticationToken, VdafInstance}, + test_util::install_test_trace_subscriber, + time::RealClock, }; -use janus_core::{task::VdafInstance, test_util::install_test_trace_subscriber, time::RealClock}; use janus_messages::Role; +use rand::random; use reqwest::Url; -use serde_yaml::{Mapping, Value}; +use serde_yaml::{to_value, Mapping, Value}; use std::{ future::Future, io::{ErrorKind, Write}, @@ -119,6 +129,15 @@ async fn graceful_shutdown(binary: &Path, mut config: Mapping) { db_config.insert("url".into(), ephemeral_datastore.connection_string().into()); db_config.insert("connection_pool_timeout_secs".into(), "60".into()); config.insert("database".into(), db_config.into()); + + let mut taskprov_config = Mapping::new(); + taskprov_config.insert( + "collector_hpke_config".into(), + to_value(generate_test_hpke_config_and_private_key().config()).unwrap(), + ); + taskprov_config.insert("tolerable_clock_skew".into(), 60.into()); + config.insert("taskprov_config".into(), taskprov_config.into()); + config.insert( "health_check_listen_address".into(), format!("{health_check_listen_address}").into(), @@ -162,6 +181,14 @@ async fn graceful_shutdown(binary: &Path, mut config: Mapping) { "DATASTORE_KEYS", STANDARD_NO_PAD.encode(ephemeral_datastore.datastore_key_bytes()), ) + .env( + "VERIFY_KEY_INIT", + URL_SAFE_NO_PAD.encode(random::().as_ref()), + ) + .env( + "AGGREGATOR_AUTH_TOKENS", + URL_SAFE_NO_PAD.encode(random::()), + ) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() diff --git a/aggregator_api/src/lib.rs b/aggregator_api/src/lib.rs index de50cb36a..4f4f5af74 100644 --- a/aggregator_api/src/lib.rs +++ b/aggregator_api/src/lib.rs @@ -107,18 +107,6 @@ pub fn aggregator_api_handler(ds: Arc>, cfg: Config) -> i .delete( "/hpke_configs/:config_id", instrumented(api(delete_global_hpke_config::)), - ) - .get( - "/taskprov/peer_aggregators", - instrumented(api(get_taskprov_peer_aggregators::)), - ) - .post( - "/taskprov/peer_aggregators", - instrumented(api(post_taskprov_peer_aggregator::)), - ) - .delete( - "/taskprov/peer_aggregators", - instrumented(api(delete_taskprov_peer_aggregator::)), ), ) } diff --git a/aggregator_api/src/models.rs b/aggregator_api/src/models.rs index 84e103c0b..06d39b981 100644 --- a/aggregator_api/src/models.rs +++ b/aggregator_api/src/models.rs @@ -2,7 +2,6 @@ use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use janus_aggregator_core::{ datastore::models::{GlobalHpkeKeypair, HpkeKeyState}, task::{QueryType, Task}, - taskprov::{PeerAggregator, VerifyKeyInit}, }; use janus_core::task::{AuthenticationToken, VdafInstance}; use janus_messages::{ @@ -224,43 +223,3 @@ pub(crate) struct PutGlobalHpkeConfigReq { pub(crate) struct PatchGlobalHpkeConfigReq { pub(crate) state: HpkeKeyState, } - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -pub(crate) struct TaskprovPeerAggregatorResp { - pub(crate) endpoint: Url, - pub(crate) role: Role, - pub(crate) collector_hpke_config: HpkeConfig, - pub(crate) report_expiry_age: Option, - pub(crate) tolerable_clock_skew: Duration, -} - -impl From for TaskprovPeerAggregatorResp { - fn from(value: PeerAggregator) -> Self { - // Exclude sensitive values. - Self { - endpoint: value.endpoint().clone(), - role: *value.role(), - collector_hpke_config: value.collector_hpke_config().clone(), - report_expiry_age: value.report_expiry_age().cloned(), - tolerable_clock_skew: *value.tolerable_clock_skew(), - } - } -} - -#[derive(Serialize, Deserialize)] -pub(crate) struct PostTaskprovPeerAggregatorReq { - pub(crate) endpoint: Url, - pub(crate) role: Role, - pub(crate) collector_hpke_config: HpkeConfig, - pub(crate) verify_key_init: VerifyKeyInit, - pub(crate) report_expiry_age: Option, - pub(crate) tolerable_clock_skew: Duration, - pub(crate) aggregator_auth_tokens: Vec, - pub(crate) collector_auth_tokens: Vec, -} - -#[derive(Clone, Serialize, Deserialize)] -pub(crate) struct DeleteTaskprovPeerAggregatorReq { - pub(crate) endpoint: Url, - pub(crate) role: Role, -} diff --git a/aggregator_api/src/routes.rs b/aggregator_api/src/routes.rs index 4ccf326c4..f56152e87 100644 --- a/aggregator_api/src/routes.rs +++ b/aggregator_api/src/routes.rs @@ -1,9 +1,8 @@ use crate::{ models::{ - AggregatorApiConfig, AggregatorRole, DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp, - GetTaskMetricsResp, GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, PostTaskReq, - PostTaskprovPeerAggregatorReq, PutGlobalHpkeConfigReq, SupportedVdaf, TaskResp, - TaskprovPeerAggregatorResp, + AggregatorApiConfig, AggregatorRole, GetTaskIdsResp, GetTaskMetricsResp, + GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, PostTaskReq, PutGlobalHpkeConfigReq, + SupportedVdaf, TaskResp, }, Config, ConnExt, Error, }; @@ -11,7 +10,6 @@ use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use janus_aggregator_core::{ datastore::{self, Datastore}, task::Task, - taskprov::PeerAggregator, SecretBytes, }; use janus_core::{hpke::generate_hpke_config_and_private_key, time::Clock}; @@ -370,80 +368,3 @@ pub(super) async fn delete_global_hpke_config( Err(err) => Err(err.into()), } } - -pub(super) async fn get_taskprov_peer_aggregators( - _: &mut Conn, - State(ds): State>>, -) -> Result>, Error> { - Ok(Json( - ds.run_tx_with_name("get_taskprov_peer_aggregators", |tx| { - Box::pin(async move { tx.get_taskprov_peer_aggregators().await }) - }) - .await? - .into_iter() - .map(TaskprovPeerAggregatorResp::from) - .collect::>(), - )) -} - -/// Inserts a new peer aggregator. Insertion is only supported, attempting to modify an existing -/// peer aggregator will fail. -/// -/// TODO(1685): Requiring that we delete an existing peer aggregator before we can change it makes -/// token rotation cumbersome and fragile. Since token rotation is the main use case for updating -/// an existing peer aggregator, we will resolve peer aggregator updates in that issue. -pub(super) async fn post_taskprov_peer_aggregator( - _: &mut Conn, - (State(ds), Json(req)): ( - State>>, - Json, - ), -) -> Result<(Status, Json), Error> { - let to_insert = PeerAggregator::new( - req.endpoint, - req.role, - req.verify_key_init, - req.collector_hpke_config, - req.report_expiry_age, - req.tolerable_clock_skew, - req.aggregator_auth_tokens, - req.collector_auth_tokens, - ); - - let inserted = ds - .run_tx_with_name("post_taskprov_peer_aggregator", |tx| { - let to_insert = to_insert.clone(); - Box::pin(async move { - tx.put_taskprov_peer_aggregator(&to_insert).await?; - tx.get_taskprov_peer_aggregator(to_insert.endpoint(), to_insert.role()) - .await - }) - }) - .await? - .map(TaskprovPeerAggregatorResp::from) - .ok_or_else(|| Error::Internal("Newly inserted peer aggregator disappeared".to_string()))?; - - Ok((Status::Created, Json(inserted))) -} - -pub(super) async fn delete_taskprov_peer_aggregator( - _: &mut Conn, - (State(ds), Json(req)): ( - State>>, - Json, - ), -) -> Result { - match ds - .run_tx_with_name("delete_taskprov_peer_aggregator", |tx| { - let req = req.clone(); - Box::pin(async move { - tx.delete_taskprov_peer_aggregator(&req.endpoint, &req.role) - .await - }) - }) - .await - { - Ok(_) | Err(datastore::Error::MutationTargetNotFound) => Ok(Status::NoContent), - Err(err) => Err(err.into()), - } -} diff --git a/aggregator_api/src/tests.rs b/aggregator_api/src/tests.rs index 6a59b4899..bd40bdfcd 100644 --- a/aggregator_api/src/tests.rs +++ b/aggregator_api/src/tests.rs @@ -1,9 +1,8 @@ use crate::{ aggregator_api_handler, models::{ - DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp, GetTaskMetricsResp, GlobalHpkeConfigResp, - PatchGlobalHpkeConfigReq, PostTaskReq, PostTaskprovPeerAggregatorReq, - PutGlobalHpkeConfigReq, TaskResp, TaskprovPeerAggregatorResp, + GetTaskIdsResp, GetTaskMetricsResp, GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, + PostTaskReq, PutGlobalHpkeConfigReq, TaskResp, }, Config, CONTENT_TYPE, }; @@ -19,7 +18,6 @@ use janus_aggregator_core::{ Datastore, }, task::{test_util::TaskBuilder, QueryType, Task}, - taskprov::test_util::PeerAggregatorBuilder, SecretBytes, }; use janus_core::{ @@ -49,7 +47,6 @@ use trillium::{Handler, Status}; use trillium_testing::{ assert_response, assert_status, prelude::{delete, get, patch, post, put}, - Url, }; const AUTH_TOKEN: &str = "Y29sbGVjdG9yLWFiY2RlZjAw"; @@ -1245,224 +1242,6 @@ async fn delete_global_hpke_config() { ); } -#[tokio::test] -async fn get_taskprov_peer_aggregator() { - let (handler, _ephemeral_datastore, ds) = setup_api_test().await; - - let leader = PeerAggregatorBuilder::new() - .with_endpoint(Url::parse("https://leader.example.com/").unwrap()) - .with_role(Role::Leader) - .build(); - let helper = PeerAggregatorBuilder::new() - .with_endpoint(Url::parse("https://helper.example.com/").unwrap()) - .with_role(Role::Helper) - .build(); - - ds.run_tx(|tx| { - let leader = leader.clone(); - let helper = helper.clone(); - Box::pin(async move { - tx.put_taskprov_peer_aggregator(&leader).await?; - tx.put_taskprov_peer_aggregator(&helper).await?; - Ok(()) - }) - }) - .await - .unwrap(); - - // List all. - let mut conn = get("/taskprov/peer_aggregators") - .with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}")) - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await; - assert_response!(conn, Status::Ok); - let mut resp: Vec = serde_json::from_slice( - &conn - .take_response_body() - .unwrap() - .into_bytes() - .await - .unwrap(), - ) - .unwrap(); - resp.sort_by(|a, b| a.endpoint.cmp(&b.endpoint)); - - let mut expected = vec![ - TaskprovPeerAggregatorResp { - endpoint: leader.endpoint().clone(), - role: *leader.role(), - collector_hpke_config: leader.collector_hpke_config().clone(), - report_expiry_age: leader.report_expiry_age().cloned(), - tolerable_clock_skew: *leader.tolerable_clock_skew(), - }, - TaskprovPeerAggregatorResp { - endpoint: helper.endpoint().clone(), - role: *helper.role(), - collector_hpke_config: helper.collector_hpke_config().clone(), - report_expiry_age: helper.report_expiry_age().cloned(), - tolerable_clock_skew: *helper.tolerable_clock_skew(), - }, - ]; - expected.sort_by(|a, b| a.endpoint.cmp(&b.endpoint)); - - assert_eq!(resp, expected); - - // Missing authorization. - assert_response!( - get("/taskprov/peer_aggregators") - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await, - Status::Unauthorized - ); -} - -#[tokio::test] -async fn post_taskprov_peer_aggregator() { - let (handler, _ephemeral_datastore, ds) = setup_api_test().await; - - let endpoint = Url::parse("https://leader.example.com/").unwrap(); - let leader = PeerAggregatorBuilder::new() - .with_endpoint(endpoint.clone()) - .with_role(Role::Leader) - .build(); - - let req = PostTaskprovPeerAggregatorReq { - endpoint, - role: Role::Leader, - collector_hpke_config: leader.collector_hpke_config().clone(), - verify_key_init: *leader.verify_key_init(), - report_expiry_age: leader.report_expiry_age().cloned(), - tolerable_clock_skew: *leader.tolerable_clock_skew(), - aggregator_auth_tokens: Vec::from(leader.aggregator_auth_tokens()), - collector_auth_tokens: Vec::from(leader.collector_auth_tokens()), - }; - - let mut conn = post("/taskprov/peer_aggregators") - .with_request_body(serde_json::to_vec(&req).unwrap()) - .with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}")) - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await; - assert_response!(conn, Status::Created); - assert_eq!( - serde_json::from_slice::( - &conn - .take_response_body() - .unwrap() - .into_bytes() - .await - .unwrap(), - ) - .unwrap(), - leader.clone().into() - ); - - assert_eq!( - ds.run_tx(|tx| { Box::pin(async move { tx.get_taskprov_peer_aggregators().await }) }) - .await - .unwrap(), - vec![leader] - ); - - // Can't insert the same aggregator. - assert_response!( - post("/taskprov/peer_aggregators") - .with_request_body(serde_json::to_vec(&req).unwrap()) - .with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}")) - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await, - Status::Conflict - ); - - // Missing authorization. - assert_response!( - post("/taskprov/peer_aggregators") - .with_request_body(serde_json::to_vec(&req).unwrap()) - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await, - Status::Unauthorized - ); -} - -#[tokio::test] -async fn delete_taskprov_peer_aggregator() { - let (handler, _ephemeral_datastore, ds) = setup_api_test().await; - - let endpoint = Url::parse("https://leader.example.com/").unwrap(); - let leader = PeerAggregatorBuilder::new() - .with_endpoint(endpoint.clone()) - .with_role(Role::Leader) - .build(); - - ds.run_tx(|tx| { - let leader = leader.clone(); - Box::pin(async move { tx.put_taskprov_peer_aggregator(&leader).await }) - }) - .await - .unwrap(); - - let req = DeleteTaskprovPeerAggregatorReq { - endpoint, - role: Role::Leader, - }; - - // Delete target. - assert_response!( - delete("/taskprov/peer_aggregators") - .with_request_body(serde_json::to_vec(&req).unwrap()) - .with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}")) - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await, - Status::NoContent - ); - - assert_eq!( - ds.run_tx(|tx| { Box::pin(async move { tx.get_taskprov_peer_aggregators().await }) }) - .await - .unwrap(), - vec![] - ); - - // Non-existent target. - assert_response!( - delete("/taskprov/peer_aggregators") - .with_request_body( - serde_json::to_vec(&DeleteTaskprovPeerAggregatorReq { - endpoint: Url::parse("https://doesnt-exist.example.com/").unwrap(), - role: Role::Leader, - }) - .unwrap() - ) - .with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}")) - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await, - Status::NoContent - ); - - // Missing authorization. - assert_response!( - delete("/taskprov/peer_aggregators") - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await, - Status::Unauthorized - ); -} - #[test] fn get_task_ids_resp_serialization() { assert_ser_tokens( diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index a63c8b854..04e033479 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -10,15 +10,14 @@ use self::models::{ use crate::{ query_type::{AccumulableQueryType, CollectableQueryType}, task::{self, Task}, - taskprov::{self, PeerAggregator}, - SecretBytes, + taskprov, SecretBytes, }; use anyhow::anyhow; use chrono::NaiveDateTime; use futures::future::try_join_all; use janus_core::{ hpke::{HpkeKeypair, HpkePrivateKey}, - task::{AuthenticationToken, VdafInstance}, + task::VdafInstance, time::{Clock, TimeExt}, }; use janus_messages::{ @@ -4418,336 +4417,6 @@ impl Transaction<'_, C> { .await?, ) } - - #[tracing::instrument(skip(self), err)] - pub async fn get_taskprov_peer_aggregators(&self) -> Result, Error> { - let stmt = self - .prepare_cached( - "SELECT id, endpoint, role, verify_key_init, collector_hpke_config, - report_expiry_age, tolerable_clock_skew - FROM taskprov_peer_aggregators", - ) - .await?; - let peer_aggregator_rows = self.query(&stmt, &[]); - - let stmt = self - .prepare_cached( - "SELECT (SELECT p.id FROM taskprov_peer_aggregators AS p - WHERE p.id = a.peer_aggregator_id) AS peer_id, - ord, type, token FROM taskprov_aggregator_auth_tokens AS a - ORDER BY ord ASC", - ) - .await?; - let aggregator_auth_token_rows = self.query(&stmt, &[]); - - let stmt = self - .prepare_cached( - "SELECT (SELECT p.id FROM taskprov_peer_aggregators AS p - WHERE p.id = a.peer_aggregator_id) AS peer_id, - ord, type, token FROM taskprov_collector_auth_tokens AS a - ORDER BY ord ASC", - ) - .await?; - let collector_auth_token_rows = self.query(&stmt, &[]); - - let (peer_aggregator_rows, aggregator_auth_token_rows, collector_auth_token_rows) = try_join!( - peer_aggregator_rows, - aggregator_auth_token_rows, - collector_auth_token_rows, - )?; - - let mut aggregator_auth_token_rows_by_peer_id: HashMap> = HashMap::new(); - for row in aggregator_auth_token_rows { - aggregator_auth_token_rows_by_peer_id - .entry(row.get("peer_id")) - .or_default() - .push(row); - } - - let mut collector_auth_token_rows_by_peer_id: HashMap> = HashMap::new(); - for row in collector_auth_token_rows { - collector_auth_token_rows_by_peer_id - .entry(row.get("peer_id")) - .or_default() - .push(row); - } - - peer_aggregator_rows - .into_iter() - .map(|row| (row.get("id"), row)) - .map(|(peer_id, peer_aggregator_row)| { - self.taskprov_peer_aggregator_from_rows( - &peer_aggregator_row, - &aggregator_auth_token_rows_by_peer_id - .remove(&peer_id) - .unwrap_or_default(), - &collector_auth_token_rows_by_peer_id - .remove(&peer_id) - .unwrap_or_default(), - ) - }) - .collect() - } - - #[tracing::instrument(skip(self), err)] - pub async fn get_taskprov_peer_aggregator( - &self, - aggregator_url: &Url, - role: &Role, - ) -> Result, Error> { - let aggregator_url = aggregator_url.as_str(); - let role = AggregatorRole::from_role(*role)?; - let params: &[&(dyn ToSql + Sync)] = &[&aggregator_url, &role]; - - let stmt = self - .prepare_cached( - "SELECT id, endpoint, role, verify_key_init, collector_hpke_config, - report_expiry_age, tolerable_clock_skew - FROM taskprov_peer_aggregators WHERE endpoint = $1 AND role = $2", - ) - .await?; - let peer_aggregator_row = self.query_opt(&stmt, params); - - let stmt = self - .prepare_cached( - "SELECT ord, type, token FROM taskprov_aggregator_auth_tokens - WHERE peer_aggregator_id = (SELECT id FROM taskprov_peer_aggregators - WHERE endpoint = $1 AND role = $2) - ORDER BY ord ASC", - ) - .await?; - let aggregator_auth_token_rows = self.query(&stmt, params); - - let stmt = self - .prepare_cached( - "SELECT ord, type, token FROM taskprov_collector_auth_tokens - WHERE peer_aggregator_id = (SELECT id FROM taskprov_peer_aggregators - WHERE endpoint = $1 AND role = $2) - ORDER BY ord ASC", - ) - .await?; - let collector_auth_token_rows = self.query(&stmt, params); - - let (peer_aggregator_row, aggregator_auth_token_rows, collector_auth_token_rows) = try_join!( - peer_aggregator_row, - aggregator_auth_token_rows, - collector_auth_token_rows, - )?; - peer_aggregator_row - .map(|peer_aggregator_row| { - self.taskprov_peer_aggregator_from_rows( - &peer_aggregator_row, - &aggregator_auth_token_rows, - &collector_auth_token_rows, - ) - }) - .transpose() - } - - fn taskprov_peer_aggregator_from_rows( - &self, - peer_aggregator_row: &Row, - aggregator_auth_token_rows: &[Row], - collector_auth_token_rows: &[Row], - ) -> Result { - let endpoint = Url::parse(peer_aggregator_row.get::<_, &str>("endpoint"))?; - let endpoint_bytes = endpoint.as_str().as_ref(); - let role: AggregatorRole = peer_aggregator_row.get("role"); - let report_expiry_age = peer_aggregator_row - .get_nullable_bigint_and_convert("report_expiry_age")? - .map(Duration::from_seconds); - let tolerable_clock_skew = Duration::from_seconds( - peer_aggregator_row.get_bigint_and_convert("tolerable_clock_skew")?, - ); - let collector_hpke_config = - HpkeConfig::get_decoded(peer_aggregator_row.get("collector_hpke_config"))?; - - let encrypted_verify_key_init: Vec = peer_aggregator_row.get("verify_key_init"); - let verify_key_init = self - .crypter - .decrypt( - "taskprov_peer_aggregator", - endpoint_bytes, - "verify_key_init", - &encrypted_verify_key_init, - )? - .as_slice() - .try_into()?; - - let decrypt_tokens = |rows: &[Row], table| -> Result, Error> { - rows.iter() - .map(|row| { - let ord: i64 = row.get("ord"); - let auth_token_type: AuthenticationTokenType = row.get("type"); - let encrypted_token: Vec = row.get("token"); - - let mut row_id = Vec::new(); - row_id.extend_from_slice(endpoint_bytes); - row_id.extend_from_slice(&role.as_role().get_encoded()); - row_id.extend_from_slice(&ord.to_be_bytes()); - - auth_token_type.as_authentication(&self.crypter.decrypt( - table, - &row_id, - "token", - &encrypted_token, - )?) - }) - .collect() - }; - - let aggregator_auth_tokens = decrypt_tokens( - aggregator_auth_token_rows, - "taskprov_aggregator_auth_tokens", - )?; - let collector_auth_tokens = - decrypt_tokens(collector_auth_token_rows, "taskprov_collector_auth_tokens")?; - - Ok(PeerAggregator::new( - endpoint, - role.as_role(), - verify_key_init, - collector_hpke_config, - report_expiry_age, - tolerable_clock_skew, - aggregator_auth_tokens, - collector_auth_tokens, - )) - } - - #[tracing::instrument(skip(self), err)] - pub async fn put_taskprov_peer_aggregator( - &self, - peer_aggregator: &PeerAggregator, - ) -> Result<(), Error> { - let endpoint = peer_aggregator.endpoint().as_str(); - let role = &AggregatorRole::from_role(*peer_aggregator.role())?; - let encrypted_verify_key_init = self.crypter.encrypt( - "taskprov_peer_aggregator", - endpoint.as_ref(), - "verify_key_init", - peer_aggregator.verify_key_init().as_ref(), - )?; - - let stmt = self - .prepare_cached( - "INSERT INTO taskprov_peer_aggregators ( - endpoint, role, verify_key_init, tolerable_clock_skew, report_expiry_age, - collector_hpke_config - ) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT DO NOTHING", - ) - .await?; - check_insert( - self.execute( - &stmt, - &[ - /* endpoint */ &endpoint, - /* role */ role, - /* verify_key_init */ &encrypted_verify_key_init, - /* tolerable_clock_skew */ - &i64::try_from(peer_aggregator.tolerable_clock_skew().as_seconds())?, - /* report_expiry_age */ - &peer_aggregator - .report_expiry_age() - .map(Duration::as_seconds) - .map(i64::try_from) - .transpose()?, - /* collector_hpke_config */ - &peer_aggregator.collector_hpke_config().get_encoded(), - ], - ) - .await?, - )?; - - let encrypt_tokens = |tokens: &[AuthenticationToken], table| -> Result<_, Error> { - let mut ords = Vec::new(); - let mut types = Vec::new(); - let mut encrypted_tokens = Vec::new(); - for (ord, token) in tokens.iter().enumerate() { - let ord = i64::try_from(ord)?; - - let mut row_id = Vec::new(); - row_id.extend_from_slice(endpoint.as_ref()); - row_id.extend_from_slice(&role.as_role().get_encoded()); - row_id.extend_from_slice(&ord.to_be_bytes()); - - let encrypted_auth_token = - self.crypter - .encrypt(table, &row_id, "token", token.as_ref())?; - - ords.push(ord); - types.push(AuthenticationTokenType::from(token)); - encrypted_tokens.push(encrypted_auth_token); - } - Ok((ords, types, encrypted_tokens)) - }; - - let (aggregator_auth_token_ords, aggregator_auth_token_types, aggregator_auth_tokens) = - encrypt_tokens( - peer_aggregator.aggregator_auth_tokens(), - "taskprov_aggregator_auth_tokens", - )?; - let stmt = self - .prepare_cached( - "INSERT INTO taskprov_aggregator_auth_tokens (peer_aggregator_id, ord, type, token) - SELECT - (SELECT id FROM taskprov_peer_aggregators WHERE endpoint = $1 AND role = $2), - * FROM UNNEST($3::BIGINT[], $4::AUTH_TOKEN_TYPE[], $5::BYTEA[])", - ) - .await?; - let aggregator_auth_tokens_params: &[&(dyn ToSql + Sync)] = &[ - /* endpoint */ &endpoint, - /* role */ role, - /* ords */ &aggregator_auth_token_ords, - /* token_types */ &aggregator_auth_token_types, - /* tokens */ &aggregator_auth_tokens, - ]; - let aggregator_auth_tokens_future = self.execute(&stmt, aggregator_auth_tokens_params); - - let (collector_auth_token_ords, collector_auth_token_types, collector_auth_tokens) = - encrypt_tokens( - peer_aggregator.collector_auth_tokens(), - "taskprov_collector_auth_tokens", - )?; - let stmt = self - .prepare_cached( - "INSERT INTO taskprov_collector_auth_tokens (peer_aggregator_id, ord, type, token) - SELECT - (SELECT id FROM taskprov_peer_aggregators WHERE endpoint = $1 AND role = $2), - * FROM UNNEST($3::BIGINT[], $4::AUTH_TOKEN_TYPE[], $5::BYTEA[])", - ) - .await?; - let collector_auth_tokens_params: &[&(dyn ToSql + Sync)] = &[ - /* endpoint */ &endpoint, - /* role */ role, - /* ords */ &collector_auth_token_ords, - /* token_types */ &collector_auth_token_types, - /* tokens */ &collector_auth_tokens, - ]; - let collector_auth_tokens_future = self.execute(&stmt, collector_auth_tokens_params); - - try_join!(aggregator_auth_tokens_future, collector_auth_tokens_future)?; - Ok(()) - } - - #[tracing::instrument(skip(self), err)] - pub async fn delete_taskprov_peer_aggregator( - &self, - aggregator_url: &Url, - role: &Role, - ) -> Result<(), Error> { - let aggregator_url = aggregator_url.as_str(); - let role = AggregatorRole::from_role(*role)?; - - // Deletion of other data implemented via ON DELETE CASCADE. - let stmt = self - .prepare_cached( - "DELETE FROM taskprov_peer_aggregators WHERE endpoint = $1 AND role = $2", - ) - .await?; - check_single_row_mutation(self.execute(&stmt, &[&aggregator_url, &role]).await?) - } } fn check_insert(row_count: u64) -> Result<(), Error> { diff --git a/aggregator_core/src/datastore/tests.rs b/aggregator_core/src/datastore/tests.rs index 58542d8ed..848bfaeed 100644 --- a/aggregator_core/src/datastore/tests.rs +++ b/aggregator_core/src/datastore/tests.rs @@ -16,7 +16,6 @@ use crate::{ }, query_type::CollectableQueryType, task::{self, test_util::TaskBuilder, Task}, - taskprov::test_util::PeerAggregatorBuilder, test_util::noop_meter, }; @@ -56,7 +55,6 @@ use std::{ time::Duration as StdDuration, }; use tokio::time::timeout; -use url::Url; const OLDEST_ALLOWED_REPORT_TIMESTAMP: Time = Time::from_seconds_since_epoch(1000); const REPORT_EXPIRY_AGE: Duration = Duration::from_seconds(1000); @@ -6727,103 +6725,3 @@ async fn roundtrip_global_hpke_keypair(ephemeral_datastore: EphemeralDatastore) .await .unwrap(); } - -#[rstest_reuse::apply(schema_versions_template)] -#[tokio::test] -async fn roundtrip_taskprov_peer_aggregator(ephemeral_datastore: EphemeralDatastore) { - install_test_trace_subscriber(); - let datastore = ephemeral_datastore.datastore(MockClock::default()).await; - - // Basic aggregator. - let example_leader_peer_aggregator = - PeerAggregatorBuilder::new().with_role(Role::Leader).build(); - let example_helper_peer_aggregator = PeerAggregatorBuilder::new() - .with_role(Role::Helper) - .with_aggregator_auth_tokens(vec![random(), random()]) - .with_collector_auth_tokens(vec![]) - .build(); - let another_example_leader_peer_aggregator = PeerAggregatorBuilder::new() - .with_endpoint(Url::parse("https://another.example.com/").unwrap()) - .with_aggregator_auth_tokens(vec![]) - .with_collector_auth_tokens(vec![random(), random()]) - .build(); - - datastore - .run_tx(|tx| { - let example_leader_peer_aggregator = example_leader_peer_aggregator.clone(); - let example_helper_peer_aggregator = example_helper_peer_aggregator.clone(); - let another_example_leader_peer_aggregator = - another_example_leader_peer_aggregator.clone(); - Box::pin(async move { - tx.put_taskprov_peer_aggregator(&example_leader_peer_aggregator) - .await?; - tx.put_taskprov_peer_aggregator(&example_helper_peer_aggregator) - .await?; - tx.put_taskprov_peer_aggregator(&another_example_leader_peer_aggregator) - .await?; - Ok(()) - }) - }) - .await - .unwrap(); - - // Should not be able to put an aggregator with the same endpoint and role. - assert_matches!( - datastore - .run_tx(|tx| { - Box::pin(async move { - let colliding_peer_aggregator = PeerAggregatorBuilder::new().build(); - tx.put_taskprov_peer_aggregator(&colliding_peer_aggregator) - .await - }) - }) - .await, - Err(Error::MutationTargetAlreadyExists) - ); - - datastore - .run_tx(|tx| { - let example_leader_peer_aggregator = example_leader_peer_aggregator.clone(); - let example_helper_peer_aggregator = example_helper_peer_aggregator.clone(); - let another_example_leader_peer_aggregator = - another_example_leader_peer_aggregator.clone(); - Box::pin(async move { - for peer in [ - example_leader_peer_aggregator.clone(), - example_helper_peer_aggregator.clone(), - another_example_leader_peer_aggregator.clone(), - ] { - assert_eq!( - tx.get_taskprov_peer_aggregator(peer.endpoint(), peer.role()) - .await - .unwrap(), - Some(peer.clone()), - ); - } - - assert_eq!( - tx.get_taskprov_peer_aggregators().await.unwrap(), - vec![ - example_leader_peer_aggregator.clone(), - example_helper_peer_aggregator.clone(), - another_example_leader_peer_aggregator.clone(), - ] - ); - - for peer in [ - example_leader_peer_aggregator.clone(), - example_helper_peer_aggregator.clone(), - another_example_leader_peer_aggregator.clone(), - ] { - tx.delete_taskprov_peer_aggregator(peer.endpoint(), peer.role()) - .await - .unwrap(); - } - assert_eq!(tx.get_taskprov_peer_aggregators().await.unwrap(), vec![]); - - Ok(()) - }) - }) - .await - .unwrap(); -} diff --git a/aggregator_core/src/taskprov.rs b/aggregator_core/src/taskprov.rs index fb8facfe6..7c88104ae 100644 --- a/aggregator_core/src/taskprov.rs +++ b/aggregator_core/src/taskprov.rs @@ -4,8 +4,8 @@ use crate::{ }; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use derivative::Derivative; -use janus_core::task::{AuthenticationToken, VdafInstance}; -use janus_messages::{Duration, HpkeConfig, Role, TaskId, Time}; +use janus_core::task::VdafInstance; +use janus_messages::{Duration, Role, TaskId, Time}; use lazy_static::lazy_static; use rand::{distributions::Standard, prelude::Distribution}; use ring::hkdf::{KeyType, Salt, HKDF_SHA256}; @@ -87,41 +87,6 @@ impl Distribution for Standard { } } -/// Represents another aggregator that is peered with our aggregator for taskprov purposes. Contains -/// data that needs to be identical between both aggregators for the taskprov flow to work. -#[derive(Debug, Clone, Derivative, PartialEq, Eq)] -pub struct PeerAggregator { - /// The URL at which the peer aggregator can be reached. This, along with `role`, is used to - /// uniquely represent the peer aggregator. - endpoint: Url, - - /// The role that the peer aggregator takes in DAP. Must be [`Role::Leader`] or [`Role::Helper`]. - /// This, along with `endpoint`, uniquely represents the peer aggregator. - role: Role, - - /// The preshared key used to derive the VDAF verify key for each task. - verify_key_init: VerifyKeyInit, - - // The HPKE configuration of the collector. This needs to be shared out-of-band with the peer - // aggregator. - collector_hpke_config: HpkeConfig, - - /// How long reports exist until they're eligible for GC. Set to None for no GC. This value is - /// copied into the definition for a provisioned task. - report_expiry_age: Option, - - /// The maximum allowable clock skew between peers. This value is copied into the definition for - /// a provisioned task. - tolerable_clock_skew: Duration, - - /// Auth tokens used for authenticating Leader to Helper requests. - aggregator_auth_tokens: Vec, - - /// Auth tokens used for authenticating Collector to Leader requests. It should be empty if the - /// peer aggregator is the Leader. - collector_auth_tokens: Vec, -} - lazy_static! { /// Salt generated by the SHA256 of the string 'dap-taskprov". See [taskprov section 3.2][1]. /// @@ -136,101 +101,7 @@ lazy_static! { ); } -impl PeerAggregator { - #[allow(clippy::too_many_arguments)] - pub fn new( - endpoint: Url, - role: Role, - verify_key_init: VerifyKeyInit, - collector_hpke_config: HpkeConfig, - report_expiry_age: Option, - tolerable_clock_skew: Duration, - aggregator_auth_tokens: Vec, - collector_auth_tokens: Vec, - ) -> Self { - Self { - endpoint, - role, - verify_key_init, - collector_hpke_config, - report_expiry_age, - tolerable_clock_skew, - aggregator_auth_tokens, - collector_auth_tokens, - } - } - - /// Retrieve the URL endpoint of the peer. - pub fn endpoint(&self) -> &Url { - &self.endpoint - } - - /// Retrieve the role of the peer. - pub fn role(&self) -> &Role { - &self.role - } - - /// Retrieve the VDAF verify key initialization parameter, used for derivation of the VDAF - /// verify key for a task. - pub fn verify_key_init(&self) -> &VerifyKeyInit { - &self.verify_key_init - } - - /// Retrieve the collector HPKE configuration for this peer. - pub fn collector_hpke_config(&self) -> &HpkeConfig { - &self.collector_hpke_config - } - - /// Retrieve the report expiry age that each task will be configured with. - pub fn report_expiry_age(&self) -> Option<&Duration> { - self.report_expiry_age.as_ref() - } - - /// Retrieve the maximum tolerable clock skew that each task will be configured with. - pub fn tolerable_clock_skew(&self) -> &Duration { - &self.tolerable_clock_skew - } - - /// Retrieve the [`AuthenticationToken`]s used for authenticating leader to helper requests. - pub fn aggregator_auth_tokens(&self) -> &[AuthenticationToken] { - &self.aggregator_auth_tokens - } - - /// Retrieve the [`AuthenticationToken`]s used for authenticating collector to leader requests. - pub fn collector_auth_tokens(&self) -> &[AuthenticationToken] { - &self.collector_auth_tokens - } - - /// Returns the [`AuthenticationToken`] currently used by this peer to authenticate itself. - pub fn primary_aggregator_auth_token(&self) -> &AuthenticationToken { - self.aggregator_auth_tokens.iter().next_back().unwrap() - } - - /// Checks if the given aggregator authentication token is valid (i.e. matches with an - /// authentication token recognized by this task). - pub fn check_aggregator_auth_token(&self, auth_token: &AuthenticationToken) -> bool { - self.aggregator_auth_tokens - .iter() - .rev() - .any(|t| t == auth_token) - } - - /// Returns the [`AuthenticationToken`] currently used by the collector to authenticate itself - /// to the aggregators. - pub fn primary_collector_auth_token(&self) -> &AuthenticationToken { - // Unwrap safety: self.collector_auth_tokens is never empty - self.collector_auth_tokens.iter().next_back().unwrap() - } - - /// Checks if the given collector authentication token is valid (i.e. matches with an - /// authentication token recognized by this task). - pub fn check_collector_auth_token(&self, auth_token: &AuthenticationToken) -> bool { - self.collector_auth_tokens - .iter() - .rev() - .any(|t| t == auth_token) - } - +impl VerifyKeyInit { /// Computes the VDAF verify key using the method defined in [draft-wang-ppm-dap-taskprov][1]. /// /// [1]: https://www.ietf.org/archive/id/draft-wang-ppm-dap-taskprov-04.html#name-deriving-the-vdaf-verificat @@ -239,7 +110,7 @@ impl PeerAggregator { task_id: &TaskId, vdaf_instance: &VdafInstance, ) -> SecretBytes { - let prk = SALT.extract(self.verify_key_init.as_ref()); + let prk = SALT.extract(self.0.as_ref()); let info = [task_id.as_ref().as_slice()]; // Unwrap safety: this function only errors if the OKM length is too long @@ -332,106 +203,3 @@ impl From for task::Task { value.0 } } - -#[cfg(feature = "test-util")] -#[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] -pub mod test_util { - use janus_core::{ - hpke::test_util::generate_test_hpke_config_and_private_key, task::AuthenticationToken, - }; - use janus_messages::{Duration, HpkeConfig, Role}; - use rand::random; - use url::Url; - - use super::{PeerAggregator, VerifyKeyInit}; - - #[derive(Debug, Clone)] - pub struct PeerAggregatorBuilder(PeerAggregator); - - impl PeerAggregatorBuilder { - pub fn new() -> Self { - Self(PeerAggregator::new( - Url::parse("https://example.com").unwrap(), - Role::Leader, - random(), - generate_test_hpke_config_and_private_key().config().clone(), - None, - Duration::from_seconds(1), - Vec::from([random()]), - Vec::from([random()]), - )) - } - - pub fn with_endpoint(self, endpoint: Url) -> Self { - Self(PeerAggregator { endpoint, ..self.0 }) - } - - pub fn with_role(self, role: Role) -> Self { - Self(PeerAggregator { role, ..self.0 }) - } - - pub fn with_verify_key_init(self, verify_key_init: VerifyKeyInit) -> Self { - Self(PeerAggregator { - verify_key_init, - ..self.0 - }) - } - - pub fn with_collector_hpke_config(self, collector_hpke_config: HpkeConfig) -> Self { - Self(PeerAggregator { - collector_hpke_config, - ..self.0 - }) - } - - pub fn with_report_expiry_age(self, report_expiry_age: Option) -> Self { - Self(PeerAggregator { - report_expiry_age, - ..self.0 - }) - } - - pub fn with_tolerable_clock_skew(self, tolerable_clock_skew: Duration) -> Self { - Self(PeerAggregator { - tolerable_clock_skew, - ..self.0 - }) - } - - pub fn with_aggregator_auth_tokens( - self, - aggregator_auth_tokens: Vec, - ) -> Self { - Self(PeerAggregator { - aggregator_auth_tokens, - ..self.0 - }) - } - - pub fn with_collector_auth_tokens( - self, - collector_auth_tokens: Vec, - ) -> Self { - Self(PeerAggregator { - collector_auth_tokens, - ..self.0 - }) - } - - pub fn build(self) -> PeerAggregator { - self.0 - } - } - - impl From for PeerAggregatorBuilder { - fn from(value: PeerAggregator) -> Self { - Self(value) - } - } - - impl Default for PeerAggregatorBuilder { - fn default() -> Self { - Self::new() - } - } -} diff --git a/db/00000000000003_taskprov_sub01.down.sql b/db/00000000000003_taskprov_sub01.down.sql new file mode 100644 index 000000000..c9b17e52d --- /dev/null +++ b/db/00000000000003_taskprov_sub01.down.sql @@ -0,0 +1,38 @@ +-- Another DAP aggregator who we've partnered with to use the taskprov extension. +CREATE TABLE taskprov_peer_aggregators( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal only. + endpoint TEXT NOT NULL, -- peer aggregator HTTPS endpoint + role AGGREGATOR_ROLE NOT NULL, -- the role of this aggregator relative to the peer + verify_key_init BYTEA NOT NULL, -- the preshared key used for VDAF verify key derivation. + + -- Parameters applied to every task created with this peer aggregator. + tolerable_clock_skew BIGINT NOT NULL, -- the maximum acceptable clock skew to allow between client and aggregator, in seconds + report_expiry_age BIGINT, -- the maximum age of a report before it is considered expired (and acceptable for garbage collection), in seconds. NULL means that GC is disabled. + collector_hpke_config BYTEA NOT NULL, -- the HPKE config of the collector (encoded HpkeConfig message) + + CONSTRAINT taskprov_peer_aggregator_endpoint_and_role_unique UNIQUE(endpoint, role) +); + +-- Task aggregator auth tokens that we've shared with the peer aggregator. +CREATE TABLE taskprov_aggregator_auth_tokens( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + peer_aggregator_id BIGINT NOT NULL, -- task ID the token is associated with + ord BIGINT NOT NULL, -- a value used to specify the ordering of the authentication tokens + token BYTEA NOT NULL, -- bearer token used to authenticate messages to/from the other aggregator (encrypted) + type AUTH_TOKEN_TYPE NOT NULL DEFAULT 'BEARER', + + CONSTRAINT task_aggregator_auth_tokens_unique_peer_aggregator_id_and_ord UNIQUE(peer_aggregator_id, ord), + CONSTRAINT fk_peer_aggregator_id FOREIGN KEY(peer_aggregator_id) REFERENCES taskprov_peer_aggregators(id) ON DELETE CASCADE +); + +-- Task collector auth tokens that we've shared with the peer aggregator. +CREATE TABLE taskprov_collector_auth_tokens( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only + peer_aggregator_id BIGINT NOT NULL, -- task ID the token is associated with + ord BIGINT NOT NULL, -- a value used to specify the ordering of the authentication tokens + token BYTEA NOT NULL, -- bearer token used to authenticate messages to/from the other aggregator (encrypted) + type AUTH_TOKEN_TYPE NOT NULL DEFAULT 'BEARER', + + CONSTRAINT task_collector_auth_tokens_unique_peer_aggregator_id_and_ord UNIQUE(peer_aggregator_id, ord), + CONSTRAINT fk_peer_aggregator_id FOREIGN KEY(peer_aggregator_id) REFERENCES taskprov_peer_aggregators(id) ON DELETE CASCADE +); diff --git a/db/00000000000003_taskprov_sub01.up.sql b/db/00000000000003_taskprov_sub01.up.sql new file mode 100644 index 000000000..11d83f3ac --- /dev/null +++ b/db/00000000000003_taskprov_sub01.up.sql @@ -0,0 +1,3 @@ +DROP TABLE taskprov_peer_aggregators CASCADE; +DROP TABLE taskprov_aggregator_auth_tokens; +DROP TABLE taskprov_collector_auth_tokens; \ No newline at end of file diff --git a/docs/samples/advanced_config/aggregator.yaml b/docs/samples/advanced_config/aggregator.yaml index 1b06f69ed..69fd2e5ca 100644 --- a/docs/samples/advanced_config/aggregator.yaml +++ b/docs/samples/advanced_config/aggregator.yaml @@ -104,3 +104,13 @@ garbage_collection: # The maximum number of collection jobs (& related artifacts), per task, to delete in a single run # of the garbage collector. collection_limit: 50 + +taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 + report_expiry_age: 2592000 diff --git a/docs/samples/basic_config/aggregator.yaml b/docs/samples/basic_config/aggregator.yaml index 37c5932d4..8e6368982 100644 --- a/docs/samples/basic_config/aggregator.yaml +++ b/docs/samples/basic_config/aggregator.yaml @@ -21,3 +21,12 @@ max_upload_batch_write_delay_ms: 250 # Number of sharded database records per batch aggregation. Must not be greater # than the equivalent setting in the collection job driver. (required) batch_aggregation_shard_count: 32 + +taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 diff --git a/interop_binaries/config/janus_interop_aggregator.yaml b/interop_binaries/config/janus_interop_aggregator.yaml index 27ceeeb38..49715210a 100644 --- a/interop_binaries/config/janus_interop_aggregator.yaml +++ b/interop_binaries/config/janus_interop_aggregator.yaml @@ -7,3 +7,11 @@ health_check_listen_address: 0.0.0.0:8000 logging_config: force_json_output: true listen_address: 0.0.0.0:8080 +taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 diff --git a/interop_binaries/src/bin/janus_interop_aggregator.rs b/interop_binaries/src/bin/janus_interop_aggregator.rs index ffff95972..6d27a98d3 100644 --- a/interop_binaries/src/bin/janus_interop_aggregator.rs +++ b/interop_binaries/src/bin/janus_interop_aggregator.rs @@ -4,7 +4,8 @@ use clap::Parser; use janus_aggregator::{ aggregator::{self, http_handlers::aggregator_handler}, binary_utils::{janus_main, BinaryOptions, CommonBinaryOptions}, - config::{BinaryConfig, CommonConfig}, + cache::GlobalHpkeKeypairCache, + config::{BinaryConfig, CommonConfig, TaskprovConfig}, }; use janus_aggregator_core::{ datastore::{models::HpkeKeyState, Datastore}, @@ -12,7 +13,9 @@ use janus_aggregator_core::{ SecretBytes, }; use janus_core::{ - hpke::generate_hpke_config_and_private_key, task::AuthenticationToken, time::RealClock, + hpke::generate_hpke_config_and_private_key, + task::AuthenticationToken, + time::{DurationExt, RealClock}, }; use janus_interop_binaries::{ status::{ERROR, SUCCESS}, @@ -24,6 +27,7 @@ use janus_messages::{ }; use opentelemetry::metrics::Meter; use prio::codec::Decode; +use rand::random; use serde::{Deserialize, Serialize}; use sqlx::{migrate::Migrator, Connection, PgConnection}; use std::{ @@ -161,7 +165,21 @@ async fn make_handler( max_upload_batch_size: 100, max_upload_batch_write_delay: std::time::Duration::from_millis(100), batch_aggregation_shard_count: 32, - ..Default::default() + global_hpke_configs_refresh_interval: GlobalHpkeKeypairCache::DEFAULT_REFRESH_INTERVAL, + + // TODO(janus-ops#991): Give these taskprov parameters actual values to facilitiate an E2E test. + collector_hpke_config: generate_hpke_config_and_private_key( + HpkeConfigId::from(1), + HpkeKemId::X25519HkdfSha256, + HpkeKdfId::HkdfSha256, + HpkeAeadId::Aes128Gcm, + ) + .config() + .clone(), + report_expiry_age: None, + tolerable_clock_skew: Duration::from_minutes(60).unwrap(), + verify_key_init: random(), + auth_tokens: Vec::new(), }, ) .await?; @@ -263,6 +281,8 @@ struct Config { /// Path prefix, e.g. `/dap/`, to serve DAP from. #[serde(default = "default_dap_serving_prefix")] dap_serving_prefix: String, + + taskprov_config: TaskprovConfig, } impl BinaryConfig for Config {