diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs
index 3e4f09cec6ae1..974c9c4847425 100644
--- a/src/query/service/src/interpreters/interpreter_table_recluster.rs
+++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs
@@ -272,7 +272,7 @@ pub fn build_recluster_physical_plan(
             snapshot,
             catalog_info,
             is_distributed,
-            true,
+            need_lock,
         ),
     }
 }
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
index ca06c33db0de1..b628b907e3001 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 
 use databend_common_base::base::tokio;
 use databend_common_catalog::plan::PartInfoType;
+use databend_common_catalog::plan::Partitions;
 use databend_common_catalog::table::CompactionLimits;
 use databend_common_catalog::table::Table;
 use databend_common_exception::Result;
@@ -33,9 +34,11 @@ use databend_query::schedulers::build_query_pipeline_without_render_result_set;
 use databend_query::sessions::QueryContext;
 use databend_query::sessions::TableContext;
 use databend_query::test_kits::*;
+use databend_storages_common_table_meta::meta::Location;
 use databend_storages_common_table_meta::meta::SegmentInfo;
 use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::TableSnapshot;
+use opendal::Operator;
 use rand::thread_rng;
 use rand::Rng;
 use uuid::Uuid;
@@ -196,6 +199,7 @@ async fn test_safety() -> Result<()> {
             threshold,
             cluster_key_id,
             5,
+            false,
         )
         .await?;
 
@@ -240,53 +244,72 @@ async fn test_safety() -> Result<()> {
             eprintln!("no target select");
             continue;
         }
-        assert!(selections.partitions_type() != PartInfoType::LazyLevel);
-
-        let mut actual_blocks_number = 0;
-        let mut compact_segment_indices = HashSet::new();
-        let mut actual_block_ids = HashSet::new();
-        for part in selections.partitions.into_iter() {
-            let part = CompactBlockPartInfo::from_part(&part)?;
-            match part {
-                CompactBlockPartInfo::CompactExtraInfo(extra) => {
-                    compact_segment_indices.insert(extra.segment_index);
-                    compact_segment_indices.extend(extra.removed_segment_indexes.iter());
-                    actual_blocks_number += extra.unchanged_blocks.len();
-                    for b in &extra.unchanged_blocks {
-                        actual_block_ids.insert(b.1.location.clone());
-                    }
+        verify_compact_tasks(
+            ctx.get_data_operator()?.operator(),
+            selections,
+            locations,
+            HashSet::new(),
+        )
+        .await?;
+    }
+
+    Ok(())
+}
+
+pub async fn verify_compact_tasks(
+    dal: Operator,
+    parts: Partitions,
+    locations: Vec<Location>,
+    expected_segment_indices: HashSet<usize>,
+) -> Result<()> {
+    assert!(parts.partitions_type() != PartInfoType::LazyLevel);
+
+    let mut actual_blocks_number = 0;
+    let mut compact_segment_indices = HashSet::new();
+    let mut actual_block_ids = HashSet::new();
+    for part in parts.partitions.into_iter() {
+        let part = CompactBlockPartInfo::from_part(&part)?;
+        match part {
+            CompactBlockPartInfo::CompactExtraInfo(extra) => {
+                compact_segment_indices.insert(extra.segment_index);
+                compact_segment_indices.extend(extra.removed_segment_indexes.iter());
+                actual_blocks_number += extra.unchanged_blocks.len();
+                for b in &extra.unchanged_blocks {
+                    actual_block_ids.insert(b.1.location.clone());
                 }
-                CompactBlockPartInfo::CompactTaskInfo(task) => {
-                    compact_segment_indices.insert(task.index.segment_idx);
-                    actual_blocks_number += task.blocks.len();
-                    for b in &task.blocks {
-                        actual_block_ids.insert(b.location.clone());
-                    }
+            }
+            CompactBlockPartInfo::CompactTaskInfo(task) => {
+                compact_segment_indices.insert(task.index.segment_idx);
+                actual_blocks_number += task.blocks.len();
+                for b in &task.blocks {
+                    actual_block_ids.insert(b.location.clone());
                 }
             }
         }
+    }
 
-        eprintln!("compact_segment_indices: {:?}", compact_segment_indices);
-        let mut except_blocks_number = 0;
-        let mut except_block_ids = HashSet::new();
-        for idx in compact_segment_indices.into_iter() {
-            let loc = locations.get(idx).unwrap();
-            let compact_segment = SegmentsIO::read_compact_segment(
-                ctx.get_data_operator()?.operator(),
-                loc.clone(),
-                TestFixture::default_table_schema(),
-                false,
-            )
-            .await?;
-            let segment = SegmentInfo::try_from(compact_segment)?;
-            except_blocks_number += segment.blocks.len();
-            for b in &segment.blocks {
-                except_block_ids.insert(b.location.clone());
-            }
+    eprintln!("compact_segment_indices: {:?}", compact_segment_indices);
+    if !expected_segment_indices.is_empty() {
+        assert_eq!(expected_segment_indices, compact_segment_indices);
+    }
+    let mut except_blocks_number = 0;
+    let mut except_block_ids = HashSet::new();
+    for idx in compact_segment_indices.into_iter() {
+        let loc = locations.get(idx).unwrap();
+        let compact_segment = SegmentsIO::read_compact_segment(
+            dal.clone(),
+            loc.clone(),
+            TestFixture::default_table_schema(),
+            false,
+        )
+        .await?;
+        let segment = SegmentInfo::try_from(compact_segment)?;
+        except_blocks_number += segment.blocks.len();
+        for b in &segment.blocks {
+            except_block_ids.insert(b.location.clone());
         }
-        assert_eq!(except_blocks_number, actual_blocks_number);
-        assert_eq!(except_block_ids, actual_block_ids);
     }
-
+    assert_eq!(except_blocks_number, actual_blocks_number);
+    assert_eq!(except_block_ids, actual_block_ids);
     Ok(())
 }
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs
index d9d9a3411b99d..154198361f62b 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs
@@ -17,5 +17,6 @@ mod deletion;
 mod recluster_mutator;
 mod segments_compact_mutator;
 
+pub use block_compact_mutator::verify_compact_tasks;
 pub use segments_compact_mutator::compact_segment;
 pub use segments_compact_mutator::CompactSegmentTestFixture;
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
index 5e42ca1779d31..7f683b9d5b722 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
@@ -47,6 +47,7 @@ use rand::thread_rng;
 use rand::Rng;
 use uuid::Uuid;
 
+use crate::storages::fuse::operations::mutation::verify_compact_tasks;
 use crate::storages::fuse::operations::mutation::CompactSegmentTestFixture;
 
 #[tokio::test(flavor = "multi_thread")]
@@ -203,6 +204,17 @@ async fn test_safety_for_recluster() -> Result<()> {
             number_of_segments, number_of_blocks,
         );
 
+        let unclustered: bool = rand.gen();
+        let mut unclustered_segment_indices = HashSet::new();
+        if unclustered {
+            unclustered_segment_indices = block_number_of_segments
+                .iter()
+                .rev()
+                .enumerate()
+                .filter(|(_, &num)| num % 4 == 0)
+                .map(|(index, _)| index)
+                .collect();
+        }
         let (locations, _, segment_infos) = CompactSegmentTestFixture::gen_segments(
             ctx.clone(),
             block_number_of_segments,
@@ -210,6 +222,7 @@ async fn test_safety_for_recluster() -> Result<()> {
             threshold,
             Some(cluster_key_id),
             block_per_seg,
+            unclustered,
         )
         .await?;
 
@@ -241,7 +254,7 @@ async fn test_safety_for_recluster() -> Result<()> {
         }
 
         let ctx: Arc<dyn TableContext> = ctx.clone();
-        let segment_locations = create_segment_location_vector(locations, None);
+        let segment_locations = create_segment_location_vector(locations.clone(), None);
         let compact_segments = FuseTable::segment_pruning(
             &ctx,
             schema.clone(),
@@ -284,46 +297,58 @@ async fn test_safety_for_recluster() -> Result<()> {
 
         eprintln!("need_recluster: {}", need_recluster);
         if need_recluster {
-            let ReclusterTasks::Recluster {
-                tasks,
-                remained_blocks,
-                removed_segment_indexes,
-                ..
-            } = mutator.tasks
-            else {
-                return Err(ErrorCode::Internal("Logical error, it's a bug"));
-            };
-            assert!(tasks.len() <= max_tasks && !tasks.is_empty());
-            eprintln!("tasks_num: {}, max_tasks: {}", tasks.len(), max_tasks);
-            let mut blocks = Vec::new();
-            for task in tasks.into_iter() {
-                let parts = task.parts.partitions;
-                assert!(task.total_bytes <= recluster_block_size);
-                for part in parts.into_iter() {
-                    let fuse_part = FuseBlockPartInfo::from_part(&part)?;
-                    blocks.push(fuse_part.location.clone());
+            match mutator.tasks {
+                ReclusterTasks::Recluster {
+                    tasks,
+                    remained_blocks,
+                    removed_segment_indexes,
+                    ..
+                } => {
+                    assert!(unclustered_segment_indices.is_empty());
+                    assert!(tasks.len() <= max_tasks && !tasks.is_empty());
+                    eprintln!("tasks_num: {}, max_tasks: {}", tasks.len(), max_tasks);
+                    let mut blocks = Vec::new();
+                    for task in tasks.into_iter() {
+                        let parts = task.parts.partitions;
+                        assert!(task.total_bytes <= recluster_block_size);
+                        for part in parts.into_iter() {
+                            let fuse_part = FuseBlockPartInfo::from_part(&part)?;
+                            blocks.push(fuse_part.location.clone());
+                        }
+                    }
+
+                    eprintln!(
+                        "selected segments number {}, selected blocks number {}, remained blocks number {}",
+                        removed_segment_indexes.len(),
+                        blocks.len(),
+                        remained_blocks.len()
+                    );
+                    for remain in remained_blocks {
+                        blocks.push(remain.location.0.clone());
+                    }
+
+                    let block_ids_after_target = HashSet::from_iter(blocks.into_iter());
+
+                    let mut origin_blocks_ids = HashSet::new();
+                    for idx in &removed_segment_indexes {
+                        for b in &segment_infos[*idx].blocks {
+                            origin_blocks_ids.insert(b.location.0.clone());
+                        }
+                    }
+                    assert_eq!(block_ids_after_target, origin_blocks_ids);
                 }
-            }
-
-            eprintln!(
-                "selected segments number {}, selected blocks number {}, remained blocks number {}",
-                removed_segment_indexes.len(),
-                blocks.len(),
-                remained_blocks.len()
-            );
-            for remain in remained_blocks {
-                blocks.push(remain.location.0.clone());
-            }
-
-            let block_ids_after_target = HashSet::from_iter(blocks.into_iter());
-
-            let mut origin_blocks_ids = HashSet::new();
-            for idx in &removed_segment_indexes {
-                for b in &segment_infos[*idx].blocks {
-                    origin_blocks_ids.insert(b.location.0.clone());
+                ReclusterTasks::Compact(parts) => {
+                    assert!(unclustered);
+                    assert!(!unclustered_segment_indices.is_empty());
+                    verify_compact_tasks(
+                        ctx.get_data_operator()?.operator(),
+                        parts,
+                        locations,
+                        unclustered_segment_indices,
+                    )
+                    .await?;
                 }
-            }
-            assert_eq!(block_ids_after_target, origin_blocks_ids);
+            };
         }
     }
 
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
index 383743788863c..6acd1e357e7a8 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
@@ -684,6 +684,7 @@ impl CompactSegmentTestFixture {
             BlockThresholds::default(),
             cluster_key_id,
             block_per_seg as usize,
+            false,
         )
         .await?;
         let mut summary = Statistics::default();
@@ -707,6 +708,7 @@ impl CompactSegmentTestFixture {
         thresholds: BlockThresholds,
         cluster_key_id: Option<u32>,
         block_per_seg: usize,
+        unclustered: bool,
     ) -> Result<(Vec<Location>, Vec<BlockMeta>, Vec<SegmentInfo>)> {
         let location_gen = TableMetaLocationGenerator::with_prefix("test/".to_owned());
         let data_accessor = ctx.get_data_operator()?.operator();
@@ -731,7 +733,7 @@ impl CompactSegmentTestFixture {
 
                 let col_stats = gen_columns_statistics(&block, None, &schema)?;
 
-                let cluster_stats = if num_blocks % 5 == 0 {
+                let cluster_stats = if unclustered && num_blocks % 4 == 0 {
                     None
                 } else {
                     cluster_key_id.map(|v| {
@@ -1013,6 +1015,7 @@ async fn test_compact_segment_with_cluster() -> Result<()> {
         BlockThresholds::default(),
         Some(cluster_key_id),
         block_per_seg as usize,
+        false,
     )
     .await?;
     let mut summary = Statistics::default();
diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
index b2b1737f37ace..c4de4da70b46b 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
@@ -174,19 +174,18 @@ impl ReclusterMutator {
         compact_segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
     ) -> Result<bool> {
         match self.tasks {
-            ReclusterTasks::Compact(_) => self.generate_compact_tasks(compact_segments).await?,
+            ReclusterTasks::Compact(_) => self.generate_compact_tasks(compact_segments).await,
             ReclusterTasks::Recluster { .. } => {
-                self.generate_recluster_tasks(compact_segments).await?
+                self.generate_recluster_tasks(compact_segments).await
             }
         }
-        Ok(self.tasks.is_empty())
     }
 
     #[async_backtrace::framed]
     pub async fn generate_recluster_tasks(
         &mut self,
         compact_segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
-    ) -> Result<()> {
+    ) -> Result<bool> {
         let mut selected_segments = Vec::with_capacity(compact_segments.len());
         let mut selected_indices = Vec::with_capacity(compact_segments.len());
         let mut selected_statistics = Vec::with_capacity(compact_segments.len());
@@ -198,7 +197,7 @@ impl ReclusterMutator {
 
         let blocks_map = self.gather_block_map(selected_segments).await?;
         if blocks_map.is_empty() {
-            return Ok(());
+            return Ok(false);
         }
 
         let mem_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?;
@@ -250,13 +249,13 @@ impl ReclusterMutator {
         {
             let block_metas: Vec<_> =
                 block_metas.into_iter().map(|meta| (None, meta)).collect();
-            self.generate_task(
+            tasks.push(self.generate_task(
                 &block_metas,
                 &column_nodes,
                 total_rows as usize,
                 total_bytes as usize,
                 level,
-            );
+            ));
             selected = true;
             continue;
         }
@@ -342,13 +341,13 @@ impl ReclusterMutator {
                 removed_segment_summary,
             };
         }
-        Ok(())
+        Ok(selected)
     }
 
     async fn generate_compact_tasks(
         &mut self,
         compact_segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
-    ) -> Result<()> {
+    ) -> Result<bool> {
         let mut parts = Vec::new();
         let mut checker =
             SegmentCompactChecker::new(self.block_per_seg as u64, Some(self.cluster_key_id));
@@ -391,10 +390,10 @@ impl ReclusterMutator {
         } else {
             Partitions::create(PartitionsShuffleKind::Mod, parts)
         };
-        if !partitions.is_empty() {
-            self.tasks = ReclusterTasks::Compact(partitions);
-        }
-        Ok(())
+
+        let selected = !partitions.is_empty();
+        self.tasks = ReclusterTasks::Compact(partitions);
+        Ok(selected)
     }
 
     fn generate_task(
diff --git a/tests/cloud_control_server/simple_server.py b/tests/cloud_control_server/simple_server.py
index 0ef3de850ee90..f2371ae0c46fc 100644
--- a/tests/cloud_control_server/simple_server.py
+++ b/tests/cloud_control_server/simple_server.py
@@ -44,9 +44,9 @@ def load_data_from_json():
             notification_history_data = json.load(f)
             notification_history = notification_pb2.NotificationHistory()
             json_format.ParseDict(notification_history_data, notification_history)
-            NOTIFICATION_HISTORY_DB[
-                notification_history.name
-            ] = notification_history
+            NOTIFICATION_HISTORY_DB[notification_history.name] = (
+                notification_history
+            )
 
 
 def create_task_request_to_task(id, create_task_request):
diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test
index 987ef6d8aef8c..2a6e250bcaf12 100644
--- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test
+++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test
@@ -762,6 +762,54 @@ insert into test_abc select * from test_abc_random limit 1000;
 
 statement ok
 alter table test_abc recluster final;
+
+statement ok
+create table t14(a int not null) row_per_block=3
+
+statement ok
+insert into t14 values(0),(1),(4)
+
+statement ok
+insert into t14 values(3)
+
+statement ok
+insert into t14 values(-6),(-8)
+
+statement ok
+ALTER TABLE t14 cluster by(abs(a))
+
+statement ok
+insert into t14 values(2),(5),(-7)
+
+query TIIIFFT
+select * from clustering_information('db_09_0008','t14')
+----
+(abs(a)) 4 0 3 0.0 1.0 {"00001":1}
+
+statement ok
+alter table t14 recluster
+
+query TIIIFFT
+select * from clustering_information('db_09_0008','t14')
+----
+(abs(a)) 3 0 0 2.0 3.0 {"00003":3}
+
+statement ok
+alter table t14 recluster
+
+query TIIIFFT
+select * from clustering_information('db_09_0008','t14')
+----
+(abs(a)) 3 0 0 0.0 1.0 {"00001":3}
+
+query III
+select segment_count, block_count, row_count from fuse_snapshot('db_09_0008','t14') limit 3
+----
+1 3 9
+2 3 9
+4 4 9
+
+
 
 statement ok
 DROP DATABASE db_09_0008