diff --git a/aggregator/src/bin/aggregator.rs b/aggregator/src/bin/aggregator.rs index c46bbfa52..c4428b214 100644 --- a/aggregator/src/bin/aggregator.rs +++ b/aggregator/src/bin/aggregator.rs @@ -12,7 +12,7 @@ use janus_aggregator::{ use janus_aggregator_api::{self, aggregator_api_handler}; use janus_aggregator_core::datastore::Datastore; use janus_core::{task::AuthenticationToken, time::RealClock}; -use serde::{Deserialize, Serialize}; +use serde::{de, Deserialize, Deserializer, Serialize}; use std::{ future::{ready, Future}, pin::Pin, @@ -79,43 +79,38 @@ async fn main() -> Result<()> { let aggregator_api_future: Pin + 'static>> = match build_aggregator_api_handler(&options, &config, &datastore)? { - Some(( - inner_aggregator_api_handler, - AggregatorApi::ListenAddress { listen_address, .. }, - )) => { - // Bind the requested address and spawn a future that serves the aggregator API - // on it, which we'll `tokio::join!` on below - let (aggregator_api_bound_address, aggregator_api_server) = setup_server( - *listen_address, - response_headers.clone(), - stopper.clone(), - inner_aggregator_api_handler, - ) - .await - .context("failed to create aggregator API server")?; - - info!(?aggregator_api_bound_address, "Running aggregator API"); - - Box::pin(aggregator_api_server) - } - - Some(( - inner_aggregator_api_handler, - AggregatorApi::PathPrefix { path_prefix, .. }, - )) => { - // Create a Trillium handler under the requested path prefix, which we'll add to - // the DAP API handler in the setup_server call below - info!( - aggregator_bound_address = ?config.listen_address, - path_prefix, - "Serving aggregator API relative to DAP API" - ); - // Append wildcard so that this handler will match anything under the prefix - let path_prefix = format!("{path_prefix}/*"); - handlers.1 = Some(router().all(path_prefix, inner_aggregator_api_handler)); - Box::pin(ready(())) + Some((handler, config)) => { + if let Some(listen_address) = config.listen_address { + // Bind the requested address and spawn a future that serves the aggregator API + // on it, which we'll `tokio::join!` on below + let (aggregator_api_bound_address, aggregator_api_server) = setup_server( + listen_address, + response_headers.clone(), + stopper.clone(), + handler, + ) + .await + .context("failed to create aggregator API server")?; + + info!(?aggregator_api_bound_address, "Running aggregator API"); + + Box::pin(aggregator_api_server) + } else if let Some(path_prefix) = &config.path_prefix { + // Create a Trillium handler under the requested path prefix, which we'll add to + // the DAP API handler in the setup_server call below + info!( + aggregator_bound_address = ?config.listen_address, + path_prefix, + "Serving aggregator API relative to DAP API" + ); + // Append wildcard so that this handler will match anything under the prefix + let path_prefix = format!("{path_prefix}/*"); + handlers.1 = Some(router().all(path_prefix, handler)); + Box::pin(ready(())) + } else { + unreachable!("the configuration should not have deserialized to this state") + } } - None => Box::pin(ready(())), }; @@ -164,7 +159,7 @@ fn build_aggregator_api_handler<'a>( Arc::clone(datastore), janus_aggregator_api::Config { auth_tokens: aggregator_api_auth_tokens, - public_dap_url: aggregator_api.public_dap_url().clone(), + public_dap_url: aggregator_api.public_dap_url.clone(), }, ), aggregator_api, @@ -209,36 +204,44 @@ pub struct HeaderEntry { /// Options for serving the aggregator API. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(untagged)] -pub enum AggregatorApi { - ListenAddress { - /// Address on which this server should listen for connections to the Janus aggregator API - /// and serve its API endpoints, independently from the address on which the DAP API is - /// served. - listen_address: SocketAddr, - /// Resource location at which the DAP service managed by this aggregator api can be found - /// on the public internet. Required. - public_dap_url: Url, - }, - PathPrefix { - /// The Janus aggregator API will be served on the same address as the DAP API, but relative - /// to the provided prefix. e.g., if `path_prefix` is `aggregator-api`, then the DAP API's - /// uploads endpoint would be `{listen-address}/tasks/{task-id}/reports`, while task IDs - /// could be obtained from the aggregator API at `{listen-address}/aggregator-api/task_ids`. - path_prefix: String, - /// Resource location at which the DAP service managed by this aggregator api can be found - /// on the public internet. Required. - public_dap_url: Url, - }, +#[serde(deny_unknown_fields)] +pub struct AggregatorApi { + /// Address on which this server should listen for connections to the Janus aggregator API + /// and serve its API endpoints, independently from the address on which the DAP API is + /// served. This is mutually exclusive with `path_prefix`. + listen_address: Option, + /// The Janus aggregator API will be served on the same address as the DAP API, but relative + /// to the provided prefix. e.g., if `path_prefix` is `aggregator-api`, then the DAP API's + /// uploads endpoint would be `{listen-address}/tasks/{task-id}/reports`, while task IDs + /// could be obtained from the aggregator API at `{listen-address}/aggregator-api/task_ids`. + /// This is mutually exclusive with `listen_address`. + path_prefix: Option, + /// Resource location at which the DAP service managed by this aggregator api can be found + /// on the public internet. Required. + public_dap_url: Url, } -impl AggregatorApi { - fn public_dap_url(&self) -> &Url { - match self { - AggregatorApi::ListenAddress { public_dap_url, .. } => public_dap_url, - AggregatorApi::PathPrefix { public_dap_url, .. } => public_dap_url, +fn deserialize_aggregator_api<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let aggregator_api: Option = Deserialize::deserialize(deserializer)?; + if let Some(ref aggregator_api) = aggregator_api { + match (aggregator_api.listen_address, &aggregator_api.path_prefix) { + (None, None) => { + return Err(de::Error::custom( + "one of listen_address or path_prefix must be provided", + )) + } + (Some(_), Some(_)) => { + return Err(de::Error::custom( + "only one of listen_address and path_prefix must be specified", + )) + } + _ => {} } } + Ok(aggregator_api) } /// Non-secret configuration options for a Janus aggregator, deserialized from YAML. @@ -311,6 +314,7 @@ struct Config { listen_address: SocketAddr, /// How to serve the Janus aggregator API. If not set, the aggregator API is not served. + #[serde(default, deserialize_with = "deserialize_aggregator_api")] aggregator_api: Option, /// Additional headers that will be added to all responses. @@ -398,6 +402,7 @@ impl BinaryConfig for Config { #[cfg(test)] mod tests { use super::{AggregatorApi, Config, GarbageCollectorConfig, HeaderEntry, Options}; + use assert_matches::assert_matches; use clap::CommandFactory; use janus_aggregator::{ aggregator, @@ -423,12 +428,14 @@ mod tests { } #[rstest::rstest] - #[case::listen_address(AggregatorApi::ListenAddress { - listen_address: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8081)), + #[case::listen_address(AggregatorApi { + listen_address: Some(SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8081))), + path_prefix: None, public_dap_url: "https://dap.url".parse().unwrap() })] - #[case::path_prefix(AggregatorApi::PathPrefix { - path_prefix: "prefix".to_string(), + #[case::path_prefix(AggregatorApi { + listen_address: None, + path_prefix: Some("prefix".to_string()), public_dap_url: "https://dap.url".parse().unwrap() })] #[test] @@ -551,8 +558,9 @@ mod tests { ) .unwrap() .aggregator_api, - Some(AggregatorApi::ListenAddress { - listen_address: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8081)), + Some(AggregatorApi { + listen_address: Some(SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8081))), + path_prefix: None, public_dap_url: "https://dap.url".parse().unwrap() }) ); @@ -577,13 +585,56 @@ mod tests { ) .unwrap() .aggregator_api, - Some(AggregatorApi::PathPrefix { - path_prefix: "aggregator-api".to_string(), + Some(AggregatorApi { + listen_address: None, + path_prefix: Some("aggregator-api".to_string()), public_dap_url: "https://dap.url".parse().unwrap() }) ); } + #[test] + fn config_aggregator_mutually_exclusive() { + assert_matches!( + serde_yaml::from_str::( + r#"--- + listen_address: "0.0.0.0:8080" + database: + url: "postgres://postgres:postgres@localhost:5432/postgres" + connection_pool_timeouts_secs: 60 + max_upload_batch_size: 100 + max_upload_batch_write_delay_ms: 250 + batch_aggregation_shard_count: 32 + aggregator_api: + path_prefix: "aggregator-api" + listen_address: "0.0.0.0:8081" + public_dap_url: "https://dap.url" + "# + ), + Err(_) + ); + } + + #[test] + fn config_aggregator_api_missing_parameters() { + assert_matches!( + serde_yaml::from_str::( + r#"--- + listen_address: "0.0.0.0:8080" + database: + url: "postgres://postgres:postgres@localhost:5432/postgres" + connection_pool_timeouts_secs: 60 + max_upload_batch_size: 100 + max_upload_batch_write_delay_ms: 250 + batch_aggregation_shard_count: 32 + aggregator_api: + public_dap_url: "https://dap.url" + "# + ), + Err(_) + ); + } + /// Check that configuration fragments in the README and other documentation can be parsed /// correctly. #[test] diff --git a/docs/samples/advanced_config/aggregator.yaml b/docs/samples/advanced_config/aggregator.yaml index e67acbe14..36728d303 100644 --- a/docs/samples/advanced_config/aggregator.yaml +++ b/docs/samples/advanced_config/aggregator.yaml @@ -69,9 +69,12 @@ listen_address: "0.0.0.0:8080" # How to serve the Janus aggregator API. If not set, Janus aggregator API is not served. (optional) aggregator_api: - # Socket address on which to listen for requests. + # Serve the aggregator API on an address and port that is separate from the DAP API. This is + # mutually exclusive with path_prefix. listen_address: "0.0.0.0:8081" - # Alternately, the aggregator API may be served on `listen_address`, at an arbitrary path prefix. + + # Alternatively, serve the aggregator API on the same address as the DAP API, but on a separate + # path denoted by `path_prefix`. This is mutually exclusive with listen_address. # path_prefix: "aggregator-api" # Resource location at which the DAP service managed by this