From a7bc773fc5192d695f7d01373ce0f158972fc857 Mon Sep 17 00:00:00 2001 From: zhya Date: Sat, 25 May 2024 23:14:46 +0800 Subject: [PATCH] chore: add setting compact_max_block_selection (#15641) --- src/query/settings/src/settings_default.rs | 6 ++++++ src/query/settings/src/settings_getter_setter.rs | 8 ++++++++ .../storages/fuse/src/operations/mutation/mod.rs | 1 - .../mutation/mutator/block_compact_mutator.rs | 11 +++++++---- 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index c47f9d1555d13..0a7d4e7f07374 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -576,6 +576,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=u64::MAX)), }), + ("compact_max_block_selection", DefaultSettingValue { + value: UserSettingValue::UInt64(10000), + desc: "Limits the maximum number of blocks that can be selected during a compact operation.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(2..=u64::MAX)), + }), ("enable_distributed_recluster", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Enable distributed execution of table recluster.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 2e1d002de8f68..63ad26148da59 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -508,6 +508,14 @@ impl Settings { self.try_get_u64("recluster_block_size") } + pub fn set_compact_max_block_selection(&self, val: u64) -> Result<()> { + self.try_set_u64("compact_max_block_selection", val) + } + + pub fn get_compact_max_block_selection(&self) -> Result { + self.try_get_u64("compact_max_block_selection") + } + pub fn get_enable_distributed_recluster(&self) -> Result { Ok(self.try_get_u64("enable_distributed_recluster")? != 0) } diff --git a/src/query/storages/fuse/src/operations/mutation/mod.rs b/src/query/storages/fuse/src/operations/mutation/mod.rs index 28db295a87d3a..99205f85d3396 100644 --- a/src/query/storages/fuse/src/operations/mutation/mod.rs +++ b/src/query/storages/fuse/src/operations/mutation/mod.rs @@ -20,6 +20,5 @@ pub use meta::*; pub use mutator::*; pub use processors::*; -pub static MAX_BLOCK_COUNT: usize = 10_000; pub type SegmentIndex = usize; pub type BlockIndex = usize; diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs index bd2acdc77e8e9..dd78404d195f1 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs @@ -45,7 +45,6 @@ use crate::operations::mutation::CompactExtraInfo; use crate::operations::mutation::CompactLazyPartInfo; use crate::operations::mutation::CompactTaskInfo; use crate::operations::mutation::SegmentIndex; -use crate::operations::mutation::MAX_BLOCK_COUNT; use crate::operations::CompactOptions; use crate::statistics::reducers::merge_statistics_mut; use crate::statistics::sort_by_cluster_stats; @@ -84,6 +83,11 @@ impl BlockCompactMutator { let snapshot = self.compact_params.base_snapshot.clone(); let segment_locations = &snapshot.segments; let number_segments = segment_locations.len(); + + let settings = self.ctx.get_settings(); + let compact_max_block_selection = settings.get_compact_max_block_selection()? as usize; + let max_threads = settings.get_max_threads()? as usize; + let num_segment_limit = self .compact_params .num_segment_limit @@ -91,7 +95,7 @@ impl BlockCompactMutator { let num_block_limit = self .compact_params .num_block_limit - .unwrap_or(MAX_BLOCK_COUNT); + .unwrap_or(compact_max_block_selection); info!("block compaction limits: seg {num_segment_limit}, block {num_block_limit}"); @@ -112,7 +116,7 @@ impl BlockCompactMutator { let mut segment_idx = 0; let mut is_end = false; let mut parts = Vec::new(); - let chunk_size = self.ctx.get_settings().get_max_threads()? as usize * 4; + let chunk_size = max_threads * 4; for chunk in segment_locations.chunks(chunk_size) { // Read the segments information in parallel. let mut segment_infos = segments_io @@ -194,7 +198,6 @@ impl BlockCompactMutator { metrics_inc_compact_block_build_lazy_part_milliseconds(elapsed_time.as_millis() as u64); let cluster = self.ctx.get_cluster(); - let max_threads = self.ctx.get_settings().get_max_threads()? as usize; let partitions = if cluster.is_empty() || parts.len() < cluster.nodes.len() * max_threads { // NOTE: The snapshot schema does not contain the stream column. let column_ids = self