Skip to content

Commit

Permalink
chore: add setting compact_max_block_selection
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed May 25, 2024
1 parent c8e3c20 commit bdd7f0f
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 5 deletions.
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,12 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("compact_max_block_selection", DefaultSettingValue {
value: UserSettingValue::UInt64(10000),
desc: "Limits the maximum number of blocks that can be selected during a compact operation.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(2..=u64::MAX)),
}),
("enable_distributed_recluster", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Enable distributed execution of table recluster.",
Expand Down
8 changes: 8 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,14 @@ impl Settings {
self.try_get_u64("recluster_block_size")
}

/// Stores `val` under the `compact_max_block_selection` setting key.
///
/// Any error reported by `try_set_u64` is returned to the caller unchanged.
// NOTE(review): the default-settings table declares a numeric range for this
// key; presumably `try_set_u64` enforces it — confirm.
pub fn set_compact_max_block_selection(&self, val: u64) -> Result<()> {
    const KEY: &str = "compact_max_block_selection";
    self.try_set_u64(KEY, val)
}

/// Reads the `compact_max_block_selection` setting as a `u64`.
///
/// Any error reported by `try_get_u64` is propagated to the caller.
pub fn get_compact_max_block_selection(&self) -> Result<u64> {
    let max_blocks = self.try_get_u64("compact_max_block_selection")?;
    Ok(max_blocks)
}

/// Reads the `enable_distributed_recluster` setting and interprets it as a flag.
///
/// The stored value is a `u64`; any non-zero value yields `true`, zero yields
/// `false`. Any error reported by `try_get_u64` is returned as-is.
pub fn get_enable_distributed_recluster(&self) -> Result<bool> {
    self.try_get_u64("enable_distributed_recluster")
        .map(|raw| raw != 0)
}
Expand Down
1 change: 0 additions & 1 deletion src/query/storages/fuse/src/operations/mutation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,5 @@ pub use meta::*;
pub use mutator::*;
pub use processors::*;

pub static MAX_BLOCK_COUNT: usize = 10_000;
pub type SegmentIndex = usize;
pub type BlockIndex = usize;
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::operations::mutation::CompactExtraInfo;
use crate::operations::mutation::CompactLazyPartInfo;
use crate::operations::mutation::CompactTaskInfo;
use crate::operations::mutation::SegmentIndex;
use crate::operations::mutation::MAX_BLOCK_COUNT;
use crate::operations::CompactOptions;
use crate::statistics::reducers::merge_statistics_mut;
use crate::statistics::sort_by_cluster_stats;
Expand Down Expand Up @@ -84,14 +83,19 @@ impl BlockCompactMutator {
let snapshot = self.compact_params.base_snapshot.clone();
let segment_locations = &snapshot.segments;
let number_segments = segment_locations.len();

let settings = self.ctx.get_settings();
let compact_max_block_selection = settings.get_compact_max_block_selection()? as usize;
let max_threads = settings.get_max_threads()? as usize;

let num_segment_limit = self
.compact_params
.num_segment_limit
.unwrap_or(number_segments);
let num_block_limit = self
.compact_params
.num_block_limit
.unwrap_or(MAX_BLOCK_COUNT);
.unwrap_or(compact_max_block_selection);

info!("block compaction limits: seg {num_segment_limit}, block {num_block_limit}");

Expand All @@ -112,7 +116,7 @@ impl BlockCompactMutator {
let mut segment_idx = 0;
let mut is_end = false;
let mut parts = Vec::new();
let chunk_size = self.ctx.get_settings().get_max_threads()? as usize * 4;
let chunk_size = max_threads * 4;
for chunk in segment_locations.chunks(chunk_size) {
// Read the segments information in parallel.
let mut segment_infos = segments_io
Expand Down Expand Up @@ -194,7 +198,6 @@ impl BlockCompactMutator {
metrics_inc_compact_block_build_lazy_part_milliseconds(elapsed_time.as_millis() as u64);

let cluster = self.ctx.get_cluster();
let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
let partitions = if cluster.is_empty() || parts.len() < cluster.nodes.len() * max_threads {
// NOTE: The snapshot schema does not contain the stream column.
let column_ids = self
Expand Down

0 comments on commit bdd7f0f

Please sign in to comment.