Skip to content

Commit

Permalink
Add single process integration tests (#2123)
Browse files Browse the repository at this point in the history
* Add channel to feed back socket address

* Rename Janus -> JanusContainer in integration test

* Add in-process Janus instance for use in tests

* Add in-process Janus-Janus integration tests

* Add hybrid in-process/Docker test for Daphne
  • Loading branch information
divergentdave authored Oct 13, 2023
1 parent 18ce8ef commit ce4cc45
Show file tree
Hide file tree
Showing 13 changed files with 611 additions and 80 deletions.
6 changes: 4 additions & 2 deletions aggregator/src/bin/janus_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,10 @@ mod tests {
Vec::from(["datastore-key-1".to_string(), "datastore-key-2".to_string()]);

// Keys provided at command line, not present in k8s
let mut common_options = CommonBinaryOptions::default();
common_options.datastore_keys = expected_datastore_keys.clone();
let common_options = CommonBinaryOptions {
datastore_keys: expected_datastore_keys.clone(),
..Default::default()
};

let kubernetes_secret_options = KubernetesSecretOptions {
datastore_keys_secret_name: "secret-name".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/binaries/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
)]
pub struct Options {
#[clap(flatten)]
common: CommonBinaryOptions,
pub common: CommonBinaryOptions,
}

impl BinaryOptions for Options {
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/binaries/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
)]
pub struct Options {
#[clap(flatten)]
common: CommonBinaryOptions,
pub common: CommonBinaryOptions,
}

impl BinaryOptions for Options {
Expand Down
30 changes: 26 additions & 4 deletions aggregator/src/binaries/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,34 @@ use std::{
pin::Pin,
};
use std::{iter::Iterator, net::SocketAddr, sync::Arc, time::Duration};
use tokio::{join, time::interval};
use tokio::{join, sync::watch, time::interval};
use tracing::{error, info};
use trillium::{Handler, Headers};
use trillium_router::router;
use url::Url;

pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Result<()> {
let (sender, _) = watch::channel(None);
run_aggregator(ctx, sender).await
}

/// This produces a future that runs the aggregator and provides a [`tokio::sync::watch::Receiver`]
/// that returns the socket address that the aggregator server listens on. This is useful when
/// specifying ephemeral socket addresses.
pub fn make_callback_ephemeral_address(
ctx: BinaryContext<RealClock, Options, Config>,
) -> (
impl Future<Output = Result<()>> + Send,
watch::Receiver<Option<SocketAddr>>,
) {
let (sender, receiver) = watch::channel(None);
(run_aggregator(ctx, sender), receiver)
}

async fn run_aggregator(
ctx: BinaryContext<RealClock, Options, Config>,
sender: watch::Sender<Option<SocketAddr>>,
) -> Result<()> {
let BinaryContext {
clock,
options,
Expand Down Expand Up @@ -70,7 +91,7 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
}
};

let aggregator_api_future: Pin<Box<dyn Future<Output = ()> + 'static>> =
let aggregator_api_future: Pin<Box<dyn Future<Output = ()> + Send + 'static>> =
match build_aggregator_api_handler(&options, &config, &datastore)? {
Some((handler, config)) => {
if let Some(listen_address) = config.listen_address {
Expand Down Expand Up @@ -115,6 +136,7 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
)
.await
.context("failed to create aggregator server")?;
sender.send_replace(Some(aggregator_bound_address));

info!(?aggregator_bound_address, "Running aggregator");

Expand Down Expand Up @@ -166,7 +188,7 @@ fn build_aggregator_api_handler<'a>(
)]
pub struct Options {
#[clap(flatten)]
common: CommonBinaryOptions,
pub common: CommonBinaryOptions,

/// Aggregator API authentication tokens.
#[clap(
Expand All @@ -177,7 +199,7 @@ pub struct Options {
use_value_delimiter = true,
help = "aggregator API auth tokens, encoded in base64 then comma-separated"
)]
aggregator_api_auth_tokens: Vec<String>,
pub aggregator_api_auth_tokens: Vec<String>,
}

impl BinaryOptions for Options {
Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/binaries/collection_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
)]
pub struct Options {
#[clap(flatten)]
common: CommonBinaryOptions,
pub common: CommonBinaryOptions,
}

impl BinaryOptions for Options {
Expand Down
8 changes: 4 additions & 4 deletions aggregator/src/binary_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub trait BinaryOptions: Parser + Debug {
}

#[cfg_attr(doc, doc = "Common options that are used by all Janus binaries.")]
#[derive(Default, Parser)]
#[derive(Default, Clone, Parser)]
pub struct CommonBinaryOptions {
/// Path to configuration YAML.
#[clap(
Expand All @@ -187,7 +187,7 @@ pub struct CommonBinaryOptions {
required(true),
help = "path to configuration file"
)]
config_file: PathBuf,
pub config_file: PathBuf,

/// Password for the PostgreSQL database connection. If specified, must not be specified in the
/// connection string.
Expand Down Expand Up @@ -220,7 +220,7 @@ pub struct CommonBinaryOptions {
value_name = "KEY=value",
use_value_delimiter = true
)]
otlp_tracing_metadata: Vec<(String, String)>,
pub otlp_tracing_metadata: Vec<(String, String)>,

/// Additional OTLP/gRPC metadata key/value pairs. (concatenated with those in the metrics
/// configuration sections)
Expand All @@ -232,7 +232,7 @@ pub struct CommonBinaryOptions {
value_name = "KEY=value",
use_value_delimiter = true
)]
otlp_metrics_metadata: Vec<(String, String)>,
pub otlp_metrics_metadata: Vec<(String, String)>,
}

impl Debug for CommonBinaryOptions {
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ where
) -> Result<ClientImplementation<'static, V>, janus_client::Error> {
let (leader_aggregator_endpoint, helper_aggregator_endpoint) = task_parameters
.endpoint_fragments
.port_forwarded_endpoints(leader_port, helper_port);
.endpoints_for_host_client(leader_port, helper_port);
let client = Client::new(
task_parameters.task_id,
leader_aggregator_endpoint,
Expand Down Expand Up @@ -260,7 +260,7 @@ where
let http_client = reqwest::Client::new();
let (leader_aggregator_endpoint, helper_aggregator_endpoint) = task_parameters
.endpoint_fragments
.container_network_endpoints();
.endpoints_for_virtual_network_client();
ClientImplementation::Container(Box::new(ContainerClientImplementation {
_container: container,
leader: leader_aggregator_endpoint,
Expand Down
Loading

0 comments on commit ce4cc45

Please sign in to comment.