Skip to content

Commit

Permalink
Simplify cluster sandbox API
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Aug 27, 2024
1 parent d3346fb commit 49fd9bf
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,31 +60,12 @@ pub struct TestNodeConfig {
pub enable_otlp: bool,
}

/// A test environment where you can start a Quickwit cluster and use the gRPC
/// or REST clients to test it.
pub struct ClusterSandbox {
pub node_configs: Vec<(NodeConfig, HashSet<QuickwitService>)>,
pub searcher_rest_client: QuickwitClient,
pub indexer_rest_client: QuickwitClient,
pub trace_client: TraceServiceClient<tonic::transport::Channel>,
pub logs_client: LogsServiceClient<tonic::transport::Channel>,
pub jaeger_client: SpanReaderPluginClient<tonic::transport::Channel>,
_temp_dir: TempDir,
node_shutdown_handles: Vec<NodeShutdownHandle>,
}

pub struct ClusterSandboxConfig {
temp_dir: TempDir,
node_configs: Vec<(NodeConfig, HashSet<QuickwitService>)>,
tcp_listener_resolver: TestTcpListenerResolver,
}

pub struct ClusterSandboxConfigBuilder {
pub struct ClusterSandboxBuilder {
temp_dir: TempDir,
node_configs: Vec<TestNodeConfig>,
}

impl Default for ClusterSandboxConfigBuilder {
impl Default for ClusterSandboxBuilder {
fn default() -> Self {
Self {
temp_dir: tempfile::tempdir().unwrap(),
Expand All @@ -93,7 +74,7 @@ impl Default for ClusterSandboxConfigBuilder {
}
}

impl ClusterSandboxConfigBuilder {
impl ClusterSandboxBuilder {
pub fn add_node(mut self, services: impl IntoIterator<Item = QuickwitService>) -> Self {
self.node_configs.push(TestNodeConfig {
services: HashSet::from_iter(services),
Expand All @@ -113,14 +94,6 @@ impl ClusterSandboxConfigBuilder {
self
}

/// Builds a config that runs all the services in a single process
pub async fn build_standalone() -> ClusterSandboxConfig {
ClusterSandboxConfigBuilder::default()
.add_node(QuickwitService::supported_services())
.build()
.await
}

/// Builds a list of of [`NodeConfig`] from the node definitions added to
/// builder. For each node, a [`NodeConfig`] is built with the right
/// parameters such that we will be able to run `quickwit_serve` on them and
Expand All @@ -129,7 +102,7 @@ impl ClusterSandboxConfigBuilder {
/// - `metastore_uri` defined by `root_data_dir/metastore`.
/// - `default_index_root_uri` defined by `root_data_dir/indexes`.
/// - `peers` defined by others nodes `gossip_advertise_addr`.
pub async fn build(self) -> ClusterSandboxConfig {
async fn build_config(self) -> ResolvedClusterConfig {
let root_data_dir = self.temp_dir.path().to_path_buf();
let cluster_id = new_coolid("test-cluster");
let mut resolved_node_configs = Vec::new();
Expand Down Expand Up @@ -166,15 +139,36 @@ impl ClusterSandboxConfigBuilder {
.filter(|seed| *seed != node_config.0.gossip_advertise_addr.to_string())
.collect_vec();
}
ClusterSandboxConfig {
ResolvedClusterConfig {
temp_dir: self.temp_dir,
node_configs: resolved_node_configs,
tcp_listener_resolver,
}
}

pub async fn build_and_start(self) -> ClusterSandbox {
self.build_config().await.start().await
}

pub async fn build_and_start_standalone() -> ClusterSandbox {
ClusterSandboxBuilder::default()
.add_node(QuickwitService::supported_services())
.build_config()
.await
.start()
.await
}
}

/// Intermediate state where the ports of all the the test cluster nodes have
/// been reserved and the configurations have been generated.
struct ResolvedClusterConfig {
temp_dir: TempDir,
node_configs: Vec<(NodeConfig, HashSet<QuickwitService>)>,
tcp_listener_resolver: TestTcpListenerResolver,
}

impl ClusterSandboxConfig {
impl ResolvedClusterConfig {
/// Start a cluster using this config and waits for the nodes to be ready
pub async fn start(self) -> ClusterSandbox {
let mut node_shutdown_handles = Vec::new();
Expand Down Expand Up @@ -301,6 +295,19 @@ pub(crate) async fn ingest_with_retry(
Ok(())
}

/// A test environment where you can start a Quickwit cluster and use the gRPC
/// or REST clients to test it.
pub struct ClusterSandbox {
pub node_configs: Vec<(NodeConfig, HashSet<QuickwitService>)>,
pub searcher_rest_client: QuickwitClient,
pub indexer_rest_client: QuickwitClient,
pub trace_client: TraceServiceClient<tonic::transport::Channel>,
pub logs_client: LogsServiceClient<tonic::transport::Channel>,
pub jaeger_client: SpanReaderPluginClient<tonic::transport::Channel>,
_temp_dir: TempDir,
node_shutdown_handles: Vec<NodeShutdownHandle>,
}

impl ClusterSandbox {
pub fn enable_ingest_v2(&mut self) {
self.indexer_rest_client.enable_ingest_v2();
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-integration-tests/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@
mod cluster_sandbox;
mod shutdown;

pub(crate) use cluster_sandbox::{ingest_with_retry, ClusterSandbox, ClusterSandboxConfigBuilder};
pub(crate) use cluster_sandbox::{ingest_with_retry, ClusterSandbox, ClusterSandboxBuilder};
18 changes: 5 additions & 13 deletions quickwit/quickwit-integration-tests/src/tests/basic_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::rest_client::CommitType;
use quickwit_serve::SearchRequestQueryString;

use crate::test_utils::{ingest_with_retry, ClusterSandboxConfigBuilder};
use crate::test_utils::{ingest_with_retry, ClusterSandboxBuilder};

fn get_ndjson_filepath(ndjson_dataset_filename: &str) -> String {
format!(
Expand All @@ -40,10 +40,7 @@ fn get_ndjson_filepath(ndjson_dataset_filename: &str) -> String {
#[tokio::test]
async fn test_ui_redirect_on_get() {
quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandboxConfigBuilder::build_standalone()
.await
.start()
.await;
let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
let node_config = sandbox.node_configs.first().unwrap();
let client = hyper::Client::builder()
.pool_idle_timeout(Duration::from_secs(30))
Expand All @@ -67,10 +64,7 @@ async fn test_ui_redirect_on_get() {
#[tokio::test]
async fn test_standalone_server() {
quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandboxConfigBuilder::build_standalone()
.await
.start()
.await;
let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
{
// The indexing service should be running.
let counters = sandbox
Expand Down Expand Up @@ -127,15 +121,13 @@ async fn test_standalone_server() {
#[tokio::test]
async fn test_multi_nodes_cluster() {
quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandboxConfigBuilder::default()
let sandbox = ClusterSandboxBuilder::default()
.add_node([QuickwitService::Searcher])
.add_node([QuickwitService::Metastore])
.add_node([QuickwitService::Indexer])
.add_node([QuickwitService::ControlPlane])
.add_node([QuickwitService::Janitor])
.build()
.await
.start()
.build_and_start()
.await;

{
Expand Down
47 changes: 14 additions & 33 deletions quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use quickwit_rest_client::rest_client::CommitType;
use serde_json::json;

use crate::ingest_json;
use crate::test_utils::{ingest_with_retry, ClusterSandboxConfigBuilder};
use crate::test_utils::{ingest_with_retry, ClusterSandboxBuilder};

fn initialize_tests() {
quickwit_common::setup_logging_for_tests();
Expand All @@ -38,10 +38,7 @@ fn initialize_tests() {
#[tokio::test]
async fn test_single_node_cluster() {
initialize_tests();
let mut sandbox = ClusterSandboxConfigBuilder::build_standalone()
.await
.start()
.await;
let mut sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
let index_id = "test-single-node-cluster";
let index_config = format!(
r#"
Expand Down Expand Up @@ -204,17 +201,15 @@ async fn test_single_node_cluster() {
#[tokio::test]
async fn test_ingest_v2_index_not_found() {
initialize_tests();
let mut sandbox = ClusterSandboxConfigBuilder::default()
let mut sandbox = ClusterSandboxBuilder::default()
.add_node([QuickwitService::Indexer, QuickwitService::Janitor])
.add_node([QuickwitService::Indexer, QuickwitService::Janitor])
.add_node([
QuickwitService::ControlPlane,
QuickwitService::Metastore,
QuickwitService::Searcher,
])
.build()
.await
.start()
.build_and_start()
.await;
sandbox.enable_ingest_v2();
let missing_index_err: Error = sandbox
Expand All @@ -241,17 +236,15 @@ async fn test_ingest_v2_index_not_found() {
#[tokio::test]
async fn test_ingest_v2_happy_path() {
initialize_tests();
let mut sandbox = ClusterSandboxConfigBuilder::default()
let mut sandbox = ClusterSandboxBuilder::default()
.add_node([QuickwitService::Indexer, QuickwitService::Janitor])
.add_node([QuickwitService::Indexer, QuickwitService::Janitor])
.add_node([
QuickwitService::ControlPlane,
QuickwitService::Metastore,
QuickwitService::Searcher,
])
.build()
.await
.start()
.build_and_start()
.await;
sandbox.enable_ingest_v2();
let index_id = "test_happy_path";
Expand Down Expand Up @@ -326,10 +319,7 @@ async fn test_ingest_v2_happy_path() {
#[tokio::test]
async fn test_commit_modes() {
initialize_tests();
let sandbox = ClusterSandboxConfigBuilder::build_standalone()
.await
.start()
.await;
let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
let index_id = "test_commit_modes";
let index_config = format!(
r#"
Expand Down Expand Up @@ -411,15 +401,13 @@ async fn test_commit_modes() {
#[tokio::test]
async fn test_very_large_index_name() {
initialize_tests();
let mut sandbox = ClusterSandboxConfigBuilder::default()
let mut sandbox = ClusterSandboxBuilder::default()
.add_node([QuickwitService::Searcher])
.add_node([QuickwitService::Metastore])
.add_node([QuickwitService::Indexer])
.add_node([QuickwitService::ControlPlane])
.add_node([QuickwitService::Janitor])
.build()
.await
.start()
.build_and_start()
.await;
sandbox.enable_ingest_v2();

Expand Down Expand Up @@ -512,10 +500,7 @@ async fn test_very_large_index_name() {
#[tokio::test]
async fn test_shutdown_single_node() {
initialize_tests();
let mut sandbox = ClusterSandboxConfigBuilder::build_standalone()
.await
.start()
.await;
let mut sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
let index_id = "test_shutdown_single_node";

sandbox.enable_ingest_v2();
Expand Down Expand Up @@ -577,17 +562,15 @@ async fn test_shutdown_single_node() {
#[tokio::test]
async fn test_shutdown_control_plane_early_shutdown() {
initialize_tests();
let sandbox = ClusterSandboxConfigBuilder::default()
let sandbox = ClusterSandboxBuilder::default()
.add_node([QuickwitService::Indexer])
.add_node([
QuickwitService::ControlPlane,
QuickwitService::Searcher,
QuickwitService::Metastore,
QuickwitService::Janitor,
])
.build()
.await
.start()
.build_and_start()
.await;
let index_id = "test_shutdown_separate_indexer";

Expand Down Expand Up @@ -638,17 +621,15 @@ async fn test_shutdown_control_plane_early_shutdown() {
#[tokio::test]
async fn test_shutdown_separate_indexer() {
initialize_tests();
let sandbox = ClusterSandboxConfigBuilder::default()
let sandbox = ClusterSandboxBuilder::default()
.add_node([QuickwitService::Indexer])
.add_node([
QuickwitService::ControlPlane,
QuickwitService::Searcher,
QuickwitService::Metastore,
QuickwitService::Janitor,
])
.build()
.await
.start()
.build_and_start()
.await;
let index_id = "test_shutdown_separate_indexer";

Expand Down
20 changes: 7 additions & 13 deletions quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use quickwit_proto::opentelemetry::proto::logs::v1::{LogRecord, ResourceLogs, Sc
use quickwit_proto::opentelemetry::proto::trace::v1::{ResourceSpans, ScopeSpans, Span};
use tonic::codec::CompressionEncoding;

use crate::test_utils::ClusterSandboxConfigBuilder;
use crate::test_utils::ClusterSandboxBuilder;

fn initialize_tests() {
quickwit_common::setup_logging_for_tests();
Expand All @@ -47,15 +47,13 @@ fn initialize_tests() {
#[tokio::test]
async fn test_ingest_traces_with_otlp_grpc_api() {
initialize_tests();
let mut sandbox = ClusterSandboxConfigBuilder::default()
let mut sandbox = ClusterSandboxBuilder::default()
.add_node([QuickwitService::Searcher])
.add_node([QuickwitService::Metastore])
.add_node_with_otlp([QuickwitService::Indexer])
.add_node([QuickwitService::ControlPlane])
.add_node([QuickwitService::Janitor])
.build()
.await
.start()
.build_and_start()
.await;
// Wait for the pipelines to start (one for logs and one for traces)
sandbox.wait_for_indexing_pipelines(2).await.unwrap();
Expand Down Expand Up @@ -142,15 +140,13 @@ async fn test_ingest_traces_with_otlp_grpc_api() {
#[tokio::test]
async fn test_ingest_logs_with_otlp_grpc_api() {
initialize_tests();
let mut sandbox = ClusterSandboxConfigBuilder::default()
let mut sandbox = ClusterSandboxBuilder::default()
.add_node([QuickwitService::Searcher])
.add_node([QuickwitService::Metastore])
.add_node_with_otlp([QuickwitService::Indexer])
.add_node([QuickwitService::ControlPlane])
.add_node([QuickwitService::Janitor])
.build()
.await
.start()
.build_and_start()
.await;
// Wait fo the pipelines to start (one for logs and one for traces)
sandbox.wait_for_indexing_pipelines(2).await.unwrap();
Expand Down Expand Up @@ -218,15 +214,13 @@ async fn test_ingest_logs_with_otlp_grpc_api() {
#[tokio::test]
async fn test_jaeger_api() {
initialize_tests();
let mut sandbox = ClusterSandboxConfigBuilder::default()
let mut sandbox = ClusterSandboxBuilder::default()
.add_node([QuickwitService::Searcher])
.add_node([QuickwitService::Metastore])
.add_node_with_otlp([QuickwitService::Indexer])
.add_node([QuickwitService::ControlPlane])
.add_node([QuickwitService::Janitor])
.build()
.await
.start()
.build_and_start()
.await;
// Wait fo the pipelines to start (one for logs and one for traces)
sandbox.wait_for_indexing_pipelines(2).await.unwrap();
Expand Down
Loading

0 comments on commit 49fd9bf

Please sign in to comment.