Skip to content

Commit

Permalink
Use static dispatch instead of enum
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Aug 29, 2024
1 parent 0fd1322 commit e76d765
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 56 deletions.
5 changes: 3 additions & 2 deletions quickwit/quickwit-cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use quickwit_common::runtimes::RuntimesConfig;
use quickwit_common::uri::{Protocol, Uri};
use quickwit_config::service::QuickwitService;
use quickwit_config::NodeConfig;
use quickwit_serve::{serve_quickwit, BuildInfo, EnvFilterReloadFn, TcpListenerResolver};
use quickwit_serve::tcp_listener::DefaultTcpListenerResolver;
use quickwit_serve::{serve_quickwit, BuildInfo, EnvFilterReloadFn};
use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, TelemetryEvent};
use tokio::signal;
use tracing::{debug, info};
Expand Down Expand Up @@ -114,7 +115,7 @@ impl RunCliCommand {
runtimes_config,
metastore_resolver,
storage_resolver,
TcpListenerResolver::default(),
DefaultTcpListenerResolver,
shutdown_signal,
env_filter_reload_fn,
)
Expand Down
5 changes: 4 additions & 1 deletion quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,9 +480,12 @@ impl NodeConfig {
self.storage_configs.redact();
}

/// Creates a config with defaults suitable for testing.
///
/// Uses the default ports without ensuring that they are available.
#[cfg(any(test, feature = "testsuite"))]
pub fn for_test() -> Self {
serialize::node_config_for_tests()
serialize::node_config_for_tests_from_ports(7280, 7281)
}

#[cfg(any(test, feature = "testsuite"))]
Expand Down
9 changes: 0 additions & 9 deletions quickwit/quickwit-config/src/node_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,15 +467,6 @@ pub fn node_config_for_tests_from_ports(
}
}

/// Creates a config with defaults that are suitable for testing.
///
/// Uses the default ports without ensuring that they are available.
#[cfg(any(test, feature = "testsuite"))]
pub fn node_config_for_tests() -> NodeConfig {
let default_rest = default_rest_listen_port();
node_config_for_tests_from_ports(default_rest, default_rest + 1)
}

#[cfg(test)]
mod tests {
use std::env;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::rest_client::{
CommitType, QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL,
};
use quickwit_serve::{
serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString, TcpListenerResolver,
};
use quickwit_serve::tcp_listener::for_tests::TestTcpListenerResolver;
use quickwit_serve::{serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString};
use quickwit_storage::StorageResolver;
use reqwest::Url;
use serde_json::Value;
Expand Down Expand Up @@ -76,8 +75,8 @@ pub struct ClusterSandbox {

pub struct ClusterSandboxConfig {
temp_dir: TempDir,
pub node_configs: Vec<(NodeConfig, HashSet<QuickwitService>)>,
tcp_listener_resolver: TcpListenerResolver,
node_configs: Vec<(NodeConfig, HashSet<QuickwitService>)>,
tcp_listener_resolver: TestTcpListenerResolver,
}

pub struct ClusterSandboxConfigBuilder {
Expand Down Expand Up @@ -136,7 +135,7 @@ impl ClusterSandboxConfigBuilder {
let mut resolved_node_configs = Vec::new();
let mut peers: Vec<String> = Vec::new();
let unique_dir_name = new_coolid("test-dir");
let tcp_listener_resolver = TcpListenerResolver::for_test();
let tcp_listener_resolver = TestTcpListenerResolver::default();
for (node_idx, node_builder) in self.node_configs.iter().enumerate() {
let socket: SocketAddr = ([127, 0, 0, 1], 0u16).into();
let rest_tcp_listener = TcpListener::bind(socket).await.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ mod rest;
mod rest_api_response;
mod search_api;
pub(crate) mod simple_list;
mod tcp_listener;
pub mod tcp_listener;
mod template_api;
mod ui_handler;

Expand Down Expand Up @@ -116,7 +116,7 @@ use quickwit_search::{
SearchServiceClient, SearcherContext, SearcherPool,
};
use quickwit_storage::{SplitCache, StorageResolver};
pub use tcp_listener::TcpListenerResolver;
use tcp_listener::TcpListenerResolver;
use tokio::sync::oneshot;
use tower::timeout::Timeout;
use tower::ServiceBuilder;
Expand Down Expand Up @@ -389,7 +389,7 @@ pub async fn serve_quickwit(
runtimes_config: RuntimesConfig,
metastore_resolver: MetastoreResolver,
storage_resolver: StorageResolver,
tcp_listener_resolver: TcpListenerResolver,
tcp_listener_resolver: impl TcpListenerResolver,
shutdown_signal: BoxFutureInfaillible<()>,
env_filter_reload_fn: EnvFilterReloadFn,
) -> anyhow::Result<HashMap<String, ActorExitStatus>> {
Expand Down
74 changes: 39 additions & 35 deletions quickwit/quickwit-serve/src/tcp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,65 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;

use anyhow::Context;
use quickwit_proto::tonic;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tonic::async_trait;

/// A [`TcpListener`] constructor that can re-use existing listeners in tests.
/// Resolve `SocketAddr` into `TcpListener` instances.
///
/// When building a local test cluster we want to reserve all the ports of all
/// the nodes before starting the first one.
#[derive(Clone)]
pub enum TcpListenerResolver {
Default,
#[cfg(any(test, feature = "testsuite"))]
Test(Arc<Mutex<HashMap<SocketAddr, TcpListener>>>),
/// This trait can be used to inject existing [`TcpListener`] instances to the
/// Quickwit REST and gRPC servers when running them in tests.
#[async_trait]
pub trait TcpListenerResolver: Clone + Send + 'static {
async fn resolve(&self, addr: SocketAddr) -> anyhow::Result<TcpListener>;
}

impl Default for TcpListenerResolver {
fn default() -> Self {
Self::Default
#[derive(Clone)]
pub struct DefaultTcpListenerResolver;

#[async_trait]
impl TcpListenerResolver for DefaultTcpListenerResolver {
async fn resolve(&self, addr: SocketAddr) -> anyhow::Result<TcpListener> {
TcpListener::bind(addr)
.await
.map_err(|err| anyhow::anyhow!(err))
}
}

#[cfg(any(test, feature = "testsuite"))]
impl TcpListenerResolver {
pub fn for_test() -> Self {
Self::Test(Arc::new(Mutex::new(HashMap::new())))
pub mod for_tests {
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Context;
use tokio::sync::Mutex;

use super::*;

#[derive(Clone, Default)]
pub struct TestTcpListenerResolver {
listeners: Arc<Mutex<HashMap<SocketAddr, TcpListener>>>,
}
}

impl TcpListenerResolver {
pub async fn resolve(&self, addr: SocketAddr) -> anyhow::Result<TcpListener> {
match self {
TcpListenerResolver::Default => TcpListener::bind(addr).await.map_err(|e| e.into()),
#[cfg(any(test, feature = "testsuite"))]
TcpListenerResolver::Test(listeners) => listeners
#[async_trait]
impl TcpListenerResolver for TestTcpListenerResolver {
async fn resolve(&self, addr: SocketAddr) -> anyhow::Result<TcpListener> {
self.listeners
.lock()
.await
.remove(&addr)
.context(format!("No listener found for address {}", addr)),
.context(format!("No listener found for address {}", addr))
}
}

#[cfg(any(test, feature = "testsuite"))]
pub async fn add_listener(&self, listener: TcpListener) {
match self {
TcpListenerResolver::Default => {
panic!("Cannot add listener in default mode.");
}
TcpListenerResolver::Test(listeners) => listeners
impl TestTcpListenerResolver {
pub async fn add_listener(&self, listener: TcpListener) {
self.listeners
.lock()
.await
.insert(listener.local_addr().unwrap(), listener),
};
.insert(listener.local_addr().unwrap(), listener);
}
}
}

0 comments on commit e76d765

Please sign in to comment.