Skip to content

Commit

Permalink
impl vectorized_append.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Oct 23, 2024
1 parent 1a7c2eb commit d79b813
Show file tree
Hide file tree
Showing 2 changed files with 247 additions and 108 deletions.
47 changes: 43 additions & 4 deletions datafusion/physical-plan/src/aggregates/group_values/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,16 +319,23 @@ impl GroupValuesColumn {
next_group_index = self.group_index_lists[current_group_index];
}
}

self.vectorized_equal_to_results
.resize(self.vectorized_equal_to_group_indices.len(), true);
}

/// Perform `vectorized_equal_to`
///
///
fn vectorized_equal_to(&mut self, cols: &[ArrayRef]) {
debug_assert_eq!(
self.vectorized_equal_to_group_indices.len(),
self.vectorized_equal_to_row_indices.len()
);

if self.vectorized_equal_to_group_indices.is_empty() {
return;
}

// Vectorized equal to `cols` and `group columns`
let mut equal_to_results = mem::take(&mut self.vectorized_equal_to_results);
equal_to_results.resize(self.vectorized_equal_to_group_indices.len(), true);
for (col_idx, group_col) in self.group_values.iter().enumerate() {
group_col.vectorized_equal_to(
&self.vectorized_equal_to_group_indices,
Expand All @@ -337,8 +344,40 @@ impl GroupValuesColumn {
&mut equal_to_results,
);
}

let mut current_row_equal_to_result = false;
let mut current_row = *self.vectorized_equal_to_row_indices.first().unwrap();
for (idx, &row) in self.vectorized_equal_to_row_indices.iter().enumerate() {
// If found next row, according to the equal to result of `current_row`
if current_row != row {
if !current_row_equal_to_result {
self.vectorized_append_row_indices.push(row);
}
current_row = row;
current_row_equal_to_result = equal_to_results[idx];
continue;
}
current_row_equal_to_result |= equal_to_results[idx];
}

if !current_row_equal_to_result {
self.vectorized_append_row_indices.push(current_row);
}

self.vectorized_equal_to_results = equal_to_results;
}

/// Perform `vectorized_append`
///
/// 1. Vectorized append new values into `group_values`
/// 2. Update `map` and `group_index_lists`
fn vectorized_append(&mut self, cols: &[ArrayRef], batch_hashes: &[u64]) {
if self.vectorized_append_row_indices.is_empty() {
return;
}

// 1. Vectorized append new values into `group_values`
}
}

/// instantiates a [`PrimitiveGroupValueBuilder`] and pushes it into $v
Expand Down
Loading

0 comments on commit d79b813

Please sign in to comment.