diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/transform_build_spill.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/transform_build_spill.rs
index 6ac3b11e8772..202bf20e58d6 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/transform_build_spill.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/transform_build_spill.rs
@@ -99,6 +99,10 @@ impl BuildSpillHandler {
         if join_type == &JoinType::Cross {
             return self.spill_cross_join().await;
         }
+        if self.pending_spill_data.is_empty() && !self.spill_state().spiller.empty_buffer() {
+            // Spill the data remaining in the spiller buffer
+            return self.spill_state_mut().spiller.spill_buffer().await;
+        }
         // Concat the data blocks that pending to spill to reduce the spill file number.
         let pending_spill_data = DataBlock::concat(&self.pending_spill_data)?;
         let mut hashes = Vec::with_capacity(pending_spill_data.num_rows());
@@ -130,15 +134,7 @@ impl BuildSpillHandler {
         build_state: &Arc<HashJoinBuildState>,
         processor_id: usize,
     ) -> Result<HashJoinBuildStep> {
-        // Add spilled partition ids to `spill_partitions` of `HashJoinBuildState`
         let spilled_partition_set = self.spill_state().spiller.spilled_partitions();
-        if build_state.join_type() != JoinType::Cross {
-            info!(
-                "build processor-{:?}: spill finished with spilled partitions {:?}",
-                processor_id, spilled_partition_set
-            );
-        }
-
         // For left-related join, will spill all build input blocks which means there isn't first-round hash table.
         // Because first-round hash table will make left join generate wrong results.
         // Todo: make left-related join leverage first-round hash table to reduce I/O.
@@ -151,18 +147,28 @@ impl BuildSpillHandler {
             return Ok(HashJoinBuildStep::Spill);
         }
 
-        if !spilled_partition_set.is_empty() {
-            build_state
-                .spilled_partition_set
-                .write()
-                .extend(spilled_partition_set);
-        }
         // The processor has accepted all data from downstream
         // If there is still pending spill data, add to row space.
         for data in self.pending_spill_data.iter() {
             build_state.build(data.clone())?;
         }
         self.pending_spill_data.clear();
 
+        // Check if there is data in the spiller buffer
+        if !self.spill_state().spiller.empty_buffer() {
+            return Ok(HashJoinBuildStep::Spill);
+        }
+        if build_state.join_type() != JoinType::Cross {
+            info!(
+                "build processor-{:?}: spill finished with spilled partitions {:?}",
+                processor_id, spilled_partition_set
+            );
+        }
+        if !spilled_partition_set.is_empty() {
+            build_state
+                .spilled_partition_set
+                .write()
+                .extend(spilled_partition_set);
+        }
         Ok(HashJoinBuildStep::Running)
     }
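The behavioral core of the reordering above: the spilled-partition set and the "spill finished" log are published only after both the pending blocks and the spiller buffer have been drained, because rows still sitting in the buffer belong to partitions that are not yet fully on disk. Below is a minimal, self-contained sketch of that control flow; Finalizer, pending_blocks, and buffered_bytes are illustrative stand-ins, not the actual handler types.

// Modeling the reordered finalize step: the handler must return to the
// Spill step while buffered data remains, and only publish results once
// the buffer is empty. All names here are illustrative.
#[derive(Debug, PartialEq)]
enum BuildStep {
    Spill,
    Running,
}

struct Finalizer {
    // Blocks accepted but not yet added to the in-memory row space.
    pending_blocks: usize,
    // Bytes still sitting in the spiller buffer.
    buffered_bytes: usize,
}

impl Finalizer {
    fn finalize(&mut self) -> BuildStep {
        // 1. Move pending blocks into the row space.
        self.pending_blocks = 0;
        // 2. If the spiller buffer still holds data, go back to the Spill
        //    step; finalize will run again after the buffer is flushed.
        if self.buffered_bytes > 0 {
            return BuildStep::Spill;
        }
        // 3. Only now is the spilled-partition set complete and publishable.
        BuildStep::Running
    }
}

fn main() {
    let mut f = Finalizer { pending_blocks: 2, buffered_bytes: 1024 };
    assert_eq!(f.finalize(), BuildStep::Spill); // buffer not yet flushed
    f.buffered_bytes = 0; // spill_buffer() ran in the Spill step
    assert_eq!(f.finalize(), BuildStep::Running);
    println!("finalize flow ok: {} pending blocks remain", f.pending_blocks);
}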
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_spill/transform_probe_spill.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_spill/transform_probe_spill.rs
index 37df11499702..afd9a4fea31e 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_spill/transform_probe_spill.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_spill/transform_probe_spill.rs
@@ -38,6 +38,8 @@ pub struct ProbeSpillHandler {
     next_restore_file: usize,
     // Save input block from the processor if the input block has zero columns.
     input_blocks: Vec<DataBlock>,
+    // The flag indicates whether to spill the buffered data
+    spill_buffer: bool,
 }
 
 impl ProbeSpillHandler {
@@ -48,6 +50,7 @@ impl ProbeSpillHandler {
             probe_first_round_hashtable: true,
             next_restore_file: 0,
             input_blocks: vec![],
+            spill_buffer: false,
         }
     }
 
@@ -85,6 +88,14 @@ impl ProbeSpillHandler {
         self.spill_done = true;
     }
 
+    pub fn need_spill_buffer(&self) -> bool {
+        self.spill_buffer
+    }
+
+    pub fn set_need_spill_buffer(&mut self) {
+        self.spill_buffer = true;
+    }
+
     pub fn add_partition_loc(&mut self, id: u8, loc: Vec<String>) {
         self.spill_state_mut()
             .spiller
@@ -138,6 +149,16 @@ impl ProbeSpillHandler {
             .spill_input(data_block, hashes, left_related_join, spilled_partitions)
             .await
     }
+
+    // Check if the spiller buffer is empty
+    pub fn empty_buffer(&self) -> bool {
+        self.spill_state().spiller.empty_buffer()
+    }
+
+    // Spill the buffered data
+    pub async fn spill_buffer(&mut self) -> Result<()> {
+        self.spill_state_mut().spiller.spill_buffer().await
+    }
 }
 
 /// The following methods only used for cross join
@@ -226,6 +247,12 @@ impl TransformHashJoinProbe {
     // then add spill_partitions to `spill_partition_set` and set `spill_done` to true.
     // change current step to `WaitBuild`
     pub(crate) fn spill_finished(&mut self, processor_id: usize) -> Result<Event> {
+        if !self.spill_handler.empty_buffer() {
+            self.step = HashJoinProbeStep::Spill;
+            self.step_logs.push(HashJoinProbeStep::Spill);
+            self.spill_handler.set_need_spill_buffer();
+            return Ok(Event::Async);
+        }
         self.spill_handler.set_spill_done();
         // Add spilled partition ids to `spill_partitions` of `HashJoinProbeState`
         let spilled_partition_set = &self.spill_handler.spilled_partitions();
@@ -293,6 +320,11 @@ impl TransformHashJoinProbe {
 
     // Async spill action
     pub(crate) async fn spill_action(&mut self) -> Result<()> {
+        if self.spill_handler.need_spill_buffer() {
+            self.spill_handler.spill_buffer().await?;
+            self.spill_finished(self.processor_id)?;
+            return Ok(());
+        }
         // Before spilling, if there is a hash table, probe the hash table first
         self.try_probe_first_round_hashtable(self.input_data.clone())?;
         let left_related_join_type = matches!(
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs
index 98e99b172688..7021d67a9fc1 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs
@@ -235,6 +235,10 @@ impl TransformHashJoinProbe {
         Ok(Event::Async)
     }
 
+    fn spill(&self) -> Result<Event> {
+        Ok(Event::Async)
+    }
+
     // Running
     // When spilling is enabled, the method contains two running paths
     // 1. Before spilling, it will pull data from input port and go to spill
@@ -391,7 +395,7 @@ impl Processor for TransformHashJoinProbe {
             HashJoinProbeStep::Running => self.run(),
             HashJoinProbeStep::Restore => self.restore(),
             HashJoinProbeStep::FinalScan => self.final_scan(),
-            HashJoinProbeStep::Spill => unreachable!("{:?}", self.step),
+            HashJoinProbeStep::Spill => self.spill(),
         }
     }
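The probe side adds a small handshake: spill_finished() detours back to the Spill step when the buffer still holds data, and spill_action() flushes the buffer and then re-runs the finish logic, which is why the Spill arm of event() is now reachable. The sketch below models just that re-entry loop under simplified state; buffered, need_spill_buffer, and spill_done are stand-in fields, and the real async spill is reduced to a flag flip.

// A compact model of the probe-side handshake: spill_finished() detours
// through the Spill step once more when the buffer still holds data, and
// spill_action() flushes it before re-running the finish logic.
struct ProbeSpill {
    buffered: bool,          // spiller buffer still holds data
    need_spill_buffer: bool, // mirrors ProbeSpillHandler::spill_buffer
    spill_done: bool,
}

impl ProbeSpill {
    fn spill_finished(&mut self) {
        if self.buffered {
            // Detour: request one more pass through the Spill step.
            self.need_spill_buffer = true;
            return;
        }
        self.spill_done = true;
    }

    fn spill_action(&mut self) {
        if self.need_spill_buffer {
            self.buffered = false; // spill_buffer().await in the real code
            self.need_spill_buffer = false;
            self.spill_finished();
        }
    }
}

fn main() {
    let mut p = ProbeSpill { buffered: true, need_spill_buffer: false, spill_done: false };
    p.spill_finished(); // buffer not empty: detours instead of finishing
    assert!(!p.spill_done);
    p.spill_action(); // flushes the buffer, then finishes for real
    assert!(p.spill_done);
    println!("probe spill handshake ok");
}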
diff --git a/src/query/service/src/spillers/mod.rs b/src/query/service/src/spillers/mod.rs
index 2d5afa9645ea..c692d741f1b4 100644
--- a/src/query/service/src/spillers/mod.rs
+++ b/src/query/service/src/spillers/mod.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 mod spiller;
+mod spiller_buffer;
 
 pub use spiller::Spiller;
 pub use spiller::SpillerConfig;
diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs
index 91c7cf0b0045..a11587cf7b25 100644
--- a/src/query/service/src/spillers/spiller.rs
+++ b/src/query/service/src/spillers/spiller.rs
@@ -28,6 +28,7 @@ use databend_common_expression::DataBlock;
 use opendal::Operator;
 
 use crate::sessions::QueryContext;
+use crate::spillers::spiller_buffer::SpillerBuffer;
 
 /// Spiller type, currently only supports HashJoin
 #[derive(Clone, Debug, Eq, PartialEq)]
@@ -72,6 +73,7 @@ pub struct Spiller {
     operator: Operator,
     config: SpillerConfig,
     _spiller_type: SpillerType,
+    spiller_buffer: SpillerBuffer,
     pub join_spilling_partition_bits: usize,
     /// 1 partition -> N partition files
     pub partition_location: HashMap<u8, Vec<String>>,
@@ -93,6 +95,7 @@ impl Spiller {
             operator,
             config,
             _spiller_type: spiller_type,
+            spiller_buffer: SpillerBuffer::create(),
             join_spilling_partition_bits,
             partition_location: Default::default(),
             columns_layout: Default::default(),
@@ -191,7 +194,6 @@ impl Spiller {
     }
 
     #[async_backtrace::framed]
-    // Directly spill input data without buffering.
     // Need to compute hashes for data block advanced.
     // For probe, only need to spill rows in build spilled partitions.
     // For left-related join, need to record rows not in build spilled partitions.
@@ -234,8 +236,12 @@ impl Spiller {
                 &block_row_indexes,
                 row_indexes.len(),
             );
-            // Spill block with partition id
-            self.spill_with_partition(*p_id, block).await?;
+            if let Some((p_id, block)) = self
+                .spiller_buffer
+                .add_partition_unspilled_data(*p_id, block)?
+            {
+                self.spill_with_partition(p_id, block).await?;
+            }
         }
         if !left_related_join {
             return Ok(None);
@@ -251,6 +257,24 @@ impl Spiller {
         )))
     }
 
+    // Spill the data in the buffer at the end of spilling
+    pub(crate) async fn spill_buffer(&mut self) -> Result<()> {
+        let partition_unspilled_data = self.spiller_buffer.partition_unspilled_data();
+        for (partition_id, blocks) in partition_unspilled_data.iter() {
+            if !blocks.is_empty() {
+                let merged_block = DataBlock::concat(blocks)?;
+                self.spill_with_partition(*partition_id, merged_block)
+                    .await?;
+            }
+        }
+        self.spiller_buffer.reset();
+        Ok(())
+    }
+
+    pub(crate) fn empty_buffer(&self) -> bool {
+        self.spiller_buffer.empty()
+    }
+
     pub(crate) fn spilled_files(&self) -> Vec<String> {
         self.columns_layout.keys().cloned().collect()
     }
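The Spiller change replaces the per-block write in spill_input with an offer to the buffer: a write happens only when add_partition_unspilled_data returns Some, so many small blocks for one partition become one larger spill file. The stand-alone sketch below mirrors that contract with blocks modeled as byte counts; Block, Buffer, and BUFFER_LIMIT are illustrative names, and eviction here compares buffered bytes for simplicity where the real code compares row counts.

use std::collections::HashMap;

// A stand-in "block" is just its size in bytes; the real code buffers DataBlocks.
type Block = usize;

const BUFFER_LIMIT: usize = 8 * 1024 * 1024;

struct Buffer {
    per_partition: HashMap<u8, Vec<Block>>,
    total: usize,
}

impl Buffer {
    fn add(&mut self, id: u8, block: Block) -> Option<(u8, Block)> {
        self.total += block;
        self.per_partition.entry(id).or_default().push(block);
        if self.total >= BUFFER_LIMIT {
            // Evict the partition holding the most buffered data.
            let victim = *self
                .per_partition
                .iter()
                .max_by_key(|(_, blocks)| blocks.iter().sum::<usize>())
                .map(|(id, _)| id)?;
            let blocks = self.per_partition.remove(&victim)?;
            let merged: Block = blocks.iter().sum();
            self.total -= merged;
            return Some((victim, merged));
        }
        None
    }
}

fn main() {
    let mut buf = Buffer { per_partition: HashMap::new(), total: 0 };
    // Only the call that crosses the 8MB threshold produces a write.
    assert!(buf.add(0, 3 * 1024 * 1024).is_none());
    assert!(buf.add(1, 2 * 1024 * 1024).is_none());
    let spilled = buf.add(0, 3 * 1024 * 1024); // total hits 8MB
    assert_eq!(spilled, Some((0, 6 * 1024 * 1024)));
    println!("partition 0 spilled as one 6MB file instead of two small ones");
}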
diff --git a/src/query/service/src/spillers/spiller_buffer.rs b/src/query/service/src/spillers/spiller_buffer.rs
new file mode 100644
index 000000000000..9e64b3429f58
--- /dev/null
+++ b/src/query/service/src/spillers/spiller_buffer.rs
@@ -0,0 +1,84 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_common_expression::DataBlock;
+
+// The spiller buffer will record each partition's unspilled data.
+// When the buffer is full (>= 8MB), it will pick the partition with the most unspilled data to spill.
+#[derive(Clone)]
+pub(crate) struct SpillerBuffer {
+    partition_unspilled_data: HashMap<u8, Vec<DataBlock>>,
+    buffer_size: usize,
+}
+
+impl SpillerBuffer {
+    pub(crate) fn create() -> Self {
+        SpillerBuffer {
+            partition_unspilled_data: HashMap::new(),
+            buffer_size: 0,
+        }
+    }
+
+    // Add a partition's unspilled data to the buffer
+    // The method checks whether the buffer is full; if so, it spills the partition with the most unspilled data
+    // After spilling, the spilled partition's unspilled data is cleared from the buffer
+    // The return value is the partition id and the spilled data
+    pub(crate) fn add_partition_unspilled_data(
+        &mut self,
+        partition_id: u8,
+        data: DataBlock,
+    ) -> Result<Option<(u8, DataBlock)>> {
+        fn rows(data: &[DataBlock]) -> usize {
+            data.iter().map(|block| block.num_rows()).sum()
+        }
+        let data_size = data.memory_size();
+        self.buffer_size += data_size;
+        self.partition_unspilled_data
+            .entry(partition_id)
+            .or_default()
+            .push(data);
+        if self.buffer_size >= 8 * 1024 * 1024 {
+            // Pick the partition with the most unspilled data in `partition_unspilled_data`
+            let (partition_id, blocks) = self
+                .partition_unspilled_data
+                .iter()
+                .max_by_key(|(_, v)| rows(v))
+                .map(|(k, v)| (*k, v.clone()))
+                .ok_or_else(|| ErrorCode::Internal("No unspilled data in the buffer"))?;
+            debug_assert!(!blocks.is_empty());
+            self.partition_unspilled_data.remove(&partition_id);
+            let merged_block = DataBlock::concat(&blocks)?;
+            self.buffer_size -= merged_block.memory_size();
+            return Ok(Some((partition_id, merged_block)));
+        }
+        Ok(None)
+    }
+
+    pub(crate) fn empty(&self) -> bool {
+        self.buffer_size == 0
+    }
+
+    pub(crate) fn partition_unspilled_data(&self) -> HashMap<u8, Vec<DataBlock>> {
+        self.partition_unspilled_data.clone()
+    }
+
+    pub(crate) fn reset(&mut self) {
+        self.partition_unspilled_data.clear();
+        self.buffer_size = 0;
+    }
+}
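Together, spill_buffer and reset define the drain contract the build and probe handlers rely on: whatever remains below the 8MB threshold is written out once per partition as a single merged block, after which the buffer must report empty. A test-style sketch of that contract follows, with blocks again reduced to byte counts since constructing a real DataBlock needs a schema and columns; Buffer and drain are illustrative names.

use std::collections::HashMap;

struct Buffer {
    per_partition: HashMap<u8, Vec<usize>>,
    total: usize,
}

impl Buffer {
    // Mirror of Spiller::spill_buffer: merge each partition's residue
    // into one write, then reset the bookkeeping.
    fn drain(&mut self) -> Vec<(u8, usize)> {
        let mut writes: Vec<(u8, usize)> = self
            .per_partition
            .iter()
            .filter(|(_, blocks)| !blocks.is_empty())
            .map(|(id, blocks)| (*id, blocks.iter().sum()))
            .collect();
        writes.sort(); // deterministic order for the assert below
        self.per_partition.clear();
        self.total = 0;
        writes
    }

    fn empty(&self) -> bool {
        self.total == 0
    }
}

fn main() {
    let mut buf = Buffer {
        per_partition: HashMap::from([(0u8, vec![100, 200]), (3u8, vec![50])]),
        total: 350,
    };
    // One merged write per partition with leftover data, then empty.
    assert_eq!(buf.drain(), vec![(0, 300), (3, 50)]);
    assert!(buf.empty());
    println!("drain contract ok");
}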