From 49fd9bf17b6c9c72624f3526edf1e0b0417ef4df Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 27 Aug 2024 09:45:22 +0200 Subject: [PATCH] Simplify cluster sandbox API --- .../src/test_utils/cluster_sandbox.rs | 73 ++++++++++--------- .../src/test_utils/mod.rs | 2 +- .../src/tests/basic_tests.rs | 18 ++--- .../src/tests/ingest_tests.rs | 47 ++++-------- .../src/tests/otlp_tests.rs | 20 ++--- .../src/tests/sqs_tests.rs | 7 +- .../tests/update_tests/doc_mapping_tests.rs | 7 +- .../update_tests/search_settings_tests.rs | 8 +- 8 files changed, 74 insertions(+), 108 deletions(-) diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index f6631e8114d..d529ecc112b 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -60,31 +60,12 @@ pub struct TestNodeConfig { pub enable_otlp: bool, } -/// A test environment where you can start a Quickwit cluster and use the gRPC -/// or REST clients to test it. -pub struct ClusterSandbox { - pub node_configs: Vec<(NodeConfig, HashSet)>, - pub searcher_rest_client: QuickwitClient, - pub indexer_rest_client: QuickwitClient, - pub trace_client: TraceServiceClient, - pub logs_client: LogsServiceClient, - pub jaeger_client: SpanReaderPluginClient, - _temp_dir: TempDir, - node_shutdown_handles: Vec, -} - -pub struct ClusterSandboxConfig { - temp_dir: TempDir, - node_configs: Vec<(NodeConfig, HashSet)>, - tcp_listener_resolver: TestTcpListenerResolver, -} - -pub struct ClusterSandboxConfigBuilder { +pub struct ClusterSandboxBuilder { temp_dir: TempDir, node_configs: Vec, } -impl Default for ClusterSandboxConfigBuilder { +impl Default for ClusterSandboxBuilder { fn default() -> Self { Self { temp_dir: tempfile::tempdir().unwrap(), @@ -93,7 +74,7 @@ impl Default for ClusterSandboxConfigBuilder { } } -impl ClusterSandboxConfigBuilder { +impl ClusterSandboxBuilder { pub fn add_node(mut self, services: impl IntoIterator) -> Self { self.node_configs.push(TestNodeConfig { services: HashSet::from_iter(services), @@ -113,14 +94,6 @@ impl ClusterSandboxConfigBuilder { self } - /// Builds a config that runs all the services in a single process - pub async fn build_standalone() -> ClusterSandboxConfig { - ClusterSandboxConfigBuilder::default() - .add_node(QuickwitService::supported_services()) - .build() - .await - } - /// Builds a list of of [`NodeConfig`] from the node definitions added to /// builder. For each node, a [`NodeConfig`] is built with the right /// parameters such that we will be able to run `quickwit_serve` on them and @@ -129,7 +102,7 @@ impl ClusterSandboxConfigBuilder { /// - `metastore_uri` defined by `root_data_dir/metastore`. /// - `default_index_root_uri` defined by `root_data_dir/indexes`. /// - `peers` defined by others nodes `gossip_advertise_addr`. - pub async fn build(self) -> ClusterSandboxConfig { + async fn build_config(self) -> ResolvedClusterConfig { let root_data_dir = self.temp_dir.path().to_path_buf(); let cluster_id = new_coolid("test-cluster"); let mut resolved_node_configs = Vec::new(); @@ -166,15 +139,36 @@ impl ClusterSandboxConfigBuilder { .filter(|seed| *seed != node_config.0.gossip_advertise_addr.to_string()) .collect_vec(); } - ClusterSandboxConfig { + ResolvedClusterConfig { temp_dir: self.temp_dir, node_configs: resolved_node_configs, tcp_listener_resolver, } } + + pub async fn build_and_start(self) -> ClusterSandbox { + self.build_config().await.start().await + } + + pub async fn build_and_start_standalone() -> ClusterSandbox { + ClusterSandboxBuilder::default() + .add_node(QuickwitService::supported_services()) + .build_config() + .await + .start() + .await + } +} + +/// Intermediate state where the ports of all the the test cluster nodes have +/// been reserved and the configurations have been generated. +struct ResolvedClusterConfig { + temp_dir: TempDir, + node_configs: Vec<(NodeConfig, HashSet)>, + tcp_listener_resolver: TestTcpListenerResolver, } -impl ClusterSandboxConfig { +impl ResolvedClusterConfig { /// Start a cluster using this config and waits for the nodes to be ready pub async fn start(self) -> ClusterSandbox { let mut node_shutdown_handles = Vec::new(); @@ -301,6 +295,19 @@ pub(crate) async fn ingest_with_retry( Ok(()) } +/// A test environment where you can start a Quickwit cluster and use the gRPC +/// or REST clients to test it. +pub struct ClusterSandbox { + pub node_configs: Vec<(NodeConfig, HashSet)>, + pub searcher_rest_client: QuickwitClient, + pub indexer_rest_client: QuickwitClient, + pub trace_client: TraceServiceClient, + pub logs_client: LogsServiceClient, + pub jaeger_client: SpanReaderPluginClient, + _temp_dir: TempDir, + node_shutdown_handles: Vec, +} + impl ClusterSandbox { pub fn enable_ingest_v2(&mut self) { self.indexer_rest_client.enable_ingest_v2(); diff --git a/quickwit/quickwit-integration-tests/src/test_utils/mod.rs b/quickwit/quickwit-integration-tests/src/test_utils/mod.rs index ff39c608be6..00bbdf73e82 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/mod.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/mod.rs @@ -20,4 +20,4 @@ mod cluster_sandbox; mod shutdown; -pub(crate) use cluster_sandbox::{ingest_with_retry, ClusterSandbox, ClusterSandboxConfigBuilder}; +pub(crate) use cluster_sandbox::{ingest_with_retry, ClusterSandbox, ClusterSandboxBuilder}; diff --git a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs index b26bc9fc0a3..66b71c6738b 100644 --- a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs @@ -27,7 +27,7 @@ use quickwit_rest_client::models::IngestSource; use quickwit_rest_client::rest_client::CommitType; use quickwit_serve::SearchRequestQueryString; -use crate::test_utils::{ingest_with_retry, ClusterSandboxConfigBuilder}; +use crate::test_utils::{ingest_with_retry, ClusterSandboxBuilder}; fn get_ndjson_filepath(ndjson_dataset_filename: &str) -> String { format!( @@ -40,10 +40,7 @@ fn get_ndjson_filepath(ndjson_dataset_filename: &str) -> String { #[tokio::test] async fn test_ui_redirect_on_get() { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandboxConfigBuilder::build_standalone() - .await - .start() - .await; + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; let node_config = sandbox.node_configs.first().unwrap(); let client = hyper::Client::builder() .pool_idle_timeout(Duration::from_secs(30)) @@ -67,10 +64,7 @@ async fn test_ui_redirect_on_get() { #[tokio::test] async fn test_standalone_server() { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandboxConfigBuilder::build_standalone() - .await - .start() - .await; + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; { // The indexing service should be running. let counters = sandbox @@ -127,15 +121,13 @@ async fn test_standalone_server() { #[tokio::test] async fn test_multi_nodes_cluster() { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandboxConfigBuilder::default() + let sandbox = ClusterSandboxBuilder::default() .add_node([QuickwitService::Searcher]) .add_node([QuickwitService::Metastore]) .add_node([QuickwitService::Indexer]) .add_node([QuickwitService::ControlPlane]) .add_node([QuickwitService::Janitor]) - .build() - .await - .start() + .build_and_start() .await; { diff --git a/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs b/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs index 41e295b5049..e109facf105 100644 --- a/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs @@ -28,7 +28,7 @@ use quickwit_rest_client::rest_client::CommitType; use serde_json::json; use crate::ingest_json; -use crate::test_utils::{ingest_with_retry, ClusterSandboxConfigBuilder}; +use crate::test_utils::{ingest_with_retry, ClusterSandboxBuilder}; fn initialize_tests() { quickwit_common::setup_logging_for_tests(); @@ -38,10 +38,7 @@ fn initialize_tests() { #[tokio::test] async fn test_single_node_cluster() { initialize_tests(); - let mut sandbox = ClusterSandboxConfigBuilder::build_standalone() - .await - .start() - .await; + let mut sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; let index_id = "test-single-node-cluster"; let index_config = format!( r#" @@ -204,7 +201,7 @@ async fn test_single_node_cluster() { #[tokio::test] async fn test_ingest_v2_index_not_found() { initialize_tests(); - let mut sandbox = ClusterSandboxConfigBuilder::default() + let mut sandbox = ClusterSandboxBuilder::default() .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) .add_node([ @@ -212,9 +209,7 @@ async fn test_ingest_v2_index_not_found() { QuickwitService::Metastore, QuickwitService::Searcher, ]) - .build() - .await - .start() + .build_and_start() .await; sandbox.enable_ingest_v2(); let missing_index_err: Error = sandbox @@ -241,7 +236,7 @@ async fn test_ingest_v2_index_not_found() { #[tokio::test] async fn test_ingest_v2_happy_path() { initialize_tests(); - let mut sandbox = ClusterSandboxConfigBuilder::default() + let mut sandbox = ClusterSandboxBuilder::default() .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) .add_node([ @@ -249,9 +244,7 @@ async fn test_ingest_v2_happy_path() { QuickwitService::Metastore, QuickwitService::Searcher, ]) - .build() - .await - .start() + .build_and_start() .await; sandbox.enable_ingest_v2(); let index_id = "test_happy_path"; @@ -326,10 +319,7 @@ async fn test_ingest_v2_happy_path() { #[tokio::test] async fn test_commit_modes() { initialize_tests(); - let sandbox = ClusterSandboxConfigBuilder::build_standalone() - .await - .start() - .await; + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; let index_id = "test_commit_modes"; let index_config = format!( r#" @@ -411,15 +401,13 @@ async fn test_commit_modes() { #[tokio::test] async fn test_very_large_index_name() { initialize_tests(); - let mut sandbox = ClusterSandboxConfigBuilder::default() + let mut sandbox = ClusterSandboxBuilder::default() .add_node([QuickwitService::Searcher]) .add_node([QuickwitService::Metastore]) .add_node([QuickwitService::Indexer]) .add_node([QuickwitService::ControlPlane]) .add_node([QuickwitService::Janitor]) - .build() - .await - .start() + .build_and_start() .await; sandbox.enable_ingest_v2(); @@ -512,10 +500,7 @@ async fn test_very_large_index_name() { #[tokio::test] async fn test_shutdown_single_node() { initialize_tests(); - let mut sandbox = ClusterSandboxConfigBuilder::build_standalone() - .await - .start() - .await; + let mut sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; let index_id = "test_shutdown_single_node"; sandbox.enable_ingest_v2(); @@ -577,7 +562,7 @@ async fn test_shutdown_single_node() { #[tokio::test] async fn test_shutdown_control_plane_early_shutdown() { initialize_tests(); - let sandbox = ClusterSandboxConfigBuilder::default() + let sandbox = ClusterSandboxBuilder::default() .add_node([QuickwitService::Indexer]) .add_node([ QuickwitService::ControlPlane, @@ -585,9 +570,7 @@ async fn test_shutdown_control_plane_early_shutdown() { QuickwitService::Metastore, QuickwitService::Janitor, ]) - .build() - .await - .start() + .build_and_start() .await; let index_id = "test_shutdown_separate_indexer"; @@ -638,7 +621,7 @@ async fn test_shutdown_control_plane_early_shutdown() { #[tokio::test] async fn test_shutdown_separate_indexer() { initialize_tests(); - let sandbox = ClusterSandboxConfigBuilder::default() + let sandbox = ClusterSandboxBuilder::default() .add_node([QuickwitService::Indexer]) .add_node([ QuickwitService::ControlPlane, @@ -646,9 +629,7 @@ async fn test_shutdown_separate_indexer() { QuickwitService::Metastore, QuickwitService::Janitor, ]) - .build() - .await - .start() + .build_and_start() .await; let index_id = "test_shutdown_separate_indexer"; diff --git a/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs b/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs index f8770322782..31df6858fd6 100644 --- a/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs @@ -37,7 +37,7 @@ use quickwit_proto::opentelemetry::proto::logs::v1::{LogRecord, ResourceLogs, Sc use quickwit_proto::opentelemetry::proto::trace::v1::{ResourceSpans, ScopeSpans, Span}; use tonic::codec::CompressionEncoding; -use crate::test_utils::ClusterSandboxConfigBuilder; +use crate::test_utils::ClusterSandboxBuilder; fn initialize_tests() { quickwit_common::setup_logging_for_tests(); @@ -47,15 +47,13 @@ fn initialize_tests() { #[tokio::test] async fn test_ingest_traces_with_otlp_grpc_api() { initialize_tests(); - let mut sandbox = ClusterSandboxConfigBuilder::default() + let mut sandbox = ClusterSandboxBuilder::default() .add_node([QuickwitService::Searcher]) .add_node([QuickwitService::Metastore]) .add_node_with_otlp([QuickwitService::Indexer]) .add_node([QuickwitService::ControlPlane]) .add_node([QuickwitService::Janitor]) - .build() - .await - .start() + .build_and_start() .await; // Wait for the pipelines to start (one for logs and one for traces) sandbox.wait_for_indexing_pipelines(2).await.unwrap(); @@ -142,15 +140,13 @@ async fn test_ingest_traces_with_otlp_grpc_api() { #[tokio::test] async fn test_ingest_logs_with_otlp_grpc_api() { initialize_tests(); - let mut sandbox = ClusterSandboxConfigBuilder::default() + let mut sandbox = ClusterSandboxBuilder::default() .add_node([QuickwitService::Searcher]) .add_node([QuickwitService::Metastore]) .add_node_with_otlp([QuickwitService::Indexer]) .add_node([QuickwitService::ControlPlane]) .add_node([QuickwitService::Janitor]) - .build() - .await - .start() + .build_and_start() .await; // Wait fo the pipelines to start (one for logs and one for traces) sandbox.wait_for_indexing_pipelines(2).await.unwrap(); @@ -218,15 +214,13 @@ async fn test_ingest_logs_with_otlp_grpc_api() { #[tokio::test] async fn test_jaeger_api() { initialize_tests(); - let mut sandbox = ClusterSandboxConfigBuilder::default() + let mut sandbox = ClusterSandboxBuilder::default() .add_node([QuickwitService::Searcher]) .add_node([QuickwitService::Metastore]) .add_node_with_otlp([QuickwitService::Indexer]) .add_node([QuickwitService::ControlPlane]) .add_node([QuickwitService::Janitor]) - .build() - .await - .start() + .build_and_start() .await; // Wait fo the pipelines to start (one for logs and one for traces) sandbox.wait_for_indexing_pipelines(2).await.unwrap(); diff --git a/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs b/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs index 64abe0355ba..9ae9bd90e21 100644 --- a/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs @@ -32,7 +32,7 @@ use quickwit_serve::SearchRequestQueryString; use tempfile::NamedTempFile; use tracing::info; -use crate::test_utils::ClusterSandboxConfigBuilder; +use crate::test_utils::ClusterSandboxBuilder; fn create_mock_data_file(num_lines: usize) -> (NamedTempFile, Uri) { let mut temp_file = tempfile::NamedTempFile::new().unwrap(); @@ -48,10 +48,7 @@ fn create_mock_data_file(num_lines: usize) -> (NamedTempFile, Uri) { #[tokio::test] async fn test_sqs_single_node_cluster() { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandboxConfigBuilder::build_standalone() - .await - .start() - .await; + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; let index_id = "test-sqs-source-single-node-cluster"; let index_config = format!( r#" diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs index d1b7ddcdef0..c1d55860b20 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs @@ -22,7 +22,7 @@ use std::time::Duration; use serde_json::{json, Value}; use super::assert_hits_unordered; -use crate::test_utils::ClusterSandboxConfigBuilder; +use crate::test_utils::ClusterSandboxBuilder; /// Update the doc mapping between 2 calls to local-ingest (forces separate indexing pipelines) and /// assert the number of hits for the given query @@ -35,10 +35,7 @@ async fn validate_search_across_doc_mapping_updates( query_and_expect: &[(&str, Result<&[Value], ()>)], ) { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandboxConfigBuilder::build_standalone() - .await - .start() - .await; + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; { // Wait for indexer to fully start. diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs index 7ab944278f1..9da970fbb98 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs @@ -25,20 +25,18 @@ use serde_json::json; use super::assert_hits_unordered; use crate::ingest_json; -use crate::test_utils::{ingest_with_retry, ClusterSandboxConfigBuilder}; +use crate::test_utils::{ingest_with_retry, ClusterSandboxBuilder}; #[tokio::test] async fn test_update_search_settings_on_multi_nodes_cluster() { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandboxConfigBuilder::default() + let sandbox = ClusterSandboxBuilder::default() .add_node([QuickwitService::Searcher]) .add_node([QuickwitService::Metastore]) .add_node([QuickwitService::Indexer]) .add_node([QuickwitService::ControlPlane]) .add_node([QuickwitService::Janitor]) - .build() - .await - .start() + .build_and_start() .await; {