From ca033e09ba15ff66e38c9821c52e1a6c61c76497 Mon Sep 17 00:00:00 2001 From: kamille Date: Tue, 8 Oct 2024 14:35:27 +0800 Subject: [PATCH 01/26] define `ByteGroupValueViewBuilder`. --- .../aggregates/group_values/group_column.rs | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 5d00f300e960..4092809b4fb6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -20,12 +20,20 @@ use arrow::array::GenericBinaryArray; use arrow::array::GenericStringArray; use arrow::array::OffsetSizeTrait; use arrow::array::PrimitiveArray; +use arrow::array::PrimitiveBuilder; +use arrow::array::StringBuilder; +use arrow::array::StringViewBuilder; use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray}; use arrow::buffer::OffsetBuffer; use arrow::buffer::ScalarBuffer; use arrow::datatypes::ByteArrayType; +use arrow::datatypes::ByteViewType; use arrow::datatypes::DataType; use arrow::datatypes::GenericBinaryType; +use arrow_array::BinaryViewArray; +use arrow_array::GenericByteViewArray; +use arrow_array::StringViewArray; +use arrow_buffer::Buffer; use datafusion_common::utils::proxy::VecAllocExt; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; @@ -376,6 +384,64 @@ where } } +/// An implementation of [`GroupColumn`] for binary view and utf8 view types. +/// +/// Stores a collection of binary view or utf8 view group values in a buffer +/// whose structure is similar to `GenericByteViewArray`, and we can get benefits: +/// +/// 1. Efficient comparison of incoming rows to existing rows +/// 2. Efficient construction of the final output array +/// 3. Efficient to perform `take_n` comparing to use `GenericByteViewBuilder` +pub struct ByteGroupValueViewBuilder { + output_type: OutputType, + + /// The views of string values + /// + /// If string len <= 12, the view's format will be: + /// string(12B) | len(4B) + /// + /// If string len > 12, its format will be: + /// offset(4B) | buffer_index(4B) | prefix(4B) | len(4B) + views: Vec, + + /// The progressing block + /// + /// New values will be inserted into it until its capacity + /// is not enough(detail can see `max_block_size`). + in_progress: Vec, + + /// The completed blocks + completed: Vec, + + /// The max size of `in_progress` + /// + /// `in_progress` will be flushed into `completed`, and create new `in_progress` + /// when found its remaining capacity(`max_block_size` - `len(in_progress)`), + /// is no enough to store the appended value. + max_block_size: usize, + + /// Nulls + nulls: MaybeNullBufferBuilder, +} + +// impl ByteGroupValueViewBuilder { +// fn append_val_inner(&mut self, array: &ArrayRef, row: usize) +// where +// B: ByteViewType, +// { +// let arr = array.as_byte_view::(); +// if arr.is_null(row) { +// self.nulls.append(true); +// self.views.push(0); +// } else { +// self.nulls.append(false); +// let value: &[u8] = arr.value(row).as_ref(); +// self.buffer.append_slice(value); +// self.offsets.push(O::usize_as(self.buffer.len())); +// } +// } +// } + /// Determines if the nullability of the existing and new input array can be used /// to short-circuit the comparison of the two values. /// From ffcc1a25a427999b36c72bc6a9d28521aad3643b Mon Sep 17 00:00:00 2001 From: kamille Date: Tue, 8 Oct 2024 17:07:06 +0800 Subject: [PATCH 02/26] impl append. --- .../aggregates/group_values/group_column.rs | 71 ++++++++++++++----- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 4092809b4fb6..e39afa5ba739 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use arrow::array::make_view; use arrow::array::BufferBuilder; use arrow::array::GenericBinaryArray; use arrow::array::GenericStringArray; @@ -39,6 +40,7 @@ use datafusion_common::utils::proxy::VecAllocExt; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow_array::types::GenericStringType; use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; +use std::mem; use std::sync::Arc; use std::vec; @@ -424,23 +426,58 @@ pub struct ByteGroupValueViewBuilder { nulls: MaybeNullBufferBuilder, } -// impl ByteGroupValueViewBuilder { -// fn append_val_inner(&mut self, array: &ArrayRef, row: usize) -// where -// B: ByteViewType, -// { -// let arr = array.as_byte_view::(); -// if arr.is_null(row) { -// self.nulls.append(true); -// self.views.push(0); -// } else { -// self.nulls.append(false); -// let value: &[u8] = arr.value(row).as_ref(); -// self.buffer.append_slice(value); -// self.offsets.push(O::usize_as(self.buffer.len())); -// } -// } -// } +impl ByteGroupValueViewBuilder { + fn append_val_inner(&mut self, array: &ArrayRef, row: usize) + where + B: ByteViewType, + { + let arr = array.as_byte_view::(); + + // If a null row, set and return + if arr.is_null(row) { + self.nulls.append(true); + self.views.push(0); + return; + } + + // Not null case + self.nulls.append(false); + let value: &[u8] = arr.value(row).as_ref(); + + let value_len = value.len(); + let view = if value_len > 12 { + // Ensure big enough block to hold the value firstly + self.ensure_in_progress_big_enough(value_len); + + // Append value + let block_id = self.completed.len(); + let offset = self.in_progress.len(); + self.in_progress.extend_from_slice(value); + + make_view(value, block_id, offset) + } else { + make_view(value, 0, 0) + }; + + // Append view + self.views.push(view); + } + + fn ensure_in_progress_big_enough(&mut self, value_len: usize) { + debug_assert!(value_len > 12); + let require_cap = self.in_progress.len() + value_len; + + // If current block isn't big enough, flush it and create a new in progress block + if require_cap > self.max_block_size { + let flushed_block = mem::replace( + &mut self.in_progress, + Vec::with_capacity(self.max_block_size), + ); + let buffer = Buffer::from_vec(flushed_block); + self.completed.push(buffer); + } + } +} /// Determines if the nullability of the existing and new input array can be used /// to short-circuit the comparison of the two values. From 48429654c9d3a2278b680168cd767a68ffe00ce9 Mon Sep 17 00:00:00 2001 From: kamille Date: Tue, 8 Oct 2024 19:01:46 +0800 Subject: [PATCH 03/26] impl equal to. --- .../aggregates/group_values/group_column.rs | 84 ++++++++++++++++++- 1 file changed, 81 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index e39afa5ba739..a4361cd1ab20 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -17,6 +17,7 @@ use arrow::array::make_view; use arrow::array::BufferBuilder; +use arrow::array::ByteView; use arrow::array::GenericBinaryArray; use arrow::array::GenericStringArray; use arrow::array::OffsetSizeTrait; @@ -445,7 +446,9 @@ impl ByteGroupValueViewBuilder { let value: &[u8] = arr.value(row).as_ref(); let value_len = value.len(); - let view = if value_len > 12 { + let view = if value_len <= 12 { + make_view(value, 0, 0) + } else { // Ensure big enough block to hold the value firstly self.ensure_in_progress_big_enough(value_len); @@ -455,8 +458,6 @@ impl ByteGroupValueViewBuilder { self.in_progress.extend_from_slice(value); make_view(value, block_id, offset) - } else { - make_view(value, 0, 0) }; // Append view @@ -477,6 +478,83 @@ impl ByteGroupValueViewBuilder { self.completed.push(buffer); } } + + fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool + where + B: ByteViewType, + { + let array = array.as_byte_view::(); + + // Check if nulls equal firstly + let exist_null = self.nulls.is_null(lhs_row); + let input_null = array.is_null(rhs_row); + if let Some(result) = nulls_equal_to(exist_null, input_null) { + return result; + } + + // Otherwise, we need to check their values + let exist_view = self.views[lhs_row]; + let exist_view_len = exist_view as u32; + + let input_view = array.views()[rhs_row]; + let input_view_len = input_view as u32; + + // The check logic + // - Check len equality + // - If non-inlined, check prefix and then check value in buffer + // when needed + // - If inlined, check inlined value + if exist_view_len != input_view_len { + return false; + } + + if exist_view_len <= 12 { + let exist_inline = unsafe { + GenericByteViewArray::::inline_value( + &exist_view, + exist_view_len as usize, + ) + }; + let input_inline = unsafe { + GenericByteViewArray::::inline_value( + &input_view, + input_view_len as usize, + ) + }; + exist_inline == input_inline + } else { + let exist_prefix = + unsafe { GenericByteViewArray::::inline_value(&exist_view, 4) }; + let input_prefix = + unsafe { GenericByteViewArray::::inline_value(&input_view, 4) }; + + if exist_prefix != input_prefix { + return false; + } + + let exist_full = { + let byte_view = ByteView::from(exist_view); + self.value( + byte_view.buffer_index as usize, + byte_view.offset as usize, + byte_view.length as usize, + ) + }; + let input_full: &[u8] = unsafe { array.value_unchecked(rhs_row).as_ref() }; + exist_full == input_full + } + } + + fn value(&self, buffer_index: usize, offset: usize, length: usize) -> &[u8] { + debug_assert!(buffer_index <= self.completed.len()); + + if buffer_index < self.completed.len() { + let block = &self.completed[buffer_index]; + &block[offset..offset + length] + } else { + &self.in_progress[offset..offset + length] + } + } } /// Determines if the nullability of the existing and new input array can be used From 66bb7be75b1ec8952e6b94535bc10e9af6623f9f Mon Sep 17 00:00:00 2001 From: kamille Date: Thu, 10 Oct 2024 01:52:09 +0800 Subject: [PATCH 04/26] fix compile. --- .../src/aggregates/group_values/group_column.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index a4361cd1ab20..61894ffe4a05 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -453,11 +453,11 @@ impl ByteGroupValueViewBuilder { self.ensure_in_progress_big_enough(value_len); // Append value - let block_id = self.completed.len(); + let buffer_index = self.completed.len(); let offset = self.in_progress.len(); self.in_progress.extend_from_slice(value); - make_view(value, block_id, offset) + make_view(value, buffer_index as u32, offset as u32) }; // Append view @@ -510,13 +510,13 @@ impl ByteGroupValueViewBuilder { if exist_view_len <= 12 { let exist_inline = unsafe { - GenericByteViewArray::::inline_value( + GenericByteViewArray::::inline_value( &exist_view, exist_view_len as usize, ) }; let input_inline = unsafe { - GenericByteViewArray::::inline_value( + GenericByteViewArray::::inline_value( &input_view, input_view_len as usize, ) From ef1efcecf682bb4e39b6f9f91555075650572ea9 Mon Sep 17 00:00:00 2001 From: kamille Date: Thu, 10 Oct 2024 02:13:06 +0800 Subject: [PATCH 05/26] fix comments. --- .../physical-plan/src/aggregates/group_values/group_column.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 61894ffe4a05..13fc0865115f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -501,9 +501,9 @@ impl ByteGroupValueViewBuilder { // The check logic // - Check len equality + // - If inlined, check inlined value // - If non-inlined, check prefix and then check value in buffer // when needed - // - If inlined, check inlined value if exist_view_len != input_view_len { return false; } From 152a8b1a847ceec9e943a00ec5e2f18fcd2240e4 Mon Sep 17 00:00:00 2001 From: kamille Date: Sat, 12 Oct 2024 03:40:20 +0800 Subject: [PATCH 06/26] impl take_n. --- .../aggregates/group_values/group_column.rs | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 13fc0865115f..8276ee0067e9 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -28,10 +28,12 @@ use arrow::array::StringViewBuilder; use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray}; use arrow::buffer::OffsetBuffer; use arrow::buffer::ScalarBuffer; +use arrow::datatypes::BinaryViewType; use arrow::datatypes::ByteArrayType; use arrow::datatypes::ByteViewType; use arrow::datatypes::DataType; use arrow::datatypes::GenericBinaryType; +use arrow::datatypes::StringViewType; use arrow_array::BinaryViewArray; use arrow_array::GenericByteViewArray; use arrow_array::StringViewArray; @@ -557,6 +559,129 @@ impl ByteGroupValueViewBuilder { } } +impl GroupColumn for ByteGroupValueViewBuilder { + fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { + todo!() + } + + fn append_val(&mut self, array: &ArrayRef, row: usize) { + todo!() + } + + fn len(&self) -> usize { + todo!() + } + + fn size(&self) -> usize { + todo!() + } + + fn build(self: Box) -> ArrayRef { + todo!() + } + + fn take_n(&mut self, n: usize) -> ArrayRef { + debug_assert!(self.len() >= n); + + // Take n for nulls + let null_buffer = self.nulls.take_n(n); + + // Take n for values: + // - Take first n `view`s from `views` + // + // - Find the last non-inlined `view`, if all inlined, + // we can build array and return happily, otherwise we + // we need to continue to process related buffers + // + // - Get the last related `buffer index`(let's name it `buffer index n`) + // from last non-inlined `view` + // + // - Take `0 ~ buffer index n-1` buffers, clone the `buffer index` buffer + // (data part is wrapped by `Arc`, cheap to clone) if it is in `completed`, + // or split + // + // - Shift the `buffer index` of remaining non-inlined `views` + // + let first_n_views = self.views.drain(0..n).collect::>(); + + let last_non_inlined_view = first_n_views + .iter() + .rev() + .find(|view| ((**view) as u32) > 12); + + if let Some(view) = last_non_inlined_view { + let view = ByteView::from(*view); + let last_related_buffer_index = view.buffer_index as usize; + let mut taken_buffers = Vec::with_capacity(last_related_buffer_index + 1); + + // Take `0 ~ last_related_buffer_index - 1` buffers + if !self.completed.is_empty() { + taken_buffers.extend(self.completed.drain(0..last_related_buffer_index)); + } + + // Process the `last_related_buffer_index` buffers + let last_buffer = if last_related_buffer_index < self.completed.len() { + // If it is in `completed`, simply clone + self.completed[last_related_buffer_index].clone() + } else { + // If it is `in_progress`, copied `0 ~ offset` part + let taken_last_buffer = + self.in_progress[0..view.offset as usize].to_vec(); + Buffer::from_vec(taken_last_buffer) + }; + taken_buffers.push(last_buffer); + + // Shift `buffer index` finally + self.views.iter_mut().for_each(|view| { + if (*view as u32) > 12 { + let mut byte_view = ByteView::from(*view); + byte_view.buffer_index -= last_related_buffer_index as u32; + *view = byte_view.as_u128(); + } + }); + + // Build array and return + let views = ScalarBuffer::from(first_n_views); + match self.output_type { + OutputType::Utf8View => { + Arc::new(GenericByteViewArray::::new( + views, + taken_buffers, + null_buffer, + )) + } + OutputType::BinaryView => { + Arc::new(GenericByteViewArray::::new( + views, + taken_buffers, + null_buffer, + )) + } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + } + } else { + let views = ScalarBuffer::from(first_n_views); + match self.output_type { + OutputType::Utf8View => { + Arc::new(GenericByteViewArray::::new( + views, + Vec::new(), + null_buffer, + )) + } + OutputType::BinaryView => { + Arc::new(GenericByteViewArray::::new( + views, + Vec::new(), + null_buffer, + )) + } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + } + } + } +} + /// Determines if the nullability of the existing and new input array can be used /// to short-circuit the comparison of the two values. /// From d61c3ec6dc8d340d2a71f56c357554b076870f52 Mon Sep 17 00:00:00 2001 From: kamille Date: Sat, 12 Oct 2024 03:48:20 +0800 Subject: [PATCH 07/26] impl build. --- .../aggregates/group_values/group_column.rs | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 8276ee0067e9..d36123897786 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -577,7 +577,43 @@ impl GroupColumn for ByteGroupValueViewBuilder { } fn build(self: Box) -> ArrayRef { - todo!() + let Self { + output_type, + views, + in_progress, + mut completed, + max_block_size, + nulls, + } = *self; + + // Build nulls + let null_buffer = nulls.build(); + + // Build values + // Flush `in_process` firstly + if !in_progress.is_empty() { + let buffer = Buffer::from(in_progress); + completed.push(buffer); + } + + let views = ScalarBuffer::from(views); + match output_type { + OutputType::Utf8View => { + Arc::new(GenericByteViewArray::::new( + views, + completed, + null_buffer, + )) + } + OutputType::BinaryView => { + Arc::new(GenericByteViewArray::::new( + views, + completed, + null_buffer, + )) + } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + } } fn take_n(&mut self, n: usize) -> ArrayRef { From 151377e46da65f1bda9dfe43ee4ca8d1cbdbf061 Mon Sep 17 00:00:00 2001 From: kamille Date: Sat, 12 Oct 2024 04:17:52 +0800 Subject: [PATCH 08/26] impl rest functions in `GroupColumn`. --- .../aggregates/group_values/group_column.rs | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index d36123897786..af8e85f4c585 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -561,19 +561,45 @@ impl ByteGroupValueViewBuilder { impl GroupColumn for ByteGroupValueViewBuilder { fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { - todo!() + match self.output_type { + OutputType::Utf8View => { + self.equal_to_inner::(lhs_row, array, rhs_row) + } + OutputType::BinaryView => { + self.equal_to_inner::(lhs_row, array, rhs_row) + } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + } } fn append_val(&mut self, array: &ArrayRef, row: usize) { - todo!() + match self.output_type { + OutputType::Utf8View => { + self.append_val_inner::(array, row); + } + OutputType::BinaryView => { + self.append_val_inner::(array, row); + } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + } } fn len(&self) -> usize { - todo!() + self.views.len() } fn size(&self) -> usize { - todo!() + let buffers_size = self + .completed + .iter() + .map(|buf| buf.capacity() * std::mem::size_of::()) + .sum::(); + + self.nulls.allocated_size() + + self.views.capacity() * std::mem::size_of::() + + self.in_progress.capacity() * std::mem::size_of::() + + buffers_size + + std::mem::size_of::() } fn build(self: Box) -> ArrayRef { @@ -582,8 +608,8 @@ impl GroupColumn for ByteGroupValueViewBuilder { views, in_progress, mut completed, - max_block_size, nulls, + .. } = *self; // Build nulls From 63e11cbeb6da5f52c1edc2f35e6e1b8cfdc18831 Mon Sep 17 00:00:00 2001 From: kamille Date: Sat, 12 Oct 2024 04:21:11 +0800 Subject: [PATCH 09/26] fix output when panic. --- .../src/aggregates/group_values/group_column.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index af8e85f4c585..bcd9d10da14f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -568,7 +568,7 @@ impl GroupColumn for ByteGroupValueViewBuilder { OutputType::BinaryView => { self.equal_to_inner::(lhs_row, array, rhs_row) } - _ => unreachable!("View types should use `ArrowBytesViewMap`"), + _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"), } } @@ -580,7 +580,7 @@ impl GroupColumn for ByteGroupValueViewBuilder { OutputType::BinaryView => { self.append_val_inner::(array, row); } - _ => unreachable!("View types should use `ArrowBytesViewMap`"), + _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"), } } @@ -638,7 +638,7 @@ impl GroupColumn for ByteGroupValueViewBuilder { null_buffer, )) } - _ => unreachable!("View types should use `ArrowBytesViewMap`"), + _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"), } } @@ -719,7 +719,7 @@ impl GroupColumn for ByteGroupValueViewBuilder { null_buffer, )) } - _ => unreachable!("View types should use `ArrowBytesViewMap`"), + _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"), } } else { let views = ScalarBuffer::from(first_n_views); @@ -738,7 +738,7 @@ impl GroupColumn for ByteGroupValueViewBuilder { null_buffer, )) } - _ => unreachable!("View types should use `ArrowBytesViewMap`"), + _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"), } } } From 15d8349a4d8ea990c323132bcc90dffddb8ada20 Mon Sep 17 00:00:00 2001 From: kamille Date: Sat, 12 Oct 2024 19:14:58 +0800 Subject: [PATCH 10/26] add e2e sql tests. --- .../sqllogictest/test_files/group_by.slt | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index f561fa9e9ac8..72692290ab0e 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -5207,3 +5207,65 @@ NULL a 2 statement ok drop table t; + +# test multi group by int + utf8view +statement ok +create table source as values +-- use some strings that are larger than 12 characters as that goes through a different path +(1, 'a'), +(1, 'a'), +(2, 'thisstringislongerthan12'), +(2, 'thisstring'), +(3, 'abc'), +(3, 'cba'), +(2, 'thisstring'), +(null, null), +(null, 'a'), +(null, null), +(null, 'a'), +(2, 'thisstringisalsolongerthan12'), +(2, 'thisstringislongerthan12'), +(1, 'null') +; + +statement ok +create view t as select column1 as a, arrow_cast(column2, 'Utf8View') as b from source; + +query ITI +select a, b, count(*) from t group by a, b order by a, b; +---- +1 a 2 +1 null 1 +2 thisstring 2 +2 thisstringisalsolongerthan12 1 +2 thisstringislongerthan12 2 +3 abc 1 +3 cba 1 +NULL a 2 +NULL NULL 2 + +statement ok +drop view t + +# test with binary view +statement ok +create view t as select column1 as a, arrow_cast(column2, 'BinaryView') as b from source; + +query I?I +select a, b, count(*) from t group by a, b order by a, b; +---- +1 61 2 +1 6e756c6c 1 +2 74686973737472696e67 2 +2 74686973737472696e676973616c736f6c6f6e6765727468616e3132 1 +2 74686973737472696e6769736c6f6e6765727468616e3132 2 +3 616263 1 +3 636261 1 +NULL 61 2 +NULL NULL 2 + +statement ok +drop view t + +statement ok +drop table source; From d9ee724773b19a2a159d4b02d65cd77b61f0a576 Mon Sep 17 00:00:00 2001 From: kamille Date: Sat, 12 Oct 2024 19:15:56 +0800 Subject: [PATCH 11/26] add unit tests. --- .../aggregates/group_values/group_column.rs | 148 ++++++++++++++++++ 1 file changed, 148 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index bcd9d10da14f..edd8efc66d63 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -947,4 +947,152 @@ mod tests { assert!(!builder.equal_to(4, &input_array, 4)); assert!(builder.equal_to(5, &input_array, 5)); } + + #[test] + fn test_byte_view_append_val() { + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(60); + let builder_array = StringViewArray::from(vec![ + Some("this string is quite long"), // in buffer 0 + Some("foo"), + None, + Some("bar"), + Some("this string is also quite long"), // buffer 0 + Some("this string is quite long"), // buffer 1 + Some("bar"), + ]); + let builder_array: ArrayRef = Arc::new(builder_array); + for row in 0..builder_array.len() { + builder.append_val(&builder_array, row); + } + + let output = Box::new(builder).build(); + // should be 2 output buffers to hold all the data + assert_eq!(output.as_string_view().data_buffers().len(), 2,); + assert_eq!(&output, &builder_array) + } + + #[test] + fn test_byte_view_equal_to() { + // Will cover such cases: + // - exist null, input not null + // - exist null, input null; values not equal + // - exist null, input null; values equal + // - exist not null, input null + // - exist not null, input not null; values not equal + // - exist not null, input not null; values equal + + let mut builder = ByteViewGroupValueBuilder::::new(); + let builder_array = Arc::new(StringViewArray::from(vec![ + None, + None, + None, + Some("foo"), + Some("bar"), + Some("this string is quite long"), + Some("baz"), + ])) as ArrayRef; + builder.append_val(&builder_array, 0); + builder.append_val(&builder_array, 1); + builder.append_val(&builder_array, 2); + builder.append_val(&builder_array, 3); + builder.append_val(&builder_array, 4); + builder.append_val(&builder_array, 5); + builder.append_val(&builder_array, 6); + + // Define input array + let (views, buffer, _nulls) = StringViewArray::from(vec![ + Some("foo"), + Some("bar"), // set to null + Some("this string is quite long"), // set to null + None, + None, + Some("foo"), + Some("baz"), + ]) + .into_parts(); + + // explicitly build a boolean buffer where one of the null values also happens to match + let mut boolean_buffer_builder = BooleanBufferBuilder::new(6); + boolean_buffer_builder.append(true); + boolean_buffer_builder.append(false); // this sets Some("bar") to null above + boolean_buffer_builder.append(false); // this sets Some("thisstringisquitelong") to null above + boolean_buffer_builder.append(false); + boolean_buffer_builder.append(false); + boolean_buffer_builder.append(true); + boolean_buffer_builder.append(true); + let nulls = NullBuffer::new(boolean_buffer_builder.finish()); + let input_array = + Arc::new(StringViewArray::new(views, buffer, Some(nulls))) as ArrayRef; + + // Check + assert!(!builder.equal_to(0, &input_array, 0)); + assert!(builder.equal_to(1, &input_array, 1)); + assert!(builder.equal_to(2, &input_array, 2)); + assert!(!builder.equal_to(3, &input_array, 3)); + assert!(!builder.equal_to(4, &input_array, 4)); + assert!(!builder.equal_to(5, &input_array, 5)); + assert!(builder.equal_to(6, &input_array, 6)); + } + + #[test] + fn test_byte_view_take_n() { + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(60); + let input_array = StringViewArray::from(vec![ + Some("this string is quite long"), // in buffer 0 + Some("foo"), + Some("bar"), + Some("this string is also quite long"), // buffer 0 + None, + Some("this string is quite long"), // buffer 1 + None, + Some("another string that is is quite long"), // buffer 1 + Some("bar"), + ]); + let input_array: ArrayRef = Arc::new(input_array); + for row in 0..input_array.len() { + builder.append_val(&input_array, row); + } + + // should be 2 completed, one in progress buffer to hold all output + assert_eq!(builder.completed.len(), 2); + assert!(builder.in_progress.len() > 0); + + let first_4 = builder.take_n(4); + println!( + "{}", + arrow::util::pretty::pretty_format_columns("first_4", &[first_4.clone()]) + .unwrap() + ); + assert_eq!(&first_4, &input_array.slice(0, 4)); + + // Add some new data after the first n + let input_array = StringViewArray::from(vec![ + Some("short"), + None, + Some("Some new data to add that is long"), // in buffer 0 + Some("short again"), + ]); + let input_array: ArrayRef = Arc::new(input_array); + for row in 0..input_array.len() { + builder.append_val(&input_array, row); + } + + let result = Box::new(builder).build(); + let expected: ArrayRef = Arc::new(StringViewArray::from(vec![ + // last rows of the original input + None, + Some("this string is quite long"), + None, + Some("another string that is is quite long"), + Some("bar"), + // the subsequent input + Some("short"), + None, + Some("Some new data to add that is long"), // in buffer 0 + Some("short again"), + ])); + assert_eq!(&result, &expected); + } } From beffa35c4d825ffb4a29d4c8d03965cf3a73bd98 Mon Sep 17 00:00:00 2001 From: kamille Date: Sat, 12 Oct 2024 22:14:21 +0800 Subject: [PATCH 12/26] switch to a really elegant style codes from alamb. --- .../aggregates/group_values/group_column.rs | 137 +++++++----------- 1 file changed, 54 insertions(+), 83 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index edd8efc66d63..5977c91a200a 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -43,10 +43,13 @@ use datafusion_common::utils::proxy::VecAllocExt; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; use arrow_array::types::GenericStringType; use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; +use std::marker::PhantomData; use std::mem; use std::sync::Arc; use std::vec; +const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024; + /// Trait for storing a single column of group values in [`GroupValuesColumn`] /// /// Implementations of this trait store an in-progress collection of group values @@ -397,9 +400,7 @@ where /// 1. Efficient comparison of incoming rows to existing rows /// 2. Efficient construction of the final output array /// 3. Efficient to perform `take_n` comparing to use `GenericByteViewBuilder` -pub struct ByteGroupValueViewBuilder { - output_type: OutputType, - +pub struct ByteViewGroupValueBuilder { /// The views of string values /// /// If string len <= 12, the view's format will be: @@ -423,14 +424,37 @@ pub struct ByteGroupValueViewBuilder { /// `in_progress` will be flushed into `completed`, and create new `in_progress` /// when found its remaining capacity(`max_block_size` - `len(in_progress)`), /// is no enough to store the appended value. + /// + /// Currently it is fixed at 2MB. max_block_size: usize, /// Nulls nulls: MaybeNullBufferBuilder, + + /// phantom data so the type requires + _phantom: PhantomData, } -impl ByteGroupValueViewBuilder { - fn append_val_inner(&mut self, array: &ArrayRef, row: usize) +impl ByteViewGroupValueBuilder { + fn new() -> Self { + Self { + views: Vec::new(), + in_progress: Vec::new(), + completed: Vec::new(), + max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE, + nulls: MaybeNullBufferBuilder::new(), + _phantom: PhantomData {}, + } + } + + /// Set the max block size + #[cfg(test)] + fn with_max_block_size(mut self, max_block_size: usize) -> Self { + self.max_block_size = max_block_size; + self + } + + fn append_val_inner(&mut self, array: &ArrayRef, row: usize) where B: ByteViewType, { @@ -481,10 +505,7 @@ impl ByteGroupValueViewBuilder { } } - fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool - where - B: ByteViewType, - { + fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { let array = array.as_byte_view::(); // Check if nulls equal firstly @@ -559,29 +580,13 @@ impl ByteGroupValueViewBuilder { } } -impl GroupColumn for ByteGroupValueViewBuilder { +impl GroupColumn for ByteViewGroupValueBuilder { fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { - match self.output_type { - OutputType::Utf8View => { - self.equal_to_inner::(lhs_row, array, rhs_row) - } - OutputType::BinaryView => { - self.equal_to_inner::(lhs_row, array, rhs_row) - } - _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"), - } + self.equal_to_inner(lhs_row, array, rhs_row) } fn append_val(&mut self, array: &ArrayRef, row: usize) { - match self.output_type { - OutputType::Utf8View => { - self.append_val_inner::(array, row); - } - OutputType::BinaryView => { - self.append_val_inner::(array, row); - } - _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"), - } + self.append_val_inner(array, row) } fn len(&self) -> usize { @@ -604,7 +609,6 @@ impl GroupColumn for ByteGroupValueViewBuilder { fn build(self: Box) -> ArrayRef { let Self { - output_type, views, in_progress, mut completed, @@ -623,23 +627,12 @@ impl GroupColumn for ByteGroupValueViewBuilder { } let views = ScalarBuffer::from(views); - match output_type { - OutputType::Utf8View => { - Arc::new(GenericByteViewArray::::new( - views, - completed, - null_buffer, - )) - } - OutputType::BinaryView => { - Arc::new(GenericByteViewArray::::new( - views, - completed, - null_buffer, - )) - } - _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"), - } + + Arc::new(GenericByteViewArray::::new( + views, + completed, + null_buffer, + )) } fn take_n(&mut self, n: usize) -> ArrayRef { @@ -704,42 +697,18 @@ impl GroupColumn for ByteGroupValueViewBuilder { // Build array and return let views = ScalarBuffer::from(first_n_views); - match self.output_type { - OutputType::Utf8View => { - Arc::new(GenericByteViewArray::::new( - views, - taken_buffers, - null_buffer, - )) - } - OutputType::BinaryView => { - Arc::new(GenericByteViewArray::::new( - views, - taken_buffers, - null_buffer, - )) - } - _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"), - } + Arc::new(GenericByteViewArray::::new( + views, + taken_buffers, + null_buffer, + )) } else { let views = ScalarBuffer::from(first_n_views); - match self.output_type { - OutputType::Utf8View => { - Arc::new(GenericByteViewArray::::new( - views, - Vec::new(), - null_buffer, - )) - } - OutputType::BinaryView => { - Arc::new(GenericByteViewArray::::new( - views, - Vec::new(), - null_buffer, - )) - } - _ => unreachable!("String/Binary type should use ByteGroupValueBuilder"), - } + Arc::new(GenericByteViewArray::::new( + views, + Vec::new(), + null_buffer, + )) } } } @@ -762,12 +731,14 @@ fn nulls_equal_to(lhs_null: bool, rhs_null: bool) -> Option { mod tests { use std::sync::Arc; - use arrow::datatypes::Int64Type; - use arrow_array::{ArrayRef, Int64Array, StringArray}; + use arrow::{array::AsArray, datatypes::{Int64Type, StringViewType}}; + use arrow_array::{ArrayRef, Int64Array, StringArray, StringViewArray}; use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; use datafusion_physical_expr::binary_map::OutputType; - use crate::aggregates::group_values::group_column::PrimitiveGroupValueBuilder; + use crate::aggregates::group_values::group_column::{ + ByteViewGroupValueBuilder, PrimitiveGroupValueBuilder, + }; use super::{ByteGroupValueBuilder, GroupColumn}; From 46822f961f5f81b8ecbfcd1ed135a4be56c684e6 Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 13 Oct 2024 01:25:19 +0800 Subject: [PATCH 13/26] fix take_n. --- .../aggregates/group_values/group_column.rs | 181 +++++++++++++----- 1 file changed, 129 insertions(+), 52 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 5977c91a200a..78ceba49edd4 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -460,14 +460,14 @@ impl ByteViewGroupValueBuilder { { let arr = array.as_byte_view::(); - // If a null row, set and return + // Null row case, set and return if arr.is_null(row) { self.nulls.append(true); self.views.push(0); return; } - // Not null case + // Not null row case self.nulls.append(false); let value: &[u8] = arr.value(row).as_ref(); @@ -578,43 +578,15 @@ impl ByteViewGroupValueBuilder { &self.in_progress[offset..offset + length] } } -} - -impl GroupColumn for ByteViewGroupValueBuilder { - fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { - self.equal_to_inner(lhs_row, array, rhs_row) - } - - fn append_val(&mut self, array: &ArrayRef, row: usize) { - self.append_val_inner(array, row) - } - - fn len(&self) -> usize { - self.views.len() - } - - fn size(&self) -> usize { - let buffers_size = self - .completed - .iter() - .map(|buf| buf.capacity() * std::mem::size_of::()) - .sum::(); - self.nulls.allocated_size() - + self.views.capacity() * std::mem::size_of::() - + self.in_progress.capacity() * std::mem::size_of::() - + buffers_size - + std::mem::size_of::() - } - - fn build(self: Box) -> ArrayRef { + fn build_inner(self: Self) -> ArrayRef { let Self { views, in_progress, mut completed, nulls, .. - } = *self; + } = self; // Build nulls let null_buffer = nulls.build(); @@ -635,9 +607,15 @@ impl GroupColumn for ByteViewGroupValueBuilder { )) } - fn take_n(&mut self, n: usize) -> ArrayRef { + fn take_n_inner(&mut self, n: usize) -> ArrayRef { debug_assert!(self.len() >= n); + // The `n == len` case, we need to take all + if self.len() == n { + return self.build_inner(); + } + + // The `n < len` case // Take n for nulls let null_buffer = self.nulls.take_n(n); @@ -652,8 +630,8 @@ impl GroupColumn for ByteViewGroupValueBuilder { // from last non-inlined `view` // // - Take `0 ~ buffer index n-1` buffers, clone the `buffer index` buffer - // (data part is wrapped by `Arc`, cheap to clone) if it is in `completed`, - // or split + // (data part is wrapped by `Arc`, cheap to clone) if it is one of `completed`, + // or copy to generate a new buffer for return if it is `in_progress` // // - Shift the `buffer index` of remaining non-inlined `views` // @@ -667,30 +645,34 @@ impl GroupColumn for ByteViewGroupValueBuilder { if let Some(view) = last_non_inlined_view { let view = ByteView::from(*view); let last_related_buffer_index = view.buffer_index as usize; - let mut taken_buffers = Vec::with_capacity(last_related_buffer_index + 1); - // Take `0 ~ last_related_buffer_index - 1` buffers - if !self.completed.is_empty() { - taken_buffers.extend(self.completed.drain(0..last_related_buffer_index)); - } + // Check should we take the whole `last_related_buffer_index` buffer + let take_whole_last_buffer = self.should_take_whole_buffer( + last_related_buffer_index, + view.offset + view.length, + ); - // Process the `last_related_buffer_index` buffers - let last_buffer = if last_related_buffer_index < self.completed.len() { - // If it is in `completed`, simply clone - self.completed[last_related_buffer_index].clone() + // Take related buffers + let buffers = if take_whole_last_buffer { + self.take_buffers_with_whole_last(last_related_buffer_index) } else { - // If it is `in_progress`, copied `0 ~ offset` part - let taken_last_buffer = - self.in_progress[0..view.offset as usize].to_vec(); - Buffer::from_vec(taken_last_buffer) + self.take_buffers_with_partial_last( + last_related_buffer_index, + view.offset + view.length, + ) + }; + + // Shift `buffer index`s finally + let shifts = if take_whole_last_buffer { + last_related_buffer_index + 1 + } else { + last_non_inlined_view }; - taken_buffers.push(last_buffer); - // Shift `buffer index` finally self.views.iter_mut().for_each(|view| { if (*view as u32) > 12 { let mut byte_view = ByteView::from(*view); - byte_view.buffer_index -= last_related_buffer_index as u32; + byte_view.buffer_index -= shifts as u32; *view = byte_view.as_u128(); } }); @@ -711,6 +693,98 @@ impl GroupColumn for ByteViewGroupValueBuilder { )) } } + + fn take_buffers_with_whole_last( + &mut self, + last_related_buffer_index: usize, + ) -> Vec { + if last_related_buffer_index == self.completed.len() { + self.flush_in_progress(); + } + self.completed + .drain(0..last_related_buffer_index + 1) + .collect() + } + + fn take_buffers_with_partial_last( + &mut self, + last_related_buffer_index: usize, + take_len: usize, + ) -> Vec { + let mut take_buffers = Vec::with_capacity(last_related_buffer_index + 1); + + // Take `0 ~ last_related_buffer_index - 1` buffers + if !self.completed.is_empty() || last_related_buffer_index == 0 { + take_buffers.extend(self.completed.drain(0..last_related_buffer_index)); + } + + // Process the `last_related_buffer_index` buffers + let last_buffer = if last_related_buffer_index < self.completed.len() { + // If it is in `completed`, simply clone + self.completed[last_related_buffer_index].clone() + } else { + // If it is `in_progress`, copied `0 ~ offset` part + let taken_last_buffer = self.in_progress[0..take_len].to_vec(); + Buffer::from_vec(taken_last_buffer) + }; + take_buffers.push(last_buffer); + + take_buffers + } + + #[inline] + fn should_take_whole_buffer(&self, buffer_index: usize, take_len: usize) -> bool { + if buffer_index < self.completed.len() { + take_len == self.completed[buffer_index].len() + } else { + take_len == self.in_progress.len() + } + } + + fn flush_in_progress(&mut self) { + let flushed_block = mem::replace( + &mut self.in_progress, + Vec::with_capacity(self.max_block_size), + ); + let buffer = Buffer::from_vec(flushed_block); + self.completed.push(buffer); + } +} + +impl GroupColumn for ByteViewGroupValueBuilder { + fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { + self.equal_to_inner(lhs_row, array, rhs_row) + } + + fn append_val(&mut self, array: &ArrayRef, row: usize) { + self.append_val_inner(array, row) + } + + fn len(&self) -> usize { + self.views.len() + } + + fn size(&self) -> usize { + let buffers_size = self + .completed + .iter() + .map(|buf| buf.capacity() * std::mem::size_of::()) + .sum::(); + + self.nulls.allocated_size() + + self.views.capacity() * std::mem::size_of::() + + self.in_progress.capacity() * std::mem::size_of::() + + buffers_size + + std::mem::size_of::() + } + + fn build(self: Box) -> ArrayRef { + Self::build_inner(*self) + } + + fn take_n(&mut self, n: usize) -> ArrayRef { + self.take_n_inner(n) + } } /// Determines if the nullability of the existing and new input array can be used @@ -731,7 +805,10 @@ fn nulls_equal_to(lhs_null: bool, rhs_null: bool) -> Option { mod tests { use std::sync::Arc; - use arrow::{array::AsArray, datatypes::{Int64Type, StringViewType}}; + use arrow::{ + array::AsArray, + datatypes::{Int64Type, StringViewType}, + }; use arrow_array::{ArrayRef, Int64Array, StringArray, StringViewArray}; use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; use datafusion_physical_expr::binary_map::OutputType; From 3a93584ce231c36cb8eb50e40e02111a77ab8f39 Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 13 Oct 2024 01:29:34 +0800 Subject: [PATCH 14/26] improve comments. --- .../src/aggregates/group_values/group_column.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 78ceba49edd4..c5be822243f9 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -629,9 +629,10 @@ impl ByteViewGroupValueBuilder { // - Get the last related `buffer index`(let's name it `buffer index n`) // from last non-inlined `view` // - // - Take `0 ~ buffer index n-1` buffers, clone the `buffer index` buffer - // (data part is wrapped by `Arc`, cheap to clone) if it is one of `completed`, - // or copy to generate a new buffer for return if it is `in_progress` + // - Take buffers, the key is that we need to know if we need to take + // the whole last related buffer. The logic is a bit complex, you can + // detail in `take_buffers_with_whole_last`, `take_buffers_with_partial_last` + // and other related steps in following // // - Shift the `buffer index` of remaining non-inlined `views` // From f99f55c6d075223fa5b363217890ff69e136225d Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 13 Oct 2024 01:40:01 +0800 Subject: [PATCH 15/26] fix compile. --- .../src/aggregates/group_values/group_column.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index c5be822243f9..9e7d01c6591b 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -612,7 +612,8 @@ impl ByteViewGroupValueBuilder { // The `n == len` case, we need to take all if self.len() == n { - return self.build_inner(); + let cur_builder = std::mem::replace(self, Self::new()); + return cur_builder.build_inner(); } // The `n < len` case @@ -650,7 +651,7 @@ impl ByteViewGroupValueBuilder { // Check should we take the whole `last_related_buffer_index` buffer let take_whole_last_buffer = self.should_take_whole_buffer( last_related_buffer_index, - view.offset + view.length, + (view.offset + view.length) as usize, ); // Take related buffers @@ -659,7 +660,7 @@ impl ByteViewGroupValueBuilder { } else { self.take_buffers_with_partial_last( last_related_buffer_index, - view.offset + view.length, + (view.offset + view.length) as usize, ) }; @@ -667,7 +668,7 @@ impl ByteViewGroupValueBuilder { let shifts = if take_whole_last_buffer { last_related_buffer_index + 1 } else { - last_non_inlined_view + last_related_buffer_index }; self.views.iter_mut().for_each(|view| { @@ -680,11 +681,7 @@ impl ByteViewGroupValueBuilder { // Build array and return let views = ScalarBuffer::from(first_n_views); - Arc::new(GenericByteViewArray::::new( - views, - taken_buffers, - null_buffer, - )) + Arc::new(GenericByteViewArray::::new(views, buffers, null_buffer)) } else { let views = ScalarBuffer::from(first_n_views); Arc::new(GenericByteViewArray::::new( From 37b48166c290c0bde1a59faa5a9113f4ae56a25c Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 13 Oct 2024 12:09:54 +0800 Subject: [PATCH 16/26] fix clippy. --- .../src/aggregates/group_values/group_column.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 9e7d01c6591b..7bd083d36c51 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -22,9 +22,6 @@ use arrow::array::GenericBinaryArray; use arrow::array::GenericStringArray; use arrow::array::OffsetSizeTrait; use arrow::array::PrimitiveArray; -use arrow::array::PrimitiveBuilder; -use arrow::array::StringBuilder; -use arrow::array::StringViewBuilder; use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray}; use arrow::buffer::OffsetBuffer; use arrow::buffer::ScalarBuffer; @@ -33,10 +30,7 @@ use arrow::datatypes::ByteArrayType; use arrow::datatypes::ByteViewType; use arrow::datatypes::DataType; use arrow::datatypes::GenericBinaryType; -use arrow::datatypes::StringViewType; -use arrow_array::BinaryViewArray; use arrow_array::GenericByteViewArray; -use arrow_array::StringViewArray; use arrow_buffer::Buffer; use datafusion_common::utils::proxy::VecAllocExt; @@ -579,7 +573,7 @@ impl ByteViewGroupValueBuilder { } } - fn build_inner(self: Self) -> ArrayRef { + fn build_inner(self) -> ArrayRef { let Self { views, in_progress, @@ -1103,13 +1097,16 @@ mod tests { // should be 2 completed, one in progress buffer to hold all output assert_eq!(builder.completed.len(), 2); - assert!(builder.in_progress.len() > 0); + assert!(!builder.in_progress.is_empty()); let first_4 = builder.take_n(4); println!( "{}", - arrow::util::pretty::pretty_format_columns("first_4", &[first_4.clone()]) - .unwrap() + arrow::util::pretty::pretty_format_columns( + "first_4", + &[Arc::clone(&first_4)] + ) + .unwrap() ); assert_eq!(&first_4, &input_array.slice(0, 4)); From d78c68da4bf17218e8ff3f520357de22159e20fe Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 13 Oct 2024 17:07:22 +0800 Subject: [PATCH 17/26] define more testcases in `test_byte_view_take_n`. --- .../aggregates/group_values/group_column.rs | 120 +++++++++++++----- 1 file changed, 86 insertions(+), 34 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 7bd083d36c51..d88c294b4135 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -1077,18 +1077,44 @@ mod tests { #[test] fn test_byte_view_take_n() { + // `take_n` is really complex, we should consider and test following situations: + // 1. Take nulls + // 2. Take all `inlined`s + // 3. Take non-inlined + partial last buffer in `completed` + // 4. Take non-inlined + whole last buffer in `completed` + // 5. Take non-inlined + partial last `in_progress` + // 6. Take non-inlined + while last buffer in ``in_progress` + // 7. Take all views at once + let mut builder = ByteViewGroupValueBuilder::::new().with_max_block_size(60); let input_array = StringViewArray::from(vec![ - Some("this string is quite long"), // in buffer 0 + // Test situation 1 + None, + None, + // Test situation 2 (also test take null together) + None, Some("foo"), Some("bar"), - Some("this string is also quite long"), // buffer 0 + // Test situation 3 (also test take null + inlined) + None, + Some("foo"), + Some("this string is quite long"), + Some("this string is also quite long"), + // Test situation 4 (also test take null + inlined) + None, + Some("bar"), + Some("this string is quite long"), + // Test situation 5 (also test take null + inlined) None, - Some("this string is quite long"), // buffer 1 + Some("foo"), + Some("another string that is is quite long"), + Some("this string not so long"), + // Test situation 6 (also test take null + inlined + insert again after taking) None, - Some("another string that is is quite long"), // buffer 1 Some("bar"), + Some("this string is quite long"), + // Finally, we insert the whole array again for testing situation 7 ]); let input_array: ArrayRef = Arc::new(input_array); for row in 0..input_array.len() { @@ -1097,45 +1123,71 @@ mod tests { // should be 2 completed, one in progress buffer to hold all output assert_eq!(builder.completed.len(), 2); + println!("{}", builder.in_progress.len()); assert!(!builder.in_progress.is_empty()); - let first_4 = builder.take_n(4); + // Take all `inlined`s + let taken_array = builder.take_n(2); println!( "{}", arrow::util::pretty::pretty_format_columns( - "first_4", - &[Arc::clone(&first_4)] + "taken_array", + &[Arc::clone(&taken_array)] ) .unwrap() ); - assert_eq!(&first_4, &input_array.slice(0, 4)); + assert_eq!(&taken_array, &input_array.slice(0, 2)); - // Add some new data after the first n - let input_array = StringViewArray::from(vec![ - Some("short"), - None, - Some("Some new data to add that is long"), // in buffer 0 - Some("short again"), - ]); - let input_array: ArrayRef = Arc::new(input_array); - for row in 0..input_array.len() { - builder.append_val(&input_array, row); - } + // Take non-inlined + partial buffer 0 + let taken_array = builder.take_n(1); + println!( + "{}", + arrow::util::pretty::pretty_format_columns( + "taken_array", + &[Arc::clone(&taken_array)] + ) + .unwrap() + ); + assert_eq!(&taken_array, &input_array.slice(2, 1)); - let result = Box::new(builder).build(); - let expected: ArrayRef = Arc::new(StringViewArray::from(vec![ - // last rows of the original input - None, - Some("this string is quite long"), - None, - Some("another string that is is quite long"), - Some("bar"), - // the subsequent input - Some("short"), - None, - Some("Some new data to add that is long"), // in buffer 0 - Some("short again"), - ])); - assert_eq!(&result, &expected); + // Take non-inlined + remaining partial buffer 0 + let taken_array = builder.take_n(1); + println!( + "{}", + arrow::util::pretty::pretty_format_columns( + "taken_array", + &[Arc::clone(&taken_array)] + ) + .unwrap() + ); + assert_eq!(&taken_array, &input_array.slice(3, 1)); + + // Add some new data after the first n + // let input_array = StringViewArray::from(vec![ + // Some("short"), + // None, + // Some("Some new data to add that is long"), // in buffer 0 + // Some("short again"), + // ]); + // let input_array: ArrayRef = Arc::new(input_array); + // for row in 0..input_array.len() { + // builder.append_val(&input_array, row); + // } + + // let result = Box::new(builder).build(); + // let expected: ArrayRef = Arc::new(StringViewArray::from(vec![ + // // last rows of the original input + // None, + // Some("this string is quite long"), + // None, + // Some("another string that is is quite long"), + // Some("bar"), + // // the subsequent input + // Some("short"), + // None, + // Some("Some new data to add that is long"), // in buffer 0 + // Some("short again"), + // ])); + // assert_eq!(&result, &expected); } } From 7cb7dfca3b50b5df3350809a17ea46e780b90f96 Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 13 Oct 2024 18:43:30 +0800 Subject: [PATCH 18/26] connect up. --- .../src/aggregates/group_values/column.rs | 18 ++- .../aggregates/group_values/group_column.rs | 132 +++++++++--------- 2 files changed, 81 insertions(+), 69 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index 28f35b2bded2..4ad75844f7b7 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -16,14 +16,16 @@ // under the License. use crate::aggregates::group_values::group_column::{ - ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder, + ByteGroupValueBuilder, ByteViewGroupValueBuilder, GroupColumn, + PrimitiveGroupValueBuilder, }; use crate::aggregates::group_values::GroupValues; use ahash::RandomState; use arrow::compute::cast; use arrow::datatypes::{ - Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, - Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + BinaryViewType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, + Int32Type, Int64Type, Int8Type, StringViewType, UInt16Type, UInt32Type, UInt64Type, + UInt8Type, }; use arrow::record_batch::RecordBatch; use arrow_array::{Array, ArrayRef}; @@ -119,6 +121,8 @@ impl GroupValuesColumn { | DataType::LargeBinary | DataType::Date32 | DataType::Date64 + | DataType::Utf8View + | DataType::BinaryView ) } } @@ -184,6 +188,14 @@ impl GroupValues for GroupValuesColumn { let b = ByteGroupValueBuilder::::new(OutputType::Binary); v.push(Box::new(b) as _) } + &DataType::Utf8View => { + let b = ByteViewGroupValueBuilder::::new(); + v.push(Box::new(b) as _) + } + &DataType::BinaryView => { + let b = ByteViewGroupValueBuilder::::new(); + v.push(Box::new(b) as _) + } dt => { return not_impl_err!("{dt} not supported in GroupValuesColumn") } diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index d88c294b4135..2bb47c873047 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -25,7 +25,6 @@ use arrow::array::PrimitiveArray; use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray}; use arrow::buffer::OffsetBuffer; use arrow::buffer::ScalarBuffer; -use arrow::datatypes::BinaryViewType; use arrow::datatypes::ByteArrayType; use arrow::datatypes::ByteViewType; use arrow::datatypes::DataType; @@ -430,7 +429,7 @@ pub struct ByteViewGroupValueBuilder { } impl ByteViewGroupValueBuilder { - fn new() -> Self { + pub fn new() -> Self { Self { views: Vec::new(), in_progress: Vec::new(), @@ -442,7 +441,6 @@ impl ByteViewGroupValueBuilder { } /// Set the max block size - #[cfg(test)] fn with_max_block_size(mut self, max_block_size: usize) -> Self { self.max_block_size = max_block_size; self @@ -606,7 +604,8 @@ impl ByteViewGroupValueBuilder { // The `n == len` case, we need to take all if self.len() == n { - let cur_builder = std::mem::replace(self, Self::new()); + let new_builder = Self::new().with_max_block_size(self.max_block_size); + let cur_builder = std::mem::replace(self, new_builder); return cur_builder.build_inner(); } @@ -1077,6 +1076,8 @@ mod tests { #[test] fn test_byte_view_take_n() { + // ####### Define cases and init ####### + // `take_n` is really complex, we should consider and test following situations: // 1. Take nulls // 2. Take all `inlined`s @@ -1114,80 +1115,79 @@ mod tests { None, Some("bar"), Some("this string is quite long"), - // Finally, we insert the whole array again for testing situation 7 + // Insert 4 and just take 3 to ensure it will go the path of situation 6 + None, + // Finally, we create a new builder, insert the whole array and then + // take whole at once for testing situation 7 ]); + let input_array: ArrayRef = Arc::new(input_array); - for row in 0..input_array.len() { + let first_ones_to_append = 16; // For testing situation 1~5 + let second_ones_to_append = 3; // For testing situation 6 + let final_ones_to_append = input_array.len(); // For testing situation 7 + + // ####### Test situation 1~5 ####### + for row in 0..first_ones_to_append { builder.append_val(&input_array, row); } - // should be 2 completed, one in progress buffer to hold all output assert_eq!(builder.completed.len(), 2); - println!("{}", builder.in_progress.len()); - assert!(!builder.in_progress.is_empty()); + assert_eq!(builder.in_progress.len(), 59); - // Take all `inlined`s + // Situation 1 let taken_array = builder.take_n(2); - println!( - "{}", - arrow::util::pretty::pretty_format_columns( - "taken_array", - &[Arc::clone(&taken_array)] - ) - .unwrap() - ); assert_eq!(&taken_array, &input_array.slice(0, 2)); - // Take non-inlined + partial buffer 0 + // Situation 2 + let taken_array = builder.take_n(3); + assert_eq!(&taken_array, &input_array.slice(2, 3)); + + // Situation 3 + let taken_array = builder.take_n(3); + assert_eq!(&taken_array, &input_array.slice(5, 3)); + let taken_array = builder.take_n(1); - println!( - "{}", - arrow::util::pretty::pretty_format_columns( - "taken_array", - &[Arc::clone(&taken_array)] - ) - .unwrap() - ); - assert_eq!(&taken_array, &input_array.slice(2, 1)); + assert_eq!(&taken_array, &input_array.slice(8, 1)); + + // Situation 4 + let taken_array = builder.take_n(3); + assert_eq!(&taken_array, &input_array.slice(9, 3)); + + // Situation 5 + let taken_array = builder.take_n(3); + assert_eq!(&taken_array, &input_array.slice(12, 3)); - // Take non-inlined + remaining partial buffer 0 let taken_array = builder.take_n(1); - println!( - "{}", - arrow::util::pretty::pretty_format_columns( - "taken_array", - &[Arc::clone(&taken_array)] - ) - .unwrap() - ); - assert_eq!(&taken_array, &input_array.slice(3, 1)); - - // Add some new data after the first n - // let input_array = StringViewArray::from(vec![ - // Some("short"), - // None, - // Some("Some new data to add that is long"), // in buffer 0 - // Some("short again"), - // ]); - // let input_array: ArrayRef = Arc::new(input_array); - // for row in 0..input_array.len() { - // builder.append_val(&input_array, row); - // } - - // let result = Box::new(builder).build(); - // let expected: ArrayRef = Arc::new(StringViewArray::from(vec![ - // // last rows of the original input - // None, - // Some("this string is quite long"), - // None, - // Some("another string that is is quite long"), - // Some("bar"), - // // the subsequent input - // Some("short"), - // None, - // Some("Some new data to add that is long"), // in buffer 0 - // Some("short again"), - // ])); - // assert_eq!(&result, &expected); + assert_eq!(&taken_array, &input_array.slice(15, 1)); + + // ####### Test situation 6 ####### + assert!(builder.completed.is_empty()); + assert!(builder.in_progress.is_empty()); + assert!(builder.views.is_empty()); + + for row in first_ones_to_append..first_ones_to_append + second_ones_to_append { + builder.append_val(&input_array, row); + } + + assert!(builder.completed.is_empty()); + assert_eq!(builder.in_progress.len(), 25); + + let taken_array = builder.take_n(3); + assert_eq!(&taken_array, &input_array.slice(16, 3)); + + // ####### Test situation 7 ####### + // Create a new builder + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(60); + + for row in 0..final_ones_to_append { + builder.append_val(&input_array, row); + } + + assert_eq!(builder.completed.len(), 3); + assert_eq!(builder.in_progress.len(), 25); + + let taken_array = builder.take_n(final_ones_to_append); + assert_eq!(&taken_array, &input_array); } } From e6c7e7e6f517a6cb262079f4ec5524201b3ab23b Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 13 Oct 2024 18:55:57 +0800 Subject: [PATCH 19/26] fix doc. --- .../physical-plan/src/aggregates/group_values/group_column.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 2bb47c873047..ae2e509d36b5 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -424,7 +424,7 @@ pub struct ByteViewGroupValueBuilder { /// Nulls nulls: MaybeNullBufferBuilder, - /// phantom data so the type requires + /// phantom data so the type requires `` _phantom: PhantomData, } From 36d556e3fcdd47832cd408ede6a1798d6d180442 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 13 Oct 2024 07:55:26 -0400 Subject: [PATCH 20/26] Do not re-validate output is utf8 --- .../src/aggregates/group_values/group_column.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index ae2e509d36b5..3206af3aa230 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -592,11 +592,17 @@ impl ByteViewGroupValueBuilder { let views = ScalarBuffer::from(views); - Arc::new(GenericByteViewArray::::new( - views, - completed, - null_buffer, - )) + // Safety: + // * all views were correctly made + // * (if utf8): Input was valid Utf8 so buffer contents are + // valid utf8 as well + unsafe { + Arc::new(GenericByteViewArray::::new_unchecked( + views, + completed, + null_buffer, + )) + } } fn take_n_inner(&mut self, n: usize) -> ArrayRef { From 1fd926f1efbde6e8b543fac48af7b9989ae2345f Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 13 Oct 2024 21:28:13 +0800 Subject: [PATCH 21/26] switch to unchecked when building array. --- .../aggregates/group_values/group_column.rs | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 3206af3aa230..1b17f06269f5 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -680,14 +680,32 @@ impl ByteViewGroupValueBuilder { // Build array and return let views = ScalarBuffer::from(first_n_views); - Arc::new(GenericByteViewArray::::new(views, buffers, null_buffer)) + + // Safety: + // * all views were correctly made + // * (if utf8): Input was valid Utf8 so buffer contents are + // valid utf8 as well + unsafe { + Arc::new(GenericByteViewArray::::new_unchecked( + views, + buffers, + null_buffer, + )) + } } else { let views = ScalarBuffer::from(first_n_views); - Arc::new(GenericByteViewArray::::new( - views, - Vec::new(), - null_buffer, - )) + + // Safety: + // * all views were correctly made + // * (if utf8): Input was valid Utf8 so buffer contents are + // valid utf8 as well + unsafe { + Arc::new(GenericByteViewArray::::new_unchecked( + views, + Vec::new(), + null_buffer, + )) + } } } @@ -1090,7 +1108,7 @@ mod tests { // 3. Take non-inlined + partial last buffer in `completed` // 4. Take non-inlined + whole last buffer in `completed` // 5. Take non-inlined + partial last `in_progress` - // 6. Take non-inlined + while last buffer in ``in_progress` + // 6. Take non-inlined + while last buffer in `in_progress` // 7. Take all views at once let mut builder = From 34918cbfdea4671b953e3ea9c194fc172828349a Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 13 Oct 2024 21:41:56 +0800 Subject: [PATCH 22/26] improve naming. --- .../aggregates/group_values/group_column.rs | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 1b17f06269f5..d47997d1e617 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -645,29 +645,29 @@ impl ByteViewGroupValueBuilder { if let Some(view) = last_non_inlined_view { let view = ByteView::from(*view); - let last_related_buffer_index = view.buffer_index as usize; + let last_remaining_buffer_index = view.buffer_index as usize; - // Check should we take the whole `last_related_buffer_index` buffer + // Check should we take the whole `last_remaining_buffer_index` buffer let take_whole_last_buffer = self.should_take_whole_buffer( - last_related_buffer_index, + last_remaining_buffer_index, (view.offset + view.length) as usize, ); // Take related buffers let buffers = if take_whole_last_buffer { - self.take_buffers_with_whole_last(last_related_buffer_index) + self.take_buffers_with_whole_last(last_remaining_buffer_index) } else { self.take_buffers_with_partial_last( - last_related_buffer_index, + last_remaining_buffer_index, (view.offset + view.length) as usize, ) }; // Shift `buffer index`s finally let shifts = if take_whole_last_buffer { - last_related_buffer_index + 1 + last_remaining_buffer_index + 1 } else { - last_related_buffer_index + last_remaining_buffer_index }; self.views.iter_mut().for_each(|view| { @@ -711,35 +711,35 @@ impl ByteViewGroupValueBuilder { fn take_buffers_with_whole_last( &mut self, - last_related_buffer_index: usize, + last_remaining_buffer_index: usize, ) -> Vec { - if last_related_buffer_index == self.completed.len() { + if last_remaining_buffer_index == self.completed.len() { self.flush_in_progress(); } self.completed - .drain(0..last_related_buffer_index + 1) + .drain(0..last_remaining_buffer_index + 1) .collect() } fn take_buffers_with_partial_last( &mut self, - last_related_buffer_index: usize, - take_len: usize, + last_remaining_buffer_index: usize, + last_take_len: usize, ) -> Vec { - let mut take_buffers = Vec::with_capacity(last_related_buffer_index + 1); + let mut take_buffers = Vec::with_capacity(last_remaining_buffer_index + 1); - // Take `0 ~ last_related_buffer_index - 1` buffers - if !self.completed.is_empty() || last_related_buffer_index == 0 { - take_buffers.extend(self.completed.drain(0..last_related_buffer_index)); + // Take `0 ~ last_remaining_buffer_index - 1` buffers + if !self.completed.is_empty() || last_remaining_buffer_index == 0 { + take_buffers.extend(self.completed.drain(0..last_remaining_buffer_index)); } - // Process the `last_related_buffer_index` buffers - let last_buffer = if last_related_buffer_index < self.completed.len() { + // Process the `last_remaining_buffer_index` buffers + let last_buffer = if last_remaining_buffer_index < self.completed.len() { // If it is in `completed`, simply clone - self.completed[last_related_buffer_index].clone() + self.completed[last_remaining_buffer_index].clone() } else { // If it is `in_progress`, copied `0 ~ offset` part - let taken_last_buffer = self.in_progress[0..take_len].to_vec(); + let taken_last_buffer = self.in_progress[0..last_take_len].to_vec(); Buffer::from_vec(taken_last_buffer) }; take_buffers.push(last_buffer); From 8348024d096377d6ae78b7b9584a0103bd8b344f Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 13 Oct 2024 21:47:26 +0800 Subject: [PATCH 23/26] use let else to make the codes clearer. --- .../aggregates/group_values/group_column.rs | 102 +++++++++--------- 1 file changed, 52 insertions(+), 50 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index d47997d1e617..21076e019398 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -643,42 +643,8 @@ impl ByteViewGroupValueBuilder { .rev() .find(|view| ((**view) as u32) > 12); - if let Some(view) = last_non_inlined_view { - let view = ByteView::from(*view); - let last_remaining_buffer_index = view.buffer_index as usize; - - // Check should we take the whole `last_remaining_buffer_index` buffer - let take_whole_last_buffer = self.should_take_whole_buffer( - last_remaining_buffer_index, - (view.offset + view.length) as usize, - ); - - // Take related buffers - let buffers = if take_whole_last_buffer { - self.take_buffers_with_whole_last(last_remaining_buffer_index) - } else { - self.take_buffers_with_partial_last( - last_remaining_buffer_index, - (view.offset + view.length) as usize, - ) - }; - - // Shift `buffer index`s finally - let shifts = if take_whole_last_buffer { - last_remaining_buffer_index + 1 - } else { - last_remaining_buffer_index - }; - - self.views.iter_mut().for_each(|view| { - if (*view as u32) > 12 { - let mut byte_view = ByteView::from(*view); - byte_view.buffer_index -= shifts as u32; - *view = byte_view.as_u128(); - } - }); - - // Build array and return + // All taken views inlined + let Some(view) = last_non_inlined_view else { let views = ScalarBuffer::from(first_n_views); // Safety: @@ -686,26 +652,62 @@ impl ByteViewGroupValueBuilder { // * (if utf8): Input was valid Utf8 so buffer contents are // valid utf8 as well unsafe { - Arc::new(GenericByteViewArray::::new_unchecked( + return Arc::new(GenericByteViewArray::::new_unchecked( views, - buffers, + Vec::new(), null_buffer, - )) + )); } + }; + + // Unfortunately, some taken views non-inlined + let view = ByteView::from(*view); + let last_remaining_buffer_index = view.buffer_index as usize; + + // Check should we take the whole `last_remaining_buffer_index` buffer + let take_whole_last_buffer = self.should_take_whole_buffer( + last_remaining_buffer_index, + (view.offset + view.length) as usize, + ); + + // Take related buffers + let buffers = if take_whole_last_buffer { + self.take_buffers_with_whole_last(last_remaining_buffer_index) } else { - let views = ScalarBuffer::from(first_n_views); + self.take_buffers_with_partial_last( + last_remaining_buffer_index, + (view.offset + view.length) as usize, + ) + }; - // Safety: - // * all views were correctly made - // * (if utf8): Input was valid Utf8 so buffer contents are - // valid utf8 as well - unsafe { - Arc::new(GenericByteViewArray::::new_unchecked( - views, - Vec::new(), - null_buffer, - )) + // Shift `buffer index`s finally + let shifts = if take_whole_last_buffer { + last_remaining_buffer_index + 1 + } else { + last_remaining_buffer_index + }; + + self.views.iter_mut().for_each(|view| { + if (*view as u32) > 12 { + let mut byte_view = ByteView::from(*view); + byte_view.buffer_index -= shifts as u32; + *view = byte_view.as_u128(); } + }); + + // Build array and return + let views = ScalarBuffer::from(first_n_views); + + // Safety: + // * all views were correctly made + // * (if utf8): Input was valid Utf8 so buffer contents are + // valid utf8 as well + unsafe { + Arc::new(GenericByteViewArray::::new_unchecked( + views, + buffers, + null_buffer, + )) } } From 023ed64431f2799e0e645933a233caa2fd593866 Mon Sep 17 00:00:00 2001 From: kamille Date: Sun, 13 Oct 2024 21:59:14 +0800 Subject: [PATCH 24/26] fix typo. --- .../physical-plan/src/aggregates/group_values/group_column.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 21076e019398..87effb4aebb2 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -1110,7 +1110,7 @@ mod tests { // 3. Take non-inlined + partial last buffer in `completed` // 4. Take non-inlined + whole last buffer in `completed` // 5. Take non-inlined + partial last `in_progress` - // 6. Take non-inlined + while last buffer in `in_progress` + // 6. Take non-inlined + whole last buffer in `in_progress` // 7. Take all views at once let mut builder = From c4d45c7bb41287c7a5c8b44e11aed4484c30be11 Mon Sep 17 00:00:00 2001 From: kamille Date: Mon, 14 Oct 2024 01:39:00 +0800 Subject: [PATCH 25/26] improve unit test coverage for `ByteViewGroupValueBuilder`. --- .../aggregates/group_values/group_column.rs | 60 +++++++++++++++---- 1 file changed, 49 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 87effb4aebb2..41534958602e 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -1044,18 +1044,39 @@ mod tests { // - exist null, input null; values not equal // - exist null, input null; values equal // - exist not null, input null - // - exist not null, input not null; values not equal - // - exist not null, input not null; values equal + // - exist not null, input not null; value lens not equal + // - exist not null, input not null; value not equal(inlined case) + // - exist not null, input not null; value equal(inlined case) + // + // - exist not null, input not null; value not equal + // (non-inlined case + prefix not equal) + // + // - exist not null, input not null; value not equal + // (non-inlined case + value in `completed`) + // + // - exist not null, input not null; value equal + // (non-inlined case + value in `completed`) + // + // - exist not null, input not null; value not equal + // (non-inlined case + value in `in_progress`) + // + // - exist not null, input not null; value equal + // (non-inlined case + value in `in_progress`) - let mut builder = ByteViewGroupValueBuilder::::new(); + // Set the block size to 40 for ensuring some unlined values are in `in_progress`, + // and some are in `completed`, so both two branches in `value` function can be covered. + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(60); let builder_array = Arc::new(StringViewArray::from(vec![ None, None, None, Some("foo"), + Some("bazz"), + Some("foo"), Some("bar"), - Some("this string is quite long"), - Some("baz"), + Some("I am a long string for test eq in completed"), + Some("I am a long string for test eq in progress"), ])) as ArrayRef; builder.append_val(&builder_array, 0); builder.append_val(&builder_array, 1); @@ -1064,28 +1085,40 @@ mod tests { builder.append_val(&builder_array, 4); builder.append_val(&builder_array, 5); builder.append_val(&builder_array, 6); + builder.append_val(&builder_array, 7); + builder.append_val(&builder_array, 8); // Define input array let (views, buffer, _nulls) = StringViewArray::from(vec![ Some("foo"), - Some("bar"), // set to null - Some("this string is quite long"), // set to null + Some("bar"), // set to null None, None, - Some("foo"), Some("baz"), + Some("oof"), + Some("bar"), + Some("i am a long string for test eq in completed"), + Some("I am a long string for test eq in COMPLETED"), + Some("I am a long string for test eq in completed"), + Some("I am a long string for test eq in PROGRESS"), + Some("I am a long string for test eq in progress"), ]) .into_parts(); // explicitly build a boolean buffer where one of the null values also happens to match - let mut boolean_buffer_builder = BooleanBufferBuilder::new(6); + let mut boolean_buffer_builder = BooleanBufferBuilder::new(9); boolean_buffer_builder.append(true); boolean_buffer_builder.append(false); // this sets Some("bar") to null above - boolean_buffer_builder.append(false); // this sets Some("thisstringisquitelong") to null above boolean_buffer_builder.append(false); boolean_buffer_builder.append(false); boolean_buffer_builder.append(true); boolean_buffer_builder.append(true); + boolean_buffer_builder.append(true); + boolean_buffer_builder.append(true); + boolean_buffer_builder.append(true); + boolean_buffer_builder.append(true); + boolean_buffer_builder.append(true); + boolean_buffer_builder.append(true); let nulls = NullBuffer::new(boolean_buffer_builder.finish()); let input_array = Arc::new(StringViewArray::new(views, buffer, Some(nulls))) as ArrayRef; @@ -1098,6 +1131,11 @@ mod tests { assert!(!builder.equal_to(4, &input_array, 4)); assert!(!builder.equal_to(5, &input_array, 5)); assert!(builder.equal_to(6, &input_array, 6)); + assert!(!builder.equal_to(7, &input_array, 7)); + assert!(!builder.equal_to(7, &input_array, 8)); + assert!(builder.equal_to(7, &input_array, 9)); + assert!(!builder.equal_to(8, &input_array, 10)); + assert!(builder.equal_to(8, &input_array, 11)); } #[test] @@ -1149,7 +1187,7 @@ mod tests { let input_array: ArrayRef = Arc::new(input_array); let first_ones_to_append = 16; // For testing situation 1~5 - let second_ones_to_append = 3; // For testing situation 6 + let second_ones_to_append = 4; // For testing situation 6 let final_ones_to_append = input_array.len(); // For testing situation 7 // ####### Test situation 1~5 ####### From 6e63eec290913f8b6931b56ab0928ed095e0a5ea Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 16 Oct 2024 03:19:38 +0800 Subject: [PATCH 26/26] tmp --- .../aggregates/group_values/group_column.rs | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 41534958602e..51e646d02b97 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -58,6 +58,9 @@ pub trait GroupColumn: Send + Sync { fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool; /// Appends the row at `row` in `array` to this builder fn append_val(&mut self, array: &ArrayRef, row: usize); + + fn append_non_nullable_val(&mut self, array: &ArrayRef, row: usize); + /// Returns the number of rows stored in this builder fn len(&self) -> usize; /// Returns the number of bytes used by this [`GroupColumn`] @@ -218,6 +221,17 @@ where } } + fn append_non_nullable_val_inner(&mut self, array: &ArrayRef, row: usize) + where + B: ByteArrayType, + { + let arr = array.as_bytes::(); + self.nulls.append(false); + let value: &[u8] = arr.value(row).as_ref(); + self.buffer.append_slice(value); + self.offsets.push(O::usize_as(self.buffer.len())); + } + fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool where B: ByteArrayType, @@ -383,6 +397,27 @@ where _ => unreachable!("View types should use `ArrowBytesViewMap`"), } } + + fn append_non_nullable_val(&mut self, array: &ArrayRef, row: usize) { + // Sanity array type + match self.output_type { + OutputType::Binary => { + debug_assert!(matches!( + column.data_type(), + DataType::Binary | DataType::LargeBinary + )); + self.append_val_inner::>(column, row) + } + OutputType::Utf8 => { + debug_assert!(matches!( + column.data_type(), + DataType::Utf8 | DataType::LargeUtf8 + )); + self.append_val_inner::>(column, row) + } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + }; + } } /// An implementation of [`GroupColumn`] for binary view and utf8 view types.