Skip to content

Commit

Permalink
Upgrade testcontainers to 0.16 (#3073)
Browse files Browse the repository at this point in the history
  • Loading branch information
divergentdave authored Apr 30, 2024
1 parent 165ea48 commit 9b117d9
Show file tree
Hide file tree
Showing 13 changed files with 401 additions and 271 deletions.
317 changes: 268 additions & 49 deletions Cargo.lock

Large diffs are not rendered by default.

24 changes: 6 additions & 18 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,5 @@
[workspace]
members = [
"aggregator",
"aggregator_api",
"aggregator_core",
"build_script_utils",
"client",
"collector",
"core",
"integration_tests",
"interop_binaries",
"messages",
"tools",
]
members = ["aggregator", "aggregator_api", "aggregator_core", "build_script_utils", "client", "collector", "core", "integration_tests", "interop_binaries", "messages", "tools"]
resolver = "2"

[workspace.package]
Expand Down Expand Up @@ -42,7 +30,7 @@ janus_core = { version = "0.5", path = "core" }
janus_integration_tests = { version = "0.5", path = "integration_tests" }
janus_interop_binaries = { version = "0.5", path = "interop_binaries" }
janus_messages = { version = "0.5", path = "messages" }
k8s-openapi = { version = "0.20.0", features = ["v1_24"] } # keep this version in sync with what is referenced by the indirect dependency via `kube`
k8s-openapi = { version = "0.20.0", features = ["v1_24"] } # keep this version in sync with what is referenced by the indirect dependency via `kube`
kube = { version = "0.87.2", default-features = false, features = ["client", "rustls-tls"] }
opentelemetry = { version = "0.22", features = ["metrics"] }
opentelemetry_sdk = { version = "0.22", features = ["metrics"] }
Expand All @@ -53,7 +41,7 @@ serde_test = "1.0.175"
serde_yaml = "0.9.34"
reqwest = { version = "0.12.4", default-features = false, features = ["rustls-tls"] }
rstest = "0.17.0"
testcontainers = "0.15.0"
testcontainers = "0.16.5"
thiserror = "1.0"
tokio = { version = "1.37", features = ["full", "tracing"] }
trillium = "0.2.19"
Expand All @@ -75,6 +63,6 @@ debug = 0
# relatively fast compilation. It is intended for use in size-constrained testing scenarios, e.g.
# building a binary artifact that ends up embedded in another binary.
inherits = "dev"
opt-level = "z" # Optimize for size.
debug = false # Do not generate debug info.
strip = true # Strip symbols from binary.
opt-level = "z" # Optimize for size.
debug = false # Do not generate debug info.
strip = true # Strip symbols from binary.
48 changes: 10 additions & 38 deletions aggregator_core/src/datastore/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,18 @@ use sqlx::{
use std::{
path::PathBuf,
str::FromStr,
sync::{Arc, Barrier, Weak},
thread::{self, JoinHandle},
sync::{Arc, Weak},
};
use testcontainers::RunnableImage;
use tokio::sync::{oneshot, Mutex};
use testcontainers::{runners::AsyncRunner, ContainerAsync, RunnableImage};
use tokio::sync::Mutex;
use tokio_postgres::{connect, Config, NoTls};
use tracing::trace;

use super::SUPPORTED_SCHEMA_VERSIONS;

struct EphemeralDatabase {
_db_container: ContainerAsync<Postgres>,
port_number: u16,
shutdown_barrier: Arc<Barrier>,
join_handle: Option<JoinHandle<()>>,
}

impl EphemeralDatabase {
Expand All @@ -49,30 +47,15 @@ impl EphemeralDatabase {
}

async fn start() -> Self {
let (port_tx, port_rx) = oneshot::channel();
let shutdown_barrier = Arc::new(Barrier::new(2));
let join_handle = thread::spawn({
let shutdown_barrier = Arc::clone(&shutdown_barrier);
move || {
// Start an instance of Postgres running in a container.
let container_client = testcontainers::clients::Cli::default();
let db_container = container_client.run(RunnableImage::from(Postgres::default()));
const POSTGRES_DEFAULT_PORT: u16 = 5432;
let port_number = db_container.get_host_port_ipv4(POSTGRES_DEFAULT_PORT);
trace!("Postgres container is up with port {port_number}");
port_tx.send(port_number).unwrap();

// Wait for the barrier as a shutdown signal.
shutdown_barrier.wait();
trace!("Shutting down Postgres container with port {port_number}");
}
});
let port_number = port_rx.await.unwrap();
// Start an instance of Postgres running in a container.
let db_container = RunnableImage::from(Postgres::default()).start().await;
const POSTGRES_DEFAULT_PORT: u16 = 5432;
let port_number = db_container.get_host_port_ipv4(POSTGRES_DEFAULT_PORT).await;
trace!("Postgres container is up with port {port_number}");

Self {
_db_container: db_container,
port_number,
shutdown_barrier,
join_handle: Some(join_handle),
}
}

Expand All @@ -84,17 +67,6 @@ impl EphemeralDatabase {
}
}

impl Drop for EphemeralDatabase {
fn drop(&mut self) {
// Wait on the shutdown barrier, which will cause the container-management thread to
// begin shutdown. Then wait for the container-management thread itself to terminate.
// This guarantees container shutdown finishes before dropping the EphemeralDatabase
// completes.
self.shutdown_barrier.wait();
self.join_handle.take().unwrap().join().unwrap();
}
}

/// EphemeralDatastore represents an ephemeral datastore instance. It has methods allowing
/// creation of Datastores, as well as the ability to retrieve the underlying connection pool.
///
Expand Down
23 changes: 2 additions & 21 deletions core/src/test_util/testcontainers.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,7 @@
//! Testing functionality that interacts with the testcontainers library.
use std::{
collections::HashMap,
sync::{Arc, Mutex, Weak},
};
use testcontainers::{clients::Cli, core::WaitFor, Image};

/// Returns a container client, possibly shared with other callers to this function.
pub fn container_client() -> Arc<Cli> {
// Once `Weak::new` is const in stable Rust, this should be replaced by a static variable
// initialized to `Mutex::new(Weak::new())`. [https://github.com/rust-lang/rust/issues/95091]
lazy_static::lazy_static! {
static ref CONTAINER_CLIENT_MU: Mutex<Weak<Cli>> = Mutex::new(Weak::new());
}

let mut container_client = CONTAINER_CLIENT_MU.lock().unwrap();
container_client.upgrade().unwrap_or_else(|| {
let client = Arc::new(Cli::default());
*container_client = Arc::downgrade(&client);
client
})
}
use std::collections::HashMap;
use testcontainers::{core::WaitFor, Image};

/// A [`testcontainers::Image`] that provides a Postgres server.
#[derive(Debug)]
Expand Down
37 changes: 17 additions & 20 deletions integration_tests/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use prio::{
use rand::random;
use serde_json::{json, Value};
use std::env;
use testcontainers::{clients::Cli, core::WaitFor, Image, RunnableImage};
use testcontainers::{core::WaitFor, runners::AsyncRunner, Image, RunnableImage};
use url::Url;

/// Extension trait to encode measurements for VDAFs as JSON objects, according to
Expand Down Expand Up @@ -149,7 +149,6 @@ pub enum ClientBackend<'a> {
/// Uploads reports by starting a containerized client implementation, and sending it requests
/// using draft-dcook-ppm-dap-interop-test-design.
Container {
container_client: &'a Cli,
container_image: InteropClient,
network: &'a str,
},
Expand All @@ -161,7 +160,7 @@ impl<'a> ClientBackend<'a> {
task_parameters: &TaskParameters,
(leader_port, helper_port): (u16, u16),
vdaf: V,
) -> anyhow::Result<ClientImplementation<'a, V>>
) -> anyhow::Result<ClientImplementation<V>>
where
V: vdaf::Client<16> + InteropClientEncoding,
{
Expand All @@ -174,25 +173,24 @@ impl<'a> ClientBackend<'a> {
.await
.map_err(Into::into),
ClientBackend::Container {
container_client,
container_image,
network,
} => Ok(ClientImplementation::new_container(
container_client,
container_image.clone(),
network,
task_parameters,
vdaf,
)),
)
.await),
}
}
}

pub struct ContainerClientImplementation<'d, V>
pub struct ContainerClientImplementation<V>
where
V: vdaf::Client<16>,
{
_container: ContainerLogsDropGuard<'d, InteropClient>,
_container: ContainerLogsDropGuard<InteropClient>,
leader: Url,
helper: Url,
task_id: TaskId,
Expand All @@ -205,23 +203,23 @@ where

/// A DAP client implementation, specialized to work with a particular VDAF. See also
/// [`ClientBackend`].
pub enum ClientImplementation<'d, V>
pub enum ClientImplementation<V>
where
V: vdaf::Client<16>,
{
InProcess { client: Client<V, RealClock> },
Container(Box<ContainerClientImplementation<'d, V>>),
Container(Box<ContainerClientImplementation<V>>),
}

impl<'d, V> ClientImplementation<'d, V>
impl<V> ClientImplementation<V>
where
V: vdaf::Client<16> + InteropClientEncoding,
{
pub async fn new_in_process(
task_parameters: &TaskParameters,
(leader_port, helper_port): (u16, u16),
vdaf: V,
) -> Result<ClientImplementation<'static, V>, janus_client::Error> {
) -> Result<ClientImplementation<V>, janus_client::Error> {
let aggregator_endpoints = task_parameters
.endpoint_fragments
.port_forwarded_endpoints(leader_port, helper_port);
Expand Down Expand Up @@ -256,22 +254,21 @@ where
Ok(ClientImplementation::InProcess { client })
}

pub fn new_container(
container_client: &'d Cli,
pub async fn new_container(
container_image: InteropClient,
network: &str,
task_parameters: &TaskParameters,
vdaf: V,
) -> Self {
let random_part = hex::encode(random::<[u8; 4]>());
let client_container_name = format!("client-{random_part}");
let container = container_client.run(
RunnableImage::from(container_image)
.with_network(network)
.with_container_name(client_container_name),
);
let container = RunnableImage::from(container_image)
.with_network(network)
.with_container_name(client_container_name)
.start()
.await;
let container = ContainerLogsDropGuard::new(container);
let host_port = container.get_host_port_ipv4(8080);
let host_port = container.get_host_port_ipv4(8080).await;
let http_client = reqwest::Client::new();
let aggregator_endpoints = task_parameters
.endpoint_fragments
Expand Down
23 changes: 13 additions & 10 deletions integration_tests/src/daphne.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,23 @@ use std::{
process::{Command, Stdio},
thread::panicking,
};
use testcontainers::{clients::Cli, Container, GenericImage, RunnableImage};
use testcontainers::{runners::AsyncRunner, ContainerAsync, GenericImage, RunnableImage};

const DAPHNE_HELPER_IMAGE_NAME_AND_TAG: &str = "cloudflare/daphne-worker-helper:sha-f6b3ef1";

/// Represents a running Daphne test instance.
pub struct Daphne<'a> {
daphne_container: Container<'a, GenericImage>,
pub struct Daphne {
daphne_container: ContainerAsync<GenericImage>,
role: Role,
port: u16,
}

impl<'a> Daphne<'a> {
impl Daphne {
const INTERNAL_SERVING_PORT: u16 = 8080;

/// Create and start a new hermetic Daphne test instance in the given Docker network, configured
/// to service the given task. The aggregator port is also exposed to the host.
pub async fn new(container_client: &'a Cli, network: &str, task: &Task) -> Daphne<'a> {
pub async fn new(network: &str, task: &Task) -> Daphne {
let image_name_and_tag = match task.role() {
Role::Leader => panic!("A leader container image for Daphne is not yet available"),
Role::Helper => DAPHNE_HELPER_IMAGE_NAME_AND_TAG,
Expand All @@ -38,8 +39,10 @@ impl<'a> Daphne<'a> {
let runnable_image = RunnableImage::from(GenericImage::new(image_name, image_tag))
.with_network(network)
.with_container_name(endpoint.host_str().unwrap());
let daphne_container = container_client.run(runnable_image);
let port = daphne_container.get_host_port_ipv4(Self::INTERNAL_SERVING_PORT);
let daphne_container = runnable_image.start().await;
let port = daphne_container
.get_host_port_ipv4(Self::INTERNAL_SERVING_PORT)
.await;

// Wait for Daphne container to begin listening on the port.
await_http_server(port).await;
Expand All @@ -61,17 +64,17 @@ impl<'a> Daphne<'a> {
Self {
daphne_container,
role,
port,
}
}

/// Returns the port of the aggregator on the host.
pub fn port(&self) -> u16 {
self.daphne_container
.get_host_port_ipv4(Self::INTERNAL_SERVING_PORT)
self.port
}
}

impl<'a> Drop for Daphne<'a> {
impl Drop for Daphne {
fn drop(&mut self) {
// We assume that if a Daphne value is dropped during a panic, we are in the middle of
// test failure. In this case, export logs if logs_path() suggests doing so.
Expand Down
31 changes: 17 additions & 14 deletions integration_tests/src/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,29 @@ use std::{
process::{Command, Stdio},
thread::panicking,
};
use testcontainers::{clients::Cli, Container, RunnableImage};
use testcontainers::{runners::AsyncRunner, ContainerAsync, RunnableImage};

/// Represents a running Janus test instance in a container.
pub struct Janus<'a> {
pub struct Janus {
role: Role,
container: Container<'a, Aggregator>,
container: ContainerAsync<Aggregator>,
port: u16,
}

impl<'a> Janus<'a> {
impl Janus {
/// Create and start a new hermetic Janus test instance in the given Docker network, configured
/// to service the given task. The aggregator port is also exposed to the host.
pub async fn new(container_client: &'a Cli, network: &str, task: &Task) -> Janus<'a> {
pub async fn new(network: &str, task: &Task) -> Janus {
// Start the Janus interop aggregator container running.
let endpoint = task.aggregator_url(task.role()).unwrap();
let container = container_client.run(
RunnableImage::from(Aggregator::default())
.with_network(network)
.with_container_name(endpoint.host_str().unwrap()),
);
let port = container.get_host_port_ipv4(Aggregator::INTERNAL_SERVING_PORT);
let container = RunnableImage::from(Aggregator::default())
.with_network(network)
.with_container_name(endpoint.host_str().unwrap())
.start()
.await;
let port = container
.get_host_port_ipv4(Aggregator::INTERNAL_SERVING_PORT)
.await;

// Wait for the container to start listening on its port.
await_http_server(port).await;
Expand All @@ -40,17 +43,17 @@ impl<'a> Janus<'a> {
Self {
role: *task.role(),
container,
port,
}
}

/// Returns the port of the aggregator on the host.
pub fn port(&self) -> u16 {
self.container
.get_host_port_ipv4(Aggregator::INTERNAL_SERVING_PORT)
self.port
}
}

impl<'a> Drop for Janus<'a> {
impl Drop for Janus {
fn drop(&mut self) {
// We assume that if a Janus value is dropped during a panic, we are in the middle of
// test failure. In this case, export logs if log_export_path() suggests doing so.
Expand Down
Loading

0 comments on commit 9b117d9

Please sign in to comment.