From 3415659d36783d3d5b8b4f5a1e3700a1a20cb4cc Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 23 Oct 2024 14:19:12 +0800 Subject: [PATCH] impl `vectorized_append`. --- .../src/aggregates/group_values/column.rs | 47 +++++++- .../aggregates/group_values/group_column.rs | 102 ++++++++++++++++-- 2 files changed, 135 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index 2045cbc9b7902..ced09d26e6a8f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -319,16 +319,23 @@ impl GroupValuesColumn { next_group_index = self.group_index_lists[current_group_index]; } } - - self.vectorized_equal_to_results - .resize(self.vectorized_equal_to_group_indices.len(), true); } /// Perform `vectorized_equal_to` /// - /// fn vectorized_equal_to(&mut self, cols: &[ArrayRef]) { + debug_assert_eq!( + self.vectorized_equal_to_group_indices.len(), + self.vectorized_equal_to_row_indices.len() + ); + + if self.vectorized_equal_to_group_indices.is_empty() { + return; + } + + // Vectorized equal to `cols` and `group columns` let mut equal_to_results = mem::take(&mut self.vectorized_equal_to_results); + equal_to_results.resize(self.vectorized_equal_to_group_indices.len(), true); for (col_idx, group_col) in self.group_values.iter().enumerate() { group_col.vectorized_equal_to( &self.vectorized_equal_to_group_indices, @@ -337,8 +344,40 @@ impl GroupValuesColumn { &mut equal_to_results, ); } + + let mut current_row_equal_to_result = false; + let mut current_row = *self.vectorized_equal_to_row_indices.first().unwrap(); + for (idx, &row) in self.vectorized_equal_to_row_indices.iter().enumerate() { + // If found next row, according to the equal to result of `current_row` + if current_row != row { + if !current_row_equal_to_result { + self.vectorized_append_row_indices.push(row); + } + current_row = row; + current_row_equal_to_result = equal_to_results[idx]; + continue; + } + current_row_equal_to_result |= equal_to_results[idx]; + } + + if !current_row_equal_to_result { + self.vectorized_append_row_indices.push(current_row); + } + self.vectorized_equal_to_results = equal_to_results; } + + /// Perform `vectorized_append` + /// + /// 1. Vectorized append new values into `group_values` + /// 2. Update `map` and `group_index_lists` + fn vectorized_append(&mut self, cols: &[ArrayRef], batch_hashes: &[u64]) { + if self.vectorized_append_row_indices.is_empty() { + return; + } + + // 1. Vectorized append new values into `group_values` + } } /// instantiates a [`PrimitiveGroupValueBuilder`] and pushes it into $v diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index 7db5bd6d6d3a3..d05288bc6d309 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -180,6 +180,7 @@ impl GroupColumn all_non_null: bool, ) { let arr = array.as_primitive::(); + match (NULLABLE, all_non_null) { (true, true) => { self.nulls.append_n(rows.len(), false); @@ -280,7 +281,7 @@ where } } - fn append_batch_inner( + fn vectorized_append_inner( &mut self, array: &ArrayRef, rows: &[usize], @@ -293,7 +294,7 @@ where if all_non_null { self.nulls.append_n(rows.len(), false); for &row in rows { - self.append_value(arr, row); + self.do_append_val_inner(arr, row); } } else { for &row in rows { @@ -304,7 +305,7 @@ where self.offsets.push(O::usize_as(offset)); } else { self.nulls.append(false); - self.append_value(arr, row); + self.do_append_val_inner(arr, row); } } } @@ -322,11 +323,11 @@ where self.offsets.push(O::usize_as(offset)); } else { self.nulls.append(false); - self.append_value(arr, row); + self.do_append_val_inner(arr, row); } } - fn append_value(&mut self, array: &GenericByteArray, row: usize) + fn do_append_val_inner(&mut self, array: &GenericByteArray, row: usize) where B: ByteArrayType, { @@ -340,6 +341,40 @@ where B: ByteArrayType, { let array = array.as_bytes::(); + self.do_equal_to_inner(lhs_row, array, rhs_row) + } + + fn vectorized_equal_to_inner( + &self, + group_indices: &[usize], + array: &ArrayRef, + rows: &[usize], + equal_to_results: &mut [bool], + ) where + B: ByteArrayType, + { + let array = array.as_bytes::(); + + for (idx, &lhs_row) in group_indices.iter().enumerate() { + // Has found not equal to, don't need to check + if !equal_to_results[idx] { + continue; + } + + let rhs_row = rows[idx]; + equal_to_results[idx] = self.do_equal_to_inner(lhs_row, array, rhs_row); + } + } + + fn do_equal_to_inner( + &self, + lhs_row: usize, + array: &GenericByteArray, + rhs_row: usize, + ) -> bool + where + B: ByteArrayType, + { let exist_null = self.nulls.is_null(lhs_row); let input_null = array.is_null(rhs_row); if let Some(result) = nulls_equal_to(exist_null, input_null) { @@ -411,7 +446,34 @@ where rows: &[usize], equal_to_results: &mut [bool], ) { - todo!() + // Sanity array type + match self.output_type { + OutputType::Binary => { + debug_assert!(matches!( + array.data_type(), + DataType::Binary | DataType::LargeBinary + )); + self.vectorized_equal_to_inner::>( + group_indices, + array, + rows, + equal_to_results, + ); + } + OutputType::Utf8 => { + debug_assert!(matches!( + array.data_type(), + DataType::Utf8 | DataType::LargeUtf8 + )); + self.vectorized_equal_to_inner::>( + group_indices, + array, + rows, + equal_to_results, + ); + } + _ => unreachable!("View types should use `ArrowBytesViewMap`"), + } } fn vectorized_append( @@ -426,7 +488,7 @@ where column.data_type(), DataType::Binary | DataType::LargeBinary )); - self.append_batch_inner::>( + self.vectorized_append_inner::>( column, rows, all_non_null, @@ -437,7 +499,7 @@ where column.data_type(), DataType::Utf8 | DataType::LargeUtf8 )); - self.append_batch_inner::>( + self.vectorized_append_inner::>( column, rows, all_non_null, @@ -606,7 +668,7 @@ impl ByteViewGroupValueBuilder { self } - fn append_batch_inner( + fn vectorized_append_inner( &mut self, array: &ArrayRef, rows: &[usize], @@ -693,6 +755,26 @@ impl ByteViewGroupValueBuilder { self.do_equal_to_inner(lhs_row, array, rhs_row) } + fn vectorized_equal_to_inner( + &self, + group_indices: &[usize], + array: &ArrayRef, + rows: &[usize], + equal_to_results: &mut [bool], + ) { + let array = array.as_byte_view::(); + + for (idx, &lhs_row) in group_indices.iter().enumerate() { + // Has found not equal to, don't need to check + if !equal_to_results[idx] { + continue; + } + + let rhs_row = rows[idx]; + equal_to_results[idx] = self.do_equal_to_inner(lhs_row, array, rhs_row); + } + } + fn do_equal_to_inner( &self, lhs_row: usize, @@ -992,7 +1074,7 @@ impl GroupColumn for ByteViewGroupValueBuilder { rows: &[usize], all_non_null: bool, ) { - self.append_batch_inner(array, rows, all_non_null); + self.vectorized_append_inner(array, rows, all_non_null); } fn len(&self) -> usize {