diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 78da4dc9c53f..b931191f4b91 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -62,6 +62,7 @@ hashbrown = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true, features = ["use_std"] } log = { workspace = true } +num = "0.2.0" once_cell = "1.18.0" parking_lot = { workspace = true } pin-project-lite = "^0.2.7" diff --git a/datafusion/physical-plan/src/coalesce/filter.rs b/datafusion/physical-plan/src/coalesce/filter.rs new file mode 100644 index 000000000000..2ecac841c03f --- /dev/null +++ b/datafusion/physical-plan/src/coalesce/filter.rs @@ -0,0 +1,750 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// COPIED FROM https://github.com/apache/arrow-rs/blob/master/arrow-select/src/filter.rs +// The idea is to expand the API +// to allow multiple input arrays to be filtered into a single output array + +//! 
Defines filter kernels + +use arrow::array::{ArrayData, ArrayDataBuilder, MutableArrayData}; +use arrow_array::builder::BooleanBufferBuilder; +use arrow_array::cast::AsArray; +use arrow_array::types::{ + ArrowDictionaryKeyType, ArrowPrimitiveType, ByteArrayType, ByteViewType, + RunEndIndexType, +}; +use arrow_array::*; +use arrow_buffer::bit_iterator::{BitIndexIterator, BitSliceIterator}; +use arrow_buffer::{bit_util, ArrowNativeType, BooleanBuffer, NullBuffer, RunEndBuffer}; +use arrow_buffer::{Buffer, MutableBuffer}; +use arrow_schema::*; +use std::ops::AddAssign; +use std::sync::Arc; + +/// If the filter selects more than this fraction of rows, use +/// [`SlicesIterator`] to copy ranges of values. Otherwise iterate +/// over individual rows using [`IndexIterator`] +/// +/// Threshold of 0.8 chosen based on +/// +const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8; + +/// An iterator of `(usize, usize)` each representing an interval +/// `[start, end)` whose slots of a bitmap [Buffer] are true. Each +/// interval corresponds to a contiguous region of memory to be +/// "taken" from an array to be filtered. +/// +/// ## Notes: +/// +/// 1. Ignores the validity bitmap (ignores nulls) +/// +/// 2. 
Only performant for filters that copy across long contiguous runs +#[derive(Debug)] +pub struct SlicesIterator<'a>(BitSliceIterator<'a>); + +impl<'a> SlicesIterator<'a> { + pub fn new(filter: &'a BooleanArray) -> Self { + Self(filter.values().set_slices()) + } +} + +impl<'a> Iterator for SlicesIterator<'a> { + type Item = (usize, usize); + + fn next(&mut self) -> Option { + self.0.next() + } +} + +/// An iterator of `usize` whose index in [`BooleanArray`] is true +/// +/// This provides the best performance on most predicates, apart from those which keep +/// large runs and therefore favour [`SlicesIterator`] +struct IndexIterator<'a> { + remaining: usize, + iter: BitIndexIterator<'a>, +} + +impl<'a> IndexIterator<'a> { + fn new(filter: &'a BooleanArray, remaining: usize) -> Self { + assert_eq!(filter.null_count(), 0); + let iter = filter.values().set_indices(); + Self { remaining, iter } + } +} + +impl<'a> Iterator for IndexIterator<'a> { + type Item = usize; + + fn next(&mut self) -> Option { + if self.remaining != 0 { + // Fascinatingly swapping these two lines around results in a 50% + // performance regression for some benchmarks + let next = self.iter.next().expect("IndexIterator exhausted early"); + self.remaining -= 1; + // Must panic if exhausted early as trusted length iterator + return Some(next); + } + None + } + + fn size_hint(&self) -> (usize, Option) { + (self.remaining, Some(self.remaining)) + } +} + +/// Counts the number of set bits in `filter` +fn filter_count(filter: &BooleanArray) -> usize { + filter.values().count_set_bits() +} + +/// Function that can filter arbitrary arrays +/// +/// Deprecated: Use [`FilterPredicate`] instead +#[deprecated] +pub type Filter<'a> = Box ArrayData + 'a>; + +/// Returns a prepared function optimized to filter multiple arrays. +/// Creating this function requires time, but using it is faster than [arrow::compute::filter] when the +/// same filter needs to be applied to multiple arrays (e.g. 
a multi-column `RecordBatch`). +/// WARNING: the nulls of `filter` are ignored and the value on its slot is considered. +/// Therefore, it is considered undefined behavior to pass `filter` with null values. +/// +/// Deprecated: Use [`FilterBuilder`] instead +#[deprecated] +#[allow(deprecated)] +pub fn build_filter(filter: &BooleanArray) -> Result { + let iter = SlicesIterator::new(filter); + let filter_count = filter_count(filter); + let chunks = iter.collect::>(); + + Ok(Box::new(move |array: &ArrayData| { + match filter_count { + // return all + len if len == array.len() => array.clone(), + 0 => ArrayData::new_empty(array.data_type()), + _ => { + let mut mutable = MutableArrayData::new(vec![array], false, filter_count); + chunks + .iter() + .for_each(|(start, end)| mutable.extend(0, *start, *end)); + mutable.freeze() + } + } + })) +} + +/// Remove null values by doing a bitwise AND of the null bits and the boolean bits. +pub fn prep_null_mask_filter(filter: &BooleanArray) -> BooleanArray { + let nulls = filter.nulls().unwrap(); + let mask = filter.values() & nulls.inner(); + BooleanArray::new(mask, None) +} + +/// Filters an [Array], returning elements matching the filter (i.e. where the values are true). +/// +/// # Example +/// ```rust +/// # use arrow_array::{Int32Array, BooleanArray}; +/// # use arrow_select::filter::filter; +/// let array = Int32Array::from(vec![5, 6, 7, 8, 9]); +/// let filter_array = BooleanArray::from(vec![true, false, false, true, false]); +/// let c = filter(&array, &filter_array).unwrap(); +/// let c = c.as_any().downcast_ref::().unwrap(); +/// assert_eq!(c, &Int32Array::from(vec![5, 8])); +/// ``` +pub fn filter( + values: &dyn Array, + predicate: &BooleanArray, +) -> Result { + let predicate = FilterBuilder::new(predicate).build(); + filter_array(values, &predicate) +} + +/// Returns a new [RecordBatch] with arrays containing only values matching the filter. 
+pub fn filter_record_batch( + record_batch: &RecordBatch, + predicate: &BooleanArray, +) -> Result { + let mut filter_builder = FilterBuilder::new(predicate); + if record_batch.num_columns() > 1 { + // Only optimize if filtering more than one column + filter_builder = filter_builder.optimize(); + } + let filter = filter_builder.build(); + + let filtered_arrays = record_batch + .columns() + .iter() + .map(|a| filter_array(a, &filter)) + .collect::, _>>()?; + let options = RecordBatchOptions::default().with_row_count(Some(filter.count())); + RecordBatch::try_new_with_options(record_batch.schema(), filtered_arrays, &options) +} + +/// A builder to construct [`FilterPredicate`] +#[derive(Debug)] +pub struct FilterBuilder { + filter: BooleanArray, + count: usize, + strategy: IterationStrategy, +} + +impl FilterBuilder { + /// Create a new [`FilterBuilder`] that can be used to construct a [`FilterPredicate`] + pub fn new(filter: &BooleanArray) -> Self { + let filter = match filter.null_count() { + 0 => filter.clone(), + _ => arrow::compute::prep_null_mask_filter(filter), + }; + + let count = filter_count(&filter); + let strategy = IterationStrategy::default_strategy(filter.len(), count); + + Self { + filter, + count, + strategy, + } + } + + /// Compute an optimised representation of the provided `filter` mask that can be + /// applied to an array more quickly. 
+ /// + /// Note: There is limited benefit to calling this to then filter a single array + /// Note: This will likely have a larger memory footprint than the original mask + pub fn optimize(mut self) -> Self { + match self.strategy { + IterationStrategy::SlicesIterator => { + let slices = SlicesIterator::new(&self.filter).collect(); + self.strategy = IterationStrategy::Slices(slices) + } + IterationStrategy::IndexIterator => { + let indices = IndexIterator::new(&self.filter, self.count).collect(); + self.strategy = IterationStrategy::Indices(indices) + } + _ => {} + } + self + } + + /// Construct the final `FilterPredicate` + pub fn build(self) -> FilterPredicate { + FilterPredicate { + filter: self.filter, + count: self.count, + strategy: self.strategy, + } + } +} + +/// The iteration strategy used to evaluate [`FilterPredicate`] +#[derive(Debug)] +enum IterationStrategy { + /// A lazily evaluated iterator of ranges + SlicesIterator, + /// A lazily evaluated iterator of indices + IndexIterator, + /// A precomputed list of indices + Indices(Vec), + /// A precomputed array of ranges + Slices(Vec<(usize, usize)>), + /// Select all rows + All, + /// Select no rows + None, +} + +impl IterationStrategy { + /// The default [`IterationStrategy`] for a filter of length `filter_length` + /// and selecting `filter_count` rows + fn default_strategy(filter_length: usize, filter_count: usize) -> Self { + if filter_length == 0 || filter_count == 0 { + return IterationStrategy::None; + } + + if filter_count == filter_length { + return IterationStrategy::All; + } + + // Compute the selectivity of the predicate by dividing the number of true + // bits in the predicate by the predicate's total length + // + // This can then be used as a heuristic for the optimal iteration strategy + let selectivity_frac = filter_count as f64 / filter_length as f64; + if selectivity_frac > FILTER_SLICES_SELECTIVITY_THRESHOLD { + return IterationStrategy::SlicesIterator; + } + 
IterationStrategy::IndexIterator + } +} + +/// A filtering predicate that can be applied to an [`Array`] +#[derive(Debug)] +pub struct FilterPredicate { + filter: BooleanArray, + count: usize, + strategy: IterationStrategy, +} + +impl FilterPredicate { + /// Selects rows from `values` based on this [`FilterPredicate`] + pub fn filter(&self, values: &dyn Array) -> Result { + filter_array(values, self) + } + + /// Number of rows being selected based on this [`FilterPredicate`] + pub fn count(&self) -> usize { + self.count + } +} + +pub fn filter_array( + values: &dyn Array, + predicate: &FilterPredicate, +) -> Result { + if predicate.filter.len() > values.len() { + return Err(ArrowError::InvalidArgumentError(format!( + "Filter predicate of length {} is larger than target array of length {}", + predicate.filter.len(), + values.len() + ))); + } + + match predicate.strategy { + IterationStrategy::None => Ok(new_empty_array(values.data_type())), + IterationStrategy::All => Ok(values.slice(0, predicate.count)), + // actually filter + _ => downcast_primitive_array! 
{ + values => Ok(Arc::new(filter_primitive(values, predicate))), + DataType::Boolean => { + let values = values.as_any().downcast_ref::().unwrap(); + Ok(Arc::new(filter_boolean(values, predicate))) + } + DataType::Utf8 => { + Ok(Arc::new(filter_bytes(values.as_string::(), predicate))) + } + DataType::LargeUtf8 => { + Ok(Arc::new(filter_bytes(values.as_string::(), predicate))) + } + DataType::Utf8View => { + Ok(Arc::new(filter_byte_view(values.as_string_view(), predicate))) + } + DataType::Binary => { + Ok(Arc::new(filter_bytes(values.as_binary::(), predicate))) + } + DataType::LargeBinary => { + Ok(Arc::new(filter_bytes(values.as_binary::(), predicate))) + } + DataType::BinaryView => { + Ok(Arc::new(filter_byte_view(values.as_binary_view(), predicate))) + } + DataType::RunEndEncoded(_, _) => { + downcast_run_array!{ + values => Ok(Arc::new(filter_run_end_array(values, predicate)?)), + t => unimplemented!("Filter not supported for RunEndEncoded type {:?}", t) + } + } + DataType::Dictionary(_, _) => downcast_dictionary_array! 
{ + values => Ok(Arc::new(filter_dict(values, predicate))), + t => unimplemented!("Filter not supported for dictionary type {:?}", t) + } + _ => { + let data = values.to_data(); + // fallback to using MutableArrayData + let mut mutable = MutableArrayData::new( + vec![&data], + false, + predicate.count, + ); + + match &predicate.strategy { + IterationStrategy::Slices(slices) => { + slices + .iter() + .for_each(|(start, end)| mutable.extend(0, *start, *end)); + } + _ => { + let iter = SlicesIterator::new(&predicate.filter); + iter.for_each(|(start, end)| mutable.extend(0, start, end)); + } + } + + let data = mutable.freeze(); + Ok(make_array(data)) + } + }, + } +} + +/// Filter any supported [`RunArray`] based on a [`FilterPredicate`] +fn filter_run_end_array( + re_arr: &RunArray, + pred: &FilterPredicate, +) -> Result, ArrowError> +where + R::Native: Into + From, + R::Native: AddAssign, +{ + let run_ends: &RunEndBuffer = re_arr.run_ends(); + let mut values_filter = BooleanBufferBuilder::new(run_ends.len()); + let mut new_run_ends = vec![R::default_value(); run_ends.len()]; + + let mut start = 0i64; + let mut i = 0; + let filter_values = pred.filter.values(); + let mut count = R::default_value(); + + for end in run_ends.inner().into_iter().map(|i| (*i).into()) { + let mut keep = false; + // in filter_array the predicate array is checked to have the same len as the run end array + // this means the largest value in the run_ends is == to pred.len() + // so we're always within bounds when calling value_unchecked + for pred in + (start..end).map(|i| unsafe { filter_values.value_unchecked(i as usize) }) + { + count += R::Native::from(pred); + keep |= pred + } + // this is to avoid branching + new_run_ends[i] = count; + i += keep as usize; + + values_filter.append(keep); + start = end; + } + + new_run_ends.truncate(i); + + if values_filter.is_empty() { + new_run_ends.clear(); + } + + let values = re_arr.values(); + let pred = BooleanArray::new(values_filter.finish(), 
None); + let values = arrow::compute::filter(&values, &pred)?; + + let run_ends = PrimitiveArray::::new(new_run_ends.into(), None); + RunArray::try_new(&run_ends, &values) +} + +/// Computes a new null mask for `data` based on `predicate` +/// +/// If the predicate selected no null-rows, returns `None`, otherwise returns +/// `Some((null_count, null_buffer))` where `null_count` is the number of nulls +/// in the filtered output, and `null_buffer` is the filtered null buffer +/// +fn filter_null_mask( + nulls: Option<&NullBuffer>, + predicate: &FilterPredicate, +) -> Option<(usize, Buffer)> { + let nulls = nulls?; + if nulls.null_count() == 0 { + return None; + } + + let nulls = filter_bits(nulls.inner(), predicate); + // The filtered `nulls` has a length of `predicate.count` bits and + // therefore the null count is this minus the number of valid bits + let null_count = predicate.count - nulls.count_set_bits_offset(0, predicate.count); + + if null_count == 0 { + return None; + } + + Some((null_count, nulls)) +} + +/// Filter the packed bitmask `buffer`, with `predicate` starting at bit offset `offset` +fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer { + let src = buffer.values(); + let offset = buffer.offset(); + + match &predicate.strategy { + IterationStrategy::IndexIterator => { + let bits = IndexIterator::new(&predicate.filter, predicate.count) + .map(|src_idx| bit_util::get_bit(src, src_idx + offset)); + + // SAFETY: `IndexIterator` reports its size correctly + unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() } + } + IterationStrategy::Indices(indices) => { + let bits = indices + .iter() + .map(|src_idx| bit_util::get_bit(src, *src_idx + offset)); + + // SAFETY: `Vec::iter()` reports its size correctly + unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() } + } + IterationStrategy::SlicesIterator => { + let mut builder = + BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8)); + for (start, end) 
in SlicesIterator::new(&predicate.filter) { + builder.append_packed_range(start + offset..end + offset, src) + } + builder.into() + } + IterationStrategy::Slices(slices) => { + let mut builder = + BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8)); + for (start, end) in slices { + builder.append_packed_range(*start + offset..*end + offset, src) + } + builder.into() + } + IterationStrategy::All | IterationStrategy::None => unreachable!(), + } +} + +/// `filter` implementation for boolean buffers +fn filter_boolean(array: &BooleanArray, predicate: &FilterPredicate) -> BooleanArray { + let values = filter_bits(array.values(), predicate); + + let mut builder = ArrayDataBuilder::new(DataType::Boolean) + .len(predicate.count) + .add_buffer(values); + + if let Some((null_count, nulls)) = filter_null_mask(array.nulls(), predicate) { + builder = builder.null_count(null_count).null_bit_buffer(Some(nulls)); + } + + let data = unsafe { builder.build_unchecked() }; + BooleanArray::from(data) +} + +#[inline(never)] +fn filter_native( + values: &[T], + predicate: &FilterPredicate, +) -> Buffer { + assert!(values.len() >= predicate.filter.len()); + + let buffer = match &predicate.strategy { + IterationStrategy::SlicesIterator => { + let mut buffer = + MutableBuffer::with_capacity(predicate.count * T::get_byte_width()); + for (start, end) in SlicesIterator::new(&predicate.filter) { + buffer.extend_from_slice(&values[start..end]); + } + buffer + } + IterationStrategy::Slices(slices) => { + let mut buffer = + MutableBuffer::with_capacity(predicate.count * T::get_byte_width()); + for (start, end) in slices { + buffer.extend_from_slice(&values[*start..*end]); + } + buffer + } + IterationStrategy::IndexIterator => { + let iter = + IndexIterator::new(&predicate.filter, predicate.count).map(|x| values[x]); + + // SAFETY: IndexIterator is trusted length + unsafe { MutableBuffer::from_trusted_len_iter(iter) } + } + IterationStrategy::Indices(indices) => { + let iter = 
indices.iter().map(|x| values[*x]); + + // SAFETY: `Vec::iter` is trusted length + unsafe { MutableBuffer::from_trusted_len_iter(iter) } + } + IterationStrategy::All | IterationStrategy::None => unreachable!(), + }; + + buffer.into() +} + +/// `filter` implementation for primitive arrays +pub(crate) fn filter_primitive( + array: &PrimitiveArray, + predicate: &FilterPredicate, +) -> PrimitiveArray +where + T: ArrowPrimitiveType, +{ + let values = array.values(); + let buffer = filter_native(values, predicate); + let mut builder = ArrayDataBuilder::new(array.data_type().clone()) + .len(predicate.count) + .add_buffer(buffer); + + if let Some((null_count, nulls)) = filter_null_mask(array.nulls(), predicate) { + builder = builder.null_count(null_count).null_bit_buffer(Some(nulls)); + } + + let data = unsafe { builder.build_unchecked() }; + PrimitiveArray::from(data) +} + +/// [`FilterBytes`] is created from a source [`GenericByteArray`] and can be +/// used to build a new [`GenericByteArray`] by copying values from the source +/// +/// TODO(raphael): Could this be used for the take kernel as well? 
+struct FilterBytes<'a, OffsetSize> { + src_offsets: &'a [OffsetSize], + src_values: &'a [u8], + dst_offsets: MutableBuffer, + dst_values: MutableBuffer, + cur_offset: OffsetSize, +} + +impl<'a, OffsetSize> FilterBytes<'a, OffsetSize> +where + OffsetSize: OffsetSizeTrait, +{ + fn new(capacity: usize, array: &'a GenericByteArray) -> Self + where + T: ByteArrayType, + { + let num_offsets_bytes = (capacity + 1) * std::mem::size_of::(); + let mut dst_offsets = MutableBuffer::new(num_offsets_bytes); + let dst_values = MutableBuffer::new(0); + let cur_offset = OffsetSize::from_usize(0).unwrap(); + dst_offsets.push(cur_offset); + + Self { + src_offsets: array.value_offsets(), + src_values: array.value_data(), + dst_offsets, + dst_values, + cur_offset, + } + } + + /// Returns the byte offset at `idx` + #[inline] + fn get_value_offset(&self, idx: usize) -> usize { + self.src_offsets[idx].as_usize() + } + + /// Returns the start and end of the value at index `idx` along with its length + #[inline] + fn get_value_range(&self, idx: usize) -> (usize, usize, OffsetSize) { + // These can only fail if `array` contains invalid data + let start = self.get_value_offset(idx); + let end = self.get_value_offset(idx + 1); + let len = OffsetSize::from_usize(end - start).expect("illegal offset range"); + (start, end, len) + } + + /// Extends the in-progress array by the indexes in the provided iterator + fn extend_idx(&mut self, iter: impl Iterator) { + for idx in iter { + let (start, end, len) = self.get_value_range(idx); + self.cur_offset += len; + self.dst_offsets.push(self.cur_offset); + self.dst_values + .extend_from_slice(&self.src_values[start..end]); + } + } + + /// Extends the in-progress array by the ranges in the provided iterator + fn extend_slices(&mut self, iter: impl Iterator) { + for (start, end) in iter { + // These can only fail if `array` contains invalid data + for idx in start..end { + let (_, _, len) = self.get_value_range(idx); + self.cur_offset += len; + 
self.dst_offsets.push(self.cur_offset); // push_unchecked? + } + + let value_start = self.get_value_offset(start); + let value_end = self.get_value_offset(end); + self.dst_values + .extend_from_slice(&self.src_values[value_start..value_end]); + } + } +} + +/// `filter` implementation for byte arrays +/// +/// Note: NULLs with a non-zero slot length in `array` will have the corresponding +/// data copied across. This allows handling the null mask separately from the data +fn filter_bytes( + array: &GenericByteArray, + predicate: &FilterPredicate, +) -> GenericByteArray +where + T: ByteArrayType, +{ + let mut filter = FilterBytes::new(predicate.count, array); + + match &predicate.strategy { + IterationStrategy::SlicesIterator => { + filter.extend_slices(SlicesIterator::new(&predicate.filter)) + } + IterationStrategy::Slices(slices) => filter.extend_slices(slices.iter().cloned()), + IterationStrategy::IndexIterator => { + filter.extend_idx(IndexIterator::new(&predicate.filter, predicate.count)) + } + IterationStrategy::Indices(indices) => filter.extend_idx(indices.iter().cloned()), + IterationStrategy::All | IterationStrategy::None => unreachable!(), + } + + let mut builder = ArrayDataBuilder::new(T::DATA_TYPE) + .len(predicate.count) + .add_buffer(filter.dst_offsets.into()) + .add_buffer(filter.dst_values.into()); + + if let Some((null_count, nulls)) = filter_null_mask(array.nulls(), predicate) { + builder = builder.null_count(null_count).null_bit_buffer(Some(nulls)); + } + + let data = unsafe { builder.build_unchecked() }; + GenericByteArray::from(data) +} + +/// `filter` implementation for byte view arrays. 
+fn filter_byte_view( + array: &GenericByteViewArray, + predicate: &FilterPredicate, +) -> GenericByteViewArray { + let new_view_buffer = filter_native(array.views(), predicate); + + let mut builder = ArrayDataBuilder::new(T::DATA_TYPE) + .len(predicate.count) + .add_buffer(new_view_buffer) + .add_buffers(array.data_buffers().to_vec()); + + if let Some((null_count, nulls)) = filter_null_mask(array.nulls(), predicate) { + builder = builder.null_count(null_count).null_bit_buffer(Some(nulls)); + } + + GenericByteViewArray::from(unsafe { builder.build_unchecked() }) +} + +/// `filter` implementation for dictionaries +fn filter_dict( + array: &DictionaryArray, + predicate: &FilterPredicate, +) -> DictionaryArray +where + T: ArrowDictionaryKeyType, + T::Native: num::Num, +{ + let builder = filter_primitive::(array.keys(), predicate) + .into_data() + .into_builder() + .data_type(array.data_type().clone()) + .child_data(vec![array.values().to_data()]); + + // SAFETY: + // Keys were valid before, filtered subset is therefore still valid + DictionaryArray::from(unsafe { builder.build_unchecked() }) +} diff --git a/datafusion/physical-plan/src/coalesce/mod.rs b/datafusion/physical-plan/src/coalesce/mod.rs new file mode 100644 index 000000000000..5befa5ecda99 --- /dev/null +++ b/datafusion/physical-plan/src/coalesce/mod.rs @@ -0,0 +1,588 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::compute::concat_batches; +use arrow_array::builder::StringViewBuilder; +use arrow_array::cast::AsArray; +use arrow_array::{Array, ArrayRef, RecordBatch}; +use arrow_schema::SchemaRef; +use std::sync::Arc; + +/// Concatenate multiple [`RecordBatch`]es +/// +/// `BatchCoalescer` concatenates multiple small [`RecordBatch`]es, produced by +/// operations such as `FilterExec` and `RepartitionExec`, into larger ones for +/// more efficient processing by subsequent operations. +/// +/// # Background +/// +/// Generally speaking, larger [`RecordBatch`]es are more efficient to process +/// than smaller record batches (until the CPU cache is exceeded) because there +/// is fixed processing overhead per batch. DataFusion tries to operate on +/// batches of `target_batch_size` rows to amortize this overhead +/// +/// ```text +/// ┌────────────────────┐ +/// │ RecordBatch │ +/// │ num_rows = 23 │ +/// └────────────────────┘ ┌────────────────────┐ +/// │ │ +/// ┌────────────────────┐ Coalesce │ │ +/// │ │ Batches │ │ +/// │ RecordBatch │ │ │ +/// │ num_rows = 50 │ ─ ─ ─ ─ ─ ─ ▶ │ │ +/// │ │ │ RecordBatch │ +/// │ │ │ num_rows = 106 │ +/// └────────────────────┘ │ │ +/// │ │ +/// ┌────────────────────┐ │ │ +/// │ │ │ │ +/// │ RecordBatch │ │ │ +/// │ num_rows = 33 │ └────────────────────┘ +/// │ │ +/// └────────────────────┘ +/// ``` +/// +/// # Notes: +/// +/// 1. Output rows are produced in the same order as the input rows +/// +/// 2. The output is a sequence of batches, with all but the last being at least +/// `target_batch_size` rows. 
+/// +/// 3. Eventually this may also be able to handle other optimizations such as a +/// combined filter/coalesce operation. +/// +#[derive(Debug)] +pub struct BatchCoalescer { + /// The input schema + schema: SchemaRef, + /// Minimum number of rows for coalesced batches + target_batch_size: usize, + /// Total number of rows returned so far + total_rows: usize, + /// Buffered batches + buffer: Vec, + /// Buffered row count + buffered_rows: usize, + /// Limit: maximum number of rows to fetch, `None` means fetch all rows + fetch: Option, +} + +impl BatchCoalescer { + /// Create a new `BatchCoalescer` + /// + /// # Arguments + /// - `schema` - the schema of the output batches + /// - `target_batch_size` - the minimum number of rows for each + /// output batch (until limit reached) + /// - `fetch` - the maximum number of rows to fetch, `None` means fetch all rows + pub fn new( + schema: SchemaRef, + target_batch_size: usize, + fetch: Option, + ) -> Self { + Self { + schema, + target_batch_size, + total_rows: 0, + buffer: vec![], + buffered_rows: 0, + fetch, + } + } + + /// Return the schema of the output batches + pub fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + /// Pushes the next batch and returns a [`CoalescerState`] indicating the current + /// state of the buffer. + pub fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState { + let batch = gc_string_view_batch(&batch); + if self.limit_reached(&batch) { + CoalescerState::LimitReached + } else if self.target_reached(batch) { + CoalescerState::TargetReached + } else { + CoalescerState::Continue + } + } + + /// Return true if there is no data buffered + pub fn is_empty(&self) -> bool { + self.buffer.is_empty() + } + + /// Checks if the buffer will reach the specified limit after getting + /// `batch`. + /// + /// If fetch would be reached or exceeded, slices the received batch, updates the + /// buffer with it, and returns `true`. + /// + /// Otherwise: does nothing and returns `false`. 
+ fn limit_reached(&mut self, batch: &RecordBatch) -> bool { + match self.fetch { + Some(fetch) if self.total_rows + batch.num_rows() >= fetch => { + // Limit is reached + let remaining_rows = fetch - self.total_rows; + debug_assert!(remaining_rows > 0); + + let batch = batch.slice(0, remaining_rows); + self.buffered_rows += batch.num_rows(); + self.total_rows = fetch; + self.buffer.push(batch); + true + } + _ => false, + } + } + + /// Updates the buffer with the given batch. + /// + /// If the target batch size is reached, returns `true`. Otherwise, returns + /// `false`. + fn target_reached(&mut self, batch: RecordBatch) -> bool { + if batch.num_rows() == 0 { + false + } else { + self.total_rows += batch.num_rows(); + self.buffered_rows += batch.num_rows(); + self.buffer.push(batch); + self.buffered_rows >= self.target_batch_size + } + } + + /// Concatenates and returns all buffered batches, and clears the buffer. + pub fn finish_batch(&mut self) -> datafusion_common::Result { + let batch = concat_batches(&self.schema, &self.buffer)?; + self.buffer.clear(); + self.buffered_rows = 0; + Ok(batch) + } +} + +/// Indicates the state of the [`BatchCoalescer`] buffer after the +/// [`BatchCoalescer::push_batch()`] operation. +/// +/// The caller should take different actions, depending on the variant returned. +pub enum CoalescerState { + /// Neither the limit nor the target batch size is reached. + /// + /// Action: continue pushing batches. + Continue, + /// The limit has been reached. + /// + /// Action: call [`BatchCoalescer::finish_batch()`] to get the final + /// buffered results as a batch and finish the query. + LimitReached, + /// The specified minimum number of rows a batch should have is reached. + /// + /// Action: call [`BatchCoalescer::finish_batch()`] to get the current + /// buffered results as a batch and then continue pushing batches. 
+ TargetReached, +} + +/// Heuristically compact `StringViewArray`s to reduce memory usage, if needed +/// +/// Decides when to consolidate the StringView into a new buffer to reduce +/// memory usage and improve string locality for better performance. +/// +/// This differs from `StringViewArray::gc` because: +/// 1. It may not compact the array depending on a heuristic. +/// 2. It uses a precise block size to reduce the number of buffers to track. +/// +/// # Heuristic +/// +/// The array is compacted only when its actual buffer memory size is more than +/// twice the ideal buffer size, where the ideal size is the total length of all +/// strings longer than 12 bytes (shorter strings contribute nothing). +/// +/// `StringViewArray`s include pointers to buffers that hold the underlying data. +/// One of the great benefits of `StringViewArray` is that many operations +/// (e.g., `filter`) can be done without copying the underlying data. +/// +/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the +/// `StringViewArray` may only refer to a small portion of the buffer, +/// significantly increasing memory usage. +fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { + let new_columns: Vec = batch + .columns() + .iter() + .map(|c| { + // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long. + let Some(s) = c.as_string_view_opt() else { + return Arc::clone(c); + }; + let ideal_buffer_size: usize = s + .views() + .iter() + .map(|v| { + let len = (*v as u32) as usize; + if len > 12 { + len + } else { + 0 + } + }) + .sum(); + let actual_buffer_size = s.get_buffer_memory_size(); + + // Re-creating the array copies data and can be time consuming. + // We only do it if the array is sparse + if actual_buffer_size > (ideal_buffer_size * 2) { + // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerates later `concat_batches` calls. + // See https://github.com/apache/arrow-rs/issues/6094 for more details. 
+ let mut builder = StringViewBuilder::with_capacity(s.len()); + if ideal_buffer_size > 0 { + builder = builder.with_block_size(ideal_buffer_size as u32); + } + + for v in s.iter() { + builder.append_option(v); + } + + let gc_string = builder.finish(); + + debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0 + + Arc::new(gc_string) + } else { + Arc::clone(c) + } + }) + .collect(); + RecordBatch::try_new(batch.schema(), new_columns) + .expect("Failed to re-create the gc'ed record batch") +} + +#[cfg(test)] +mod tests { + use std::ops::Range; + + use super::*; + + use arrow::datatypes::{DataType, Field, Schema}; + use arrow_array::builder::ArrayBuilder; + use arrow_array::{StringViewArray, UInt32Array}; + + #[test] + fn test_coalesce() { + let batch = uint32_batch(0..8); + Test::new() + .with_batches(std::iter::repeat(batch).take(10)) + // expected output is batches of at least 20 rows (except for the final batch) + .with_target_batch_size(21) + .with_expected_output_sizes(vec![24, 24, 24, 8]) + .run() + } + + #[test] + fn test_coalesce_with_fetch_larger_than_input_size() { + let batch = uint32_batch(0..8); + Test::new() + .with_batches(std::iter::repeat(batch).take(10)) + // input is 10 batches x 8 rows (80 rows) with fetch limit of 100 + // expected to behave the same as `test_concat_batches` + .with_target_batch_size(21) + .with_fetch(Some(100)) + .with_expected_output_sizes(vec![24, 24, 24, 8]) + .run(); + } + + #[test] + fn test_coalesce_with_fetch_less_than_input_size() { + let batch = uint32_batch(0..8); + Test::new() + .with_batches(std::iter::repeat(batch).take(10)) + // input is 10 batches x 8 rows (80 rows) with fetch limit of 50 + .with_target_batch_size(21) + .with_fetch(Some(50)) + .with_expected_output_sizes(vec![24, 24, 2]) + .run(); + } + + #[test] + fn test_coalesce_with_fetch_less_than_target_and_no_remaining_rows() { + let batch = uint32_batch(0..8); + Test::new() + 
.with_batches(std::iter::repeat(batch).take(10)) + // input is 10 batches x 8 rows (80 rows) with fetch limit of 48 + .with_target_batch_size(21) + .with_fetch(Some(48)) + .with_expected_output_sizes(vec![24, 24]) + .run(); + } + + #[test] + fn test_coalesce_with_fetch_less_target_batch_size() { + let batch = uint32_batch(0..8); + Test::new() + .with_batches(std::iter::repeat(batch).take(10)) + // input is 10 batches x 8 rows (80 rows) with fetch limit of 10 + .with_target_batch_size(21) + .with_fetch(Some(10)) + .with_expected_output_sizes(vec![10]) + .run(); + } + + #[test] + fn test_coalesce_single_large_batch_over_fetch() { + let large_batch = uint32_batch(0..100); + Test::new() + .with_batch(large_batch) + .with_target_batch_size(20) + .with_fetch(Some(7)) + .with_expected_output_sizes(vec![7]) + .run() + } + + /// Test for [`BatchCoalescer`] + /// + /// Pushes the input batches to the coalescer and verifies that the resulting + /// batches have the expected number of rows and contents. + #[derive(Debug, Clone, Default)] + struct Test { + /// Batches to feed to the coalescer. 
Tests must have at least one + /// schema + input_batches: Vec, + /// Expected output sizes of the resulting batches + expected_output_sizes: Vec, + /// target batch size + target_batch_size: usize, + /// Fetch (limit) + fetch: Option, + } + + impl Test { + fn new() -> Self { + Self::default() + } + + /// Set the target batch size + fn with_target_batch_size(mut self, target_batch_size: usize) -> Self { + self.target_batch_size = target_batch_size; + self + } + + /// Set the fetch (limit) + fn with_fetch(mut self, fetch: Option) -> Self { + self.fetch = fetch; + self + } + + /// Extend the input batches with `batch` + fn with_batch(mut self, batch: RecordBatch) -> Self { + self.input_batches.push(batch); + self + } + + /// Extends the input batches with `batches` + fn with_batches( + mut self, + batches: impl IntoIterator, + ) -> Self { + self.input_batches.extend(batches); + self + } + + /// Extends `sizes` to expected output sizes + fn with_expected_output_sizes( + mut self, + sizes: impl IntoIterator, + ) -> Self { + self.expected_output_sizes.extend(sizes); + self + } + + /// Runs the test -- see documentation on [`Test`] for details + fn run(self) { + let Self { + input_batches, + target_batch_size, + fetch, + expected_output_sizes, + } = self; + + let schema = input_batches[0].schema(); + + // create a single large input batch for output comparison + let single_input_batch = concat_batches(&schema, &input_batches).unwrap(); + + let mut coalescer = + BatchCoalescer::new(Arc::clone(&schema), target_batch_size, fetch); + + let mut output_batches = vec![]; + for batch in input_batches { + match coalescer.push_batch(batch) { + CoalescerState::Continue => {} + CoalescerState::LimitReached => { + output_batches.push(coalescer.finish_batch().unwrap()); + break; + } + CoalescerState::TargetReached => { + coalescer.buffered_rows = 0; + output_batches.push(coalescer.finish_batch().unwrap()); + } + } + } + if coalescer.buffered_rows != 0 { + 
output_batches.extend(coalescer.buffer); + } + + // make sure we got the expected number of output batches and content + let mut starting_idx = 0; + assert_eq!(expected_output_sizes.len(), output_batches.len()); + for (i, (expected_size, batch)) in + expected_output_sizes.iter().zip(output_batches).enumerate() + { + assert_eq!( + *expected_size, + batch.num_rows(), + "Unexpected number of rows in Batch {i}" + ); + + // compare the contents of the batch (using `==` compares the + // underlying memory layout too) + let expected_batch = + single_input_batch.slice(starting_idx, *expected_size); + let batch_strings = batch_to_pretty_strings(&batch); + let expected_batch_strings = batch_to_pretty_strings(&expected_batch); + let batch_strings = batch_strings.lines().collect::>(); + let expected_batch_strings = + expected_batch_strings.lines().collect::>(); + assert_eq!( + expected_batch_strings, batch_strings, + "Unexpected content in Batch {i}:\ + \n\nExpected:\n{expected_batch_strings:#?}\n\nActual:\n{batch_strings:#?}" + ); + starting_idx += *expected_size; + } + } + } + + /// Return a batch of UInt32 with the specified range + fn uint32_batch(range: Range) -> RecordBatch { + let schema = + Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])); + + RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(UInt32Array::from_iter_values(range))], + ) + .unwrap() + } + + #[test] + fn test_gc_string_view_batch_small_no_compact() { + // view with only short strings (no buffers) --> no need to compact + let array = StringViewTest { + rows: 1000, + strings: vec![Some("a"), Some("b"), Some("c")], + } + .build(); + + let gc_array = do_gc(array.clone()); + compare_string_array_values(&array, &gc_array); + assert_eq!(array.data_buffers().len(), 0); + assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction + } + + #[test] + fn test_gc_string_view_batch_large_no_compact() { + // view with large strings (has buffers) but full --> 
no need to compact + let array = StringViewTest { + rows: 1000, + strings: vec![Some("This string is longer than 12 bytes")], + } + .build(); + + let gc_array = do_gc(array.clone()); + compare_string_array_values(&array, &gc_array); + assert_eq!(array.data_buffers().len(), 5); + assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction + } + + #[test] + fn test_gc_string_view_batch_large_slice_compact() { + // view with large strings (has buffers) and only partially used --> needs to be compacted + let array = StringViewTest { + rows: 1000, + strings: vec![Some("this string is longer than 12 bytes")], + } + .build(); + + // slice only 22 rows (starting at row 11), so most of the buffer is not used + let array = array.slice(11, 22); + + let gc_array = do_gc(array.clone()); + compare_string_array_values(&array, &gc_array); + assert_eq!(array.data_buffers().len(), 5); + assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer + } + + /// Compares the values of two string view arrays + fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) { + assert_eq!(arr1.len(), arr2.len()); + for (s1, s2) in arr1.iter().zip(arr2.iter()) { + assert_eq!(s1, s2); + } + } + + /// runs garbage collection on string view array + /// and ensures the number of rows is the same + fn do_gc(array: StringViewArray) -> StringViewArray { + let batch = + RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap(); + let gc_batch = gc_string_view_batch(&batch); + assert_eq!(batch.num_rows(), gc_batch.num_rows()); + assert_eq!(batch.schema(), gc_batch.schema()); + gc_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .clone() + } + + /// Describes parameters for creating a `StringViewArray` + struct StringViewTest { + /// The number of rows in the array + rows: usize, + /// The strings to use in the array (repeated over and over) + strings: Vec>, + } + + impl StringViewTest { + /// Create a `StringViewArray` with
the parameters specified in this struct + fn build(self) -> StringViewArray { + let mut builder = StringViewBuilder::with_capacity(100).with_block_size(8192); + loop { + for &v in self.strings.iter() { + builder.append_option(v); + if builder.len() >= self.rows { + return builder.finish(); + } + } + } + } + } + fn batch_to_pretty_strings(batch: &RecordBatch) -> String { + arrow::util::pretty::pretty_format_batches(&[batch.clone()]) + .unwrap() + .to_string() + } +} diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 5589027694fe..7caf5b8ab65a 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -28,19 +28,17 @@ use crate::{ DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, }; -use arrow::array::{AsArray, StringViewBuilder}; -use arrow::compute::concat_batches; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; -use arrow_array::{Array, ArrayRef}; use datafusion_common::Result; use datafusion_execution::TaskContext; +use crate::coalesce::{BatchCoalescer, CoalescerState}; use futures::ready; use futures::stream::{Stream, StreamExt}; /// `CoalesceBatchesExec` combines small batches into larger batches for more -/// efficient use of vectorized processing by later operators. +/// efficient vectorized processing by later operators. /// /// The operator buffers batches until it collects `target_batch_size` rows and /// then emits a single concatenated batch. When only a limited number of rows @@ -48,35 +46,7 @@ use futures::stream::{Stream, StreamExt}; /// buffering and returns the final batch once the number of collected rows /// reaches the `fetch` value. /// -/// # Background -/// -/// Generally speaking, larger RecordBatches are more efficient to process than -/// smaller record batches (until the CPU cache is exceeded) because there is -/// fixed processing overhead per batch. 
This code concatenates multiple small -/// record batches into larger ones to amortize this overhead. -/// -/// ```text -/// ┌────────────────────┐ -/// │ RecordBatch │ -/// │ num_rows = 23 │ -/// └────────────────────┘ ┌────────────────────┐ -/// │ │ -/// ┌────────────────────┐ Coalesce │ │ -/// │ │ Batches │ │ -/// │ RecordBatch │ │ │ -/// │ num_rows = 50 │ ─ ─ ─ ─ ─ ─ ▶ │ │ -/// │ │ │ RecordBatch │ -/// │ │ │ num_rows = 106 │ -/// └────────────────────┘ │ │ -/// │ │ -/// ┌────────────────────┐ │ │ -/// │ │ │ │ -/// │ RecordBatch │ │ │ -/// │ num_rows = 33 │ └────────────────────┘ -/// │ │ -/// └────────────────────┘ -/// ``` - +/// See [`BatchCoalescer`] for more information #[derive(Debug)] pub struct CoalesceBatchesExec { /// The input plan @@ -346,7 +316,7 @@ impl CoalesceBatchesStream { } CoalesceBatchesStreamState::Exhausted => { // Handle the end of the input stream. - return if self.coalescer.buffer.is_empty() { + return if self.coalescer.is_empty() { // If buffer is empty, return None indicating the stream is fully consumed. Poll::Ready(None) } else { @@ -365,511 +335,3 @@ impl RecordBatchStream for CoalesceBatchesStream { self.coalescer.schema() } } - -/// Concatenate multiple record batches into larger batches -/// -/// See [`CoalesceBatchesExec`] for more details. -/// -/// Notes: -/// -/// 1. The output rows is the same order as the input rows -/// -/// 2. The output is a sequence of batches, with all but the last being at least -/// `target_batch_size` rows. -/// -/// 3. Eventually this may also be able to handle other optimizations such as a -/// combined filter/coalesce operation. 
-#[derive(Debug)] -struct BatchCoalescer { - /// The input schema - schema: SchemaRef, - /// Minimum number of rows for coalesces batches - target_batch_size: usize, - /// Total number of rows returned so far - total_rows: usize, - /// Buffered batches - buffer: Vec, - /// Buffered row count - buffered_rows: usize, - /// Maximum number of rows to fetch, `None` means fetching all rows - fetch: Option, -} - -impl BatchCoalescer { - /// Create a new `BatchCoalescer` - /// - /// # Arguments - /// - `schema` - the schema of the output batches - /// - `target_batch_size` - the minimum number of rows for each - /// output batch (until limit reached) - /// - `fetch` - the maximum number of rows to fetch, `None` means fetch all rows - fn new(schema: SchemaRef, target_batch_size: usize, fetch: Option) -> Self { - Self { - schema, - target_batch_size, - total_rows: 0, - buffer: vec![], - buffered_rows: 0, - fetch, - } - } - - /// Return the schema of the output batches - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - /// Given a batch, it updates the buffer of [`BatchCoalescer`]. It returns - /// a variant of [`CoalescerState`] indicating the final state of the buffer. - fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState { - let batch = gc_string_view_batch(&batch); - if self.limit_reached(&batch) { - CoalescerState::LimitReached - } else if self.target_reached(batch) { - CoalescerState::TargetReached - } else { - CoalescerState::Continue - } - } - - /// The function checks if the buffer can reach the specified limit after getting `batch`. - /// If it does, it slices the received batch as needed, updates the buffer with it, and - /// finally returns `true`. Otherwise; the function does nothing and returns `false`. 
- fn limit_reached(&mut self, batch: &RecordBatch) -> bool { - match self.fetch { - Some(fetch) if self.total_rows + batch.num_rows() >= fetch => { - // Limit is reached - let remaining_rows = fetch - self.total_rows; - debug_assert!(remaining_rows > 0); - - let batch = batch.slice(0, remaining_rows); - self.buffered_rows += batch.num_rows(); - self.total_rows = fetch; - self.buffer.push(batch); - true - } - _ => false, - } - } - - /// Updates the buffer with the given batch. If the target batch size is reached, - /// the function returns `true`. Otherwise, it returns `false`. - fn target_reached(&mut self, batch: RecordBatch) -> bool { - if batch.num_rows() == 0 { - false - } else { - self.total_rows += batch.num_rows(); - self.buffered_rows += batch.num_rows(); - self.buffer.push(batch); - self.buffered_rows >= self.target_batch_size - } - } - - /// Concatenates and returns all buffered batches, and clears the buffer. - fn finish_batch(&mut self) -> Result { - let batch = concat_batches(&self.schema, &self.buffer)?; - self.buffer.clear(); - self.buffered_rows = 0; - Ok(batch) - } -} - -/// This enumeration acts as a status indicator for the [`BatchCoalescer`] after a -/// [`BatchCoalescer::push_batch()`] operation. -enum CoalescerState { - /// Neither the limit nor the target batch size is reached. - Continue, - /// The sufficient row count to produce a complete query result is reached. - LimitReached, - /// The specified minimum number of rows a batch should have is reached. - TargetReached, -} - -/// Heuristically compact `StringViewArray`s to reduce memory usage, if needed -/// -/// This function decides when to consolidate the StringView into a new buffer -/// to reduce memory usage and improve string locality for better performance. -/// -/// This differs from `StringViewArray::gc` because: -/// 1. It may not compact the array depending on a heuristic. -/// 2. It uses a precise block size to reduce the number of buffers to track. 
-/// -/// # Heuristic -/// -/// If the average size of each view is larger than 32 bytes, we compact the array. -/// -/// `StringViewArray` include pointers to buffer that hold the underlying data. -/// One of the great benefits of `StringViewArray` is that many operations -/// (e.g., `filter`) can be done without copying the underlying data. -/// -/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the -/// `StringViewArray` may only refer to a small portion of the buffer, -/// significantly increasing memory usage. -fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { - let new_columns: Vec = batch - .columns() - .iter() - .map(|c| { - // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long. - let Some(s) = c.as_string_view_opt() else { - return Arc::clone(c); - }; - let ideal_buffer_size: usize = s - .views() - .iter() - .map(|v| { - let len = (*v as u32) as usize; - if len > 12 { - len - } else { - 0 - } - }) - .sum(); - let actual_buffer_size = s.get_buffer_memory_size(); - - // Re-creating the array copies data and can be time consuming. - // We only do it if the array is sparse - if actual_buffer_size > (ideal_buffer_size * 2) { - // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches. - // See https://github.com/apache/arrow-rs/issues/6094 for more details. 
- let mut builder = StringViewBuilder::with_capacity(s.len()); - if ideal_buffer_size > 0 { - builder = builder.with_block_size(ideal_buffer_size as u32); - } - - for v in s.iter() { - builder.append_option(v); - } - - let gc_string = builder.finish(); - - debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0 - - Arc::new(gc_string) - } else { - Arc::clone(c) - } - }) - .collect(); - RecordBatch::try_new(batch.schema(), new_columns) - .expect("Failed to re-create the gc'ed record batch") -} - -#[cfg(test)] -mod tests { - use std::ops::Range; - - use super::*; - - use arrow::datatypes::{DataType, Field, Schema}; - use arrow_array::builder::ArrayBuilder; - use arrow_array::{StringViewArray, UInt32Array}; - - #[test] - fn test_coalesce() { - let batch = uint32_batch(0..8); - Test::new() - .with_batches(std::iter::repeat(batch).take(10)) - // expected output is batches of at least 20 rows (except for the final batch) - .with_target_batch_size(21) - .with_expected_output_sizes(vec![24, 24, 24, 8]) - .run() - } - - #[test] - fn test_coalesce_with_fetch_larger_than_input_size() { - let batch = uint32_batch(0..8); - Test::new() - .with_batches(std::iter::repeat(batch).take(10)) - // input is 10 batches x 8 rows (80 rows) with fetch limit of 100 - // expected to behave the same as `test_concat_batches` - .with_target_batch_size(21) - .with_fetch(Some(100)) - .with_expected_output_sizes(vec![24, 24, 24, 8]) - .run(); - } - - #[test] - fn test_coalesce_with_fetch_less_than_input_size() { - let batch = uint32_batch(0..8); - Test::new() - .with_batches(std::iter::repeat(batch).take(10)) - // input is 10 batches x 8 rows (80 rows) with fetch limit of 50 - .with_target_batch_size(21) - .with_fetch(Some(50)) - .with_expected_output_sizes(vec![24, 24, 2]) - .run(); - } - - #[test] - fn test_coalesce_with_fetch_less_than_target_and_no_remaining_rows() { - let batch = uint32_batch(0..8); - Test::new() - 
.with_batches(std::iter::repeat(batch).take(10)) - // input is 10 batches x 8 rows (80 rows) with fetch limit of 48 - .with_target_batch_size(21) - .with_fetch(Some(48)) - .with_expected_output_sizes(vec![24, 24]) - .run(); - } - - #[test] - fn test_coalesce_with_fetch_less_target_batch_size() { - let batch = uint32_batch(0..8); - Test::new() - .with_batches(std::iter::repeat(batch).take(10)) - // input is 10 batches x 8 rows (80 rows) with fetch limit of 10 - .with_target_batch_size(21) - .with_fetch(Some(10)) - .with_expected_output_sizes(vec![10]) - .run(); - } - - #[test] - fn test_coalesce_single_large_batch_over_fetch() { - let large_batch = uint32_batch(0..100); - Test::new() - .with_batch(large_batch) - .with_target_batch_size(20) - .with_fetch(Some(7)) - .with_expected_output_sizes(vec![7]) - .run() - } - - /// Test for [`BatchCoalescer`] - /// - /// Pushes the input batches to the coalescer and verifies that the resulting - /// batches have the expected number of rows and contents. - #[derive(Debug, Clone, Default)] - struct Test { - /// Batches to feed to the coalescer. 
Tests must have at least one - /// schema - input_batches: Vec, - /// Expected output sizes of the resulting batches - expected_output_sizes: Vec, - /// target batch size - target_batch_size: usize, - /// Fetch (limit) - fetch: Option, - } - - impl Test { - fn new() -> Self { - Self::default() - } - - /// Set the target batch size - fn with_target_batch_size(mut self, target_batch_size: usize) -> Self { - self.target_batch_size = target_batch_size; - self - } - - /// Set the fetch (limit) - fn with_fetch(mut self, fetch: Option) -> Self { - self.fetch = fetch; - self - } - - /// Extend the input batches with `batch` - fn with_batch(mut self, batch: RecordBatch) -> Self { - self.input_batches.push(batch); - self - } - - /// Extends the input batches with `batches` - fn with_batches( - mut self, - batches: impl IntoIterator, - ) -> Self { - self.input_batches.extend(batches); - self - } - - /// Extends `sizes` to expected output sizes - fn with_expected_output_sizes( - mut self, - sizes: impl IntoIterator, - ) -> Self { - self.expected_output_sizes.extend(sizes); - self - } - - /// Runs the test -- see documentation on [`Test`] for details - fn run(self) { - let Self { - input_batches, - target_batch_size, - fetch, - expected_output_sizes, - } = self; - - let schema = input_batches[0].schema(); - - // create a single large input batch for output comparison - let single_input_batch = concat_batches(&schema, &input_batches).unwrap(); - - let mut coalescer = - BatchCoalescer::new(Arc::clone(&schema), target_batch_size, fetch); - - let mut output_batches = vec![]; - for batch in input_batches { - match coalescer.push_batch(batch) { - CoalescerState::Continue => {} - CoalescerState::LimitReached => { - output_batches.push(coalescer.finish_batch().unwrap()); - break; - } - CoalescerState::TargetReached => { - coalescer.buffered_rows = 0; - output_batches.push(coalescer.finish_batch().unwrap()); - } - } - } - if coalescer.buffered_rows != 0 { - 
output_batches.extend(coalescer.buffer); - } - - // make sure we got the expected number of output batches and content - let mut starting_idx = 0; - assert_eq!(expected_output_sizes.len(), output_batches.len()); - for (i, (expected_size, batch)) in - expected_output_sizes.iter().zip(output_batches).enumerate() - { - assert_eq!( - *expected_size, - batch.num_rows(), - "Unexpected number of rows in Batch {i}" - ); - - // compare the contents of the batch (using `==` compares the - // underlying memory layout too) - let expected_batch = - single_input_batch.slice(starting_idx, *expected_size); - let batch_strings = batch_to_pretty_strings(&batch); - let expected_batch_strings = batch_to_pretty_strings(&expected_batch); - let batch_strings = batch_strings.lines().collect::>(); - let expected_batch_strings = - expected_batch_strings.lines().collect::>(); - assert_eq!( - expected_batch_strings, batch_strings, - "Unexpected content in Batch {i}:\ - \n\nExpected:\n{expected_batch_strings:#?}\n\nActual:\n{batch_strings:#?}" - ); - starting_idx += *expected_size; - } - } - } - - /// Return a batch of UInt32 with the specified range - fn uint32_batch(range: Range) -> RecordBatch { - let schema = - Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])); - - RecordBatch::try_new( - Arc::clone(&schema), - vec![Arc::new(UInt32Array::from_iter_values(range))], - ) - .unwrap() - } - - #[test] - fn test_gc_string_view_batch_small_no_compact() { - // view with only short strings (no buffers) --> no need to compact - let array = StringViewTest { - rows: 1000, - strings: vec![Some("a"), Some("b"), Some("c")], - } - .build(); - - let gc_array = do_gc(array.clone()); - compare_string_array_values(&array, &gc_array); - assert_eq!(array.data_buffers().len(), 0); - assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction - } - - #[test] - fn test_gc_string_view_batch_large_no_compact() { - // view with large strings (has buffers) but full --> 
no need to compact - let array = StringViewTest { - rows: 1000, - strings: vec![Some("This string is longer than 12 bytes")], - } - .build(); - - let gc_array = do_gc(array.clone()); - compare_string_array_values(&array, &gc_array); - assert_eq!(array.data_buffers().len(), 5); - assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction - } - - #[test] - fn test_gc_string_view_batch_large_slice_compact() { - // view with large strings (has buffers) and only partially used --> no need to compact - let array = StringViewTest { - rows: 1000, - strings: vec![Some("this string is longer than 12 bytes")], - } - .build(); - - // slice only 11 rows, so most of the buffer is not used - let array = array.slice(11, 22); - - let gc_array = do_gc(array.clone()); - compare_string_array_values(&array, &gc_array); - assert_eq!(array.data_buffers().len(), 5); - assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a single buffer - } - - /// Compares the values of two string view arrays - fn compare_string_array_values(arr1: &StringViewArray, arr2: &StringViewArray) { - assert_eq!(arr1.len(), arr2.len()); - for (s1, s2) in arr1.iter().zip(arr2.iter()) { - assert_eq!(s1, s2); - } - } - - /// runs garbage collection on string view array - /// and ensures the number of rows are the same - fn do_gc(array: StringViewArray) -> StringViewArray { - let batch = - RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap(); - let gc_batch = gc_string_view_batch(&batch); - assert_eq!(batch.num_rows(), gc_batch.num_rows()); - assert_eq!(batch.schema(), gc_batch.schema()); - gc_batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap() - .clone() - } - - /// Describes parameters for creating a `StringViewArray` - struct StringViewTest { - /// The number of rows in the array - rows: usize, - /// The strings to use in the array (repeated over and over - strings: Vec>, - } - - impl StringViewTest { - /// Create a `StringViewArray` with 
the parameters specified in this struct - fn build(self) -> StringViewArray { - let mut builder = StringViewBuilder::with_capacity(100).with_block_size(8192); - loop { - for &v in self.strings.iter() { - builder.append_option(v); - if builder.len() >= self.rows { - return builder.finish(); - } - } - } - } - } - fn batch_to_pretty_strings(batch: &RecordBatch) -> String { - arrow::util::pretty::pretty_format_batches(&[batch.clone()]) - .unwrap() - .to_string() - } -} diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 6aba3d817710..6fe28d64f9ce 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -44,6 +44,7 @@ use datafusion_physical_expr::{ analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, }; +use crate::coalesce::{BatchCoalescer, CoalescerState}; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -279,10 +280,12 @@ impl ExecutionPlan for FilterExec { trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); Ok(Box::pin(FilterExecStream { - schema: self.input.schema(), + done: false, predicate: Arc::clone(&self.predicate), input: self.input.execute(partition, context)?, baseline_metrics, + // TODO use actual target batch size, for now hardcode the default size + coalescer: BatchCoalescer::new(self.input.schema(), 8192, self.fetch()), })) } @@ -337,14 +340,16 @@ fn collect_new_statistics( /// The FilterExec streams wraps the input iterator and applies the predicate expression to /// determine which rows to include in its output batches struct FilterExecStream { - /// Output schema, which is the same as the input schema for this operator - schema: SchemaRef, + /// Is the stream done? + done: bool, /// The expression to filter on.
This expression must evaluate to a boolean value. predicate: Arc, /// The input partition to filter. input: SendableRecordBatchStream, /// runtime metrics recording baseline_metrics: BaselineMetrics, + /// Build up output batches incrementally + coalescer: BatchCoalescer, } pub fn batch_filter( @@ -374,22 +379,47 @@ impl Stream for FilterExecStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + if self.done { + return Poll::Ready(None); + } let poll; loop { match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { - let timer = self.baseline_metrics.elapsed_compute().timer(); + // clone timer so we can borrow self mutably + let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); + let _timer = elapsed_compute.timer(); // records time on drop let filtered_batch = batch_filter(&batch, &self.predicate)?; - timer.done(); - // skip entirely filtered batches - if filtered_batch.num_rows() == 0 { - continue; - } - poll = Poll::Ready(Some(Ok(filtered_batch))); + match self.coalescer.push_batch(filtered_batch) { + CoalescerState::TargetReached => { + let batch = self.coalescer.finish_batch(); + poll = Poll::Ready(Some(batch)); + break; + } + CoalescerState::Continue => { + // still need more rows + continue; + } + CoalescerState::LimitReached => { + let batch = self.coalescer.finish_batch(); + poll = Poll::Ready(Some(batch)); + break; + } + }; + } + // end of input, see if we have any remaining batches + None => { + self.done = true; + let maybe_result = if self.coalescer.is_empty() { + None + } else { + Some(self.coalescer.finish_batch()) + }; + poll = Poll::Ready(maybe_result); break; } - value => { - poll = Poll::Ready(value); + err => { + poll = Poll::Ready(err); break; } } @@ -405,7 +435,7 @@ impl Stream for FilterExecStream { impl RecordBatchStream for FilterExecStream { fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) + self.coalescer.schema() } } diff --git a/datafusion/physical-plan/src/lib.rs
b/datafusion/physical-plan/src/lib.rs index 59c5da6b6fb2..97621132d9c6 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -85,5 +85,158 @@ pub mod udaf { pub use datafusion_physical_expr_functions_aggregate::aggregate::AggregateFunctionExpr; } +pub mod coalesce; #[cfg(test)] + +mod tests { + use std::any::Any; + use std::sync::Arc; + + use arrow_schema::{Schema, SchemaRef}; + + use datafusion_common::{Result, Statistics}; + use datafusion_execution::{SendableRecordBatchStream, TaskContext}; + + use crate::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; + + #[derive(Debug)] + pub struct EmptyExec; + + impl EmptyExec { + pub fn new(_schema: SchemaRef) -> Self { + Self + } + } + + impl DisplayAs for EmptyExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + _f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + unimplemented!() + } + } + + impl ExecutionPlan for EmptyExec { + fn name(&self) -> &'static str { + Self::static_name() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + unimplemented!() + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + unimplemented!() + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!() + } + + fn statistics(&self) -> Result { + unimplemented!() + } + } + + #[derive(Debug)] + pub struct RenamedEmptyExec; + + impl RenamedEmptyExec { + pub fn new(_schema: SchemaRef) -> Self { + Self + } + } + + impl DisplayAs for RenamedEmptyExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + _f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + unimplemented!() + } + } + + impl ExecutionPlan for RenamedEmptyExec { + fn name(&self) -> &'static str { + Self::static_name() + } + + fn static_name() -> &'static str + where + Self: Sized, + { + "MyRenamedEmptyExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn 
properties(&self) -> &PlanProperties { + unimplemented!() + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + unimplemented!() + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!() + } + + fn statistics(&self) -> Result { + unimplemented!() + } + } + + #[test] + fn test_execution_plan_name() { + let schema1 = Arc::new(Schema::empty()); + let default_name_exec = EmptyExec::new(schema1); + assert_eq!(default_name_exec.name(), "EmptyExec"); + + let schema2 = Arc::new(Schema::empty()); + let renamed_exec = RenamedEmptyExec::new(schema2); + assert_eq!(renamed_exec.name(), "MyRenamedEmptyExec"); + assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec"); + } + + /// A compilation test to ensure that the `ExecutionPlan::name()` method can + /// be called from a trait object. + /// Related ticket: https://github.com/apache/datafusion/pull/11047 + #[allow(dead_code)] + fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) { + let _ = plan.name(); + } +} + pub mod test;