diff --git a/datafusion/physical-expr-common/src/group_value_row.rs b/datafusion/physical-expr-common/src/group_value_row.rs index db45648bc6cbf..30ad14dad5fef 100644 --- a/datafusion/physical-expr-common/src/group_value_row.rs +++ b/datafusion/physical-expr-common/src/group_value_row.rs @@ -15,57 +15,37 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::BooleanBufferBuilder; -use arrow::array::BufferBuilder; -use arrow::array::GenericBinaryArray; use arrow::array::GenericBinaryBuilder; -use arrow::array::GenericStringArray; use arrow::array::GenericStringBuilder; use arrow::array::OffsetSizeTrait; use arrow::array::PrimitiveBuilder; -use arrow::buffer::NullBuffer; -use arrow::buffer::OffsetBuffer; -use arrow::buffer::ScalarBuffer; -use arrow::datatypes::ArrowNativeType; -use arrow::datatypes::DataType; +use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray}; use arrow::datatypes::GenericBinaryType; use arrow::datatypes::GenericStringType; -use arrow::{ - array::{Array, ArrayRef, ArrowPrimitiveType, AsArray, PrimitiveArray}, - datatypes::ByteArrayType, -}; - -use crate::binary_map::OutputType; -use crate::binary_map::INITIAL_BUFFER_CAPACITY; use std::sync::Arc; -pub trait ArrayEqV2: Send + Sync { +pub trait ArrayRowEq: Send + Sync { fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool; fn append_val(&mut self, array: &ArrayRef, row: usize); fn len(&self) -> usize; + // take n elements as ArrayRef and adjusted the underlying buffer + fn take_n(&mut self, n: usize) -> ArrayRef; fn build(&mut self) -> ArrayRef; } -pub trait ArrayEq: Send + Sync { - fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool; - fn append_val(&mut self, array: &ArrayRef, row: usize); - fn len(&self) -> usize; - fn build(self: Box) -> ArrayRef; -} - -impl ArrayEqV2 for PrimitiveBuilder +impl ArrayRowEq for PrimitiveBuilder where T: ArrowPrimitiveType, { fn equal_to(&self, lhs_row: 
usize, array: &ArrayRef, rhs_row: usize) -> bool { let arr = array.as_primitive::(); - if let Some(nulls) = self.validity_slice() { - let null_slice_index = lhs_row / 8; - let null_bit_map_index = lhs_row % 8; - let is_elem_null = ((nulls[null_slice_index] >> null_bit_map_index) & 1) == 1; - if is_elem_null { + if let Some(validity_slice) = self.validity_slice() { + let validity_slice_index = lhs_row / 8; + let bit_map_index = lhs_row % 8; + let is_lhs_null = ((validity_slice[validity_slice_index] >> bit_map_index) & 1) == 0; + if is_lhs_null { return arr.is_null(rhs_row); } else if arr.is_null(rhs_row) { return false; @@ -86,109 +66,60 @@ where } } - fn len(&self) -> usize { - self.values_slice().len() - } + fn take_n(&mut self, n: usize) -> ArrayRef { + todo!("") - fn build(&mut self) -> ArrayRef { - Arc::new(self.finish()) - } -} + // let num_remaining = self.values_slice().len() - n; + // assert!(num_remaining >= 0); -pub struct PrimitiveGroupValueBuilder(Vec>); + // let mut builder = PrimitiveBuilder::::new(); + // let vs = self.values_slice(); + // builder.append_slice(vs); -impl PrimitiveGroupValueBuilder { - pub fn new() -> Self { - Self(vec![]) - } -} + // let mut values_left = vec![T::default_value(); num_remaining]; + // let mut null_buffer_left = NullBuffer::new_null(num_remaining); -impl ArrayEq for PrimitiveGroupValueBuilder { - fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { - let elem = self.0[lhs_row]; - let arr = array.as_primitive::(); - let is_rhs_null = arr.is_null(rhs_row); - if elem.is_none() && is_rhs_null { - true - } else if elem.is_some() && !is_rhs_null { - elem.unwrap() == arr.value(rhs_row) - } else { - false - } - } + // let null_buffer_left = self.validity_slice(); + + // let len = self.len(); + // let nulls = self.null_buffer_builder.finish(); + // let builder = ArrayData::builder(self.data_type.clone()) + // .len(len) + // .add_buffer(self.values_builder.finish()) + // .nulls(nulls); + + // let 
array_data = unsafe { builder.build_unchecked() }; + // PrimitiveArray::::from(array_data) + + // let output = self.finish(); + + // let mut values_buffer = MutableBuffer::new(num_remaining); + // values_buffer.extend_from_slice(&values_left); + // let mut null_buffer = MutableBuffer::new_null(num_remaining); + // null_buffer.extend_from_slice(items) + - fn append_val(&mut self, array: &ArrayRef, row: usize) { - let arr = array.as_primitive::(); - if arr.is_null(row) { - self.0.push(None) - } else { - let elem = arr.value(row); - self.0.push(Some(elem)) - } } fn len(&self) -> usize { - self.0.len() + self.values_slice().len() } - fn build(self: Box) -> ArrayRef { - Arc::new(PrimitiveArray::::from_iter(self.0)) + fn build(&mut self) -> ArrayRef { + Arc::new(self.finish()) } } -// pub struct StringGroupValueBuilder(Vec>); - -// impl StringGroupValueBuilder { -// pub fn new() -> Self { -// Self(vec![]) -// } -// } - -// impl ArrayEq for StringGroupValueBuilder { -// fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { -// let elem = &self.0[lhs_row]; -// let arr = array.as_string::(); -// let is_rhs_null = arr.is_null(rhs_row); -// if elem.is_none() && is_rhs_null { -// true -// } else if elem.is_some() && !is_rhs_null { -// let e = elem.as_ref().unwrap(); -// e.as_str() == arr.value(rhs_row) -// } else { -// false -// } -// } - -// fn append_val(&mut self, array: &ArrayRef, row: usize) { -// let arr = array.as_string::(); -// if arr.is_null(row) { -// self.0.push(None) -// } else { -// let elem = arr.value(row); -// self.0.push(Some(elem.to_string())) -// } -// } - -// fn len(&self) -> usize { -// self.0.len() -// } - -// fn build(self: Box) -> ArrayRef { -// Arc::new(StringArray::from_iter(self.0)) -// } -// } - -impl ArrayEqV2 for GenericStringBuilder +impl ArrayRowEq for GenericStringBuilder where O: OffsetSizeTrait, { fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { let arr = array.as_bytes::>(); - if let 
Some(nulls) = self.validity_slice() { - let null_slice_index = lhs_row / 8; - let null_bit_map_index = lhs_row % 8; - - let is_lhs_null = ((nulls[null_slice_index] >> null_bit_map_index) & 1) == 1; + if let Some(validity_slice) = self.validity_slice() { + let validity_slice_index = lhs_row / 8; + let bit_map_index = lhs_row % 8; + let is_lhs_null = ((validity_slice[validity_slice_index] >> bit_map_index) & 1) == 0; if is_lhs_null { return arr.is_null(rhs_row); } else if arr.is_null(rhs_row) { @@ -216,6 +147,10 @@ where self.append_value(value); } + fn take_n(&mut self, n: usize) -> ArrayRef { + todo!("") + } + fn len(&self) -> usize { self.offsets_slice().len() - 1 } @@ -225,16 +160,16 @@ where } } -impl ArrayEqV2 for GenericBinaryBuilder +impl ArrayRowEq for GenericBinaryBuilder where O: OffsetSizeTrait, { fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { let arr = array.as_bytes::>(); - if let Some(nulls) = self.validity_slice() { - let null_slice_index = lhs_row / 8; - let null_bit_map_index = lhs_row % 8; - let is_lhs_null = ((nulls[null_slice_index] >> null_bit_map_index) & 1) == 1; + if let Some(validity_slice) = self.validity_slice() { + let validity_slice_index = lhs_row / 8; + let bit_map_index = lhs_row % 8; + let is_lhs_null = ((validity_slice[validity_slice_index] >> bit_map_index) & 1) == 0; if is_lhs_null { return arr.is_null(rhs_row); } else if arr.is_null(rhs_row) { @@ -262,6 +197,10 @@ where self.append_value(value); } + fn take_n(&mut self, n: usize) -> ArrayRef { + todo!("") + } + fn len(&self) -> usize { self.values_slice().len() } @@ -271,521 +210,48 @@ where } } -pub struct ByteGroupValueBuilderNaive -where - O: OffsetSizeTrait, -{ - output_type: OutputType, - buffer: BufferBuilder, - /// Offsets into `buffer` for each distinct value. These offsets as used - /// directly to create the final `GenericBinaryArray`. The `i`th string is - /// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. 
Null values - /// are stored as a zero length string. - offsets: Vec, - /// Null indexes in offsets - nulls: Vec, -} - -impl ByteGroupValueBuilderNaive -where - O: OffsetSizeTrait, -{ - pub fn new(output_type: OutputType) -> Self { - Self { - output_type, - buffer: BufferBuilder::new(INITIAL_BUFFER_CAPACITY), - offsets: vec![O::default()], - nulls: vec![], - } - } - - fn append_val_inner(&mut self, array: &ArrayRef, row: usize) - where - B: ByteArrayType, - { - let arr = array.as_bytes::(); - if arr.is_null(row) { - self.nulls.push(self.len()); - // nulls need a zero length in the offset buffer - let offset = self.buffer.len(); - self.offsets.push(O::usize_as(offset)); - return; - } - - let value: &[u8] = arr.value(row).as_ref(); - self.buffer.append_slice(value); - self.offsets.push(O::usize_as(self.buffer.len())); - } - - fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool - where - B: ByteArrayType, - { - // Handle nulls - let is_lhs_null = self.nulls.iter().any(|null_idx| *null_idx == lhs_row); - let arr = array.as_bytes::(); - if is_lhs_null { - return arr.is_null(rhs_row); - } else if arr.is_null(rhs_row) { - return false; - } - - let arr = array.as_bytes::(); - let rhs_elem: &[u8] = arr.value(rhs_row).as_ref(); - let rhs_elem_len = arr.value_length(rhs_row).as_usize(); - assert_eq!(rhs_elem_len, rhs_elem.len()); - let l = self.offsets[lhs_row].as_usize(); - let r = self.offsets[lhs_row + 1].as_usize(); - let existing_elem = unsafe { self.buffer.as_slice().get_unchecked(l..r) }; - existing_elem.len() == rhs_elem.len() && rhs_elem == existing_elem - } -} - -impl ArrayEq for ByteGroupValueBuilderNaive -where - O: OffsetSizeTrait, -{ - fn equal_to(&self, lhs_row: usize, column: &ArrayRef, rhs_row: usize) -> bool { - // Sanity array type - match self.output_type { - OutputType::Binary => { - assert!(matches!( - column.data_type(), - DataType::Binary | DataType::LargeBinary - )); - self.equal_to_inner::>(lhs_row, column, rhs_row) - } 
- OutputType::Utf8 => { - assert!(matches!( - column.data_type(), - DataType::Utf8 | DataType::LargeUtf8 - )); - self.equal_to_inner::>(lhs_row, column, rhs_row) - } - _ => unreachable!("View types should use `ArrowBytesViewMap`"), - } - } - - fn append_val(&mut self, column: &ArrayRef, row: usize) { - // Sanity array type - match self.output_type { - OutputType::Binary => { - assert!(matches!( - column.data_type(), - DataType::Binary | DataType::LargeBinary - )); - self.append_val_inner::>(column, row) - } - OutputType::Utf8 => { - assert!(matches!( - column.data_type(), - DataType::Utf8 | DataType::LargeUtf8 - )); - self.append_val_inner::>(column, row) - } - _ => unreachable!("View types should use `ArrowBytesViewMap`"), - }; - } +#[cfg(test)] +mod tests { + use arrow::{array::{GenericStringBuilder, PrimitiveBuilder}, datatypes::Int32Type}; - fn len(&self) -> usize { - self.offsets.len() - 1 - } + #[test] + fn te1() { + let mut a = PrimitiveBuilder::::new(); + a.append_value(1); + a.append_value(2); + a.append_null(); + a.append_null(); + a.append_value(2); - fn build(self: Box) -> ArrayRef { - let Self { - output_type, - mut buffer, - offsets, - nulls, - } = *self; + let s = a.values_slice(); + let v = a.validity_slice(); + println!("s: {:?}", s); + println!("v: {:?}", v); - let null_buffer = if nulls.is_empty() { - None - } else { - // Only make a `NullBuffer` if there was a null value - let num_values = offsets.len() - 1; - let mut bool_builder = BooleanBufferBuilder::new(num_values); - bool_builder.append_n(num_values, true); - nulls.into_iter().for_each(|null_index| { - bool_builder.set_bit(null_index, false); - }); - Some(NullBuffer::from(bool_builder.finish())) - }; - - // SAFETY: the offsets were constructed correctly in `insert_if_new` -- - // monotonically increasing, overflows were checked. 
- let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) }; - let values = buffer.finish(); - - match output_type { - OutputType::Binary => { - // SAFETY: the offsets were constructed correctly - Arc::new(unsafe { - GenericBinaryArray::new_unchecked(offsets, values, null_buffer) - }) - } - OutputType::Utf8 => { - // SAFETY: - // 1. the offsets were constructed safely - // - // 2. we asserted the input arrays were all the correct type and - // thus since all the values that went in were valid (e.g. utf8) - // so are all the values that come out - let res = Arc::new(unsafe { - GenericStringArray::new_unchecked(offsets, values, null_buffer) - }); - res - } - _ => unreachable!("View types should use `ArrowBytesViewMap`"), - } } -} - -// pub struct ByteGroupValueBuilder -// where -// O: OffsetSizeTrait, -// { -// output_type: OutputType, -// /// Underlying hash set for each distinct value -// map: hashbrown::raw::RawTable>, -// /// Total size of the map in bytes -// map_size: usize, -// buffer: BufferBuilder, -// /// Offsets into `buffer` for each distinct value. These offsets as used -// /// directly to create the final `GenericBinaryArray`. The `i`th string is -// /// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values -// /// are stored as a zero length string. 
-// offsets: Vec, -// /// buffer that stores hash values (reused across batches to save allocations) -// hashes_buffer: Vec, -// /// Null indexes in offsets -// nulls: Vec, -// // Store the offset + len for group values -// group_values_offset: Vec>, -// } - -// impl ByteGroupValueBuilder -// where -// O: OffsetSizeTrait, -// { -// pub fn new(array: &ArrayRef, output_type: OutputType) -> Self { -// let n_rows = array.len(); -// let random_state = RandomState::new(); -// let mut hashes_buffer = vec![]; -// let batch_hashes = &mut hashes_buffer; -// batch_hashes.clear(); -// batch_hashes.resize(n_rows, 0); -// create_hashes(&[array.to_owned()], &random_state, batch_hashes) -// // hash is supported for all types and create_hashes only -// // returns errors for unsupported types -// .unwrap(); - -// Self { -// output_type, -// map: hashbrown::raw::RawTable::with_capacity(INITIAL_MAP_CAPACITY), -// map_size: 0, -// buffer: BufferBuilder::new(INITIAL_BUFFER_CAPACITY), -// offsets: vec![O::default()], -// hashes_buffer, -// nulls: vec![], -// group_values_offset: vec![], -// } -// } - -// fn append_val_inner(&mut self, array: &ArrayRef, row: usize) -// where -// B: ByteArrayType, -// { -// let arr = array.as_bytes::(); -// if arr.is_null(row) { -// self.nulls.push(self.offsets.len() - 1); -// // nulls need a zero length in the offset buffer -// let offset = self.buffer.len(); -// self.offsets.push(O::usize_as(offset)); -// return; -// } - -// let hash = self.hashes_buffer[row]; -// let value: &[u8] = arr.value(row).as_ref(); -// let value_len = O::usize_as(value.len()); - -// if value.len() <= SHORT_VALUE_LEN { -// let inline = value.iter().fold(0usize, |acc, &x| acc << 8 | x as usize); -// // is value is already present in the set? 
-// let entry = self.map.get(hash, |header| { -// // compare value if hashes match -// if header.len != value_len { -// return false; -// } -// // value is stored inline so no need to consult buffer -// // (this is the "small string optimization") -// inline == header.offset_or_inline -// }); - -// // Put the small values into buffer and offsets so it appears -// // the output array, but store the actual bytes inline for -// // comparison -// self.buffer.append_slice(value); -// self.offsets.push(O::usize_as(self.buffer.len())); -// if let Some(entry) = entry { -// } -// // if no existing entry, make a new one -// else { -// // let payload = make_payload_fn(Some(value)); -// let new_header = EntryWithPayload { -// hash, -// len: value_len, -// offset_or_inline: inline, -// }; -// self.map.insert_accounted( -// new_header, -// |header| header.hash, -// &mut self.map_size, -// ); -// } -// } else { -// // Check if the value is already present in the set -// let entry = self.map.get_mut(hash, |header| { -// // compare value if hashes match -// if header.len != value_len { -// return false; -// } -// // Need to compare the bytes in the buffer -// // SAFETY: buffer is only appended to, and we correctly inserted values and offsets -// let existing_value = -// unsafe { self.buffer.as_slice().get_unchecked(header.range()) }; -// value == existing_value -// }); - -// let offset = self.buffer.len(); // offset of start for data -// self.buffer.append_slice(value); -// self.offsets.push(O::usize_as(self.buffer.len())); - -// if let Some(entry) = entry { -// } -// // if no existing entry, make a new header in map for equality check -// else { -// let new_header = EntryWithPayload { -// hash, -// len: value_len, -// offset_or_inline: offset, -// }; -// self.map.insert_accounted( -// new_header, -// |header| header.hash, -// &mut self.map_size, -// ); -// } -// }; -// } - -// fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool -// where -// B: 
ByteArrayType, -// { -// // Handle nulls -// let is_lhs_null = self.nulls.iter().any(|null_idx| *null_idx == lhs_row); -// let arr = array.as_bytes::(); -// if is_lhs_null { -// return arr.is_null(rhs_row); -// } else if arr.is_null(rhs_row) { -// return false; -// } - -// let hash = self.hashes_buffer[rhs_row]; -// let arr = array.as_bytes::(); -// let rhs_elem: &[u8] = arr.value(rhs_row).as_ref(); -// let rhs_elem_len = O::usize_as(rhs_elem.len()); -// if rhs_elem.len() <= SHORT_VALUE_LEN { -// let inline = rhs_elem -// .iter() -// .fold(0usize, |acc, &x| acc << 8 | x as usize); -// // is value is already present in the set? -// let entry = self.map.get(hash, |header| { -// // compare value if hashes match -// if header.len != rhs_elem_len { -// return false; -// } -// // value is stored inline so no need to consult buffer -// // (this is the "small string optimization") -// inline == header.offset_or_inline -// }); -// entry.is_some() -// } else { -// // Check if the value is already present in the set -// let entry = self.map.get(hash, |header| { -// // if header.hash != hash { -// // return false; -// // } - -// // compare value if hashes match -// if header.len != rhs_elem_len { -// return false; -// } -// // Need to compare the bytes in the buffer -// // SAFETY: buffer is only appended to, and we correctly inserted values and offsets -// let existing_elem = -// unsafe { self.buffer.as_slice().get_unchecked(header.range()) }; -// rhs_elem == existing_elem -// }); -// entry.is_some() -// } -// } -// } - -// impl ArrayEq for ByteGroupValueBuilder -// where -// O: OffsetSizeTrait, -// { -// fn equal_to(&self, lhs_row: usize, column: &ArrayRef, rhs_row: usize) -> bool { -// // Sanity array type -// match self.output_type { -// OutputType::Binary => { -// assert!(matches!( -// column.data_type(), -// DataType::Binary | DataType::LargeBinary -// )); -// self.equal_to_inner::>(lhs_row, column, rhs_row) -// } -// OutputType::Utf8 => { -// assert!(matches!( -// 
column.data_type(), -// DataType::Utf8 | DataType::LargeUtf8 -// )); -// self.equal_to_inner::>(lhs_row, column, rhs_row) -// } -// _ => unreachable!("View types should use `ArrowBytesViewMap`"), -// } -// } - -// fn append_val(&mut self, column: &ArrayRef, row: usize) { -// // Sanity array type -// match self.output_type { -// OutputType::Binary => { -// assert!(matches!( -// column.data_type(), -// DataType::Binary | DataType::LargeBinary -// )); -// self.append_val_inner::>(column, row) -// } -// OutputType::Utf8 => { -// assert!(matches!( -// column.data_type(), -// DataType::Utf8 | DataType::LargeUtf8 -// )); -// self.append_val_inner::>(column, row) -// } -// _ => unreachable!("View types should use `ArrowBytesViewMap`"), -// }; -// } - -// fn len(&self) -> usize { -// self.offsets.len() - 1 -// } - -// fn build(self: Box) -> ArrayRef { -// let Self { -// map: _, -// map_size: _, -// mut buffer, -// offsets, -// hashes_buffer: _, -// nulls, -// output_type, -// group_values_offset, -// } = *self; - -// let null_buffer = if nulls.is_empty() { -// None -// } else { -// // Only make a `NullBuffer` if there was a null value -// let num_values = offsets.len() - 1; -// let mut bool_builder = BooleanBufferBuilder::new(num_values); -// bool_builder.append_n(num_values, true); -// nulls.into_iter().for_each(|null_index| { -// bool_builder.set_bit(null_index, false); -// }); -// Some(NullBuffer::from(bool_builder.finish())) -// }; - -// // let nulls = null.map(|null_index| { -// // let num_values = offsets.len() - 1; -// // single_null_buffer(num_values, null_index) -// // }); -// // SAFETY: the offsets were constructed correctly in `insert_if_new` -- -// // monotonically increasing, overflows were checked. 
-// let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) }; -// let values = buffer.finish(); - -// match output_type { -// OutputType::Binary => { -// // SAFETY: the offsets were constructed correctly -// Arc::new(unsafe { -// GenericBinaryArray::new_unchecked(offsets, values, null_buffer) -// }) -// } -// OutputType::Utf8 => { -// // SAFETY: -// // 1. the offsets were constructed safely -// // -// // 2. we asserted the input arrays were all the correct type and -// // thus since all the values that went in were valid (e.g. utf8) -// // so are all the values that come out -// let res = Arc::new(unsafe { -// GenericStringArray::new_unchecked(offsets, values, null_buffer) -// }); -// res -// } -// _ => unreachable!("View types should use `ArrowBytesViewMap`"), -// } -// } -// } - -// #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] -// struct EntryWithPayload -// where -// O: OffsetSizeTrait, -// { -// /// hash of the value (stored to avoid recomputing it in hash table check) -// hash: u64, -// /// if len =< [`SHORT_VALUE_LEN`]: the data inlined -// /// if len > [`SHORT_VALUE_LEN`], the offset of where the data starts -// offset_or_inline: usize, -// /// length of the value, in bytes (use O here so we use only i32 for -// /// strings, rather 64 bit usize) -// len: O, -// } - -// impl EntryWithPayload -// where -// O: OffsetSizeTrait, -// { -// /// returns self.offset..self.offset + self.len -// #[inline(always)] -// fn range(&self) -> Range { -// self.offset_or_inline..self.offset_or_inline + self.len.as_usize() -// } -// } - -#[cfg(test)] -mod tests { - use arrow::{array::GenericByteBuilder, datatypes::GenericStringType}; #[test] fn test123() { - let mut a = GenericByteBuilder::>::new(); + let mut a = GenericStringBuilder::::new(); a.append_null(); - a.append_value("a"); + let p = a.offsets_slice(); + println!("p: {:?}", p); + let s = a.validity_slice(); + println!("s: {:?}", s); a.append_null(); - a.append_value("bc"); - 
a.append_value("def"); + let p = a.offsets_slice(); + println!("p: {:?}", p); + let s = a.validity_slice(); + println!("s: {:?}", s); + a.append_value("12"); + let p = a.offsets_slice(); + println!("p: {:?}", p); + let s = a.validity_slice(); + println!("s: {:?}", s); a.append_null(); - + let p = a.offsets_slice(); + println!("p: {:?}", p); let s = a.validity_slice(); println!("s: {:?}", s); - let v = a.values_slice(); - let o = a.offsets_slice(); - println!("v: {:?}", v); - println!("o: {:?}", o); } -} +} \ No newline at end of file diff --git a/datafusion/physical-plan/src/aggregates/group_values/row_like.rs b/datafusion/physical-plan/src/aggregates/group_values/row_like.rs index 96234e28e4a66..3e270f3f0d3b6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/row_like.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/row_like.rs @@ -33,7 +33,7 @@ use datafusion_common::hash_utils::create_hashes; use datafusion_common::{internal_err, DataFusionError, Result}; use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; use datafusion_expr::EmitTo; -use datafusion_physical_expr_common::group_value_row::ArrayEqV2; +use datafusion_physical_expr_common::group_value_row::ArrayRowEq; use hashbrown::raw::RawTable; pub(super) const INITIAL_CAPACITY: usize = 8 * 1024; @@ -77,7 +77,7 @@ pub struct GroupValuesRowLike { /// Random state for creating hashes random_state: RandomState, - group_values_v2: Option>>, + group_values_v2: Option>>, } impl GroupValuesRowLike { @@ -112,17 +112,6 @@ impl GroupValuesRowLike { impl GroupValues for GroupValuesRowLike { fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { - // Convert the group keys into the row format - // let group_rows = &mut self.rows_buffer; - // group_rows.clear(); - // self.row_converter.append(group_rows, cols)?; - // let n_rows = group_rows.num_rows(); - - // let mut group_values = match self.group_values.take() { - // Some(group_values) => 
group_values, - // None => self.row_converter.empty_rows(0, 0), - // }; - let n_rows = cols[0].len(); let mut group_values_v2 = match self.group_values_v2.take() { Some(group_values) => group_values, @@ -130,7 +119,7 @@ impl GroupValues for GroupValuesRowLike { let len = cols.len(); let mut v = Vec::with_capacity(len); // Move to `try_new` - for (i, f) in self.schema.fields().iter().enumerate() { + for f in self.schema.fields().iter() { match f.data_type() { &DataType::Int8 => { let b = PrimitiveBuilder::::new(); @@ -225,7 +214,7 @@ impl GroupValues for GroupValuesRowLike { // && group_rows.row(row) == group_values.row(*group_idx) fn compare_equal( - arry_eq: &dyn ArrayEqV2, + arry_eq: &dyn ArrayRowEq, lhs_row: usize, array: &ArrayRef, rhs_row: usize, @@ -310,24 +299,19 @@ impl GroupValues for GroupValuesRowLike { } fn emit(&mut self, emit_to: EmitTo) -> Result> { - // println!("emit"); - // let mut group_values = self - // .group_values - // .take() - // .expect("Can not emit from empty rows"); - let mut group_values_v2 = self .group_values_v2 .take() .expect("Can not emit from empty rows"); + // println!("emit_to: {:?}", emit_to); + let mut output = match emit_to { EmitTo::All => { let output = group_values_v2 - .into_iter() - .map(|mut v| { - let p = v.deref_mut().build(); - p + .iter_mut() + .map(|v| { + v.deref_mut().build() }) .collect::>(); // let output = self.row_converter.convert_rows(&group_values)?; @@ -341,7 +325,7 @@ impl GroupValues for GroupValuesRowLike { // println!("to first n"); let len = group_values_v2.len(); - let first_n: Vec> = + let first_n: Vec> = group_values_v2.drain(..n).collect(); let output = first_n .into_iter() @@ -394,6 +378,7 @@ impl GroupValues for GroupValuesRowLike { } // self.group_values = Some(group_values); + // println!("output: {:?}", output); Ok(output) } diff --git a/datafusion/sqllogictest/test_files/test2.slt b/datafusion/sqllogictest/test_files/test2.slt index dcad020d5bb05..572cf25b539ae 100644 --- 
a/datafusion/sqllogictest/test_files/test2.slt +++ b/datafusion/sqllogictest/test_files/test2.slt @@ -1,28 +1,19 @@ -statement ok -CREATE TABLE test( - i_item_desc VARCHAR, - d1_date DATE, - d2_date DATE, - d3_date DATE -) as VALUES - ('a','2022-12-12','2022-12-12','2022-12-12'), - ('b','2022-12-12','2022-12-11','2022-12-12'), - ('c','2022-12-12','2022-12-10','2022-12-12'), - ('d','2022-12-12','2022-12-9','2022-12-12'), - ('e','2022-12-12','2022-12-8','2022-12-12'), - ('f','2022-12-12','2022-12-7','2022-12-12'), - ('g','2022-12-12','2022-12-6','2022-12-12'), - ('h','2022-12-12','2022-12-5','2022-12-12') -; +statement ok +create table multi_null_data(c1 int, c2 string) as values + (0, NULL), + (0, NULL), + (3, 'foo'), + (NULL, NULL), + (NULL, 'bar'), + (3, 'foo'), + (0, NULL), + (NULL, 'bar'), + (3, 'foo'); -query DDI -select d1_date, d2_date, count(*) as c from test group by d2_date, d1_date order by c desc; +query IIT rowsort +SELECT COUNT(*), c1, c2 FROM multi_null_data GROUP BY c1, c2 ---- -2022-12-12 2022-12-09 1 -2022-12-12 2022-12-06 1 -2022-12-12 2022-12-05 1 -2022-12-12 2022-12-12 1 -2022-12-12 2022-12-11 1 -2022-12-12 2022-12-07 1 -2022-12-12 2022-12-10 1 -2022-12-12 2022-12-08 1 +1 NULL NULL +2 NULL bar +3 0 NULL +3 3 foo