From 5647d4a364a2ffac5f40e95f987430bbd15cca85 Mon Sep 17 00:00:00 2001 From: zhyass Date: Wed, 22 May 2024 15:34:08 +0800 Subject: [PATCH] refactor recluster --- .../interpreter_table_optimize.rs | 5 +- .../interpreter_table_recluster.rs | 93 ++++--- .../operations/mutation/recluster_mutator.rs | 42 +-- .../mutation/mutator/block_compact_mutator.rs | 56 ++-- .../src/operations/mutation/mutator/mod.rs | 2 + .../mutation/mutator/recluster_mutator.rs | 260 +++++++++++++++--- .../storages/fuse/src/operations/recluster.rs | 50 +--- 7 files changed, 332 insertions(+), 176 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs index 58fca664fe5ba..47691042b17c6 100644 --- a/src/query/service/src/interpreters/interpreter_table_optimize.rs +++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs @@ -238,15 +238,14 @@ impl OptimizeTableInterpreter { .await? { if !mutator.tasks.is_empty() { + let is_distributed = mutator.is_distributed(); let reclustered_block_count = mutator.recluster_blocks_count; let physical_plan = build_recluster_physical_plan( mutator.tasks, table.get_table_info().clone(), catalog.info(), mutator.snapshot, - mutator.remained_blocks, - mutator.removed_segment_indexes, - mutator.removed_segment_summary, + is_distributed, self.plan.need_lock, )?; diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs index 316081ecc79cc..3e4f09cec6ae1 100644 --- a/src/query/service/src/interpreters/interpreter_table_recluster.rs +++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs @@ -29,17 +29,16 @@ use databend_common_sql::executor::physical_plans::Exchange; use databend_common_sql::executor::physical_plans::FragmentKind; use databend_common_sql::executor::physical_plans::ReclusterSink; use databend_common_sql::executor::physical_plans::ReclusterSource; -use databend_common_sql::executor::physical_plans::ReclusterTask; use databend_common_sql::executor::PhysicalPlan; +use databend_common_storages_fuse::operations::ReclusterTasks; use databend_common_storages_fuse::FuseTable; -use databend_storages_common_table_meta::meta::BlockMeta; -use databend_storages_common_table_meta::meta::Statistics; use databend_storages_common_table_meta::meta::TableSnapshot; use log::error; use log::warn; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterClusteringHistory; +use crate::interpreters::OptimizeTableInterpreter; use crate::locks::LockExt; use crate::locks::LockManager; use crate::pipelines::executor::ExecutorSettings; @@ -149,16 +148,15 @@ impl Interpreter for ReclusterTableInterpreter { let mutator = mutator.unwrap(); if mutator.tasks.is_empty() { break; - }; + } + let is_distributed = mutator.is_distributed(); block_count += mutator.recluster_blocks_count; let physical_plan = build_recluster_physical_plan( mutator.tasks, table_info, catalog.info(), mutator.snapshot, - mutator.remained_blocks, - mutator.removed_segment_indexes, - mutator.removed_segment_summary, + is_distributed, true, )?; @@ -222,46 +220,59 @@ impl Interpreter for ReclusterTableInterpreter { } } -#[allow(clippy::too_many_arguments)] pub fn build_recluster_physical_plan( - tasks: Vec, + tasks: ReclusterTasks, table_info: TableInfo, catalog_info: CatalogInfo, snapshot: Arc, - remained_blocks: Vec>, - removed_segment_indexes: Vec, - removed_segment_summary: Statistics, + is_distributed: bool, need_lock: bool, ) -> Result { - let is_distributed = tasks.len() > 1; - let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource { - tasks, - table_info: table_info.clone(), - catalog_info: catalog_info.clone(), - plan_id: u32::MAX, - })); + match tasks { + ReclusterTasks::Recluster { + tasks, + remained_blocks, + removed_segment_indexes, + removed_segment_summary, + } => { + let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource { + tasks, + table_info: table_info.clone(), + catalog_info: catalog_info.clone(), + plan_id: u32::MAX, + })); - if is_distributed { - root = PhysicalPlan::Exchange(Exchange { - plan_id: 0, - input: Box::new(root), - kind: FragmentKind::Merge, - keys: vec![], - allow_adjust_parallelism: true, - ignore_exchange: false, - }); + if is_distributed { + root = PhysicalPlan::Exchange(Exchange { + plan_id: 0, + input: Box::new(root), + kind: FragmentKind::Merge, + keys: vec![], + allow_adjust_parallelism: true, + ignore_exchange: false, + }); + } + let mut plan = PhysicalPlan::ReclusterSink(Box::new(ReclusterSink { + input: Box::new(root), + table_info, + catalog_info, + snapshot, + remained_blocks, + removed_segment_indexes, + removed_segment_summary, + plan_id: u32::MAX, + need_lock, + })); + plan.adjust_plan_id(&mut 0); + Ok(plan) + } + ReclusterTasks::Compact(parts) => OptimizeTableInterpreter::build_physical_plan( + parts, + table_info, + snapshot, + catalog_info, + is_distributed, + true, + ), } - let mut plan = PhysicalPlan::ReclusterSink(Box::new(ReclusterSink { - input: Box::new(root), - table_info, - catalog_info, - snapshot, - remained_blocks, - removed_segment_indexes, - removed_segment_summary, - plan_id: u32::MAX, - need_lock, - })); - plan.adjust_plan_id(&mut 0); - Ok(plan) } diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs index 6ea7f85924cb1..5e42ca1779d31 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs @@ -28,6 +28,7 @@ use databend_common_expression::TableSchemaRef; use databend_common_storages_fuse::io::SegmentWriter; use databend_common_storages_fuse::io::TableMetaLocationGenerator; use databend_common_storages_fuse::operations::ReclusterMutator; +use databend_common_storages_fuse::operations::ReclusterTasks; use databend_common_storages_fuse::pruning::create_segment_location_vector; use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut; use databend_common_storages_fuse::statistics::reducers::reduce_block_metas; @@ -134,7 +135,7 @@ async fn test_recluster_mutator_block_select() -> Result<()> { ) .await?; - let mut mutator = ReclusterMutator::try_create( + let mut mutator = ReclusterMutator::new( ctx, Arc::new(snapshot), schema, @@ -142,11 +143,15 @@ async fn test_recluster_mutator_block_select() -> Result<()> { BlockThresholds::default(), cluster_key_id, 1, - )?; + 1000, + ); let need_recluster = mutator.target_select(compact_segments).await?; assert!(need_recluster); - assert_eq!(mutator.tasks.len(), 1); - let total_block_nums = mutator.tasks.iter().map(|t| t.parts.len()).sum::(); + let ReclusterTasks::Recluster { tasks, .. } = mutator.tasks else { + return Err(ErrorCode::Internal("Logical error, it's a bug")); + }; + assert_eq!(tasks.len(), 1); + let total_block_nums = tasks.iter().map(|t| t.parts.len()).sum::(); assert_eq!(total_block_nums, 3); Ok(()) @@ -247,7 +252,7 @@ async fn test_safety_for_recluster() -> Result<()> { .await?; let mut need_recluster = false; - let mut mutator = ReclusterMutator::try_create( + let mut mutator = ReclusterMutator::new( ctx.clone(), snapshot, schema.clone(), @@ -255,16 +260,12 @@ async fn test_safety_for_recluster() -> Result<()> { threshold, cluster_key_id, max_tasks, - )?; - let selected_segs = - ReclusterMutator::select_segments(&compact_segments, block_per_seg, 8, cluster_key_id)?; + block_per_seg, + ); + let selected_segs = mutator.select_segments(&compact_segments, 8)?; if selected_segs.is_empty() { for compact_segment in compact_segments.into_iter() { - if !ReclusterMutator::segment_can_recluster( - &compact_segment.1.summary, - block_per_seg, - cluster_key_id, - ) { + if !mutator.segment_can_recluster(&compact_segment.1.summary) { continue; } @@ -283,7 +284,15 @@ async fn test_safety_for_recluster() -> Result<()> { eprintln!("need_recluster: {}", need_recluster); if need_recluster { - let tasks = mutator.tasks; + let ReclusterTasks::Recluster { + tasks, + remained_blocks, + removed_segment_indexes, + .. + } = mutator.tasks + else { + return Err(ErrorCode::Internal("Logical error, it's a bug")); + }; assert!(tasks.len() <= max_tasks && !tasks.is_empty()); eprintln!("tasks_num: {}, max_tasks: {}", tasks.len(), max_tasks); let mut blocks = Vec::new(); @@ -296,10 +305,9 @@ async fn test_safety_for_recluster() -> Result<()> { } } - let remained_blocks = std::mem::take(&mut mutator.remained_blocks); eprintln!( "selected segments number {}, selected blocks number {}, remained blocks number {}", - mutator.removed_segment_indexes.len(), + removed_segment_indexes.len(), blocks.len(), remained_blocks.len() ); @@ -310,7 +318,7 @@ async fn test_safety_for_recluster() -> Result<()> { let block_ids_after_target = HashSet::from_iter(blocks.into_iter()); let mut origin_blocks_ids = HashSet::new(); - for idx in &mutator.removed_segment_indexes { + for idx in &removed_segment_indexes { for b in &segment_infos[*idx].blocks { origin_blocks_ids.insert(b.location.0.clone()); } diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs index bd2acdc77e8e9..4b789b5bea92f 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs @@ -144,7 +144,7 @@ impl BlockCompactMutator { for (segment_idx, compact_segment) in segment_infos.into_iter() { let segments_vec = checker.add(segment_idx, compact_segment); for segments in segments_vec { - self.generate_part(segments, &mut parts, &mut checker); + checker.generate_part(segments, &mut parts); } let residual_segment_cnt = checker.segments.len(); @@ -177,11 +177,7 @@ impl BlockCompactMutator { } // finalize the compaction. - self.generate_part( - std::mem::take(&mut checker.segments), - &mut parts, - &mut checker, - ); + checker.finalize(&mut parts); // Status. let elapsed_time = start.elapsed(); @@ -295,28 +291,9 @@ impl BlockCompactMutator { } } } - - fn generate_part( - &mut self, - segments: Vec<(SegmentIndex, Arc)>, - parts: &mut Vec, - checker: &mut SegmentCompactChecker, - ) { - if !segments.is_empty() && checker.check_for_compact(&segments) { - let mut segment_indices = Vec::with_capacity(segments.len()); - let mut compact_segments = Vec::with_capacity(segments.len()); - for (idx, segment) in segments.into_iter() { - segment_indices.push(idx); - compact_segments.push(segment); - } - - let lazy_part = CompactLazyPartInfo::create(segment_indices, compact_segments); - parts.push(lazy_part); - } - } } -struct SegmentCompactChecker { +pub struct SegmentCompactChecker { segments: Vec<(SegmentIndex, Arc)>, total_block_count: u64, block_threshold: u64, @@ -327,7 +304,7 @@ struct SegmentCompactChecker { } impl SegmentCompactChecker { - fn new(block_threshold: u64, cluster_key_id: Option) -> Self { + pub fn new(block_threshold: u64, cluster_key_id: Option) -> Self { Self { segments: vec![], total_block_count: 0, @@ -361,7 +338,7 @@ impl SegmentCompactChecker { true } - fn add( + pub fn add( &mut self, idx: SegmentIndex, segment: Arc, @@ -386,6 +363,29 @@ impl SegmentCompactChecker { self.segments.push((idx, segment)); vec![std::mem::take(&mut self.segments)] } + + pub fn generate_part( + &mut self, + segments: Vec<(SegmentIndex, Arc)>, + parts: &mut Vec, + ) { + if !segments.is_empty() && self.check_for_compact(&segments) { + let mut segment_indices = Vec::with_capacity(segments.len()); + let mut compact_segments = Vec::with_capacity(segments.len()); + for (idx, segment) in segments.into_iter() { + segment_indices.push(idx); + compact_segments.push(segment); + } + + let lazy_part = CompactLazyPartInfo::create(segment_indices, compact_segments); + parts.push(lazy_part); + } + } + + pub fn finalize(&mut self, parts: &mut Vec) { + let final_segments = std::mem::take(&mut self.segments); + self.generate_part(final_segments, parts); + } } struct CompactTaskBuilder { diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs b/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs index 7fd293d878035..0f63594424c08 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs @@ -17,7 +17,9 @@ mod recluster_mutator; mod segment_compact_mutator; pub use block_compact_mutator::BlockCompactMutator; +pub use block_compact_mutator::SegmentCompactChecker; pub use recluster_mutator::ReclusterMutator; +pub use recluster_mutator::ReclusterTasks; pub use segment_compact_mutator::SegmentCompactMutator; pub use segment_compact_mutator::SegmentCompactionState; pub use segment_compact_mutator::SegmentCompactor; diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs index ff7a536756768..b2b1737f37ace 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs @@ -19,6 +19,9 @@ use std::collections::HashSet; use std::sync::Arc; use databend_common_base::runtime::execute_futures_in_parallel; +use databend_common_catalog::plan::Partitions; +use databend_common_catalog::plan::PartitionsShuffleKind; +use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -39,10 +42,50 @@ use minitrace::full_name; use minitrace::future::FutureExt; use minitrace::Span; +use crate::operations::mutation::SegmentCompactChecker; +use crate::operations::BlockCompactMutator; +use crate::operations::CompactLazyPartInfo; use crate::statistics::reducers::merge_statistics_mut; use crate::table_functions::cmp_with_null; use crate::FuseTable; use crate::SegmentLocation; +use crate::DEFAULT_AVG_DEPTH_THRESHOLD; +use crate::DEFAULT_BLOCK_PER_SEGMENT; +use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT; +use crate::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD; + +#[derive(Clone)] +pub enum ReclusterTasks { + Recluster { + tasks: Vec, + remained_blocks: Vec>, + removed_segment_indexes: Vec, + removed_segment_summary: Statistics, + }, + Compact(Partitions), +} + +impl ReclusterTasks { + pub fn is_empty(&self) -> bool { + match self { + ReclusterTasks::Recluster { tasks, .. } => tasks.is_empty(), + ReclusterTasks::Compact(parts) => parts.is_empty(), + } + } + + pub fn new_recluster_tasks() -> Self { + Self::Recluster { + tasks: vec![], + remained_blocks: vec![], + removed_segment_indexes: vec![], + removed_segment_summary: Statistics::default(), + } + } + + pub fn new_compact_tasks() -> Self { + Self::Compact(Partitions::default()) + } +} #[derive(Clone)] pub struct ReclusterMutator { @@ -52,17 +95,56 @@ pub struct ReclusterMutator { pub(crate) cluster_key_id: u32, pub(crate) schema: TableSchemaRef, pub(crate) max_tasks: usize, + pub(crate) block_per_seg: usize, pub snapshot: Arc, - pub tasks: Vec, pub recluster_blocks_count: u64, - pub remained_blocks: Vec>, - pub removed_segment_indexes: Vec, - pub removed_segment_summary: Statistics, + pub tasks: ReclusterTasks, } impl ReclusterMutator { pub fn try_create( + table: &FuseTable, + ctx: Arc, + snapshot: Arc, + ) -> Result { + let schema = table.schema_with_stream(); + let cluster_key_id = table.cluster_key_meta.clone().unwrap().0; + let block_thresholds = table.get_block_thresholds(); + let block_per_seg = + table.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT); + + let avg_depth_threshold = table.get_option( + FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD, + DEFAULT_AVG_DEPTH_THRESHOLD, + ); + let depth_threshold = (snapshot.summary.block_count as f64 * avg_depth_threshold) + .max(1.0) + .min(64.0); + + let mut max_tasks = 1; + let cluster = ctx.get_cluster(); + if !cluster.is_empty() && ctx.get_settings().get_enable_distributed_recluster()? { + max_tasks = cluster.nodes.len(); + } + + Ok(Self { + ctx, + schema, + depth_threshold, + block_thresholds, + cluster_key_id, + max_tasks, + block_per_seg, + snapshot, + recluster_blocks_count: 0, + tasks: ReclusterTasks::new_recluster_tasks(), + }) + } + + /// Used for tests. + #[allow(clippy::too_many_arguments)] + pub fn new( ctx: Arc, snapshot: Arc, schema: TableSchemaRef, @@ -70,21 +152,20 @@ impl ReclusterMutator { block_thresholds: BlockThresholds, cluster_key_id: u32, max_tasks: usize, - ) -> Result { - Ok(Self { + block_per_seg: usize, + ) -> Self { + Self { ctx, schema, depth_threshold, block_thresholds, cluster_key_id, max_tasks, + block_per_seg, snapshot, - tasks: Vec::new(), - remained_blocks: Vec::new(), recluster_blocks_count: 0, - removed_segment_indexes: Vec::new(), - removed_segment_summary: Statistics::default(), - }) + tasks: ReclusterTasks::new_recluster_tasks(), + } } #[async_backtrace::framed] @@ -92,6 +173,20 @@ impl ReclusterMutator { &mut self, compact_segments: Vec<(SegmentLocation, Arc)>, ) -> Result { + match self.tasks { + ReclusterTasks::Compact(_) => self.generate_compact_tasks(compact_segments).await?, + ReclusterTasks::Recluster { .. } => { + self.generate_recluster_tasks(compact_segments).await? + } + } + Ok(self.tasks.is_empty()) + } + + #[async_backtrace::framed] + pub async fn generate_recluster_tasks( + &mut self, + compact_segments: Vec<(SegmentLocation, Arc)>, + ) -> Result<()> { let mut selected_segments = Vec::with_capacity(compact_segments.len()); let mut selected_indices = Vec::with_capacity(compact_segments.len()); let mut selected_statistics = Vec::with_capacity(compact_segments.len()); @@ -103,7 +198,7 @@ impl ReclusterMutator { let blocks_map = self.gather_block_map(selected_segments).await?; if blocks_map.is_empty() { - return Ok(false); + return Ok(()); } let mem_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?; @@ -121,6 +216,7 @@ impl ReclusterMutator { let column_nodes = ColumnNodes::new_from_schema(&arrow_schema, Some(&self.schema)); let mut remained_blocks = Vec::new(); + let mut tasks = Vec::new(); let mut selected = false; for (level, block_metas) in blocks_map.into_iter() { let len = block_metas.len(); @@ -191,19 +287,19 @@ impl ReclusterMutator { let block_size = block_meta.block_size as usize; let row_count = block_meta.row_count as usize; if task_bytes + block_size > memory_threshold && selected_blocks.len() > 1 { - self.generate_task( + tasks.push(self.generate_task( &selected_blocks, &column_nodes, task_rows, task_bytes, level, - ); + )); task_rows = 0; task_bytes = 0; selected_blocks.clear(); - if self.tasks.len() >= self.max_tasks { + if tasks.len() >= self.max_tasks { remained_blocks.push(block_meta); over_memory = true; continue; @@ -219,30 +315,86 @@ impl ReclusterMutator { match selected_blocks.len() { 0 => (), 1 => remained_blocks.push(selected_blocks[0].1.clone()), - _ => self.generate_task( + _ => tasks.push(self.generate_task( &selected_blocks, &column_nodes, task_rows, task_bytes, level, - ), + )), } selected = true; } if selected { - self.remained_blocks = remained_blocks; - selected_indices.sort_by(|a, b| b.cmp(a)); - self.removed_segment_indexes = selected_indices; let default_cluster_key_id = Some(self.cluster_key_id); + let mut removed_segment_summary = Statistics::default(); selected_statistics.iter().for_each(|v| { - merge_statistics_mut(&mut self.removed_segment_summary, v, default_cluster_key_id) + merge_statistics_mut(&mut removed_segment_summary, v, default_cluster_key_id) }); + self.tasks = ReclusterTasks::Recluster { + tasks, + remained_blocks, + removed_segment_indexes: selected_indices, + removed_segment_summary, + }; + } + Ok(()) + } + + async fn generate_compact_tasks( + &mut self, + compact_segments: Vec<(SegmentLocation, Arc)>, + ) -> Result<()> { + let mut parts = Vec::new(); + let mut checker = + SegmentCompactChecker::new(self.block_per_seg as u64, Some(self.cluster_key_id)); + + for (loc, compact_segment) in compact_segments.into_iter() { + self.recluster_blocks_count += compact_segment.summary.block_count; + let segments_vec = checker.add(loc.segment_idx, compact_segment); + for segments in segments_vec { + checker.generate_part(segments, &mut parts); + } + } + // finalize the compaction. + checker.finalize(&mut parts); + + let cluster = self.ctx.get_cluster(); + let max_threads = self.ctx.get_settings().get_max_threads()? as usize; + let partitions = if cluster.is_empty() || parts.len() < cluster.nodes.len() * max_threads { + // NOTE: The snapshot schema does not contain the stream column. + let column_ids = self.snapshot.schema.to_leaf_column_id_set(); + let lazy_parts = parts + .into_iter() + .map(|v| { + v.as_any() + .downcast_ref::() + .unwrap() + .clone() + }) + .collect::>(); + Partitions::create( + PartitionsShuffleKind::Mod, + BlockCompactMutator::build_compact_tasks( + self.ctx.clone(), + column_ids, + Some(self.cluster_key_id), + self.block_thresholds, + lazy_parts, + ) + .await?, + ) + } else { + Partitions::create(PartitionsShuffleKind::Mod, parts) + }; + if !partitions.is_empty() { + self.tasks = ReclusterTasks::Compact(partitions); } - Ok(selected) + Ok(()) } fn generate_task( @@ -252,35 +404,44 @@ impl ReclusterMutator { total_rows: usize, total_bytes: usize, level: i32, - ) { + ) -> ReclusterTask { let (stats, parts) = FuseTable::to_partitions(Some(&self.schema), block_metas, column_nodes, None, None); - let task = ReclusterTask { + self.recluster_blocks_count += block_metas.len() as u64; + ReclusterTask { parts, stats, total_rows, total_bytes, level, - }; - self.tasks.push(task); - self.recluster_blocks_count += block_metas.len() as u64; + } } pub fn select_segments( + &mut self, compact_segments: &[(SegmentLocation, Arc)], - block_per_seg: usize, max_len: usize, - cluster_key_id: u32, ) -> Result> { let mut blocks_num = 0; let mut indices = IndexSet::new(); let mut points_map: HashMap, (Vec, Vec)> = HashMap::new(); + let mut unclustered_sg = IndexSet::new(); for (i, (_, compact_segment)) in compact_segments.iter().enumerate() { - if !ReclusterMutator::segment_can_recluster( - &compact_segment.summary, - block_per_seg, - cluster_key_id, - ) { + let mut level = -1; + let clustered = compact_segment + .summary + .cluster_stats + .as_ref() + .is_some_and(|v| { + level = v.level; + v.cluster_key_id == self.cluster_key_id + }); + if !clustered { + unclustered_sg.insert(i); + continue; + } + + if level < 0 && (compact_segment.summary.block_count as usize) >= self.block_per_seg { continue; } @@ -298,21 +459,22 @@ impl ReclusterMutator { } } - if indices.len() < 2 || blocks_num < block_per_seg { + if !unclustered_sg.is_empty() { + self.tasks = ReclusterTasks::Compact(Partitions::default()); + return Ok(unclustered_sg); + } + + if indices.len() < 2 || blocks_num < self.block_per_seg { return Ok(indices); } ReclusterMutator::fetch_max_depth(points_map, 1.0, max_len) } - pub fn segment_can_recluster( - summary: &Statistics, - block_per_seg: usize, - cluster_key_id: u32, - ) -> bool { + pub fn segment_can_recluster(&self, summary: &Statistics) -> bool { if let Some(stats) = &summary.cluster_stats { - stats.cluster_key_id == cluster_key_id - && (stats.level >= 0 || (summary.block_count as usize) < block_per_seg) + stats.cluster_key_id == self.cluster_key_id + && (stats.level >= 0 || (summary.block_count as usize) < self.block_per_seg) } else { false } @@ -472,4 +634,18 @@ impl ReclusterMutator { let set: HashSet = HashSet::from_iter(start.iter().chain(end.iter()).cloned()); set.len() == 2 } + + pub fn is_distributed(&self) -> bool { + match &self.tasks { + ReclusterTasks::Recluster { tasks, .. } => tasks.len() > 1, + ReclusterTasks::Compact(_) => { + (!self.ctx.get_cluster().is_empty()) + && self + .ctx + .get_settings() + .get_enable_distributed_compact() + .unwrap_or(false) + } + } + } } diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index 7fdd335582934..6d0535e239f37 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -35,10 +35,6 @@ use crate::pruning::PruningContext; use crate::pruning::SegmentPruner; use crate::FuseTable; use crate::SegmentLocation; -use crate::DEFAULT_AVG_DEPTH_THRESHOLD; -use crate::DEFAULT_BLOCK_PER_SEGMENT; -use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT; -use crate::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD; impl FuseTable { /// The flow of Pipeline is as follows: @@ -88,39 +84,12 @@ impl FuseTable { let start = Instant::now(); - let settings = ctx.get_settings(); - let mut max_tasks = 1; - let cluster = ctx.get_cluster(); - if !cluster.is_empty() && settings.get_enable_distributed_recluster()? { - max_tasks = cluster.nodes.len(); - } - - let schema = self.schema_with_stream(); - let default_cluster_key_id = self.cluster_key_meta.clone().unwrap().0; - let block_thresholds = self.get_block_thresholds(); - let block_per_seg = - self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT); - let avg_depth_threshold = self.get_option( - FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD, - DEFAULT_AVG_DEPTH_THRESHOLD, - ); - let threshold = (snapshot.summary.block_count as f64 * avg_depth_threshold) - .max(1.0) - .min(64.0); - let mut mutator = ReclusterMutator::try_create( - ctx.clone(), - snapshot.clone(), - schema, - threshold, - block_thresholds, - default_cluster_key_id, - max_tasks, - )?; + let mut mutator = ReclusterMutator::try_create(self, ctx.clone(), snapshot.clone())?; let segment_locations = snapshot.segments.clone(); let segment_locations = create_segment_location_vector(segment_locations, None); - let max_threads = settings.get_max_threads()? as usize; + let max_threads = ctx.get_settings().get_max_threads()? as usize; let limit = limit.unwrap_or(1000); // The default limit might be too small, which makes // the scanning of recluster candidates slow. @@ -159,28 +128,19 @@ impl FuseTable { } // select the segments with the highest depth. - let selected_segs = ReclusterMutator::select_segments( - &compact_segments, - block_per_seg, - max_seg_num, - default_cluster_key_id, - )?; + let selected_segs = mutator.select_segments(&compact_segments, max_seg_num)?; // select the blocks with the highest depth. if selected_segs.is_empty() { let mut selected_segs = vec![]; let mut block_count = 0; for compact_segment in compact_segments.into_iter() { - if !ReclusterMutator::segment_can_recluster( - &compact_segment.1.summary, - block_per_seg, - default_cluster_key_id, - ) { + if !mutator.segment_can_recluster(&compact_segment.1.summary) { continue; } block_count += compact_segment.1.summary.block_count as usize; selected_segs.push(compact_segment); - if block_count >= block_per_seg { + if block_count >= mutator.block_per_seg { let seg_num = selected_segs.len(); if mutator .target_select(std::mem::take(&mut selected_segs))