Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ContainerLogsDropGuard instead of custom Drop impls #1804

Merged
merged 2 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration_tests/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ where
.with_network(network)
.with_container_name(client_container_name),
);
let container = ContainerLogsDropGuard::new(container);
let container = ContainerLogsDropGuard::new_janus(container);
let host_port = container.get_host_port_ipv4(8080);
let http_client = reqwest::Client::new();
let (leader_aggregator_endpoint, helper_aggregator_endpoint) = task_parameters
Expand Down
51 changes: 10 additions & 41 deletions integration_tests/src/daphne.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,17 @@

use crate::interop_api;
use janus_aggregator_core::task::{test_util::TaskBuilder, Task};
use janus_interop_binaries::log_export_path;
use janus_interop_binaries::test_util::await_http_server;
use janus_messages::{Role, Time};
use std::{
fs::{create_dir_all, File},
process::{Command, Stdio},
thread::panicking,
use janus_interop_binaries::{
test_util::await_http_server, ContainerLogsDropGuard, ContainerLogsSource,
};
use testcontainers::{clients::Cli, images::generic::GenericImage, Container, RunnableImage};
use janus_messages::{Role, Time};
use testcontainers::{clients::Cli, images::generic::GenericImage, RunnableImage};

const DAPHNE_HELPER_IMAGE_NAME_AND_TAG: &str = "cloudflare/daphne-worker-helper:sha-f6b3ef1";

/// Represents a running Daphne test instance.
pub struct Daphne<'a> {
daphne_container: Container<'a, GenericImage>,
role: Role,
daphne_container: ContainerLogsDropGuard<'a, GenericImage>,
}

impl<'a> Daphne<'a> {
Expand All @@ -40,7 +35,10 @@ impl<'a> Daphne<'a> {
let runnable_image = RunnableImage::from(GenericImage::new(image_name, image_tag))
.with_network(network)
.with_container_name(endpoint.host_str().unwrap());
let daphne_container = container_client.run(runnable_image);
let daphne_container = ContainerLogsDropGuard::new(
container_client.run(runnable_image),
ContainerLogsSource::Docker,
);
let port = daphne_container.get_host_port_ipv4(Self::INTERNAL_SERVING_PORT);

// Wait for Daphne container to begin listening on the port.
Expand All @@ -55,15 +53,11 @@ impl<'a> Daphne<'a> {
} else {
task.clone()
};
let role = *task.role();

// Write the given task to the Daphne instance we started.
interop_api::aggregator_add_task(port, task).await;

Self {
daphne_container,
role,
}
Self { daphne_container }
}

/// Returns the port of the aggregator on the host.
Expand All @@ -72,28 +66,3 @@ impl<'a> Daphne<'a> {
.get_host_port_ipv4(Self::INTERNAL_SERVING_PORT)
}
}

impl<'a> Drop for Daphne<'a> {
fn drop(&mut self) {
// We assume that if a Daphne value is dropped during a panic, we are in the middle of
// test failure. In this case, export logs if logs_path() suggests doing so.
if !panicking() {
return;
}
if let Some(mut destination_path) = log_export_path() {
destination_path.push(format!("{}-{}", self.role, self.daphne_container.id()));
create_dir_all(&destination_path).unwrap();
let docker_logs_status = Command::new("docker")
.args(["logs", "--timestamps", self.daphne_container.id()])
.stdin(Stdio::null())
.stdout(File::create(destination_path.join("stdout.log")).unwrap())
.stderr(File::create(destination_path.join("stderr.log")).unwrap())
.status()
.expect("Failed to execute `docker logs`");
assert!(
docker_logs_status.success(),
"`docker logs` failed with status {docker_logs_status:?}"
);
}
}
}
57 changes: 10 additions & 47 deletions integration_tests/src/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,14 @@
use crate::interop_api;
use janus_aggregator_core::task::Task;
use janus_interop_binaries::{
log_export_path, test_util::await_http_server, testcontainer::Aggregator,
test_util::await_http_server, testcontainer::Aggregator, ContainerLogsDropGuard,
};
use janus_messages::Role;
use std::{
process::{Command, Stdio},
thread::panicking,
};
use testcontainers::{clients::Cli, Container, RunnableImage};
use testcontainers::{clients::Cli, RunnableImage};

/// Represents a running Janus test instance in a container.
pub struct Janus<'a> {
role: Role,
container: Container<'a, Aggregator>,
container: ContainerLogsDropGuard<'a, Aggregator>,
}

impl<'a> Janus<'a> {
Expand All @@ -28,10 +23,12 @@ impl<'a> Janus<'a> {
Role::Helper => task.helper_aggregator_endpoint(),
_ => panic!("unexpected task role"),
};
let container = container_client.run(
RunnableImage::from(Aggregator::default())
.with_network(network)
.with_container_name(endpoint.host_str().unwrap()),
let container = ContainerLogsDropGuard::new_janus(
container_client.run(
RunnableImage::from(Aggregator::default())
.with_network(network)
.with_container_name(endpoint.host_str().unwrap()),
),
);
let port = container.get_host_port_ipv4(Aggregator::INTERNAL_SERVING_PORT);

Expand All @@ -41,10 +38,7 @@ impl<'a> Janus<'a> {
// Write the given task to the Janus instance we started.
interop_api::aggregator_add_task(port, task.clone()).await;

Self {
role: *task.role(),
container,
}
Self { container }
}

/// Returns the port of the aggregator on the host.
Expand All @@ -53,34 +47,3 @@ impl<'a> Janus<'a> {
.get_host_port_ipv4(Aggregator::INTERNAL_SERVING_PORT)
}
}

impl<'a> Drop for Janus<'a> {
fn drop(&mut self) {
// We assume that if a Janus value is dropped during a panic, we are in the middle of
// test failure. In this case, export logs if log_export_path() suggests doing so.

if !panicking() {
return;
}
if let Some(mut destination_path) = log_export_path() {
destination_path.push(format!("{}-{}", self.role, self.container.id()));
if let Ok(docker_cp_status) = Command::new("docker")
.args([
"cp",
&format!("{}:logs/", self.container.id()),
destination_path.as_os_str().to_str().unwrap(),
])
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
{
if !docker_cp_status.success() {
println!("`docker cp` failed with status {docker_cp_status:?}");
}
} else {
println!("Failed to execute `docker cp`");
}
}
}
}
58 changes: 47 additions & 11 deletions interop_binaries/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use std::{
collections::HashMap,
env::{self, VarError},
fmt::Display,
fs::create_dir_all,
fs::{create_dir_all, File},
io::{stderr, Write},
marker::PhantomData,
ops::Deref,
path::PathBuf,
process::Command,
process::{Command, Stdio},
str::FromStr,
sync::Arc,
thread::panicking,
Expand Down Expand Up @@ -390,11 +390,26 @@ struct ContainerInspectEntry {

pub struct ContainerLogsDropGuard<'d, I: Image> {
container: Container<'d, I>,
source: ContainerLogsSource,
}

pub enum ContainerLogsSource {
/// Logs can be gathered through the `docker logs` command.
Docker,
/// Logs are present inside the container at the given path.
Path(String),
}

impl<'d, I: Image> ContainerLogsDropGuard<'d, I> {
pub fn new(container: Container<I>) -> ContainerLogsDropGuard<I> {
ContainerLogsDropGuard { container }
pub fn new(container: Container<I>, source: ContainerLogsSource) -> ContainerLogsDropGuard<I> {
ContainerLogsDropGuard { container, source }
}

pub fn new_janus(container: Container<I>) -> ContainerLogsDropGuard<I> {
ContainerLogsDropGuard {
container,
source: ContainerLogsSource::Path("/logs".to_string()),
}
}
}

Expand All @@ -403,6 +418,12 @@ impl<'d, I: Image> Drop for ContainerLogsDropGuard<'d, I> {
if !panicking() {
return;
}
// If we're panicking then we're probably in the middle of test failure. In this case,
// export logs if log_export_path() suggests doing so.
//
// The unwraps in this code block would induce a double panic, but we accept this risk
// since it happens only in test code. This is also our main method of debugging
// integration tests, so if it's broken we should be alerted and have it fixed ASAP.
if let Some(base_dir) = log_export_path() {
create_dir_all(&base_dir).expect("could not create log output directory");

Expand All @@ -427,13 +448,28 @@ impl<'d, I: Image> Drop for ContainerLogsDropGuard<'d, I> {

let destination = base_dir.join(name);

let copy_status = Command::new("docker")
.arg("cp")
.arg(format!("{id}:/logs"))
.arg(destination)
.status()
.expect("running `docker cp` failed");
assert!(copy_status.success());
let command_status = match self.source {
ContainerLogsSource::Docker => {
create_dir_all(&destination).unwrap();
Command::new("docker")
.args(["logs", "--timestamps", id])
.stdin(Stdio::null())
.stdout(File::create(destination.join("stdout.log")).unwrap())
.stderr(File::create(destination.join("stderr.log")).unwrap())
.status()
.expect("running `docker logs` failed")
}
ContainerLogsSource::Path(ref path) => Command::new("docker")
.arg("cp")
.arg(format!("{id}:{path}"))
.arg(destination)
.status()
.expect("running `docker cp` failed"),
};
assert!(
command_status.success(),
"log extraction failed {command_status:?}"
);
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions interop_binaries/tests/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn run(
let container_client = container_client();
let network = generate_network_name();

let client_container = ContainerLogsDropGuard::new(
let client_container = ContainerLogsDropGuard::new_janus(
container_client.run(
RunnableImage::from(Client::default())
.with_network(&network)
Expand All @@ -70,7 +70,7 @@ async fn run(
let client_port = client_container.get_host_port_ipv4(Client::INTERNAL_SERVING_PORT);

let leader_name = generate_unique_name("leader");
let leader_container = ContainerLogsDropGuard::new(
let leader_container = ContainerLogsDropGuard::new_janus(
container_client.run(
RunnableImage::from(Aggregator::default())
.with_network(&network)
Expand All @@ -80,7 +80,7 @@ async fn run(
let leader_port = leader_container.get_host_port_ipv4(Aggregator::INTERNAL_SERVING_PORT);

let helper_name = generate_unique_name("helper");
let helper_container = ContainerLogsDropGuard::new(
let helper_container = ContainerLogsDropGuard::new_janus(
container_client.run(
RunnableImage::from(Aggregator::default())
.with_network(&network)
Expand All @@ -89,7 +89,7 @@ async fn run(
);
let helper_port = helper_container.get_host_port_ipv4(Aggregator::INTERNAL_SERVING_PORT);

let collector_container = ContainerLogsDropGuard::new(
let collector_container = ContainerLogsDropGuard::new_janus(
container_client.run(
RunnableImage::from(Collector::default())
.with_network(&network)
Expand Down