diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index 2045cbc9b790..ced09d26e6a8 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -319,16 +319,23 @@ impl GroupValuesColumn { next_group_index = self.group_index_lists[current_group_index]; } } - - self.vectorized_equal_to_results - .resize(self.vectorized_equal_to_group_indices.len(), true); } /// Perform `vectorized_equal_to` /// - /// fn vectorized_equal_to(&mut self, cols: &[ArrayRef]) { + debug_assert_eq!( + self.vectorized_equal_to_group_indices.len(), + self.vectorized_equal_to_row_indices.len() + ); + + if self.vectorized_equal_to_group_indices.is_empty() { + return; + } + + // Vectorized equal to `cols` and `group columns` let mut equal_to_results = mem::take(&mut self.vectorized_equal_to_results); + equal_to_results.resize(self.vectorized_equal_to_group_indices.len(), true); for (col_idx, group_col) in self.group_values.iter().enumerate() { group_col.vectorized_equal_to( &self.vectorized_equal_to_group_indices, @@ -337,8 +344,40 @@ impl GroupValuesColumn { &mut equal_to_results, ); } + + let mut current_row_equal_to_result = false; + let mut current_row = *self.vectorized_equal_to_row_indices.first().unwrap(); + for (idx, &row) in self.vectorized_equal_to_row_indices.iter().enumerate() { + // If found next row, according to the equal to result of `current_row` + if current_row != row { + if !current_row_equal_to_result { + self.vectorized_append_row_indices.push(row); + } + current_row = row; + current_row_equal_to_result = equal_to_results[idx]; + continue; + } + current_row_equal_to_result |= equal_to_results[idx]; + } + + if !current_row_equal_to_result { + self.vectorized_append_row_indices.push(current_row); + } + self.vectorized_equal_to_results = equal_to_results; } + + /// Perform `vectorized_append` + /// + /// 1. Vectorized append new values into `group_values` + /// 2. Update `map` and `group_index_lists` + fn vectorized_append(&mut self, cols: &[ArrayRef], batch_hashes: &[u64]) { + if self.vectorized_append_row_indices.is_empty() { + return; + } + + // 1. Vectorized append new values into `group_values` + } } /// instantiates a [`PrimitiveGroupValueBuilder`] and pushes it into $v diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 7db5bd6d6d3a..22cbe70e90ca 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -70,7 +70,7 @@ pub trait GroupColumn: Send + Sync { equal_to_results: &mut [bool], ); - fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize], all_non_null: bool); + fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]); /// Returns the number of rows stored in this builder fn len(&self) -> usize; @@ -173,22 +173,21 @@ impl GroupColumn } } - fn vectorized_append( - &mut self, - array: &ArrayRef, - rows: &[usize], - all_non_null: bool, - ) { + fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) { let arr = array.as_primitive::(); - match (NULLABLE, all_non_null) { - (true, true) => { - self.nulls.append_n(rows.len(), false); - self.group_values.reserve(rows.len()); - for &row in rows { - self.group_values.push(arr.value(row)); - } - } - (true, false) => { + + let null_count = array.null_count(); + let num_rows = array.len(); + let all_null_or_non_null = if null_count == 0 { + Some(true) + } else if null_count == num_rows { + Some(false) + } else { + None + }; + + match (NULLABLE, all_null_or_non_null) { + (true, None) => { for &row in rows { if array.is_null(row) { self.nulls.append(true); @@ -199,6 +198,19 @@ impl GroupColumn } } } + + (true, Some(true)) => { + self.nulls.append_n(rows.len(), false); + self.group_values.reserve(rows.len()); + for &row in rows { + self.group_values.push(arr.value(row)); + } + } + + (true, Some(false)) => { + self.nulls.append_n(rows.len(), true); + } + (false, _) => { self.group_values.reserve(rows.len()); for &row in rows { @@ -280,34 +292,12 @@ where } } - fn append_batch_inner( - &mut self, - array: &ArrayRef, - rows: &[usize], - all_non_null: bool, - ) where + fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool + where B: ByteArrayType, { - let arr = array.as_bytes::(); - - if all_non_null { - self.nulls.append_n(rows.len(), false); - for &row in rows { - self.append_value(arr, row); - } - } else { - for &row in rows { - if arr.is_null(row) { - self.nulls.append(true); - // nulls need a zero length in the offset buffer - let offset = self.buffer.len(); - self.offsets.push(O::usize_as(offset)); - } else { - self.nulls.append(false); - self.append_value(arr, row); - } - } - } + let array = array.as_bytes::(); + self.do_equal_to_inner(lhs_row, array, rhs_row) } fn append_val_inner(&mut self, array: &ArrayRef, row: usize) @@ -322,24 +312,84 @@ where self.offsets.push(O::usize_as(offset)); } else { self.nulls.append(false); - self.append_value(arr, row); + self.do_append_val_inner(arr, row); } } - fn append_value(&mut self, array: &GenericByteArray, row: usize) + fn vectorized_equal_to_inner( + &self, + group_indices: &[usize], + array: &ArrayRef, + rows: &[usize], + equal_to_results: &mut [bool], + ) where + B: ByteArrayType, + { + let array = array.as_bytes::(); + + for (idx, &lhs_row) in group_indices.iter().enumerate() { + // Has found not equal to, don't need to check + if !equal_to_results[idx] { + continue; + } + + let rhs_row = rows[idx]; + equal_to_results[idx] = self.do_equal_to_inner(lhs_row, array, rhs_row); + } + } + + fn vectorized_append_inner(&mut self, array: &ArrayRef, rows: &[usize]) where B: ByteArrayType, { - let value: &[u8] = array.value(row).as_ref(); - self.buffer.append_slice(value); - self.offsets.push(O::usize_as(self.buffer.len())); + let arr = array.as_bytes::(); + let null_count = array.null_count(); + let num_rows = array.len(); + let all_null_or_non_null = if null_count == 0 { + Some(true) + } else if null_count == num_rows { + Some(false) + } else { + None + }; + + match all_null_or_non_null { + None => { + for &row in rows { + if arr.is_null(row) { + self.nulls.append(true); + // nulls need a zero length in the offset buffer + let offset = self.buffer.len(); + self.offsets.push(O::usize_as(offset)); + } else { + self.nulls.append(false); + self.do_append_val_inner(arr, row); + } + } + } + + Some(true) => { + self.nulls.append_n(rows.len(), false); + for &row in rows { + self.do_append_val_inner(arr, row); + } + } + + Some(false) => { + self.nulls.append_n(rows.len(), true); + } + } } - fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool + fn do_equal_to_inner( + &self, + lhs_row: usize, + array: &GenericByteArray, + rhs_row: usize, + ) -> bool where B: ByteArrayType, { - let array = array.as_bytes::(); let exist_null = self.nulls.is_null(lhs_row); let input_null = array.is_null(rhs_row); if let Some(result) = nulls_equal_to(exist_null, input_null) { @@ -349,6 +399,15 @@ where self.value(lhs_row) == (array.value(rhs_row).as_ref() as &[u8]) } + fn do_append_val_inner(&mut self, array: &GenericByteArray, row: usize) + where + B: ByteArrayType, + { + let value: &[u8] = array.value(row).as_ref(); + self.buffer.append_slice(value); + self.offsets.push(O::usize_as(self.buffer.len())); + } + /// return the current value of the specified row irrespective of null pub fn value(&self, row: usize) -> &[u8] { let l = self.offsets[row].as_usize(); @@ -411,37 +470,51 @@ where rows: &[usize], equal_to_results: &mut [bool], ) { - todo!() + // Sanity array type + match self.output_type { + OutputType::Binary => { + debug_assert!(matches!( + array.data_type(), + DataType::Binary | DataType::LargeBinary + )); + self.vectorized_equal_to_inner::>( + group_indices, + array, + rows, + equal_to_results, + ); + } + OutputType::Utf8 => { + debug_assert!(matches!( + array.data_type(), + DataType::Utf8 | DataType::LargeUtf8 + )); + self.vectorized_equal_to_inner::>( + group_indices, + array, + rows, + equal_to_results, + ); + } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + } } - fn vectorized_append( - &mut self, - column: &ArrayRef, - rows: &[usize], - all_non_null: bool, - ) { + fn vectorized_append(&mut self, column: &ArrayRef, rows: &[usize]) { match self.output_type { OutputType::Binary => { debug_assert!(matches!( column.data_type(), DataType::Binary | DataType::LargeBinary )); - self.append_batch_inner::>( - column, - rows, - all_non_null, - ) + self.vectorized_append_inner::>(column, rows) } OutputType::Utf8 => { debug_assert!(matches!( column.data_type(), DataType::Utf8 | DataType::LargeUtf8 )); - self.append_batch_inner::>( - column, - rows, - all_non_null, - ) + self.vectorized_append_inner::>(column, rows) } _ => unreachable!("View types should use `ArrowBytesViewMap`"), }; @@ -606,31 +679,9 @@ impl ByteViewGroupValueBuilder { self } - fn append_batch_inner( - &mut self, - array: &ArrayRef, - rows: &[usize], - all_non_null: bool, - ) { - let arr = array.as_byte_view::(); - - if all_non_null { - self.nulls.append_n(rows.len(), false); - for &row in rows { - self.do_append_val_inner(arr, row); - } - } else { - for &row in rows { - // Null row case, set and return - if arr.is_valid(row) { - self.nulls.append(false); - self.do_append_val_inner(arr, row); - } else { - self.nulls.append(true); - self.views.push(0); - } - } - } + fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { + let array = array.as_byte_view::(); + self.do_equal_to_inner(lhs_row, array, rhs_row) } fn append_val_inner(&mut self, array: &ArrayRef, row: usize) { @@ -648,6 +699,65 @@ impl ByteViewGroupValueBuilder { self.do_append_val_inner(arr, row); } + fn vectorized_equal_to_inner( + &self, + group_indices: &[usize], + array: &ArrayRef, + rows: &[usize], + equal_to_results: &mut [bool], + ) { + let array = array.as_byte_view::(); + + for (idx, &lhs_row) in group_indices.iter().enumerate() { + // Has found not equal to, don't need to check + if !equal_to_results[idx] { + continue; + } + + let rhs_row = rows[idx]; + equal_to_results[idx] = self.do_equal_to_inner(lhs_row, array, rhs_row); + } + } + + fn vectorized_append_inner(&mut self, array: &ArrayRef, rows: &[usize]) { + let arr = array.as_byte_view::(); + let null_count = array.null_count(); + let num_rows = array.len(); + let all_null_or_non_null = if null_count == 0 { + Some(true) + } else if null_count == num_rows { + Some(false) + } else { + None + }; + + match all_null_or_non_null { + None => { + for &row in rows { + // Null row case, set and return + if arr.is_valid(row) { + self.nulls.append(false); + self.do_append_val_inner(arr, row); + } else { + self.nulls.append(true); + self.views.push(0); + } + } + } + + Some(true) => { + self.nulls.append_n(rows.len(), false); + for &row in rows { + self.do_append_val_inner(arr, row); + } + } + + Some(false) => { + self.nulls.append_n(rows.len(), true); + } + } + } + fn do_append_val_inner(&mut self, array: &GenericByteViewArray, row: usize) where B: ByteViewType, @@ -688,11 +798,6 @@ impl ByteViewGroupValueBuilder { } } - fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool { - let array = array.as_byte_view::(); - self.do_equal_to_inner(lhs_row, array, rhs_row) - } - fn do_equal_to_inner( &self, lhs_row: usize, @@ -983,16 +1088,11 @@ impl GroupColumn for ByteViewGroupValueBuilder { rows: &[usize], equal_to_results: &mut [bool], ) { - todo!() + self.vectorized_equal_to_inner(group_indices, array, rows, equal_to_results); } - fn vectorized_append( - &mut self, - array: &ArrayRef, - rows: &[usize], - all_non_null: bool, - ) { - self.append_batch_inner(array, rows, all_non_null); + fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) { + self.vectorized_append_inner(array, rows); } fn len(&self) -> usize {