From 0fd1322d70e9c035ac58fc866629ff8f2706bef1 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 26 Aug 2024 17:24:29 +0200 Subject: [PATCH] Pass TcpListener to quickwit_serve in integ tests --- quickwit/quickwit-cli/src/service.rs | 3 +- .../quickwit-config/src/node_config/mod.rs | 7 +- .../src/node_config/serialize.rs | 18 +- .../quickwit-integration-tests/Cargo.toml | 2 +- .../src/test_utils/cluster_sandbox.rs | 416 ++++++++---------- .../src/test_utils/mod.rs | 3 +- .../src/test_utils/shutdown.rs | 73 +++ .../src/tests/basic_tests.rs | 41 +- .../src/tests/ingest_tests.rs | 106 ++--- .../src/tests/otlp_tests.rs | 56 +-- .../src/tests/sqs_tests.rs | 9 +- .../tests/update_tests/doc_mapping_tests.rs | 7 +- .../update_tests/search_settings_tests.rs | 22 +- quickwit/quickwit-serve/Cargo.toml | 2 +- quickwit/quickwit-serve/src/grpc.rs | 12 +- quickwit/quickwit-serve/src/lib.rs | 7 +- quickwit/quickwit-serve/src/rest.rs | 10 +- quickwit/quickwit-serve/src/tcp_listener.rs | 77 ++++ 18 files changed, 514 insertions(+), 357 deletions(-) create mode 100644 quickwit/quickwit-integration-tests/src/test_utils/shutdown.rs create mode 100644 quickwit/quickwit-serve/src/tcp_listener.rs diff --git a/quickwit/quickwit-cli/src/service.rs b/quickwit/quickwit-cli/src/service.rs index 889c0455d5a..8041ad2a8fd 100644 --- a/quickwit/quickwit-cli/src/service.rs +++ b/quickwit/quickwit-cli/src/service.rs @@ -27,7 +27,7 @@ use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::uri::{Protocol, Uri}; use quickwit_config::service::QuickwitService; use quickwit_config::NodeConfig; -use quickwit_serve::{serve_quickwit, BuildInfo, EnvFilterReloadFn}; +use quickwit_serve::{serve_quickwit, BuildInfo, EnvFilterReloadFn, TcpListenerResolver}; use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, TelemetryEvent}; use tokio::signal; use tracing::{debug, info}; @@ -114,6 +114,7 @@ impl RunCliCommand { runtimes_config, metastore_resolver, storage_resolver, + TcpListenerResolver::default(), shutdown_signal, env_filter_reload_fn, ) diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index f8a5611d75e..cc3d9e1ce37 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -482,7 +482,12 @@ impl NodeConfig { #[cfg(any(test, feature = "testsuite"))] pub fn for_test() -> Self { - serialize::node_config_for_test() + serialize::node_config_for_tests() + } + + #[cfg(any(test, feature = "testsuite"))] + pub fn for_test_from_ports(rest_listen_port: u16, grpc_listen_port: u16) -> Self { + serialize::node_config_for_tests_from_ports(rest_listen_port, grpc_listen_port) } } diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 208a929badc..0308bd6eeb9 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -411,13 +411,13 @@ impl RestConfigBuilder { } #[cfg(any(test, feature = "testsuite"))] -pub fn node_config_for_test() -> NodeConfig { - use quickwit_common::net::find_available_tcp_port; - +pub fn node_config_for_tests_from_ports( + rest_listen_port: u16, + grpc_listen_port: u16, +) -> NodeConfig { let node_id = NodeId::new(default_node_id().unwrap()); let enabled_services = QuickwitService::supported_services(); let listen_address = Host::default(); - let rest_listen_port = find_available_tcp_port().expect("OS should find an available port"); let rest_listen_addr = listen_address .with_port(rest_listen_port) .to_socket_addr() @@ -426,7 +426,6 @@ pub fn node_config_for_test() -> NodeConfig { .with_port(rest_listen_port) .to_socket_addr() .expect("default host should be an IP address"); - let grpc_listen_port = find_available_tcp_port().expect("OS should find an available port"); let grpc_listen_addr = listen_address .with_port(grpc_listen_port) .to_socket_addr() @@ -468,6 +467,15 @@ pub fn node_config_for_test() -> NodeConfig { } } +/// Creates a config with defaults that are suitable for testing. +/// +/// Uses the default ports without ensuring that they are available. +#[cfg(any(test, feature = "testsuite"))] +pub fn node_config_for_tests() -> NodeConfig { + let default_rest = default_rest_listen_port(); + node_config_for_tests_from_ports(default_rest, default_rest + 1) +} + #[cfg(test)] mod tests { use std::env; diff --git a/quickwit/quickwit-integration-tests/Cargo.toml b/quickwit/quickwit-integration-tests/Cargo.toml index ecfafd9b5a8..9a2892c3ad4 100644 --- a/quickwit/quickwit-integration-tests/Cargo.toml +++ b/quickwit/quickwit-integration-tests/Cargo.toml @@ -41,5 +41,5 @@ quickwit-metastore = { workspace = true, features = ["testsuite"] } quickwit-opentelemetry = { workspace = true, features = ["testsuite"] } quickwit-proto = { workspace = true, features = ["testsuite"] } quickwit-rest-client = { workspace = true } -quickwit-serve = { workspace = true } +quickwit-serve = { workspace = true, features = ["testsuite"] } quickwit-storage = { workspace = true, features = ["testsuite"] } diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 2ccf38cc1a6..3d83c6a006a 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -20,7 +20,6 @@ use std::collections::{HashMap, HashSet}; use std::io::Write; use std::net::SocketAddr; -use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; @@ -31,8 +30,7 @@ use quickwit_actors::ActorExitStatus; use quickwit_cli::tool::{local_ingest_docs_cli, LocalIngestDocsArgs}; use quickwit_common::new_coolid; use quickwit_common::runtimes::RuntimesConfig; -use quickwit_common::test_utils::{wait_for_server_ready, wait_until_predicate}; -use quickwit_common::tower::BoxFutureInfaillible; +use quickwit_common::test_utils::wait_until_predicate; use quickwit_common::uri::Uri as QuickwitUri; use quickwit_config::service::QuickwitService; use quickwit_config::NodeConfig; @@ -45,84 +43,219 @@ use quickwit_rest_client::models::IngestSource; use quickwit_rest_client::rest_client::{ CommitType, QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL, }; -use quickwit_serve::{serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString}; +use quickwit_serve::{ + serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString, TcpListenerResolver, +}; use quickwit_storage::StorageResolver; use reqwest::Url; use serde_json::Value; use tempfile::TempDir; -use tokio::sync::watch::{self, Receiver, Sender}; -use tokio::task::JoinHandle; +use tokio::net::TcpListener; use tonic::transport::channel; use tracing::debug; -/// Configuration of a node made of a [`NodeConfig`] and a -/// set of services. -#[derive(Clone)] +use super::shutdown::NodeShutdownHandle; + pub struct TestNodeConfig { - pub node_config: NodeConfig, pub services: HashSet, + pub enable_otlp: bool, } -type NodeJoinHandle = JoinHandle, anyhow::Error>>; +/// A test environment where you can start a Quickwit cluster and use the gRPC +/// or REST clients to test it. +pub struct ClusterSandbox { + pub node_configs: Vec<(NodeConfig, HashSet)>, + pub searcher_rest_client: QuickwitClient, + pub indexer_rest_client: QuickwitClient, + pub trace_client: TraceServiceClient, + pub logs_client: LogsServiceClient, + pub jaeger_client: SpanReaderPluginClient, + _temp_dir: TempDir, + node_shutdown_handles: Vec, +} + +pub struct ClusterSandboxConfig { + temp_dir: TempDir, + pub node_configs: Vec<(NodeConfig, HashSet)>, + tcp_listener_resolver: TcpListenerResolver, +} -struct NodeShutdownHandle { - sender: Sender<()>, - receiver: Receiver<()>, - node_services: HashSet, - node_id: NodeId, - join_handle_opt: Option, +pub struct ClusterSandboxConfigBuilder { + temp_dir: TempDir, + node_configs: Vec, } -impl NodeShutdownHandle { - fn new(node_id: NodeId, node_services: HashSet) -> Self { - let (sender, receiver) = watch::channel(()); +impl Default for ClusterSandboxConfigBuilder { + fn default() -> Self { Self { - sender, - receiver, - node_id, - node_services, - join_handle_opt: None, + temp_dir: tempfile::tempdir().unwrap(), + node_configs: Vec::new(), } } +} - fn shutdown_signal(&self) -> BoxFutureInfaillible<()> { - let receiver = self.receiver.clone(); - Box::pin(async move { - receiver.clone().changed().await.unwrap(); - }) +impl ClusterSandboxConfigBuilder { + pub fn add_node(mut self, services: impl IntoIterator) -> Self { + self.node_configs.push(TestNodeConfig { + services: HashSet::from_iter(services), + enable_otlp: false, + }); + self } - fn set_node_join_handle(&mut self, join_handle: NodeJoinHandle) { - self.join_handle_opt = Some(join_handle); + pub fn add_node_with_otlp( + mut self, + services: impl IntoIterator, + ) -> Self { + self.node_configs.push(TestNodeConfig { + services: HashSet::from_iter(services), + enable_otlp: true, + }); + self } - /// Initiate node shutdown and wait for it to complete - async fn shutdown(self) -> anyhow::Result> { - self.sender.send(()).unwrap(); - self.join_handle_opt - .expect("node join handle was not set before shutdown") + /// Builds a config that runs all the services in a single process + pub async fn build_standalone() -> ClusterSandboxConfig { + ClusterSandboxConfigBuilder::default() + .add_node(QuickwitService::supported_services()) + .build() .await - .unwrap() + } + + /// Builds a list of of [`NodeConfig`] from the node definitions added to + /// builder. For each node, a [`NodeConfig`] is built with the right + /// parameters such that we will be able to run `quickwit_serve` on them and + /// form a Quickwit cluster. For each node, we set: + /// - `data_dir_path` defined by `root_data_dir/node_id`. + /// - `metastore_uri` defined by `root_data_dir/metastore`. + /// - `default_index_root_uri` defined by `root_data_dir/indexes`. + /// - `peers` defined by others nodes `gossip_advertise_addr`. + pub async fn build(self) -> ClusterSandboxConfig { + let root_data_dir = self.temp_dir.path().to_path_buf(); + let cluster_id = new_coolid("test-cluster"); + let mut resolved_node_configs = Vec::new(); + let mut peers: Vec = Vec::new(); + let unique_dir_name = new_coolid("test-dir"); + let tcp_listener_resolver = TcpListenerResolver::for_test(); + for (node_idx, node_builder) in self.node_configs.iter().enumerate() { + let socket: SocketAddr = ([127, 0, 0, 1], 0u16).into(); + let rest_tcp_listener = TcpListener::bind(socket).await.unwrap(); + let grpc_tcp_listener = TcpListener::bind(socket).await.unwrap(); + let mut config = NodeConfig::for_test_from_ports( + rest_tcp_listener.local_addr().unwrap().port(), + grpc_tcp_listener.local_addr().unwrap().port(), + ); + tcp_listener_resolver.add_listener(rest_tcp_listener).await; + tcp_listener_resolver.add_listener(grpc_tcp_listener).await; + config.indexer_config.enable_otlp_endpoint = node_builder.enable_otlp; + config.enabled_services.clone_from(&node_builder.services); + config.jaeger_config.enable_endpoint = true; + config.cluster_id.clone_from(&cluster_id); + config.node_id = NodeId::new(format!("test-node-{node_idx}")); + config.data_dir_path = root_data_dir.join(config.node_id.as_str()); + config.metastore_uri = + QuickwitUri::from_str(&format!("ram:///{unique_dir_name}/metastore")).unwrap(); + config.default_index_root_uri = + QuickwitUri::from_str(&format!("ram:///{unique_dir_name}/indexes")).unwrap(); + peers.push(config.gossip_advertise_addr.to_string()); + resolved_node_configs.push((config, node_builder.services.clone())); + } + for node_config in resolved_node_configs.iter_mut() { + node_config.0.peer_seeds = peers + .clone() + .into_iter() + .filter(|seed| *seed != node_config.0.gossip_advertise_addr.to_string()) + .collect_vec(); + } + ClusterSandboxConfig { + temp_dir: self.temp_dir, + node_configs: resolved_node_configs, + tcp_listener_resolver, + } } } -/// Creates a Cluster Test environment. -/// -/// The goal is to start several nodes and use the gRPC or REST clients to -/// test it. -/// -/// WARNING: Currently, we cannot start an indexer in a different test as it will -/// will share the same `INGEST_API_SERVICE_INSTANCE`. The ingest API will be -/// dropped by the first running test and the other tests will fail. -pub struct ClusterSandbox { - pub node_configs: Vec, - pub searcher_rest_client: QuickwitClient, - pub indexer_rest_client: QuickwitClient, - pub trace_client: TraceServiceClient, - pub logs_client: LogsServiceClient, - pub jaeger_client: SpanReaderPluginClient, - _temp_dir: TempDir, - node_shutdown_handle: Vec, +impl ClusterSandboxConfig { + /// Start a cluster using this config and waits for the nodes to be ready + pub async fn start(self) -> ClusterSandbox { + let mut node_shutdown_handles = Vec::new(); + let runtimes_config = RuntimesConfig::light_for_tests(); + let storage_resolver = StorageResolver::unconfigured(); + let metastore_resolver = MetastoreResolver::unconfigured(); + let cluster_size = self.node_configs.len(); + for node_config in self.node_configs.iter() { + let mut shutdown_handler = + NodeShutdownHandle::new(node_config.0.node_id.clone(), node_config.1.clone()); + let shutdown_signal = shutdown_handler.shutdown_signal(); + let join_handle = tokio::spawn({ + let node_config = node_config.0.clone(); + let node_id = node_config.node_id.clone(); + let services = node_config.enabled_services.clone(); + let metastore_resolver = metastore_resolver.clone(); + let storage_resolver = storage_resolver.clone(); + let tcp_listener_resolver = self.tcp_listener_resolver.clone(); + + async move { + let result = serve_quickwit( + node_config, + runtimes_config, + metastore_resolver, + storage_resolver, + tcp_listener_resolver, + shutdown_signal, + quickwit_serve::do_nothing_env_filter_reload_fn(), + ) + .await?; + debug!("{} stopped successfully ({:?})", node_id, services); + Result::<_, anyhow::Error>::Ok(result) + } + }); + shutdown_handler.set_node_join_handle(join_handle); + node_shutdown_handles.push(shutdown_handler); + } + let searcher_config = self + .node_configs + .iter() + .find(|node_config| node_config.1.contains(&QuickwitService::Searcher)) + .cloned() + .unwrap(); + let indexer_config = self + .node_configs + .iter() + .find(|node_config| node_config.1.contains(&QuickwitService::Indexer)) + .cloned() + .unwrap(); + let indexer_channel = + channel::Endpoint::from_str(&format!("http://{}", indexer_config.0.grpc_listen_addr)) + .unwrap() + .connect_lazy(); + let searcher_channel = + channel::Endpoint::from_str(&format!("http://{}", searcher_config.0.grpc_listen_addr)) + .unwrap() + .connect_lazy(); + + let sandbox = ClusterSandbox { + node_configs: self.node_configs, + searcher_rest_client: QuickwitClientBuilder::new(transport_url( + searcher_config.0.rest_config.listen_addr, + )) + .build(), + indexer_rest_client: QuickwitClientBuilder::new(transport_url( + indexer_config.0.rest_config.listen_addr, + )) + .build(), + trace_client: TraceServiceClient::new(indexer_channel.clone()), + logs_client: LogsServiceClient::new(indexer_channel), + jaeger_client: SpanReaderPluginClient::new(searcher_channel), + _temp_dir: self.temp_dir, + node_shutdown_handles, + }; + sandbox + .wait_for_cluster_num_ready_nodes(cluster_size) + .await + .unwrap(); + sandbox + } } fn transport_url(addr: SocketAddr) -> Url { @@ -170,128 +303,12 @@ pub(crate) async fn ingest_with_retry( } impl ClusterSandbox { - pub async fn start_cluster_with_configs( - temp_dir: TempDir, - node_configs: Vec, - ) -> anyhow::Result { - let runtimes_config = RuntimesConfig::light_for_tests(); - let storage_resolver = StorageResolver::unconfigured(); - let metastore_resolver = MetastoreResolver::unconfigured(); - let mut node_shutdown_handlers = Vec::new(); - for node_config in node_configs.iter() { - let mut shutdown_handler = NodeShutdownHandle::new( - node_config.node_config.node_id.clone(), - node_config.services.clone(), - ); - let shutdown_signal = shutdown_handler.shutdown_signal(); - let join_handle = tokio::spawn({ - let node_config = node_config.node_config.clone(); - let node_id = node_config.node_id.clone(); - let services = node_config.enabled_services.clone(); - let metastore_resolver = metastore_resolver.clone(); - let storage_resolver = storage_resolver.clone(); - - async move { - let result = serve_quickwit( - node_config, - runtimes_config, - metastore_resolver, - storage_resolver, - shutdown_signal, - quickwit_serve::do_nothing_env_filter_reload_fn(), - ) - .await?; - debug!("{} stopped successfully ({:?})", node_id, services); - Result::<_, anyhow::Error>::Ok(result) - } - }); - shutdown_handler.set_node_join_handle(join_handle); - node_shutdown_handlers.push(shutdown_handler); - } - let searcher_config = node_configs - .iter() - .find(|node_config| node_config.services.contains(&QuickwitService::Searcher)) - .cloned() - .unwrap(); - let indexer_config = node_configs - .iter() - .find(|node_config| node_config.services.contains(&QuickwitService::Indexer)) - .cloned() - .unwrap(); - if node_configs.len() == 1 { - // We have only one node, so we can just wait for it to get started - wait_for_server_ready(node_configs[0].node_config.grpc_listen_addr).await?; - } else { - // Wait for a duration greater than chitchat GOSSIP_INTERVAL (50ms) so that the cluster - // is formed. - tokio::time::sleep(Duration::from_millis(100)).await; - } - let indexer_channel = channel::Endpoint::from_str(&format!( - "http://{}", - indexer_config.node_config.grpc_listen_addr - )) - .unwrap() - .connect_lazy(); - let searcher_channel = channel::Endpoint::from_str(&format!( - "http://{}", - searcher_config.node_config.grpc_listen_addr - )) - .unwrap() - .connect_lazy(); - Ok(Self { - node_configs, - searcher_rest_client: QuickwitClientBuilder::new(transport_url( - searcher_config.node_config.rest_config.listen_addr, - )) - .build(), - indexer_rest_client: QuickwitClientBuilder::new(transport_url( - indexer_config.node_config.rest_config.listen_addr, - )) - .build(), - trace_client: TraceServiceClient::new(indexer_channel.clone()), - logs_client: LogsServiceClient::new(indexer_channel), - jaeger_client: SpanReaderPluginClient::new(searcher_channel), - _temp_dir: temp_dir, - node_shutdown_handle: node_shutdown_handlers, - }) - } - pub fn enable_ingest_v2(&mut self) { self.indexer_rest_client.enable_ingest_v2(); self.searcher_rest_client.enable_ingest_v2(); } - // Starts one node that runs all the services and wait for it to be ready - pub async fn start_standalone_node() -> anyhow::Result { - let sandbox = Self::start_cluster_nodes(&[QuickwitService::supported_services()]).await?; - sandbox.wait_for_cluster_num_ready_nodes(1).await?; - Ok(sandbox) - } - - pub async fn start_cluster_with_otlp_service( - nodes_services: &[HashSet], - ) -> anyhow::Result { - let temp_dir = tempfile::tempdir()?; - let mut node_configs = build_node_configs(temp_dir.path().to_path_buf(), nodes_services); - // Set OTLP endpoint for indexers. - for node_config in node_configs.iter_mut() { - if node_config.services.contains(&QuickwitService::Indexer) { - node_config.node_config.indexer_config.enable_otlp_endpoint = true; - } - } - Self::start_cluster_with_configs(temp_dir, node_configs).await - } - - // Starts nodes with corresponding services given by `nodes_services`. - pub async fn start_cluster_nodes( - nodes_services: &[HashSet], - ) -> anyhow::Result { - let temp_dir = tempfile::tempdir()?; - let node_configs = build_node_configs(temp_dir.path().to_path_buf(), nodes_services); - Self::start_cluster_with_configs(temp_dir, node_configs).await - } - - pub async fn wait_for_cluster_num_ready_nodes( + async fn wait_for_cluster_num_ready_nodes( &self, expected_num_ready_nodes: usize, ) -> anyhow::Result<()> { @@ -405,7 +422,7 @@ impl ClusterSandbox { let test_conf = self .node_configs .iter() - .find(|config| config.services.contains(&QuickwitService::Indexer)) + .find(|config| config.1.contains(&QuickwitService::Indexer)) .ok_or(anyhow::anyhow!("No indexer node found"))?; // NodeConfig cannot be serialized, we write our own simplified config let mut tmp_config_file = tempfile::Builder::new().suffix(".yaml").tempfile().unwrap(); @@ -415,7 +432,7 @@ impl ClusterSandbox { metastore_uri: {} data_dir: {:?} "#, - test_conf.node_config.metastore_uri, test_conf.node_config.data_dir_path + test_conf.0.metastore_uri, test_conf.0.data_dir_path ); tmp_config_file.write_all(node_config.as_bytes())?; tmp_config_file.flush()?; @@ -479,10 +496,10 @@ impl ClusterSandbox { let mut shutdown_futures = Vec::new(); let mut shutdown_nodes = HashMap::new(); let mut i = 0; - while i < self.node_shutdown_handle.len() { - let handler_services = &self.node_shutdown_handle[i].node_services; + while i < self.node_shutdown_handles.len() { + let handler_services = &self.node_shutdown_handles[i].node_services; if handler_services.is_subset(shutdown_services) { - let handler_to_shutdown = self.node_shutdown_handle.remove(i); + let handler_to_shutdown = self.node_shutdown_handles.remove(i); shutdown_nodes.insert( handler_to_shutdown.node_id.clone(), handler_to_shutdown.node_services.clone(), @@ -508,48 +525,3 @@ impl ClusterSandbox { .await } } - -/// Builds a list of [`NodeConfig`] given a list of Quickwit services. -/// Each element of `nodes_services` defines the services of a given node. -/// For each node, a `NodeConfig` is built with the right parameters -/// such that we will be able to run `quickwit_serve` on them and form -/// a quickwit cluster. -/// For each node, we set: -/// - `data_dir_path` defined by `root_data_dir/node_id`. -/// - `metastore_uri` defined by `root_data_dir/metastore`. -/// - `default_index_root_uri` defined by `root_data_dir/indexes`. -/// - `peers` defined by others nodes `gossip_advertise_addr`. -pub fn build_node_configs( - root_data_dir: PathBuf, - nodes_services: &[HashSet], -) -> Vec { - let cluster_id = new_coolid("test-cluster"); - let mut node_configs = Vec::new(); - let mut peers: Vec = Vec::new(); - let unique_dir_name = new_coolid("test-dir"); - for (node_idx, node_services) in nodes_services.iter().enumerate() { - let mut config = NodeConfig::for_test(); - config.enabled_services.clone_from(node_services); - config.jaeger_config.enable_endpoint = true; - config.cluster_id.clone_from(&cluster_id); - config.node_id = NodeId::new(format!("test-node-{node_idx}")); - config.data_dir_path = root_data_dir.join(config.node_id.as_str()); - config.metastore_uri = - QuickwitUri::from_str(&format!("ram:///{unique_dir_name}/metastore")).unwrap(); - config.default_index_root_uri = - QuickwitUri::from_str(&format!("ram:///{unique_dir_name}/indexes")).unwrap(); - peers.push(config.gossip_advertise_addr.to_string()); - node_configs.push(TestNodeConfig { - node_config: config, - services: node_services.clone(), - }); - } - for node_config in node_configs.iter_mut() { - node_config.node_config.peer_seeds = peers - .clone() - .into_iter() - .filter(|seed| *seed != node_config.node_config.gossip_advertise_addr.to_string()) - .collect_vec(); - } - node_configs -} diff --git a/quickwit/quickwit-integration-tests/src/test_utils/mod.rs b/quickwit/quickwit-integration-tests/src/test_utils/mod.rs index 82d2213068f..ff39c608be6 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/mod.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/mod.rs @@ -18,5 +18,6 @@ // along with this program. If not, see . mod cluster_sandbox; +mod shutdown; -pub(crate) use cluster_sandbox::{ingest_with_retry, ClusterSandbox}; +pub(crate) use cluster_sandbox::{ingest_with_retry, ClusterSandbox, ClusterSandboxConfigBuilder}; diff --git a/quickwit/quickwit-integration-tests/src/test_utils/shutdown.rs b/quickwit/quickwit-integration-tests/src/test_utils/shutdown.rs new file mode 100644 index 00000000000..0e9c1aa4e38 --- /dev/null +++ b/quickwit/quickwit-integration-tests/src/test_utils/shutdown.rs @@ -0,0 +1,73 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::{HashMap, HashSet}; + +use quickwit_actors::ActorExitStatus; +use quickwit_common::tower::BoxFutureInfaillible; +use quickwit_config::service::QuickwitService; +use quickwit_proto::types::NodeId; +use tokio::sync::watch::{self, Receiver, Sender}; +use tokio::task::JoinHandle; + +type NodeJoinHandle = JoinHandle, anyhow::Error>>; + +pub(crate) struct NodeShutdownHandle { + sender: Sender<()>, + receiver: Receiver<()>, + pub node_services: HashSet, + pub node_id: NodeId, + join_handle_opt: Option, +} + +impl NodeShutdownHandle { + pub(crate) fn new(node_id: NodeId, node_services: HashSet) -> Self { + let (sender, receiver) = watch::channel(()); + Self { + sender, + receiver, + node_id, + node_services, + join_handle_opt: None, + } + } + + pub(crate) fn shutdown_signal(&self) -> BoxFutureInfaillible<()> { + let receiver = self.receiver.clone(); + Box::pin(async move { + receiver.clone().changed().await.unwrap(); + }) + } + + pub(crate) fn set_node_join_handle(&mut self, join_handle: NodeJoinHandle) { + self.join_handle_opt = Some(join_handle); + } + + /// Initiate node shutdown and wait for it to complete + + pub(crate) async fn shutdown( + self, + ) -> anyhow::Result> { + self.sender.send(()).unwrap(); + self.join_handle_opt + .expect("node join handle was not set before shutdown") + .await + .unwrap() + } +} diff --git a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs index ee6f18eafe0..b26bc9fc0a3 100644 --- a/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/basic_tests.rs @@ -17,7 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashSet; use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; @@ -28,7 +27,7 @@ use quickwit_rest_client::models::IngestSource; use quickwit_rest_client::rest_client::CommitType; use quickwit_serve::SearchRequestQueryString; -use crate::test_utils::{ingest_with_retry, ClusterSandbox}; +use crate::test_utils::{ingest_with_retry, ClusterSandboxConfigBuilder}; fn get_ndjson_filepath(ndjson_dataset_filename: &str) -> String { format!( @@ -41,18 +40,18 @@ fn get_ndjson_filepath(ndjson_dataset_filename: &str) -> String { #[tokio::test] async fn test_ui_redirect_on_get() { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let sandbox = ClusterSandboxConfigBuilder::build_standalone() + .await + .start() + .await; let node_config = sandbox.node_configs.first().unwrap(); let client = hyper::Client::builder() .pool_idle_timeout(Duration::from_secs(30)) .http2_only(true) .build_http(); - let root_uri = format!( - "http://{}/", - node_config.node_config.rest_config.listen_addr - ) - .parse::() - .unwrap(); + let root_uri = format!("http://{}/", node_config.0.rest_config.listen_addr) + .parse::() + .unwrap(); let response = client.get(root_uri.clone()).await.unwrap(); assert_eq!(response.status(), StatusCode::MOVED_PERMANENTLY); let post_request = Request::builder() @@ -68,7 +67,10 @@ async fn test_ui_redirect_on_get() { #[tokio::test] async fn test_standalone_server() { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let sandbox = ClusterSandboxConfigBuilder::build_standalone() + .await + .start() + .await; { // The indexing service should be running. let counters = sandbox @@ -125,17 +127,16 @@ async fn test_standalone_server() { #[tokio::test] async fn test_multi_nodes_cluster() { quickwit_common::setup_logging_for_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), - ]; - let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) + let sandbox = ClusterSandboxConfigBuilder::default() + .add_node([QuickwitService::Searcher]) + .add_node([QuickwitService::Metastore]) + .add_node([QuickwitService::Indexer]) + .add_node([QuickwitService::ControlPlane]) + .add_node([QuickwitService::Janitor]) + .build() .await - .unwrap(); - sandbox.wait_for_cluster_num_ready_nodes(5).await.unwrap(); + .start() + .await; { // Wait for indexer to fully start. diff --git a/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs b/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs index b4c3eab3325..41e295b5049 100644 --- a/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs @@ -17,7 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashSet; use std::time::Duration; use hyper::StatusCode; @@ -29,7 +28,7 @@ use quickwit_rest_client::rest_client::CommitType; use serde_json::json; use crate::ingest_json; -use crate::test_utils::{ingest_with_retry, ClusterSandbox}; +use crate::test_utils::{ingest_with_retry, ClusterSandboxConfigBuilder}; fn initialize_tests() { quickwit_common::setup_logging_for_tests(); @@ -39,7 +38,10 @@ fn initialize_tests() { #[tokio::test] async fn test_single_node_cluster() { initialize_tests(); - let mut sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let mut sandbox = ClusterSandboxConfigBuilder::build_standalone() + .await + .start() + .await; let index_id = "test-single-node-cluster"; let index_config = format!( r#" @@ -59,7 +61,6 @@ async fn test_single_node_cluster() { index_id ); sandbox.enable_ingest_v2(); - sandbox.wait_for_cluster_num_ready_nodes(1).await.unwrap(); // Create the index. let current_index_metadata = sandbox @@ -203,20 +204,19 @@ async fn test_single_node_cluster() { #[tokio::test] async fn test_ingest_v2_index_not_found() { initialize_tests(); - let nodes_services = &[ - HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), - HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), - HashSet::from_iter([ + let mut sandbox = ClusterSandboxConfigBuilder::default() + .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) + .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) + .add_node([ QuickwitService::ControlPlane, QuickwitService::Metastore, QuickwitService::Searcher, - ]), - ]; - let mut sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services[..]) + ]) + .build() .await - .unwrap(); + .start() + .await; sandbox.enable_ingest_v2(); - sandbox.wait_for_cluster_num_ready_nodes(3).await.unwrap(); let missing_index_err: Error = sandbox .indexer_rest_client .ingest( @@ -241,21 +241,19 @@ async fn test_ingest_v2_index_not_found() { #[tokio::test] async fn test_ingest_v2_happy_path() { initialize_tests(); - - let nodes_services = &[ - HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), - HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), - HashSet::from_iter([ + let mut sandbox = ClusterSandboxConfigBuilder::default() + .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) + .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) + .add_node([ QuickwitService::ControlPlane, QuickwitService::Metastore, QuickwitService::Searcher, - ]), - ]; - let mut sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services[..]) + ]) + .build() .await - .unwrap(); + .start() + .await; sandbox.enable_ingest_v2(); - sandbox.wait_for_cluster_num_ready_nodes(3).await.unwrap(); let index_id = "test_happy_path"; let index_config = format!( r#" @@ -328,7 +326,10 @@ async fn test_ingest_v2_happy_path() { #[tokio::test] async fn test_commit_modes() { initialize_tests(); - let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let sandbox = ClusterSandboxConfigBuilder::build_standalone() + .await + .start() + .await; let index_id = "test_commit_modes"; let index_config = format!( r#" @@ -396,7 +397,10 @@ async fn test_commit_modes() { sandbox.assert_hit_count(index_id, "body:auto", 0).await; - tokio::time::sleep(Duration::from_secs(3)).await; + sandbox + .wait_for_splits(index_id, Some(vec![SplitState::Published]), 3) + .await + .unwrap(); sandbox.assert_hit_count(index_id, "body:auto", 1).await; @@ -407,18 +411,17 @@ async fn test_commit_modes() { #[tokio::test] async fn test_very_large_index_name() { initialize_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), - ]; - let mut sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) + let mut sandbox = ClusterSandboxConfigBuilder::default() + .add_node([QuickwitService::Searcher]) + .add_node([QuickwitService::Metastore]) + .add_node([QuickwitService::Indexer]) + .add_node([QuickwitService::ControlPlane]) + .add_node([QuickwitService::Janitor]) + .build() .await - .unwrap(); + .start() + .await; sandbox.enable_ingest_v2(); - sandbox.wait_for_cluster_num_ready_nodes(5).await.unwrap(); let index_id = "its_very_very_very_very_very_very_very_very_very_very_very_\ very_very_very_very_very_very_very_very_very_very_very_very_very_very_very_\ @@ -509,7 +512,10 @@ async fn test_very_large_index_name() { #[tokio::test] async fn test_shutdown_single_node() { initialize_tests(); - let mut sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let mut sandbox = ClusterSandboxConfigBuilder::build_standalone() + .await + .start() + .await; let index_id = "test_shutdown_single_node"; sandbox.enable_ingest_v2(); @@ -571,18 +577,18 @@ async fn test_shutdown_single_node() { #[tokio::test] async fn test_shutdown_control_plane_early_shutdown() { initialize_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([ + let sandbox = ClusterSandboxConfigBuilder::default() + .add_node([QuickwitService::Indexer]) + .add_node([ QuickwitService::ControlPlane, QuickwitService::Searcher, QuickwitService::Metastore, QuickwitService::Janitor, - ]), - ]; - let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) + ]) + .build() .await - .unwrap(); + .start() + .await; let index_id = "test_shutdown_separate_indexer"; // TODO: make this test work with ingest v2 (#5068) @@ -632,18 +638,18 @@ async fn test_shutdown_control_plane_early_shutdown() { #[tokio::test] async fn test_shutdown_separate_indexer() { initialize_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([ + let sandbox = ClusterSandboxConfigBuilder::default() + .add_node([QuickwitService::Indexer]) + .add_node([ QuickwitService::ControlPlane, QuickwitService::Searcher, QuickwitService::Metastore, QuickwitService::Janitor, - ]), - ]; - let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) + ]) + .build() .await - .unwrap(); + .start() + .await; let index_id = "test_shutdown_separate_indexer"; // TODO: make this test work with ingest v2 (#5068) diff --git a/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs b/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs index 752fe45acf3..f8770322782 100644 --- a/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/otlp_tests.rs @@ -37,7 +37,7 @@ use quickwit_proto::opentelemetry::proto::logs::v1::{LogRecord, ResourceLogs, Sc use quickwit_proto::opentelemetry::proto::trace::v1::{ResourceSpans, ScopeSpans, Span}; use tonic::codec::CompressionEncoding; -use crate::test_utils::ClusterSandbox; +use crate::test_utils::ClusterSandboxConfigBuilder; fn initialize_tests() { quickwit_common::setup_logging_for_tests(); @@ -47,16 +47,16 @@ fn initialize_tests() { #[tokio::test] async fn test_ingest_traces_with_otlp_grpc_api() { initialize_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), - ]; - let mut sandbox = ClusterSandbox::start_cluster_with_otlp_service(&nodes_services) + let mut sandbox = ClusterSandboxConfigBuilder::default() + .add_node([QuickwitService::Searcher]) + .add_node([QuickwitService::Metastore]) + .add_node_with_otlp([QuickwitService::Indexer]) + .add_node([QuickwitService::ControlPlane]) + .add_node([QuickwitService::Janitor]) + .build() .await - .unwrap(); + .start() + .await; // Wait for the pipelines to start (one for logs and one for traces) sandbox.wait_for_indexing_pipelines(2).await.unwrap(); @@ -142,16 +142,16 @@ async fn test_ingest_traces_with_otlp_grpc_api() { #[tokio::test] async fn test_ingest_logs_with_otlp_grpc_api() { initialize_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), - ]; - let mut sandbox = ClusterSandbox::start_cluster_with_otlp_service(&nodes_services) + let mut sandbox = ClusterSandboxConfigBuilder::default() + .add_node([QuickwitService::Searcher]) + .add_node([QuickwitService::Metastore]) + .add_node_with_otlp([QuickwitService::Indexer]) + .add_node([QuickwitService::ControlPlane]) + .add_node([QuickwitService::Janitor]) + .build() .await - .unwrap(); + .start() + .await; // Wait fo the pipelines to start (one for logs and one for traces) sandbox.wait_for_indexing_pipelines(2).await.unwrap(); @@ -218,16 +218,16 @@ async fn test_ingest_logs_with_otlp_grpc_api() { #[tokio::test] async fn test_jaeger_api() { initialize_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), - ]; - let mut sandbox = ClusterSandbox::start_cluster_with_otlp_service(&nodes_services) + let mut sandbox = ClusterSandboxConfigBuilder::default() + .add_node([QuickwitService::Searcher]) + .add_node([QuickwitService::Metastore]) + .add_node_with_otlp([QuickwitService::Indexer]) + .add_node([QuickwitService::ControlPlane]) + .add_node([QuickwitService::Janitor]) + .build() .await - .unwrap(); + .start() + .await; // Wait fo the pipelines to start (one for logs and one for traces) sandbox.wait_for_indexing_pipelines(2).await.unwrap(); diff --git a/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs b/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs index 419b1f04f2f..64abe0355ba 100644 --- a/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs @@ -32,7 +32,7 @@ use quickwit_serve::SearchRequestQueryString; use tempfile::NamedTempFile; use tracing::info; -use crate::test_utils::ClusterSandbox; +use crate::test_utils::ClusterSandboxConfigBuilder; fn create_mock_data_file(num_lines: usize) -> (NamedTempFile, Uri) { let mut temp_file = tempfile::NamedTempFile::new().unwrap(); @@ -48,7 +48,10 @@ fn create_mock_data_file(num_lines: usize) -> (NamedTempFile, Uri) { #[tokio::test] async fn test_sqs_single_node_cluster() { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let sandbox = ClusterSandboxConfigBuilder::build_standalone() + .await + .start() + .await; let index_id = "test-sqs-source-single-node-cluster"; let index_config = format!( r#" @@ -68,8 +71,6 @@ async fn test_sqs_single_node_cluster() { let sqs_client = sqs_test_helpers::get_localstack_sqs_client().await.unwrap(); let queue_url = sqs_test_helpers::create_queue(&sqs_client, "test-single-node-cluster").await; - sandbox.wait_for_cluster_num_ready_nodes(1).await.unwrap(); - info!("create index"); sandbox .indexer_rest_client diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs index cf44ed8bc54..d1b7ddcdef0 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs @@ -22,7 +22,7 @@ use std::time::Duration; use serde_json::{json, Value}; use super::assert_hits_unordered; -use crate::test_utils::ClusterSandbox; +use crate::test_utils::ClusterSandboxConfigBuilder; /// Update the doc mapping between 2 calls to local-ingest (forces separate indexing pipelines) and /// assert the number of hits for the given query @@ -35,7 +35,10 @@ async fn validate_search_across_doc_mapping_updates( query_and_expect: &[(&str, Result<&[Value], ()>)], ) { quickwit_common::setup_logging_for_tests(); - let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); + let sandbox = ClusterSandboxConfigBuilder::build_standalone() + .await + .start() + .await; { // Wait for indexer to fully start. diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs index 52d43627f22..7ab944278f1 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/search_settings_tests.rs @@ -17,7 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashSet; use std::time::Duration; use quickwit_config::service::QuickwitService; @@ -26,22 +25,21 @@ use serde_json::json; use super::assert_hits_unordered; use crate::ingest_json; -use crate::test_utils::{ingest_with_retry, ClusterSandbox}; +use crate::test_utils::{ingest_with_retry, ClusterSandboxConfigBuilder}; #[tokio::test] async fn test_update_search_settings_on_multi_nodes_cluster() { quickwit_common::setup_logging_for_tests(); - let nodes_services = vec![ - HashSet::from_iter([QuickwitService::Searcher]), - HashSet::from_iter([QuickwitService::Metastore]), - HashSet::from_iter([QuickwitService::Indexer]), - HashSet::from_iter([QuickwitService::ControlPlane]), - HashSet::from_iter([QuickwitService::Janitor]), - ]; - let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services) + let sandbox = ClusterSandboxConfigBuilder::default() + .add_node([QuickwitService::Searcher]) + .add_node([QuickwitService::Metastore]) + .add_node([QuickwitService::Indexer]) + .add_node([QuickwitService::ControlPlane]) + .add_node([QuickwitService::Janitor]) + .build() .await - .unwrap(); - sandbox.wait_for_cluster_num_ready_nodes(5).await.unwrap(); + .start() + .await; { // Wait for indexer to fully start. diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index 22db2523d05..b82db775761 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -97,4 +97,4 @@ quickwit-storage = { workspace = true, features = ["testsuite"] } pprof = [ "dep:pprof" ] - +testsuite = [] diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs index 7a1c24691ee..403ae46d853 100644 --- a/quickwit/quickwit-serve/src/grpc.rs +++ b/quickwit/quickwit-serve/src/grpc.rs @@ -18,7 +18,7 @@ // along with this program. If not, see . use std::collections::BTreeSet; -use std::net::SocketAddr; +use std::error::Error; use std::sync::Arc; use bytesize::ByteSize; @@ -32,7 +32,9 @@ use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_serv use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_server::TraceServiceServer; use quickwit_proto::search::search_service_server::SearchServiceServer; use quickwit_proto::tonic::codegen::CompressionEncoding; +use quickwit_proto::tonic::transport::server::TcpIncoming; use quickwit_proto::tonic::transport::Server; +use tokio::net::TcpListener; use tracing::*; use crate::developer_api::DeveloperApiServer; @@ -41,7 +43,7 @@ use crate::{QuickwitServices, INDEXING_GRPC_SERVER_METRICS_LAYER}; /// Starts and binds gRPC services to `grpc_listen_addr`. pub(crate) async fn start_grpc_server( - grpc_listen_addr: SocketAddr, + tcp_listener: TcpListener, max_message_size: ByteSize, services: Arc, readiness_trigger: BoxFutureInfaillible<()>, @@ -186,12 +188,16 @@ pub(crate) async fn start_grpc_server( .add_optional_service(otlp_trace_grpc_service) .add_optional_service(search_grpc_service); + let grpc_listen_addr = tcp_listener.local_addr()?; info!( enabled_grpc_services=?enabled_grpc_services, grpc_listen_addr=?grpc_listen_addr, "Starting gRPC server listening on {grpc_listen_addr}." ); - let serve_fut = server_router.serve_with_shutdown(grpc_listen_addr, shutdown_signal); + // nodelay=true and keepalive=None are the default values for Server::builder() + let tcp_incoming = TcpIncoming::from_listener(tcp_listener, true, None) + .map_err(|err: Box| anyhow::anyhow!(err))?; + let serve_fut = server_router.serve_with_incoming_shutdown(tcp_incoming, shutdown_signal); let (serve_res, _trigger_res) = tokio::join!(serve_fut, readiness_trigger); serve_res?; Ok(()) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 2b5b7f123f9..8660f454a20 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -43,6 +43,7 @@ mod rest; mod rest_api_response; mod search_api; pub(crate) mod simple_list; +mod tcp_listener; mod template_api; mod ui_handler; @@ -115,6 +116,7 @@ use quickwit_search::{ SearchServiceClient, SearcherContext, SearcherPool, }; use quickwit_storage::{SplitCache, StorageResolver}; +pub use tcp_listener::TcpListenerResolver; use tokio::sync::oneshot; use tower::timeout::Timeout; use tower::ServiceBuilder; @@ -387,6 +389,7 @@ pub async fn serve_quickwit( runtimes_config: RuntimesConfig, metastore_resolver: MetastoreResolver, storage_resolver: StorageResolver, + tcp_listener_resolver: TcpListenerResolver, shutdown_signal: BoxFutureInfaillible<()>, env_filter_reload_fn: EnvFilterReloadFn, ) -> anyhow::Result> { @@ -712,7 +715,7 @@ pub async fn serve_quickwit( } }); let grpc_server = grpc::start_grpc_server( - grpc_listen_addr, + tcp_listener_resolver.resolve(grpc_listen_addr).await?, grpc_config.max_message_size, quickwit_services.clone(), grpc_readiness_trigger, @@ -732,7 +735,7 @@ pub async fn serve_quickwit( } }); let rest_server = rest::start_rest_server( - rest_listen_addr, + tcp_listener_resolver.resolve(rest_listen_addr).await?, quickwit_services, rest_readiness_trigger, rest_shutdown_signal, diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 483961a7378..a021da9c6f7 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -18,13 +18,13 @@ // along with this program. If not, see . use std::fmt::Formatter; -use std::net::SocketAddr; use std::sync::Arc; use hyper::http::HeaderValue; use hyper::{http, Method, StatusCode}; use quickwit_common::tower::BoxFutureInfaillible; use quickwit_search::SearchService; +use tokio::net::TcpListener; use tower::make::Shared; use tower::ServiceBuilder; use tower_http::compression::predicate::{NotForContentType, Predicate, SizeAbove}; @@ -123,7 +123,7 @@ impl Predicate for CompressionPredicate { /// Starts REST services. pub(crate) async fn start_rest_server( - rest_listen_addr: SocketAddr, + tcp_listener: TcpListener, quickwit_services: Arc, readiness_trigger: BoxFutureInfaillible<()>, shutdown_signal: BoxFutureInfaillible<()>, @@ -209,24 +209,26 @@ pub(crate) async fn start_rest_server( .layer(cors) .service(warp_service); + let rest_listen_addr = tcp_listener.local_addr()?; info!( rest_listen_addr=?rest_listen_addr, "Starting REST server listening on {rest_listen_addr}." ); + let rest_listener_std = tcp_listener.into_std()?; // `graceful_shutdown()` seems to be blocking in presence of existing connections. // The following approach of dropping the serve supposedly is not bullet proof, but it seems to // work in our unit test. // // See more of the discussion here: // https://github.com/hyperium/hyper/issues/2386 + let serve_fut = async move { tokio::select! { - res = hyper::Server::bind(&rest_listen_addr).serve(Shared::new(service)) => { res } + res = hyper::Server::from_tcp(rest_listener_std)?.serve(Shared::new(service)) => { res } _ = shutdown_signal => { Ok(()) } } }; - let (serve_res, _trigger_res) = tokio::join!(serve_fut, readiness_trigger); serve_res?; Ok(()) diff --git a/quickwit/quickwit-serve/src/tcp_listener.rs b/quickwit/quickwit-serve/src/tcp_listener.rs new file mode 100644 index 00000000000..ebe7eeea831 --- /dev/null +++ b/quickwit/quickwit-serve/src/tcp_listener.rs @@ -0,0 +1,77 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; + +use anyhow::Context; +use tokio::net::TcpListener; +use tokio::sync::Mutex; + +/// A [`TcpListener`] constructor that can re-use existing listeners in tests. +/// +/// When building a local test cluster we want to reserve all the ports of all +/// the nodes before starting the first one. +#[derive(Clone)] +pub enum TcpListenerResolver { + Default, + #[cfg(any(test, feature = "testsuite"))] + Test(Arc>>), +} + +impl Default for TcpListenerResolver { + fn default() -> Self { + Self::Default + } +} + +#[cfg(any(test, feature = "testsuite"))] +impl TcpListenerResolver { + pub fn for_test() -> Self { + Self::Test(Arc::new(Mutex::new(HashMap::new()))) + } +} + +impl TcpListenerResolver { + pub async fn resolve(&self, addr: SocketAddr) -> anyhow::Result { + match self { + TcpListenerResolver::Default => TcpListener::bind(addr).await.map_err(|e| e.into()), + #[cfg(any(test, feature = "testsuite"))] + TcpListenerResolver::Test(listeners) => listeners + .lock() + .await + .remove(&addr) + .context(format!("No listener found for address {}", addr)), + } + } + + #[cfg(any(test, feature = "testsuite"))] + pub async fn add_listener(&self, listener: TcpListener) { + match self { + TcpListenerResolver::Default => { + panic!("Cannot add listener in default mode."); + } + TcpListenerResolver::Test(listeners) => listeners + .lock() + .await + .insert(listener.local_addr().unwrap(), listener), + }; + } +}