From d444db29aa2c35ebc5cc0afa7cc9252f2ce20049 Mon Sep 17 00:00:00 2001 From: Winter Zhang Date: Thu, 12 Sep 2024 08:30:01 +0800 Subject: [PATCH] chore(query): limit max concurrency of spill io requests (#16442) --- .../pipelines/builders/builder_aggregate.rs | 7 +++++ .../src/pipelines/builders/builder_window.rs | 12 ++++++- .../aggregator/aggregator_params.rs | 3 ++ .../serde/transform_spill_reader.rs | 7 +++++ .../aggregator/transform_partition_bucket.rs | 31 ++++++++++++++++--- ...transform_window_partition_spill_reader.rs | 6 ++++ src/query/settings/src/settings_default.rs | 19 +++++++++++- .../settings/src/settings_getter_setter.rs | 4 +++ 8 files changed, 83 insertions(+), 6 deletions(-) diff --git a/src/query/service/src/pipelines/builders/builder_aggregate.rs b/src/query/service/src/pipelines/builders/builder_aggregate.rs index 28e35c504a81..1ecf01b6b0f8 100644 --- a/src/query/service/src/pipelines/builders/builder_aggregate.rs +++ b/src/query/service/src/pipelines/builders/builder_aggregate.rs @@ -98,6 +98,7 @@ impl PipelineBuilder { let max_block_size = self.settings.get_max_block_size()?; let max_threads = self.settings.get_max_threads()?; + let max_spill_io_requests = self.settings.get_max_spill_io_requests()?; let enable_experimental_aggregate_hashtable = self .settings @@ -111,6 +112,7 @@ impl PipelineBuilder { self.is_exchange_neighbor, max_block_size as usize, None, + max_spill_io_requests as usize, )?; if params.group_columns.is_empty() { @@ -214,6 +216,8 @@ impl PipelineBuilder { let enable_experimental_aggregate_hashtable = self .settings .get_enable_experimental_aggregate_hashtable()?; + let max_spill_io_requests = self.settings.get_max_spill_io_requests()?; + let params = Self::build_aggregator_params( aggregate.before_group_by_schema.clone(), &aggregate.group_by, @@ -222,6 +226,7 @@ impl PipelineBuilder { self.is_exchange_neighbor, max_block_size as usize, aggregate.limit, + max_spill_io_requests as usize, )?; if params.group_columns.is_empty() { @@ -288,6 +293,7 @@ impl PipelineBuilder { cluster_aggregator: bool, max_block_size: usize, limit: Option, + max_spill_io_requests: usize, ) -> Result> { let mut agg_args = Vec::with_capacity(agg_funcs.len()); let (group_by, group_data_types) = group_by @@ -330,6 +336,7 @@ impl PipelineBuilder { cluster_aggregator, max_block_size, limit, + max_spill_io_requests, )?; Ok(params) diff --git a/src/query/service/src/pipelines/builders/builder_window.rs b/src/query/service/src/pipelines/builders/builder_window.rs index f81b7df0c6e4..ece829239a5b 100644 --- a/src/query/service/src/pipelines/builders/builder_window.rs +++ b/src/query/service/src/pipelines/builders/builder_window.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::types::DataType; @@ -26,6 +28,7 @@ use databend_common_pipeline_core::PipeItem; use databend_common_sql::executor::physical_plans::Window; use databend_common_sql::executor::physical_plans::WindowPartition; use databend_common_storage::DataOperator; +use tokio::sync::Semaphore; use crate::pipelines::processors::transforms::FrameBound; use crate::pipelines::processors::transforms::TransformWindowPartitionBucket; @@ -205,8 +208,15 @@ impl PipelineBuilder { self.main_pipeline.try_resize(input_nums)?; + let max_spill_io_requests = self.settings.get_max_spill_io_requests()? as usize; + let semaphore = Arc::new(Semaphore::new(max_spill_io_requests)); self.main_pipeline.add_transform(|input, output| { - TransformWindowPartitionSpillReader::create(input, output, operator.clone()) + TransformWindowPartitionSpillReader::create( + input, + output, + operator.clone(), + semaphore.clone(), + ) })?; let block_size = self.settings.get_max_block_size()? as usize; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs index 643c758f2472..9c1466184a77 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs @@ -46,6 +46,7 @@ pub struct AggregatorParams { pub max_block_size: usize, // Limit is push down to AggregatorTransform pub limit: Option, + pub max_spill_io_requests: usize, } impl AggregatorParams { @@ -59,6 +60,7 @@ impl AggregatorParams { cluster_aggregator: bool, max_block_size: usize, limit: Option, + max_spill_io_requests: usize, ) -> Result> { let mut states_offsets: Vec = Vec::with_capacity(agg_funcs.len()); let mut states_layout = None; @@ -79,6 +81,7 @@ impl AggregatorParams { cluster_aggregator, max_block_size, limit, + max_spill_io_requests, })) } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs index 8727754231c5..3bf4f8bd7d3a 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs @@ -34,6 +34,7 @@ use databend_common_pipeline_core::processors::ProcessorPtr; use itertools::Itertools; use log::info; use opendal::Operator; +use tokio::sync::Semaphore; use crate::pipelines::processors::transforms::aggregator::AggregateMeta; use crate::pipelines::processors::transforms::aggregator::BucketSpilledPayload; @@ -47,6 +48,7 @@ pub struct TransformSpillReader, operator: Operator, + semaphore: Arc, deserialized_meta: Option, reading_meta: Option>, deserializing_meta: Option>, @@ -183,6 +185,7 @@ impl Processor AggregateMeta::AggregateSpilling(_) => unreachable!(), AggregateMeta::Serialized(_) => unreachable!(), AggregateMeta::BucketSpilled(payload) => { + let _guard = self.semaphore.acquire().await; let instant = Instant::now(); let data = self .operator @@ -211,7 +214,9 @@ impl Processor let location = payload.location.clone(); let operator = self.operator.clone(); let data_range = payload.data_range.clone(); + let semaphore = self.semaphore.clone(); read_data.push(databend_common_base::runtime::spawn(async move { + let _guard = semaphore.acquire().await; let instant = Instant::now(); let data = operator .read_with(&location) @@ -282,6 +287,7 @@ impl TransformSpillReader, output: Arc, operator: Operator, + semaphore: Arc, ) -> Result { Ok(ProcessorPtr::create(Box::new(TransformSpillReader::< Method, @@ -290,6 +296,7 @@ impl TransformSpillReader TransformGroupBySpillReader::::create(input, output, operator), - false => TransformAggregateSpillReader::::create(input, output, operator), + true => TransformGroupBySpillReader::::create( + input, + output, + operator, + semaphore.clone(), + ), + false => TransformAggregateSpillReader::::create( + input, + output, + operator, + semaphore.clone(), + ), } })?; @@ -501,12 +513,23 @@ pub fn build_partition_bucket TransformGroupBySpillReader::::create(input, output, operator), - false => TransformAggregateSpillReader::::create(input, output, operator), + true => TransformGroupBySpillReader::::create( + input, + output, + operator, + semaphore.clone(), + ), + false => TransformAggregateSpillReader::::create( + input, + output, + operator, + semaphore.clone(), + ), } })?; diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition_by/transform_window_partition_spill_reader.rs b/src/query/service/src/pipelines/processors/transforms/window/partition_by/transform_window_partition_spill_reader.rs index 732a2d94c8ee..e16359036432 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition_by/transform_window_partition_spill_reader.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition_by/transform_window_partition_spill_reader.rs @@ -34,6 +34,7 @@ use databend_common_pipeline_core::processors::ProcessorPtr; use itertools::Itertools; use log::info; use opendal::Operator; +use tokio::sync::Semaphore; use super::WindowPartitionMeta; use super::WindowPayload; @@ -46,6 +47,7 @@ pub struct TransformWindowPartitionSpillReader { output: Arc, operator: Operator, + semaphore: Arc, deserialized_meta: Option, reading_meta: Option, deserializing_meta: Option, @@ -170,7 +172,9 @@ impl Processor for TransformWindowPartitionSpillReader { let location = p.location.clone(); let operator = self.operator.clone(); let data_range = p.data_range.clone(); + let semaphore = self.semaphore.clone(); read_data.push(databend_common_base::runtime::spawn(async move { + let _guard = semaphore.acquire().await; let instant = Instant::now(); let data = operator .read_with(&location) @@ -243,12 +247,14 @@ impl TransformWindowPartitionSpillReader { input: Arc, output: Arc, operator: Operator, + semaphore: Arc, ) -> Result { Ok(ProcessorPtr::create(Box::new( TransformWindowPartitionSpillReader { input, output, operator, + semaphore, deserialized_meta: None, reading_meta: None, deserializing_meta: None, diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index fe297db2e40f..2c189bb5681f 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -116,6 +116,7 @@ impl DefaultSettings { let num_cpus = Self::num_cpus(); let max_memory_usage = Self::max_memory_usage()?; let recluster_block_size = Self::recluster_block_size()?; + let default_max_spill_io_requests = Self::spill_io_requests(num_cpus); let default_max_storage_io_requests = Self::storage_io_requests(num_cpus); let data_retention_time_in_days_max = Self::data_retention_time_in_days_max(); let global_conf = GlobalConfig::try_get_instance(); @@ -159,9 +160,15 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=data_retention_time_in_days_max)), }), + ("max_spill_io_requests", DefaultSettingValue { + value: UserSettingValue::UInt64(default_max_spill_io_requests), + desc: "Sets the maximum number of concurrent spill I/O requests.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(1..=1024)), + }), ("max_storage_io_requests", DefaultSettingValue { value: UserSettingValue::UInt64(default_max_storage_io_requests), - desc: "Sets the maximum number of concurrent I/O requests.", + desc: "Sets the maximum number of concurrent storage I/O requests.", mode: SettingMode::Both, range: Some(SettingRange::Numeric(1..=1024)), }), @@ -880,6 +887,16 @@ impl DefaultSettings { } } + fn spill_io_requests(num_cpus: u64) -> u64 { + match GlobalConfig::try_get_instance() { + None => std::cmp::min(num_cpus, 64), + Some(conf) => match conf.storage.params.is_fs() { + true => 48, + false => std::cmp::min(num_cpus, 64), + }, + } + } + /// The maximum number of days that data can be retained. /// The max is read from the global config:data_retention_time_in_days_max /// If the global config is not set, the default value is 90 days. diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 7453cc368f8b..866e56772072 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -709,4 +709,8 @@ impl Settings { pub fn get_dynamic_sample_time_budget_ms(&self) -> Result { self.try_get_u64("dynamic_sample_time_budget_ms") } + + pub fn get_max_spill_io_requests(&self) -> Result { + self.try_get_u64("max_spill_io_requests") + } }