diff --git a/Cargo.lock b/Cargo.lock index 88b8310ecff3..481499022f54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3243,6 +3243,7 @@ dependencies = [ "geozero", "gimli 0.31.1", "http 1.1.0", + "hyper 1.4.1", "libc", "object", "once_cell", @@ -4529,7 +4530,6 @@ dependencies = [ "databend-common-exception", "databend-common-expression", "databend-common-functions", - "databend-common-management", "databend-common-meta-api", "databend-common-meta-app", "databend-common-meta-types", @@ -4886,6 +4886,7 @@ dependencies = [ "serde", "serde_json", "serfig", + "span-map", "temp-env", "tempfile", "test-harness", @@ -13369,6 +13370,12 @@ dependencies = [ "smallvec", ] +[[package]] +name = "span-map" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9374d7f3fc7a34774575f8ebf4f55d7c19ccd09e77e7bd40d33064fe728e3926" + [[package]] name = "spdx" version = "0.10.6" diff --git a/Cargo.toml b/Cargo.toml index 2feae4bef7b2..d06925fafc43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -450,6 +450,7 @@ sled = { version = "0.34", default-features = false } snailquote = "0.3.1" snap = "1" socket2 = "0.5.3" +span-map = { version = "0.2.0" } sqlx = { version = "0.8", features = ["mysql", "runtime-tokio"] } state = "0.6.0" stream-more = "0.1.3" diff --git a/benchmark/clickbench/benchmark_cloud.sh b/benchmark/clickbench/benchmark_cloud.sh index 0cd06e4c49a9..2cb50967d7f2 100755 --- a/benchmark/clickbench/benchmark_cloud.sh +++ b/benchmark/clickbench/benchmark_cloud.sh @@ -71,13 +71,15 @@ until bendsql --query="SHOW WAREHOUSES LIKE '${CLOUD_WAREHOUSE}'" | grep -q "Run sleep 10 done -export BENDSQL_DSN="databend://${CLOUD_USER}:${CLOUD_PASSWORD}@${CLOUD_GATEWAY}:443/${BENCHMARK_DATABASE}?warehouse=${CLOUD_WAREHOUSE}" +export BENDSQL_DSN="databend://${CLOUD_USER}:${CLOUD_PASSWORD}@${CLOUD_GATEWAY}:443?warehouse=${CLOUD_WAREHOUSE}" if [[ "${BENCHMARK_DATASET}" == "load" ]]; then echo "Creating database..." echo "CREATE DATABASE ${BENCHMARK_DATABASE};" | bendsql --database default fi +export BENDSQL_DSN="databend://${CLOUD_USER}:${CLOUD_PASSWORD}@${CLOUD_GATEWAY}:443/${BENCHMARK_DATABASE}?warehouse=${CLOUD_WAREHOUSE}" + echo "Checking session settings..." bendsql --query="select * from system.settings where value != default;" -o table @@ -117,6 +119,7 @@ for query in "${BENCHMARK_DATASET}"/queries/*.sql; do done echo "Cleaning up..." +export BENDSQL_DSN="databend://${CLOUD_USER}:${CLOUD_PASSWORD}@${CLOUD_GATEWAY}:443?login=disable" if [[ "${BENCHMARK_DATASET}" == "load" ]]; then echo "Dropping database..." echo "DROP DATABASE IF EXISTS ${BENCHMARK_DATABASE};" | bendsql diff --git a/src/common/base/src/rangemap/mod.rs b/src/common/base/src/rangemap/mod.rs index 2d75812d284f..f778d2773c09 100644 --- a/src/common/base/src/rangemap/mod.rs +++ b/src/common/base/src/rangemap/mod.rs @@ -12,10 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod range_map; -mod range_map_key; mod range_merger; -pub use range_map::RangeMap; -pub use range_map_key::RangeMapKey; pub use range_merger::RangeMerger; diff --git a/src/common/base/src/rangemap/range_map.rs b/src/common/base/src/rangemap/range_map.rs deleted file mode 100644 index 8dc149a82328..000000000000 --- a/src/common/base/src/rangemap/range_map.rs +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use core::fmt::Debug; -use core::ops::Bound; -use core::ops::Range; -use std::collections::BTreeMap; - -use super::range_map_key::RangeMapKey; - -#[derive(Clone, Debug, Default)] -pub struct RangeMap { - pub(crate) map: BTreeMap, V>, -} - -impl RangeMap -where - RV: Ord + Clone + Debug, - ID: Ord + Clone + Debug + Default, -{ - pub fn new() -> Self { - RangeMap { - map: BTreeMap::new(), - } - } - - pub fn insert(&mut self, range: Range, id: ID, val: V) { - assert!(range.start <= range.end); - - let range_key: RangeMapKey = RangeMapKey::new(range, id); - - self.map.insert(range_key, val); - } - - // Return a vector of `RangeKey` which contain the point in the [start, end). - // If we have range [1,5],[2,4],[2,6], then: - // 1. `get_by_point(1)` return [1,5] - // 2. `get_by_point(2)` return [1,5],[2,4],[2,6] - // 3. `get_by_point(5)` return [2,4],[2,6] - // Use the default key when construct `RangeKey::key` for search. - pub fn get_by_point(&self, point: &RV) -> Vec<(&RangeMapKey, &V)> { - let key = point.clone(); - let range_key = RangeMapKey::new(key.clone()..key.clone(), ID::default()); - - self.map - .range((Bound::Included(range_key), Bound::Unbounded)) - .filter(|e| e.0.range.start <= key) - .collect() - } - - pub fn remove(&mut self, range: Range, id: ID) { - self.map.remove(&RangeMapKey::new(range, id)); - } - - pub fn remove_by_key(&mut self, key: &RangeMapKey) { - self.map.remove(key); - } - - /// Returns an iterator of all keys. - /// - /// A RangeMapKey includes the range and the identity. - pub fn keys(&self) -> impl Iterator> { - self.map.keys() - } - - /// Returns an iterator of all values. - pub fn values(&self) -> impl Iterator { - self.map.values() - } - - /// Returns an iterator of all key-values. - pub fn iter(&self) -> impl Iterator, &V)> { - self.map.iter() - } -} - -#[cfg(test)] -mod tests { - use crate::rangemap::RangeMap; - use crate::rangemap::RangeMapKey; - - #[test] - fn test_range_set() { - // test get_by_point for i32 - { - let mut a = RangeMap::new(); - - let r11 = (&RangeMapKey::new(1..1, 11), &11); - let r15 = (&RangeMapKey::new(1..5, 15), &15); - let r24 = (&RangeMapKey::new(2..4, 24), &24); - let r26 = (&RangeMapKey::new(2..6, 26), &26); - - a.insert(1..1, 11, 11); - a.insert(1..5, 15, 15); - a.insert(2..4, 24, 24); - a.insert(2..6, 26, 26); - - assert_eq!(a.get_by_point(&1), vec![r11, r15]); - assert_eq!(a.get_by_point(&2), vec![r24, r15, r26]); - assert_eq!(a.get_by_point(&5), vec![r26]); - - a.remove(1..5, 15); - assert_eq!(a.get_by_point(&1), vec![r11]); - assert_eq!(a.get_by_point(&2), vec![r24, r26]); - } - // test get_by_point for String - { - let mut a = RangeMap::new(); - - let a1 = "1".to_string(); - let a2 = "2".to_string(); - let a4 = "4".to_string(); - let a5 = "5".to_string(); - let a6 = "6".to_string(); - - let r11 = (&RangeMapKey::new(a1.clone()..a1.clone(), 11), &11); - let r15 = (&RangeMapKey::new(a1.clone()..a5.clone(), 15), &15); - let r24 = (&RangeMapKey::new(a2.clone()..a4.clone(), 24), &24); - let r26 = (&RangeMapKey::new(a2.clone()..a6.clone(), 26), &26); - - a.insert(a1.clone()..a1.clone(), 11, 11); - a.insert(a1.clone()..a5.clone(), 15, 15); - a.insert(a2.clone()..a4, 24, 24); - a.insert(a2.clone()..a6, 26, 26); - - assert_eq!(a.get_by_point(&a1), vec![r11, r15]); - assert_eq!(a.get_by_point(&a2), vec![r24, r15, r26]); - assert_eq!(a.get_by_point(&a5), vec![r26]); - - a.remove(a1.clone()..a5, 15); - assert_eq!(a.get_by_point(&a1), vec![r11]); - assert_eq!(a.get_by_point(&a2), vec![r24, r26]); - } - // test get_by_point for string prefix - { - let mut a = RangeMap::new(); - - let a1 = "11".to_string(); - let a2 = "12".to_string(); - - a.insert(a1..a2, 11, 11); - assert!(!a.get_by_point(&"11".to_string()).is_empty()); - assert!(!a.get_by_point(&"111".to_string()).is_empty()); - assert!(!a.get_by_point(&"11z".to_string()).is_empty()); - assert!(!a.get_by_point(&"11/".to_string()).is_empty()); - assert!(!a.get_by_point(&"11*".to_string()).is_empty()); - assert!(a.get_by_point(&"12".to_string()).is_empty()); - } - // test get_by_point for char upbound limit string prefix - { - let mut a = RangeMap::new(); - - let a1 = format!("{}", 255 as char); - let a2 = format!("{}{}", 255 as char, 255 as char); - - a.insert(a1..a2, 11, 11); - assert!(!a.get_by_point(&format!("{}", 255 as char)).is_empty()); - assert!(!a.get_by_point(&format!("{}z", 255 as char)).is_empty()); - assert!(!a.get_by_point(&format!("{}/", 255 as char)).is_empty()); - assert!(!a.get_by_point(&format!("{}*", 255 as char)).is_empty()); - } - // test get_by_point for char upbound limit string prefix - { - let mut a = RangeMap::new(); - - let a1 = "1".to_string(); - let a2 = format!("{}{}", a1, 255 as char); - - a.insert(a1.clone()..a2, 11, 11); - assert!(!a.get_by_point(&a1).is_empty()); - assert!(!a.get_by_point(&format!("{}z", a1)).is_empty()); - assert!(!a.get_by_point(&format!("{}*", a1)).is_empty()); - assert!(!a.get_by_point(&format!("{}/", a1)).is_empty()); - } - } - - #[test] - fn test_range_iter() { - let mut a = RangeMap::new(); - - a.insert(1..1, 11, 11); - a.insert(1..5, 15, 15); - a.insert(2..4, 24, 24); - a.insert(2..6, 26, 26); - - let r1 = RangeMapKey::new(1..1, 11); - let r2 = RangeMapKey::new(2..4, 24); - let r3 = RangeMapKey::new(1..5, 15); - let r4 = RangeMapKey::new(2..6, 26); - - // keys() - { - let got = a.keys().collect::>(); - let want = vec![&r1, &r2, &r3, &r4]; - assert_eq!(want, got); - } - - // values() - { - let got = a.values().collect::>(); - let want = vec![&11, &24, &15, &26]; - assert_eq!(want, got); - } - - // iter() - { - let got = a.iter().collect::>(); - let want = vec![(&r1, &11), (&r2, &24), (&r3, &15), (&r4, &26)]; - assert_eq!(want, got); - } - } -} diff --git a/src/common/base/src/rangemap/range_map_key.rs b/src/common/base/src/rangemap/range_map_key.rs deleted file mode 100644 index fd6f63f26295..000000000000 --- a/src/common/base/src/rangemap/range_map_key.rs +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use core::cmp::Ordering; -use core::ops::Range; -use std::fmt::Debug; -use std::fmt::Display; -use std::fmt::{self}; - -/// `RangeMapKey` is a wrapper of `range` and `user defined key` -#[derive(Eq, Debug, Clone, PartialEq)] -pub struct RangeMapKey { - // range - pub range: Range, - // user defined key - pub key: ID, -} - -impl RangeMapKey -where - RV: Eq + Ord, - ID: Eq + Ord + Default, -{ - pub fn new(range: Range, key: ID) -> RangeMapKey { - RangeMapKey { range, key } - } -} - -impl Display for RangeMapKey -where - RV: Debug, - ID: Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "{:?}-{:?}-{:?}", - self.range.start, self.range.end, self.key - ) - } -} - -impl Ord for RangeMapKey -where - RV: Ord + Debug + Clone, - ID: Ord + Debug + Clone, -{ - /// the compare weight is: range.end > range.start > key - /// example: ((2,3),5) < ((5,1),3) since 2 < 5 - fn cmp(&self, other: &RangeMapKey) -> Ordering { - let ret = self.range.end.cmp(&other.range.end); - if !ret.is_eq() { - return ret; - } - let ret = self.range.start.cmp(&other.range.start); - if !ret.is_eq() { - return ret; - } - self.key.cmp(&other.key) - } -} - -impl PartialOrd for RangeMapKey -where - RV: Ord + Debug + Clone, - ID: Ord + Debug + Clone, -{ - fn partial_cmp(&self, other: &RangeMapKey) -> Option { - Some(self.cmp(other)) - } -} - -#[cfg(test)] -mod tests { - - use core::cmp::Ordering; - use std::collections::BTreeMap; - use std::collections::BTreeSet; - - use crate::rangemap::RangeMapKey; - - fn upsert_cmp_map(map: &mut BTreeMap>, k: String, v: String) { - if map.get(&k).is_none() { - map.insert(k.clone(), BTreeSet::new()); - } - if let Some(set) = map.get_mut(&k) { - set.insert(v); - } - } - - /// test if or not RangeKey satisfy reflexive property - #[test] - fn test_range_wrapper_reflexive_property() { - let mut tests = vec![]; - for i in 0..10 { - tests.push(RangeMapKey::new(0..i, ())); - } - - let mut less_map: BTreeMap> = BTreeMap::new(); - let mut greater_map: BTreeMap> = BTreeMap::new(); - - // test antisymmetric property and construct {less|greater}_map - // antisymmetric property: if a > b then b > a. - for i in tests.iter() { - for j in tests.clone().iter() { - let ret_i_j = i.cmp(j); - let ret_j_i = j.cmp(i); - - match ret_i_j { - Ordering::Equal => { - assert_eq!(ret_j_i, Ordering::Equal); - } - Ordering::Less => { - assert_eq!(ret_j_i, Ordering::Greater); - upsert_cmp_map(&mut less_map, i.to_string(), j.to_string()); - } - Ordering::Greater => { - assert_eq!(ret_j_i, Ordering::Less); - upsert_cmp_map(&mut greater_map, i.to_string(), j.to_string()); - } - } - } - } - - // prove transitive property: if a for ErrorCode { tonic::Code::Unknown => { let details = status.details(); if details.is_empty() { + if status.source().map_or(false, |e| e.is::()) { + return ErrorCode::CannotConnectNode(format!( + "{}, source: {:?}", + status.message(), + status.source() + )); + } return ErrorCode::UnknownException(format!( "{}, source: {:?}", status.message(), diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index b7320ae499f2..3fc0baff53eb 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -1648,38 +1648,23 @@ impl + ?Sized> SchemaApi for KV { ) -> Result>, KVAppError> { debug!(req :? =(&table_ids); "SchemaApi: {}", func_name!()); - let mut id_name_kv_keys = Vec::with_capacity(table_ids.len()); - for id in table_ids { - let k = TableIdToName { table_id: *id }.to_string_key(); - id_name_kv_keys.push(k); - } - - // Batch get all table-name by id - let seq_names = self.mget_kv(&id_name_kv_keys).await?; - let mut table_names = Vec::with_capacity(table_ids.len()); - - for seq_name in seq_names { - if let Some(seq_name) = seq_name { - let name_ident: DBIdTableName = deserialize_struct(&seq_name.data)?; - table_names.push(Some(name_ident.table_name)); - } else { - table_names.push(None); - } - } + let id_to_name_idents = table_ids.iter().map(|id| TableIdToName { table_id: *id }); - let mut meta_kv_keys = Vec::with_capacity(table_ids.len()); - for id in table_ids { - let k = TableId { table_id: *id }.to_string_key(); - meta_kv_keys.push(k); - } + let seq_names = self.get_pb_values_vec(id_to_name_idents).await?; + let mut table_names = seq_names + .into_iter() + .map(|seq_name| seq_name.map(|s| s.data.table_name)) + .collect::>(); - let seq_metas = self.mget_kv(&meta_kv_keys).await?; + let id_idents = table_ids.iter().map(|id| TableId { table_id: *id }); + let seq_metas = self.get_pb_values_vec(id_idents).await?; for (i, seq_meta_opt) in seq_metas.iter().enumerate() { if let Some(seq_meta) = seq_meta_opt { - let table_meta: TableMeta = deserialize_struct(&seq_meta.data)?; - if table_meta.drop_on.is_some() { + if seq_meta.data.drop_on.is_some() { table_names[i] = None; } + } else { + table_names[i] = None; } } @@ -1714,39 +1699,26 @@ impl + ?Sized> SchemaApi for KV { ) -> Result>, KVAppError> { debug!(req :? =(&db_ids); "SchemaApi: {}", func_name!()); - let mut kv_keys = Vec::with_capacity(db_ids.len()); - for id in db_ids { - let k = DatabaseIdToName { db_id: *id }.to_string_key(); - kv_keys.push(k); - } + let id_to_name_keys = db_ids.iter().map(|id| DatabaseIdToName { db_id: *id }); - // Batch get all table-name by id - let seq_names = self.mget_kv(&kv_keys).await?; - // If multi drop/create db the capacity may not same - let mut db_names = Vec::with_capacity(db_ids.len()); + let seq_names = self.get_pb_values_vec(id_to_name_keys).await?; - for seq_name in seq_names { - if let Some(seq_name) = seq_name { - let name_ident: DatabaseNameIdentRaw = deserialize_struct(&seq_name.data)?; - db_names.push(Some(name_ident.database_name().to_string())); - } else { - db_names.push(None); - } - } + let mut db_names = seq_names + .into_iter() + .map(|seq_name| seq_name.map(|s| s.data.database_name().to_string())) + .collect::>(); - let mut meta_kv_keys = Vec::with_capacity(db_ids.len()); - for id in db_ids { - let k = DatabaseId { db_id: *id }.to_string_key(); - meta_kv_keys.push(k); - } + let id_keys = db_ids.iter().map(|id| DatabaseId { db_id: *id }); + + let seq_metas = self.get_pb_values_vec(id_keys).await?; - let seq_metas = self.mget_kv(&meta_kv_keys).await?; for (i, seq_meta_opt) in seq_metas.iter().enumerate() { if let Some(seq_meta) = seq_meta_opt { - let db_meta: DatabaseMeta = deserialize_struct(&seq_meta.data)?; - if db_meta.drop_on.is_some() { + if seq_meta.data.drop_on.is_some() { db_names[i] = None; } + } else { + db_names[i] = None; } } Ok(db_names) diff --git a/src/meta/raft-store/src/applier.rs b/src/meta/raft-store/src/applier.rs index df6bae352380..2b47f6a6078d 100644 --- a/src/meta/raft-store/src/applier.rs +++ b/src/meta/raft-store/src/applier.rs @@ -119,9 +119,9 @@ where SM: StateMachineApi + 'static }; // Send queued change events to subscriber - if let Some(subscriber) = self.sm.get_subscriber() { + if let Some(sender) = self.sm.event_sender() { for event in self.changes.drain(..) { - subscriber.kv_changed(event); + sender.send(event); } } diff --git a/src/meta/raft-store/src/mem_state_machine.rs b/src/meta/raft-store/src/mem_state_machine.rs index 709198caf4da..5c99e43a22dd 100644 --- a/src/meta/raft-store/src/mem_state_machine.rs +++ b/src/meta/raft-store/src/mem_state_machine.rs @@ -16,7 +16,7 @@ use databend_common_meta_types::sys_data::SysData; use crate::leveled_store::level::Level; use crate::state_machine::ExpireKey; -use crate::state_machine::StateMachineSubscriber; +use crate::state_machine_api::SMEventSender; use crate::state_machine_api::StateMachineApi; /// A pure in-memory state machine as mock for testing. #[derive(Debug, Default)] @@ -48,7 +48,7 @@ impl StateMachineApi for MemStateMachine { &mut self.level.sys_data } - fn get_subscriber(&self) -> Option<&dyn StateMachineSubscriber> { + fn event_sender(&self) -> Option<&dyn SMEventSender> { None } } diff --git a/src/meta/raft-store/src/sm_v003/sm_v003.rs b/src/meta/raft-store/src/sm_v003/sm_v003.rs index a2471eee258b..9729d8d598cb 100644 --- a/src/meta/raft-store/src/sm_v003/sm_v003.rs +++ b/src/meta/raft-store/src/sm_v003/sm_v003.rs @@ -29,7 +29,7 @@ use crate::leveled_store::leveled_map::LeveledMap; use crate::leveled_store::sys_data_api::SysDataApiRO; use crate::sm_v003::sm_v003_kv_api::SMV003KVApi; use crate::state_machine::ExpireKey; -use crate::state_machine::StateMachineSubscriber; +use crate::state_machine_api::SMEventSender; use crate::state_machine_api::StateMachineApi; #[derive(Debug, Default)] @@ -40,7 +40,7 @@ pub struct SMV003 { expire_cursor: ExpireKey, /// subscriber of state machine data - pub(crate) subscriber: Option>, + pub(crate) subscriber: Option>, } impl StateMachineApi for SMV003 { @@ -70,7 +70,7 @@ impl StateMachineApi for SMV003 { self.levels.sys_data_mut() } - fn get_subscriber(&self) -> Option<&dyn StateMachineSubscriber> { + fn event_sender(&self) -> Option<&dyn SMEventSender> { self.subscriber.as_ref().map(|x| x.as_ref()) } } @@ -171,7 +171,7 @@ impl SMV003 { self.map_mut() } - pub fn set_subscriber(&mut self, subscriber: Box) { + pub fn set_event_sender(&mut self, subscriber: Box) { self.subscriber = Some(subscriber); } diff --git a/src/meta/raft-store/src/state_machine/mod.rs b/src/meta/raft-store/src/state_machine/mod.rs index e7a8b535ba2c..dfe28645fe2d 100644 --- a/src/meta/raft-store/src/state_machine/mod.rs +++ b/src/meta/raft-store/src/state_machine/mod.rs @@ -17,7 +17,6 @@ pub use expire::ExpireKey; pub use expire::ExpireValue; pub use log_meta::LogMetaKey; pub use log_meta::LogMetaValue; -pub use sm::StateMachineSubscriber; pub use snapshot_id::MetaSnapshotId; pub use state_machine_meta::StateMachineMetaKey; pub use state_machine_meta::StateMachineMetaValue; @@ -25,7 +24,6 @@ pub use state_machine_meta::StateMachineMetaValue; pub mod client_last_resp; mod expire; pub mod log_meta; -pub mod sm; mod snapshot_id; pub mod state_machine_meta; diff --git a/src/meta/raft-store/src/state_machine_api.rs b/src/meta/raft-store/src/state_machine_api.rs index 0d4ff7e776f8..2d314155d7c2 100644 --- a/src/meta/raft-store/src/state_machine_api.rs +++ b/src/meta/raft-store/src/state_machine_api.rs @@ -12,11 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Debug; + use databend_common_meta_types::sys_data::SysData; +use databend_common_meta_types::Change; use crate::leveled_store::map_api::MapApi; use crate::state_machine::ExpireKey; -use crate::state_machine::StateMachineSubscriber; + +/// Send a key-value change event to subscribers. +pub trait SMEventSender: Debug + Sync + Send { + fn send(&self, change: Change, String>); +} /// The API a state machine implements pub trait StateMachineApi: Send + Sync { @@ -34,5 +41,5 @@ pub trait StateMachineApi: Send + Sync { fn sys_data_mut(&mut self) -> &mut SysData; - fn get_subscriber(&self) -> Option<&dyn StateMachineSubscriber>; + fn event_sender(&self) -> Option<&dyn SMEventSender>; } diff --git a/src/meta/service/Cargo.toml b/src/meta/service/Cargo.toml index c8fae89c3539..4f4fa95fcd01 100644 --- a/src/meta/service/Cargo.toml +++ b/src/meta/service/Cargo.toml @@ -57,6 +57,7 @@ semver = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serfig = { workspace = true } +span-map = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } diff --git a/src/meta/service/src/api/grpc/grpc_service.rs b/src/meta/service/src/api/grpc/grpc_service.rs index 19d6d7ceaf27..28d7de4bf893 100644 --- a/src/meta/service/src/api/grpc/grpc_service.rs +++ b/src/meta/service/src/api/grpc/grpc_service.rs @@ -393,18 +393,10 @@ impl MetaService for MetaServiceImpl { let mn = &self.meta_node; - let add_res = mn.add_watcher(request.into_inner(), tx).await; + let sender = mn.add_watcher(request.into_inner(), tx).await?; - match add_res { - Ok(watcher) => { - let stream = WatchStream::new(rx, watcher, mn.dispatcher_handle.clone()); - Ok(Response::new(Box::pin(stream) as Self::WatchStream)) - } - Err(e) => { - // TODO: test error return. - Err(Status::invalid_argument(e)) - } - } + let stream = WatchStream::new(rx, sender, mn.subscriber_handle.clone()); + Ok(Response::new(Box::pin(stream) as Self::WatchStream)) } async fn member_list( diff --git a/src/meta/service/src/meta_service/meta_node.rs b/src/meta/service/src/meta_service/meta_node.rs index fc6fbdae2e54..f6f3a7563817 100644 --- a/src/meta/service/src/meta_service/meta_node.rs +++ b/src/meta/service/src/meta_service/meta_node.rs @@ -40,6 +40,7 @@ use databend_common_meta_stoerr::MetaStorageError; use databend_common_meta_types::protobuf::raft_service_client::RaftServiceClient; use databend_common_meta_types::protobuf::raft_service_server::RaftServiceServer; use databend_common_meta_types::protobuf::WatchRequest; +use databend_common_meta_types::protobuf::WatchResponse; use databend_common_meta_types::raft_types::CommittedLeaderId; use databend_common_meta_types::raft_types::ForwardToLeader; use databend_common_meta_types::raft_types::InitializeError; @@ -63,7 +64,6 @@ use databend_common_meta_types::MetaStartupError; use databend_common_meta_types::Node; use fastrace::func_name; use fastrace::prelude::*; -use futures::channel::oneshot; use itertools::Itertools; use log::debug; use log::error; @@ -74,6 +74,8 @@ use openraft::Config; use openraft::Raft; use openraft::ServerState; use openraft::SnapshotPolicy; +use tokio::sync::mpsc; +use tonic::Status; use crate::configs::Config as MetaConfig; use crate::message::ForwardRequest; @@ -92,11 +94,9 @@ use crate::request_handling::Forwarder; use crate::request_handling::Handler; use crate::store::RaftStore; use crate::version::METASRV_COMMIT_VERSION; -use crate::watcher::DispatcherSender; -use crate::watcher::EventDispatcher; -use crate::watcher::EventDispatcherHandle; -use crate::watcher::Watcher; -use crate::watcher::WatcherSender; +use crate::watcher::EventSubscriber; +use crate::watcher::StreamSender; +use crate::watcher::SubscriberHandle; pub type LogStore = RaftStore; pub type SMStore = RaftStore; @@ -107,7 +107,7 @@ pub type MetaRaft = Raft; /// MetaNode is the container of metadata related components and threads, such as storage, the raft node and a raft-state monitor. pub struct MetaNode { pub raft_store: RaftStore, - pub dispatcher_handle: EventDispatcherHandle, + pub subscriber_handle: SubscriberHandle, pub raft: MetaRaft, pub running_tx: watch::Sender<()>, pub running_rx: watch::Receiver<()>, @@ -159,15 +159,15 @@ impl MetaNodeBuilder { let (tx, rx) = watch::channel::<()>(()); - let dispatcher_tx = EventDispatcher::spawn(); + let handle = EventSubscriber::spawn(); sto.get_state_machine() .await - .set_subscriber(Box::new(DispatcherSender(dispatcher_tx.clone()))); + .set_event_sender(Box::new(handle.clone())); let meta_node = Arc::new(MetaNode { raft_store: sto.clone(), - dispatcher_handle: EventDispatcherHandle::new(dispatcher_tx), + subscriber_handle: handle, raft: raft.clone(), running_tx: tx, running_rx: rx, @@ -1147,19 +1147,15 @@ impl MetaNode { pub(crate) async fn add_watcher( &self, request: WatchRequest, - tx: WatcherSender, - ) -> Result { - let (resp_tx, resp_rx) = oneshot::channel(); - - self.dispatcher_handle.request(|d: &mut EventDispatcher| { - let add_res = d.add_watcher(request, tx); - let _ = resp_tx.send(add_res); - }); + tx: mpsc::Sender>, + ) -> Result, Status> { + let stream_sender = self + .subscriber_handle + .request_blocking(|d: &mut EventSubscriber| d.add_watcher(request, tx)) + .await + .map_err(|_e| Status::internal("EventSubscriber closed"))? + .map_err(Status::invalid_argument)?; - let recv_res = resp_rx.await; - match recv_res { - Ok(add_res) => add_res, - Err(_e) => Err("dispatcher closed"), - } + Ok(stream_sender) } } diff --git a/src/meta/service/src/watcher/command.rs b/src/meta/service/src/watcher/command.rs new file mode 100644 index 000000000000..0970121b288b --- /dev/null +++ b/src/meta/service/src/watcher/command.rs @@ -0,0 +1,30 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_meta_types::Change; + +use crate::watcher::subscriber::EventSubscriber; + +/// An event sent to EventDispatcher. +pub(crate) enum Command { + /// Submit a kv change event to dispatcher + KVChange(Change, String>), + + /// Send a fn to [`EventSubscriber`] to run it. + /// + /// The function will be called with a mutable reference to the dispatcher. + Request { + req: Box, + }, +} diff --git a/src/meta/service/src/watcher/desc.rs b/src/meta/service/src/watcher/desc.rs new file mode 100644 index 000000000000..5ae086e56555 --- /dev/null +++ b/src/meta/service/src/watcher/desc.rs @@ -0,0 +1,40 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_meta_types::protobuf::watch_request::FilterType; + +use crate::watcher::id::WatcherId; +use crate::watcher::KeyRange; + +/// Attributes of a watcher that is interested in kv change events. +#[derive(Clone, Debug)] +pub struct WatchDesc { + pub watcher_id: WatcherId, + + /// Defines how to filter keys with `key_range`. + pub interested: FilterType, + + /// The range of key this watcher is interested in. + pub key_range: KeyRange, +} + +impl WatchDesc { + pub(crate) fn new(id: WatcherId, interested: FilterType, key_range: KeyRange) -> Self { + Self { + watcher_id: id, + interested, + key_range, + } + } +} diff --git a/src/meta/raft-store/src/state_machine/sm.rs b/src/meta/service/src/watcher/id.rs similarity index 73% rename from src/meta/raft-store/src/state_machine/sm.rs rename to src/meta/service/src/watcher/id.rs index 828cf9f00a3e..ef10c3f03ba6 100644 --- a/src/meta/raft-store/src/state_machine/sm.rs +++ b/src/meta/service/src/watcher/id.rs @@ -12,11 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt::Debug; - -use databend_common_meta_types::Change; - -/// StateMachine subscriber trait -pub trait StateMachineSubscriber: Debug + Sync + Send { - fn kv_changed(&self, change: Change, String>); -} +pub type WatcherId = i64; diff --git a/src/meta/service/src/watcher/mod.rs b/src/meta/service/src/watcher/mod.rs index b5c07e906608..a8369900c968 100644 --- a/src/meta/service/src/watcher/mod.rs +++ b/src/meta/service/src/watcher/mod.rs @@ -12,14 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod watcher_manager; -mod watcher_stream; +//! The client watch a key range and get notified when the key range changes. -pub(crate) use watcher_manager::DispatcherSender; -pub(crate) use watcher_manager::EventDispatcher; -pub use watcher_manager::EventDispatcherHandle; -pub use watcher_manager::WatcherId; -pub use watcher_manager::WatcherSender; -pub use watcher_stream::WatchStream; -pub use watcher_stream::WatchStreamHandle; -pub use watcher_stream::Watcher; +mod command; +mod desc; +mod id; +mod stream; +mod stream_sender; +mod subscriber; +mod subscriber_handle; + +use std::collections::Bound; + +pub use desc::WatchDesc; +pub use id::WatcherId; +pub(crate) use stream::WatchStream; +pub(crate) use stream_sender::StreamSender; +pub(crate) use subscriber::EventSubscriber; +pub(crate) use subscriber_handle::SubscriberHandle; + +pub(crate) type KeyRange = (Bound, Bound); diff --git a/src/meta/service/src/watcher/stream.rs b/src/meta/service/src/watcher/stream.rs new file mode 100644 index 000000000000..d5fee25227ca --- /dev/null +++ b/src/meta/service/src/watcher/stream.rs @@ -0,0 +1,61 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; + +use databend_common_base::base::tokio::sync::mpsc::Receiver; +use futures::Stream; + +use crate::watcher::StreamSender; +use crate::watcher::SubscriberHandle; + +/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. +#[derive(Debug)] +pub(crate) struct WatchStream { + rx: Receiver, + /// Hold a clone of the sender to remove itself from the dispatcher when dropped. + sender: Arc, + subscriber_handle: SubscriberHandle, +} + +impl Drop for WatchStream { + fn drop(&mut self) { + let sender = self.sender.clone(); + self.subscriber_handle.request(move |d| { + d.remove_watcher(sender); + }) + } +} + +impl WatchStream { + /// Create a new `WatcherStream`. + pub fn new(rx: Receiver, sender: Arc, dispatcher: SubscriberHandle) -> Self { + Self { + rx, + sender, + subscriber_handle: dispatcher, + } + } +} + +impl Stream for WatchStream { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.rx.poll_recv(cx) + } +} diff --git a/src/meta/service/src/watcher/stream_sender.rs b/src/meta/service/src/watcher/stream_sender.rs new file mode 100644 index 000000000000..b1218c417d6b --- /dev/null +++ b/src/meta/service/src/watcher/stream_sender.rs @@ -0,0 +1,70 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::fmt; +use std::fmt::Formatter; + +use databend_common_meta_types::protobuf::WatchResponse; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::SendError; +use tonic::Status; + +use crate::watcher::desc::WatchDesc; + +/// A handle of a watching stream, for feeding messages to the stream. +#[derive(Clone)] +pub struct StreamSender { + pub desc: WatchDesc, + tx: mpsc::Sender>, +} + +impl fmt::Debug for StreamSender { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "WatchStreamSender({:?})", self.desc) + } +} + +impl PartialEq for StreamSender { + fn eq(&self, other: &Self) -> bool { + self.desc.watcher_id == other.desc.watcher_id + } +} + +impl Eq for StreamSender {} + +impl PartialOrd for StreamSender { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for StreamSender { + fn cmp(&self, other: &Self) -> Ordering { + self.desc.watcher_id.cmp(&other.desc.watcher_id) + } +} + +impl StreamSender { + pub fn new(desc: WatchDesc, tx: mpsc::Sender>) -> Self { + StreamSender { desc, tx } + } + + pub async fn send( + &self, + resp: WatchResponse, + ) -> Result<(), SendError>> { + self.tx.send(Ok(resp)).await + } +} diff --git a/src/meta/service/src/watcher/subscriber.rs b/src/meta/service/src/watcher/subscriber.rs new file mode 100644 index 000000000000..334834e71243 --- /dev/null +++ b/src/meta/service/src/watcher/subscriber.rs @@ -0,0 +1,213 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeSet; +use std::collections::Bound; +use std::sync::Arc; + +use databend_common_meta_types::protobuf::watch_request::FilterType; +use databend_common_meta_types::protobuf::WatchRequest; +use databend_common_meta_types::protobuf::WatchResponse; +use databend_common_meta_types::Change; +use log::info; +use log::warn; +use prost::Message; +use span_map::SpanMap; +use tokio::sync::mpsc; +use tonic::Status; + +use crate::metrics::network_metrics; +use crate::metrics::server_metrics; +use crate::watcher::command::Command; +use crate::watcher::id::WatcherId; +use crate::watcher::subscriber_handle::SubscriberHandle; +use crate::watcher::KeyRange; +use crate::watcher::StreamSender; +use crate::watcher::WatchDesc; + +/// Receives events from event sources(such as raft state machine), +/// dispatches them to interested watchers. +pub struct EventSubscriber { + rx: mpsc::UnboundedReceiver, + + watchers: SpanMap>, + + current_watcher_id: WatcherId, +} + +impl EventSubscriber { + /// Spawn a dispatcher loop task. + pub(crate) fn spawn() -> SubscriberHandle { + let (tx, rx) = mpsc::unbounded_channel(); + + let subscriber = EventSubscriber { + rx, + watchers: SpanMap::new(), + current_watcher_id: 1, + }; + + let _h = databend_common_base::runtime::spawn(subscriber.main()); + + SubscriberHandle::new(tx) + } + + #[fastrace::trace] + async fn main(mut self) { + while let Some(event) = self.rx.recv().await { + match event { + Command::KVChange(kv_change) => { + self.dispatch(kv_change).await; + } + Command::Request { req } => req(&mut self), + } + } + + info!("EventDispatcher: all event senders are closed. quit."); + } + + /// Dispatch a kv change event to interested watchers. + async fn dispatch(&mut self, change: Change, String>) { + let Some(key) = change.ident.clone() else { + warn!("EventSubscriber: change event without key; ignore it"); + return; + }; + + let is_delete = change.result.is_none(); + + let resp = WatchResponse::new(&change).unwrap(); + let resp_size = resp.encoded_len() as u64; + + let mut removed = vec![]; + + for sender in self.watchers.get(&key) { + let interested = sender.desc.interested; + + match interested { + FilterType::All => {} + FilterType::Update => { + if is_delete { + continue; + } + } + FilterType::Delete => { + if !is_delete { + continue; + } + } + } + + if let Err(_err) = sender.send(resp.clone()).await { + warn!( + "EventSubscriber: fail to send to watcher {}; close this stream", + sender.desc.watcher_id + ); + removed.push(sender.clone()); + } else { + network_metrics::incr_sent_bytes(resp_size); + }; + } + + for sender in removed { + self.remove_watcher(sender); + } + } + + #[fastrace::trace] + pub fn add_watcher( + &mut self, + req: WatchRequest, + tx: mpsc::Sender>, + ) -> Result, &'static str> { + info!("EventSubscriber::add_watcher: {:?}", req); + + let interested = req.filter_type(); + let desc = self.new_watch_desc(req.key, req.key_end, interested)?; + + let stream_sender = Arc::new(StreamSender::new(desc, tx)); + + self.watchers + .insert(stream_sender.desc.key_range.clone(), stream_sender.clone()); + + server_metrics::incr_watchers(1); + + Ok(stream_sender) + } + + fn new_watch_desc( + &mut self, + key: String, + key_end: Option, + interested: FilterType, + ) -> Result { + self.current_watcher_id += 1; + let watcher_id = self.current_watcher_id; + + let range = Self::build_key_range(key.clone(), &key_end)?; + + let desc = WatchDesc::new(watcher_id, interested, range); + Ok(desc) + } + + #[fastrace::trace] + pub fn remove_watcher(&mut self, stream_sender: Arc) { + info!("EventSubscriber::remove_watcher: {:?}", stream_sender); + + self.watchers.remove(.., stream_sender); + + server_metrics::incr_watchers(-1); + } + + pub(crate) fn build_key_range( + key: String, + key_end: &Option, + ) -> Result { + let left = Bound::Included(key.clone()); + + match key_end { + Some(key_end) => { + if &key >= key_end { + return Err("empty range"); + } + Ok((left, Bound::Excluded(key_end.to_string()))) + } + None => Ok((left.clone(), left)), + } + } + + pub fn watch_senders(&self) -> BTreeSet<&Arc> { + self.watchers.values(..) + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_build_key_range() -> Result<(), &'static str> { + let x = EventSubscriber::build_key_range(s("a"), &None)?; + assert_eq!(x, (Bound::Included(s("a")), Bound::Included(s("a")))); + + let x = EventSubscriber::build_key_range(s("a"), &Some(s("b")))?; + assert_eq!(x, (Bound::Included(s("a")), Bound::Excluded(s("b")))); + + let x = EventSubscriber::build_key_range(s("a"), &Some(s("a"))); + assert_eq!(x, Err("empty range")); + + Ok(()) + } + + fn s(x: impl ToString) -> String { + x.to_string() + } +} diff --git a/src/meta/service/src/watcher/subscriber_handle.rs b/src/meta/service/src/watcher/subscriber_handle.rs new file mode 100644 index 000000000000..563798f5f16a --- /dev/null +++ b/src/meta/service/src/watcher/subscriber_handle.rs @@ -0,0 +1,65 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_meta_raft_store::state_machine_api::SMEventSender; +use databend_common_meta_types::Change; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::sync::oneshot::error::RecvError; + +use crate::watcher::command::Command; +use crate::watcher::EventSubscriber; + +#[derive(Clone, Debug)] +pub struct SubscriberHandle { + /// For sending event or command to the dispatcher. + pub(crate) tx: mpsc::UnboundedSender, +} + +impl SMEventSender for SubscriberHandle { + fn send(&self, change: Change, String>) { + let _ = self.tx.send(Command::KVChange(change)); + } +} + +impl SubscriberHandle { + pub(crate) fn new(tx: mpsc::UnboundedSender) -> Self { + Self { tx } + } + + /// Send a request to the watch dispatcher. + pub fn request(&self, req: impl FnOnce(&mut EventSubscriber) + Send + 'static) { + let _ = self.tx.send(Command::Request { req: Box::new(req) }); + } + + /// Send a request to the watch dispatcher and block until finished + pub async fn request_blocking( + &self, + req: impl FnOnce(&mut EventSubscriber) -> V + Send + 'static, + ) -> Result + where + V: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + + let _ = self.tx.send(Command::Request { + req: Box::new(|dispatcher| { + let v = req(dispatcher); + let _ = tx.send(v); + }), + }); + + rx.await + } +} diff --git a/src/meta/service/src/watcher/watcher_manager.rs b/src/meta/service/src/watcher/watcher_manager.rs deleted file mode 100644 index 7078670965b2..000000000000 --- a/src/meta/service/src/watcher/watcher_manager.rs +++ /dev/null @@ -1,248 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use core::ops::Range; - -use databend_common_base::base::tokio::sync::mpsc; -use databend_common_base::base::tokio::sync::oneshot; -use databend_common_base::rangemap::RangeMap; -use databend_common_base::rangemap::RangeMapKey; -use databend_common_meta_raft_store::state_machine::StateMachineSubscriber; -use databend_common_meta_types::protobuf as pb; -use databend_common_meta_types::protobuf::watch_request::FilterType; -use databend_common_meta_types::protobuf::Event; -use databend_common_meta_types::protobuf::WatchRequest; -use databend_common_meta_types::protobuf::WatchResponse; -use databend_common_meta_types::Change; -use log::info; -use log::warn; -use prost::Message; -use tonic::Status; - -use super::WatchStreamHandle; -use crate::metrics::network_metrics; -use crate::metrics::server_metrics; -use crate::watcher::Watcher; - -pub type WatcherId = i64; - -/// A sender for dispatcher to send event to interested watchers. -pub type WatcherSender = mpsc::Sender>; - -/// A sender for event source, such as raft state machine, to send event to [`EventDispatcher`]. -#[derive(Clone, Debug)] -pub(crate) struct DispatcherSender(pub(crate) mpsc::UnboundedSender); - -/// An event sent to EventDispatcher. -pub(crate) enum WatchEvent { - /// Submit a kv change event to dispatcher - KVChange(Change, String>), - - /// Send a request to EventDispatcher. - /// - /// The function will be called with a mutable reference to the dispatcher. - Request { - req: Box, - }, -} - -#[derive(Clone, Debug)] -pub struct EventDispatcherHandle { - /// For sending event or command to the dispatcher. - pub(crate) tx: mpsc::UnboundedSender, -} - -impl EventDispatcherHandle { - pub(crate) fn new(tx: mpsc::UnboundedSender) -> Self { - Self { tx } - } - - /// Send a request to the watch dispatcher. - pub fn request(&self, req: impl FnOnce(&mut EventDispatcher) + Send + 'static) { - let _ = self.tx.send(WatchEvent::Request { req: Box::new(req) }); - } - - /// Send a request to the watch dispatcher and block until finished - pub async fn request_blocking(&self, req: impl FnOnce(&mut EventDispatcher) + Send + 'static) { - let (tx, rx) = oneshot::channel(); - - let _ = self.tx.send(WatchEvent::Request { - req: Box::new(|dispatcher| { - req(dispatcher); - let _ = tx.send(()); - }), - }); - - let _ = rx.await; - } -} - -/// Receives events from event sources, dispatches them to interested watchers. -pub struct EventDispatcher { - event_rx: mpsc::UnboundedReceiver, - - /// map range to WatcherId - watcher_range_map: RangeMap, - - current_watcher_id: WatcherId, -} - -impl EventDispatcher { - /// Spawn a dispatcher loop task. - pub(crate) fn spawn() -> mpsc::UnboundedSender { - let (event_tx, event_rx) = mpsc::unbounded_channel(); - - let dispatcher = EventDispatcher { - event_rx, - watcher_range_map: RangeMap::new(), - current_watcher_id: 1, - }; - - let _h = databend_common_base::runtime::spawn(dispatcher.main()); - - event_tx - } - - #[fastrace::trace] - async fn main(mut self) { - loop { - if let Some(event) = self.event_rx.recv().await { - match event { - WatchEvent::KVChange(kv_change) => { - self.dispatch_event(kv_change).await; - } - WatchEvent::Request { req } => req(&mut self), - } - } else { - info!("all event senders are closed. quit."); - break; - } - } - } - - /// Dispatch a kv change event to interested watchers. - async fn dispatch_event(&mut self, change: Change, String>) { - let k = change.ident.as_ref().unwrap(); - let set = self.watcher_range_map.get_by_point(k); - if set.is_empty() { - return; - } - - let current = change.result; - let prev = change.prev; - - let is_delete_event = current.is_none(); - let mut remove_range_keys: Vec> = vec![]; - - for range_key_stream in set.iter() { - let filter = range_key_stream.1.watcher.filter_type; - - // filter out event - if (filter == FilterType::Delete && !is_delete_event) - || (filter == FilterType::Update && is_delete_event) - { - continue; - } - - let watcher_id = range_key_stream.0.key; - let stream = range_key_stream.1; - assert_eq!(stream.watcher.id, watcher_id); - let resp = WatchResponse { - event: Some(Event { - key: k.to_string(), - current: current.clone().map(pb::SeqV::from), - prev: prev.clone().map(pb::SeqV::from), - }), - }; - - network_metrics::incr_sent_bytes(resp.encoded_len() as u64); - - if let Err(err) = stream.send(resp).await { - warn!( - "close watcher stream {:?} cause send err: {:?}", - watcher_id, err - ); - remove_range_keys.push(RangeMapKey::new( - stream.watcher.key_range.clone(), - watcher_id, - )); - }; - } - - // TODO: when a watcher stream is dropped, send a event to remove the watcher explicitly - for range_key in remove_range_keys { - self.remove_watcher(&range_key); - } - } - - #[fastrace::trace] - pub fn add_watcher( - &mut self, - create: WatchRequest, - tx: WatcherSender, - ) -> Result { - info!("add_watcher: {:?}", create); - - let range = EventDispatcher::build_key_range(create.key.clone(), &create.key_end)?; - - self.current_watcher_id += 1; - let watcher_id = self.current_watcher_id; - let filter: FilterType = create.filter_type(); - - let watcher = Watcher::new(watcher_id, filter, range.clone()); - let stream_handle = WatchStreamHandle::new(watcher.clone(), tx); - - self.watcher_range_map - .insert(range, watcher_id, stream_handle); - - server_metrics::incr_watchers(1); - - Ok(watcher) - } - - #[fastrace::trace] - pub fn remove_watcher(&mut self, key: &RangeMapKey) { - info!("remove_watcher: {:?}", key); - - self.watcher_range_map.remove_by_key(key); - - // TODO: decrease it only when the key is actually removed - server_metrics::incr_watchers(-1); - } - - fn build_key_range( - key: String, - key_end: &Option, - ) -> Result, &'static str> { - match key_end { - Some(key_end) => { - if &key > key_end { - return Err("empty range"); - } - Ok(key..key_end.to_string()) - } - None => Ok(key.clone()..key), - } - } - - pub fn watchers(&self) -> impl Iterator> { - self.watcher_range_map.keys() - } -} - -impl StateMachineSubscriber for DispatcherSender { - fn kv_changed(&self, change: Change, String>) { - let _ = self.0.send(WatchEvent::KVChange(change)); - } -} diff --git a/src/meta/service/src/watcher/watcher_stream.rs b/src/meta/service/src/watcher/watcher_stream.rs deleted file mode 100644 index db728834ae67..000000000000 --- a/src/meta/service/src/watcher/watcher_stream.rs +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::ops::Range; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -use databend_common_base::base::tokio::sync::mpsc::error::SendError; -use databend_common_base::base::tokio::sync::mpsc::Receiver; -use databend_common_base::rangemap::RangeMapKey; -use databend_common_meta_types::protobuf::watch_request::FilterType; -use databend_common_meta_types::protobuf::WatchResponse; -use futures::Stream; -use tonic::Status; - -use super::WatcherId; -use super::WatcherSender; -use crate::watcher::EventDispatcherHandle; - -/// Attributes of a watcher that is interested in kv change events. -#[derive(Clone, Debug)] -pub struct Watcher { - pub id: WatcherId, - - /// Defines how to filter keys with `key_range`. - pub filter_type: FilterType, - - /// The range of key this watcher is interested in. - pub key_range: Range, -} - -impl Watcher { - pub fn new(id: WatcherId, filter_type: FilterType, key_range: Range) -> Self { - Self { - id, - filter_type, - key_range, - } - } -} - -/// A handle of a watching stream, for feeding messages to the stream. -pub struct WatchStreamHandle { - pub watcher: Watcher, - tx: WatcherSender, -} - -impl WatchStreamHandle { - pub fn new(watcher: Watcher, tx: WatcherSender) -> Self { - WatchStreamHandle { watcher, tx } - } - - pub async fn send( - &self, - resp: WatchResponse, - ) -> Result<(), SendError>> { - self.tx.send(Ok(resp)).await - } -} - -/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. -#[derive(Debug)] -pub struct WatchStream { - inner: Receiver, - watcher: Watcher, - dispatcher: EventDispatcherHandle, -} - -impl Drop for WatchStream { - fn drop(&mut self) { - let rng = self.watcher.key_range.clone(); - let id = self.watcher.id; - - self.dispatcher.request(move |d| { - let key = RangeMapKey::new(rng, id); - d.remove_watcher(&key) - }) - } -} - -impl WatchStream { - /// Create a new `WatcherStream`. - pub fn new(rx: Receiver, watcher: Watcher, dispatcher: EventDispatcherHandle) -> Self { - Self { - inner: rx, - watcher, - dispatcher, - } - } - - /// Closes the receiving half of a channel without dropping it. - pub fn close(&mut self) { - self.inner.close() - } -} - -impl Stream for WatchStream { - type Item = T; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_recv(cx) - } -} - -impl AsRef> for WatchStream { - fn as_ref(&self) -> &Receiver { - &self.inner - } -} - -impl AsMut> for WatchStream { - fn as_mut(&mut self) -> &mut Receiver { - &mut self.inner - } -} diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs index cb295a737706..cc34cba7ad8b 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs @@ -64,15 +64,15 @@ async fn test_watch_main( loop { if let Ok(Some(resp)) = watch_stream.message().await { - if let Some(event) = resp.event { - assert!(!watch_events.is_empty()); + let event = resp.event.unwrap(); - assert_eq!(watch_events.first(), Some(&event)); - watch_events.remove(0); + assert!(!watch_events.is_empty()); - if watch_events.is_empty() { - break; - } + let want = watch_events.remove(0); + assert_eq!(want, event); + + if watch_events.is_empty() { + break; } } } @@ -114,6 +114,43 @@ async fn test_watch_txn_main( Ok(()) } +#[test(harness = meta_service_test_harness)] +#[fastrace::trace] +async fn test_watch_single_key() -> anyhow::Result<()> { + let (_tc, addr) = crate::tests::start_metasrv().await?; + + let seq: u64 = 1; + + let watch = WatchRequest { + key: s("a"), + key_end: None, + filter_type: FilterType::All.into(), + }; + + let key_a = s("a"); + let val_a = b("a"); + + let watch_events = vec![ + // set a->a + Event { + key: key_a.clone(), + current: Some(SeqV::new(seq, val_a.clone())), + prev: None, + }, + ]; + + let updates = vec![UpsertKV::new( + "a", + MatchSeq::GE(0), + Operation::Update(val_a), + None, + )]; + + test_watch_main(addr.clone(), watch, watch_events, updates).await?; + + Ok(()) +} + #[test(harness = meta_service_test_harness)] #[fastrace::trace] async fn test_watch() -> anyhow::Result<()> { @@ -466,19 +503,14 @@ async fn test_watch_stream_count() -> anyhow::Result<()> { let mn: Arc = tc.grpc_srv.as_ref().map(|x| x.get_meta_node()).unwrap(); - let watcher_count = Arc::new(std::sync::Mutex::new(0usize)); - info!("one watcher"); { - let cnt = watcher_count.clone(); + let got = mn + .subscriber_handle + .request_blocking(move |d| d.watch_senders().len()) + .await?; - mn.dispatcher_handle - .request_blocking(move |d| { - *cnt.lock().unwrap() = d.watchers().count(); - }) - .await; - - assert_eq!(1, *watcher_count.lock().unwrap()); + assert_eq!(1, got); } info!("second watcher"); @@ -486,15 +518,12 @@ async fn test_watch_stream_count() -> anyhow::Result<()> { let client2 = make_client(&addr)?; let _watch_stream2 = client2.request(watch_req()).await?; - let cnt = watcher_count.clone(); - - mn.dispatcher_handle - .request_blocking(move |d| { - *cnt.lock().unwrap() = d.watchers().count(); - }) - .await; + let got = mn + .subscriber_handle + .request_blocking(move |d| d.watch_senders().len()) + .await?; - assert_eq!(2, *watcher_count.lock().unwrap()); + assert_eq!(2, got); } info!("wait a while for MetaNode to process stream cleanup"); @@ -502,15 +531,12 @@ async fn test_watch_stream_count() -> anyhow::Result<()> { info!("second watcher is removed"); { - let cnt = watcher_count.clone(); - - mn.dispatcher_handle - .request_blocking(move |d| { - *cnt.lock().unwrap() = d.watchers().count(); - }) - .await; + let got = mn + .subscriber_handle + .request_blocking(move |d| d.watch_senders().len()) + .await?; - assert_eq!(1, *watcher_count.lock().unwrap()); + assert_eq!(1, got); } Ok(()) diff --git a/src/meta/types/src/proto_ext/mod.rs b/src/meta/types/src/proto_ext/mod.rs index 9d79632c68b9..e53210ba6ee6 100644 --- a/src/meta/types/src/proto_ext/mod.rs +++ b/src/meta/types/src/proto_ext/mod.rs @@ -20,3 +20,4 @@ mod snapshot_chunk_request_ext; mod stream_item_ext; mod transfer_leader_request_ext; mod txn_ext; +mod watch_ext; diff --git a/src/meta/types/src/proto_ext/watch_ext.rs b/src/meta/types/src/proto_ext/watch_ext.rs new file mode 100644 index 000000000000..83affb26745a --- /dev/null +++ b/src/meta/types/src/proto_ext/watch_ext.rs @@ -0,0 +1,29 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::protobuf as pb; +use crate::protobuf::WatchResponse; +use crate::Change; + +impl WatchResponse { + pub fn new(change: &Change, String>) -> Option { + let ev = pb::Event { + key: change.ident.clone()?, + prev: change.prev.clone().map(pb::SeqV::from), + current: change.result.clone().map(pb::SeqV::from), + }; + + Some(WatchResponse { event: Some(ev) }) + } +} diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index 9c66a8b5770c..07bedc6f3beb 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -50,12 +50,14 @@ use futures::future::Either; use futures::Future; use futures::StreamExt; use log::error; +use log::info; use log::warn; use parking_lot::RwLock; use rand::thread_rng; use rand::Rng; use serde::Deserialize; use serde::Serialize; +use tokio::time::sleep; use crate::servers::flight::FlightClient; @@ -81,11 +83,11 @@ pub trait ClusterHelper { fn get_nodes(&self) -> Vec>; - async fn do_action Deserialize<'de> + Send>( + async fn do_action Deserialize<'de> + Send>( &self, path: &str, message: HashMap, - timeout: u64, + flight_params: FlightParams, ) -> Result>; } @@ -118,11 +120,11 @@ impl ClusterHelper for Cluster { self.nodes.to_vec() } - async fn do_action Deserialize<'de> + Send>( + async fn do_action Deserialize<'de> + Send>( &self, path: &str, message: HashMap, - timeout: u64, + flight_params: FlightParams, ) -> Result> { fn get_node<'a>(nodes: &'a [Arc], id: &str) -> Result<&'a Arc> { for node in nodes { @@ -137,23 +139,47 @@ impl ClusterHelper for Cluster { ))) } - let mut response = HashMap::with_capacity(message.len()); + let mut futures = Vec::with_capacity(message.len()); for (id, message) in message { let node = get_node(&self.nodes, &id)?; - let config = GlobalConfig::instance(); - let flight_address = node.flight_address.clone(); - let node_secret = node.secret.clone(); - - let mut conn = create_client(&config, &flight_address).await?; - response.insert( - id, - conn.do_action::<_, Res>(path, node_secret, message, timeout) - .await?, - ); + futures.push({ + let config = GlobalConfig::instance(); + let flight_address = node.flight_address.clone(); + let node_secret = node.secret.clone(); + + async move { + let mut attempt = 0; + + loop { + let mut conn = create_client(&config, &flight_address).await?; + match conn + .do_action::<_, Res>( + path, + node_secret.clone(), + message.clone(), + flight_params.timeout, + ) + .await + { + Ok(result) => return Ok((id, result)), + Err(e) + if e.code() == ErrorCode::CANNOT_CONNECT_NODE + && attempt < flight_params.retry_times => + { + // only retry when error is network problem + info!("retry do_action, attempt: {}", attempt); + attempt += 1; + sleep(Duration::from_secs(flight_params.retry_interval)).await; + } + Err(e) => return Err(e), + } + } + } + }); } - - Ok(response) + let responses: Vec<(String, Res)> = futures::future::try_join_all(futures).await?; + Ok(responses.into_iter().collect::>()) } } @@ -537,3 +563,10 @@ pub async fn create_client(config: &InnerConfig, address: &str) -> Result Result { let cluster = self.ctx.get_cluster(); let settings = self.ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_flight_max_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; let mut message = HashMap::with_capacity(cluster.nodes.len()); @@ -65,7 +70,7 @@ impl KillInterpreter { } let res = cluster - .do_action::<_, bool>(KILL_QUERY, message, timeout) + .do_action::<_, bool>(KILL_QUERY, message, flight_params) .await?; match res.values().any(|x| *x) { diff --git a/src/query/service/src/interpreters/interpreter_set_priority.rs b/src/query/service/src/interpreters/interpreter_set_priority.rs index 0dda6b9dd656..1bf830b24be0 100644 --- a/src/query/service/src/interpreters/interpreter_set_priority.rs +++ b/src/query/service/src/interpreters/interpreter_set_priority.rs @@ -21,6 +21,7 @@ use databend_common_exception::Result; use databend_common_sql::plans::SetPriorityPlan; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::servers::flight::v1::actions::SET_PRIORITY; @@ -61,9 +62,13 @@ impl SetPriorityInterpreter { } let settings = self.ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_flight_max_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; let res = cluster - .do_action::<_, bool>(SET_PRIORITY, message, timeout) + .do_action::<_, bool>(SET_PRIORITY, message, flight_params) .await?; match res.values().any(|x| *x) { diff --git a/src/query/service/src/interpreters/interpreter_system_action.rs b/src/query/service/src/interpreters/interpreter_system_action.rs index 86e747e865ee..ca7e3ffb50c7 100644 --- a/src/query/service/src/interpreters/interpreter_system_action.rs +++ b/src/query/service/src/interpreters/interpreter_system_action.rs @@ -22,6 +22,7 @@ use databend_common_sql::plans::SystemAction; use databend_common_sql::plans::SystemPlan; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::servers::flight::v1::actions::SYSTEM_ACTION; @@ -74,9 +75,13 @@ impl Interpreter for SystemActionInterpreter { } let settings = self.ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_flight_max_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; cluster - .do_action::<_, ()>(SYSTEM_ACTION, message, timeout) + .do_action::<_, ()>(SYSTEM_ACTION, message, flight_params) .await?; } diff --git a/src/query/service/src/interpreters/interpreter_table_truncate.rs b/src/query/service/src/interpreters/interpreter_table_truncate.rs index 09a19f79cc43..70afa08e3866 100644 --- a/src/query/service/src/interpreters/interpreter_table_truncate.rs +++ b/src/query/service/src/interpreters/interpreter_table_truncate.rs @@ -21,6 +21,7 @@ use databend_common_exception::Result; use databend_common_sql::plans::TruncateTablePlan; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::servers::flight::v1::actions::TRUNCATE_TABLE; @@ -95,9 +96,13 @@ impl Interpreter for TruncateTableInterpreter { } let settings = self.ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_flight_max_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; cluster - .do_action::<_, ()>(TRUNCATE_TABLE, message, timeout) + .do_action::<_, ()>(TRUNCATE_TABLE, message, flight_params) .await?; } diff --git a/src/query/service/src/servers/admin/v1/query_profiling.rs b/src/query/service/src/servers/admin/v1/query_profiling.rs index 649c16baeb3a..a479f228ac17 100644 --- a/src/query/service/src/servers/admin/v1/query_profiling.rs +++ b/src/query/service/src/servers/admin/v1/query_profiling.rs @@ -30,6 +30,7 @@ use poem::IntoResponse; use crate::clusters::ClusterDiscovery; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::servers::flight::v1::actions::GET_PROFILE; use crate::sessions::SessionManager; @@ -104,8 +105,13 @@ async fn get_cluster_profile(query_id: &str) -> Result, ErrorCo } } + let flight_params = FlightParams { + timeout: 60, + retry_times: 3, + retry_interval: 3, + }; let res = cluster - .do_action::<_, Option>>(GET_PROFILE, message, 60) + .do_action::<_, Option>>(GET_PROFILE, message, flight_params) .await?; match res.into_values().find(Option::is_some) { diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 3f929c99f594..1cbb961f798d 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -52,6 +52,7 @@ use super::exchange_transform::ExchangeTransform; use super::statistics_receiver::StatisticsReceiver; use super::statistics_sender::StatisticsSender; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::PipelineBuildResult; @@ -416,13 +417,17 @@ impl DataExchangeManager { actions: QueryFragmentsActions, ) -> Result { let settings = ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_flight_max_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; let root_actions = actions.get_root_actions()?; let conf = GlobalConfig::instance(); // Initialize query env between cluster nodes let query_env = actions.get_query_env()?; - query_env.init(&ctx, timeout).await?; + query_env.init(&ctx, flight_params).await?; // Submit distributed tasks to all nodes. let cluster = ctx.get_cluster(); @@ -431,7 +436,7 @@ impl DataExchangeManager { let local_fragments = query_fragments.remove(&conf.query.node_id); let _: HashMap = cluster - .do_action(INIT_QUERY_FRAGMENTS, query_fragments, timeout) + .do_action(INIT_QUERY_FRAGMENTS, query_fragments, flight_params) .await?; self.set_ctx(&ctx.get_id(), ctx.clone())?; @@ -444,7 +449,7 @@ impl DataExchangeManager { let prepared_query = actions.prepared_query()?; let _: HashMap = cluster - .do_action(START_PREPARED_QUERY, prepared_query, timeout) + .do_action(START_PREPARED_QUERY, prepared_query, flight_params) .await?; Ok(build_res) diff --git a/src/query/service/src/servers/flight/v1/packets/packet_executor.rs b/src/query/service/src/servers/flight/v1/packets/packet_executor.rs index c555184d8291..29999b7727b3 100644 --- a/src/query/service/src/servers/flight/v1/packets/packet_executor.rs +++ b/src/query/service/src/servers/flight/v1/packets/packet_executor.rs @@ -14,7 +14,7 @@ use crate::servers::flight::v1::packets::QueryFragment; -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct QueryFragments { pub query_id: String, pub fragments: Vec, diff --git a/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs b/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs index 33c5f20e11b7..12bb47862775 100644 --- a/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs +++ b/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs @@ -34,6 +34,7 @@ use serde::Deserialize; use serde::Serialize; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::servers::flight::v1::actions::INIT_QUERY_ENV; use crate::sessions::QueryContext; use crate::sessions::SessionManager; @@ -140,7 +141,7 @@ pub struct QueryEnv { } impl QueryEnv { - pub async fn init(&self, ctx: &Arc, timeout: u64) -> Result<()> { + pub async fn init(&self, ctx: &Arc, flight_params: FlightParams) -> Result<()> { debug!("Dataflow diagram {:?}", self.dataflow_diagram); let cluster = ctx.get_cluster(); @@ -151,7 +152,7 @@ impl QueryEnv { } let _ = cluster - .do_action::<_, ()>(INIT_QUERY_ENV, message, timeout) + .do_action::<_, ()>(INIT_QUERY_ENV, message, flight_params) .await?; Ok(()) diff --git a/src/query/service/src/servers/http/middleware/session.rs b/src/query/service/src/servers/http/middleware/session.rs index 95f693841f4a..47834d27f659 100644 --- a/src/query/service/src/servers/http/middleware/session.rs +++ b/src/query/service/src/servers/http/middleware/session.rs @@ -322,6 +322,12 @@ pub struct HTTPSessionEndpoint { pub auth_manager: Arc, } +fn make_cookie(name: impl Into, value: impl Into) -> Cookie { + let mut cookie = Cookie::new_with_str(name, value); + cookie.set_path("/"); + cookie +} + impl HTTPSessionEndpoint { #[async_backtrace::framed] async fn auth(&self, req: &Request, query_id: String) -> Result { @@ -364,8 +370,7 @@ impl HTTPSessionEndpoint { Some(id1.clone()) } (Some(id), None) => { - req.cookie() - .add(Cookie::new_with_str(COOKIE_SESSION_ID, id)); + req.cookie().add(make_cookie(COOKIE_SESSION_ID, id)); Some(id.clone()) } (None, Some(id)) => Some(id.clone()), @@ -373,8 +378,7 @@ impl HTTPSessionEndpoint { if cookie_enabled { let id = Uuid::new_v4().to_string(); info!("new session id: {}", id); - req.cookie() - .add(Cookie::new_with_str(COOKIE_SESSION_ID, &id)); + req.cookie().add(make_cookie(COOKIE_SESSION_ID, &id)); Some(id) } else { None @@ -399,8 +403,7 @@ impl HTTPSessionEndpoint { if cookie_enabled { let ts = unix_ts().as_secs().to_string(); - req.cookie() - .add(Cookie::new_with_str(COOKIE_LAST_ACCESS_TIME, ts)); + req.cookie().add(make_cookie(COOKIE_LAST_ACCESS_TIME, ts)); } let session = session_manager.register_session(session)?; diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 32a26283e058..b18b5cb4d2d6 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -958,6 +958,18 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("flight_connection_max_retry_times", DefaultSettingValue { + value: UserSettingValue::UInt64(3), + desc: "The maximum retry count for cluster flight. Disable if 0.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=10)), + }), + ("flight_connection_retry_interval", DefaultSettingValue { + value: UserSettingValue::UInt64(3), + desc: "The retry interval of cluster flight is in seconds.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=30)), + }), ]); Ok(Arc::new(DefaultSettings { diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 5dc160133268..b76b50d9dc33 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -825,4 +825,12 @@ impl Settings { pub fn get_persist_materialized_cte(&self) -> Result { Ok(self.try_get_u64("persist_materialized_cte")? != 0) } + + pub fn get_flight_max_retry_times(&self) -> Result { + self.try_get_u64("flight_connection_max_retry_times") + } + + pub fn get_flight_retry_interval(&self) -> Result { + self.try_get_u64("flight_connection_retry_interval") + } } diff --git a/src/query/storages/system/Cargo.toml b/src/query/storages/system/Cargo.toml index 3b31c6c3966f..d36f47e0c688 100644 --- a/src/query/storages/system/Cargo.toml +++ b/src/query/storages/system/Cargo.toml @@ -25,7 +25,6 @@ databend-common-config = { workspace = true } databend-common-exception = { workspace = true } databend-common-expression = { workspace = true } databend-common-functions = { workspace = true } -databend-common-management = { workspace = true } databend-common-meta-api = { workspace = true } databend-common-meta-app = { workspace = true } databend-common-meta-types = { workspace = true } diff --git a/src/query/storages/system/src/tables_table.rs b/src/query/storages/system/src/tables_table.rs index f092b4c22b62..9d427bc696d3 100644 --- a/src/query/storages/system/src/tables_table.rs +++ b/src/query/storages/system/src/tables_table.rs @@ -38,7 +38,6 @@ use databend_common_expression::TableField; use databend_common_expression::TableSchemaRef; use databend_common_expression::TableSchemaRefExt; use databend_common_functions::BUILTIN_FUNCTIONS; -use databend_common_management::RoleApi; use databend_common_meta_app::principal::OwnershipObject; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; use databend_common_meta_app::schema::TableIdent; @@ -47,6 +46,7 @@ use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::tenant::Tenant; use databend_common_storages_fuse::FuseTable; use databend_common_storages_view::view_table::QUERY; +use databend_common_users::GrantObjectVisibilityChecker; use databend_common_users::UserApiProvider; use log::warn; @@ -129,7 +129,9 @@ where TablesTable: HistoryAware .into_iter() .map(|cat| cat.disable_table_info_refresh()) .collect::>>()?; - self.get_full_data_from_catalogs(ctx, push_downs, catalogs) + let visibility_checker = ctx.get_visibility_checker(false).await?; + + self.get_full_data_from_catalogs(ctx, push_downs, catalogs, visibility_checker) .await } } @@ -235,6 +237,7 @@ where TablesTable: HistoryAware ctx: Arc, push_downs: Option, catalogs: Vec>, + visibility_checker: GrantObjectVisibilityChecker, ) -> Result { let tenant = ctx.get_tenant(); @@ -325,277 +328,199 @@ where TablesTable: HistoryAware ); } } - - // from system.tables where database = 'db' and name = 'name' - // from system.tables where database = 'db' and table_id = 123 - if db_name.len() == 1 - && !invalid_optimize - && tables_names.len() + tables_ids.len() == 1 - && !invalid_tables_ids - && !WITH_HISTORY - { - let visibility_checker = ctx.get_visibility_checker(true).await?; - for (ctl_name, ctl) in ctls.iter() { - for db in &db_name { - match ctl.get_database(&tenant, db.as_str()).await { - Ok(database) => dbs.push(database), - Err(err) => { - let msg = format!("Failed to get database: {}, {}", db, err); - warn!("{}", msg); + let catalog_dbs = visibility_checker.get_visibility_database(); + + for (ctl_name, ctl) in ctls.iter() { + if let Some(push_downs) = &push_downs { + if push_downs.filters.as_ref().map(|f| &f.filter).is_some() { + for db in &db_name { + match ctl.get_database(&tenant, db.as_str()).await { + Ok(database) => dbs.push(database), + Err(err) => { + let msg = format!("Failed to get database: {}, {}", db, err); + warn!("{}", msg); + } } } - } - if let Err(err) = ctl.mget_table_names_by_ids(&tenant, &tables_ids).await { - warn!("Failed to get tables: {}, {}", ctl.name(), err); - } else { - let new_tables_names = ctl - .mget_table_names_by_ids(&tenant, &tables_ids) - .await? - .into_iter() - .flatten() - .filter(|table| !tables_names.contains(table)) - .collect::>(); - tables_names.extend(new_tables_names); - } - for table_name in &tables_names { - for db in &dbs { - match ctl.get_table(&tenant, db.name(), table_name).await { - Ok(t) => { - let db_id = db.get_db_info().database_id.db_id; - let table_id = t.get_id(); - let role = user_api - .role_api(&tenant) - .get_ownership(&OwnershipObject::Table { - catalog_name: ctl_name.to_string(), - db_id, - table_id, - }) - .await? - .map(|o| o.role); - if visibility_checker.check_table_visibility( - ctl_name, - db.name(), - table_name, - db_id, - t.get_id(), - ) { - catalogs.push(ctl_name.as_str()); - databases.push(db.name().to_owned()); - database_tables.push(t); - owner.push(role); - } else if let Some(role) = role { - let roles = ctx.get_all_effective_roles().await?; - if roles.iter().any(|r| r.name == role) { - catalogs.push(ctl_name.as_str()); - databases.push(db.name().to_owned()); - database_tables.push(t); - owner.push(Some(role)); + if !WITH_HISTORY { + match ctl.mget_table_names_by_ids(&tenant, &tables_ids).await { + Ok(tables) => { + for table in tables.into_iter().flatten() { + if !tables_names.contains(&table) { + tables_names.push(table.clone()); } } } Err(err) => { - let msg = format!( - "Failed to get table in database: {}, {}", - db.name(), - err - ); - // warn no need to pad in ctx + let msg = format!("Failed to get tables: {}, {}", ctl.name(), err); warn!("{}", msg); - continue; } } } } } - } else { - let visibility_checker = ctx.get_visibility_checker(false).await?; - let catalog_dbs = visibility_checker.get_visibility_database(); - - for (ctl_name, ctl) in ctls.iter() { - if let Some(push_downs) = &push_downs { - if push_downs.filters.as_ref().map(|f| &f.filter).is_some() { - for db in &db_name { - match ctl.get_database(&tenant, db.as_str()).await { - Ok(database) => dbs.push(database), - Err(err) => { - let msg = format!("Failed to get database: {}, {}", db, err); - warn!("{}", msg); - } + + if dbs.is_empty() || invalid_optimize { + // None means has global level privileges + dbs = if let Some(catalog_dbs) = &catalog_dbs { + let mut final_dbs = vec![]; + for (catalog_name, dbs) in catalog_dbs { + if ctl.name() == catalog_name.to_string() { + let mut catalog_db_ids = vec![]; + let mut catalog_db_names = vec![]; + catalog_db_names.extend( + dbs.iter() + .filter_map(|(db_name, _)| *db_name) + .map(|db_name| db_name.to_string()), + ); + catalog_db_ids.extend(dbs.iter().filter_map(|(_, db_id)| *db_id)); + if let Ok(databases) = ctl + .mget_database_names_by_ids(&tenant, &catalog_db_ids) + .await + { + catalog_db_names.extend(databases.into_iter().flatten()); + } else { + let msg = + format!("Failed to get database name by id: {}", ctl.name()); + warn!("{}", msg); } + let db_idents = catalog_db_names + .iter() + .map(|name| DatabaseNameIdent::new(&tenant, name)) + .collect::>(); + let dbs = ctl.mget_databases(&tenant, &db_idents).await?; + final_dbs.extend(dbs); } + } + final_dbs + } else { + match ctl.list_databases(&tenant).await { + Ok(dbs) => dbs, + Err(err) => { + let msg = + format!("List databases failed on catalog {}: {}", ctl.name(), err); + warn!("{}", msg); + ctx.push_warning(msg); - if !WITH_HISTORY { - match ctl.mget_table_names_by_ids(&tenant, &tables_ids).await { - Ok(tables) => { - for table in tables.into_iter().flatten() { - if !tables_names.contains(&table) { - tables_names.push(table.clone()); - } - } - } - Err(err) => { - let msg = - format!("Failed to get tables: {}, {}", ctl.name(), err); - warn!("{}", msg); - } - } + vec![] } } } + } - if dbs.is_empty() || invalid_optimize { - // None means has global level privileges - dbs = if let Some(catalog_dbs) = &catalog_dbs { - let mut final_dbs = vec![]; - for (catalog_name, dbs) in catalog_dbs { - if ctl.name() == catalog_name.to_string() { - let mut catalog_db_ids = vec![]; - let mut catalog_db_names = vec![]; - catalog_db_names.extend( - dbs.iter() - .filter_map(|(db_name, _)| *db_name) - .map(|db_name| db_name.to_string()), - ); - catalog_db_ids.extend(dbs.iter().filter_map(|(_, db_id)| *db_id)); - if let Ok(databases) = ctl - .mget_database_names_by_ids(&tenant, &catalog_db_ids) - .await - { - catalog_db_names.extend(databases.into_iter().flatten()); - } else { - let msg = format!( - "Failed to get database name by id: {}", - ctl.name() - ); - warn!("{}", msg); - } - let db_idents = catalog_db_names - .iter() - .map(|name| DatabaseNameIdent::new(&tenant, name)) - .collect::>(); - let dbs = ctl.mget_databases(&tenant, &db_idents).await?; - final_dbs.extend(dbs); - } + let final_dbs = dbs + .clone() + .into_iter() + .filter(|db| { + visibility_checker.check_database_visibility( + ctl_name, + db.name(), + db.get_db_info().database_id.db_id, + ) + }) + .collect::>(); + + let ownership = if get_ownership { + user_api.get_ownerships(&tenant).await.unwrap_or_default() + } else { + HashMap::new() + }; + for db in final_dbs { + let db_id = db.get_db_info().database_id.db_id; + let db_name = db.name(); + let tables = if tables_names.is_empty() + || tables_names.len() > 10 + || invalid_tables_ids + || invalid_optimize + { + match Self::list_tables(ctl, &tenant, db_name, WITH_HISTORY, WITHOUT_VIEW).await + { + Ok(tables) => tables, + Err(err) => { + // swallow the errors related with remote database or tables, avoid ANY of bad table config corrupt ALL of the results. + // these databases might be: + // - sharing database + // - hive database + // - iceberg database + // - others + // TODO(liyz): return the warnings in the HTTP query protocol. + let msg = + format!("Failed to list tables in database: {}, {}", db_name, err); + warn!("{}", msg); + ctx.push_warning(msg); + + continue; } - final_dbs - } else { - match ctl.list_databases(&tenant).await { - Ok(dbs) => dbs, + } + } else if WITH_HISTORY { + // Only can call get_table + let mut tables = Vec::new(); + for table_name in &tables_names { + match ctl.get_table_history(&tenant, db_name, table_name).await { + Ok(t) => tables.extend(t), Err(err) => { let msg = format!( - "List databases failed on catalog {}: {}", - ctl.name(), - err + "Failed to get_table_history tables in database: {}, {}", + db_name, err ); + // warn no need to pad in ctx warn!("{}", msg); - ctx.push_warning(msg); - - vec![] + continue; } } } - } - - let final_dbs = dbs - .clone() - .into_iter() - .filter(|db| { - visibility_checker.check_database_visibility( - ctl_name, - db.name(), - db.get_db_info().database_id.db_id, - ) - }) - .collect::>(); - - let ownership = if get_ownership { - user_api.get_ownerships(&tenant).await.unwrap_or_default() + tables } else { - HashMap::new() - }; - for db in final_dbs { - let db_id = db.get_db_info().database_id.db_id; - let db_name = db.name(); - let tables = if tables_names.is_empty() - || tables_names.len() > 10 - || invalid_tables_ids - || invalid_optimize - { - match Self::list_tables(ctl, &tenant, db_name, WITH_HISTORY, WITHOUT_VIEW) - .await - { - Ok(tables) => tables, + // Only can call get_table + let mut tables = Vec::new(); + for table_name in &tables_names { + match ctl.get_table(&tenant, db_name, table_name).await { + Ok(t) => tables.push(t), Err(err) => { - // swallow the errors related with remote database or tables, avoid ANY of bad table config corrupt ALL of the results. - // these databases might be: - // - sharing database - // - hive database - // - iceberg database - // - others - // TODO(liyz): return the warnings in the HTTP query protocol. let msg = format!( - "Failed to list tables in database: {}, {}", + "Failed to get table in database: {}, {}", db_name, err ); + // warn no need to pad in ctx warn!("{}", msg); - ctx.push_warning(msg); - continue; } } - } else if WITH_HISTORY { - // Only can call get_table - let mut tables = Vec::new(); - for table_name in &tables_names { - match ctl.get_table_history(&tenant, db_name, table_name).await { - Ok(t) => tables.extend(t), - Err(err) => { - let msg = format!( - "Failed to get_table_history tables in database: {}, {}", - db_name, err - ); - // warn no need to pad in ctx - warn!("{}", msg); - continue; - } - } - } - tables - } else { - // Only can call get_table - let mut tables = Vec::new(); - for table_name in &tables_names { - match ctl.get_table(&tenant, db_name, table_name).await { - Ok(t) => tables.push(t), - Err(err) => { - let msg = format!( - "Failed to get table in database: {}, {}", - db_name, err - ); - // warn no need to pad in ctx - warn!("{}", msg); - continue; - } + } + tables + }; + + for table in tables { + let table_id = table.get_id(); + // If db1 is visible, do not mean db1.table1 is visible. A user may have a grant about db1.table2, so db1 is visible + // for her, but db1.table1 may be not visible. So we need an extra check about table here after db visibility check. + if visibility_checker.check_table_visibility( + ctl_name, + db_name, + table.name(), + db_id, + table_id, + ) && !table.is_stream() + { + if !WITHOUT_VIEW && table.get_table_info().engine() == "VIEW" { + catalogs.push(ctl_name.as_str()); + databases.push(db_name.to_owned()); + database_tables.push(table); + if ownership.is_empty() { + owner.push(None); + } else { + owner.push( + ownership + .get(&OwnershipObject::Table { + catalog_name: ctl_name.to_string(), + db_id, + table_id, + }) + .map(|role| role.to_string()), + ); } - } - tables - }; - - for table in tables { - let table_id = table.get_id(); - // If db1 is visible, do not mean db1.table1 is visible. A user may have a grant about db1.table2, so db1 is visible - // for her, but db1.table1 may be not visible. So we need an extra check about table here after db visibility check. - if (table.get_table_info().engine() == "VIEW" || WITHOUT_VIEW) - && !table.is_stream() - && visibility_checker.check_table_visibility( - ctl_name, - db_name, - table.name(), - db_id, - table_id, - ) - { + } else if WITHOUT_VIEW { // system.tables store view name but not store view query // decrease information_schema.tables union. catalogs.push(ctl_name.as_str()); diff --git a/tests/suites/1_stateful/09_http_handler/09_0009_cookie.py b/tests/suites/1_stateful/09_http_handler/09_0009_cookie.py index 521e887949f4..7af8f6a92eac 100755 --- a/tests/suites/1_stateful/09_http_handler/09_0009_cookie.py +++ b/tests/suites/1_stateful/09_http_handler/09_0009_cookie.py @@ -14,17 +14,14 @@ def __init__(self): super().__init__() def set_cookie(self, cookie: Cookie, *args, **kwargs): + assert cookie.path == "/" , cookie + # "" is prefix of any host name or IP, so it will be applied cookie.domain = "" - cookie.path = "/" super().set_cookie(cookie, *args, **kwargs) - def get_dict(self, domain=None, path=None): - # 忽略 domain 和 path 参数,返回所有 Cookie - return {cookie.name: cookie.value for cookie in self} - def do_query(session_client, query, session_state=None): - url = f"http://localhost:8000/v1/query" + url = f"http://127.0.0.1:8000/v1/query" query_payload = { "sql": query, "pagination": {"wait_time_secs": 100, "max_rows_per_page": 2}, @@ -47,12 +44,12 @@ def test_simple(): resp = do_query(client, "select 1") assert resp.status_code == 200, resp.text assert resp.json()["data"] == [["1"]], resp.text - sid = client.cookies.get("session_id") - # print(sid) + # print(client.cookies) + sid = client.cookies.get("session_id", path="/") last_access_time1 = int(client.cookies.get("last_access_time")) # print(last_access_time1) - assert time.time() - 10 < last_access_time1 < time.time() + assert time.time() - 10 < last_access_time1 <= time.time() time.sleep(1.5) @@ -60,9 +57,10 @@ def test_simple(): assert resp.status_code == 200, resp.text assert resp.json()["data"] == [["1"]], resp.text sid2 = client.cookies.get("session_id") + # print(client.cookies) last_access_time2 = int(client.cookies.get("last_access_time")) assert sid2 == sid - assert last_access_time1 < last_access_time2 < time.time() + assert last_access_time1 < last_access_time2 <= time.time() def test_temp_table():