Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration/E2E test logging improvements #2011

Merged
merged 4 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions integration_tests/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::anyhow;
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use janus_client::{aggregator_hpke_config, default_http_client, Client, ClientParameters};
use janus_core::{time::RealClock, vdaf::VdafInstance};
use janus_interop_binaries::ContainerLogsDropGuard;
use janus_interop_binaries::{get_rust_log_level, ContainerLogsDropGuard};
use janus_messages::{Duration, Role, TaskId};
use prio::{
codec::Encode,
Expand Down Expand Up @@ -157,6 +157,7 @@ pub enum ClientBackend<'a> {
impl<'a> ClientBackend<'a> {
pub async fn build<V>(
&self,
test_name: &str,
task_parameters: &TaskParameters,
(leader_port, helper_port): (u16, u16),
vdaf: V,
Expand All @@ -177,6 +178,7 @@ impl<'a> ClientBackend<'a> {
container_image,
network,
} => Ok(ClientImplementation::new_container(
test_name,
container_client,
container_image.clone(),
network,
Expand Down Expand Up @@ -257,6 +259,7 @@ where
}

pub fn new_container(
test_name: &str,
container_client: &'d Cli,
container_image: InteropClient,
network: &str,
Expand All @@ -265,12 +268,15 @@ where
) -> Self {
let random_part = hex::encode(random::<[u8; 4]>());
let client_container_name = format!("client-{random_part}");
let container = container_client.run(
RunnableImage::from(container_image)
.with_network(network)
.with_container_name(client_container_name),
let container = ContainerLogsDropGuard::new_janus(
test_name,
container_client.run(
RunnableImage::from(container_image)
.with_network(network)
.with_env_var(get_rust_log_level())
.with_container_name(client_container_name),
),
);
let container = ContainerLogsDropGuard::new_janus(container);
let host_port = container.get_host_port_ipv4(8080);
let http_client = reqwest::Client::new();
let (leader_aggregator_endpoint, helper_aggregator_endpoint) = task_parameters
Expand Down
12 changes: 10 additions & 2 deletions integration_tests/src/daphne.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::interop_api;
use janus_aggregator_core::task::{test_util::TaskBuilder, Task};
use janus_interop_binaries::{
test_util::await_http_server, ContainerLogsDropGuard, ContainerLogsSource,
get_rust_log_level, test_util::await_http_server, ContainerLogsDropGuard, ContainerLogsSource,
};
use janus_messages::{Role, Time};
use testcontainers::{clients::Cli, images::generic::GenericImage, RunnableImage};
Expand All @@ -20,7 +20,12 @@ impl<'a> Daphne<'a> {

/// Create and start a new hermetic Daphne test instance in the given Docker network, configured
/// to service the given task. The aggregator port is also exposed to the host.
pub async fn new(container_client: &'a Cli, network: &str, task: &Task) -> Daphne<'a> {
pub async fn new(
test_name: &str,
container_client: &'a Cli,
network: &str,
task: &Task,
) -> Daphne<'a> {
let (endpoint, image_name_and_tag) = match task.role() {
Role::Leader => panic!("A leader container image for Daphne is not yet available"),
Role::Helper => (
Expand All @@ -34,8 +39,11 @@ impl<'a> Daphne<'a> {
// Start the Daphne test container running.
let runnable_image = RunnableImage::from(GenericImage::new(image_name, image_tag))
.with_network(network)
// Daphne uses the DAP_TRACING environment variable for its tracing subscriber.
.with_env_var(("DAP_TRACING", get_rust_log_level().1))
.with_container_name(endpoint.host_str().unwrap());
let daphne_container = ContainerLogsDropGuard::new(
test_name,
container_client.run(runnable_image),
ContainerLogsSource::Docker,
);
Expand Down
12 changes: 10 additions & 2 deletions integration_tests/src/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
use crate::interop_api;
use janus_aggregator_core::task::Task;
use janus_interop_binaries::{
test_util::await_http_server, testcontainer::Aggregator, ContainerLogsDropGuard,
get_rust_log_level, test_util::await_http_server, testcontainer::Aggregator,
ContainerLogsDropGuard,
};
use janus_messages::Role;
use testcontainers::{clients::Cli, RunnableImage};
Expand All @@ -16,17 +17,24 @@ pub struct Janus<'a> {
impl<'a> Janus<'a> {
/// Create and start a new hermetic Janus test instance in the given Docker network, configured
/// to service the given task. The aggregator port is also exposed to the host.
pub async fn new(container_client: &'a Cli, network: &str, task: &Task) -> Janus<'a> {
pub async fn new(
test_name: &str,
container_client: &'a Cli,
network: &str,
task: &Task,
) -> Janus<'a> {
// Start the Janus interop aggregator container running.
let endpoint = match task.role() {
Role::Leader => task.leader_aggregator_endpoint(),
Role::Helper => task.helper_aggregator_endpoint(),
_ => panic!("unexpected task role"),
};
let container = ContainerLogsDropGuard::new_janus(
test_name,
container_client.run(
RunnableImage::from(Aggregator::default())
.with_network(network)
.with_env_var(get_rust_log_level())
.with_container_name(endpoint.host_str().unwrap()),
),
);
Expand Down
36 changes: 31 additions & 5 deletions integration_tests/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ pub async fn submit_measurements_and_verify_aggregate_generic<V>(
}

pub async fn submit_measurements_and_verify_aggregate(
test_name: &str,
task_parameters: &TaskParameters,
(leader_port, helper_port): (u16, u16),
client_backend: &ClientBackend<'_>,
Expand All @@ -242,7 +243,12 @@ pub async fn submit_measurements_and_verify_aggregate(
};

let client_implementation = client_backend
.build(task_parameters, (leader_port, helper_port), vdaf.clone())
.build(
test_name,
task_parameters,
(leader_port, helper_port),
vdaf.clone(),
)
.await
.unwrap();

Expand All @@ -269,7 +275,12 @@ pub async fn submit_measurements_and_verify_aggregate(
};

let client_implementation = client_backend
.build(task_parameters, (leader_port, helper_port), vdaf.clone())
.build(
test_name,
task_parameters,
(leader_port, helper_port),
vdaf.clone(),
)
.await
.unwrap();

Expand Down Expand Up @@ -312,7 +323,12 @@ pub async fn submit_measurements_and_verify_aggregate(
};

let client_implementation = client_backend
.build(task_parameters, (leader_port, helper_port), vdaf.clone())
.build(
test_name,
task_parameters,
(leader_port, helper_port),
vdaf.clone(),
)
.await
.unwrap();

Expand Down Expand Up @@ -346,7 +362,12 @@ pub async fn submit_measurements_and_verify_aggregate(
};

let client_implementation = client_backend
.build(task_parameters, (leader_port, helper_port), vdaf.clone())
.build(
test_name,
task_parameters,
(leader_port, helper_port),
vdaf.clone(),
)
.await
.unwrap();

Expand Down Expand Up @@ -388,7 +409,12 @@ pub async fn submit_measurements_and_verify_aggregate(
};

let client_implementation = client_backend
.build(task_parameters, (leader_port, helper_port), vdaf.clone())
.build(
test_name,
task_parameters,
(leader_port, helper_port),
vdaf.clone(),
)
.await
.unwrap();

Expand Down
12 changes: 8 additions & 4 deletions integration_tests/tests/daphne.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod common;
#[tokio::test(flavor = "multi_thread")]
#[ignore = "Daphne does not yet publish a leader container image"]
async fn daphne_janus() {
static TEST_NAME: &str = "daphne_janus";
install_test_trace_subscriber();

// Start servers.
Expand All @@ -35,11 +36,12 @@ async fn daphne_janus() {
.unwrap();

let container_client = container_client();
let leader = Daphne::new(&container_client, &network, &leader_task).await;
let helper = Janus::new(&container_client, &network, &helper_task).await;
let leader = Daphne::new(TEST_NAME, &container_client, &network, &leader_task).await;
let helper = Janus::new(TEST_NAME, &container_client, &network, &helper_task).await;

// Run the behavioral test.
submit_measurements_and_verify_aggregate(
TEST_NAME,
&task_parameters,
(leader.port(), helper.port()),
&ClientBackend::InProcess,
Expand All @@ -51,6 +53,7 @@ async fn daphne_janus() {
#[tokio::test(flavor = "multi_thread")]
#[ignore = "Daphne does not currently support DAP-07 (issue #1669)"]
async fn janus_daphne() {
static TEST_NAME: &str = "daphne_janus";
install_test_trace_subscriber();

// Start servers.
Expand All @@ -73,11 +76,12 @@ async fn janus_daphne() {
.unwrap();

let container_client = container_client();
let leader = Janus::new(&container_client, &network, &leader_task).await;
let helper = Daphne::new(&container_client, &network, &helper_task).await;
let leader = Janus::new(TEST_NAME, &container_client, &network, &leader_task).await;
let helper = Daphne::new(TEST_NAME, &container_client, &network, &helper_task).await;

// Run the behavioral test.
submit_measurements_and_verify_aggregate(
TEST_NAME,
&task_parameters,
(leader.port(), helper.port()),
&ClientBackend::InProcess,
Expand Down
26 changes: 21 additions & 5 deletions integration_tests/tests/divviup_ts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@ use testcontainers::clients::Cli;

mod common;

async fn run_divviup_ts_integration_test(container_client: &Cli, vdaf: VdafInstance) {
async fn run_divviup_ts_integration_test(
test_name: &str,
container_client: &Cli,
vdaf: VdafInstance,
) {
let (task_parameters, leader_task, helper_task) =
test_task_builders(vdaf, QueryType::TimeInterval);
let network = generate_network_name();
let leader = Janus::new(container_client, &network, &leader_task.build()).await;
let helper = Janus::new(container_client, &network, &helper_task.build()).await;
let leader = Janus::new(test_name, container_client, &network, &leader_task.build()).await;
let helper = Janus::new(test_name, container_client, &network, &helper_task.build()).await;

let client_backend = ClientBackend::Container {
container_client,
container_image: InteropClient::divviup_ts(),
network: &network,
};
submit_measurements_and_verify_aggregate(
test_name,
&task_parameters,
(leader.port(), helper.port()),
&client_backend,
Expand All @@ -40,15 +45,25 @@ async fn run_divviup_ts_integration_test(container_client: &Cli, vdaf: VdafInsta
async fn janus_divviup_ts_count() {
install_test_trace_subscriber();

run_divviup_ts_integration_test(&container_client(), VdafInstance::Prio3Count).await;
run_divviup_ts_integration_test(
"janus_divviup_ts_count",
&container_client(),
VdafInstance::Prio3Count,
)
.await;
}

#[tokio::test(flavor = "multi_thread")]
#[ignore = "divviup-ts does not currently support DAP-07 (issue #1669)"]
async fn janus_divviup_ts_sum() {
install_test_trace_subscriber();

run_divviup_ts_integration_test(&container_client(), VdafInstance::Prio3Sum { bits: 8 }).await;
run_divviup_ts_integration_test(
"janus_divviup_ts_sum",
&container_client(),
VdafInstance::Prio3Sum { bits: 8 },
)
.await;
}

#[tokio::test(flavor = "multi_thread")]
Expand All @@ -57,6 +72,7 @@ async fn janus_divviup_ts_histogram() {
install_test_trace_subscriber();

run_divviup_ts_integration_test(
"janus_divviup_ts_histogram",
&container_client(),
VdafInstance::Prio3Histogram {
length: 4,
Expand Down
4 changes: 4 additions & 0 deletions integration_tests/tests/in_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ async fn in_cluster_count() {

// Run the behavioral test.
submit_measurements_and_verify_aggregate(
"in_cluster_count",
&janus_pair.task_parameters,
(janus_pair.leader.port(), janus_pair.helper.port()),
&ClientBackend::InProcess,
Expand All @@ -285,6 +286,7 @@ async fn in_cluster_sum() {

// Run the behavioral test.
submit_measurements_and_verify_aggregate(
"in_cluster_sum",
&janus_pair.task_parameters,
(janus_pair.leader.port(), janus_pair.helper.port()),
&ClientBackend::InProcess,
Expand All @@ -308,6 +310,7 @@ async fn in_cluster_histogram() {

// Run the behavioral test.
submit_measurements_and_verify_aggregate(
"in_cluster_histogram",
&janus_pair.task_parameters,
(janus_pair.leader.port(), janus_pair.helper.port()),
&ClientBackend::InProcess,
Expand All @@ -331,6 +334,7 @@ async fn in_cluster_fixed_size() {

// Run the behavioral test.
submit_measurements_and_verify_aggregate(
"in_cluster_fixed_size",
&janus_pair.task_parameters,
(janus_pair.leader.port(), janus_pair.helper.port()),
&ClientBackend::InProcess,
Expand Down
Loading
Loading