From 97963d888b9b3bba73eb4e09547fb5f5a6a906d2 Mon Sep 17 00:00:00 2001 From: zhyass Date: Mon, 2 Dec 2024 20:58:28 +0800 Subject: [PATCH] update --- Cargo.lock | 6 ++-- src/query/catalog/src/table_context.rs | 18 +++++++++-- .../src/interpreters/hook/compact_hook.rs | 2 +- .../src/interpreters/hook/refresh_hook.rs | 6 ++-- .../interpreter_table_recluster.rs | 2 +- src/query/service/src/sessions/query_ctx.rs | 31 ++++++++++++++----- .../tests/it/sql/exec/get_table_bind_test.rs | 6 ++-- .../it/storages/fuse/operations/commit.rs | 6 ++-- .../binder/bind_table_reference/bind_table.rs | 3 +- .../table_meta/src/table/stream_keys.rs | 2 +- .../common/processors/sink_commit.rs | 2 +- 11 files changed, 57 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 481499022f54..56ee0e26f41b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 4 +version = 3 [[package]] name = "addr2line" @@ -12726,9 +12726,9 @@ dependencies = [ [[package]] name = "ruzstd" -version = "0.7.3" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fad02996bfc73da3e301efe90b1837be9ed8f4a462b6ed410aa35d00381de89f" +checksum = "99c3938e133aac070997ddc684d4b393777d293ba170f2988c8fd5ea2ad4ce21" dependencies = [ "twox-hash", ] diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index a6b4e4227a5b..04d8933783f4 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -280,11 +280,23 @@ pub trait TableContext: Send + Sync { max_files: Option, ) -> Result; - fn add_segment_location(&self, segment_loc: Location) -> Result<()>; + fn add_inserted_segment_location(&self, segment_loc: Location) -> Result<()>; - fn clear_segment_locations(&self) -> Result<()>; + fn clear_inserted_segment_locations(&self) -> Result<()>; - fn get_segment_locations(&self) -> Result>; + fn get_inserted_segment_locations(&self) -> Result>; + + fn add_target_segment_location(&self, _index: usize, _location: Location) -> Result<()> { + unimplemented!() + } + + fn get_target_segment_locations(&self) -> Result> { + unimplemented!() + } + + fn clear_target_segment_locations(&self) -> Result<()> { + unimplemented!() + } fn add_file_status(&self, file_path: &str, file_status: FileStatus) -> Result<()>; diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index 8f2962621421..2df7535787a3 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -182,7 +182,7 @@ async fn compact_table( PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?; // Clears previously generated segment locations to avoid duplicate data in the refresh phase - ctx.clear_segment_locations()?; + ctx.clear_inserted_segment_locations()?; ctx.set_executor(complete_executor.get_inner())?; complete_executor.execute()?; drop(complete_executor); diff --git a/src/query/service/src/interpreters/hook/refresh_hook.rs b/src/query/service/src/interpreters/hook/refresh_hook.rs index ecdcbab1e9fb..5860e1c7ead5 100644 --- a/src/query/service/src/interpreters/hook/refresh_hook.rs +++ b/src/query/service/src/interpreters/hook/refresh_hook.rs @@ -209,7 +209,7 @@ async fn generate_refresh_index_plan( catalog: &str, table_id: MetaId, ) -> Result> { - let segment_locs = ctx.get_segment_locations()?; + let segment_locs = ctx.get_inserted_segment_locations()?; let catalog = ctx.get_catalog(catalog).await?; let mut plans = vec![]; let indexes = catalog @@ -272,7 +272,7 @@ async fn generate_refresh_inverted_index_plan( desc: &RefreshDesc, table: Arc, ) -> Result> { - let segment_locs = ctx.get_segment_locations()?; + let segment_locs = ctx.get_inserted_segment_locations()?; let mut plans = vec![]; let table_meta = &table.get_table_info().meta; @@ -296,7 +296,7 @@ async fn generate_refresh_virtual_column_plan( ctx: Arc, desc: &RefreshDesc, ) -> Result> { - let segment_locs = ctx.get_segment_locations()?; + let segment_locs = ctx.get_inserted_segment_locations()?; let table_info = ctx .get_table(&desc.catalog, &desc.database, &desc.table) diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs index ffb432920372..2c5563745bee 100644 --- a/src/query/service/src/interpreters/interpreter_table_recluster.rs +++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs @@ -176,7 +176,7 @@ impl ReclusterTableInterpreter { let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?; - self.ctx.clear_segment_locations()?; + self.ctx.clear_inserted_segment_locations()?; self.ctx.set_executor(complete_executor.get_inner())?; complete_executor.execute()?; // make sure the executor is dropped before the next loop. diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 2a284a2d22ff..975c9d5e91ec 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -99,6 +99,7 @@ use databend_common_storage::StageFileInfo; use databend_common_storage::StageFilesInfo; use databend_common_storage::StorageMetrics; use databend_common_storages_delta::DeltaTable; +use databend_common_storages_fuse::operations::SegmentIndex; use databend_common_storages_fuse::TableContext; use databend_common_storages_iceberg::IcebergTable; use databend_common_storages_orc::OrcTable; @@ -152,9 +153,7 @@ pub struct QueryContext { fragment_id: Arc, // Used by synchronized generate aggregating indexes when new data written. inserted_segment_locs: Arc>>, - // Temp table for materialized CTE, first string is the database_name, second string is the table_name - // All temp tables' catalog is `CATALOG_DEFAULT`, so we don't need to store it. - m_cte_temp_table: Arc>>, + target_segment_locs: Arc>, } impl QueryContext { @@ -180,7 +179,7 @@ impl QueryContext { fragment_id: Arc::new(AtomicUsize::new(0)), inserted_segment_locs: Arc::new(RwLock::new(HashSet::new())), block_threshold: Arc::new(RwLock::new(BlockThresholds::default())), - m_cte_temp_table: Arc::new(Default::default()), + target_segment_locs: Arc::new(DashMap::new()), }) } @@ -1068,19 +1067,19 @@ impl TableContext for QueryContext { }) } - fn add_segment_location(&self, segment_loc: Location) -> Result<()> { + fn add_inserted_segment_location(&self, segment_loc: Location) -> Result<()> { let mut segment_locations = self.inserted_segment_locs.write(); segment_locations.insert(segment_loc); Ok(()) } - fn clear_segment_locations(&self) -> Result<()> { + fn clear_inserted_segment_locations(&self) -> Result<()> { let mut segment_locations = self.inserted_segment_locs.write(); segment_locations.clear(); Ok(()) } - fn get_segment_locations(&self) -> Result> { + fn get_inserted_segment_locations(&self) -> Result> { Ok(self .inserted_segment_locs .read() @@ -1089,6 +1088,24 @@ impl TableContext for QueryContext { .collect::>()) } + fn add_target_segment_location(&self, index: usize, location: Location) -> Result<()> { + self.target_segment_locs.insert(index, location); + Ok(()) + } + + fn get_target_segment_locations(&self) -> Result> { + Ok(self + .target_segment_locs + .iter() + .map(|entry| (*entry.key(), entry.value().clone())) + .collect()) + } + + fn clear_target_segment_locations(&self) -> Result<()> { + self.target_segment_locs.clear(); + Ok(()) + } + fn add_file_status(&self, file_path: &str, file_status: FileStatus) -> Result<()> { if matches!(self.get_query_kind(), QueryKind::CopyIntoTable) { self.shared.copy_status.add_chunk(file_path, file_status); diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index cbce3003936b..5a1fb7735a40 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -843,15 +843,15 @@ impl TableContext for CtxDelegation { todo!() } - fn add_segment_location(&self, _segment_loc: Location) -> Result<()> { + fn add_inserted_segment_location(&self, _segment_loc: Location) -> Result<()> { todo!() } - fn clear_segment_locations(&self) -> Result<()> { + fn clear_inserted_segment_locations(&self) -> Result<()> { todo!() } - fn get_segment_locations(&self) -> Result> { + fn get_inserted_segment_locations(&self) -> Result> { todo!() } diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index df0129b412be..f9f872643f9c 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -732,15 +732,15 @@ impl TableContext for CtxDelegation { HashMap::new() } - fn add_segment_location(&self, _segment_loc: Location) -> Result<()> { + fn add_inserted_segment_location(&self, _segment_loc: Location) -> Result<()> { todo!() } - fn clear_segment_locations(&self) -> Result<()> { + fn clear_inserted_segment_locations(&self) -> Result<()> { todo!() } - fn get_segment_locations(&self) -> Result> { + fn get_inserted_segment_locations(&self) -> Result> { todo!() } diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs index 3de1b550ca43..e5bcf78191d6 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs @@ -28,7 +28,8 @@ use databend_common_catalog::table_with_options::get_with_opt_max_batch_size; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_storages_view::view_table::QUERY; -use databend_storages_common_table_meta::table::{get_change_type, is_hilbert_recluster}; +use databend_storages_common_table_meta::table::get_change_type; +use databend_storages_common_table_meta::table::is_hilbert_recluster; use crate::binder::util::TableIdentifier; use crate::binder::Binder; diff --git a/src/query/storages/common/table_meta/src/table/stream_keys.rs b/src/query/storages/common/table_meta/src/table/stream_keys.rs index b6d8eeffd26c..095b9c69da99 100644 --- a/src/query/storages/common/table_meta/src/table/stream_keys.rs +++ b/src/query/storages/common/table_meta/src/table/stream_keys.rs @@ -90,7 +90,7 @@ pub fn is_hilbert_recluster(table_alias_name: &Option) -> bool { if let Ok(suffix) = i64::from_str_radix(alias_param[1], 16) { // 2023-01-01 00:00:00. let base_timestamp = 1672502400; - if suffix > base_timestamp && alias_param[0] == "_compact"{ + if suffix > base_timestamp && alias_param[0] == "_compact" { return true; } } diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 551d0aa2c1c3..441f6e68d039 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -443,7 +443,7 @@ where F: SnapshotGenerator + Send + 'static metrics_inc_commit_copied_files(files.file_info.len() as u64); } for segment_loc in std::mem::take(&mut self.new_segment_locs).into_iter() { - self.ctx.add_segment_location(segment_loc)?; + self.ctx.add_inserted_segment_location(segment_loc)?; } let target_descriptions = {