From fde4e092e215ec5f5d6293a34f62682d2f0dbc9f Mon Sep 17 00:00:00 2001 From: Ameer Ghani Date: Thu, 21 Sep 2023 16:30:35 -0400 Subject: [PATCH] Specialize taskprov to only one peer aggregator --- aggregator/src/aggregator.rs | 143 ++++----- aggregator/src/aggregator/taskprov_tests.rs | 26 +- aggregator/src/bin/aggregator.rs | 156 ++++++++- aggregator/src/cache.rs | 37 +-- aggregator/src/config.rs | 13 + aggregator_api/src/lib.rs | 12 - aggregator_api/src/models.rs | 41 --- aggregator_api/src/routes.rs | 85 +---- aggregator_api/src/tests.rs | 225 +------------ aggregator_core/src/datastore.rs | 335 +------------------- aggregator_core/src/datastore/tests.rs | 102 ------ aggregator_core/src/taskprov.rs | 240 +------------- db/00000000000003_taskprov_sub01.down.sql | 38 +++ db/00000000000003_taskprov_sub01.up.sql | 2 + 14 files changed, 287 insertions(+), 1168 deletions(-) create mode 100644 db/00000000000003_taskprov_sub01.down.sql create mode 100644 db/00000000000003_taskprov_sub01.up.sql diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index b83eb7adf..eb43de03f 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -8,7 +8,7 @@ use crate::{ query_type::{CollectableQueryType, UploadableQueryType}, report_writer::{ReportWriteBatcher, WritableReport}, }, - cache::{GlobalHpkeKeypairCache, PeerAggregatorCache}, + cache::GlobalHpkeKeypairCache, Operation, }; use bytes::Bytes; @@ -27,12 +27,15 @@ use janus_aggregator_core::{ }, query_type::AccumulableQueryType, task::{self, Task, VerifyKey}, - taskprov::{self, PeerAggregator}, + taskprov::{self, VerifyKeyInit}, }; #[cfg(feature = "test-util")] use janus_core::test_util::dummy_vdaf; use janus_core::{ - hpke::{self, aggregate_share_aad, input_share_aad, HpkeApplicationInfo, HpkeKeypair, Label}, + hpke::{ + self, aggregate_share_aad, generate_hpke_config_and_private_key, input_share_aad, + HpkeApplicationInfo, HpkeKeypair, Label, + }, http::response_to_problem_details, task::{AuthenticationToken, VdafInstance, PRIO3_VERIFY_KEY_LENGTH}, time::{Clock, DurationExt, IntervalExt, TimeExt}, @@ -43,8 +46,9 @@ use janus_messages::{ taskprov::TaskConfig, AggregateContinueReq, AggregateContinueResp, AggregateInitializeReq, AggregateInitializeResp, AggregateShareReq, AggregateShareResp, BatchSelector, CollectReq, CollectResp, CollectionJobId, - Duration, HpkeCiphertext, HpkeConfig, Interval, PartialBatchSelector, PrepareStep, - PrepareStepResult, Report, ReportIdChecksum, ReportShare, ReportShareError, Role, TaskId, + Duration, HpkeAeadId, HpkeCiphertext, HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId, Interval, + PartialBatchSelector, PrepareStep, PrepareStepResult, Report, ReportIdChecksum, ReportShare, + ReportShareError, Role, TaskId, }; use opentelemetry::{ metrics::{Counter, Histogram, Meter, Unit}, @@ -157,13 +161,10 @@ pub struct Aggregator { /// Cache of global HPKE keypairs and configs. global_hpke_keypairs: GlobalHpkeKeypairCache, - - /// Cache of taskprov peer aggregators. - peer_aggregators: PeerAggregatorCache, } /// Config represents a configuration for an Aggregator. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Config { /// Defines the maximum size of a batch of uploaded reports which will be written in a single /// transaction. @@ -182,8 +183,23 @@ pub struct Config { /// Defines how often to refresh the global HPKE configs cache. This affects how often an aggregator /// becomes aware of key state changes. 
pub global_hpke_configs_refresh_interval: StdDuration,
+
+    /// Collection results will be encrypted to this public key.
+    pub collector_hpke_config: HpkeConfig,
+
+    /// New tasks will have this report expiration age.
+    pub report_expiry_age: Option<Duration>,
+
+    /// New tasks will have this tolerable clock skew.
+    pub tolerable_clock_skew: Duration,
+
+    /// Defines the key used to deterministically derive the VDAF verify key for new tasks.
+    pub verify_key_init: VerifyKeyInit,
 }
 
+// subscriber-01 only: the config now has mandatory fields, so default only makes sense as a helper
+// impl inside unit tests.
+#[cfg(feature = "test-util")]
 impl Default for Config {
     fn default() -> Self {
         Self {
@@ -191,6 +207,17 @@ impl Default for Config {
             max_upload_batch_write_delay: StdDuration::ZERO,
             batch_aggregation_shard_count: 1,
             global_hpke_configs_refresh_interval: GlobalHpkeKeypairCache::DEFAULT_REFRESH_INTERVAL,
+            collector_hpke_config: generate_hpke_config_and_private_key(
+                HpkeConfigId::from(1),
+                HpkeKemId::X25519HkdfSha256,
+                HpkeKdfId::HkdfSha256,
+                HpkeAeadId::Aes128Gcm,
+            )
+            .config()
+            .clone(),
+            report_expiry_age: None,
+            tolerable_clock_skew: Duration::from_minutes(60).unwrap(),
+            verify_key_init: random(),
         }
     }
 }
@@ -231,8 +258,6 @@ impl<C: Clock> Aggregator<C> {
         )
         .await?;
 
-        let peer_aggregators = PeerAggregatorCache::new(&datastore).await?;
-
         Ok(Self {
             datastore,
             clock,
@@ -243,7 +268,6 @@
             upload_decode_failure_counter,
             aggregate_step_failure_counter,
             global_hpke_keypairs,
-            peer_aggregators,
         })
     }
 
@@ -298,13 +322,8 @@ impl<C: Clock> Aggregator<C> {
                 task_aggregator
             }
             None if taskprov_task_config.is_some() => {
-                self.taskprov_opt_in(
-                    &Role::Leader,
-                    task_id,
-                    taskprov_task_config.unwrap(),
-                    auth_token.as_ref(),
-                )
-                .await?;
+                self.taskprov_opt_in(&Role::Leader, task_id, taskprov_task_config.unwrap())
+                    .await?;
 
                 // Retry fetching the aggregator, since the last function would have just inserted
                 // its task.
@@ -347,13 +366,8 @@ impl<C: Clock> Aggregator<C> {
         }
 
         if taskprov_task_config.is_some() {
-            self.taskprov_authorize_request(
-                &Role::Leader,
-                task_id,
-                taskprov_task_config.unwrap(),
-                auth_token.as_ref(),
-            )
-            .await?;
+            self.taskprov_authorize_request(&Role::Leader, task_id, taskprov_task_config.unwrap())
+                .await?;
         } else if !auth_token
             .map(|t| task_aggregator.task.check_aggregator_auth_token(&t))
             .unwrap_or(false)
@@ -478,16 +492,9 @@ impl<C: Clock> Aggregator<C> {
         // Authorize the request and retrieve the collector's HPKE config. If this is a taskprov task, we
         // have to use the peer aggregator's collector config rather than the main task.
         let collector_hpke_config = if taskprov_task_config.is_some() {
-            let (peer_aggregator, _) = self
-                .taskprov_authorize_request(
-                    &Role::Leader,
-                    task_id,
-                    taskprov_task_config.unwrap(),
-                    auth_token.as_ref(),
-                )
+            self.taskprov_authorize_request(&Role::Leader, task_id, taskprov_task_config.unwrap())
                 .await?;
-
-            peer_aggregator.collector_hpke_config()
+            &self.cfg.collector_hpke_config
         } else {
             if !auth_token
                 .map(|t| task_aggregator.task.check_aggregator_auth_token(&t))
@@ -561,18 +568,22 @@ impl<C: Clock> Aggregator<C> {
     }
 
     /// Opts in or out of a taskprov task.
-    #[tracing::instrument(skip(self, aggregator_auth_token), err)]
+    #[tracing::instrument(skip(self), err)]
     async fn taskprov_opt_in(
         &self,
         peer_role: &Role,
         task_id: &TaskId,
         task_config: &TaskConfig,
-        aggregator_auth_token: Option<&AuthenticationToken>,
     ) -> Result<(), Error> {
-        let (peer_aggregator, aggregator_urls) = self
-            .taskprov_authorize_request(peer_role, task_id, task_config, aggregator_auth_token)
+        self.taskprov_authorize_request(peer_role, task_id, task_config)
             .await?;
 
+        let aggregator_urls = task_config
+            .aggregator_endpoints()
+            .iter()
+            .map(|url| url.try_into())
+            .collect::<Result<Vec<_>, _>>()?;
+
         // TODO(#1647): Check whether task config parameters are acceptable for privacy and
         // availability of the system.
 
@@ -595,8 +606,10 @@
             }
         };
 
-        let vdaf_verify_keys =
-            Vec::from([peer_aggregator.derive_vdaf_verify_key(task_id, &vdaf_instance)]);
+        let vdaf_verify_keys = Vec::from([self
+            .cfg
+            .verify_key_init
+            .derive_vdaf_verify_key(task_id, &vdaf_instance)]);
 
         let task = taskprov::Task::new(
             *task_id,
@@ -607,10 +620,10 @@
             vdaf_verify_keys,
             task_config.query_config().max_batch_query_count() as u64,
             Some(*task_config.task_expiration()),
-            peer_aggregator.report_expiry_age().cloned(),
+            self.cfg.report_expiry_age.clone(),
             task_config.query_config().min_batch_size() as u64,
             *task_config.query_config().time_precision(),
-            *peer_aggregator.tolerable_clock_skew(),
+            self.cfg.tolerable_clock_skew,
         )
         .map_err(|err| Error::InvalidTask(*task_id, OptOutReason::TaskParameters(err)))?;
         self.datastore
@@ -636,60 +649,26 @@
             }
         })?;
 
-        info!(?task, ?peer_aggregator, "taskprov: opted into new task");
+        info!(?task, "taskprov: opted into new task");
         Ok(())
     }
 
     /// Validate and authorize a taskprov request. Returns values necessary for determining whether
     /// we can opt into the task. This function might return an opt-out error for conditions that
     /// are relevant for all DAP workflows (e.g. task expiration).
- #[tracing::instrument(skip(self, aggregator_auth_token), err)] + #[tracing::instrument(skip(self), err)] async fn taskprov_authorize_request( &self, peer_role: &Role, task_id: &TaskId, task_config: &TaskConfig, - aggregator_auth_token: Option<&AuthenticationToken>, - ) -> Result<(&PeerAggregator, Vec), Error> { - let aggregator_urls = task_config - .aggregator_endpoints() - .iter() - .map(|url| url.try_into()) - .collect::, _>>()?; - if aggregator_urls.len() < 2 { - return Err(Error::UnrecognizedMessage( - Some(*task_id), - "taskprov configuration is missing one or both aggregators", - )); - } - let peer_aggregator_url = &aggregator_urls[peer_role.index().unwrap()]; - - let peer_aggregator = self - .peer_aggregators - .get(peer_aggregator_url, peer_role) - .ok_or(Error::InvalidTask( - *task_id, - OptOutReason::NoSuchPeer(*peer_role), - ))?; - - if !aggregator_auth_token - .map(|t| peer_aggregator.check_aggregator_auth_token(t)) - .unwrap_or(false) - { - return Err(Error::UnauthorizedRequest(*task_id)); - } - + ) -> Result<(), Error> { if self.clock.now() > *task_config.task_expiration() { return Err(Error::InvalidTask(*task_id, OptOutReason::TaskExpired)); } - debug!( - ?task_id, - ?task_config, - ?peer_aggregator, - "taskprov: authorized request" - ); - Ok((peer_aggregator, aggregator_urls)) + debug!(?task_id, ?task_config, "taskprov: authorized request"); + Ok(()) } #[cfg(feature = "test-util")] diff --git a/aggregator/src/aggregator/taskprov_tests.rs b/aggregator/src/aggregator/taskprov_tests.rs index f46231195..56a335c90 100644 --- a/aggregator/src/aggregator/taskprov_tests.rs +++ b/aggregator/src/aggregator/taskprov_tests.rs @@ -18,7 +18,6 @@ use janus_aggregator_core::{ Datastore, }, task::{QueryType, Task}, - taskprov::{test_util::PeerAggregatorBuilder, PeerAggregator}, test_util::noop_meter, }; use janus_core::{ @@ -66,7 +65,6 @@ pub struct TaskprovTestCase { collector_hpke_keypair: HpkeKeypair, datastore: Arc>, handler: Box, - peer_aggregator: PeerAggregator, report_metadata: ReportMetadata, transcript: VdafTranscript<16, TestVdaf>, report_share: ReportShare, @@ -84,32 +82,29 @@ async fn setup_taskprov_test() -> TaskprovTestCase { let global_hpke_key = generate_test_hpke_config_and_private_key(); let collector_hpke_keypair = generate_test_hpke_config_and_private_key(); - let peer_aggregator = PeerAggregatorBuilder::new() - .with_endpoint(url::Url::parse("https://leader.example.com/").unwrap()) - .with_role(Role::Leader) - .with_collector_hpke_config(collector_hpke_keypair.config().clone()) - .build(); datastore .run_tx(|tx| { let global_hpke_key = global_hpke_key.clone(); - let peer_aggregator = peer_aggregator.clone(); Box::pin(async move { tx.put_global_hpke_keypair(&global_hpke_key).await.unwrap(); - tx.put_taskprov_peer_aggregator(&peer_aggregator) - .await - .unwrap(); Ok(()) }) }) .await .unwrap(); + let config = Config { + collector_hpke_config: collector_hpke_keypair.config().clone(), + verify_key_init: random(), + ..Default::default() + }; + let handler = aggregator_handler( Arc::clone(&datastore), clock.clone(), &noop_meter(), - Config::default(), + config.clone(), ) .await .unwrap(); @@ -144,7 +139,9 @@ async fn setup_taskprov_test() -> TaskprovTestCase { let task_id = TaskId::try_from(digest(&SHA256, &task_config_encoded).as_ref()).unwrap(); let vdaf_instance = task_config.vdaf_config().vdaf_type().try_into().unwrap(); - let vdaf_verify_key = peer_aggregator.derive_vdaf_verify_key(&task_id, &vdaf_instance); + let vdaf_verify_key = config + .verify_key_init + 
.derive_vdaf_verify_key(&task_id, &vdaf_instance);
 
     let task = janus_aggregator_core::taskprov::Task::new(
         task_id,
@@ -161,7 +158,7 @@ async fn setup_taskprov_test() -> TaskprovTestCase {
         Vec::from([vdaf_verify_key.clone()]),
         max_batch_query_count as u64,
         Some(task_expiration),
-        peer_aggregator.report_expiry_age().copied(),
+        config.report_expiry_age.clone(),
         min_batch_size as u64,
         Duration::from_seconds(1),
         Duration::from_seconds(1),
@@ -197,7 +194,6 @@ async fn setup_taskprov_test() -> TaskprovTestCase {
         collector_hpke_keypair,
         datastore,
         handler: Box::new(handler),
-        peer_aggregator,
         task: task.into(),
         task_config,
         task_id,
diff --git a/aggregator/src/bin/aggregator.rs b/aggregator/src/bin/aggregator.rs
index 2c7b90a90..f96c72895 100644
--- a/aggregator/src/bin/aggregator.rs
+++ b/aggregator/src/bin/aggregator.rs
@@ -1,4 +1,5 @@
 use anyhow::{Context, Result};
+use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
 use clap::Parser;
 use janus_aggregator::{
     aggregator::{self, garbage_collector::GarbageCollector, http_handlers::aggregator_handler},
@@ -7,10 +8,10 @@ use janus_aggregator::{
         CommonBinaryOptions,
     },
     cache::GlobalHpkeKeypairCache,
-    config::{BinaryConfig, CommonConfig},
+    config::{BinaryConfig, CommonConfig, TaskprovConfig},
 };
 use janus_aggregator_api::{self, aggregator_api_handler};
-use janus_aggregator_core::datastore::Datastore;
+use janus_aggregator_core::{datastore::Datastore, taskprov::VerifyKeyInit};
 use janus_core::{task::AuthenticationToken, time::RealClock};
 use serde::{Deserialize, Serialize};
 use std::{
@@ -49,7 +50,7 @@ async fn main() -> Result<()> {
                 Arc::clone(&datastore),
                 clock,
                 &meter,
-                config.aggregator_config(),
+                config.aggregator_config(options.verify_key_init),
             )
             .await?,
             None,
@@ -192,6 +193,32 @@ struct Options {
         help = "aggregator API auth tokens, encoded in base64 then comma-separated"
     )]
     aggregator_api_auth_tokens: Vec<AuthenticationToken>,
+
+    /// Tokens for authenticating DAP operations.
+    #[clap(
+        long,
+        env = "AGGREGATOR_AUTH_TOKENS",
+        hide_env_values = true,
+        num_args = 0..=1,
+        use_value_delimiter = true,
+        help = "DAP aggregator tokens, encoded in base64 then comma-separated"
+    )]
+    aggregator_auth_tokens: Vec<AuthenticationToken>,
+
+    /// Taskprov verify key init, used to derive per-task VDAF verify keys.
+    #[clap(
+        long,
+        value_parser = parse_verify_key_init,
+        env = "VERIFY_KEY_INIT",
+        hide_env_values = true,
+        help = "Taskprov verify key init, encoded in base64"
+    )]
+    verify_key_init: VerifyKeyInit,
+}
+
+fn parse_verify_key_init(arg: &str) -> Result<VerifyKeyInit> {
+    let result = VerifyKeyInit::try_from(URL_SAFE_NO_PAD.decode(arg)?.as_ref())?;
+    Ok(result)
 }
 
 impl BinaryOptions for Options {
@@ -331,6 +358,8 @@ struct Config {
     /// specify this.
#[serde(default)] global_hpke_configs_refresh_interval: Option, + + taskprov_config: TaskprovConfig, } #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -364,7 +393,7 @@ impl Config { .collect() } - fn aggregator_config(&self) -> aggregator::Config { + fn aggregator_config(&self, verify_key_init: VerifyKeyInit) -> aggregator::Config { aggregator::Config { max_upload_batch_size: self.max_upload_batch_size, max_upload_batch_write_delay: Duration::from_millis( @@ -375,6 +404,10 @@ impl Config { Some(duration) => Duration::from_millis(duration), None => GlobalHpkeKeypairCache::DEFAULT_REFRESH_INTERVAL, }, + collector_hpke_config: self.taskprov_config.collector_hpke_config.clone(), + report_expiry_age: self.taskprov_config.report_expiry_age, + tolerable_clock_skew: self.taskprov_config.tolerable_clock_skew, + verify_key_init, } } } @@ -392,19 +425,28 @@ impl BinaryConfig for Config { #[cfg(test)] mod tests { use super::{AggregatorApi, Config, GarbageCollectorConfig, HeaderEntry, Options}; + use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use clap::CommandFactory; use janus_aggregator::{ aggregator, config::{ test_util::{generate_db_config, generate_metrics_config, generate_trace_config}, - BinaryConfig, CommonConfig, + BinaryConfig, CommonConfig, TaskprovConfig, }, metrics::{MetricsExporterConfiguration, OtlpExporterConfiguration}, trace::{ OpenTelemetryTraceConfiguration, OtlpTraceConfiguration, TokioConsoleConfiguration, }, }; - use janus_core::test_util::roundtrip_encoding; + use janus_aggregator_core::taskprov::VerifyKeyInit; + use janus_core::{ + hpke::test_util::generate_test_hpke_config_and_private_key, test_util::roundtrip_encoding, + time::DurationExt, + }; + use janus_messages::{ + HpkeAeadId, HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId, HpkePublicKey, + }; + use rand::random; use std::{ collections::HashMap, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -450,6 +492,11 @@ mod tests { max_upload_batch_write_delay_ms: 250, batch_aggregation_shard_count: 32, global_hpke_configs_refresh_interval: None, + taskprov_config: TaskprovConfig { + collector_hpke_config: generate_test_hpke_config_and_private_key().config().clone(), + report_expiry_age: None, + tolerable_clock_skew: janus_messages::Duration::from_seconds(60), + }, }) } @@ -465,6 +512,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() @@ -485,6 +540,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 garbage_collection: gc_frequency_s: 60 report_limit: 25 @@ -515,6 +578,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 aggregator_api: listen_address: "0.0.0.0:8081" public_dap_url: "https://dap.url" @@ -541,6 +612,14 @@ mod tests { max_upload_batch_size: 100 
max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 aggregator_api: path_prefix: "aggregator-api" public_dap_url: "https://dap.url" @@ -574,6 +653,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() @@ -589,6 +676,7 @@ mod tests { }, ); + let verify_key_init: VerifyKeyInit = random(); assert_eq!( serde_yaml::from_str::( r#"--- @@ -604,14 +692,36 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() - .aggregator_config(), + .aggregator_config(verify_key_init.clone()), aggregator::Config { max_upload_batch_size: 100, max_upload_batch_write_delay: Duration::from_millis(250), batch_aggregation_shard_count: 32, + collector_hpke_config: HpkeConfig::new( + HpkeConfigId::from(183), + HpkeKemId::X25519HkdfSha256, + HpkeKdfId::HkdfSha256, + HpkeAeadId::Aes128Gcm, + HpkePublicKey::from( + URL_SAFE_NO_PAD + .decode("4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo") + .unwrap() + ), + ), + report_expiry_age: None, + tolerable_clock_skew: janus_messages::Duration::from_minutes(60).unwrap(), + verify_key_init, ..Default::default() } ); @@ -631,6 +741,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() @@ -662,6 +780,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() @@ -695,6 +821,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() @@ -725,6 +859,14 @@ mod tests { max_upload_batch_size: 100 max_upload_batch_write_delay_ms: 250 batch_aggregation_shard_count: 32 + taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 "# ) .unwrap() diff --git a/aggregator/src/cache.rs b/aggregator/src/cache.rs index c4cd4c5da..bdeac444c 100644 --- a/aggregator/src/cache.rs +++ b/aggregator/src/cache.rs @@ -1,12 +1,9 @@ //! Various in-memory caches that can be used by an aggregator. 
use crate::aggregator::Error;
-use janus_aggregator_core::{
-    datastore::{models::HpkeKeyState, Datastore},
-    taskprov::PeerAggregator,
-};
+use janus_aggregator_core::datastore::{models::HpkeKeyState, Datastore};
 use janus_core::{hpke::HpkeKeypair, time::Clock};
-use janus_messages::{HpkeConfig, HpkeConfigId, Role};
+use janus_messages::{HpkeConfig, HpkeConfigId};
 use std::{
     collections::HashMap,
     fmt::Debug,
@@ -15,7 +12,6 @@ use std::{
 };
 use tokio::{spawn, task::JoinHandle, time::sleep};
 use tracing::{debug, error};
-use url::Url;
 
 type HpkeConfigs = Arc<Vec<HpkeConfig>>;
 type HpkeKeypairs = HashMap<HpkeConfigId, Arc<HpkeKeypair>>;
@@ -141,32 +137,3 @@ impl Drop for GlobalHpkeKeypairCache {
         self.refresh_handle.abort()
     }
 }
-
-/// Caches taskprov [`PeerAggregator`]s. This cache is never invalidated, so the process needs to
-/// be restarted if there are any changes to peer aggregators.
-#[derive(Debug)]
-pub struct PeerAggregatorCache {
-    peers: Vec<PeerAggregator>,
-}
-
-impl PeerAggregatorCache {
-    pub async fn new<C: Clock>(datastore: &Datastore<C>) -> Result<Self, Error> {
-        Ok(Self {
-            peers: datastore
-                .run_tx_with_name("refresh_peer_aggregators_cache", |tx| {
-                    Box::pin(async move { tx.get_taskprov_peer_aggregators().await })
-                })
-                .await?
-                .into_iter()
-                .collect(),
-        })
-    }
-
-    pub fn get(&self, endpoint: &Url, role: &Role) -> Option<&PeerAggregator> {
-        // The peer aggregator table is unlikely to be more than a few entries long (1-2 entries),
-        // so a linear search should be fine.
-        self.peers
-            .iter()
-            .find(|peer| peer.endpoint() == endpoint && peer.role() == role)
-    }
-}
diff --git a/aggregator/src/config.rs b/aggregator/src/config.rs
index 61783a7b6..f965d0b34 100644
--- a/aggregator/src/config.rs
+++ b/aggregator/src/config.rs
@@ -2,6 +2,7 @@
 
 use crate::{metrics::MetricsConfiguration, trace::TraceConfiguration};
 use derivative::Derivative;
+use janus_messages::{Duration, HpkeConfig};
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use std::{
     fmt::Debug,
@@ -127,6 +128,18 @@ pub struct JobDriverConfig {
     pub maximum_attempts_before_failure: usize,
 }
 
+/// Configuration options for the Taskprov extension. This extension is
+/// described in [draft-wang-ppm-dap-taskprov][spec], although its configuration
+/// options are implementation-specific.
+///
+/// [spec]: https://datatracker.ietf.org/doc/draft-wang-ppm-dap-taskprov/
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+pub struct TaskprovConfig {
+    pub collector_hpke_config: HpkeConfig,
+    pub report_expiry_age: Option<Duration>,
+    pub tolerable_clock_skew: Duration,
+}
+
 #[cfg(feature = "test-util")]
 #[cfg_attr(docsrs, doc(cfg(feature = "test-util")))]
 pub mod test_util {
diff --git a/aggregator_api/src/lib.rs b/aggregator_api/src/lib.rs
index de50cb36a..4f4f5af74 100644
--- a/aggregator_api/src/lib.rs
+++ b/aggregator_api/src/lib.rs
@@ -107,18 +107,6 @@ pub fn aggregator_api_handler<C: Clock>(ds: Arc<Datastore<C>>, cfg: Config) -> i
         .delete(
             "/hpke_configs/:config_id",
             instrumented(api(delete_global_hpke_config::<C>)),
-        )
-        .get(
-            "/taskprov/peer_aggregators",
-            instrumented(api(get_taskprov_peer_aggregators::<C>)),
-        )
-        .post(
-            "/taskprov/peer_aggregators",
-            instrumented(api(post_taskprov_peer_aggregator::<C>)),
-        )
-        .delete(
-            "/taskprov/peer_aggregators",
-            instrumented(api(delete_taskprov_peer_aggregator::<C>)),
         ),
     )
 }
diff --git a/aggregator_api/src/models.rs b/aggregator_api/src/models.rs
index 84e103c0b..06d39b981 100644
--- a/aggregator_api/src/models.rs
+++ b/aggregator_api/src/models.rs
@@ -2,7 +2,6 @@ use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
 use janus_aggregator_core::{
     datastore::models::{GlobalHpkeKeypair, HpkeKeyState},
     task::{QueryType, Task},
-    taskprov::{PeerAggregator, VerifyKeyInit},
 };
 use janus_core::task::{AuthenticationToken, VdafInstance};
 use janus_messages::{
@@ -224,43 +223,3 @@ pub(crate) struct PutGlobalHpkeConfigReq {
 pub(crate) struct PatchGlobalHpkeConfigReq {
     pub(crate) state: HpkeKeyState,
 }
-
-#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
-pub(crate) struct TaskprovPeerAggregatorResp {
-    pub(crate) endpoint: Url,
-    pub(crate) role: Role,
-    pub(crate) collector_hpke_config: HpkeConfig,
-    pub(crate) report_expiry_age: Option<Duration>,
-    pub(crate) tolerable_clock_skew: Duration,
-}
-
-impl From<PeerAggregator> for TaskprovPeerAggregatorResp {
-    fn from(value: PeerAggregator) -> Self {
-        // Exclude sensitive values.
- Self { - endpoint: value.endpoint().clone(), - role: *value.role(), - collector_hpke_config: value.collector_hpke_config().clone(), - report_expiry_age: value.report_expiry_age().cloned(), - tolerable_clock_skew: *value.tolerable_clock_skew(), - } - } -} - -#[derive(Serialize, Deserialize)] -pub(crate) struct PostTaskprovPeerAggregatorReq { - pub(crate) endpoint: Url, - pub(crate) role: Role, - pub(crate) collector_hpke_config: HpkeConfig, - pub(crate) verify_key_init: VerifyKeyInit, - pub(crate) report_expiry_age: Option, - pub(crate) tolerable_clock_skew: Duration, - pub(crate) aggregator_auth_tokens: Vec, - pub(crate) collector_auth_tokens: Vec, -} - -#[derive(Clone, Serialize, Deserialize)] -pub(crate) struct DeleteTaskprovPeerAggregatorReq { - pub(crate) endpoint: Url, - pub(crate) role: Role, -} diff --git a/aggregator_api/src/routes.rs b/aggregator_api/src/routes.rs index 4ccf326c4..f56152e87 100644 --- a/aggregator_api/src/routes.rs +++ b/aggregator_api/src/routes.rs @@ -1,9 +1,8 @@ use crate::{ models::{ - AggregatorApiConfig, AggregatorRole, DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp, - GetTaskMetricsResp, GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, PostTaskReq, - PostTaskprovPeerAggregatorReq, PutGlobalHpkeConfigReq, SupportedVdaf, TaskResp, - TaskprovPeerAggregatorResp, + AggregatorApiConfig, AggregatorRole, GetTaskIdsResp, GetTaskMetricsResp, + GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, PostTaskReq, PutGlobalHpkeConfigReq, + SupportedVdaf, TaskResp, }, Config, ConnExt, Error, }; @@ -11,7 +10,6 @@ use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use janus_aggregator_core::{ datastore::{self, Datastore}, task::Task, - taskprov::PeerAggregator, SecretBytes, }; use janus_core::{hpke::generate_hpke_config_and_private_key, time::Clock}; @@ -370,80 +368,3 @@ pub(super) async fn delete_global_hpke_config( Err(err) => Err(err.into()), } } - -pub(super) async fn get_taskprov_peer_aggregators( - _: &mut Conn, - State(ds): State>>, -) -> Result>, Error> { - Ok(Json( - ds.run_tx_with_name("get_taskprov_peer_aggregators", |tx| { - Box::pin(async move { tx.get_taskprov_peer_aggregators().await }) - }) - .await? - .into_iter() - .map(TaskprovPeerAggregatorResp::from) - .collect::>(), - )) -} - -/// Inserts a new peer aggregator. Insertion is only supported, attempting to modify an existing -/// peer aggregator will fail. -/// -/// TODO(1685): Requiring that we delete an existing peer aggregator before we can change it makes -/// token rotation cumbersome and fragile. Since token rotation is the main use case for updating -/// an existing peer aggregator, we will resolve peer aggregator updates in that issue. -pub(super) async fn post_taskprov_peer_aggregator( - _: &mut Conn, - (State(ds), Json(req)): ( - State>>, - Json, - ), -) -> Result<(Status, Json), Error> { - let to_insert = PeerAggregator::new( - req.endpoint, - req.role, - req.verify_key_init, - req.collector_hpke_config, - req.report_expiry_age, - req.tolerable_clock_skew, - req.aggregator_auth_tokens, - req.collector_auth_tokens, - ); - - let inserted = ds - .run_tx_with_name("post_taskprov_peer_aggregator", |tx| { - let to_insert = to_insert.clone(); - Box::pin(async move { - tx.put_taskprov_peer_aggregator(&to_insert).await?; - tx.get_taskprov_peer_aggregator(to_insert.endpoint(), to_insert.role()) - .await - }) - }) - .await? 
- .map(TaskprovPeerAggregatorResp::from) - .ok_or_else(|| Error::Internal("Newly inserted peer aggregator disappeared".to_string()))?; - - Ok((Status::Created, Json(inserted))) -} - -pub(super) async fn delete_taskprov_peer_aggregator( - _: &mut Conn, - (State(ds), Json(req)): ( - State>>, - Json, - ), -) -> Result { - match ds - .run_tx_with_name("delete_taskprov_peer_aggregator", |tx| { - let req = req.clone(); - Box::pin(async move { - tx.delete_taskprov_peer_aggregator(&req.endpoint, &req.role) - .await - }) - }) - .await - { - Ok(_) | Err(datastore::Error::MutationTargetNotFound) => Ok(Status::NoContent), - Err(err) => Err(err.into()), - } -} diff --git a/aggregator_api/src/tests.rs b/aggregator_api/src/tests.rs index 6a59b4899..bd40bdfcd 100644 --- a/aggregator_api/src/tests.rs +++ b/aggregator_api/src/tests.rs @@ -1,9 +1,8 @@ use crate::{ aggregator_api_handler, models::{ - DeleteTaskprovPeerAggregatorReq, GetTaskIdsResp, GetTaskMetricsResp, GlobalHpkeConfigResp, - PatchGlobalHpkeConfigReq, PostTaskReq, PostTaskprovPeerAggregatorReq, - PutGlobalHpkeConfigReq, TaskResp, TaskprovPeerAggregatorResp, + GetTaskIdsResp, GetTaskMetricsResp, GlobalHpkeConfigResp, PatchGlobalHpkeConfigReq, + PostTaskReq, PutGlobalHpkeConfigReq, TaskResp, }, Config, CONTENT_TYPE, }; @@ -19,7 +18,6 @@ use janus_aggregator_core::{ Datastore, }, task::{test_util::TaskBuilder, QueryType, Task}, - taskprov::test_util::PeerAggregatorBuilder, SecretBytes, }; use janus_core::{ @@ -49,7 +47,6 @@ use trillium::{Handler, Status}; use trillium_testing::{ assert_response, assert_status, prelude::{delete, get, patch, post, put}, - Url, }; const AUTH_TOKEN: &str = "Y29sbGVjdG9yLWFiY2RlZjAw"; @@ -1245,224 +1242,6 @@ async fn delete_global_hpke_config() { ); } -#[tokio::test] -async fn get_taskprov_peer_aggregator() { - let (handler, _ephemeral_datastore, ds) = setup_api_test().await; - - let leader = PeerAggregatorBuilder::new() - .with_endpoint(Url::parse("https://leader.example.com/").unwrap()) - .with_role(Role::Leader) - .build(); - let helper = PeerAggregatorBuilder::new() - .with_endpoint(Url::parse("https://helper.example.com/").unwrap()) - .with_role(Role::Helper) - .build(); - - ds.run_tx(|tx| { - let leader = leader.clone(); - let helper = helper.clone(); - Box::pin(async move { - tx.put_taskprov_peer_aggregator(&leader).await?; - tx.put_taskprov_peer_aggregator(&helper).await?; - Ok(()) - }) - }) - .await - .unwrap(); - - // List all. 
- let mut conn = get("/taskprov/peer_aggregators") - .with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}")) - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await; - assert_response!(conn, Status::Ok); - let mut resp: Vec = serde_json::from_slice( - &conn - .take_response_body() - .unwrap() - .into_bytes() - .await - .unwrap(), - ) - .unwrap(); - resp.sort_by(|a, b| a.endpoint.cmp(&b.endpoint)); - - let mut expected = vec![ - TaskprovPeerAggregatorResp { - endpoint: leader.endpoint().clone(), - role: *leader.role(), - collector_hpke_config: leader.collector_hpke_config().clone(), - report_expiry_age: leader.report_expiry_age().cloned(), - tolerable_clock_skew: *leader.tolerable_clock_skew(), - }, - TaskprovPeerAggregatorResp { - endpoint: helper.endpoint().clone(), - role: *helper.role(), - collector_hpke_config: helper.collector_hpke_config().clone(), - report_expiry_age: helper.report_expiry_age().cloned(), - tolerable_clock_skew: *helper.tolerable_clock_skew(), - }, - ]; - expected.sort_by(|a, b| a.endpoint.cmp(&b.endpoint)); - - assert_eq!(resp, expected); - - // Missing authorization. - assert_response!( - get("/taskprov/peer_aggregators") - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await, - Status::Unauthorized - ); -} - -#[tokio::test] -async fn post_taskprov_peer_aggregator() { - let (handler, _ephemeral_datastore, ds) = setup_api_test().await; - - let endpoint = Url::parse("https://leader.example.com/").unwrap(); - let leader = PeerAggregatorBuilder::new() - .with_endpoint(endpoint.clone()) - .with_role(Role::Leader) - .build(); - - let req = PostTaskprovPeerAggregatorReq { - endpoint, - role: Role::Leader, - collector_hpke_config: leader.collector_hpke_config().clone(), - verify_key_init: *leader.verify_key_init(), - report_expiry_age: leader.report_expiry_age().cloned(), - tolerable_clock_skew: *leader.tolerable_clock_skew(), - aggregator_auth_tokens: Vec::from(leader.aggregator_auth_tokens()), - collector_auth_tokens: Vec::from(leader.collector_auth_tokens()), - }; - - let mut conn = post("/taskprov/peer_aggregators") - .with_request_body(serde_json::to_vec(&req).unwrap()) - .with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}")) - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await; - assert_response!(conn, Status::Created); - assert_eq!( - serde_json::from_slice::( - &conn - .take_response_body() - .unwrap() - .into_bytes() - .await - .unwrap(), - ) - .unwrap(), - leader.clone().into() - ); - - assert_eq!( - ds.run_tx(|tx| { Box::pin(async move { tx.get_taskprov_peer_aggregators().await }) }) - .await - .unwrap(), - vec![leader] - ); - - // Can't insert the same aggregator. - assert_response!( - post("/taskprov/peer_aggregators") - .with_request_body(serde_json::to_vec(&req).unwrap()) - .with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}")) - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await, - Status::Conflict - ); - - // Missing authorization. 
- assert_response!( - post("/taskprov/peer_aggregators") - .with_request_body(serde_json::to_vec(&req).unwrap()) - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await, - Status::Unauthorized - ); -} - -#[tokio::test] -async fn delete_taskprov_peer_aggregator() { - let (handler, _ephemeral_datastore, ds) = setup_api_test().await; - - let endpoint = Url::parse("https://leader.example.com/").unwrap(); - let leader = PeerAggregatorBuilder::new() - .with_endpoint(endpoint.clone()) - .with_role(Role::Leader) - .build(); - - ds.run_tx(|tx| { - let leader = leader.clone(); - Box::pin(async move { tx.put_taskprov_peer_aggregator(&leader).await }) - }) - .await - .unwrap(); - - let req = DeleteTaskprovPeerAggregatorReq { - endpoint, - role: Role::Leader, - }; - - // Delete target. - assert_response!( - delete("/taskprov/peer_aggregators") - .with_request_body(serde_json::to_vec(&req).unwrap()) - .with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}")) - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await, - Status::NoContent - ); - - assert_eq!( - ds.run_tx(|tx| { Box::pin(async move { tx.get_taskprov_peer_aggregators().await }) }) - .await - .unwrap(), - vec![] - ); - - // Non-existent target. - assert_response!( - delete("/taskprov/peer_aggregators") - .with_request_body( - serde_json::to_vec(&DeleteTaskprovPeerAggregatorReq { - endpoint: Url::parse("https://doesnt-exist.example.com/").unwrap(), - role: Role::Leader, - }) - .unwrap() - ) - .with_request_header("Authorization", format!("Bearer {AUTH_TOKEN}")) - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await, - Status::NoContent - ); - - // Missing authorization. 
- assert_response!( - delete("/taskprov/peer_aggregators") - .with_request_header("Accept", CONTENT_TYPE) - .with_request_header("Content-Type", CONTENT_TYPE) - .run_async(&handler) - .await, - Status::Unauthorized - ); -} - #[test] fn get_task_ids_resp_serialization() { assert_ser_tokens( diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index a63c8b854..04e033479 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -10,15 +10,14 @@ use self::models::{ use crate::{ query_type::{AccumulableQueryType, CollectableQueryType}, task::{self, Task}, - taskprov::{self, PeerAggregator}, - SecretBytes, + taskprov, SecretBytes, }; use anyhow::anyhow; use chrono::NaiveDateTime; use futures::future::try_join_all; use janus_core::{ hpke::{HpkeKeypair, HpkePrivateKey}, - task::{AuthenticationToken, VdafInstance}, + task::VdafInstance, time::{Clock, TimeExt}, }; use janus_messages::{ @@ -4418,336 +4417,6 @@ impl Transaction<'_, C> { .await?, ) } - - #[tracing::instrument(skip(self), err)] - pub async fn get_taskprov_peer_aggregators(&self) -> Result, Error> { - let stmt = self - .prepare_cached( - "SELECT id, endpoint, role, verify_key_init, collector_hpke_config, - report_expiry_age, tolerable_clock_skew - FROM taskprov_peer_aggregators", - ) - .await?; - let peer_aggregator_rows = self.query(&stmt, &[]); - - let stmt = self - .prepare_cached( - "SELECT (SELECT p.id FROM taskprov_peer_aggregators AS p - WHERE p.id = a.peer_aggregator_id) AS peer_id, - ord, type, token FROM taskprov_aggregator_auth_tokens AS a - ORDER BY ord ASC", - ) - .await?; - let aggregator_auth_token_rows = self.query(&stmt, &[]); - - let stmt = self - .prepare_cached( - "SELECT (SELECT p.id FROM taskprov_peer_aggregators AS p - WHERE p.id = a.peer_aggregator_id) AS peer_id, - ord, type, token FROM taskprov_collector_auth_tokens AS a - ORDER BY ord ASC", - ) - .await?; - let collector_auth_token_rows = self.query(&stmt, &[]); - - let (peer_aggregator_rows, aggregator_auth_token_rows, collector_auth_token_rows) = try_join!( - peer_aggregator_rows, - aggregator_auth_token_rows, - collector_auth_token_rows, - )?; - - let mut aggregator_auth_token_rows_by_peer_id: HashMap> = HashMap::new(); - for row in aggregator_auth_token_rows { - aggregator_auth_token_rows_by_peer_id - .entry(row.get("peer_id")) - .or_default() - .push(row); - } - - let mut collector_auth_token_rows_by_peer_id: HashMap> = HashMap::new(); - for row in collector_auth_token_rows { - collector_auth_token_rows_by_peer_id - .entry(row.get("peer_id")) - .or_default() - .push(row); - } - - peer_aggregator_rows - .into_iter() - .map(|row| (row.get("id"), row)) - .map(|(peer_id, peer_aggregator_row)| { - self.taskprov_peer_aggregator_from_rows( - &peer_aggregator_row, - &aggregator_auth_token_rows_by_peer_id - .remove(&peer_id) - .unwrap_or_default(), - &collector_auth_token_rows_by_peer_id - .remove(&peer_id) - .unwrap_or_default(), - ) - }) - .collect() - } - - #[tracing::instrument(skip(self), err)] - pub async fn get_taskprov_peer_aggregator( - &self, - aggregator_url: &Url, - role: &Role, - ) -> Result, Error> { - let aggregator_url = aggregator_url.as_str(); - let role = AggregatorRole::from_role(*role)?; - let params: &[&(dyn ToSql + Sync)] = &[&aggregator_url, &role]; - - let stmt = self - .prepare_cached( - "SELECT id, endpoint, role, verify_key_init, collector_hpke_config, - report_expiry_age, tolerable_clock_skew - FROM taskprov_peer_aggregators WHERE endpoint = $1 AND role = $2", - ) - .await?; - 
let peer_aggregator_row = self.query_opt(&stmt, params); - - let stmt = self - .prepare_cached( - "SELECT ord, type, token FROM taskprov_aggregator_auth_tokens - WHERE peer_aggregator_id = (SELECT id FROM taskprov_peer_aggregators - WHERE endpoint = $1 AND role = $2) - ORDER BY ord ASC", - ) - .await?; - let aggregator_auth_token_rows = self.query(&stmt, params); - - let stmt = self - .prepare_cached( - "SELECT ord, type, token FROM taskprov_collector_auth_tokens - WHERE peer_aggregator_id = (SELECT id FROM taskprov_peer_aggregators - WHERE endpoint = $1 AND role = $2) - ORDER BY ord ASC", - ) - .await?; - let collector_auth_token_rows = self.query(&stmt, params); - - let (peer_aggregator_row, aggregator_auth_token_rows, collector_auth_token_rows) = try_join!( - peer_aggregator_row, - aggregator_auth_token_rows, - collector_auth_token_rows, - )?; - peer_aggregator_row - .map(|peer_aggregator_row| { - self.taskprov_peer_aggregator_from_rows( - &peer_aggregator_row, - &aggregator_auth_token_rows, - &collector_auth_token_rows, - ) - }) - .transpose() - } - - fn taskprov_peer_aggregator_from_rows( - &self, - peer_aggregator_row: &Row, - aggregator_auth_token_rows: &[Row], - collector_auth_token_rows: &[Row], - ) -> Result { - let endpoint = Url::parse(peer_aggregator_row.get::<_, &str>("endpoint"))?; - let endpoint_bytes = endpoint.as_str().as_ref(); - let role: AggregatorRole = peer_aggregator_row.get("role"); - let report_expiry_age = peer_aggregator_row - .get_nullable_bigint_and_convert("report_expiry_age")? - .map(Duration::from_seconds); - let tolerable_clock_skew = Duration::from_seconds( - peer_aggregator_row.get_bigint_and_convert("tolerable_clock_skew")?, - ); - let collector_hpke_config = - HpkeConfig::get_decoded(peer_aggregator_row.get("collector_hpke_config"))?; - - let encrypted_verify_key_init: Vec = peer_aggregator_row.get("verify_key_init"); - let verify_key_init = self - .crypter - .decrypt( - "taskprov_peer_aggregator", - endpoint_bytes, - "verify_key_init", - &encrypted_verify_key_init, - )? - .as_slice() - .try_into()?; - - let decrypt_tokens = |rows: &[Row], table| -> Result, Error> { - rows.iter() - .map(|row| { - let ord: i64 = row.get("ord"); - let auth_token_type: AuthenticationTokenType = row.get("type"); - let encrypted_token: Vec = row.get("token"); - - let mut row_id = Vec::new(); - row_id.extend_from_slice(endpoint_bytes); - row_id.extend_from_slice(&role.as_role().get_encoded()); - row_id.extend_from_slice(&ord.to_be_bytes()); - - auth_token_type.as_authentication(&self.crypter.decrypt( - table, - &row_id, - "token", - &encrypted_token, - )?) 
- }) - .collect() - }; - - let aggregator_auth_tokens = decrypt_tokens( - aggregator_auth_token_rows, - "taskprov_aggregator_auth_tokens", - )?; - let collector_auth_tokens = - decrypt_tokens(collector_auth_token_rows, "taskprov_collector_auth_tokens")?; - - Ok(PeerAggregator::new( - endpoint, - role.as_role(), - verify_key_init, - collector_hpke_config, - report_expiry_age, - tolerable_clock_skew, - aggregator_auth_tokens, - collector_auth_tokens, - )) - } - - #[tracing::instrument(skip(self), err)] - pub async fn put_taskprov_peer_aggregator( - &self, - peer_aggregator: &PeerAggregator, - ) -> Result<(), Error> { - let endpoint = peer_aggregator.endpoint().as_str(); - let role = &AggregatorRole::from_role(*peer_aggregator.role())?; - let encrypted_verify_key_init = self.crypter.encrypt( - "taskprov_peer_aggregator", - endpoint.as_ref(), - "verify_key_init", - peer_aggregator.verify_key_init().as_ref(), - )?; - - let stmt = self - .prepare_cached( - "INSERT INTO taskprov_peer_aggregators ( - endpoint, role, verify_key_init, tolerable_clock_skew, report_expiry_age, - collector_hpke_config - ) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT DO NOTHING", - ) - .await?; - check_insert( - self.execute( - &stmt, - &[ - /* endpoint */ &endpoint, - /* role */ role, - /* verify_key_init */ &encrypted_verify_key_init, - /* tolerable_clock_skew */ - &i64::try_from(peer_aggregator.tolerable_clock_skew().as_seconds())?, - /* report_expiry_age */ - &peer_aggregator - .report_expiry_age() - .map(Duration::as_seconds) - .map(i64::try_from) - .transpose()?, - /* collector_hpke_config */ - &peer_aggregator.collector_hpke_config().get_encoded(), - ], - ) - .await?, - )?; - - let encrypt_tokens = |tokens: &[AuthenticationToken], table| -> Result<_, Error> { - let mut ords = Vec::new(); - let mut types = Vec::new(); - let mut encrypted_tokens = Vec::new(); - for (ord, token) in tokens.iter().enumerate() { - let ord = i64::try_from(ord)?; - - let mut row_id = Vec::new(); - row_id.extend_from_slice(endpoint.as_ref()); - row_id.extend_from_slice(&role.as_role().get_encoded()); - row_id.extend_from_slice(&ord.to_be_bytes()); - - let encrypted_auth_token = - self.crypter - .encrypt(table, &row_id, "token", token.as_ref())?; - - ords.push(ord); - types.push(AuthenticationTokenType::from(token)); - encrypted_tokens.push(encrypted_auth_token); - } - Ok((ords, types, encrypted_tokens)) - }; - - let (aggregator_auth_token_ords, aggregator_auth_token_types, aggregator_auth_tokens) = - encrypt_tokens( - peer_aggregator.aggregator_auth_tokens(), - "taskprov_aggregator_auth_tokens", - )?; - let stmt = self - .prepare_cached( - "INSERT INTO taskprov_aggregator_auth_tokens (peer_aggregator_id, ord, type, token) - SELECT - (SELECT id FROM taskprov_peer_aggregators WHERE endpoint = $1 AND role = $2), - * FROM UNNEST($3::BIGINT[], $4::AUTH_TOKEN_TYPE[], $5::BYTEA[])", - ) - .await?; - let aggregator_auth_tokens_params: &[&(dyn ToSql + Sync)] = &[ - /* endpoint */ &endpoint, - /* role */ role, - /* ords */ &aggregator_auth_token_ords, - /* token_types */ &aggregator_auth_token_types, - /* tokens */ &aggregator_auth_tokens, - ]; - let aggregator_auth_tokens_future = self.execute(&stmt, aggregator_auth_tokens_params); - - let (collector_auth_token_ords, collector_auth_token_types, collector_auth_tokens) = - encrypt_tokens( - peer_aggregator.collector_auth_tokens(), - "taskprov_collector_auth_tokens", - )?; - let stmt = self - .prepare_cached( - "INSERT INTO taskprov_collector_auth_tokens (peer_aggregator_id, ord, type, token) - 
SELECT - (SELECT id FROM taskprov_peer_aggregators WHERE endpoint = $1 AND role = $2), - * FROM UNNEST($3::BIGINT[], $4::AUTH_TOKEN_TYPE[], $5::BYTEA[])", - ) - .await?; - let collector_auth_tokens_params: &[&(dyn ToSql + Sync)] = &[ - /* endpoint */ &endpoint, - /* role */ role, - /* ords */ &collector_auth_token_ords, - /* token_types */ &collector_auth_token_types, - /* tokens */ &collector_auth_tokens, - ]; - let collector_auth_tokens_future = self.execute(&stmt, collector_auth_tokens_params); - - try_join!(aggregator_auth_tokens_future, collector_auth_tokens_future)?; - Ok(()) - } - - #[tracing::instrument(skip(self), err)] - pub async fn delete_taskprov_peer_aggregator( - &self, - aggregator_url: &Url, - role: &Role, - ) -> Result<(), Error> { - let aggregator_url = aggregator_url.as_str(); - let role = AggregatorRole::from_role(*role)?; - - // Deletion of other data implemented via ON DELETE CASCADE. - let stmt = self - .prepare_cached( - "DELETE FROM taskprov_peer_aggregators WHERE endpoint = $1 AND role = $2", - ) - .await?; - check_single_row_mutation(self.execute(&stmt, &[&aggregator_url, &role]).await?) - } } fn check_insert(row_count: u64) -> Result<(), Error> { diff --git a/aggregator_core/src/datastore/tests.rs b/aggregator_core/src/datastore/tests.rs index 58542d8ed..848bfaeed 100644 --- a/aggregator_core/src/datastore/tests.rs +++ b/aggregator_core/src/datastore/tests.rs @@ -16,7 +16,6 @@ use crate::{ }, query_type::CollectableQueryType, task::{self, test_util::TaskBuilder, Task}, - taskprov::test_util::PeerAggregatorBuilder, test_util::noop_meter, }; @@ -56,7 +55,6 @@ use std::{ time::Duration as StdDuration, }; use tokio::time::timeout; -use url::Url; const OLDEST_ALLOWED_REPORT_TIMESTAMP: Time = Time::from_seconds_since_epoch(1000); const REPORT_EXPIRY_AGE: Duration = Duration::from_seconds(1000); @@ -6727,103 +6725,3 @@ async fn roundtrip_global_hpke_keypair(ephemeral_datastore: EphemeralDatastore) .await .unwrap(); } - -#[rstest_reuse::apply(schema_versions_template)] -#[tokio::test] -async fn roundtrip_taskprov_peer_aggregator(ephemeral_datastore: EphemeralDatastore) { - install_test_trace_subscriber(); - let datastore = ephemeral_datastore.datastore(MockClock::default()).await; - - // Basic aggregator. - let example_leader_peer_aggregator = - PeerAggregatorBuilder::new().with_role(Role::Leader).build(); - let example_helper_peer_aggregator = PeerAggregatorBuilder::new() - .with_role(Role::Helper) - .with_aggregator_auth_tokens(vec![random(), random()]) - .with_collector_auth_tokens(vec![]) - .build(); - let another_example_leader_peer_aggregator = PeerAggregatorBuilder::new() - .with_endpoint(Url::parse("https://another.example.com/").unwrap()) - .with_aggregator_auth_tokens(vec![]) - .with_collector_auth_tokens(vec![random(), random()]) - .build(); - - datastore - .run_tx(|tx| { - let example_leader_peer_aggregator = example_leader_peer_aggregator.clone(); - let example_helper_peer_aggregator = example_helper_peer_aggregator.clone(); - let another_example_leader_peer_aggregator = - another_example_leader_peer_aggregator.clone(); - Box::pin(async move { - tx.put_taskprov_peer_aggregator(&example_leader_peer_aggregator) - .await?; - tx.put_taskprov_peer_aggregator(&example_helper_peer_aggregator) - .await?; - tx.put_taskprov_peer_aggregator(&another_example_leader_peer_aggregator) - .await?; - Ok(()) - }) - }) - .await - .unwrap(); - - // Should not be able to put an aggregator with the same endpoint and role. 
- assert_matches!( - datastore - .run_tx(|tx| { - Box::pin(async move { - let colliding_peer_aggregator = PeerAggregatorBuilder::new().build(); - tx.put_taskprov_peer_aggregator(&colliding_peer_aggregator) - .await - }) - }) - .await, - Err(Error::MutationTargetAlreadyExists) - ); - - datastore - .run_tx(|tx| { - let example_leader_peer_aggregator = example_leader_peer_aggregator.clone(); - let example_helper_peer_aggregator = example_helper_peer_aggregator.clone(); - let another_example_leader_peer_aggregator = - another_example_leader_peer_aggregator.clone(); - Box::pin(async move { - for peer in [ - example_leader_peer_aggregator.clone(), - example_helper_peer_aggregator.clone(), - another_example_leader_peer_aggregator.clone(), - ] { - assert_eq!( - tx.get_taskprov_peer_aggregator(peer.endpoint(), peer.role()) - .await - .unwrap(), - Some(peer.clone()), - ); - } - - assert_eq!( - tx.get_taskprov_peer_aggregators().await.unwrap(), - vec![ - example_leader_peer_aggregator.clone(), - example_helper_peer_aggregator.clone(), - another_example_leader_peer_aggregator.clone(), - ] - ); - - for peer in [ - example_leader_peer_aggregator.clone(), - example_helper_peer_aggregator.clone(), - another_example_leader_peer_aggregator.clone(), - ] { - tx.delete_taskprov_peer_aggregator(peer.endpoint(), peer.role()) - .await - .unwrap(); - } - assert_eq!(tx.get_taskprov_peer_aggregators().await.unwrap(), vec![]); - - Ok(()) - }) - }) - .await - .unwrap(); -} diff --git a/aggregator_core/src/taskprov.rs b/aggregator_core/src/taskprov.rs index fb8facfe6..7c88104ae 100644 --- a/aggregator_core/src/taskprov.rs +++ b/aggregator_core/src/taskprov.rs @@ -4,8 +4,8 @@ use crate::{ }; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use derivative::Derivative; -use janus_core::task::{AuthenticationToken, VdafInstance}; -use janus_messages::{Duration, HpkeConfig, Role, TaskId, Time}; +use janus_core::task::VdafInstance; +use janus_messages::{Duration, Role, TaskId, Time}; use lazy_static::lazy_static; use rand::{distributions::Standard, prelude::Distribution}; use ring::hkdf::{KeyType, Salt, HKDF_SHA256}; @@ -87,41 +87,6 @@ impl Distribution for Standard { } } -/// Represents another aggregator that is peered with our aggregator for taskprov purposes. Contains -/// data that needs to be identical between both aggregators for the taskprov flow to work. -#[derive(Debug, Clone, Derivative, PartialEq, Eq)] -pub struct PeerAggregator { - /// The URL at which the peer aggregator can be reached. This, along with `role`, is used to - /// uniquely represent the peer aggregator. - endpoint: Url, - - /// The role that the peer aggregator takes in DAP. Must be [`Role::Leader`] or [`Role::Helper`]. - /// This, along with `endpoint`, uniquely represents the peer aggregator. - role: Role, - - /// The preshared key used to derive the VDAF verify key for each task. - verify_key_init: VerifyKeyInit, - - // The HPKE configuration of the collector. This needs to be shared out-of-band with the peer - // aggregator. - collector_hpke_config: HpkeConfig, - - /// How long reports exist until they're eligible for GC. Set to None for no GC. This value is - /// copied into the definition for a provisioned task. - report_expiry_age: Option, - - /// The maximum allowable clock skew between peers. This value is copied into the definition for - /// a provisioned task. - tolerable_clock_skew: Duration, - - /// Auth tokens used for authenticating Leader to Helper requests. 
- aggregator_auth_tokens: Vec, - - /// Auth tokens used for authenticating Collector to Leader requests. It should be empty if the - /// peer aggregator is the Leader. - collector_auth_tokens: Vec, -} - lazy_static! { /// Salt generated by the SHA256 of the string 'dap-taskprov". See [taskprov section 3.2][1]. /// @@ -136,101 +101,7 @@ lazy_static! { ); } -impl PeerAggregator { - #[allow(clippy::too_many_arguments)] - pub fn new( - endpoint: Url, - role: Role, - verify_key_init: VerifyKeyInit, - collector_hpke_config: HpkeConfig, - report_expiry_age: Option, - tolerable_clock_skew: Duration, - aggregator_auth_tokens: Vec, - collector_auth_tokens: Vec, - ) -> Self { - Self { - endpoint, - role, - verify_key_init, - collector_hpke_config, - report_expiry_age, - tolerable_clock_skew, - aggregator_auth_tokens, - collector_auth_tokens, - } - } - - /// Retrieve the URL endpoint of the peer. - pub fn endpoint(&self) -> &Url { - &self.endpoint - } - - /// Retrieve the role of the peer. - pub fn role(&self) -> &Role { - &self.role - } - - /// Retrieve the VDAF verify key initialization parameter, used for derivation of the VDAF - /// verify key for a task. - pub fn verify_key_init(&self) -> &VerifyKeyInit { - &self.verify_key_init - } - - /// Retrieve the collector HPKE configuration for this peer. - pub fn collector_hpke_config(&self) -> &HpkeConfig { - &self.collector_hpke_config - } - - /// Retrieve the report expiry age that each task will be configured with. - pub fn report_expiry_age(&self) -> Option<&Duration> { - self.report_expiry_age.as_ref() - } - - /// Retrieve the maximum tolerable clock skew that each task will be configured with. - pub fn tolerable_clock_skew(&self) -> &Duration { - &self.tolerable_clock_skew - } - - /// Retrieve the [`AuthenticationToken`]s used for authenticating leader to helper requests. - pub fn aggregator_auth_tokens(&self) -> &[AuthenticationToken] { - &self.aggregator_auth_tokens - } - - /// Retrieve the [`AuthenticationToken`]s used for authenticating collector to leader requests. - pub fn collector_auth_tokens(&self) -> &[AuthenticationToken] { - &self.collector_auth_tokens - } - - /// Returns the [`AuthenticationToken`] currently used by this peer to authenticate itself. - pub fn primary_aggregator_auth_token(&self) -> &AuthenticationToken { - self.aggregator_auth_tokens.iter().next_back().unwrap() - } - - /// Checks if the given aggregator authentication token is valid (i.e. matches with an - /// authentication token recognized by this task). - pub fn check_aggregator_auth_token(&self, auth_token: &AuthenticationToken) -> bool { - self.aggregator_auth_tokens - .iter() - .rev() - .any(|t| t == auth_token) - } - - /// Returns the [`AuthenticationToken`] currently used by the collector to authenticate itself - /// to the aggregators. - pub fn primary_collector_auth_token(&self) -> &AuthenticationToken { - // Unwrap safety: self.collector_auth_tokens is never empty - self.collector_auth_tokens.iter().next_back().unwrap() - } - - /// Checks if the given collector authentication token is valid (i.e. matches with an - /// authentication token recognized by this task). - pub fn check_collector_auth_token(&self, auth_token: &AuthenticationToken) -> bool { - self.collector_auth_tokens - .iter() - .rev() - .any(|t| t == auth_token) - } - +impl VerifyKeyInit { /// Computes the VDAF verify key using the method defined in [draft-wang-ppm-dap-taskprov][1]. 
/// /// [1]: https://www.ietf.org/archive/id/draft-wang-ppm-dap-taskprov-04.html#name-deriving-the-vdaf-verificat @@ -239,7 +110,7 @@ impl PeerAggregator { task_id: &TaskId, vdaf_instance: &VdafInstance, ) -> SecretBytes { - let prk = SALT.extract(self.verify_key_init.as_ref()); + let prk = SALT.extract(self.0.as_ref()); let info = [task_id.as_ref().as_slice()]; // Unwrap safety: this function only errors if the OKM length is too long @@ -332,106 +203,3 @@ impl From for task::Task { value.0 } } - -#[cfg(feature = "test-util")] -#[cfg_attr(docsrs, doc(cfg(feature = "test-util")))] -pub mod test_util { - use janus_core::{ - hpke::test_util::generate_test_hpke_config_and_private_key, task::AuthenticationToken, - }; - use janus_messages::{Duration, HpkeConfig, Role}; - use rand::random; - use url::Url; - - use super::{PeerAggregator, VerifyKeyInit}; - - #[derive(Debug, Clone)] - pub struct PeerAggregatorBuilder(PeerAggregator); - - impl PeerAggregatorBuilder { - pub fn new() -> Self { - Self(PeerAggregator::new( - Url::parse("https://example.com").unwrap(), - Role::Leader, - random(), - generate_test_hpke_config_and_private_key().config().clone(), - None, - Duration::from_seconds(1), - Vec::from([random()]), - Vec::from([random()]), - )) - } - - pub fn with_endpoint(self, endpoint: Url) -> Self { - Self(PeerAggregator { endpoint, ..self.0 }) - } - - pub fn with_role(self, role: Role) -> Self { - Self(PeerAggregator { role, ..self.0 }) - } - - pub fn with_verify_key_init(self, verify_key_init: VerifyKeyInit) -> Self { - Self(PeerAggregator { - verify_key_init, - ..self.0 - }) - } - - pub fn with_collector_hpke_config(self, collector_hpke_config: HpkeConfig) -> Self { - Self(PeerAggregator { - collector_hpke_config, - ..self.0 - }) - } - - pub fn with_report_expiry_age(self, report_expiry_age: Option) -> Self { - Self(PeerAggregator { - report_expiry_age, - ..self.0 - }) - } - - pub fn with_tolerable_clock_skew(self, tolerable_clock_skew: Duration) -> Self { - Self(PeerAggregator { - tolerable_clock_skew, - ..self.0 - }) - } - - pub fn with_aggregator_auth_tokens( - self, - aggregator_auth_tokens: Vec, - ) -> Self { - Self(PeerAggregator { - aggregator_auth_tokens, - ..self.0 - }) - } - - pub fn with_collector_auth_tokens( - self, - collector_auth_tokens: Vec, - ) -> Self { - Self(PeerAggregator { - collector_auth_tokens, - ..self.0 - }) - } - - pub fn build(self) -> PeerAggregator { - self.0 - } - } - - impl From for PeerAggregatorBuilder { - fn from(value: PeerAggregator) -> Self { - Self(value) - } - } - - impl Default for PeerAggregatorBuilder { - fn default() -> Self { - Self::new() - } - } -} diff --git a/db/00000000000003_taskprov_sub01.down.sql b/db/00000000000003_taskprov_sub01.down.sql new file mode 100644 index 000000000..c9b17e52d --- /dev/null +++ b/db/00000000000003_taskprov_sub01.down.sql @@ -0,0 +1,38 @@ +-- Another DAP aggregator who we've partnered with to use the taskprov extension. +CREATE TABLE taskprov_peer_aggregators( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal only. + endpoint TEXT NOT NULL, -- peer aggregator HTTPS endpoint + role AGGREGATOR_ROLE NOT NULL, -- the role of this aggregator relative to the peer + verify_key_init BYTEA NOT NULL, -- the preshared key used for VDAF verify key derivation. + + -- Parameters applied to every task created with this peer aggregator. 
+    tolerable_clock_skew BIGINT NOT NULL,  -- the maximum acceptable clock skew to allow between client and aggregator, in seconds
+    report_expiry_age BIGINT,              -- the maximum age of a report before it is considered expired (and acceptable for garbage collection), in seconds. NULL means that GC is disabled.
+    collector_hpke_config BYTEA NOT NULL,  -- the HPKE config of the collector (encoded HpkeConfig message)
+
+    CONSTRAINT taskprov_peer_aggregator_endpoint_and_role_unique UNIQUE(endpoint, role)
+);
+
+-- Aggregator auth tokens that we've shared with the peer aggregator.
+CREATE TABLE taskprov_aggregator_auth_tokens(
+    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only
+    peer_aggregator_id BIGINT NOT NULL, -- the peer aggregator the token is associated with
+    ord BIGINT NOT NULL,                -- a value used to specify the ordering of the authentication tokens
+    token BYTEA NOT NULL,               -- bearer token used to authenticate messages to/from the other aggregator (encrypted)
+    type AUTH_TOKEN_TYPE NOT NULL DEFAULT 'BEARER',
+
+    CONSTRAINT task_aggregator_auth_tokens_unique_peer_aggregator_id_and_ord UNIQUE(peer_aggregator_id, ord),
+    CONSTRAINT fk_peer_aggregator_id FOREIGN KEY(peer_aggregator_id) REFERENCES taskprov_peer_aggregators(id) ON DELETE CASCADE
+);
+
+-- Collector auth tokens that we've shared with the peer aggregator.
+CREATE TABLE taskprov_collector_auth_tokens(
+    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- artificial ID, internal-only
+    peer_aggregator_id BIGINT NOT NULL, -- the peer aggregator the token is associated with
+    ord BIGINT NOT NULL,                -- a value used to specify the ordering of the authentication tokens
+    token BYTEA NOT NULL,               -- bearer token used to authenticate messages to/from the other aggregator (encrypted)
+    type AUTH_TOKEN_TYPE NOT NULL DEFAULT 'BEARER',
+
+    CONSTRAINT task_collector_auth_tokens_unique_peer_aggregator_id_and_ord UNIQUE(peer_aggregator_id, ord),
+    CONSTRAINT fk_peer_aggregator_id FOREIGN KEY(peer_aggregator_id) REFERENCES taskprov_peer_aggregators(id) ON DELETE CASCADE
+);
diff --git a/db/00000000000003_taskprov_sub01.up.sql b/db/00000000000003_taskprov_sub01.up.sql
new file mode 100644
index 000000000..a9e736eab
--- /dev/null
+++ b/db/00000000000003_taskprov_sub01.up.sql
@@ -0,0 +1,2 @@
+-- FK tables are dropped by cascade.
+DROP TABLE taskprov_peer_aggregators CASCADE;
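
Notes for operators: with this change, `taskprov_config` becomes a mandatory stanza in the aggregator's YAML configuration, and the verify key init moves out of the database and onto the command line. A minimal stanza, shown with the illustrative values used by the config tests above:

    taskprov_config:
      collector_hpke_config:
        id: 183
        kem_id: X25519HkdfSha256
        kdf_id: HkdfSha256
        aead_id: Aes128Gcm
        public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo
      tolerable_clock_skew: 60

`report_expiry_age` may be added to enable garbage collection of reports; omitting it, as above, disables GC. The verify key init itself is supplied via the new `--verify-key-init` flag or the `VERIFY_KEY_INIT` environment variable, encoded in unpadded URL-safe base64 as `parse_verify_key_init` expects.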
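
A fresh verify key init can be generated with a few lines of Rust against the same `base64` and `rand` crates the binary already uses. This is a sketch, not part of the patch, and it assumes `VerifyKeyInit` wraps 32 random bytes; the length constant is not shown in this diff, so check `aggregator_core/src/taskprov.rs` before relying on it:

    use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
    use rand::RngCore;

    fn main() {
        // Assumed length: 32 bytes. VerifyKeyInit::try_from rejects slices of the wrong size.
        let mut verify_key_init = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut verify_key_init);

        // Unpadded URL-safe base64, matching what parse_verify_key_init decodes.
        println!("VERIFY_KEY_INIT={}", URL_SAFE_NO_PAD.encode(verify_key_init));
    }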
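
Both aggregators must be configured with the same verify key init, because each independently derives the per-task VDAF verify key from it; that shared derivation is what lets this patch drop the peer-aggregator table without breaking taskprov interoperability. The retained `VerifyKeyInit::derive_vdaf_verify_key` implements the HKDF scheme from draft-wang-ppm-dap-taskprov: salt = SHA256("dap-taskprov"), IKM = verify key init, info = task ID. A self-contained sketch of the same computation (`derive` and `OkmLen` are illustrative names; the real method also picks the output length from the VDAF instance):

    use ring::{
        digest::{digest, SHA256},
        hkdf::{KeyType, Salt, HKDF_SHA256},
    };

    /// Tells ring's HKDF how many output (OKM) bytes we want.
    struct OkmLen(usize);

    impl KeyType for OkmLen {
        fn len(&self) -> usize {
            self.0
        }
    }

    /// HKDF-SHA256(salt = SHA256("dap-taskprov"), ikm = verify_key_init, info = task_id).
    fn derive(verify_key_init: &[u8], task_id: &[u8], len: usize) -> Vec<u8> {
        let salt = Salt::new(HKDF_SHA256, digest(&SHA256, b"dap-taskprov").as_ref());
        let prk = salt.extract(verify_key_init);
        let info = [task_id];
        // Unwrap safety: expand only fails when the requested length is too long for HKDF.
        let okm = prk.expand(&info, OkmLen(len)).unwrap();
        let mut verify_key = vec![0u8; len];
        okm.fill(&mut verify_key).unwrap();
        verify_key
    }

    fn main() {
        // Prio3 VDAFs use a 16-byte verify key (PRIO3_VERIFY_KEY_LENGTH in janus_core).
        let verify_key = derive(&[0u8; 32], &[1u8; 32], 16);
        assert_eq!(verify_key.len(), 16);
    }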