Skip to content

Commit

Permalink
feat(query): Support flight_connection_max_retry_times and `flight_…
Browse files Browse the repository at this point in the history
…connection_retry_interval` setting (#16856)

* feat(query): Support `query_max_failures` setting

* fix

* cluster do_action retry

* fix
  • Loading branch information
b41sh authored Dec 6, 2024
1 parent 2ede35d commit 73cf386
Show file tree
Hide file tree
Showing 15 changed files with 128 additions and 33 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/exception/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ bincode = { workspace = true }
geozero = { workspace = true }
gimli = { workspace = true }
http = { workspace = true }
hyper = { workspace = true }
libc = { workspace = true }
object = { workspace = true }
once_cell = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions src/common/exception/src/exception_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,13 @@ impl From<tonic::Status> for ErrorCode {
tonic::Code::Unknown => {
let details = status.details();
if details.is_empty() {
if status.source().map_or(false, |e| e.is::<hyper::Error>()) {
return ErrorCode::CannotConnectNode(format!(
"{}, source: {:?}",
status.message(),
status.source()
));
}
return ErrorCode::UnknownException(format!(
"{}, source: {:?}",
status.message(),
Expand Down
67 changes: 50 additions & 17 deletions src/query/service/src/clusters/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ use futures::future::Either;
use futures::Future;
use futures::StreamExt;
use log::error;
use log::info;
use log::warn;
use parking_lot::RwLock;
use rand::thread_rng;
use rand::Rng;
use serde::Deserialize;
use serde::Serialize;
use tokio::time::sleep;

use crate::servers::flight::FlightClient;

Expand All @@ -81,11 +83,11 @@ pub trait ClusterHelper {

fn get_nodes(&self) -> Vec<Arc<NodeInfo>>;

async fn do_action<T: Serialize + Send, Res: for<'de> Deserialize<'de> + Send>(
async fn do_action<T: Serialize + Send + Clone, Res: for<'de> Deserialize<'de> + Send>(
&self,
path: &str,
message: HashMap<String, T>,
timeout: u64,
flight_params: FlightParams,
) -> Result<HashMap<String, Res>>;
}

Expand Down Expand Up @@ -118,11 +120,11 @@ impl ClusterHelper for Cluster {
self.nodes.to_vec()
}

async fn do_action<T: Serialize + Send, Res: for<'de> Deserialize<'de> + Send>(
async fn do_action<T: Serialize + Send + Clone, Res: for<'de> Deserialize<'de> + Send>(
&self,
path: &str,
message: HashMap<String, T>,
timeout: u64,
flight_params: FlightParams,
) -> Result<HashMap<String, Res>> {
fn get_node<'a>(nodes: &'a [Arc<NodeInfo>], id: &str) -> Result<&'a Arc<NodeInfo>> {
for node in nodes {
Expand All @@ -137,23 +139,47 @@ impl ClusterHelper for Cluster {
)))
}

let mut response = HashMap::with_capacity(message.len());
let mut futures = Vec::with_capacity(message.len());
for (id, message) in message {
let node = get_node(&self.nodes, &id)?;

let config = GlobalConfig::instance();
let flight_address = node.flight_address.clone();
let node_secret = node.secret.clone();

let mut conn = create_client(&config, &flight_address).await?;
response.insert(
id,
conn.do_action::<_, Res>(path, node_secret, message, timeout)
.await?,
);
futures.push({
let config = GlobalConfig::instance();
let flight_address = node.flight_address.clone();
let node_secret = node.secret.clone();

async move {
let mut attempt = 0;

loop {
let mut conn = create_client(&config, &flight_address).await?;
match conn
.do_action::<_, Res>(
path,
node_secret.clone(),
message.clone(),
flight_params.timeout,
)
.await
{
Ok(result) => return Ok((id, result)),
Err(e)
if e.code() == ErrorCode::CANNOT_CONNECT_NODE
&& attempt < flight_params.retry_times =>
{
// only retry when error is network problem
info!("retry do_action, attempt: {}", attempt);
attempt += 1;
sleep(Duration::from_secs(flight_params.retry_interval)).await;
}
Err(e) => return Err(e),
}
}
}
});
}

Ok(response)
let responses: Vec<(String, Res)> = futures::future::try_join_all(futures).await?;
Ok(responses.into_iter().collect::<HashMap<String, Res>>())
}
}

Expand Down Expand Up @@ -537,3 +563,10 @@ pub async fn create_client(config: &InnerConfig, address: &str) -> Result<Flight
ConnectionFactory::create_rpc_channel(address.to_owned(), timeout, rpc_tls_config).await?,
)))
}

#[derive(Clone, Copy, Debug)]
pub struct FlightParams {
pub(crate) timeout: u64,
pub(crate) retry_times: u64,
pub(crate) retry_interval: u64,
}
1 change: 1 addition & 0 deletions src/query/service/src/clusters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ mod cluster;
pub use cluster::Cluster;
pub use cluster::ClusterDiscovery;
pub use cluster::ClusterHelper;
pub use cluster::FlightParams;
9 changes: 7 additions & 2 deletions src/query/service/src/interpreters/interpreter_kill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_exception::Result;
use databend_common_sql::plans::KillPlan;

use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::servers::flight::v1::actions::KILL_QUERY;
Expand Down Expand Up @@ -54,7 +55,11 @@ impl KillInterpreter {
async fn kill_cluster_query(&self) -> Result<PipelineBuildResult> {
let cluster = self.ctx.get_cluster();
let settings = self.ctx.get_settings();
let timeout = settings.get_flight_client_timeout()?;
let flight_params = FlightParams {
timeout: settings.get_flight_client_timeout()?,
retry_times: settings.get_flight_max_retry_times()?,
retry_interval: settings.get_flight_retry_interval()?,
};

let mut message = HashMap::with_capacity(cluster.nodes.len());

Expand All @@ -65,7 +70,7 @@ impl KillInterpreter {
}

let res = cluster
.do_action::<_, bool>(KILL_QUERY, message, timeout)
.do_action::<_, bool>(KILL_QUERY, message, flight_params)
.await?;

match res.values().any(|x| *x) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_exception::Result;
use databend_common_sql::plans::SetPriorityPlan;

use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::servers::flight::v1::actions::SET_PRIORITY;
Expand Down Expand Up @@ -61,9 +62,13 @@ impl SetPriorityInterpreter {
}

let settings = self.ctx.get_settings();
let timeout = settings.get_flight_client_timeout()?;
let flight_params = FlightParams {
timeout: settings.get_flight_client_timeout()?,
retry_times: settings.get_flight_max_retry_times()?,
retry_interval: settings.get_flight_retry_interval()?,
};
let res = cluster
.do_action::<_, bool>(SET_PRIORITY, message, timeout)
.do_action::<_, bool>(SET_PRIORITY, message, flight_params)
.await?;

match res.values().any(|x| *x) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use databend_common_sql::plans::SystemAction;
use databend_common_sql::plans::SystemPlan;

use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::servers::flight::v1::actions::SYSTEM_ACTION;
Expand Down Expand Up @@ -74,9 +75,13 @@ impl Interpreter for SystemActionInterpreter {
}

let settings = self.ctx.get_settings();
let timeout = settings.get_flight_client_timeout()?;
let flight_params = FlightParams {
timeout: settings.get_flight_client_timeout()?,
retry_times: settings.get_flight_max_retry_times()?,
retry_interval: settings.get_flight_retry_interval()?,
};
cluster
.do_action::<_, ()>(SYSTEM_ACTION, message, timeout)
.do_action::<_, ()>(SYSTEM_ACTION, message, flight_params)
.await?;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_exception::Result;
use databend_common_sql::plans::TruncateTablePlan;

use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::servers::flight::v1::actions::TRUNCATE_TABLE;
Expand Down Expand Up @@ -95,9 +96,13 @@ impl Interpreter for TruncateTableInterpreter {
}

let settings = self.ctx.get_settings();
let timeout = settings.get_flight_client_timeout()?;
let flight_params = FlightParams {
timeout: settings.get_flight_client_timeout()?,
retry_times: settings.get_flight_max_retry_times()?,
retry_interval: settings.get_flight_retry_interval()?,
};
cluster
.do_action::<_, ()>(TRUNCATE_TABLE, message, timeout)
.do_action::<_, ()>(TRUNCATE_TABLE, message, flight_params)
.await?;
}

Expand Down
8 changes: 7 additions & 1 deletion src/query/service/src/servers/admin/v1/query_profiling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use poem::IntoResponse;

use crate::clusters::ClusterDiscovery;
use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::servers::flight::v1::actions::GET_PROFILE;
use crate::sessions::SessionManager;

Expand Down Expand Up @@ -104,8 +105,13 @@ async fn get_cluster_profile(query_id: &str) -> Result<Vec<PlanProfile>, ErrorCo
}
}

let flight_params = FlightParams {
timeout: 60,
retry_times: 3,
retry_interval: 3,
};
let res = cluster
.do_action::<_, Option<Vec<PlanProfile>>>(GET_PROFILE, message, 60)
.do_action::<_, Option<Vec<PlanProfile>>>(GET_PROFILE, message, flight_params)
.await?;

match res.into_values().find(Option::is_some) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use super::exchange_transform::ExchangeTransform;
use super::statistics_receiver::StatisticsReceiver;
use super::statistics_sender::StatisticsSender;
use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -416,13 +417,17 @@ impl DataExchangeManager {
actions: QueryFragmentsActions,
) -> Result<PipelineBuildResult> {
let settings = ctx.get_settings();
let timeout = settings.get_flight_client_timeout()?;
let flight_params = FlightParams {
timeout: settings.get_flight_client_timeout()?,
retry_times: settings.get_flight_max_retry_times()?,
retry_interval: settings.get_flight_retry_interval()?,
};
let root_actions = actions.get_root_actions()?;
let conf = GlobalConfig::instance();

// Initialize query env between cluster nodes
let query_env = actions.get_query_env()?;
query_env.init(&ctx, timeout).await?;
query_env.init(&ctx, flight_params).await?;

// Submit distributed tasks to all nodes.
let cluster = ctx.get_cluster();
Expand All @@ -431,7 +436,7 @@ impl DataExchangeManager {
let local_fragments = query_fragments.remove(&conf.query.node_id);

let _: HashMap<String, ()> = cluster
.do_action(INIT_QUERY_FRAGMENTS, query_fragments, timeout)
.do_action(INIT_QUERY_FRAGMENTS, query_fragments, flight_params)
.await?;

self.set_ctx(&ctx.get_id(), ctx.clone())?;
Expand All @@ -444,7 +449,7 @@ impl DataExchangeManager {

let prepared_query = actions.prepared_query()?;
let _: HashMap<String, ()> = cluster
.do_action(START_PREPARED_QUERY, prepared_query, timeout)
.do_action(START_PREPARED_QUERY, prepared_query, flight_params)
.await?;

Ok(build_res)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use crate::servers::flight::v1::packets::QueryFragment;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct QueryFragments {
pub query_id: String,
pub fragments: Vec<QueryFragment>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use serde::Deserialize;
use serde::Serialize;

use crate::clusters::ClusterHelper;
use crate::clusters::FlightParams;
use crate::servers::flight::v1::actions::INIT_QUERY_ENV;
use crate::sessions::QueryContext;
use crate::sessions::SessionManager;
Expand Down Expand Up @@ -140,7 +141,7 @@ pub struct QueryEnv {
}

impl QueryEnv {
pub async fn init(&self, ctx: &Arc<QueryContext>, timeout: u64) -> Result<()> {
pub async fn init(&self, ctx: &Arc<QueryContext>, flight_params: FlightParams) -> Result<()> {
debug!("Dataflow diagram {:?}", self.dataflow_diagram);

let cluster = ctx.get_cluster();
Expand All @@ -151,7 +152,7 @@ impl QueryEnv {
}

let _ = cluster
.do_action::<_, ()>(INIT_QUERY_ENV, message, timeout)
.do_action::<_, ()>(INIT_QUERY_ENV, message, flight_params)
.await?;

Ok(())
Expand Down
12 changes: 12 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,18 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("flight_connection_max_retry_times", DefaultSettingValue {
value: UserSettingValue::UInt64(3),
desc: "The maximum retry count for cluster flight. Disable if 0.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=10)),
}),
("flight_connection_retry_interval", DefaultSettingValue {
value: UserSettingValue::UInt64(3),
desc: "The retry interval of cluster flight is in seconds.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=30)),
}),
]);

Ok(Arc::new(DefaultSettings {
Expand Down
8 changes: 8 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,4 +825,12 @@ impl Settings {
pub fn get_persist_materialized_cte(&self) -> Result<bool> {
Ok(self.try_get_u64("persist_materialized_cte")? != 0)
}

pub fn get_flight_max_retry_times(&self) -> Result<u64> {
self.try_get_u64("flight_connection_max_retry_times")
}

pub fn get_flight_retry_interval(&self) -> Result<u64> {
self.try_get_u64("flight_connection_retry_interval")
}
}

0 comments on commit 73cf386

Please sign in to comment.