diff --git a/datafusion/common/src/utils/memory.rs b/datafusion/common/src/utils/memory.rs index d5ce59e3421b..bb68d59eed59 100644 --- a/datafusion/common/src/utils/memory.rs +++ b/datafusion/common/src/utils/memory.rs @@ -102,7 +102,7 @@ pub fn estimate_memory_size(num_elements: usize, fixed_size: usize) -> Result #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::{collections::HashSet, mem::size_of}; use super::estimate_memory_size; diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs index 497addd23094..99c00615376f 100644 --- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs @@ -19,6 +19,7 @@ //! user defined aggregate functions use std::hash::{DefaultHasher, Hash, Hasher}; +use std::mem::{size_of, size_of_val}; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index 958a4b58d800..8100bb876ded 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::mem::{self, size_of}; + use crate::aggregates::group_values::group_column::{ ByteGroupValueBuilder, ByteViewGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder, @@ -35,29 +37,100 @@ use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; use datafusion_expr::EmitTo; use datafusion_physical_expr::binary_map::OutputType; + use hashbrown::raw::RawTable; -use std::mem::size_of; -/// A [`GroupValues`] that stores multiple columns of group values. +const NON_INLINED_FLAG: u64 = 0x8000000000000000; +const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF; + +/// The view of indices pointing to the actual values in `GroupValues` +/// +/// If only single `group index` represented by view, +/// value of view is just the `group index`, and we call it a `inlined view`. +/// +/// If multiple `group indices` represented by view, +/// value of view is the actually the index pointing to `group indices`, +/// and we call it `non-inlined view`. +/// +/// The view(a u64) format is like: +/// +---------------------+---------------------------------------------+ +/// | inlined flag(1bit) | group index / index to group indices(63bit) | +/// +---------------------+---------------------------------------------+ /// +/// `inlined flag`: 1 represents `non-inlined`, and 0 represents `inlined` /// -pub struct GroupValuesColumn { +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct GroupIndexView(u64); + +impl GroupIndexView { + #[inline] + pub fn is_non_inlined(&self) -> bool { + (self.0 & NON_INLINED_FLAG) > 0 + } + + #[inline] + pub fn new_inlined(group_index: u64) -> Self { + Self(group_index) + } + + #[inline] + pub fn new_non_inlined(list_offset: u64) -> Self { + let non_inlined_value = list_offset | NON_INLINED_FLAG; + Self(non_inlined_value) + } + + #[inline] + pub fn value(&self) -> u64 { + self.0 & VALUE_MASK + } +} + +/// A [`GroupValues`] that stores multiple columns of group values, +/// and supports vectorized operators for them +/// +pub struct GroupValuesColumn { /// The output schema schema: SchemaRef, /// Logically maps group values to a group_index in /// [`Self::group_values`] and in each accumulator /// - /// Uses the raw API of hashbrown to avoid actually storing the - /// keys (group values) in the table + /// It is a `hashtable` based on `hashbrown`. + /// + /// Key and value in the `hashtable`: + /// - The `key` is `hash value(u64)` of the `group value` + /// - The `value` is the `group values` with the same `hash value` /// - /// keys: u64 hashes of the GroupValue - /// values: (hash, group_index) - map: RawTable<(u64, usize)>, + /// We don't really store the actual `group values` in `hashtable`, + /// instead we store the `group indices` pointing to values in `GroupValues`. + /// And we use [`GroupIndexView`] to represent such `group indices` in table. + /// + /// + map: RawTable<(u64, GroupIndexView)>, /// The size of `map` in bytes map_size: usize, + /// The lists for group indices with the same hash value + /// + /// It is possible that hash value collision exists, + /// and we will chain the `group indices` with same hash value + /// + /// The chained indices is like: + /// `latest group index -> older group index -> even older group index -> ...` + /// + group_index_lists: Vec>, + + /// When emitting first n, we need to decrease/erase group indices in + /// `map` and `group_index_lists`. + /// + /// This buffer is used to temporarily store the remaining group indices in + /// a specific list in `group_index_lists`. + emit_group_index_list_buffer: Vec, + + /// Buffers for `vectorized_append` and `vectorized_equal_to` + vectorized_operation_buffers: VectorizedOperationBuffers, + /// The actual group by values, stored column-wise. Compare from /// the left to right, each column is stored as [`GroupColumn`]. /// @@ -75,13 +148,52 @@ pub struct GroupValuesColumn { random_state: RandomState, } -impl GroupValuesColumn { +/// Buffers to store intermediate results in `vectorized_append` +/// and `vectorized_equal_to`, for reducing memory allocation +#[derive(Default)] +struct VectorizedOperationBuffers { + /// The `vectorized append` row indices buffer + append_row_indices: Vec, + + /// The `vectorized_equal_to` row indices buffer + equal_to_row_indices: Vec, + + /// The `vectorized_equal_to` group indices buffer + equal_to_group_indices: Vec, + + /// The `vectorized_equal_to` result buffer + equal_to_results: Vec, + + /// The buffer for storing row indices found not equal to + /// exist groups in `group_values` in `vectorized_equal_to`. + /// We will perform `scalarized_intern` for such rows. + remaining_row_indices: Vec, +} + +impl VectorizedOperationBuffers { + fn clear(&mut self) { + self.append_row_indices.clear(); + self.equal_to_row_indices.clear(); + self.equal_to_group_indices.clear(); + self.equal_to_results.clear(); + self.remaining_row_indices.clear(); + } +} + +impl GroupValuesColumn { + // ======================================================================== + // Initialization functions + // ======================================================================== + /// Create a new instance of GroupValuesColumn if supported for the specified schema pub fn try_new(schema: SchemaRef) -> Result { let map = RawTable::with_capacity(0); Ok(Self { schema, map, + group_index_lists: Vec::new(), + emit_group_index_list_buffer: Vec::new(), + vectorized_operation_buffers: VectorizedOperationBuffers::default(), map_size: 0, group_values: vec![], hashes_buffer: Default::default(), @@ -89,41 +201,600 @@ impl GroupValuesColumn { }) } - /// Returns true if [`GroupValuesColumn`] supported for the specified schema - pub fn supported_schema(schema: &Schema) -> bool { - schema - .fields() + // ======================================================================== + // Scalarized intern + // ======================================================================== + + /// Scalarized intern + /// + /// This is used only for `streaming aggregation`, because `streaming aggregation` + /// depends on the order between `input rows` and their corresponding `group indices`. + /// + /// For example, assuming `input rows` in `cols` with 4 new rows + /// (not equal to `exist rows` in `group_values`, and need to create + /// new groups for them): + /// + /// ```text + /// row1 (hash collision with the exist rows) + /// row2 + /// row3 (hash collision with the exist rows) + /// row4 + /// ``` + /// + /// # In `scalarized_intern`, their `group indices` will be + /// + /// ```text + /// row1 --> 0 + /// row2 --> 1 + /// row3 --> 2 + /// row4 --> 3 + /// ``` + /// + /// `Group indices` order agrees with their input order, and the `streaming aggregation` + /// depends on this. + /// + /// # However In `vectorized_intern`, their `group indices` will be + /// + /// ```text + /// row1 --> 2 + /// row2 --> 0 + /// row3 --> 3 + /// row4 --> 1 + /// ``` + /// + /// `Group indices` order are against with their input order, and this will lead to error + /// in `streaming aggregation`. + /// + fn scalarized_intern( + &mut self, + cols: &[ArrayRef], + groups: &mut Vec, + ) -> Result<()> { + let n_rows = cols[0].len(); + + // tracks to which group each of the input rows belongs + groups.clear(); + + // 1.1 Calculate the group keys for the group values + let batch_hashes = &mut self.hashes_buffer; + batch_hashes.clear(); + batch_hashes.resize(n_rows, 0); + create_hashes(cols, &self.random_state, batch_hashes)?; + + for (row, &target_hash) in batch_hashes.iter().enumerate() { + let entry = self + .map + .get_mut(target_hash, |(exist_hash, group_idx_view)| { + // It is ensured to be inlined in `scalarized_intern` + debug_assert!(!group_idx_view.is_non_inlined()); + + // Somewhat surprisingly, this closure can be called even if the + // hash doesn't match, so check the hash first with an integer + // comparison first avoid the more expensive comparison with + // group value. https://github.com/apache/datafusion/pull/11718 + if target_hash != *exist_hash { + return false; + } + + fn check_row_equal( + array_row: &dyn GroupColumn, + lhs_row: usize, + array: &ArrayRef, + rhs_row: usize, + ) -> bool { + array_row.equal_to(lhs_row, array, rhs_row) + } + + for (i, group_val) in self.group_values.iter().enumerate() { + if !check_row_equal( + group_val.as_ref(), + group_idx_view.value() as usize, + &cols[i], + row, + ) { + return false; + } + } + + true + }); + + let group_idx = match entry { + // Existing group_index for this group value + Some((_hash, group_idx_view)) => group_idx_view.value() as usize, + // 1.2 Need to create new entry for the group + None => { + // Add new entry to aggr_state and save newly created index + // let group_idx = group_values.num_rows(); + // group_values.push(group_rows.row(row)); + + let mut checklen = 0; + let group_idx = self.group_values[0].len(); + for (i, group_value) in self.group_values.iter_mut().enumerate() { + group_value.append_val(&cols[i], row); + let len = group_value.len(); + if i == 0 { + checklen = len; + } else { + debug_assert_eq!(checklen, len); + } + } + + // for hasher function, use precomputed hash value + self.map.insert_accounted( + (target_hash, GroupIndexView::new_inlined(group_idx as u64)), + |(hash, _group_index)| *hash, + &mut self.map_size, + ); + group_idx + } + }; + groups.push(group_idx); + } + + Ok(()) + } + + // ======================================================================== + // Vectorized intern + // ======================================================================== + + /// Vectorized intern + /// + /// This is used in `non-streaming aggregation` without requiring the order between + /// rows in `cols` and corresponding groups in `group_values`. + /// + /// The vectorized approach can offer higher performance for avoiding row by row + /// downcast for `cols` and being able to implement even more optimizations(like simd). + /// + fn vectorized_intern( + &mut self, + cols: &[ArrayRef], + groups: &mut Vec, + ) -> Result<()> { + let n_rows = cols[0].len(); + + // tracks to which group each of the input rows belongs + groups.clear(); + groups.resize(n_rows, usize::MAX); + + let mut batch_hashes = mem::take(&mut self.hashes_buffer); + batch_hashes.clear(); + batch_hashes.resize(n_rows, 0); + create_hashes(cols, &self.random_state, &mut batch_hashes)?; + + // General steps for one round `vectorized equal_to & append`: + // 1. Collect vectorized context by checking hash values of `cols` in `map`, + // mainly fill `vectorized_append_row_indices`, `vectorized_equal_to_row_indices` + // and `vectorized_equal_to_group_indices` + // + // 2. Perform `vectorized_append` for `vectorized_append_row_indices`. + // `vectorized_append` must be performed before `vectorized_equal_to`, + // because some `group indices` in `vectorized_equal_to_group_indices` + // maybe still point to no actual values in `group_values` before performing append. + // + // 3. Perform `vectorized_equal_to` for `vectorized_equal_to_row_indices` + // and `vectorized_equal_to_group_indices`. If found some rows in input `cols` + // not equal to `exist rows` in `group_values`, place them in `remaining_row_indices` + // and perform `scalarized_intern_remaining` for them similar as `scalarized_intern` + // after. + // + // 4. Perform `scalarized_intern_remaining` for rows mentioned above, about in what situation + // we will process this can see the comments of `scalarized_intern_remaining`. + // + + // 1. Collect vectorized context by checking hash values of `cols` in `map` + self.collect_vectorized_process_context(&batch_hashes, groups); + + // 2. Perform `vectorized_append` + self.vectorized_append(cols); + + // 3. Perform `vectorized_equal_to` + self.vectorized_equal_to(cols, groups); + + // 4. Perform scalarized inter for remaining rows + // (about remaining rows, can see comments for `remaining_row_indices`) + self.scalarized_intern_remaining(cols, &batch_hashes, groups); + + self.hashes_buffer = batch_hashes; + + Ok(()) + } + + /// Collect vectorized context by checking hash values of `cols` in `map` + /// + /// 1. If bucket not found + /// - Build and insert the `new inlined group index view` + /// and its hash value to `map` + /// - Add row index to `vectorized_append_row_indices` + /// - Set group index to row in `groups` + /// + /// 2. bucket found + /// - Add row index to `vectorized_equal_to_row_indices` + /// - Check if the `group index view` is `inlined` or `non_inlined`: + /// If it is inlined, add to `vectorized_equal_to_group_indices` directly. + /// Otherwise get all group indices from `group_index_lists`, and add them. + /// + fn collect_vectorized_process_context( + &mut self, + batch_hashes: &[u64], + groups: &mut [usize], + ) { + self.vectorized_operation_buffers.append_row_indices.clear(); + self.vectorized_operation_buffers + .equal_to_row_indices + .clear(); + self.vectorized_operation_buffers + .equal_to_group_indices + .clear(); + + let mut group_values_len = self.group_values[0].len(); + for (row, &target_hash) in batch_hashes.iter().enumerate() { + let entry = self + .map + .get(target_hash, |(exist_hash, _)| target_hash == *exist_hash); + + let Some((_, group_index_view)) = entry else { + // 1. Bucket not found case + // Build `new inlined group index view` + let current_group_idx = group_values_len; + let group_index_view = + GroupIndexView::new_inlined(current_group_idx as u64); + + // Insert the `group index view` and its hash into `map` + // for hasher function, use precomputed hash value + self.map.insert_accounted( + (target_hash, group_index_view), + |(hash, _)| *hash, + &mut self.map_size, + ); + + // Add row index to `vectorized_append_row_indices` + self.vectorized_operation_buffers + .append_row_indices + .push(row); + + // Set group index to row in `groups` + groups[row] = current_group_idx; + + group_values_len += 1; + continue; + }; + + // 2. bucket found + // Check if the `group index view` is `inlined` or `non_inlined` + if group_index_view.is_non_inlined() { + // Non-inlined case, the value of view is offset in `group_index_lists`. + // We use it to get `group_index_list`, and add related `rows` and `group_indices` + // into `vectorized_equal_to_row_indices` and `vectorized_equal_to_group_indices`. + let list_offset = group_index_view.value() as usize; + let group_index_list = &self.group_index_lists[list_offset]; + for &group_index in group_index_list { + self.vectorized_operation_buffers + .equal_to_row_indices + .push(row); + self.vectorized_operation_buffers + .equal_to_group_indices + .push(group_index); + } + } else { + let group_index = group_index_view.value() as usize; + self.vectorized_operation_buffers + .equal_to_row_indices + .push(row); + self.vectorized_operation_buffers + .equal_to_group_indices + .push(group_index); + } + } + } + + /// Perform `vectorized_append`` for `rows` in `vectorized_append_row_indices` + fn vectorized_append(&mut self, cols: &[ArrayRef]) { + if self + .vectorized_operation_buffers + .append_row_indices + .is_empty() + { + return; + } + + let iter = self.group_values.iter_mut().zip(cols.iter()); + for (group_column, col) in iter { + group_column.vectorized_append( + col, + &self.vectorized_operation_buffers.append_row_indices, + ); + } + } + + /// Perform `vectorized_equal_to` + /// + /// 1. Perform `vectorized_equal_to` for `rows` in `vectorized_equal_to_group_indices` + /// and `group_indices` in `vectorized_equal_to_group_indices`. + /// + /// 2. Check `equal_to_results`: + /// + /// If found equal to `rows`, set the `group_indices` to `rows` in `groups`. + /// + /// If found not equal to `row`s, just add them to `scalarized_indices`, + /// and perform `scalarized_intern` for them after. + /// Usually, such `rows` having same hash but different value with `exists rows` + /// are very few. + fn vectorized_equal_to(&mut self, cols: &[ArrayRef], groups: &mut [usize]) { + assert_eq!( + self.vectorized_operation_buffers + .equal_to_group_indices + .len(), + self.vectorized_operation_buffers.equal_to_row_indices.len() + ); + + self.vectorized_operation_buffers + .remaining_row_indices + .clear(); + + if self + .vectorized_operation_buffers + .equal_to_group_indices + .is_empty() + { + return; + } + + // 1. Perform `vectorized_equal_to` for `rows` in `vectorized_equal_to_group_indices` + // and `group_indices` in `vectorized_equal_to_group_indices` + let mut equal_to_results = + mem::take(&mut self.vectorized_operation_buffers.equal_to_results); + equal_to_results.clear(); + equal_to_results.resize( + self.vectorized_operation_buffers + .equal_to_group_indices + .len(), + true, + ); + + for (col_idx, group_col) in self.group_values.iter().enumerate() { + group_col.vectorized_equal_to( + &self.vectorized_operation_buffers.equal_to_group_indices, + &cols[col_idx], + &self.vectorized_operation_buffers.equal_to_row_indices, + &mut equal_to_results, + ); + } + + // 2. Check `equal_to_results`, if found not equal to `row`s, just add them + // to `scalarized_indices`, and perform `scalarized_intern` for them after. + let mut current_row_equal_to_result = false; + for (idx, &row) in self + .vectorized_operation_buffers + .equal_to_row_indices .iter() - .map(|f| f.data_type()) - .all(Self::supported_type) + .enumerate() + { + let equal_to_result = equal_to_results[idx]; + + // Equal to case, set the `group_indices` to `rows` in `groups` + if equal_to_result { + groups[row] = + self.vectorized_operation_buffers.equal_to_group_indices[idx]; + } + current_row_equal_to_result |= equal_to_result; + + // Look forward next one row to check if have checked all results + // of current row + let next_row = self + .vectorized_operation_buffers + .equal_to_row_indices + .get(idx + 1) + .unwrap_or(&usize::MAX); + + // Have checked all results of current row, check the total result + if row != *next_row { + // Not equal to case, add `row` to `scalarized_indices` + if !current_row_equal_to_result { + self.vectorized_operation_buffers + .remaining_row_indices + .push(row); + } + + // Init the total result for checking next row + current_row_equal_to_result = false; + } + } + + self.vectorized_operation_buffers.equal_to_results = equal_to_results; } - /// Returns true if the specified data type is supported by [`GroupValuesColumn`] - /// - /// In order to be supported, there must be a specialized implementation of - /// [`GroupColumn`] for the data type, instantiated in [`Self::intern`] - fn supported_type(data_type: &DataType) -> bool { - matches!( - *data_type, - DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Float32 - | DataType::Float64 - | DataType::Utf8 - | DataType::LargeUtf8 - | DataType::Binary - | DataType::LargeBinary - | DataType::Date32 - | DataType::Date64 - | DataType::Utf8View - | DataType::BinaryView - ) + /// It is possible that some `input rows` have the same + /// hash values with the `exist rows`, but have the different + /// actual values the exists. + /// + /// We can found them in `vectorized_equal_to`, and put them + /// into `scalarized_indices`. And for these `input rows`, + /// we will perform the `scalarized_intern` similar as what in + /// [`GroupValuesColumn`]. + /// + /// This design can make the process simple and still efficient enough: + /// + /// # About making the process simple + /// + /// Some corner cases become really easy to solve, like following cases: + /// + /// ```text + /// input row1 (same hash value with exist rows, but value different) + /// input row1 + /// ... + /// input row1 + /// ``` + /// + /// After performing `vectorized_equal_to`, we will found multiple `input rows` + /// not equal to the `exist rows`. However such `input rows` are repeated, only + /// one new group should be create for them. + /// + /// If we don't fallback to `scalarized_intern`, it is really hard for us to + /// distinguish the such `repeated rows` in `input rows`. And if we just fallback, + /// it is really easy to solve, and the performance is at least not worse than origin. + /// + /// # About performance + /// + /// The hash collision may be not frequent, so the fallback will indeed hardly happen. + /// In most situations, `scalarized_indices` will found to be empty after finishing to + /// preform `vectorized_equal_to`. + /// + fn scalarized_intern_remaining( + &mut self, + cols: &[ArrayRef], + batch_hashes: &[u64], + groups: &mut [usize], + ) { + if self + .vectorized_operation_buffers + .remaining_row_indices + .is_empty() + { + return; + } + + let mut map = mem::take(&mut self.map); + + for &row in &self.vectorized_operation_buffers.remaining_row_indices { + let target_hash = batch_hashes[row]; + let entry = map.get_mut(target_hash, |(exist_hash, _)| { + // Somewhat surprisingly, this closure can be called even if the + // hash doesn't match, so check the hash first with an integer + // comparison first avoid the more expensive comparison with + // group value. https://github.com/apache/datafusion/pull/11718 + target_hash == *exist_hash + }); + + // Only `rows` having the same hash value with `exist rows` but different value + // will be process in `scalarized_intern`. + // So related `buckets` in `map` is ensured to be `Some`. + let Some((_, group_index_view)) = entry else { + unreachable!() + }; + + // Perform scalarized equal to + if self.scalarized_equal_to_remaining(group_index_view, cols, row, groups) { + // Found the row actually exists in group values, + // don't need to create new group for it. + continue; + } + + // Insert the `row` to `group_values` before checking `next row` + let group_idx = self.group_values[0].len(); + let mut checklen = 0; + for (i, group_value) in self.group_values.iter_mut().enumerate() { + group_value.append_val(&cols[i], row); + let len = group_value.len(); + if i == 0 { + checklen = len; + } else { + debug_assert_eq!(checklen, len); + } + } + + // Check if the `view` is `inlined` or `non-inlined` + if group_index_view.is_non_inlined() { + // Non-inlined case, get `group_index_list` from `group_index_lists`, + // then add the new `group` with the same hash values into it. + let list_offset = group_index_view.value() as usize; + let group_index_list = &mut self.group_index_lists[list_offset]; + group_index_list.push(group_idx); + } else { + // Inlined case + let list_offset = self.group_index_lists.len(); + + // Create new `group_index_list` including + // `exist group index` + `new group index`. + // Add new `group_index_list` into ``group_index_lists`. + let exist_group_index = group_index_view.value() as usize; + let new_group_index_list = vec![exist_group_index, group_idx]; + self.group_index_lists.push(new_group_index_list); + + // Update the `group_index_view` to non-inlined + let new_group_index_view = + GroupIndexView::new_non_inlined(list_offset as u64); + *group_index_view = new_group_index_view; + } + + groups[row] = group_idx; + } + + self.map = map; + } + + fn scalarized_equal_to_remaining( + &self, + group_index_view: &GroupIndexView, + cols: &[ArrayRef], + row: usize, + groups: &mut [usize], + ) -> bool { + // Check if this row exists in `group_values` + fn check_row_equal( + array_row: &dyn GroupColumn, + lhs_row: usize, + array: &ArrayRef, + rhs_row: usize, + ) -> bool { + array_row.equal_to(lhs_row, array, rhs_row) + } + + if group_index_view.is_non_inlined() { + let list_offset = group_index_view.value() as usize; + let group_index_list = &self.group_index_lists[list_offset]; + + for &group_idx in group_index_list { + let mut check_result = true; + for (i, group_val) in self.group_values.iter().enumerate() { + if !check_row_equal(group_val.as_ref(), group_idx, &cols[i], row) { + check_result = false; + break; + } + } + + if check_result { + groups[row] = group_idx; + return true; + } + } + + // All groups unmatched, return false result + false + } else { + let group_idx = group_index_view.value() as usize; + for (i, group_val) in self.group_values.iter().enumerate() { + if !check_row_equal(group_val.as_ref(), group_idx, &cols[i], row) { + return false; + } + } + + groups[row] = group_idx; + true + } + } + + /// Return group indices of the hash, also if its `group_index_view` is non-inlined + #[cfg(test)] + fn get_indices_by_hash(&self, hash: u64) -> Option<(Vec, GroupIndexView)> { + let entry = self.map.get(hash, |(exist_hash, _)| hash == *exist_hash); + + match entry { + Some((_, group_index_view)) => { + if group_index_view.is_non_inlined() { + let list_offset = group_index_view.value() as usize; + Some(( + self.group_index_lists[list_offset].clone(), + *group_index_view, + )) + } else { + let group_index = group_index_view.value() as usize; + Some((vec![group_index], *group_index_view)) + } + } + None => None, + } } } @@ -146,10 +817,8 @@ macro_rules! instantiate_primitive { }; } -impl GroupValues for GroupValuesColumn { +impl GroupValues for GroupValuesColumn { fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { - let n_rows = cols[0].len(); - if self.group_values.is_empty() { let mut v = Vec::with_capacity(cols.len()); @@ -204,77 +873,11 @@ impl GroupValues for GroupValuesColumn { self.group_values = v; } - // tracks to which group each of the input rows belongs - groups.clear(); - - // 1.1 Calculate the group keys for the group values - let batch_hashes = &mut self.hashes_buffer; - batch_hashes.clear(); - batch_hashes.resize(n_rows, 0); - create_hashes(cols, &self.random_state, batch_hashes)?; - - for (row, &target_hash) in batch_hashes.iter().enumerate() { - let entry = self.map.get_mut(target_hash, |(exist_hash, group_idx)| { - // Somewhat surprisingly, this closure can be called even if the - // hash doesn't match, so check the hash first with an integer - // comparison first avoid the more expensive comparison with - // group value. https://github.com/apache/datafusion/pull/11718 - if target_hash != *exist_hash { - return false; - } - - fn check_row_equal( - array_row: &dyn GroupColumn, - lhs_row: usize, - array: &ArrayRef, - rhs_row: usize, - ) -> bool { - array_row.equal_to(lhs_row, array, rhs_row) - } - - for (i, group_val) in self.group_values.iter().enumerate() { - if !check_row_equal(group_val.as_ref(), *group_idx, &cols[i], row) { - return false; - } - } - - true - }); - - let group_idx = match entry { - // Existing group_index for this group value - Some((_hash, group_idx)) => *group_idx, - // 1.2 Need to create new entry for the group - None => { - // Add new entry to aggr_state and save newly created index - // let group_idx = group_values.num_rows(); - // group_values.push(group_rows.row(row)); - - let mut checklen = 0; - let group_idx = self.group_values[0].len(); - for (i, group_value) in self.group_values.iter_mut().enumerate() { - group_value.append_val(&cols[i], row); - let len = group_value.len(); - if i == 0 { - checklen = len; - } else { - debug_assert_eq!(checklen, len); - } - } - - // for hasher function, use precomputed hash value - self.map.insert_accounted( - (target_hash, group_idx), - |(hash, _group_index)| *hash, - &mut self.map_size, - ); - group_idx - } - }; - groups.push(group_idx); + if !STREAMING { + self.vectorized_intern(cols, groups) + } else { + self.scalarized_intern(cols, groups) } - - Ok(()) } fn size(&self) -> usize { @@ -297,7 +900,7 @@ impl GroupValues for GroupValuesColumn { fn emit(&mut self, emit_to: EmitTo) -> Result> { let mut output = match emit_to { EmitTo::All => { - let group_values = std::mem::take(&mut self.group_values); + let group_values = mem::take(&mut self.group_values); debug_assert!(self.group_values.is_empty()); group_values @@ -311,20 +914,74 @@ impl GroupValues for GroupValuesColumn { .iter_mut() .map(|v| v.take_n(n)) .collect::>(); + let mut next_new_list_offset = 0; // SAFETY: self.map outlives iterator and is not modified concurrently unsafe { for bucket in self.map.iter() { - // Decrement group index by n - match bucket.as_ref().1.checked_sub(n) { + // In non-streaming case, we need to check if the `group index view` + // is `inlined` or `non-inlined` + if !STREAMING && bucket.as_ref().1.is_non_inlined() { + // Non-inlined case + // We take `group_index_list` from `old_group_index_lists` + + // list_offset is incrementally + self.emit_group_index_list_buffer.clear(); + let list_offset = bucket.as_ref().1.value() as usize; + for group_index in self.group_index_lists[list_offset].iter() + { + if let Some(remaining) = group_index.checked_sub(n) { + self.emit_group_index_list_buffer.push(remaining); + } + } + + // The possible results: + // - `new_group_index_list` is empty, we should erase this bucket + // - only one value in `new_group_index_list`, switch the `view` to `inlined` + // - still multiple values in `new_group_index_list`, build and set the new `unlined view` + if self.emit_group_index_list_buffer.is_empty() { + self.map.erase(bucket); + } else if self.emit_group_index_list_buffer.len() == 1 { + let group_index = + self.emit_group_index_list_buffer.first().unwrap(); + bucket.as_mut().1 = + GroupIndexView::new_inlined(*group_index as u64); + } else { + let group_index_list = + &mut self.group_index_lists[next_new_list_offset]; + group_index_list.clear(); + group_index_list + .extend(self.emit_group_index_list_buffer.iter()); + bucket.as_mut().1 = GroupIndexView::new_non_inlined( + next_new_list_offset as u64, + ); + next_new_list_offset += 1; + } + + continue; + } + + // In `streaming case`, the `group index view` is ensured to be `inlined` + debug_assert!(!bucket.as_ref().1.is_non_inlined()); + + // Inlined case, we just decrement group index by n) + let group_index = bucket.as_ref().1.value() as usize; + match group_index.checked_sub(n) { // Group index was >= n, shift value down - Some(sub) => bucket.as_mut().1 = sub, + Some(sub) => { + bucket.as_mut().1 = + GroupIndexView::new_inlined(sub as u64) + } // Group index was < n, so remove from table None => self.map.erase(bucket), } } } + if !STREAMING { + self.group_index_lists.truncate(next_new_list_offset); + } + output } }; @@ -354,5 +1011,610 @@ impl GroupValues for GroupValuesColumn { self.map_size = self.map.capacity() * size_of::<(u64, usize)>(); self.hashes_buffer.clear(); self.hashes_buffer.shrink_to(count); + + // Such structures are only used in `non-streaming` case + if !STREAMING { + self.group_index_lists.clear(); + self.emit_group_index_list_buffer.clear(); + self.vectorized_operation_buffers.clear(); + } + } +} + +/// Returns true if [`GroupValuesColumn`] supported for the specified schema +pub fn supported_schema(schema: &Schema) -> bool { + schema + .fields() + .iter() + .map(|f| f.data_type()) + .all(supported_type) +} + +/// Returns true if the specified data type is supported by [`GroupValuesColumn`] +/// +/// In order to be supported, there must be a specialized implementation of +/// [`GroupColumn`] for the data type, instantiated in [`GroupValuesColumn::intern`] +fn supported_type(data_type: &DataType) -> bool { + matches!( + *data_type, + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float32 + | DataType::Float64 + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Binary + | DataType::LargeBinary + | DataType::Date32 + | DataType::Date64 + | DataType::Utf8View + | DataType::BinaryView + ) +} + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, sync::Arc}; + + use arrow::{compute::concat_batches, util::pretty::pretty_format_batches}; + use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, StringViewArray}; + use arrow_schema::{DataType, Field, Schema, SchemaRef}; + use datafusion_common::utils::proxy::RawTableAllocExt; + use datafusion_expr::EmitTo; + + use crate::aggregates::group_values::{column::GroupValuesColumn, GroupValues}; + + use super::GroupIndexView; + + #[test] + fn test_intern_for_vectorized_group_values() { + let data_set = VectorizedTestDataSet::new(); + let mut group_values = + GroupValuesColumn::::try_new(data_set.schema()).unwrap(); + + data_set.load_to_group_values(&mut group_values); + let actual_batch = group_values.emit(EmitTo::All).unwrap(); + let actual_batch = RecordBatch::try_new(data_set.schema(), actual_batch).unwrap(); + + check_result(&actual_batch, &data_set.expected_batch); + } + + #[test] + fn test_emit_first_n_for_vectorized_group_values() { + let data_set = VectorizedTestDataSet::new(); + let mut group_values = + GroupValuesColumn::::try_new(data_set.schema()).unwrap(); + + // 1~num_rows times to emit the groups + let num_rows = data_set.expected_batch.num_rows(); + let schema = data_set.schema(); + for times_to_take in 1..=num_rows { + // Write data after emitting + data_set.load_to_group_values(&mut group_values); + + // Emit `times_to_take` times, collect and concat the sub-results to total result, + // then check it + let suggest_num_emit = data_set.expected_batch.num_rows() / times_to_take; + let mut num_remaining_rows = num_rows; + let mut actual_sub_batches = Vec::new(); + + for nth_time in 0..times_to_take { + let num_emit = if nth_time == times_to_take - 1 { + num_remaining_rows + } else { + suggest_num_emit + }; + + let sub_batch = group_values.emit(EmitTo::First(num_emit)).unwrap(); + let sub_batch = + RecordBatch::try_new(Arc::clone(&schema), sub_batch).unwrap(); + actual_sub_batches.push(sub_batch); + + num_remaining_rows -= num_emit; + } + assert!(num_remaining_rows == 0); + + let actual_batch = concat_batches(&schema, &actual_sub_batches).unwrap(); + check_result(&actual_batch, &data_set.expected_batch); + } + } + + #[test] + fn test_hashtable_modifying_in_emit_first_n() { + // Situations should be covered: + // 1. Erase inlined group index view + // 2. Erase whole non-inlined group index view + // 3. Erase + decrease group indices in non-inlined group index view + // + view still non-inlined after decreasing + // 4. Erase + decrease group indices in non-inlined group index view + // + view switch to inlined after decreasing + // 5. Only decrease group index in inlined group index view + // 6. Only decrease group indices in non-inlined group index view + // 7. Erase all things + + let field = Field::new("item", DataType::Int32, true); + let schema = Arc::new(Schema::new_with_metadata(vec![field], HashMap::new())); + let mut group_values = GroupValuesColumn::::try_new(schema).unwrap(); + + // Insert group index views and check if success to insert + insert_inline_group_index_view(&mut group_values, 0, 0); + insert_non_inline_group_index_view(&mut group_values, 1, vec![1, 2]); + insert_non_inline_group_index_view(&mut group_values, 2, vec![3, 4, 5]); + insert_inline_group_index_view(&mut group_values, 3, 6); + insert_non_inline_group_index_view(&mut group_values, 4, vec![7, 8]); + insert_non_inline_group_index_view(&mut group_values, 5, vec![9, 10, 11]); + + assert_eq!( + group_values.get_indices_by_hash(0).unwrap(), + (vec![0], GroupIndexView::new_inlined(0)) + ); + assert_eq!( + group_values.get_indices_by_hash(1).unwrap(), + (vec![1, 2], GroupIndexView::new_non_inlined(0)) + ); + assert_eq!( + group_values.get_indices_by_hash(2).unwrap(), + (vec![3, 4, 5], GroupIndexView::new_non_inlined(1)) + ); + assert_eq!( + group_values.get_indices_by_hash(3).unwrap(), + (vec![6], GroupIndexView::new_inlined(6)) + ); + assert_eq!( + group_values.get_indices_by_hash(4).unwrap(), + (vec![7, 8], GroupIndexView::new_non_inlined(2)) + ); + assert_eq!( + group_values.get_indices_by_hash(5).unwrap(), + (vec![9, 10, 11], GroupIndexView::new_non_inlined(3)) + ); + assert_eq!(group_values.map.len(), 6); + + // Emit first 4 to test cases 1~3, 5~6 + let _ = group_values.emit(EmitTo::First(4)).unwrap(); + assert!(group_values.get_indices_by_hash(0).is_none()); + assert!(group_values.get_indices_by_hash(1).is_none()); + assert_eq!( + group_values.get_indices_by_hash(2).unwrap(), + (vec![0, 1], GroupIndexView::new_non_inlined(0)) + ); + assert_eq!( + group_values.get_indices_by_hash(3).unwrap(), + (vec![2], GroupIndexView::new_inlined(2)) + ); + assert_eq!( + group_values.get_indices_by_hash(4).unwrap(), + (vec![3, 4], GroupIndexView::new_non_inlined(1)) + ); + assert_eq!( + group_values.get_indices_by_hash(5).unwrap(), + (vec![5, 6, 7], GroupIndexView::new_non_inlined(2)) + ); + assert_eq!(group_values.map.len(), 4); + + // Emit first 1 to test case 4, and cases 5~6 again + let _ = group_values.emit(EmitTo::First(1)).unwrap(); + assert_eq!( + group_values.get_indices_by_hash(2).unwrap(), + (vec![0], GroupIndexView::new_inlined(0)) + ); + assert_eq!( + group_values.get_indices_by_hash(3).unwrap(), + (vec![1], GroupIndexView::new_inlined(1)) + ); + assert_eq!( + group_values.get_indices_by_hash(4).unwrap(), + (vec![2, 3], GroupIndexView::new_non_inlined(0)) + ); + assert_eq!( + group_values.get_indices_by_hash(5).unwrap(), + (vec![4, 5, 6], GroupIndexView::new_non_inlined(1)) + ); + assert_eq!(group_values.map.len(), 4); + + // Emit first 5 to test cases 1~3 again + let _ = group_values.emit(EmitTo::First(5)).unwrap(); + assert_eq!( + group_values.get_indices_by_hash(5).unwrap(), + (vec![0, 1], GroupIndexView::new_non_inlined(0)) + ); + assert_eq!(group_values.map.len(), 1); + + // Emit first 1 to test cases 4 again + let _ = group_values.emit(EmitTo::First(1)).unwrap(); + assert_eq!( + group_values.get_indices_by_hash(5).unwrap(), + (vec![0], GroupIndexView::new_inlined(0)) + ); + assert_eq!(group_values.map.len(), 1); + + // Emit first 1 to test cases 7 + let _ = group_values.emit(EmitTo::First(1)).unwrap(); + assert!(group_values.map.is_empty()); + } + + /// Test data set for [`GroupValuesColumn::vectorized_intern`] + /// + /// Define the test data and support loading them into test [`GroupValuesColumn::vectorized_intern`] + /// + /// The covering situations: + /// + /// Array type: + /// - Primitive array + /// - String(byte) array + /// - String view(byte view) array + /// + /// Repeation and nullability in single batch: + /// - All not null rows + /// - Mixed null + not null rows + /// - All null rows + /// - All not null rows(repeated) + /// - Null + not null rows(repeated) + /// - All not null rows(repeated) + /// + /// If group exists in `map`: + /// - Group exists in inlined group view + /// - Group exists in non-inlined group view + /// - Group not exist + bucket not found in `map` + /// - Group not exist + not equal to inlined group view(tested in hash collision) + /// - Group not exist + not equal to non-inlined group view(tested in hash collision) + /// + struct VectorizedTestDataSet { + test_batches: Vec>, + expected_batch: RecordBatch, + } + + impl VectorizedTestDataSet { + fn new() -> Self { + // Intern batch 1 + let col1 = Int64Array::from(vec![ + // Repeated rows in batch + Some(42), // all not nulls + repeated rows + exist in map case + None, // mixed + repeated rows + exist in map case + None, // mixed + repeated rows + not exist in map case + Some(1142), // mixed + repeated rows + not exist in map case + None, // all nulls + repeated rows + exist in map case + Some(42), + None, + None, + Some(1142), + None, + // Unique rows in batch + Some(4211), // all not nulls + unique rows + exist in map case + None, // mixed + unique rows + exist in map case + None, // mixed + unique rows + not exist in map case + Some(4212), // mixed + unique rows + not exist in map case + ]); + + let col2 = StringArray::from(vec![ + // Repeated rows in batch + Some("string1"), // all not nulls + repeated rows + exist in map case + None, // mixed + repeated rows + exist in map case + Some("string2"), // mixed + repeated rows + not exist in map case + None, // mixed + repeated rows + not exist in map case + None, // all nulls + repeated rows + exist in map case + Some("string1"), + None, + Some("string2"), + None, + None, + // Unique rows in batch + Some("string3"), // all not nulls + unique rows + exist in map case + None, // mixed + unique rows + exist in map case + Some("string4"), // mixed + unique rows + not exist in map case + None, // mixed + unique rows + not exist in map case + ]); + + let col3 = StringViewArray::from(vec![ + // Repeated rows in batch + Some("stringview1"), // all not nulls + repeated rows + exist in map case + Some("stringview2"), // mixed + repeated rows + exist in map case + None, // mixed + repeated rows + not exist in map case + None, // mixed + repeated rows + not exist in map case + None, // all nulls + repeated rows + exist in map case + Some("stringview1"), + Some("stringview2"), + None, + None, + None, + // Unique rows in batch + Some("stringview3"), // all not nulls + unique rows + exist in map case + Some("stringview4"), // mixed + unique rows + exist in map case + None, // mixed + unique rows + not exist in map case + None, // mixed + unique rows + not exist in map case + ]); + let batch1 = vec![ + Arc::new(col1) as _, + Arc::new(col2) as _, + Arc::new(col3) as _, + ]; + + // Intern batch 2 + let col1 = Int64Array::from(vec![ + // Repeated rows in batch + Some(42), // all not nulls + repeated rows + exist in map case + None, // mixed + repeated rows + exist in map case + None, // mixed + repeated rows + not exist in map case + Some(21142), // mixed + repeated rows + not exist in map case + None, // all nulls + repeated rows + exist in map case + Some(42), + None, + None, + Some(21142), + None, + // Unique rows in batch + Some(4211), // all not nulls + unique rows + exist in map case + None, // mixed + unique rows + exist in map case + None, // mixed + unique rows + not exist in map case + Some(24212), // mixed + unique rows + not exist in map case + ]); + + let col2 = StringArray::from(vec![ + // Repeated rows in batch + Some("string1"), // all not nulls + repeated rows + exist in map case + None, // mixed + repeated rows + exist in map case + Some("2string2"), // mixed + repeated rows + not exist in map case + None, // mixed + repeated rows + not exist in map case + None, // all nulls + repeated rows + exist in map case + Some("string1"), + None, + Some("2string2"), + None, + None, + // Unique rows in batch + Some("string3"), // all not nulls + unique rows + exist in map case + None, // mixed + unique rows + exist in map case + Some("2string4"), // mixed + unique rows + not exist in map case + None, // mixed + unique rows + not exist in map case + ]); + + let col3 = StringViewArray::from(vec![ + // Repeated rows in batch + Some("stringview1"), // all not nulls + repeated rows + exist in map case + Some("stringview2"), // mixed + repeated rows + exist in map case + None, // mixed + repeated rows + not exist in map case + None, // mixed + repeated rows + not exist in map case + None, // all nulls + repeated rows + exist in map case + Some("stringview1"), + Some("stringview2"), + None, + None, + None, + // Unique rows in batch + Some("stringview3"), // all not nulls + unique rows + exist in map case + Some("stringview4"), // mixed + unique rows + exist in map case + None, // mixed + unique rows + not exist in map case + None, // mixed + unique rows + not exist in map case + ]); + let batch2 = vec![ + Arc::new(col1) as _, + Arc::new(col2) as _, + Arc::new(col3) as _, + ]; + + // Intern batch 3 + let col1 = Int64Array::from(vec![ + // Repeated rows in batch + Some(42), // all not nulls + repeated rows + exist in map case + None, // mixed + repeated rows + exist in map case + None, // mixed + repeated rows + not exist in map case + Some(31142), // mixed + repeated rows + not exist in map case + None, // all nulls + repeated rows + exist in map case + Some(42), + None, + None, + Some(31142), + None, + // Unique rows in batch + Some(4211), // all not nulls + unique rows + exist in map case + None, // mixed + unique rows + exist in map case + None, // mixed + unique rows + not exist in map case + Some(34212), // mixed + unique rows + not exist in map case + ]); + + let col2 = StringArray::from(vec![ + // Repeated rows in batch + Some("string1"), // all not nulls + repeated rows + exist in map case + None, // mixed + repeated rows + exist in map case + Some("3string2"), // mixed + repeated rows + not exist in map case + None, // mixed + repeated rows + not exist in map case + None, // all nulls + repeated rows + exist in map case + Some("string1"), + None, + Some("3string2"), + None, + None, + // Unique rows in batch + Some("string3"), // all not nulls + unique rows + exist in map case + None, // mixed + unique rows + exist in map case + Some("3string4"), // mixed + unique rows + not exist in map case + None, // mixed + unique rows + not exist in map case + ]); + + let col3 = StringViewArray::from(vec![ + // Repeated rows in batch + Some("stringview1"), // all not nulls + repeated rows + exist in map case + Some("stringview2"), // mixed + repeated rows + exist in map case + None, // mixed + repeated rows + not exist in map case + None, // mixed + repeated rows + not exist in map case + None, // all nulls + repeated rows + exist in map case + Some("stringview1"), + Some("stringview2"), + None, + None, + None, + // Unique rows in batch + Some("stringview3"), // all not nulls + unique rows + exist in map case + Some("stringview4"), // mixed + unique rows + exist in map case + None, // mixed + unique rows + not exist in map case + None, // mixed + unique rows + not exist in map case + ]); + let batch3 = vec![ + Arc::new(col1) as _, + Arc::new(col2) as _, + Arc::new(col3) as _, + ]; + + // Expected batch + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Utf8View, true), + ])); + + let col1 = Int64Array::from(vec![ + // Repeated rows in batch + Some(42), + None, + None, + Some(1142), + None, + Some(21142), + None, + Some(31142), + None, + // Unique rows in batch + Some(4211), + None, + None, + Some(4212), + None, + Some(24212), + None, + Some(34212), + ]); + + let col2 = StringArray::from(vec![ + // Repeated rows in batch + Some("string1"), + None, + Some("string2"), + None, + Some("2string2"), + None, + Some("3string2"), + None, + None, + // Unique rows in batch + Some("string3"), + None, + Some("string4"), + None, + Some("2string4"), + None, + Some("3string4"), + None, + ]); + + let col3 = StringViewArray::from(vec![ + // Repeated rows in batch + Some("stringview1"), + Some("stringview2"), + None, + None, + None, + None, + None, + None, + None, + // Unique rows in batch + Some("stringview3"), + Some("stringview4"), + None, + None, + None, + None, + None, + None, + ]); + let expected_batch = vec![ + Arc::new(col1) as _, + Arc::new(col2) as _, + Arc::new(col3) as _, + ]; + let expected_batch = RecordBatch::try_new(schema, expected_batch).unwrap(); + + Self { + test_batches: vec![batch1, batch2, batch3], + expected_batch, + } + } + + fn load_to_group_values(&self, group_values: &mut impl GroupValues) { + for batch in self.test_batches.iter() { + group_values.intern(batch, &mut vec![]).unwrap(); + } + } + + fn schema(&self) -> SchemaRef { + self.expected_batch.schema() + } + } + + fn check_result(actual_batch: &RecordBatch, expected_batch: &RecordBatch) { + let formatted_actual_batch = pretty_format_batches(&[actual_batch.clone()]) + .unwrap() + .to_string(); + let mut formatted_actual_batch_sorted: Vec<&str> = + formatted_actual_batch.trim().lines().collect(); + formatted_actual_batch_sorted.sort_unstable(); + + let formatted_expected_batch = pretty_format_batches(&[expected_batch.clone()]) + .unwrap() + .to_string(); + let mut formatted_expected_batch_sorted: Vec<&str> = + formatted_expected_batch.trim().lines().collect(); + formatted_expected_batch_sorted.sort_unstable(); + + for (i, (actual_line, expected_line)) in formatted_actual_batch_sorted + .iter() + .zip(&formatted_expected_batch_sorted) + .enumerate() + { + assert_eq!( + (i, actual_line), + (i, expected_line), + "Inconsistent result\n\n\ + Actual batch:\n{}\n\ + Expected batch:\n{}\n\ + ", + formatted_actual_batch, + formatted_expected_batch, + ); + } + } + + fn insert_inline_group_index_view( + group_values: &mut GroupValuesColumn, + hash_key: u64, + group_index: u64, + ) { + let group_index_view = GroupIndexView::new_inlined(group_index); + group_values.map.insert_accounted( + (hash_key, group_index_view), + |(hash, _)| *hash, + &mut group_values.map_size, + ); + } + + fn insert_non_inline_group_index_view( + group_values: &mut GroupValuesColumn, + hash_key: u64, + group_indices: Vec, + ) { + let list_offset = group_values.group_index_lists.len(); + let group_index_view = GroupIndexView::new_non_inlined(list_offset as u64); + group_values.group_index_lists.push(group_indices); + group_values.map.insert_accounted( + (hash_key, group_index_view), + |(hash, _)| *hash, + &mut group_values.map_size, + ); } } diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index bba59b6d0caa..1f59c617d883 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -29,13 +29,16 @@ use arrow::datatypes::ByteArrayType; use arrow::datatypes::ByteViewType; use arrow::datatypes::DataType; use arrow::datatypes::GenericBinaryType; +use arrow_array::GenericByteArray; use arrow_array::GenericByteViewArray; use arrow_buffer::Buffer; use datafusion_common::utils::proxy::VecAllocExt; +use itertools::izip; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow_array::types::GenericStringType; use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; +use std::iter; use std::marker::PhantomData; use std::mem::{replace, size_of}; use std::sync::Arc; @@ -56,14 +59,40 @@ pub trait GroupColumn: Send + Sync { /// /// Note that this comparison returns true if both elements are NULL fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool; + /// Appends the row at `row` in `array` to this builder fn append_val(&mut self, array: &ArrayRef, row: usize); + + /// The vectorized version equal to + /// + /// When found nth row stored in this builder at `lhs_row` + /// is equal to the row in `array` at `rhs_row`, + /// it will record the `true` result at the corresponding + /// position in `equal_to_results`. + /// + /// And if found nth result in `equal_to_results` is already + /// `false`, the check for nth row will be skipped. + /// + fn vectorized_equal_to( + &self, + lhs_rows: &[usize], + array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut [bool], + ); + + /// The vectorized version `append_val` + fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]); + /// Returns the number of rows stored in this builder fn len(&self) -> usize; + /// Returns the number of bytes used by this [`GroupColumn`] fn size(&self) -> usize; + /// Builds a new array from all of the stored rows fn build(self: Box) -> ArrayRef; + /// Builds a new array from the first `n` stored rows, shifting the /// remaining rows to the start of the builder fn take_n(&mut self, n: usize) -> ArrayRef; @@ -128,6 +157,89 @@ impl GroupColumn } } + fn vectorized_equal_to( + &self, + lhs_rows: &[usize], + array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut [bool], + ) { + let array = array.as_primitive::(); + + let iter = izip!( + lhs_rows.iter(), + rhs_rows.iter(), + equal_to_results.iter_mut(), + ); + + for (&lhs_row, &rhs_row, equal_to_result) in iter { + // Has found not equal to in previous column, don't need to check + if !*equal_to_result { + continue; + } + + // Perf: skip null check (by short circuit) if input is not nullable + if NULLABLE { + let exist_null = self.nulls.is_null(lhs_row); + let input_null = array.is_null(rhs_row); + if let Some(result) = nulls_equal_to(exist_null, input_null) { + *equal_to_result = result; + continue; + } + // Otherwise, we need to check their values + } + + *equal_to_result = self.group_values[lhs_row] == array.value(rhs_row); + } + } + + fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) { + let arr = array.as_primitive::(); + + let null_count = array.null_count(); + let num_rows = array.len(); + let all_null_or_non_null = if null_count == 0 { + Some(true) + } else if null_count == num_rows { + Some(false) + } else { + None + }; + + match (NULLABLE, all_null_or_non_null) { + (true, None) => { + for &row in rows { + if array.is_null(row) { + self.nulls.append(true); + self.group_values.push(T::default_value()); + } else { + self.nulls.append(false); + self.group_values.push(arr.value(row)); + } + } + } + + (true, Some(true)) => { + self.nulls.append_n(rows.len(), false); + for &row in rows { + self.group_values.push(arr.value(row)); + } + } + + (true, Some(false)) => { + self.nulls.append_n(rows.len(), true); + self.group_values + .extend(iter::repeat(T::default_value()).take(rows.len())); + } + + (false, _) => { + for &row in rows { + self.group_values.push(arr.value(row)); + } + } + } + } + fn len(&self) -> usize { self.group_values.len() } @@ -200,6 +312,14 @@ where } } + fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool + where + B: ByteArrayType, + { + let array = array.as_bytes::(); + self.do_equal_to_inner(lhs_row, array, rhs_row) + } + fn append_val_inner(&mut self, array: &ArrayRef, row: usize) where B: ByteArrayType, @@ -212,17 +332,93 @@ where self.offsets.push(O::usize_as(offset)); } else { self.nulls.append(false); - let value: &[u8] = arr.value(row).as_ref(); - self.buffer.append_slice(value); - self.offsets.push(O::usize_as(self.buffer.len())); + self.do_append_val_inner(arr, row); } } - fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool - where + fn vectorized_equal_to_inner( + &self, + lhs_rows: &[usize], + array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut [bool], + ) where B: ByteArrayType, { let array = array.as_bytes::(); + + let iter = izip!( + lhs_rows.iter(), + rhs_rows.iter(), + equal_to_results.iter_mut(), + ); + + for (&lhs_row, &rhs_row, equal_to_result) in iter { + // Has found not equal to, don't need to check + if !*equal_to_result { + continue; + } + + *equal_to_result = self.do_equal_to_inner(lhs_row, array, rhs_row); + } + } + + fn vectorized_append_inner(&mut self, array: &ArrayRef, rows: &[usize]) + where + B: ByteArrayType, + { + let arr = array.as_bytes::(); + let null_count = array.null_count(); + let num_rows = array.len(); + let all_null_or_non_null = if null_count == 0 { + Some(true) + } else if null_count == num_rows { + Some(false) + } else { + None + }; + + match all_null_or_non_null { + None => { + for &row in rows { + if arr.is_null(row) { + self.nulls.append(true); + // nulls need a zero length in the offset buffer + let offset = self.buffer.len(); + self.offsets.push(O::usize_as(offset)); + } else { + self.nulls.append(false); + self.do_append_val_inner(arr, row); + } + } + } + + Some(true) => { + self.nulls.append_n(rows.len(), false); + for &row in rows { + self.do_append_val_inner(arr, row); + } + } + + Some(false) => { + self.nulls.append_n(rows.len(), true); + + let new_len = self.offsets.len() + rows.len(); + let offset = self.buffer.len(); + self.offsets.resize(new_len, O::usize_as(offset)); + } + } + } + + fn do_equal_to_inner( + &self, + lhs_row: usize, + array: &GenericByteArray, + rhs_row: usize, + ) -> bool + where + B: ByteArrayType, + { let exist_null = self.nulls.is_null(lhs_row); let input_null = array.is_null(rhs_row); if let Some(result) = nulls_equal_to(exist_null, input_null) { @@ -232,6 +428,15 @@ where self.value(lhs_row) == (array.value(rhs_row).as_ref() as &[u8]) } + fn do_append_val_inner(&mut self, array: &GenericByteArray, row: usize) + where + B: ByteArrayType, + { + let value: &[u8] = array.value(row).as_ref(); + self.buffer.append_slice(value); + self.offsets.push(O::usize_as(self.buffer.len())); + } + /// return the current value of the specified row irrespective of null pub fn value(&self, row: usize) -> &[u8] { let l = self.offsets[row].as_usize(); @@ -287,6 +492,63 @@ where }; } + fn vectorized_equal_to( + &self, + lhs_rows: &[usize], + array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut [bool], + ) { + // Sanity array type + match self.output_type { + OutputType::Binary => { + debug_assert!(matches!( + array.data_type(), + DataType::Binary | DataType::LargeBinary + )); + self.vectorized_equal_to_inner::>( + lhs_rows, + array, + rhs_rows, + equal_to_results, + ); + } + OutputType::Utf8 => { + debug_assert!(matches!( + array.data_type(), + DataType::Utf8 | DataType::LargeUtf8 + )); + self.vectorized_equal_to_inner::>( + lhs_rows, + array, + rhs_rows, + equal_to_results, + ); + } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + } + } + + fn vectorized_append(&mut self, column: &ArrayRef, rows: &[usize]) { + match self.output_type { + OutputType::Binary => { + debug_assert!(matches!( + column.data_type(), + DataType::Binary | DataType::LargeBinary + )); + self.vectorized_append_inner::>(column, rows) + } + OutputType::Utf8 => { + debug_assert!(matches!( + column.data_type(), + DataType::Utf8 | DataType::LargeUtf8 + )); + self.vectorized_append_inner::>(column, rows) + } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + }; + } + fn len(&self) -> usize { self.offsets.len() - 1 } @@ -446,10 +708,12 @@ impl ByteViewGroupValueBuilder { self } - fn append_val_inner(&mut self, array: &ArrayRef, row: usize) - where - B: ByteViewType, - { + fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { + let array = array.as_byte_view::(); + self.do_equal_to_inner(lhs_row, array, rhs_row) + } + + fn append_val_inner(&mut self, array: &ArrayRef, row: usize) { let arr = array.as_byte_view::(); // Null row case, set and return @@ -461,7 +725,80 @@ impl ByteViewGroupValueBuilder { // Not null row case self.nulls.append(false); - let value: &[u8] = arr.value(row).as_ref(); + self.do_append_val_inner(arr, row); + } + + fn vectorized_equal_to_inner( + &self, + lhs_rows: &[usize], + array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut [bool], + ) { + let array = array.as_byte_view::(); + + let iter = izip!( + lhs_rows.iter(), + rhs_rows.iter(), + equal_to_results.iter_mut(), + ); + + for (&lhs_row, &rhs_row, equal_to_result) in iter { + // Has found not equal to, don't need to check + if !*equal_to_result { + continue; + } + + *equal_to_result = self.do_equal_to_inner(lhs_row, array, rhs_row); + } + } + + fn vectorized_append_inner(&mut self, array: &ArrayRef, rows: &[usize]) { + let arr = array.as_byte_view::(); + let null_count = array.null_count(); + let num_rows = array.len(); + let all_null_or_non_null = if null_count == 0 { + Some(true) + } else if null_count == num_rows { + Some(false) + } else { + None + }; + + match all_null_or_non_null { + None => { + for &row in rows { + // Null row case, set and return + if arr.is_valid(row) { + self.nulls.append(false); + self.do_append_val_inner(arr, row); + } else { + self.nulls.append(true); + self.views.push(0); + } + } + } + + Some(true) => { + self.nulls.append_n(rows.len(), false); + for &row in rows { + self.do_append_val_inner(arr, row); + } + } + + Some(false) => { + self.nulls.append_n(rows.len(), true); + let new_len = self.views.len() + rows.len(); + self.views.resize(new_len, 0); + } + } + } + + fn do_append_val_inner(&mut self, array: &GenericByteViewArray, row: usize) + where + B: ByteViewType, + { + let value: &[u8] = array.value(row).as_ref(); let value_len = value.len(); let view = if value_len <= 12 { @@ -497,9 +834,12 @@ impl ByteViewGroupValueBuilder { } } - fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { - let array = array.as_byte_view::(); - + fn do_equal_to_inner( + &self, + lhs_row: usize, + array: &GenericByteViewArray, + rhs_row: usize, + ) -> bool { // Check if nulls equal firstly let exist_null = self.nulls.is_null(lhs_row); let input_null = array.is_null(rhs_row); @@ -777,6 +1117,20 @@ impl GroupColumn for ByteViewGroupValueBuilder { self.append_val_inner(array, row) } + fn vectorized_equal_to( + &self, + group_indices: &[usize], + array: &ArrayRef, + rows: &[usize], + equal_to_results: &mut [bool], + ) { + self.vectorized_equal_to_inner(group_indices, array, rows, equal_to_results); + } + + fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) { + self.vectorized_append_inner(array, rows); + } + fn len(&self) -> usize { self.views.len() } @@ -826,7 +1180,7 @@ mod tests { array::AsArray, datatypes::{Int64Type, StringViewType}, }; - use arrow_array::{ArrayRef, Int64Array, StringArray, StringViewArray}; + use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StringViewArray}; use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; use datafusion_physical_expr::binary_map::OutputType; @@ -836,53 +1190,68 @@ mod tests { use super::{ByteGroupValueBuilder, GroupColumn}; + // ======================================================================== + // Tests for primitive builders + // ======================================================================== #[test] - fn test_take_n() { - let mut builder = ByteGroupValueBuilder::::new(OutputType::Utf8); - let array = Arc::new(StringArray::from(vec![Some("a"), None])) as ArrayRef; - // a, null, null - builder.append_val(&array, 0); - builder.append_val(&array, 1); - builder.append_val(&array, 1); - - // (a, null) remaining: null - let output = builder.take_n(2); - assert_eq!(&output, &array); + fn test_nullable_primitive_equal_to() { + let append = |builder: &mut PrimitiveGroupValueBuilder, + builder_array: &ArrayRef, + append_rows: &[usize]| { + for &index in append_rows { + builder.append_val(builder_array, index); + } + }; - // null, a, null, a - builder.append_val(&array, 0); - builder.append_val(&array, 1); - builder.append_val(&array, 0); + let equal_to = |builder: &PrimitiveGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut Vec| { + let iter = lhs_rows.iter().zip(rhs_rows.iter()); + for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { + equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); + } + }; - // (null, a) remaining: (null, a) - let output = builder.take_n(2); - let array = Arc::new(StringArray::from(vec![None, Some("a")])) as ArrayRef; - assert_eq!(&output, &array); + test_nullable_primitive_equal_to_internal(append, equal_to); + } - let array = Arc::new(StringArray::from(vec![ - Some("a"), - None, - Some("longstringfortest"), - ])) as ArrayRef; + #[test] + fn test_nullable_primitive_vectorized_equal_to() { + let append = |builder: &mut PrimitiveGroupValueBuilder, + builder_array: &ArrayRef, + append_rows: &[usize]| { + builder.vectorized_append(builder_array, append_rows); + }; - // null, a, longstringfortest, null, null - builder.append_val(&array, 2); - builder.append_val(&array, 1); - builder.append_val(&array, 1); + let equal_to = |builder: &PrimitiveGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut Vec| { + builder.vectorized_equal_to( + lhs_rows, + input_array, + rhs_rows, + equal_to_results, + ); + }; - // (null, a, longstringfortest, null) remaining: (null) - let output = builder.take_n(4); - let array = Arc::new(StringArray::from(vec![ - None, - Some("a"), - Some("longstringfortest"), - None, - ])) as ArrayRef; - assert_eq!(&output, &array); + test_nullable_primitive_equal_to_internal(append, equal_to); } - #[test] - fn test_nullable_primitive_equal_to() { + fn test_nullable_primitive_equal_to_internal(mut append: A, mut equal_to: E) + where + A: FnMut(&mut PrimitiveGroupValueBuilder, &ArrayRef, &[usize]), + E: FnMut( + &PrimitiveGroupValueBuilder, + &[usize], + &ArrayRef, + &[usize], + &mut Vec, + ), + { // Will cover such cases: // - exist null, input not null // - exist null, input null; values not equal @@ -901,12 +1270,7 @@ mod tests { Some(2), Some(3), ])) as ArrayRef; - builder.append_val(&builder_array, 0); - builder.append_val(&builder_array, 1); - builder.append_val(&builder_array, 2); - builder.append_val(&builder_array, 3); - builder.append_val(&builder_array, 4); - builder.append_val(&builder_array, 5); + append(&mut builder, &builder_array, &[0, 1, 2, 3, 4, 5]); // Define input array let (_nulls, values, _) = @@ -925,16 +1289,82 @@ mod tests { let input_array = Arc::new(Int64Array::new(values, Some(nulls))) as ArrayRef; // Check - assert!(!builder.equal_to(0, &input_array, 0)); - assert!(builder.equal_to(1, &input_array, 1)); - assert!(builder.equal_to(2, &input_array, 2)); - assert!(!builder.equal_to(3, &input_array, 3)); - assert!(!builder.equal_to(4, &input_array, 4)); - assert!(builder.equal_to(5, &input_array, 5)); + let mut equal_to_results = vec![true; builder.len()]; + equal_to( + &builder, + &[0, 1, 2, 3, 4, 5], + &input_array, + &[0, 1, 2, 3, 4, 5], + &mut equal_to_results, + ); + + assert!(!equal_to_results[0]); + assert!(equal_to_results[1]); + assert!(equal_to_results[2]); + assert!(!equal_to_results[3]); + assert!(!equal_to_results[4]); + assert!(equal_to_results[5]); } #[test] fn test_not_nullable_primitive_equal_to() { + let append = |builder: &mut PrimitiveGroupValueBuilder, + builder_array: &ArrayRef, + append_rows: &[usize]| { + for &index in append_rows { + builder.append_val(builder_array, index); + } + }; + + let equal_to = |builder: &PrimitiveGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut Vec| { + let iter = lhs_rows.iter().zip(rhs_rows.iter()); + for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { + equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); + } + }; + + test_not_nullable_primitive_equal_to_internal(append, equal_to); + } + + #[test] + fn test_not_nullable_primitive_vectorized_equal_to() { + let append = |builder: &mut PrimitiveGroupValueBuilder, + builder_array: &ArrayRef, + append_rows: &[usize]| { + builder.vectorized_append(builder_array, append_rows); + }; + + let equal_to = |builder: &PrimitiveGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut Vec| { + builder.vectorized_equal_to( + lhs_rows, + input_array, + rhs_rows, + equal_to_results, + ); + }; + + test_not_nullable_primitive_equal_to_internal(append, equal_to); + } + + fn test_not_nullable_primitive_equal_to_internal(mut append: A, mut equal_to: E) + where + A: FnMut(&mut PrimitiveGroupValueBuilder, &ArrayRef, &[usize]), + E: FnMut( + &PrimitiveGroupValueBuilder, + &[usize], + &ArrayRef, + &[usize], + &mut Vec, + ), + { // Will cover such cases: // - values equal // - values not equal @@ -943,19 +1373,244 @@ mod tests { let mut builder = PrimitiveGroupValueBuilder::::new(); let builder_array = Arc::new(Int64Array::from(vec![Some(0), Some(1)])) as ArrayRef; - builder.append_val(&builder_array, 0); - builder.append_val(&builder_array, 1); + append(&mut builder, &builder_array, &[0, 1]); // Define input array let input_array = Arc::new(Int64Array::from(vec![Some(0), Some(2)])) as ArrayRef; // Check - assert!(builder.equal_to(0, &input_array, 0)); - assert!(!builder.equal_to(1, &input_array, 1)); + let mut equal_to_results = vec![true; builder.len()]; + equal_to( + &builder, + &[0, 1], + &input_array, + &[0, 1], + &mut equal_to_results, + ); + + assert!(equal_to_results[0]); + assert!(!equal_to_results[1]); } #[test] - fn test_byte_array_equal_to() { + fn test_nullable_primitive_vectorized_operation_special_case() { + // Test the special `all nulls` or `not nulls` input array case + // for vectorized append and equal to + + let mut builder = PrimitiveGroupValueBuilder::::new(); + + // All nulls input array + let all_nulls_input_array = Arc::new(Int64Array::from(vec![ + Option::::None, + None, + None, + None, + None, + ])) as _; + builder.vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4]); + + let mut equal_to_results = vec![true; all_nulls_input_array.len()]; + builder.vectorized_equal_to( + &[0, 1, 2, 3, 4], + &all_nulls_input_array, + &[0, 1, 2, 3, 4], + &mut equal_to_results, + ); + + assert!(equal_to_results[0]); + assert!(equal_to_results[1]); + assert!(equal_to_results[2]); + assert!(equal_to_results[3]); + assert!(equal_to_results[4]); + + // All not nulls input array + let all_not_nulls_input_array = Arc::new(Int64Array::from(vec![ + Some(1), + Some(2), + Some(3), + Some(4), + Some(5), + ])) as _; + builder.vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4]); + + let mut equal_to_results = vec![true; all_not_nulls_input_array.len()]; + builder.vectorized_equal_to( + &[5, 6, 7, 8, 9], + &all_not_nulls_input_array, + &[0, 1, 2, 3, 4], + &mut equal_to_results, + ); + + assert!(equal_to_results[0]); + assert!(equal_to_results[1]); + assert!(equal_to_results[2]); + assert!(equal_to_results[3]); + assert!(equal_to_results[4]); + } + + // ======================================================================== + // Tests for byte builders + // ======================================================================== + #[test] + fn test_byte_take_n() { + let mut builder = ByteGroupValueBuilder::::new(OutputType::Utf8); + let array = Arc::new(StringArray::from(vec![Some("a"), None])) as ArrayRef; + // a, null, null + builder.append_val(&array, 0); + builder.append_val(&array, 1); + builder.append_val(&array, 1); + + // (a, null) remaining: null + let output = builder.take_n(2); + assert_eq!(&output, &array); + + // null, a, null, a + builder.append_val(&array, 0); + builder.append_val(&array, 1); + builder.append_val(&array, 0); + + // (null, a) remaining: (null, a) + let output = builder.take_n(2); + let array = Arc::new(StringArray::from(vec![None, Some("a")])) as ArrayRef; + assert_eq!(&output, &array); + + let array = Arc::new(StringArray::from(vec![ + Some("a"), + None, + Some("longstringfortest"), + ])) as ArrayRef; + + // null, a, longstringfortest, null, null + builder.append_val(&array, 2); + builder.append_val(&array, 1); + builder.append_val(&array, 1); + + // (null, a, longstringfortest, null) remaining: (null) + let output = builder.take_n(4); + let array = Arc::new(StringArray::from(vec![ + None, + Some("a"), + Some("longstringfortest"), + None, + ])) as ArrayRef; + assert_eq!(&output, &array); + } + + #[test] + fn test_byte_equal_to() { + let append = |builder: &mut ByteGroupValueBuilder, + builder_array: &ArrayRef, + append_rows: &[usize]| { + for &index in append_rows { + builder.append_val(builder_array, index); + } + }; + + let equal_to = |builder: &ByteGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut Vec| { + let iter = lhs_rows.iter().zip(rhs_rows.iter()); + for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { + equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); + } + }; + + test_byte_equal_to_internal(append, equal_to); + } + + #[test] + fn test_byte_vectorized_equal_to() { + let append = |builder: &mut ByteGroupValueBuilder, + builder_array: &ArrayRef, + append_rows: &[usize]| { + builder.vectorized_append(builder_array, append_rows); + }; + + let equal_to = |builder: &ByteGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut Vec| { + builder.vectorized_equal_to( + lhs_rows, + input_array, + rhs_rows, + equal_to_results, + ); + }; + + test_byte_equal_to_internal(append, equal_to); + } + + #[test] + fn test_byte_vectorized_operation_special_case() { + // Test the special `all nulls` or `not nulls` input array case + // for vectorized append and equal to + + let mut builder = ByteGroupValueBuilder::::new(OutputType::Utf8); + + // All nulls input array + let all_nulls_input_array = Arc::new(StringArray::from(vec![ + Option::<&str>::None, + None, + None, + None, + None, + ])) as _; + builder.vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4]); + + let mut equal_to_results = vec![true; all_nulls_input_array.len()]; + builder.vectorized_equal_to( + &[0, 1, 2, 3, 4], + &all_nulls_input_array, + &[0, 1, 2, 3, 4], + &mut equal_to_results, + ); + + assert!(equal_to_results[0]); + assert!(equal_to_results[1]); + assert!(equal_to_results[2]); + assert!(equal_to_results[3]); + assert!(equal_to_results[4]); + + // All not nulls input array + let all_not_nulls_input_array = Arc::new(StringArray::from(vec![ + Some("string1"), + Some("string2"), + Some("string3"), + Some("string4"), + Some("string5"), + ])) as _; + builder.vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4]); + + let mut equal_to_results = vec![true; all_not_nulls_input_array.len()]; + builder.vectorized_equal_to( + &[5, 6, 7, 8, 9], + &all_not_nulls_input_array, + &[0, 1, 2, 3, 4], + &mut equal_to_results, + ); + + assert!(equal_to_results[0]); + assert!(equal_to_results[1]); + assert!(equal_to_results[2]); + assert!(equal_to_results[3]); + assert!(equal_to_results[4]); + } + + fn test_byte_equal_to_internal(mut append: A, mut equal_to: E) + where + A: FnMut(&mut ByteGroupValueBuilder, &ArrayRef, &[usize]), + E: FnMut( + &ByteGroupValueBuilder, + &[usize], + &ArrayRef, + &[usize], + &mut Vec, + ), + { // Will cover such cases: // - exist null, input not null // - exist null, input null; values not equal @@ -974,12 +1629,7 @@ mod tests { Some("bar"), Some("baz"), ])) as ArrayRef; - builder.append_val(&builder_array, 0); - builder.append_val(&builder_array, 1); - builder.append_val(&builder_array, 2); - builder.append_val(&builder_array, 3); - builder.append_val(&builder_array, 4); - builder.append_val(&builder_array, 5); + append(&mut builder, &builder_array, &[0, 1, 2, 3, 4, 5]); // Define input array let (offsets, buffer, _nulls) = StringArray::from(vec![ @@ -1005,14 +1655,26 @@ mod tests { Arc::new(StringArray::new(offsets, buffer, Some(nulls))) as ArrayRef; // Check - assert!(!builder.equal_to(0, &input_array, 0)); - assert!(builder.equal_to(1, &input_array, 1)); - assert!(builder.equal_to(2, &input_array, 2)); - assert!(!builder.equal_to(3, &input_array, 3)); - assert!(!builder.equal_to(4, &input_array, 4)); - assert!(builder.equal_to(5, &input_array, 5)); + let mut equal_to_results = vec![true; builder.len()]; + equal_to( + &builder, + &[0, 1, 2, 3, 4, 5], + &input_array, + &[0, 1, 2, 3, 4, 5], + &mut equal_to_results, + ); + + assert!(!equal_to_results[0]); + assert!(equal_to_results[1]); + assert!(equal_to_results[2]); + assert!(!equal_to_results[3]); + assert!(!equal_to_results[4]); + assert!(equal_to_results[5]); } + // ======================================================================== + // Tests for byte view builders + // ======================================================================== #[test] fn test_byte_view_append_val() { let mut builder = @@ -1033,12 +1695,126 @@ mod tests { let output = Box::new(builder).build(); // should be 2 output buffers to hold all the data - assert_eq!(output.as_string_view().data_buffers().len(), 2,); + assert_eq!(output.as_string_view().data_buffers().len(), 2); assert_eq!(&output, &builder_array) } #[test] fn test_byte_view_equal_to() { + let append = |builder: &mut ByteViewGroupValueBuilder, + builder_array: &ArrayRef, + append_rows: &[usize]| { + for &index in append_rows { + builder.append_val(builder_array, index); + } + }; + + let equal_to = |builder: &ByteViewGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut Vec| { + let iter = lhs_rows.iter().zip(rhs_rows.iter()); + for (idx, (&lhs_row, &rhs_row)) in iter.enumerate() { + equal_to_results[idx] = builder.equal_to(lhs_row, input_array, rhs_row); + } + }; + + test_byte_view_equal_to_internal(append, equal_to); + } + + #[test] + fn test_byte_view_vectorized_equal_to() { + let append = |builder: &mut ByteViewGroupValueBuilder, + builder_array: &ArrayRef, + append_rows: &[usize]| { + builder.vectorized_append(builder_array, append_rows); + }; + + let equal_to = |builder: &ByteViewGroupValueBuilder, + lhs_rows: &[usize], + input_array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut Vec| { + builder.vectorized_equal_to( + lhs_rows, + input_array, + rhs_rows, + equal_to_results, + ); + }; + + test_byte_view_equal_to_internal(append, equal_to); + } + + #[test] + fn test_byte_view_vectorized_operation_special_case() { + // Test the special `all nulls` or `not nulls` input array case + // for vectorized append and equal to + + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(60); + + // All nulls input array + let all_nulls_input_array = Arc::new(StringViewArray::from(vec![ + Option::<&str>::None, + None, + None, + None, + None, + ])) as _; + builder.vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4]); + + let mut equal_to_results = vec![true; all_nulls_input_array.len()]; + builder.vectorized_equal_to( + &[0, 1, 2, 3, 4], + &all_nulls_input_array, + &[0, 1, 2, 3, 4], + &mut equal_to_results, + ); + + assert!(equal_to_results[0]); + assert!(equal_to_results[1]); + assert!(equal_to_results[2]); + assert!(equal_to_results[3]); + assert!(equal_to_results[4]); + + // All not nulls input array + let all_not_nulls_input_array = Arc::new(StringViewArray::from(vec![ + Some("stringview1"), + Some("stringview2"), + Some("stringview3"), + Some("stringview4"), + Some("stringview5"), + ])) as _; + builder.vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4]); + + let mut equal_to_results = vec![true; all_not_nulls_input_array.len()]; + builder.vectorized_equal_to( + &[5, 6, 7, 8, 9], + &all_not_nulls_input_array, + &[0, 1, 2, 3, 4], + &mut equal_to_results, + ); + + assert!(equal_to_results[0]); + assert!(equal_to_results[1]); + assert!(equal_to_results[2]); + assert!(equal_to_results[3]); + assert!(equal_to_results[4]); + } + + fn test_byte_view_equal_to_internal(mut append: A, mut equal_to: E) + where + A: FnMut(&mut ByteViewGroupValueBuilder, &ArrayRef, &[usize]), + E: FnMut( + &ByteViewGroupValueBuilder, + &[usize], + &ArrayRef, + &[usize], + &mut Vec, + ), + { // Will cover such cases: // - exist null, input not null // - exist null, input null; values not equal @@ -1078,15 +1854,7 @@ mod tests { Some("I am a long string for test eq in completed"), Some("I am a long string for test eq in progress"), ])) as ArrayRef; - builder.append_val(&builder_array, 0); - builder.append_val(&builder_array, 1); - builder.append_val(&builder_array, 2); - builder.append_val(&builder_array, 3); - builder.append_val(&builder_array, 4); - builder.append_val(&builder_array, 5); - builder.append_val(&builder_array, 6); - builder.append_val(&builder_array, 7); - builder.append_val(&builder_array, 8); + append(&mut builder, &builder_array, &[0, 1, 2, 3, 4, 5, 6, 7, 8]); // Define input array let (views, buffer, _nulls) = StringViewArray::from(vec![ @@ -1124,18 +1892,27 @@ mod tests { Arc::new(StringViewArray::new(views, buffer, Some(nulls))) as ArrayRef; // Check - assert!(!builder.equal_to(0, &input_array, 0)); - assert!(builder.equal_to(1, &input_array, 1)); - assert!(builder.equal_to(2, &input_array, 2)); - assert!(!builder.equal_to(3, &input_array, 3)); - assert!(!builder.equal_to(4, &input_array, 4)); - assert!(!builder.equal_to(5, &input_array, 5)); - assert!(builder.equal_to(6, &input_array, 6)); - assert!(!builder.equal_to(7, &input_array, 7)); - assert!(!builder.equal_to(7, &input_array, 8)); - assert!(builder.equal_to(7, &input_array, 9)); - assert!(!builder.equal_to(8, &input_array, 10)); - assert!(builder.equal_to(8, &input_array, 11)); + let mut equal_to_results = vec![true; input_array.len()]; + equal_to( + &builder, + &[0, 1, 2, 3, 4, 5, 6, 7, 7, 7, 8, 8], + &input_array, + &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], + &mut equal_to_results, + ); + + assert!(!equal_to_results[0]); + assert!(equal_to_results[1]); + assert!(equal_to_results[2]); + assert!(!equal_to_results[3]); + assert!(!equal_to_results[4]); + assert!(!equal_to_results[5]); + assert!(equal_to_results[6]); + assert!(!equal_to_results[7]); + assert!(!equal_to_results[8]); + assert!(equal_to_results[9]); + assert!(!equal_to_results[10]); + assert!(equal_to_results[11]); } #[test] diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index fb7b66775092..aefd9c162246 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -37,6 +37,8 @@ mod bytes_view; use bytes::GroupValuesByes; use datafusion_physical_expr::binary_map::OutputType; +use crate::aggregates::order::GroupOrdering; + mod group_column; mod null_builder; @@ -105,7 +107,10 @@ pub trait GroupValues: Send { } /// Return a specialized implementation of [`GroupValues`] for the given schema. -pub fn new_group_values(schema: SchemaRef) -> Result> { +pub fn new_group_values( + schema: SchemaRef, + group_ordering: &GroupOrdering, +) -> Result> { if schema.fields.len() == 1 { let d = schema.fields[0].data_type(); @@ -143,8 +148,12 @@ pub fn new_group_values(schema: SchemaRef) -> Result> { } } - if GroupValuesColumn::supported_schema(schema.as_ref()) { - Ok(Box::new(GroupValuesColumn::try_new(schema)?)) + if column::supported_schema(schema.as_ref()) { + if matches!(group_ordering, GroupOrdering::None) { + Ok(Box::new(GroupValuesColumn::::try_new(schema)?)) + } else { + Ok(Box::new(GroupValuesColumn::::try_new(schema)?)) + } } else { Ok(Box::new(GroupValuesRows::try_new(schema)?)) } diff --git a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs index 0249390f38cd..a584cf58e50a 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/null_builder.rs @@ -70,6 +70,24 @@ impl MaybeNullBufferBuilder { } } + pub fn append_n(&mut self, n: usize, is_null: bool) { + match self { + Self::NoNulls { row_count } if is_null => { + // have seen no nulls so far, this is the first null, + // need to create the nulls buffer for all currently valid values + // alloc 2x the need given we push a new but immediately + let mut nulls = BooleanBufferBuilder::new(*row_count * 2); + nulls.append_n(*row_count, true); + nulls.append_n(n, false); + *self = Self::Nulls(nulls); + } + Self::NoNulls { row_count } => { + *row_count += n; + } + Self::Nulls(builder) => builder.append_n(n, !is_null), + } + } + /// return the number of heap allocated bytes used by this structure to store boolean values pub fn allocated_size(&self) -> usize { match self { diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index fe05f7375ed3..0fa9f206f13d 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -514,7 +514,7 @@ impl GroupedHashAggregateStream { ordering.as_ref(), )?; - let group_values = new_group_values(group_schema)?; + let group_values = new_group_values(group_schema, &group_ordering)?; timer.done(); let exec_state = ExecutionState::ReadingInput; diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 531563717482..ecfbaee23537 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -29,6 +29,7 @@ use prost::Message; use std::any::Any; use std::collections::HashMap; use std::fmt::{self, Debug, Formatter}; +use std::mem::size_of_val; use std::sync::Arc; use std::vec; diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 5687c9af540a..d4e2d48885ae 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -23,6 +23,7 @@ use datafusion_substrait::logical_plan::{ consumer::from_substrait_plan, producer::to_substrait_plan, }; use std::cmp::Ordering; +use std::mem::size_of_val; use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit}; use datafusion::common::{not_impl_err, plan_err, DFSchema, DFSchemaRef};