Skip to content

Commit

Permalink
Add validation to enforce listen_address and path_prefix mutual exclu…
Browse files Browse the repository at this point in the history
…sion (#1809) (#1894)

* Provide clarification on aggregator API options

* listen_address and path_prefix are mutually exclusive

* PR feedback
  • Loading branch information
inahga authored Sep 7, 2023
1 parent ae13364 commit 93aa835
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 74 deletions.
195 changes: 123 additions & 72 deletions aggregator/src/bin/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use janus_aggregator::{
use janus_aggregator_api::{self, aggregator_api_handler};
use janus_aggregator_core::datastore::Datastore;
use janus_core::{task::AuthenticationToken, time::RealClock};
use serde::{Deserialize, Serialize};
use serde::{de, Deserialize, Deserializer, Serialize};
use std::{
future::{ready, Future},
pin::Pin,
Expand Down Expand Up @@ -79,43 +79,38 @@ async fn main() -> Result<()> {

let aggregator_api_future: Pin<Box<dyn Future<Output = ()> + 'static>> =
match build_aggregator_api_handler(&options, &config, &datastore)? {
Some((
inner_aggregator_api_handler,
AggregatorApi::ListenAddress { listen_address, .. },
)) => {
// Bind the requested address and spawn a future that serves the aggregator API
// on it, which we'll `tokio::join!` on below
let (aggregator_api_bound_address, aggregator_api_server) = setup_server(
*listen_address,
response_headers.clone(),
stopper.clone(),
inner_aggregator_api_handler,
)
.await
.context("failed to create aggregator API server")?;

info!(?aggregator_api_bound_address, "Running aggregator API");

Box::pin(aggregator_api_server)
}

Some((
inner_aggregator_api_handler,
AggregatorApi::PathPrefix { path_prefix, .. },
)) => {
// Create a Trillium handler under the requested path prefix, which we'll add to
// the DAP API handler in the setup_server call below
info!(
aggregator_bound_address = ?config.listen_address,
path_prefix,
"Serving aggregator API relative to DAP API"
);
// Append wildcard so that this handler will match anything under the prefix
let path_prefix = format!("{path_prefix}/*");
handlers.1 = Some(router().all(path_prefix, inner_aggregator_api_handler));
Box::pin(ready(()))
Some((handler, config)) => {
if let Some(listen_address) = config.listen_address {
// Bind the requested address and spawn a future that serves the aggregator API
// on it, which we'll `tokio::join!` on below
let (aggregator_api_bound_address, aggregator_api_server) = setup_server(
listen_address,
response_headers.clone(),
stopper.clone(),
handler,
)
.await
.context("failed to create aggregator API server")?;

info!(?aggregator_api_bound_address, "Running aggregator API");

Box::pin(aggregator_api_server)
} else if let Some(path_prefix) = &config.path_prefix {
// Create a Trillium handler under the requested path prefix, which we'll add to
// the DAP API handler in the setup_server call below
info!(
aggregator_bound_address = ?config.listen_address,
path_prefix,
"Serving aggregator API relative to DAP API"
);
// Append wildcard so that this handler will match anything under the prefix
let path_prefix = format!("{path_prefix}/*");
handlers.1 = Some(router().all(path_prefix, handler));
Box::pin(ready(()))
} else {
unreachable!("the configuration should not have deserialized to this state")
}
}

None => Box::pin(ready(())),
};

Expand Down Expand Up @@ -164,7 +159,7 @@ fn build_aggregator_api_handler<'a>(
Arc::clone(datastore),
janus_aggregator_api::Config {
auth_tokens: aggregator_api_auth_tokens,
public_dap_url: aggregator_api.public_dap_url().clone(),
public_dap_url: aggregator_api.public_dap_url.clone(),
},
),
aggregator_api,
Expand Down Expand Up @@ -209,36 +204,44 @@ pub struct HeaderEntry {

/// Options for serving the aggregator API.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum AggregatorApi {
ListenAddress {
/// Address on which this server should listen for connections to the Janus aggregator API
/// and serve its API endpoints, independently from the address on which the DAP API is
/// served.
listen_address: SocketAddr,
/// Resource location at which the DAP service managed by this aggregator api can be found
/// on the public internet. Required.
public_dap_url: Url,
},
PathPrefix {
/// The Janus aggregator API will be served on the same address as the DAP API, but relative
/// to the provided prefix. e.g., if `path_prefix` is `aggregator-api`, then the DAP API's
/// uploads endpoint would be `{listen-address}/tasks/{task-id}/reports`, while task IDs
/// could be obtained from the aggregator API at `{listen-address}/aggregator-api/task_ids`.
path_prefix: String,
/// Resource location at which the DAP service managed by this aggregator api can be found
/// on the public internet. Required.
public_dap_url: Url,
},
#[serde(deny_unknown_fields)]
pub struct AggregatorApi {
/// Address on which this server should listen for connections to the Janus aggregator API
/// and serve its API endpoints, independently from the address on which the DAP API is
/// served. This is mutually exclusive with `path_prefix`.
listen_address: Option<SocketAddr>,
/// The Janus aggregator API will be served on the same address as the DAP API, but relative
/// to the provided prefix. e.g., if `path_prefix` is `aggregator-api`, then the DAP API's
/// uploads endpoint would be `{listen-address}/tasks/{task-id}/reports`, while task IDs
/// could be obtained from the aggregator API at `{listen-address}/aggregator-api/task_ids`.
/// This is mutually exclusive with `listen_address`.
path_prefix: Option<String>,
/// Resource location at which the DAP service managed by this aggregator api can be found
/// on the public internet. Required.
public_dap_url: Url,
}

impl AggregatorApi {
fn public_dap_url(&self) -> &Url {
match self {
AggregatorApi::ListenAddress { public_dap_url, .. } => public_dap_url,
AggregatorApi::PathPrefix { public_dap_url, .. } => public_dap_url,
fn deserialize_aggregator_api<'de, D>(deserializer: D) -> Result<Option<AggregatorApi>, D::Error>
where
D: Deserializer<'de>,
{
let aggregator_api: Option<AggregatorApi> = Deserialize::deserialize(deserializer)?;
if let Some(ref aggregator_api) = aggregator_api {
match (aggregator_api.listen_address, &aggregator_api.path_prefix) {
(None, None) => {
return Err(de::Error::custom(
"one of listen_address or path_prefix must be provided",
))
}
(Some(_), Some(_)) => {
return Err(de::Error::custom(
"only one of listen_address and path_prefix must be specified",
))
}
_ => {}
}
}
Ok(aggregator_api)
}

/// Non-secret configuration options for a Janus aggregator, deserialized from YAML.
Expand Down Expand Up @@ -311,6 +314,7 @@ struct Config {
listen_address: SocketAddr,

/// How to serve the Janus aggregator API. If not set, the aggregator API is not served.
#[serde(default, deserialize_with = "deserialize_aggregator_api")]
aggregator_api: Option<AggregatorApi>,

/// Additional headers that will be added to all responses.
Expand Down Expand Up @@ -398,6 +402,7 @@ impl BinaryConfig for Config {
#[cfg(test)]
mod tests {
use super::{AggregatorApi, Config, GarbageCollectorConfig, HeaderEntry, Options};
use assert_matches::assert_matches;
use clap::CommandFactory;
use janus_aggregator::{
aggregator,
Expand All @@ -423,12 +428,14 @@ mod tests {
}

#[rstest::rstest]
#[case::listen_address(AggregatorApi::ListenAddress {
listen_address: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8081)),
#[case::listen_address(AggregatorApi {
listen_address: Some(SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8081))),
path_prefix: None,
public_dap_url: "https://dap.url".parse().unwrap()
})]
#[case::path_prefix(AggregatorApi::PathPrefix {
path_prefix: "prefix".to_string(),
#[case::path_prefix(AggregatorApi {
listen_address: None,
path_prefix: Some("prefix".to_string()),
public_dap_url: "https://dap.url".parse().unwrap()
})]
#[test]
Expand Down Expand Up @@ -551,8 +558,9 @@ mod tests {
)
.unwrap()
.aggregator_api,
Some(AggregatorApi::ListenAddress {
listen_address: SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8081)),
Some(AggregatorApi {
listen_address: Some(SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8081))),
path_prefix: None,
public_dap_url: "https://dap.url".parse().unwrap()
})
);
Expand All @@ -577,13 +585,56 @@ mod tests {
)
.unwrap()
.aggregator_api,
Some(AggregatorApi::PathPrefix {
path_prefix: "aggregator-api".to_string(),
Some(AggregatorApi {
listen_address: None,
path_prefix: Some("aggregator-api".to_string()),
public_dap_url: "https://dap.url".parse().unwrap()
})
);
}

#[test]
fn config_aggregator_mutually_exclusive() {
assert_matches!(
serde_yaml::from_str::<Config>(
r#"---
listen_address: "0.0.0.0:8080"
database:
url: "postgres://postgres:postgres@localhost:5432/postgres"
connection_pool_timeouts_secs: 60
max_upload_batch_size: 100
max_upload_batch_write_delay_ms: 250
batch_aggregation_shard_count: 32
aggregator_api:
path_prefix: "aggregator-api"
listen_address: "0.0.0.0:8081"
public_dap_url: "https://dap.url"
"#
),
Err(_)
);
}

#[test]
fn config_aggregator_api_missing_parameters() {
assert_matches!(
serde_yaml::from_str::<Config>(
r#"---
listen_address: "0.0.0.0:8080"
database:
url: "postgres://postgres:postgres@localhost:5432/postgres"
connection_pool_timeouts_secs: 60
max_upload_batch_size: 100
max_upload_batch_write_delay_ms: 250
batch_aggregation_shard_count: 32
aggregator_api:
public_dap_url: "https://dap.url"
"#
),
Err(_)
);
}

/// Check that configuration fragments in the README and other documentation can be parsed
/// correctly.
#[test]
Expand Down
7 changes: 5 additions & 2 deletions docs/samples/advanced_config/aggregator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@ listen_address: "0.0.0.0:8080"

# How to serve the Janus aggregator API. If not set, Janus aggregator API is not served. (optional)
aggregator_api:
# Socket address on which to listen for requests.
# Serve the aggregator API on an address and port that is separate from the DAP API. This is
# mutually exclusive with path_prefix.
listen_address: "0.0.0.0:8081"
# Alternately, the aggregator API may be served on `listen_address`, at an arbitrary path prefix.

# Alternatively, serve the aggregator API on the same address as the DAP API, but on a separate
# path denoted by `path_prefix`. This is mutually exclusive with listen_address.
# path_prefix: "aggregator-api"

# Resource location at which the DAP service managed by this
Expand Down

0 comments on commit 93aa835

Please sign in to comment.