Skip to content

Commit

Permalink
Use ContainerLogsDropGuard instead of custom Drop impls
Browse files Browse the repository at this point in the history
  • Loading branch information
inahga committed Sep 8, 2023
1 parent c8dd494 commit 5936c4c
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 103 deletions.
5 changes: 3 additions & 2 deletions integration_tests/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::anyhow;
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use janus_client::{aggregator_hpke_config, default_http_client, Client, ClientParameters};
use janus_core::{task::VdafInstance, time::RealClock};
use janus_interop_binaries::ContainerLogsDropGuard;
use janus_interop_binaries::{ContainerLogsDropGuard, ContainerLogsSource};
use janus_messages::{Duration, Role, TaskId};
use prio::{
codec::Encode,
Expand Down Expand Up @@ -265,7 +265,8 @@ where
.with_network(network)
.with_container_name(client_container_name),
);
let container = ContainerLogsDropGuard::new(container);
let container =
ContainerLogsDropGuard::new(container, ContainerLogsSource::Path("/logs".to_string()));
let host_port = container.get_host_port_ipv4(8080);
let http_client = reqwest::Client::new();
let (leader_aggregator_endpoint, helper_aggregator_endpoint) = task_parameters
Expand Down
51 changes: 10 additions & 41 deletions integration_tests/src/daphne.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,17 @@
use crate::interop_api;
use janus_aggregator_core::task::{test_util::TaskBuilder, Task};
use janus_interop_binaries::log_export_path;
use janus_interop_binaries::test_util::await_http_server;
use janus_messages::{Role, Time};
use std::{
fs::{create_dir_all, File},
process::{Command, Stdio},
thread::panicking,
use janus_interop_binaries::{
test_util::await_http_server, ContainerLogsDropGuard, ContainerLogsSource,
};
use testcontainers::{clients::Cli, images::generic::GenericImage, Container, RunnableImage};
use janus_messages::{Role, Time};
use testcontainers::{clients::Cli, images::generic::GenericImage, RunnableImage};

const DAPHNE_HELPER_IMAGE_NAME_AND_TAG: &str = "cloudflare/daphne-worker-helper:sha-f6b3ef1";

/// Represents a running Daphne test instance.
pub struct Daphne<'a> {
daphne_container: Container<'a, GenericImage>,
role: Role,
daphne_container: ContainerLogsDropGuard<'a, GenericImage>,
}

impl<'a> Daphne<'a> {
Expand All @@ -40,7 +35,10 @@ impl<'a> Daphne<'a> {
let runnable_image = RunnableImage::from(GenericImage::new(image_name, image_tag))
.with_network(network)
.with_container_name(endpoint.host_str().unwrap());
let daphne_container = container_client.run(runnable_image);
let daphne_container = ContainerLogsDropGuard::new(
container_client.run(runnable_image),
ContainerLogsSource::Docker,
);
let port = daphne_container.get_host_port_ipv4(Self::INTERNAL_SERVING_PORT);

// Wait for Daphne container to begin listening on the port.
Expand All @@ -55,15 +53,11 @@ impl<'a> Daphne<'a> {
} else {
task.clone()
};
let role = *task.role();

// Write the given task to the Daphne instance we started.
interop_api::aggregator_add_task(port, task).await;

Self {
daphne_container,
role,
}
Self { daphne_container }
}

/// Returns the port of the aggregator on the host.
Expand All @@ -72,28 +66,3 @@ impl<'a> Daphne<'a> {
.get_host_port_ipv4(Self::INTERNAL_SERVING_PORT)
}
}

impl<'a> Drop for Daphne<'a> {
fn drop(&mut self) {
// We assume that if a Daphne value is dropped during a panic, we are in the middle of
// test failure. In this case, export logs if logs_path() suggests doing so.
if !panicking() {
return;
}
if let Some(mut destination_path) = log_export_path() {
destination_path.push(format!("{}-{}", self.role, self.daphne_container.id()));
create_dir_all(&destination_path).unwrap();
let docker_logs_status = Command::new("docker")
.args(["logs", "--timestamps", self.daphne_container.id()])
.stdin(Stdio::null())
.stdout(File::create(destination_path.join("stdout.log")).unwrap())
.stderr(File::create(destination_path.join("stderr.log")).unwrap())
.status()
.expect("Failed to execute `docker logs`");
assert!(
docker_logs_status.success(),
"`docker logs` failed with status {docker_logs_status:?}"
);
}
}
}
59 changes: 12 additions & 47 deletions integration_tests/src/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,15 @@
use crate::interop_api;
use janus_aggregator_core::task::Task;
use janus_interop_binaries::{
log_export_path, test_util::await_http_server, testcontainer::Aggregator,
test_util::await_http_server, testcontainer::Aggregator, ContainerLogsDropGuard,
ContainerLogsSource,
};
use janus_messages::Role;
use std::{
process::{Command, Stdio},
thread::panicking,
};
use testcontainers::{clients::Cli, Container, RunnableImage};
use testcontainers::{clients::Cli, RunnableImage};

/// Represents a running Janus test instance in a container.
pub struct Janus<'a> {
role: Role,
container: Container<'a, Aggregator>,
container: ContainerLogsDropGuard<'a, Aggregator>,
}

impl<'a> Janus<'a> {
Expand All @@ -28,10 +24,13 @@ impl<'a> Janus<'a> {
Role::Helper => task.helper_aggregator_endpoint(),
_ => panic!("unexpected task role"),
};
let container = container_client.run(
RunnableImage::from(Aggregator::default())
.with_network(network)
.with_container_name(endpoint.host_str().unwrap()),
let container = ContainerLogsDropGuard::new(
container_client.run(
RunnableImage::from(Aggregator::default())
.with_network(network)
.with_container_name(endpoint.host_str().unwrap()),
),
ContainerLogsSource::Path("/logs".to_string()),
);
let port = container.get_host_port_ipv4(Aggregator::INTERNAL_SERVING_PORT);

Expand All @@ -41,10 +40,7 @@ impl<'a> Janus<'a> {
// Write the given task to the Janus instance we started.
interop_api::aggregator_add_task(port, task.clone()).await;

Self {
role: *task.role(),
container,
}
Self { container }
}

/// Returns the port of the aggregator on the host.
Expand All @@ -53,34 +49,3 @@ impl<'a> Janus<'a> {
.get_host_port_ipv4(Aggregator::INTERNAL_SERVING_PORT)
}
}

impl<'a> Drop for Janus<'a> {
fn drop(&mut self) {
// We assume that if a Janus value is dropped during a panic, we are in the middle of
// test failure. In this case, export logs if log_export_path() suggests doing so.

if !panicking() {
return;
}
if let Some(mut destination_path) = log_export_path() {
destination_path.push(format!("{}-{}", self.role, self.container.id()));
if let Ok(docker_cp_status) = Command::new("docker")
.args([
"cp",
&format!("{}:logs/", self.container.id()),
destination_path.as_os_str().to_str().unwrap(),
])
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
{
if !docker_cp_status.success() {
println!("`docker cp` failed with status {docker_cp_status:?}");
}
} else {
println!("Failed to execute `docker cp`");
}
}
}
}
2 changes: 1 addition & 1 deletion integration_tests/tests/daphne.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn daphne_janus() {

// This test places Janus in the leader role & Daphne in the helper role.
#[tokio::test(flavor = "multi_thread")]
#[ignore = "Daphne does not currently support DAP-05 (issue #1669)"]
// #[ignore = "Daphne does not currently support DAP-05 (issue #1669)"]
async fn janus_daphne() {
install_test_trace_subscriber();

Expand Down
51 changes: 40 additions & 11 deletions interop_binaries/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use std::{
collections::HashMap,
env::{self, VarError},
fmt::Display,
fs::create_dir_all,
fs::{create_dir_all, File},
io::{stderr, Write},
marker::PhantomData,
ops::Deref,
path::PathBuf,
process::Command,
process::{Command, Stdio},
str::FromStr,
sync::Arc,
thread::panicking,
Expand Down Expand Up @@ -390,11 +390,19 @@ struct ContainerInspectEntry {

pub struct ContainerLogsDropGuard<'d, I: Image> {
container: Container<'d, I>,
source: ContainerLogsSource,
}

pub enum ContainerLogsSource {
/// Logs can be gathered through the `docker logs` command.
Docker,
/// Logs are present inside the container at the given path.
Path(String),
}

impl<'d, I: Image> ContainerLogsDropGuard<'d, I> {
pub fn new(container: Container<I>) -> ContainerLogsDropGuard<I> {
ContainerLogsDropGuard { container }
pub fn new(container: Container<I>, source: ContainerLogsSource) -> ContainerLogsDropGuard<I> {
ContainerLogsDropGuard { container, source }
}
}

Expand All @@ -403,6 +411,12 @@ impl<'d, I: Image> Drop for ContainerLogsDropGuard<'d, I> {
if !panicking() {
return;
}
// If we're panicking then we're probably in the middle of test failure. In this case,
// export logs if log_export_path() suggests doing so.
//
// The unwraps in this code block would induce a double panic, but we accept this risk
// since it happens only in test code. This is also our main method of debugging
// integration tests, so if it's broken we should be alerted and have it fixed ASAP.
if let Some(base_dir) = log_export_path() {
create_dir_all(&base_dir).expect("could not create log output directory");

Expand All @@ -427,13 +441,28 @@ impl<'d, I: Image> Drop for ContainerLogsDropGuard<'d, I> {

let destination = base_dir.join(name);

let copy_status = Command::new("docker")
.arg("cp")
.arg(format!("{id}:/logs"))
.arg(destination)
.status()
.expect("running `docker cp` failed");
assert!(copy_status.success());
let command_status = match self.source {
ContainerLogsSource::Docker => {
create_dir_all(&destination).unwrap();
Command::new("docker")
.args(["logs", "--timestamps", id])
.stdin(Stdio::null())
.stdout(File::create(destination.join("stdout.log")).unwrap())
.stderr(File::create(destination.join("stderr.log")).unwrap())
.status()
.expect("running `docker logs` failed")
}
ContainerLogsSource::Path(ref path) => Command::new("docker")
.arg("cp")
.arg(format!("{id}:{path}"))
.arg(destination)
.status()
.expect("running `docker cp` failed"),
};
assert!(
command_status.success(),
"log extraction failed {command_status:?}"
);
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion interop_binaries/tests/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use janus_core::{
use janus_interop_binaries::{
test_util::{await_ready_ok, generate_network_name, generate_unique_name},
testcontainer::{Aggregator, Client, Collector},
ContainerLogsDropGuard,
ContainerLogsDropGuard, ContainerLogsSource,
};
use janus_messages::{
query_type::{FixedSize, QueryType, TimeInterval},
Expand Down Expand Up @@ -66,6 +66,7 @@ async fn run(
.with_network(&network)
.with_container_name(generate_unique_name("client")),
),
ContainerLogsSource::Path("/logs".to_string()),
);
let client_port = client_container.get_host_port_ipv4(Client::INTERNAL_SERVING_PORT);

Expand All @@ -76,6 +77,7 @@ async fn run(
.with_network(&network)
.with_container_name(leader_name.clone()),
),
ContainerLogsSource::Path("/logs".to_string()),
);
let leader_port = leader_container.get_host_port_ipv4(Aggregator::INTERNAL_SERVING_PORT);

Expand All @@ -86,6 +88,7 @@ async fn run(
.with_network(&network)
.with_container_name(helper_name.clone()),
),
ContainerLogsSource::Path("/logs".to_string()),
);
let helper_port = helper_container.get_host_port_ipv4(Aggregator::INTERNAL_SERVING_PORT);

Expand All @@ -95,6 +98,7 @@ async fn run(
.with_network(&network)
.with_container_name(generate_unique_name("collector")),
),
ContainerLogsSource::Path("/logs".to_string()),
);
let collector_port = collector_container.get_host_port_ipv4(Collector::INTERNAL_SERVING_PORT);

Expand Down

0 comments on commit 5936c4c

Please sign in to comment.