diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index 3625fc454d37..fe37329ddf9a 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -128,21 +128,8 @@ pub struct VectorizedGroupValuesColumn { /// a specific list in `group_index_lists`. emit_group_index_list_buffer: Vec, - /// Similar as `current_indices`, but `remaining_indices` - /// is used to store the rows will be processed in next round. - scalarized_indices: Vec, - - /// The `vectorized_equal_tod` row indices buffer - vectorized_equal_to_row_indices: Vec, - - /// The `vectorized_equal_tod` group indices buffer - vectorized_equal_to_group_indices: Vec, - - /// The `vectorized_equal_tod` result buffer - vectorized_equal_to_results: Vec, - - /// The `vectorized append` row indices buffer - vectorized_append_row_indices: Vec, + /// Buffers for `vectorized_append` and `vectorized_equal_to` + vectorized_operation_buffers: VectorizedOperationBuffers, /// The actual group by values, stored column-wise. Compare from /// the left to right, each column is stored as [`GroupColumn`]. @@ -161,6 +148,38 @@ pub struct VectorizedGroupValuesColumn { random_state: RandomState, } +/// Buffers to store intermediate results in `vectorized_append` +/// and `vectorized_equal_to`, for reducing memory allocation +#[derive(Default)] +struct VectorizedOperationBuffers { + /// The `vectorized_append` row indices buffer + append_row_indices: Vec, + + /// The `vectorized_equal_to` row indices buffer + equal_to_row_indices: Vec, + + /// The `vectorized_equal_to` group indices buffer + equal_to_group_indices: Vec, + + /// The `vectorized_equal_to` result buffer + equal_to_results: Vec, + + /// The buffer for storing row indices found not equal to + /// existing groups in `group_values` in `vectorized_equal_to`. 
+ /// We will perform `scalarized_intern` for such rows. + remaining_row_indices: Vec, +} + +impl VectorizedOperationBuffers { + fn clear(&mut self) { + self.append_row_indices.clear(); + self.equal_to_row_indices.clear(); + self.equal_to_group_indices.clear(); + self.equal_to_results.clear(); + self.remaining_row_indices.clear(); + } +} + impl VectorizedGroupValuesColumn { /// Create a new instance of GroupValuesColumn if supported for the specified schema pub fn try_new(schema: SchemaRef) -> Result { @@ -170,15 +189,11 @@ impl VectorizedGroupValuesColumn { map, group_index_lists: Vec::new(), emit_group_index_list_buffer: Vec::new(), + vectorized_operation_buffers: VectorizedOperationBuffers::default(), map_size: 0, group_values: vec![], hashes_buffer: Default::default(), random_state: Default::default(), - scalarized_indices: Default::default(), - vectorized_equal_to_row_indices: Default::default(), - vectorized_equal_to_group_indices: Default::default(), - vectorized_equal_to_results: Default::default(), - vectorized_append_row_indices: Default::default(), }) } @@ -201,9 +216,13 @@ impl VectorizedGroupValuesColumn { batch_hashes: &[u64], groups: &mut [usize], ) { - self.vectorized_append_row_indices.clear(); - self.vectorized_equal_to_row_indices.clear(); - self.vectorized_equal_to_group_indices.clear(); + self.vectorized_operation_buffers.append_row_indices.clear(); + self.vectorized_operation_buffers + .equal_to_row_indices + .clear(); + self.vectorized_operation_buffers + .equal_to_group_indices + .clear(); let mut group_values_len = self.group_values[0].len(); for (row, &target_hash) in batch_hashes.iter().enumerate() { @@ -227,7 +246,9 @@ impl VectorizedGroupValuesColumn { ); // Add row index to `vectorized_append_row_indices` - self.vectorized_append_row_indices.push(row); + self.vectorized_operation_buffers + .append_row_indices + .push(row); // Set group index to row in `groups` groups[row] = current_group_idx; @@ -245,26 +266,41 @@ impl 
VectorizedGroupValuesColumn { let list_offset = group_index_view.value() as usize; let group_index_list = &self.group_index_lists[list_offset]; for &group_index in group_index_list { - self.vectorized_equal_to_row_indices.push(row); - self.vectorized_equal_to_group_indices.push(group_index); + self.vectorized_operation_buffers + .equal_to_row_indices + .push(row); + self.vectorized_operation_buffers + .equal_to_group_indices + .push(group_index); } } else { let group_index = group_index_view.value() as usize; - self.vectorized_equal_to_row_indices.push(row); - self.vectorized_equal_to_group_indices.push(group_index); + self.vectorized_operation_buffers + .equal_to_row_indices + .push(row); + self.vectorized_operation_buffers + .equal_to_group_indices + .push(group_index); } } } /// Perform `vectorized_append`` for `rows` in `vectorized_append_row_indices` fn vectorized_append(&mut self, cols: &[ArrayRef]) { - if self.vectorized_append_row_indices.is_empty() { + if self + .vectorized_operation_buffers + .append_row_indices + .is_empty() + { return; } let iter = self.group_values.iter_mut().zip(cols.iter()); for (group_column, col) in iter { - group_column.vectorized_append(col, &self.vectorized_append_row_indices); + group_column.vectorized_append( + col, + &self.vectorized_operation_buffers.append_row_indices, + ); } } @@ -283,27 +319,41 @@ impl VectorizedGroupValuesColumn { /// are very few. 
fn vectorized_equal_to(&mut self, cols: &[ArrayRef], groups: &mut [usize]) { assert_eq!( - self.vectorized_equal_to_group_indices.len(), - self.vectorized_equal_to_row_indices.len() + self.vectorized_operation_buffers + .equal_to_group_indices + .len(), + self.vectorized_operation_buffers.equal_to_row_indices.len() ); - self.scalarized_indices.clear(); + self.vectorized_operation_buffers + .remaining_row_indices + .clear(); - if self.vectorized_equal_to_group_indices.is_empty() { + if self + .vectorized_operation_buffers + .equal_to_group_indices + .is_empty() + { return; } // 1. Perform `vectorized_equal_to` for `rows` in `vectorized_equal_to_group_indices` // and `group_indices` in `vectorized_equal_to_group_indices` - let mut equal_to_results = mem::take(&mut self.vectorized_equal_to_results); + let mut equal_to_results = + mem::take(&mut self.vectorized_operation_buffers.equal_to_results); equal_to_results.clear(); - equal_to_results.resize(self.vectorized_equal_to_group_indices.len(), true); + equal_to_results.resize( + self.vectorized_operation_buffers + .equal_to_group_indices + .len(), + true, + ); for (col_idx, group_col) in self.group_values.iter().enumerate() { group_col.vectorized_equal_to( - &self.vectorized_equal_to_group_indices, + &self.vectorized_operation_buffers.equal_to_group_indices, &cols[col_idx], - &self.vectorized_equal_to_row_indices, + &self.vectorized_operation_buffers.equal_to_row_indices, &mut equal_to_results, ); } @@ -311,19 +361,26 @@ impl VectorizedGroupValuesColumn { // 2. Check `equal_to_results`, if found not equal to `row`s, just add them // to `scalarized_indices`, and perform `scalarized_intern` for them after. 
let mut current_row_equal_to_result = false; - for (idx, &row) in self.vectorized_equal_to_row_indices.iter().enumerate() { + for (idx, &row) in self + .vectorized_operation_buffers + .equal_to_row_indices + .iter() + .enumerate() + { let equal_to_result = equal_to_results[idx]; // Equal to case, set the `group_indices` to `rows` in `groups` if equal_to_result { - groups[row] = self.vectorized_equal_to_group_indices[idx]; + groups[row] = + self.vectorized_operation_buffers.equal_to_group_indices[idx]; } current_row_equal_to_result |= equal_to_result; // Look forward next one row to check if have checked all results // of current row let next_row = self - .vectorized_equal_to_row_indices + .vectorized_operation_buffers + .equal_to_row_indices .get(idx + 1) .unwrap_or(&usize::MAX); @@ -331,7 +388,9 @@ impl VectorizedGroupValuesColumn { if row != *next_row { // Not equal to case, add `row` to `scalarized_indices` if !current_row_equal_to_result { - self.scalarized_indices.push(row); + self.vectorized_operation_buffers + .remaining_row_indices + .push(row); } // Init the total result for checking next row @@ -339,7 +398,7 @@ impl VectorizedGroupValuesColumn { } } - self.vectorized_equal_to_results = equal_to_results; + self.vectorized_operation_buffers.equal_to_results = equal_to_results; } /// It is possible that some `input rows` have the same @@ -384,13 +443,17 @@ impl VectorizedGroupValuesColumn { batch_hashes: &[u64], groups: &mut [usize], ) { - if self.scalarized_indices.is_empty() { + if self + .vectorized_operation_buffers + .remaining_row_indices + .is_empty() + { return; } let mut map = mem::take(&mut self.map); - for &row in &self.scalarized_indices { + for &row in &self.vectorized_operation_buffers.remaining_row_indices { let target_hash = batch_hashes[row]; let entry = map.get_mut(target_hash, |(exist_hash, _)| { // Somewhat surprisingly, this closure can be called even if the @@ -781,11 +844,7 @@ impl GroupValues for VectorizedGroupValuesColumn { 
self.hashes_buffer.shrink_to(count); self.group_index_lists.clear(); self.emit_group_index_list_buffer.clear(); - self.scalarized_indices.clear(); - self.vectorized_append_row_indices.clear(); - self.vectorized_equal_to_row_indices.clear(); - self.vectorized_equal_to_group_indices.clear(); - self.vectorized_equal_to_results.clear(); + self.vectorized_operation_buffers.clear(); } }