diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs
index 87bcd8e10ff5..2045cbc9b790 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs
@@ -62,7 +62,7 @@ const UNSET_CHECKING_FLAG_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
 /// ### Checking flag
 ///
 /// It is possible that rows with same hash values exist in `input cols`.
-/// And if we `vectorized compare` and `vectorized append` them
+/// And if we `vectorized_equal_to` and `vectorized append` them
 /// in the same round, some fault cases will occur especially when
 /// they are totally the repeated rows...
 ///
@@ -71,7 +71,7 @@ const UNSET_CHECKING_FLAG_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
 ///
 /// - We found their hash values equal to one exist group
 ///
-/// - We then perform `vectorized compare` for them to the exist group,
+/// - We then perform `vectorized_equal_to` for them to the exist group,
 ///   and found their values not equal to the exist one
 ///
 /// - Finally when perform `vectorized append`, we decide to build two
@@ -153,14 +153,14 @@ pub struct GroupValuesColumn {
     /// is used to store the rows will be processed in next round.
     remaining_indices: Vec<usize>,
 
-    /// The `vectorized compared` row indices buffer
-    vectorized_compare_row_indices: Vec<usize>,
+    /// The `vectorized_equal_tod` row indices buffer
+    vectorized_equal_to_row_indices: Vec<usize>,
 
-    /// The `vectorized compared` group indices buffer
-    vectorized_compare_group_indices: Vec<usize>,
+    /// The `vectorized_equal_tod` group indices buffer
+    vectorized_equal_to_group_indices: Vec<usize>,
 
-    /// The `vectorized compared` result buffer
-    vectorized_compare_results: Vec<bool>,
+    /// The `vectorized_equal_tod` result buffer
+    vectorized_equal_to_results: Vec<bool>,
 
     /// The `vectorized append` row indices buffer
     vectorized_append_row_indices: Vec<usize>,
@@ -204,9 +204,9 @@ impl GroupValuesColumn {
             append_rows_buffer: Default::default(),
             current_indices: Default::default(),
             remaining_indices: Default::default(),
-            vectorized_compare_row_indices: Default::default(),
-            vectorized_compare_group_indices: Default::default(),
-            vectorized_compare_results: Default::default(),
+            vectorized_equal_to_row_indices: Default::default(),
+            vectorized_equal_to_group_indices: Default::default(),
+            vectorized_equal_to_results: Default::default(),
             vectorized_append_row_indices: Default::default(),
         })
     }
@@ -260,8 +260,8 @@ impl GroupValuesColumn {
     ///   - Check if the `bucket` checking, if so add it to `remaining_indices`,
     ///     and just process it in next round, otherwise we continue the process
     ///   - Mark `bucket` checking, and add it to `checking_buckets`
-    ///   - Add row index to `vectorized_compare_row_indices`
-    ///   - Add group indices(from `group_index_lists`) to `vectorized_compare_group_indices`
+    ///   - Add row index to `vectorized_equal_to_row_indices`
+    ///   - Add group indices(from `group_index_lists`) to `vectorized_equal_to_group_indices`
     ///
     fn collect_vectorized_process_context(&mut self, batch_hashes: &[u64]) {
         let mut next_group_idx = self.group_values[0].len() as u64;
@@ -308,21 +308,36 @@ impl GroupValuesColumn {
             // Mark `bucket` checking, and add it to `checking_buckets`
             bucket_ctx.set_checking();
 
-            // Add row index to `vectorized_compare_row_indices`
-            // Add group indices(from `group_index_lists`) to `vectorized_compare_group_indices`
+            // Add row index to `vectorized_equal_to_row_indices`
+            // Add group indices(from `group_index_lists`) to `vectorized_equal_to_group_indices`
             let mut next_group_index = bucket_ctx.group_index() as usize + 1;
             while next_group_index > 0 {
                 let current_group_index = next_group_index;
-                self.vectorized_compare_row_indices.push(row);
-                self.vectorized_compare_group_indices
+                self.vectorized_equal_to_row_indices.push(row);
+                self.vectorized_equal_to_group_indices
                     .push(current_group_index - 1);
                 next_group_index = self.group_index_lists[current_group_index];
             }
         }
-    }
 
-    fn vectorized_compare(&mut self) {
+        self.vectorized_equal_to_results
+            .resize(self.vectorized_equal_to_group_indices.len(), true);
+    }
+
+    /// Perform `vectorized_equal_to`
+    ///
+    ///
+    fn vectorized_equal_to(&mut self, cols: &[ArrayRef]) {
+        let mut equal_to_results = mem::take(&mut self.vectorized_equal_to_results);
+        for (col_idx, group_col) in self.group_values.iter().enumerate() {
+            group_col.vectorized_equal_to(
+                &self.vectorized_equal_to_group_indices,
+                &cols[col_idx],
+                &self.vectorized_equal_to_row_indices,
+                &mut equal_to_results,
+            );
+        }
+        self.vectorized_equal_to_results = equal_to_results;
     }
 }
 
@@ -411,10 +426,10 @@ impl GroupValues for GroupValuesColumn {
         batch_hashes.resize(n_rows, 0);
         create_hashes(cols, &self.random_state, &mut batch_hashes)?;
 
-        // General steps for one round `vectorized compare & append`:
+        // General steps for one round `vectorized equal_to & append`:
         //   1. Collect vectorized context by checking hash values of `cols` in `map`
-        //   2. Perform `vectorized compare`
-        //   3. Perform `vectorized append`
+        //   2. Perform `vectorized_equal_to`
+        //   3. Perform `vectorized_append`
        //   4. Reset the checking flag in `BucketContext`
 
         let num_rows = cols[0].len();
@@ -422,14 +437,14 @@ impl GroupValues for GroupValuesColumn {
         self.current_indices.extend(0..num_rows);
         while self.current_indices.len() > 0 {
             self.vectorized_append_row_indices.clear();
-            self.vectorized_compare_row_indices.clear();
-            self.vectorized_compare_group_indices.clear();
-            self.vectorized_compare_results.clear();
+            self.vectorized_equal_to_row_indices.clear();
+            self.vectorized_equal_to_group_indices.clear();
+            self.vectorized_equal_to_results.clear();
 
             // 1. Collect vectorized context by checking hash values of `cols` in `map`
             self.collect_vectorized_process_context(&batch_hashes);
 
-            // 2. Perform `vectorized compare`
+            // 2. Perform `vectorized_equal_to`
         }
 
         self.hashes_buffer = batch_hashes;
diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 89cfea3b4de6..7db5bd6d6d3a 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -62,12 +62,12 @@ pub trait GroupColumn: Send + Sync {
     /// Appends the row at `row` in `array` to this builder
     fn append_val(&mut self, array: &ArrayRef, row: usize);
 
-    fn vectorized_compare(
-        &mut self,
+    fn vectorized_equal_to(
+        &self,
         group_indices: &[usize],
         array: &ArrayRef,
         rows: &[usize],
-        compare_results: &mut [bool],
+        equal_to_results: &mut [bool],
     );
 
     fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize], all_non_null: bool);
@@ -142,18 +142,18 @@ impl GroupColumn
         }
     }
 
-    fn vectorized_compare(
-        &mut self,
+    fn vectorized_equal_to(
+        &self,
         group_indices: &[usize],
         array: &ArrayRef,
         rows: &[usize],
-        compare_results: &mut [bool],
+        equal_to_results: &mut [bool],
     ) {
         let array = array.as_primitive::<T>();
 
         for (idx, &lhs_row) in group_indices.iter().enumerate() {
             // Has found not equal to, don't need to check
-            if !compare_results[idx] {
+            if !equal_to_results[idx] {
                 continue;
             }
 
@@ -163,13 +163,13 @@ impl GroupColumn
                 let exist_null = self.nulls.is_null(lhs_row);
                 let input_null = array.is_null(rhs_row);
                 if let Some(result) = nulls_equal_to(exist_null, input_null) {
-                    compare_results[idx] = result;
+                    equal_to_results[idx] = result;
                     continue;
                 }
 
                 // Otherwise, we need to check their values
             }
 
-            compare_results[idx] = self.group_values[lhs_row] == array.value(rhs_row);
+            equal_to_results[idx] = self.group_values[lhs_row] == array.value(rhs_row);
         }
     }
@@ -404,12 +404,12 @@ where
         };
     }
 
-    fn vectorized_compare(
-        &mut self,
+    fn vectorized_equal_to(
+        &self,
         group_indices: &[usize],
         array: &ArrayRef,
         rows: &[usize],
-        compare_results: &mut [bool],
+        equal_to_results: &mut [bool],
     ) {
         todo!()
     }
@@ -976,12 +976,12 @@ impl GroupColumn for ByteViewGroupValueBuilder {
         self.append_val_inner(array, row)
     }
 
-    fn vectorized_compare(
-        &mut self,
+    fn vectorized_equal_to(
+        &self,
         group_indices: &[usize],
         array: &ArrayRef,
         rows: &[usize],
-        compare_results: &mut [bool],
+        equal_to_results: &mut [bool],
    ) {
         todo!()
    }
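
Note (not part of the patch): the renamed `GroupColumn::vectorized_equal_to` keeps the same AND-style contract the diff shows for the old `vectorized_compare`: every entry of the results buffer starts as `true` (see the `resize(.., true)` added in `collect_vectorized_process_context`) and a column only ever flips an entry to `false`, so `GroupValuesColumn::vectorized_equal_to` can pass one shared buffer through every group column and a mismatch found by an earlier column short-circuits the later ones. The sketch below mirrors that per-column loop from the primitive builder hunk, but over a plain `Vec<Option<i64>>` instead of Arrow arrays so it compiles on its own; `ToyGroupColumn`, the `main` driver, and the `nulls_equal_to` semantics (inferred from its use around line 163 of group_column.rs) are illustrative assumptions, not DataFusion APIs.

```rust
// Standalone sketch of the vectorized equal-to loop, using a toy column type
// (Vec<Option<i64>>) so it runs without the datafusion / arrow-rs crates.

/// Assumed semantics, inferred from how the patch uses `nulls_equal_to`:
/// Some(true) if both sides are null, Some(false) if exactly one is null,
/// None if both are non-null and the actual values must be compared.
fn nulls_equal_to(lhs_null: bool, rhs_null: bool) -> Option<bool> {
    match (lhs_null, rhs_null) {
        (true, true) => Some(true),
        (false, false) => None,
        _ => Some(false),
    }
}

/// Hypothetical stand-in for a `GroupColumn` builder holding one group column.
struct ToyGroupColumn {
    group_values: Vec<Option<i64>>,
}

impl ToyGroupColumn {
    /// For each position i, compare group row `group_indices[i]` against input
    /// row `rows[i]`, AND-ing the outcome into `equal_to_results[i]`.
    fn vectorized_equal_to(
        &self,
        group_indices: &[usize],
        input: &[Option<i64>],
        rows: &[usize],
        equal_to_results: &mut [bool],
    ) {
        for (idx, &lhs_row) in group_indices.iter().enumerate() {
            // An earlier column already proved inequality; skip the work.
            if !equal_to_results[idx] {
                continue;
            }

            let rhs_row = rows[idx];
            let lhs = self.group_values[lhs_row];
            let rhs = input[rhs_row];

            // Null handling first, mirroring the nullable branch in the patch.
            if let Some(result) = nulls_equal_to(lhs.is_none(), rhs.is_none()) {
                equal_to_results[idx] = result;
                continue;
            }

            // Both sides are non-null here, so unwrapping is safe.
            equal_to_results[idx] = lhs.unwrap() == rhs.unwrap();
        }
    }
}

fn main() {
    let col = ToyGroupColumn {
        group_values: vec![Some(1), None, Some(3)],
    };
    let input = vec![Some(1), Some(2), Some(3)];
    // Candidate (group row, input row) pairs collected beforehand, as
    // collect_vectorized_process_context does in the patch.
    let group_indices = [0usize, 1, 2];
    let rows = [0usize, 1, 2];
    let mut results = vec![true; group_indices.len()];

    col.vectorized_equal_to(&group_indices, &input, &rows, &mut results);
    assert_eq!(results, vec![true, false, true]);
    println!("{results:?}");
}
```

Starting from an all-`true` buffer and only ever clearing entries is what lets the driver loop in `GroupValuesColumn::vectorized_equal_to` reuse the same `equal_to_results` across all group columns without re-initializing it per column.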