feat: add buffer for spiller (databendlabs#15021)
xudong963 authored Mar 20, 2024
1 parent 4c852e2 commit 31a63b1
Showing 6 changed files with 169 additions and 18 deletions.
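
At a high level, this change stops the hash join spill path from writing every partitioned block straight to storage: partitioned blocks are first staged in an in-memory SpillerBuffer, the partition with the most buffered rows is flushed once the buffer reaches 8MB, and the build/probe handlers drain whatever is left with a final `spill_buffer()` call before marking the spill finished. The following is a rough sketch of that handler-side protocol, not Databend's actual API: the real `Spiller` methods are async and flush only the largest partition, while these stand-ins are synchronous and flush everything.

// Hypothetical synchronous stand-ins for the real async Spiller API.
struct Spiller {
    staged_bytes: usize,
}

impl Spiller {
    // The real spill_input stages each partitioned block in the buffer and
    // flushes the largest partition past the threshold; here we model only
    // the size accounting and flush everything at once.
    fn spill_input(&mut self, block_bytes: usize) {
        self.staged_bytes += block_bytes;
        if self.staged_bytes >= 8 * 1024 * 1024 {
            self.staged_bytes = 0;
        }
    }

    fn empty_buffer(&self) -> bool {
        self.staged_bytes == 0
    }

    // The real spill_buffer concats each partition's blocks before writing.
    fn spill_buffer(&mut self) {
        self.staged_bytes = 0;
    }
}

// The finish path both handlers now follow: drain anything still staged
// in the buffer before declaring the spill done.
fn finish_spill(spiller: &mut Spiller) {
    if !spiller.empty_buffer() {
        spiller.spill_buffer();
    }
}

fn main() {
    let mut spiller = Spiller { staged_bytes: 0 };
    for _ in 0..5 {
        spiller.spill_input(3 << 20); // five 3MB blocks; the third crosses 8MB
    }
    finish_spill(&mut spiller);
    assert!(spiller.empty_buffer());
}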
@@ -99,6 +99,10 @@ impl BuildSpillHandler {
if join_type == &JoinType::Cross {
return self.spill_cross_join().await;
}
if self.pending_spill_data.is_empty() && !self.spill_state().spiller.empty_buffer() {
// Spill data in spiller buffer
return self.spill_state_mut().spiller.spill_buffer().await;
}
// Concat the data blocks that are pending to spill to reduce the number of spill files.
let pending_spill_data = DataBlock::concat(&self.pending_spill_data)?;
let mut hashes = Vec::with_capacity(pending_spill_data.num_rows());
@@ -130,15 +134,7 @@ impl BuildSpillHandler {
build_state: &Arc<HashJoinBuildState>,
processor_id: usize,
) -> Result<HashJoinBuildStep> {
// Add spilled partition ids to `spill_partitions` of `HashJoinBuildState`
let spilled_partition_set = self.spill_state().spiller.spilled_partitions();
if build_state.join_type() != JoinType::Cross {
info!(
"build processor-{:?}: spill finished with spilled partitions {:?}",
processor_id, spilled_partition_set
);
}

// For left-related joins, all build input blocks will be spilled, which means there is no first-round hash table,
// because a first-round hash table would make left joins generate wrong results.
// Todo: make left-related joins leverage the first-round hash table to reduce I/O.
@@ -151,18 +147,28 @@ impl BuildSpillHandler {
return Ok(HashJoinBuildStep::Spill);
}

if !spilled_partition_set.is_empty() {
build_state
.spilled_partition_set
.write()
.extend(spilled_partition_set);
}
// The processor has accepted all data from downstream.
// If there is still pending spill data, add it to the row space.
for data in self.pending_spill_data.iter() {
build_state.build(data.clone())?;
}
self.pending_spill_data.clear();
// Check if there is data in spiller buffer
if !self.spill_state().spiller.empty_buffer() {
return Ok(HashJoinBuildStep::Spill);
}
if build_state.join_type() != JoinType::Cross {
info!(
"build processor-{:?}: spill finished with spilled partitions {:?}",
processor_id, spilled_partition_set
);
}
if !spilled_partition_set.is_empty() {
build_state
.spilled_partition_set
.write()
.extend(spilled_partition_set);
}
Ok(HashJoinBuildStep::Running)
}

@@ -38,6 +38,8 @@ pub struct ProbeSpillHandler {
next_restore_file: usize,
// Save input block from the processor if the input block has zero columns.
input_blocks: Vec<DataBlock>,
// The flag indicates whether to spill the buffer data
spill_buffer: bool,
}

impl ProbeSpillHandler {
@@ -48,6 +50,7 @@ impl ProbeSpillHandler {
probe_first_round_hashtable: true,
next_restore_file: 0,
input_blocks: vec![],
spill_buffer: false,
}
}

@@ -85,6 +88,14 @@ impl ProbeSpillHandler {
self.spill_done = true;
}

pub fn need_spill_buffer(&self) -> bool {
self.spill_buffer
}

pub fn set_need_spill_buffer(&mut self) {
self.spill_buffer = true;
}

pub fn add_partition_loc(&mut self, id: u8, loc: Vec<String>) {
self.spill_state_mut()
.spiller
@@ -138,6 +149,16 @@ impl ProbeSpillHandler {
.spill_input(data_block, hashes, left_related_join, spilled_partitions)
.await
}

// Check if spiller buffer is empty
pub fn empty_buffer(&self) -> bool {
self.spill_state().spiller.empty_buffer()
}

// Spill buffer data
pub async fn spill_buffer(&mut self) -> Result<()> {
self.spill_state_mut().spiller.spill_buffer().await
}
}

/// The following methods are only used for cross join
@@ -226,6 +247,12 @@ impl TransformHashJoinProbe {
// then add spill_partitions to `spill_partition_set` and set `spill_done` to true.
// change current step to `WaitBuild`
pub(crate) fn spill_finished(&mut self, processor_id: usize) -> Result<Event> {
if !self.spill_handler.empty_buffer() {
self.step = HashJoinProbeStep::Spill;
self.step_logs.push(HashJoinProbeStep::Spill);
self.spill_handler.set_need_spill_buffer();
return Ok(Event::Async);
}
self.spill_handler.set_spill_done();
// Add spilled partition ids to `spill_partitions` of `HashJoinProbeState`
let spilled_partition_set = &self.spill_handler.spilled_partitions();
@@ -293,6 +320,11 @@ impl TransformHashJoinProbe {

// Async spill action
pub(crate) async fn spill_action(&mut self) -> Result<()> {
if self.spill_handler.need_spill_buffer() {
self.spill_handler.spill_buffer().await?;
self.spill_finished(self.processor_id)?;
return Ok(());
}
// Before spilling, if there is a hash table, probe the hash table first
self.try_probe_first_round_hashtable(self.input_data.clone())?;
let left_related_join_type = matches!(
@@ -235,6 +235,10 @@ impl TransformHashJoinProbe {
Ok(Event::Async)
}

fn spill(&self) -> Result<Event> {
Ok(Event::Async)
}

// Running
// When spilling is enabled, the method contains two running paths
// 1. Before spilling, it will pull data from input port and go to spill
@@ -391,7 +395,7 @@ impl Processor for TransformHashJoinProbe {
HashJoinProbeStep::Running => self.run(),
HashJoinProbeStep::Restore => self.restore(),
HashJoinProbeStep::FinalScan => self.final_scan(),
HashJoinProbeStep::Spill => unreachable!("{:?}", self.step),
HashJoinProbeStep::Spill => self.spill(),
}
}

1 change: 1 addition & 0 deletions src/query/service/src/spillers/mod.rs
@@ -13,6 +13,7 @@
// limitations under the License.

mod spiller;
mod spiller_buffer;

pub use spiller::Spiller;
pub use spiller::SpillerConfig;
30 changes: 27 additions & 3 deletions src/query/service/src/spillers/spiller.rs
@@ -28,6 +28,7 @@ use databend_common_expression::DataBlock;
use opendal::Operator;

use crate::sessions::QueryContext;
use crate::spillers::spiller_buffer::SpillerBuffer;

/// Spiller type, currently only supports HashJoin
#[derive(Clone, Debug, Eq, PartialEq)]
@@ -72,6 +73,7 @@ pub struct Spiller {
operator: Operator,
config: SpillerConfig,
_spiller_type: SpillerType,
spiller_buffer: SpillerBuffer,
pub join_spilling_partition_bits: usize,
/// 1 partition -> N partition files
pub partition_location: HashMap<u8, Vec<String>>,
@@ -93,6 +95,7 @@ impl Spiller {
operator,
config,
_spiller_type: spiller_type,
spiller_buffer: SpillerBuffer::create(),
join_spilling_partition_bits,
partition_location: Default::default(),
columns_layout: Default::default(),
@@ -191,7 +194,6 @@ impl Spiller {
}

#[async_backtrace::framed]
// Directly spill input data without buffering.
// Hashes for the data block need to be computed in advance.
// For probe, only need to spill rows in build spilled partitions.
// For left-related join, need to record rows not in build spilled partitions.
@@ -234,8 +236,12 @@
&block_row_indexes,
row_indexes.len(),
);
// Spill block with partition id
self.spill_with_partition(*p_id, block).await?;
if let Some((p_id, block)) = self
.spiller_buffer
.add_partition_unspilled_data(*p_id, block)?
{
self.spill_with_partition(p_id, block).await?;
}
}
if !left_related_join {
return Ok(None);
@@ -251,6 +257,24 @@
)))
}

// Spill the data in the buffer at the end of spilling
pub(crate) async fn spill_buffer(&mut self) -> Result<()> {
let partition_unspilled_data = self.spiller_buffer.partition_unspilled_data();
for (partition_id, blocks) in partition_unspilled_data.iter() {
if !blocks.is_empty() {
let merged_block = DataBlock::concat(blocks)?;
self.spill_with_partition(*partition_id, merged_block)
.await?;
}
}
self.spiller_buffer.reset();
Ok(())
}

pub(crate) fn empty_buffer(&self) -> bool {
self.spiller_buffer.empty()
}

pub(crate) fn spilled_files(&self) -> Vec<String> {
self.columns_layout.keys().cloned().collect()
}
84 changes: 84 additions & 0 deletions src/query/service/src/spillers/spiller_buffer.rs
@@ -0,0 +1,84 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;

// The spiller buffer records each partition's unspilled data.
// When the buffer is full (>= 8MB), it picks the partition with the most unspilled data to spill.
#[derive(Clone)]
pub(crate) struct SpillerBuffer {
partition_unspilled_data: HashMap<u8, Vec<DataBlock>>,
buffer_size: usize,
}

impl SpillerBuffer {
pub(crate) fn create() -> Self {
SpillerBuffer {
partition_unspilled_data: HashMap::new(),
buffer_size: 0,
}
}

// Add a partition's unspilled data to the buffer.
// The method checks whether the buffer is full; if so, it spills the partition with the most unspilled data.
// After spilling, the buffer clears the spilled partition's unspilled data.
// The return value is the partition id and the spilled data.
pub(crate) fn add_partition_unspilled_data(
&mut self,
partition_id: u8,
data: DataBlock,
) -> Result<Option<(u8, DataBlock)>> {
fn rows(data: &[DataBlock]) -> usize {
data.iter().map(|block| block.num_rows()).sum()
}
let data_size = data.memory_size();
self.buffer_size += data_size;
self.partition_unspilled_data
.entry(partition_id)
.or_default()
.push(data);
if self.buffer_size >= 8 * 1024 * 1024 {
// Pick the partition with the most unspilled data in `partition_unspilled_data`
let (partition_id, blocks) = self
.partition_unspilled_data
.iter()
.max_by_key(|(_, v)| rows(v))
.map(|(k, v)| (*k, v.clone()))
.ok_or_else(|| ErrorCode::Internal("No unspilled data in the buffer"))?;
debug_assert!(!blocks.is_empty());
self.partition_unspilled_data.remove(&partition_id);
let merged_block = DataBlock::concat(&blocks)?;
self.buffer_size -= merged_block.memory_size();
return Ok(Some((partition_id, merged_block)));
}
Ok(None)
}

pub(crate) fn empty(&self) -> bool {
self.buffer_size == 0
}

pub(crate) fn partition_unspilled_data(&self) -> HashMap<u8, Vec<DataBlock>> {
self.partition_unspilled_data.clone()
}

pub(crate) fn reset(&mut self) {
self.partition_unspilled_data.clear();
self.buffer_size = 0;
}
}
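
Taken together, SpillerBuffer behaves as a size-bounded staging area: blocks accumulate per partition, the partition holding the most rows is evicted once the total staged size crosses the 8MB threshold, and spill_buffer drains the remainder at the end. Below is a standalone sketch of that eviction policy under simplified assumptions: a hypothetical Block carrying only a byte size and row count stands in for DataBlock, and eviction returns the raw block list instead of a concatenated block.

use std::collections::HashMap;

// Simplified stand-in for DataBlock: only size and row count matter here.
struct Block {
    bytes: usize,
    rows: usize,
}

#[derive(Default)]
struct Buffer {
    partitions: HashMap<u8, Vec<Block>>,
    size: usize,
}

const THRESHOLD: usize = 8 * 1024 * 1024;

impl Buffer {
    // Mirrors add_partition_unspilled_data: stage the block and, once the
    // buffer crosses the threshold, evict the partition with the most rows.
    fn add(&mut self, pid: u8, block: Block) -> Option<(u8, Vec<Block>)> {
        self.size += block.bytes;
        self.partitions.entry(pid).or_default().push(block);
        if self.size >= THRESHOLD {
            let victim = self
                .partitions
                .iter()
                .max_by_key(|(_, v)| v.iter().map(|b| b.rows).sum::<usize>())
                .map(|(k, _)| *k)?;
            let blocks = self.partitions.remove(&victim)?;
            self.size -= blocks.iter().map(|b| b.bytes).sum::<usize>();
            return Some((victim, blocks));
        }
        None
    }
}

fn main() {
    let mut buf = Buffer::default();
    // Stage 3MB in partition 0, then 6MB in partition 1: the second insert
    // crosses the 8MB threshold, so the larger partition (1) is evicted.
    assert!(buf.add(0, Block { bytes: 3 << 20, rows: 300 }).is_none());
    let (pid, blocks) = buf.add(1, Block { bytes: 6 << 20, rows: 600 }).unwrap();
    assert_eq!((pid, blocks.len()), (1, 1));
    // Partition 0's 3MB stays staged until a final drain (cf. spill_buffer).
    assert_eq!(buf.size, 3 << 20);
}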
