From cc42f30169260fd6b4d899ef843117898b110528 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 12 Dec 2024 08:41:39 +0800 Subject: [PATCH 1/4] chore: min_skip_partial_aggregation_capacity --- src/query/expression/src/aggregate/mod.rs | 21 +++++++++++++++---- .../pipelines/builders/builder_aggregate.rs | 15 ++++++++++--- src/query/settings/src/settings_default.rs | 6 ++++++ .../settings/src/settings_getter_setter.rs | 4 ++++ 4 files changed, 39 insertions(+), 7 deletions(-) diff --git a/src/query/expression/src/aggregate/mod.rs b/src/query/expression/src/aggregate/mod.rs index 6911a0efc3cf..08c85e9d511f 100644 --- a/src/query/expression/src/aggregate/mod.rs +++ b/src/query/expression/src/aggregate/mod.rs @@ -89,7 +89,12 @@ impl HashTableConfig { self } - pub fn with_partial(mut self, partial_agg: bool, active_threads: usize) -> Self { + pub fn with_partial( + mut self, + partial_agg: bool, + active_threads: usize, + min_skip_partial_aggregation_capacity: usize, + ) -> Self { self.partial_agg = partial_agg; // init max_partial_capacity @@ -97,16 +102,24 @@ impl HashTableConfig { let cache_per_active_thread = L1_CACHE_SIZE + L2_CACHE_SIZE + total_shared_cache_size / active_threads; let size_per_entry = (8_f64 * LOAD_FACTOR) as usize; - let capacity = (cache_per_active_thread / size_per_entry).next_power_of_two(); + let capacity = (cache_per_active_thread / size_per_entry) + .next_power_of_two() + .max(min_skip_partial_aggregation_capacity); self.max_partial_capacity = capacity; self } - pub fn cluster_with_partial(mut self, partial_agg: bool, node_nums: usize) -> Self { + pub fn cluster_with_partial( + mut self, + partial_agg: bool, + node_nums: usize, + min_skip_partial_aggregation_capacity: usize, + ) -> Self { self.partial_agg = partial_agg; self.repartition_radix_bits_incr = 4; - self.max_partial_capacity = 131072 * (2 << node_nums); + self.max_partial_capacity = + (131072 * (2 << node_nums)).max(min_skip_partial_aggregation_capacity); self } diff --git a/src/query/service/src/pipelines/builders/builder_aggregate.rs b/src/query/service/src/pipelines/builders/builder_aggregate.rs index 1ecf01b6b0f8..e83579e052ac 100644 --- a/src/query/service/src/pipelines/builders/builder_aggregate.rs +++ b/src/query/service/src/pipelines/builders/builder_aggregate.rs @@ -127,13 +127,22 @@ impl PipelineBuilder { let schema_before_group_by = params.input_schema.clone(); let sample_block = DataBlock::empty_with_schema(schema_before_group_by); let method = DataBlock::choose_hash_method(&sample_block, group_cols, efficiently_memory)?; + let min_skip_partial_aggregation_capacity = + self.settings.get_min_skip_partial_aggregation_capacity()? as usize; // Need a global atomic to read the max current radix bits hint let partial_agg_config = if !self.is_exchange_neighbor { - HashTableConfig::default().with_partial(true, max_threads as usize) + HashTableConfig::default().with_partial( + true, + max_threads as usize, + min_skip_partial_aggregation_capacity, + ) } else { - HashTableConfig::default() - .cluster_with_partial(true, self.ctx.get_cluster().nodes.len()) + HashTableConfig::default().cluster_with_partial( + true, + self.ctx.get_cluster().nodes.len(), + min_skip_partial_aggregation_capacity, + ) }; self.main_pipeline.add_transform(|input, output| { diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 26944906f790..cceea801a920 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -713,6 +713,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("min_skip_partial_aggregation_capacity", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "The min size of aggregate hashtable skip partial aggregation capacity", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=usize::MAX)), + }), ("numeric_cast_option", DefaultSettingValue { value: UserSettingValue::String("rounding".to_string()), desc: "Set numeric cast mode as \"rounding\" or \"truncating\".", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 7a71ede7e73a..5c48dc45b55e 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -423,6 +423,10 @@ impl Settings { Ok(self.try_get_u64("enable_experimental_aggregate_hashtable")? == 1) } + pub fn get_min_skip_partial_aggregation_capacity(&self) -> Result { + self.try_get_u64("min_skip_partial_aggregation_capacity") + } + pub fn get_lazy_read_threshold(&self) -> Result { self.try_get_u64("lazy_read_threshold") } From 7b84df049e949e752120a0fc8113ba058b7a2001 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 12 Dec 2024 08:50:06 +0800 Subject: [PATCH 2/4] update --- .cargo/audit.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.cargo/audit.toml b/.cargo/audit.toml index 4334d3ec6c0a..c76f834db5a4 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -47,8 +47,10 @@ ignore = [ "RUSTSEC-2024-0351", # gix-path: improperly resolves configuration path reported by Git "RUSTSEC-2024-0371", - # Remotely exploitable Denial of Service in Tonic, ignored temporarily + # Remotely exploitable Denial of Service in Tonic, ignored temporarily "RUSTSEC-2024-0376", #rustls network-reachable panic in `Acceptor::accept` "RUSTSEC-2024-0399", + # `idna` accepts Punycode labels that do not produce any non-ASCII when decoded + "RUSTSEC-2024-0421", ] From 8ed0fc2f4f28eb7cf9d08bf77fefa9711508c586 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 12 Dec 2024 09:01:58 +0800 Subject: [PATCH 3/4] update --- src/query/settings/src/settings_default.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index cceea801a920..d9bf23f3535c 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -717,7 +717,7 @@ impl DefaultSettings { value: UserSettingValue::UInt64(0), desc: "The min size of aggregate hashtable skip partial aggregation capacity", mode: SettingMode::Both, - range: Some(SettingRange::Numeric(0..=usize::MAX)), + range: Some(SettingRange::Numeric(0..=u64::MAX)), }), ("numeric_cast_option", DefaultSettingValue { value: UserSettingValue::String("rounding".to_string()), From 334350a64c20c4b7dee80ca94d183dd0ab37e3f7 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 12 Dec 2024 16:40:40 +0800 Subject: [PATCH 4/4] chore(query): degrade lz4 in 636-rc --- Cargo.lock | 8 ++++---- .../service/src/pipelines/builders/builder_aggregate.rs | 6 ++++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9325fe41311a..adbf8d91ccc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10139,9 +10139,9 @@ dependencies = [ [[package]] name = "lz4" -version = "1.26.0" +version = "1.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "958b4caa893816eea05507c20cfe47574a43d9a697138a7872990bba8a0ece68" +checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1" dependencies = [ "libc", "lz4-sys", @@ -10149,9 +10149,9 @@ dependencies = [ [[package]] name = "lz4-sys" -version = "1.10.0" +version = "1.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "109de74d5d2353660401699a4174a4ff23fcc649caf553df71933c7fb45ad868" +checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" dependencies = [ "cc", "libc", diff --git a/src/query/service/src/pipelines/builders/builder_aggregate.rs b/src/query/service/src/pipelines/builders/builder_aggregate.rs index e83579e052ac..0245f2789260 100644 --- a/src/query/service/src/pipelines/builders/builder_aggregate.rs +++ b/src/query/service/src/pipelines/builders/builder_aggregate.rs @@ -145,6 +145,12 @@ impl PipelineBuilder { ) }; + log::info!( + "[build_aggregate_partial] partial_agg_config: {:?}, {}", + partial_agg_config, + self.is_exchange_neighbor + ); + self.main_pipeline.add_transform(|input, output| { Ok(ProcessorPtr::create( match params.aggregate_functions.is_empty() {