diff --git a/quickwit/quickwit-cli/src/service.rs b/quickwit/quickwit-cli/src/service.rs index 8041ad2a8fd..40693e9d4b5 100644 --- a/quickwit/quickwit-cli/src/service.rs +++ b/quickwit/quickwit-cli/src/service.rs @@ -27,7 +27,8 @@ use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::uri::{Protocol, Uri}; use quickwit_config::service::QuickwitService; use quickwit_config::NodeConfig; -use quickwit_serve::{serve_quickwit, BuildInfo, EnvFilterReloadFn, TcpListenerResolver}; +use quickwit_serve::tcp_listener::DefaultTcpListenerResolver; +use quickwit_serve::{serve_quickwit, BuildInfo, EnvFilterReloadFn}; use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, TelemetryEvent}; use tokio::signal; use tracing::{debug, info}; @@ -114,7 +115,7 @@ impl RunCliCommand { runtimes_config, metastore_resolver, storage_resolver, - TcpListenerResolver::default(), + DefaultTcpListenerResolver, shutdown_signal, env_filter_reload_fn, ) diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index cc3d9e1ce37..2321bd0399a 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -480,9 +480,12 @@ impl NodeConfig { self.storage_configs.redact(); } + /// Creates a config with defaults suitable for testing. + /// + /// Uses the default ports without ensuring that they are available. #[cfg(any(test, feature = "testsuite"))] pub fn for_test() -> Self { - serialize::node_config_for_tests() + serialize::node_config_for_tests_from_ports(7280, 7281) } #[cfg(any(test, feature = "testsuite"))] diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 0308bd6eeb9..81b9260f01d 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -467,15 +467,6 @@ pub fn node_config_for_tests_from_ports( } } -/// Creates a config with defaults that are suitable for testing. -/// -/// Uses the default ports without ensuring that they are available. -#[cfg(any(test, feature = "testsuite"))] -pub fn node_config_for_tests() -> NodeConfig { - let default_rest = default_rest_listen_port(); - node_config_for_tests_from_ports(default_rest, default_rest + 1) -} - #[cfg(test)] mod tests { use std::env; diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 3d83c6a006a..f6631e8114d 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -43,9 +43,8 @@ use quickwit_rest_client::models::IngestSource; use quickwit_rest_client::rest_client::{ CommitType, QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL, }; -use quickwit_serve::{ - serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString, TcpListenerResolver, -}; +use quickwit_serve::tcp_listener::for_tests::TestTcpListenerResolver; +use quickwit_serve::{serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString}; use quickwit_storage::StorageResolver; use reqwest::Url; use serde_json::Value; @@ -76,8 +75,8 @@ pub struct ClusterSandbox { pub struct ClusterSandboxConfig { temp_dir: TempDir, - pub node_configs: Vec<(NodeConfig, HashSet)>, - tcp_listener_resolver: TcpListenerResolver, + node_configs: Vec<(NodeConfig, HashSet)>, + tcp_listener_resolver: TestTcpListenerResolver, } pub struct ClusterSandboxConfigBuilder { @@ -136,7 +135,7 @@ impl ClusterSandboxConfigBuilder { let mut resolved_node_configs = Vec::new(); let mut peers: Vec = Vec::new(); let unique_dir_name = new_coolid("test-dir"); - let tcp_listener_resolver = TcpListenerResolver::for_test(); + let tcp_listener_resolver = TestTcpListenerResolver::default(); for (node_idx, node_builder) in self.node_configs.iter().enumerate() { let socket: SocketAddr = ([127, 0, 0, 1], 0u16).into(); let rest_tcp_listener = TcpListener::bind(socket).await.unwrap(); diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 8660f454a20..4de79fe5d1b 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -43,7 +43,7 @@ mod rest; mod rest_api_response; mod search_api; pub(crate) mod simple_list; -mod tcp_listener; +pub mod tcp_listener; mod template_api; mod ui_handler; @@ -116,7 +116,7 @@ use quickwit_search::{ SearchServiceClient, SearcherContext, SearcherPool, }; use quickwit_storage::{SplitCache, StorageResolver}; -pub use tcp_listener::TcpListenerResolver; +use tcp_listener::TcpListenerResolver; use tokio::sync::oneshot; use tower::timeout::Timeout; use tower::ServiceBuilder; @@ -389,7 +389,7 @@ pub async fn serve_quickwit( runtimes_config: RuntimesConfig, metastore_resolver: MetastoreResolver, storage_resolver: StorageResolver, - tcp_listener_resolver: TcpListenerResolver, + tcp_listener_resolver: impl TcpListenerResolver, shutdown_signal: BoxFutureInfaillible<()>, env_filter_reload_fn: EnvFilterReloadFn, ) -> anyhow::Result> { diff --git a/quickwit/quickwit-serve/src/tcp_listener.rs b/quickwit/quickwit-serve/src/tcp_listener.rs index ebe7eeea831..749690c648d 100644 --- a/quickwit/quickwit-serve/src/tcp_listener.rs +++ b/quickwit/quickwit-serve/src/tcp_listener.rs @@ -17,61 +17,65 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::Arc; -use anyhow::Context; +use quickwit_proto::tonic; use tokio::net::TcpListener; -use tokio::sync::Mutex; +use tonic::async_trait; -/// A [`TcpListener`] constructor that can re-use existing listeners in tests. +/// Resolve `SocketAddr` into `TcpListener` instances. /// -/// When building a local test cluster we want to reserve all the ports of all -/// the nodes before starting the first one. -#[derive(Clone)] -pub enum TcpListenerResolver { - Default, - #[cfg(any(test, feature = "testsuite"))] - Test(Arc>>), +/// This trait can be used to inject existing [`TcpListener`] instances to the +/// Quickwit REST and gRPC servers when running them in tests. +#[async_trait] +pub trait TcpListenerResolver: Clone + Send + 'static { + async fn resolve(&self, addr: SocketAddr) -> anyhow::Result; } -impl Default for TcpListenerResolver { - fn default() -> Self { - Self::Default +#[derive(Clone)] +pub struct DefaultTcpListenerResolver; + +#[async_trait] +impl TcpListenerResolver for DefaultTcpListenerResolver { + async fn resolve(&self, addr: SocketAddr) -> anyhow::Result { + TcpListener::bind(addr) + .await + .map_err(|err| anyhow::anyhow!(err)) } } #[cfg(any(test, feature = "testsuite"))] -impl TcpListenerResolver { - pub fn for_test() -> Self { - Self::Test(Arc::new(Mutex::new(HashMap::new()))) +pub mod for_tests { + use std::collections::HashMap; + use std::sync::Arc; + + use anyhow::Context; + use tokio::sync::Mutex; + + use super::*; + + #[derive(Clone, Default)] + pub struct TestTcpListenerResolver { + listeners: Arc>>, } -} -impl TcpListenerResolver { - pub async fn resolve(&self, addr: SocketAddr) -> anyhow::Result { - match self { - TcpListenerResolver::Default => TcpListener::bind(addr).await.map_err(|e| e.into()), - #[cfg(any(test, feature = "testsuite"))] - TcpListenerResolver::Test(listeners) => listeners + #[async_trait] + impl TcpListenerResolver for TestTcpListenerResolver { + async fn resolve(&self, addr: SocketAddr) -> anyhow::Result { + self.listeners .lock() .await .remove(&addr) - .context(format!("No listener found for address {}", addr)), + .context(format!("No listener found for address {}", addr)) } } - #[cfg(any(test, feature = "testsuite"))] - pub async fn add_listener(&self, listener: TcpListener) { - match self { - TcpListenerResolver::Default => { - panic!("Cannot add listener in default mode."); - } - TcpListenerResolver::Test(listeners) => listeners + impl TestTcpListenerResolver { + pub async fn add_listener(&self, listener: TcpListener) { + self.listeners .lock() .await - .insert(listener.local_addr().unwrap(), listener), - }; + .insert(listener.local_addr().unwrap(), listener); + } } }