From 73cf386894c6f176767d3278eb0e724d204f2da7 Mon Sep 17 00:00:00 2001 From: baishen Date: Fri, 6 Dec 2024 13:26:26 +0800 Subject: [PATCH] feat(query): Support `flight_connection_max_retry_times` and `flight_connection_retry_interval` setting (#16856) * feat(query): Support `query_max_failures` setting * fix * cluster do_action retry * fix --- Cargo.lock | 1 + src/common/exception/Cargo.toml | 1 + src/common/exception/src/exception_into.rs | 7 ++ src/query/service/src/clusters/cluster.rs | 67 ++++++++++++++----- src/query/service/src/clusters/mod.rs | 1 + .../src/interpreters/interpreter_kill.rs | 9 ++- .../interpreters/interpreter_set_priority.rs | 9 ++- .../interpreters/interpreter_system_action.rs | 9 ++- .../interpreter_table_truncate.rs | 9 ++- .../src/servers/admin/v1/query_profiling.rs | 8 ++- .../flight/v1/exchange/exchange_manager.rs | 13 ++-- .../flight/v1/packets/packet_executor.rs | 2 +- .../flight/v1/packets/packet_publisher.rs | 5 +- src/query/settings/src/settings_default.rs | 12 ++++ .../settings/src/settings_getter_setter.rs | 8 +++ 15 files changed, 128 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a0a638c02a87..ef546f926dec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3243,6 +3243,7 @@ dependencies = [ "geozero", "gimli 0.31.1", "http 1.1.0", + "hyper 1.4.1", "libc", "object", "once_cell", diff --git a/src/common/exception/Cargo.toml b/src/common/exception/Cargo.toml index bec53de42686..d4071ff4f757 100644 --- a/src/common/exception/Cargo.toml +++ b/src/common/exception/Cargo.toml @@ -21,6 +21,7 @@ bincode = { workspace = true } geozero = { workspace = true } gimli = { workspace = true } http = { workspace = true } +hyper = { workspace = true } libc = { workspace = true } object = { workspace = true } once_cell = { workspace = true } diff --git a/src/common/exception/src/exception_into.rs b/src/common/exception/src/exception_into.rs index f2d42093c023..43b4d185b8d8 100644 --- a/src/common/exception/src/exception_into.rs +++ b/src/common/exception/src/exception_into.rs @@ -353,6 +353,13 @@ impl From for ErrorCode { tonic::Code::Unknown => { let details = status.details(); if details.is_empty() { + if status.source().map_or(false, |e| e.is::()) { + return ErrorCode::CannotConnectNode(format!( + "{}, source: {:?}", + status.message(), + status.source() + )); + } return ErrorCode::UnknownException(format!( "{}, source: {:?}", status.message(), diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index 9c66a8b5770c..07bedc6f3beb 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -50,12 +50,14 @@ use futures::future::Either; use futures::Future; use futures::StreamExt; use log::error; +use log::info; use log::warn; use parking_lot::RwLock; use rand::thread_rng; use rand::Rng; use serde::Deserialize; use serde::Serialize; +use tokio::time::sleep; use crate::servers::flight::FlightClient; @@ -81,11 +83,11 @@ pub trait ClusterHelper { fn get_nodes(&self) -> Vec>; - async fn do_action Deserialize<'de> + Send>( + async fn do_action Deserialize<'de> + Send>( &self, path: &str, message: HashMap, - timeout: u64, + flight_params: FlightParams, ) -> Result>; } @@ -118,11 +120,11 @@ impl ClusterHelper for Cluster { self.nodes.to_vec() } - async fn do_action Deserialize<'de> + Send>( + async fn do_action Deserialize<'de> + Send>( &self, path: &str, message: HashMap, - timeout: u64, + flight_params: FlightParams, ) -> Result> { fn get_node<'a>(nodes: &'a [Arc], id: &str) -> Result<&'a Arc> { for node in nodes { @@ -137,23 +139,47 @@ impl ClusterHelper for Cluster { ))) } - let mut response = HashMap::with_capacity(message.len()); + let mut futures = Vec::with_capacity(message.len()); for (id, message) in message { let node = get_node(&self.nodes, &id)?; - let config = GlobalConfig::instance(); - let flight_address = node.flight_address.clone(); - let node_secret = node.secret.clone(); - - let mut conn = create_client(&config, &flight_address).await?; - response.insert( - id, - conn.do_action::<_, Res>(path, node_secret, message, timeout) - .await?, - ); + futures.push({ + let config = GlobalConfig::instance(); + let flight_address = node.flight_address.clone(); + let node_secret = node.secret.clone(); + + async move { + let mut attempt = 0; + + loop { + let mut conn = create_client(&config, &flight_address).await?; + match conn + .do_action::<_, Res>( + path, + node_secret.clone(), + message.clone(), + flight_params.timeout, + ) + .await + { + Ok(result) => return Ok((id, result)), + Err(e) + if e.code() == ErrorCode::CANNOT_CONNECT_NODE + && attempt < flight_params.retry_times => + { + // only retry when error is network problem + info!("retry do_action, attempt: {}", attempt); + attempt += 1; + sleep(Duration::from_secs(flight_params.retry_interval)).await; + } + Err(e) => return Err(e), + } + } + } + }); } - - Ok(response) + let responses: Vec<(String, Res)> = futures::future::try_join_all(futures).await?; + Ok(responses.into_iter().collect::>()) } } @@ -537,3 +563,10 @@ pub async fn create_client(config: &InnerConfig, address: &str) -> Result Result { let cluster = self.ctx.get_cluster(); let settings = self.ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_flight_max_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; let mut message = HashMap::with_capacity(cluster.nodes.len()); @@ -65,7 +70,7 @@ impl KillInterpreter { } let res = cluster - .do_action::<_, bool>(KILL_QUERY, message, timeout) + .do_action::<_, bool>(KILL_QUERY, message, flight_params) .await?; match res.values().any(|x| *x) { diff --git a/src/query/service/src/interpreters/interpreter_set_priority.rs b/src/query/service/src/interpreters/interpreter_set_priority.rs index 0dda6b9dd656..1bf830b24be0 100644 --- a/src/query/service/src/interpreters/interpreter_set_priority.rs +++ b/src/query/service/src/interpreters/interpreter_set_priority.rs @@ -21,6 +21,7 @@ use databend_common_exception::Result; use databend_common_sql::plans::SetPriorityPlan; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::servers::flight::v1::actions::SET_PRIORITY; @@ -61,9 +62,13 @@ impl SetPriorityInterpreter { } let settings = self.ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_flight_max_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; let res = cluster - .do_action::<_, bool>(SET_PRIORITY, message, timeout) + .do_action::<_, bool>(SET_PRIORITY, message, flight_params) .await?; match res.values().any(|x| *x) { diff --git a/src/query/service/src/interpreters/interpreter_system_action.rs b/src/query/service/src/interpreters/interpreter_system_action.rs index 86e747e865ee..ca7e3ffb50c7 100644 --- a/src/query/service/src/interpreters/interpreter_system_action.rs +++ b/src/query/service/src/interpreters/interpreter_system_action.rs @@ -22,6 +22,7 @@ use databend_common_sql::plans::SystemAction; use databend_common_sql::plans::SystemPlan; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::servers::flight::v1::actions::SYSTEM_ACTION; @@ -74,9 +75,13 @@ impl Interpreter for SystemActionInterpreter { } let settings = self.ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_flight_max_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; cluster - .do_action::<_, ()>(SYSTEM_ACTION, message, timeout) + .do_action::<_, ()>(SYSTEM_ACTION, message, flight_params) .await?; } diff --git a/src/query/service/src/interpreters/interpreter_table_truncate.rs b/src/query/service/src/interpreters/interpreter_table_truncate.rs index 09a19f79cc43..70afa08e3866 100644 --- a/src/query/service/src/interpreters/interpreter_table_truncate.rs +++ b/src/query/service/src/interpreters/interpreter_table_truncate.rs @@ -21,6 +21,7 @@ use databend_common_exception::Result; use databend_common_sql::plans::TruncateTablePlan; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::servers::flight::v1::actions::TRUNCATE_TABLE; @@ -95,9 +96,13 @@ impl Interpreter for TruncateTableInterpreter { } let settings = self.ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_flight_max_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; cluster - .do_action::<_, ()>(TRUNCATE_TABLE, message, timeout) + .do_action::<_, ()>(TRUNCATE_TABLE, message, flight_params) .await?; } diff --git a/src/query/service/src/servers/admin/v1/query_profiling.rs b/src/query/service/src/servers/admin/v1/query_profiling.rs index 649c16baeb3a..a479f228ac17 100644 --- a/src/query/service/src/servers/admin/v1/query_profiling.rs +++ b/src/query/service/src/servers/admin/v1/query_profiling.rs @@ -30,6 +30,7 @@ use poem::IntoResponse; use crate::clusters::ClusterDiscovery; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::servers::flight::v1::actions::GET_PROFILE; use crate::sessions::SessionManager; @@ -104,8 +105,13 @@ async fn get_cluster_profile(query_id: &str) -> Result, ErrorCo } } + let flight_params = FlightParams { + timeout: 60, + retry_times: 3, + retry_interval: 3, + }; let res = cluster - .do_action::<_, Option>>(GET_PROFILE, message, 60) + .do_action::<_, Option>>(GET_PROFILE, message, flight_params) .await?; match res.into_values().find(Option::is_some) { diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 3f929c99f594..1cbb961f798d 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -52,6 +52,7 @@ use super::exchange_transform::ExchangeTransform; use super::statistics_receiver::StatisticsReceiver; use super::statistics_sender::StatisticsSender; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::PipelineBuildResult; @@ -416,13 +417,17 @@ impl DataExchangeManager { actions: QueryFragmentsActions, ) -> Result { let settings = ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_flight_max_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; let root_actions = actions.get_root_actions()?; let conf = GlobalConfig::instance(); // Initialize query env between cluster nodes let query_env = actions.get_query_env()?; - query_env.init(&ctx, timeout).await?; + query_env.init(&ctx, flight_params).await?; // Submit distributed tasks to all nodes. let cluster = ctx.get_cluster(); @@ -431,7 +436,7 @@ impl DataExchangeManager { let local_fragments = query_fragments.remove(&conf.query.node_id); let _: HashMap = cluster - .do_action(INIT_QUERY_FRAGMENTS, query_fragments, timeout) + .do_action(INIT_QUERY_FRAGMENTS, query_fragments, flight_params) .await?; self.set_ctx(&ctx.get_id(), ctx.clone())?; @@ -444,7 +449,7 @@ impl DataExchangeManager { let prepared_query = actions.prepared_query()?; let _: HashMap = cluster - .do_action(START_PREPARED_QUERY, prepared_query, timeout) + .do_action(START_PREPARED_QUERY, prepared_query, flight_params) .await?; Ok(build_res) diff --git a/src/query/service/src/servers/flight/v1/packets/packet_executor.rs b/src/query/service/src/servers/flight/v1/packets/packet_executor.rs index c555184d8291..29999b7727b3 100644 --- a/src/query/service/src/servers/flight/v1/packets/packet_executor.rs +++ b/src/query/service/src/servers/flight/v1/packets/packet_executor.rs @@ -14,7 +14,7 @@ use crate::servers::flight::v1::packets::QueryFragment; -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct QueryFragments { pub query_id: String, pub fragments: Vec, diff --git a/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs b/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs index 33c5f20e11b7..12bb47862775 100644 --- a/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs +++ b/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs @@ -34,6 +34,7 @@ use serde::Deserialize; use serde::Serialize; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::servers::flight::v1::actions::INIT_QUERY_ENV; use crate::sessions::QueryContext; use crate::sessions::SessionManager; @@ -140,7 +141,7 @@ pub struct QueryEnv { } impl QueryEnv { - pub async fn init(&self, ctx: &Arc, timeout: u64) -> Result<()> { + pub async fn init(&self, ctx: &Arc, flight_params: FlightParams) -> Result<()> { debug!("Dataflow diagram {:?}", self.dataflow_diagram); let cluster = ctx.get_cluster(); @@ -151,7 +152,7 @@ impl QueryEnv { } let _ = cluster - .do_action::<_, ()>(INIT_QUERY_ENV, message, timeout) + .do_action::<_, ()>(INIT_QUERY_ENV, message, flight_params) .await?; Ok(()) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 6c6d39123687..ab561c5b43c9 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -958,6 +958,18 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("flight_connection_max_retry_times", DefaultSettingValue { + value: UserSettingValue::UInt64(3), + desc: "The maximum retry count for cluster flight. Disable if 0.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=10)), + }), + ("flight_connection_retry_interval", DefaultSettingValue { + value: UserSettingValue::UInt64(3), + desc: "The retry interval of cluster flight is in seconds.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=30)), + }), ]); Ok(Arc::new(DefaultSettings { diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 5dc160133268..b76b50d9dc33 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -825,4 +825,12 @@ impl Settings { pub fn get_persist_materialized_cte(&self) -> Result { Ok(self.try_get_u64("persist_materialized_cte")? != 0) } + + pub fn get_flight_max_retry_times(&self) -> Result { + self.try_get_u64("flight_connection_max_retry_times") + } + + pub fn get_flight_retry_interval(&self) -> Result { + self.try_get_u64("flight_connection_retry_interval") + } }