From 29da09cef05f9335432e79f42549e40dd6b1d42d Mon Sep 17 00:00:00 2001 From: Ameer Ghani Date: Thu, 21 Sep 2023 16:49:21 -0400 Subject: [PATCH] Use a global aggregator auth token --- aggregator/src/aggregator.rs | 67 ++++-- aggregator/src/aggregator/http_handlers.rs | 7 +- aggregator/src/aggregator/taskprov_tests.rs | 211 ++---------------- aggregator/src/bin/aggregator.rs | 27 ++- aggregator/tests/cmd/aggregator.trycmd | 6 +- aggregator/tests/graceful_shutdown.rs | 33 ++- docs/samples/advanced_config/aggregator.yaml | 10 + docs/samples/basic_config/aggregator.yaml | 9 + .../config/janus_interop_aggregator.yaml | 8 + .../src/bin/janus_interop_aggregator.rs | 26 ++- 10 files changed, 176 insertions(+), 228 deletions(-) diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index eb43de03f..0a550037d 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -30,12 +30,9 @@ use janus_aggregator_core::{ taskprov::{self, VerifyKeyInit}, }; #[cfg(feature = "test-util")] -use janus_core::test_util::dummy_vdaf; +use janus_core::{hpke::generate_hpke_config_and_private_key, test_util::dummy_vdaf}; use janus_core::{ - hpke::{ - self, aggregate_share_aad, generate_hpke_config_and_private_key, input_share_aad, - HpkeApplicationInfo, HpkeKeypair, Label, - }, + hpke::{self, aggregate_share_aad, input_share_aad, HpkeApplicationInfo, HpkeKeypair, Label}, http::response_to_problem_details, task::{AuthenticationToken, VdafInstance, PRIO3_VERIFY_KEY_LENGTH}, time::{Clock, DurationExt, IntervalExt, TimeExt}, @@ -46,10 +43,11 @@ use janus_messages::{ taskprov::TaskConfig, AggregateContinueReq, AggregateContinueResp, AggregateInitializeReq, AggregateInitializeResp, AggregateShareReq, AggregateShareResp, BatchSelector, CollectReq, CollectResp, CollectionJobId, - Duration, HpkeAeadId, HpkeCiphertext, HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId, Interval, - PartialBatchSelector, PrepareStep, PrepareStepResult, Report, ReportIdChecksum, ReportShare, - ReportShareError, Role, TaskId, + Duration, HpkeCiphertext, HpkeConfig, Interval, PartialBatchSelector, PrepareStep, + PrepareStepResult, Report, ReportIdChecksum, ReportShare, ReportShareError, Role, TaskId, }; +#[cfg(feature = "test-util")] +use janus_messages::{HpkeAeadId, HpkeConfigId, HpkeKdfId, HpkeKemId}; use opentelemetry::{ metrics::{Counter, Histogram, Meter, Unit}, KeyValue, @@ -193,8 +191,11 @@ pub struct Config { /// New tasks will have this tolerable clock skew. pub tolerable_clock_skew: Duration, - /// Defines the key used to deterministically derive the VDAF verify key for new tasks. + /// Defines the key used to derive the VDAF verify key for new tasks. pub verify_key_init: VerifyKeyInit, + + /// Authentication tokens used for requests from the leader. + pub auth_tokens: Vec, } // subscriber-01 only: the config now has mandatory fields, so default only makes sense as a helper @@ -218,6 +219,7 @@ impl Default for Config { report_expiry_age: None, tolerable_clock_skew: Duration::from_minutes(60).unwrap(), verify_key_init: random(), + auth_tokens: Vec::new(), } } } @@ -322,8 +324,13 @@ impl Aggregator { task_aggregator } None if taskprov_task_config.is_some() => { - self.taskprov_opt_in(&Role::Leader, task_id, taskprov_task_config.unwrap()) - .await?; + self.taskprov_opt_in( + &Role::Leader, + task_id, + taskprov_task_config.unwrap(), + auth_token.as_ref(), + ) + .await?; // Retry fetching the aggregator, since the last function would have just inserted // its task. @@ -366,8 +373,13 @@ impl Aggregator { } if taskprov_task_config.is_some() { - self.taskprov_authorize_request(&Role::Leader, task_id, taskprov_task_config.unwrap()) - .await?; + self.taskprov_authorize_request( + &Role::Leader, + task_id, + taskprov_task_config.unwrap(), + auth_token.as_ref(), + ) + .await?; } else if !auth_token .map(|t| task_aggregator.task.check_aggregator_auth_token(&t)) .unwrap_or(false) @@ -492,8 +504,13 @@ impl Aggregator { // Authorize the request and retrieve the collector's HPKE config. If this is a taskprov task, we // have to use the peer aggregator's collector config rather than the main task. let collector_hpke_config = if taskprov_task_config.is_some() { - self.taskprov_authorize_request(&Role::Leader, task_id, taskprov_task_config.unwrap()) - .await?; + self.taskprov_authorize_request( + &Role::Leader, + task_id, + taskprov_task_config.unwrap(), + auth_token.as_ref(), + ) + .await?; &self.cfg.collector_hpke_config } else { if !auth_token @@ -574,8 +591,9 @@ impl Aggregator { peer_role: &Role, task_id: &TaskId, task_config: &TaskConfig, + aggregator_auth_token: Option<&AuthenticationToken>, ) -> Result<(), Error> { - self.taskprov_authorize_request(peer_role, task_id, task_config) + self.taskprov_authorize_request(peer_role, task_id, task_config, aggregator_auth_token) .await?; let aggregator_urls = task_config @@ -583,6 +601,12 @@ impl Aggregator { .iter() .map(|url| url.try_into()) .collect::, _>>()?; + if aggregator_urls.len() < 2 { + return Err(Error::UnrecognizedMessage( + Some(*task_id), + "taskprov configuration is missing one or both aggregators", + )); + } // TODO(#1647): Check whether task config parameters are acceptable for privacy and // availability of the system. @@ -662,7 +686,18 @@ impl Aggregator { peer_role: &Role, task_id: &TaskId, task_config: &TaskConfig, + aggregator_auth_token: Option<&AuthenticationToken>, ) -> Result<(), Error> { + let request_token = aggregator_auth_token.ok_or(Error::UnauthorizedRequest(*task_id))?; + if !self + .cfg + .auth_tokens + .iter() + .any(|token| token == request_token) + { + return Err(Error::UnauthorizedRequest(*task_id)); + } + if self.clock.now() > *task_config.task_expiration() { return Err(Error::InvalidTask(*task_id, OptOutReason::TaskExpired)); } diff --git a/aggregator/src/aggregator/http_handlers.rs b/aggregator/src/aggregator/http_handlers.rs index 16ebc7762..e3acf320c 100644 --- a/aggregator/src/aggregator/http_handlers.rs +++ b/aggregator/src/aggregator/http_handlers.rs @@ -245,10 +245,10 @@ async fn aggregator_handler_with_aggregator( "hpke_config", hpke_config_cors_preflight, ) - .post("upload", instrumented(api(upload::))) - .with_route(trillium::Method::Options, "upload", upload_cors_preflight) .post("aggregate", instrumented(api(aggregate::))) .post("collect", instrumented(api(collect_post::))) + .post("aggregate_share", instrumented(api(aggregate_share::))) + // TODO(#1728): remove these unnecessary routes, subscriber-01 is helper-only. .get( "collect/:task_id/:collection_job_id", instrumented(api(collect_get::)), @@ -257,7 +257,8 @@ async fn aggregator_handler_with_aggregator( "collect/:task_id/:collection_job_id", instrumented(api(collect_delete::)), ) - .post("aggregate_share", instrumented(api(aggregate_share::))), + .post("upload", instrumented(api(upload::))) + .with_route(trillium::Method::Options, "upload", upload_cors_preflight), StatusCounter::new(meter), )) } diff --git a/aggregator/src/aggregator/taskprov_tests.rs b/aggregator/src/aggregator/taskprov_tests.rs index 56a335c90..d828872a2 100644 --- a/aggregator/src/aggregator/taskprov_tests.rs +++ b/aggregator/src/aggregator/taskprov_tests.rs @@ -26,7 +26,7 @@ use janus_core::{ HpkeApplicationInfo, HpkeKeypair, Label, }, report_id::ReportIdChecksumExt, - task::PRIO3_VERIFY_KEY_LENGTH, + task::{AuthenticationToken, PRIO3_VERIFY_KEY_LENGTH}, taskprov::TASKPROV_HEADER, test_util::{install_test_trace_subscriber, run_vdaf, VdafTranscript}, time::{Clock, DurationExt, MockClock, TimeExt}, @@ -71,6 +71,7 @@ pub struct TaskprovTestCase { task: Task, task_config: TaskConfig, task_id: TaskId, + aggregator_auth_token: AuthenticationToken, } async fn setup_taskprov_test() -> TaskprovTestCase { @@ -82,6 +83,7 @@ async fn setup_taskprov_test() -> TaskprovTestCase { let global_hpke_key = generate_test_hpke_config_and_private_key(); let collector_hpke_keypair = generate_test_hpke_config_and_private_key(); + let aggregator_auth_token: AuthenticationToken = random(); datastore .run_tx(|tx| { @@ -94,9 +96,12 @@ async fn setup_taskprov_test() -> TaskprovTestCase { .await .unwrap(); + let tolerable_clock_skew = Duration::from_seconds(60); let config = Config { collector_hpke_config: collector_hpke_keypair.config().clone(), verify_key_init: random(), + auth_tokens: vec![aggregator_auth_token.clone()], + tolerable_clock_skew, ..Default::default() }; @@ -161,7 +166,7 @@ async fn setup_taskprov_test() -> TaskprovTestCase { config.report_expiry_age.clone(), min_batch_size as u64, Duration::from_seconds(1), - Duration::from_seconds(1), + tolerable_clock_skew, ) .unwrap(); @@ -200,6 +205,7 @@ async fn setup_taskprov_test() -> TaskprovTestCase { report_metadata, transcript, report_share, + aggregator_auth_token, } } @@ -218,10 +224,7 @@ async fn taskprov_aggregate_init() { Vec::from([test.report_share.clone()]), ); - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let auth = test.aggregator_auth_token.request_authentication(); let mut test_conn = post(test.task.aggregation_job_uri().unwrap().path()) .with_request_header(auth.0, "Bearer invalid_token") @@ -317,10 +320,7 @@ async fn taskprov_opt_out_task_expired() { Vec::from([test.report_share.clone()]), ); - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let auth = test.aggregator_auth_token.request_authentication(); // Advance clock past task expiry. test.clock.advance(&Duration::from_hours(48).unwrap()); @@ -390,10 +390,7 @@ async fn taskprov_opt_out_mismatched_task_id() { ) .unwrap(); - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let auth = test.aggregator_auth_token.request_authentication(); let mut test_conn = post( test @@ -470,10 +467,7 @@ async fn taskprov_opt_out_missing_aggregator() { Vec::from([test.report_share.clone()]), ); - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let auth = test.aggregator_auth_token.request_authentication(); let mut test_conn = post( test @@ -507,172 +501,6 @@ async fn taskprov_opt_out_missing_aggregator() { ); } -#[tokio::test] -async fn taskprov_opt_out_peer_aggregator_wrong_role() { - let test = setup_taskprov_test().await; - - let batch_id = random(); - let aggregation_job_id: AggregationJobId = random(); - - let task_expiration = test - .clock - .now() - .add(&Duration::from_hours(24).unwrap()) - .unwrap(); - let another_task_config = TaskConfig::new( - Vec::from("foobar".as_bytes()), - // Attempt to configure leader as a helper. - Vec::from([ - "https://helper.example.com/".as_bytes().try_into().unwrap(), - "https://leader.example.com/".as_bytes().try_into().unwrap(), - ]), - QueryConfig::new( - Duration::from_seconds(1), - 100, - 100, - TaskprovQuery::FixedSize { - max_batch_size: 100, - }, - ), - task_expiration, - VdafConfig::new(DpConfig::new(DpMechanism::None), VdafType::Prio3Aes128Count).unwrap(), - ) - .unwrap(); - let another_task_config_encoded = another_task_config.get_encoded(); - let another_task_id: TaskId = digest(&SHA256, &another_task_config_encoded) - .as_ref() - .try_into() - .unwrap(); - - let request = AggregateInitializeReq::new( - another_task_id, - aggregation_job_id, - ().get_encoded(), - PartialBatchSelector::new_fixed_size(batch_id), - Vec::from([test.report_share.clone()]), - ); - - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); - - let mut test_conn = post( - test - // Use the test case task's ID. - .task - .aggregation_job_uri() - .unwrap() - .path(), - ) - .with_request_header(auth.0, auth.1) - .with_request_header( - KnownHeaderName::ContentType, - AggregateInitializeReq::::MEDIA_TYPE, - ) - .with_request_header( - TASKPROV_HEADER, - URL_SAFE_NO_PAD.encode(another_task_config_encoded), - ) - .with_request_body(request.get_encoded()) - .run_async(&test.handler) - .await; - assert_eq!(test_conn.status(), Some(Status::BadRequest)); - assert_eq!( - take_problem_details(&mut test_conn).await, - json!({ - "status": Status::BadRequest as u16, - "type": "urn:ietf:params:ppm:dap:error:invalidTask", - "title": "Aggregator has opted out of the indicated task.", - "taskid": format!("{}", another_task_id - ), - }) - ); -} - -#[tokio::test] -async fn taskprov_opt_out_peer_aggregator_does_not_exist() { - let test = setup_taskprov_test().await; - - let batch_id = random(); - let aggregation_job_id: AggregationJobId = random(); - - let task_expiration = test - .clock - .now() - .add(&Duration::from_hours(24).unwrap()) - .unwrap(); - let another_task_config = TaskConfig::new( - Vec::from("foobar".as_bytes()), - Vec::from([ - // Some non-existent aggregator. - "https://foobar.example.com/".as_bytes().try_into().unwrap(), - "https://leader.example.com/".as_bytes().try_into().unwrap(), - ]), - QueryConfig::new( - Duration::from_seconds(1), - 100, - 100, - TaskprovQuery::FixedSize { - max_batch_size: 100, - }, - ), - task_expiration, - VdafConfig::new(DpConfig::new(DpMechanism::None), VdafType::Prio3Aes128Count).unwrap(), - ) - .unwrap(); - let another_task_config_encoded = another_task_config.get_encoded(); - let another_task_id: TaskId = digest(&SHA256, &another_task_config_encoded) - .as_ref() - .try_into() - .unwrap(); - - let request = AggregateInitializeReq::new( - another_task_id, - aggregation_job_id, - ().get_encoded(), - PartialBatchSelector::new_fixed_size(batch_id), - Vec::from([test.report_share.clone()]), - ); - - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); - - let mut test_conn = post( - test - // Use the test case task's ID. - .task - .aggregation_job_uri() - .unwrap() - .path(), - ) - .with_request_header(auth.0, auth.1) - .with_request_header( - KnownHeaderName::ContentType, - AggregateInitializeReq::::MEDIA_TYPE, - ) - .with_request_header( - TASKPROV_HEADER, - URL_SAFE_NO_PAD.encode(another_task_config_encoded), - ) - .with_request_body(request.get_encoded()) - .run_async(&test.handler) - .await; - assert_eq!(test_conn.status(), Some(Status::BadRequest)); - assert_eq!( - take_problem_details(&mut test_conn).await, - json!({ - "status": Status::BadRequest as u16, - "type": "urn:ietf:params:ppm:dap:error:invalidTask", - "title": "Aggregator has opted out of the indicated task.", - "taskid": format!("{}", another_task_id - ), - }) - ); -} - #[tokio::test] async fn taskprov_aggregate_continue() { let test = setup_taskprov_test().await; @@ -748,10 +576,7 @@ async fn taskprov_aggregate_continue() { )]), ); - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let auth = test.aggregator_auth_token.request_authentication(); // Attempt using the wrong credentials, should reject. let mut test_conn = post(test.task.aggregation_job_uri().unwrap().path()) @@ -869,10 +694,7 @@ async fn taskprov_aggregate_share() { ReportIdChecksum::get_decoded(&[3; 32]).unwrap(), ); - let auth = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let auth = test.aggregator_auth_token.request_authentication(); // Attempt using the wrong credentials, should reject. let mut test_conn = post(test.task.aggregate_shares_uri().unwrap().path()) @@ -938,10 +760,7 @@ async fn taskprov_aggregate_share() { #[tokio::test] async fn end_to_end() { let test = setup_taskprov_test().await; - let (auth_header_name, auth_header_value) = test - .peer_aggregator - .primary_aggregator_auth_token() - .request_authentication(); + let (auth_header_name, auth_header_value) = test.aggregator_auth_token.request_authentication(); let batch_id = random(); let aggregation_job_id = random(); diff --git a/aggregator/src/bin/aggregator.rs b/aggregator/src/bin/aggregator.rs index f96c72895..b21524f4e 100644 --- a/aggregator/src/bin/aggregator.rs +++ b/aggregator/src/bin/aggregator.rs @@ -45,12 +45,23 @@ async fn main() -> Result<()> { .response_headers() .context("failed to parse response headers")?; + // inahga: refactor auth token logic + let auth_tokens = options + .aggregator_api_auth_tokens + .iter() + .filter(|token| !token.is_empty()) + .map(|token| { + AuthenticationToken::new_bearer_token_from_string(token) + .context("invalid aggregator auth token") + }) + .collect::>>()?; + let mut handlers = ( aggregator_handler( Arc::clone(&datastore), clock, &meter, - config.aggregator_config(options.verify_key_init), + config.aggregator_config(options.verify_key_init, auth_tokens), ) .await?, None, @@ -199,7 +210,7 @@ struct Options { long, env = "AGGREGATOR_AUTH_TOKENS", hide_env_values = true, - num_args = 0..=1, + required = true, use_value_delimiter = true, help = "DAP aggregator tokens, encoded in base64 then comma-separated" )] @@ -393,7 +404,11 @@ impl Config { .collect() } - fn aggregator_config(&self, verify_key_init: VerifyKeyInit) -> aggregator::Config { + fn aggregator_config( + &self, + verify_key_init: VerifyKeyInit, + auth_tokens: Vec, + ) -> aggregator::Config { aggregator::Config { max_upload_batch_size: self.max_upload_batch_size, max_upload_batch_write_delay: Duration::from_millis( @@ -408,6 +423,7 @@ impl Config { report_expiry_age: self.taskprov_config.report_expiry_age, tolerable_clock_skew: self.taskprov_config.tolerable_clock_skew, verify_key_init, + auth_tokens, } } } @@ -441,7 +457,6 @@ mod tests { use janus_aggregator_core::taskprov::VerifyKeyInit; use janus_core::{ hpke::test_util::generate_test_hpke_config_and_private_key, test_util::roundtrip_encoding, - time::DurationExt, }; use janus_messages::{ HpkeAeadId, HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId, HpkePublicKey, @@ -703,7 +718,7 @@ mod tests { "# ) .unwrap() - .aggregator_config(verify_key_init.clone()), + .aggregator_config(verify_key_init.clone(), Vec::new()), aggregator::Config { max_upload_batch_size: 100, max_upload_batch_write_delay: Duration::from_millis(250), @@ -720,7 +735,7 @@ mod tests { ), ), report_expiry_age: None, - tolerable_clock_skew: janus_messages::Duration::from_minutes(60).unwrap(), + tolerable_clock_skew: janus_messages::Duration::from_seconds(60), verify_key_init, ..Default::default() } diff --git a/aggregator/tests/cmd/aggregator.trycmd b/aggregator/tests/cmd/aggregator.trycmd index 60d3e1ae4..cc69c0d4b 100644 --- a/aggregator/tests/cmd/aggregator.trycmd +++ b/aggregator/tests/cmd/aggregator.trycmd @@ -2,7 +2,7 @@ $ aggregator --help DAP aggregator server -Usage: aggregator [OPTIONS] --config-file +Usage: aggregator [OPTIONS] --config-file --aggregator-auth-tokens --verify-key-init Options: --config-file @@ -17,6 +17,10 @@ Options: additional OTLP/gRPC metadata key/value pairs for the metrics exporter [env: OTLP_METRICS_METADATA=] --aggregator-api-auth-tokens [] aggregator API auth tokens, encoded in base64 then comma-separated [env: AGGREGATOR_API_AUTH_TOKENS] + --aggregator-auth-tokens + DAP aggregator tokens, encoded in base64 then comma-separated [env: AGGREGATOR_AUTH_TOKENS] + --verify-key-init + Taskprov verify key init, encoded in base64 [env: VERIFY_KEY_INIT] -h, --help Print help -V, --version diff --git a/aggregator/tests/graceful_shutdown.rs b/aggregator/tests/graceful_shutdown.rs index 8a9710a7b..a499796b4 100644 --- a/aggregator/tests/graceful_shutdown.rs +++ b/aggregator/tests/graceful_shutdown.rs @@ -3,15 +3,25 @@ //! process. The process should promptly shut down, and this test will fail if //! it times out waiting for the process to do so. -use base64::{engine::general_purpose::STANDARD_NO_PAD, Engine}; +use base64::{ + engine::general_purpose::{STANDARD_NO_PAD, URL_SAFE_NO_PAD}, + Engine, +}; use janus_aggregator_core::{ datastore::test_util::ephemeral_datastore, task::{test_util::TaskBuilder, QueryType}, + taskprov::VerifyKeyInit, +}; +use janus_core::{ + hpke::test_util::generate_test_hpke_config_and_private_key, + task::{AuthenticationToken, VdafInstance}, + test_util::install_test_trace_subscriber, + time::RealClock, }; -use janus_core::{task::VdafInstance, test_util::install_test_trace_subscriber, time::RealClock}; use janus_messages::Role; +use rand::random; use reqwest::Url; -use serde_yaml::{Mapping, Value}; +use serde_yaml::{to_value, Mapping, Value}; use std::{ future::Future, io::{ErrorKind, Write}, @@ -119,6 +129,15 @@ async fn graceful_shutdown(binary: &Path, mut config: Mapping) { db_config.insert("url".into(), ephemeral_datastore.connection_string().into()); db_config.insert("connection_pool_timeout_secs".into(), "60".into()); config.insert("database".into(), db_config.into()); + + let mut taskprov_config = Mapping::new(); + taskprov_config.insert( + "collector_hpke_config".into(), + to_value(generate_test_hpke_config_and_private_key().config()).unwrap(), + ); + taskprov_config.insert("tolerable_clock_skew".into(), 60.into()); + config.insert("taskprov_config".into(), taskprov_config.into()); + config.insert( "health_check_listen_address".into(), format!("{health_check_listen_address}").into(), @@ -162,6 +181,14 @@ async fn graceful_shutdown(binary: &Path, mut config: Mapping) { "DATASTORE_KEYS", STANDARD_NO_PAD.encode(ephemeral_datastore.datastore_key_bytes()), ) + .env( + "VERIFY_KEY_INIT", + URL_SAFE_NO_PAD.encode(random::().as_ref()), + ) + .env( + "AGGREGATOR_AUTH_TOKENS", + URL_SAFE_NO_PAD.encode(random::()), + ) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() diff --git a/docs/samples/advanced_config/aggregator.yaml b/docs/samples/advanced_config/aggregator.yaml index 1b06f69ed..69fd2e5ca 100644 --- a/docs/samples/advanced_config/aggregator.yaml +++ b/docs/samples/advanced_config/aggregator.yaml @@ -104,3 +104,13 @@ garbage_collection: # The maximum number of collection jobs (& related artifacts), per task, to delete in a single run # of the garbage collector. collection_limit: 50 + +taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 + report_expiry_age: 2592000 diff --git a/docs/samples/basic_config/aggregator.yaml b/docs/samples/basic_config/aggregator.yaml index 37c5932d4..8e6368982 100644 --- a/docs/samples/basic_config/aggregator.yaml +++ b/docs/samples/basic_config/aggregator.yaml @@ -21,3 +21,12 @@ max_upload_batch_write_delay_ms: 250 # Number of sharded database records per batch aggregation. Must not be greater # than the equivalent setting in the collection job driver. (required) batch_aggregation_shard_count: 32 + +taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 diff --git a/interop_binaries/config/janus_interop_aggregator.yaml b/interop_binaries/config/janus_interop_aggregator.yaml index 27ceeeb38..49715210a 100644 --- a/interop_binaries/config/janus_interop_aggregator.yaml +++ b/interop_binaries/config/janus_interop_aggregator.yaml @@ -7,3 +7,11 @@ health_check_listen_address: 0.0.0.0:8000 logging_config: force_json_output: true listen_address: 0.0.0.0:8080 +taskprov_config: + collector_hpke_config: + id: 183 + kem_id: X25519HkdfSha256 + kdf_id: HkdfSha256 + aead_id: Aes128Gcm + public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo + tolerable_clock_skew: 60 diff --git a/interop_binaries/src/bin/janus_interop_aggregator.rs b/interop_binaries/src/bin/janus_interop_aggregator.rs index ffff95972..6d27a98d3 100644 --- a/interop_binaries/src/bin/janus_interop_aggregator.rs +++ b/interop_binaries/src/bin/janus_interop_aggregator.rs @@ -4,7 +4,8 @@ use clap::Parser; use janus_aggregator::{ aggregator::{self, http_handlers::aggregator_handler}, binary_utils::{janus_main, BinaryOptions, CommonBinaryOptions}, - config::{BinaryConfig, CommonConfig}, + cache::GlobalHpkeKeypairCache, + config::{BinaryConfig, CommonConfig, TaskprovConfig}, }; use janus_aggregator_core::{ datastore::{models::HpkeKeyState, Datastore}, @@ -12,7 +13,9 @@ use janus_aggregator_core::{ SecretBytes, }; use janus_core::{ - hpke::generate_hpke_config_and_private_key, task::AuthenticationToken, time::RealClock, + hpke::generate_hpke_config_and_private_key, + task::AuthenticationToken, + time::{DurationExt, RealClock}, }; use janus_interop_binaries::{ status::{ERROR, SUCCESS}, @@ -24,6 +27,7 @@ use janus_messages::{ }; use opentelemetry::metrics::Meter; use prio::codec::Decode; +use rand::random; use serde::{Deserialize, Serialize}; use sqlx::{migrate::Migrator, Connection, PgConnection}; use std::{ @@ -161,7 +165,21 @@ async fn make_handler( max_upload_batch_size: 100, max_upload_batch_write_delay: std::time::Duration::from_millis(100), batch_aggregation_shard_count: 32, - ..Default::default() + global_hpke_configs_refresh_interval: GlobalHpkeKeypairCache::DEFAULT_REFRESH_INTERVAL, + + // TODO(janus-ops#991): Give these taskprov parameters actual values to facilitiate an E2E test. + collector_hpke_config: generate_hpke_config_and_private_key( + HpkeConfigId::from(1), + HpkeKemId::X25519HkdfSha256, + HpkeKdfId::HkdfSha256, + HpkeAeadId::Aes128Gcm, + ) + .config() + .clone(), + report_expiry_age: None, + tolerable_clock_skew: Duration::from_minutes(60).unwrap(), + verify_key_init: random(), + auth_tokens: Vec::new(), }, ) .await?; @@ -263,6 +281,8 @@ struct Config { /// Path prefix, e.g. `/dap/`, to serve DAP from. #[serde(default = "default_dap_serving_prefix")] dap_serving_prefix: String, + + taskprov_config: TaskprovConfig, } impl BinaryConfig for Config {