Skip to content

Commit

Permalink
impl vectorized_equal_to.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Oct 22, 2024
1 parent dad79c0 commit 1a7c2eb
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 41 deletions.
67 changes: 41 additions & 26 deletions datafusion/physical-plan/src/aggregates/group_values/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const UNSET_CHECKING_FLAG_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
/// ### Checking flag
///
/// It is possible that rows with same hash values exist in `input cols`.
/// And if we `vectorized compare` and `vectorized append` them
/// And if we `vectorized_equal_to` and `vectorized append` them
/// in the same round, some fault cases will occur especially when
/// they are totally the repeated rows...
///
Expand All @@ -71,7 +71,7 @@ const UNSET_CHECKING_FLAG_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
///
/// - We found their hash values equal to one exist group
///
/// - We then perform `vectorized compare` for them to the exist group,
/// - We then perform `vectorized_equal_to` for them to the exist group,
/// and found their values not equal to the exist one
///
/// - Finally when perform `vectorized append`, we decide to build two
Expand Down Expand Up @@ -153,14 +153,14 @@ pub struct GroupValuesColumn {
/// is used to store the rows will be processed in next round.
remaining_indices: Vec<usize>,

/// The `vectorized compared` row indices buffer
vectorized_compare_row_indices: Vec<usize>,
/// The `vectorized_equal_to` row indices buffer
vectorized_equal_to_row_indices: Vec<usize>,

/// The `vectorized compared` group indices buffer
vectorized_compare_group_indices: Vec<usize>,
/// The `vectorized_equal_to` group indices buffer
vectorized_equal_to_group_indices: Vec<usize>,

/// The `vectorized compared` result buffer
vectorized_compare_results: Vec<bool>,
/// The `vectorized_equal_to` result buffer
vectorized_equal_to_results: Vec<bool>,

/// The `vectorized append` row indices buffer
vectorized_append_row_indices: Vec<usize>,
Expand Down Expand Up @@ -204,9 +204,9 @@ impl GroupValuesColumn {
append_rows_buffer: Default::default(),
current_indices: Default::default(),
remaining_indices: Default::default(),
vectorized_compare_row_indices: Default::default(),
vectorized_compare_group_indices: Default::default(),
vectorized_compare_results: Default::default(),
vectorized_equal_to_row_indices: Default::default(),
vectorized_equal_to_group_indices: Default::default(),
vectorized_equal_to_results: Default::default(),
vectorized_append_row_indices: Default::default(),
})
}
Expand Down Expand Up @@ -260,8 +260,8 @@ impl GroupValuesColumn {
/// - Check whether the `bucket` is already being checked; if so, add it to
/// `remaining_indices` and process it in the next round, otherwise continue
/// - Mark `bucket` checking, and add it to `checking_buckets`
/// - Add row index to `vectorized_compare_row_indices`
/// - Add group indices(from `group_index_lists`) to `vectorized_compare_group_indices`
/// - Add row index to `vectorized_equal_to_row_indices`
/// - Add group indices(from `group_index_lists`) to `vectorized_equal_to_group_indices`
///
fn collect_vectorized_process_context(&mut self, batch_hashes: &[u64]) {
let mut next_group_idx = self.group_values[0].len() as u64;
Expand Down Expand Up @@ -308,21 +308,36 @@ impl GroupValuesColumn {
// Mark `bucket` checking, and add it to `checking_buckets`
bucket_ctx.set_checking();

// Add row index to `vectorized_compare_row_indices`
// Add group indices(from `group_index_lists`) to `vectorized_compare_group_indices`
// Add row index to `vectorized_equal_to_row_indices`
// Add group indices(from `group_index_lists`) to `vectorized_equal_to_group_indices`
let mut next_group_index = bucket_ctx.group_index() as usize + 1;
while next_group_index > 0 {
let current_group_index = next_group_index;
self.vectorized_compare_row_indices.push(row);
self.vectorized_compare_group_indices
self.vectorized_equal_to_row_indices.push(row);
self.vectorized_equal_to_group_indices
.push(current_group_index - 1);
next_group_index = self.group_index_lists[current_group_index];
}
}
}

fn vectorized_compare(&mut self) {
self.vectorized_equal_to_results
.resize(self.vectorized_equal_to_group_indices.len(), true);
}

/// Perform `vectorized_equal_to`
///
/// Runs the column-wise equality check for every group column in
/// `group_values`, pairing the i-th group column with `cols[i]`.
///
/// Each column is handed the same three buffers:
/// - `vectorized_equal_to_group_indices` (indices into the existing groups),
/// - `vectorized_equal_to_row_indices` (indices into the input `cols`),
/// - `vectorized_equal_to_results` (updated in place by each column).
///
/// # Panics
///
/// Panics if `cols` has fewer entries than `group_values`, because the
/// input column is looked up by position.
fn vectorized_equal_to(&mut self, cols: &[ArrayRef]) {
    // Move the result buffer out of `self` so it can be mutated while the
    // remaining fields of `self` are only borrowed immutably.
    let mut results = mem::take(&mut self.vectorized_equal_to_results);
    let group_indices = &self.vectorized_equal_to_group_indices;
    let row_indices = &self.vectorized_equal_to_row_indices;

    self.group_values
        .iter()
        .enumerate()
        .for_each(|(col_idx, group_col)| {
            group_col.vectorized_equal_to(
                group_indices,
                &cols[col_idx],
                row_indices,
                &mut results,
            );
        });

    // Hand the (possibly updated) buffer back so its allocation is reused.
    self.vectorized_equal_to_results = results;
}
}

Expand Down Expand Up @@ -411,25 +426,25 @@ impl GroupValues for GroupValuesColumn {
batch_hashes.resize(n_rows, 0);
create_hashes(cols, &self.random_state, &mut batch_hashes)?;

// General steps for one round `vectorized compare & append`:
// General steps for one round `vectorized equal_to & append`:
// 1. Collect vectorized context by checking hash values of `cols` in `map`
// 2. Perform `vectorized compare`
// 3. Perform `vectorized append`
// 2. Perform `vectorized_equal_to`
// 3. Perform `vectorized_append`
// 4. Reset the checking flag in `BucketContext`

let num_rows = cols[0].len();
self.current_indices.clear();
self.current_indices.extend(0..num_rows);
while self.current_indices.len() > 0 {
self.vectorized_append_row_indices.clear();
self.vectorized_compare_row_indices.clear();
self.vectorized_compare_group_indices.clear();
self.vectorized_compare_results.clear();
self.vectorized_equal_to_row_indices.clear();
self.vectorized_equal_to_group_indices.clear();
self.vectorized_equal_to_results.clear();

// 1. Collect vectorized context by checking hash values of `cols` in `map`
self.collect_vectorized_process_context(&batch_hashes);

// 2. Perform `vectorized compare`
// 2. Perform `vectorized_equal_to`
}

self.hashes_buffer = batch_hashes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ pub trait GroupColumn: Send + Sync {
/// Appends the row at `row` in `array` to this builder
fn append_val(&mut self, array: &ArrayRef, row: usize);

fn vectorized_compare(
&mut self,
fn vectorized_equal_to(
&self,
group_indices: &[usize],
array: &ArrayRef,
rows: &[usize],
compare_results: &mut [bool],
equal_to_results: &mut [bool],
);

fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize], all_non_null: bool);
Expand Down Expand Up @@ -142,18 +142,18 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
}
}

fn vectorized_compare(
&mut self,
fn vectorized_equal_to(
&self,
group_indices: &[usize],
array: &ArrayRef,
rows: &[usize],
compare_results: &mut [bool],
equal_to_results: &mut [bool],
) {
let array = array.as_primitive::<T>();

for (idx, &lhs_row) in group_indices.iter().enumerate() {
// Has found not equal to, don't need to check
if !compare_results[idx] {
if !equal_to_results[idx] {
continue;
}

Expand All @@ -163,13 +163,13 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
let exist_null = self.nulls.is_null(lhs_row);
let input_null = array.is_null(rhs_row);
if let Some(result) = nulls_equal_to(exist_null, input_null) {
compare_results[idx] = result;
equal_to_results[idx] = result;
continue;
}
// Otherwise, we need to check their values
}

compare_results[idx] = self.group_values[lhs_row] == array.value(rhs_row);
equal_to_results[idx] = self.group_values[lhs_row] == array.value(rhs_row);
}
}

Expand Down Expand Up @@ -404,12 +404,12 @@ where
};
}

fn vectorized_compare(
&mut self,
fn vectorized_equal_to(
&self,
group_indices: &[usize],
array: &ArrayRef,
rows: &[usize],
compare_results: &mut [bool],
equal_to_results: &mut [bool],
) {
todo!()
}
Expand Down Expand Up @@ -976,12 +976,12 @@ impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
self.append_val_inner(array, row)
}

fn vectorized_compare(
&mut self,
fn vectorized_equal_to(
&self,
group_indices: &[usize],
array: &ArrayRef,
rows: &[usize],
compare_results: &mut [bool],
equal_to_results: &mut [bool],
) {
todo!()
}
Expand Down

0 comments on commit 1a7c2eb

Please sign in to comment.