diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs b/datafusion/physical-plan/src/aggregates/group_values/column.rs index 29e0e434c392..b6c7096ea98f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs @@ -45,9 +45,42 @@ use datafusion_physical_expr::binary_map::OutputType; use datafusion_physical_expr_common::datum::compare_with_eq; use hashbrown::raw::RawTable; +/// Group index context for performing `vectorized compare` and `vectorized append` +struct GroupIndexContext { + /// It is possible that hash value collision exists, + /// and we will chain the `group indices` with same hash value + /// + /// The chained indices is like: + /// `latest group index -> older group index -> even older group index -> ...` + prev_group_index: usize, + + /// It is possible that rows with same hash values exist in `input cols`. + /// And if we `vectorized compare` and `vectorized append` them + /// in the same round, some fault cases will occur especially when + /// they are totally the repeated rows... + /// + /// For example: + /// - Two repeated rows exist in `input cols`. + /// + /// - We found their hash values equal to one exist group + /// + /// - We then perform `vectorized compare` for them to the exist group, + /// and found their values not equal to the exist one + /// + /// - Finally when perform `vectorized append`, we decide to build two + /// respective new groups for them, even we actually just need one + /// new group... + /// + /// So for solving such cases simply, if some rows with same hash value + /// in `input cols`, just allow to process one of them in a round, + /// and this flag is used to represent that one of them is processing + /// in current round. + /// + checking: bool, +} + /// A [`GroupValues`] that stores multiple columns of group values. /// -/// pub struct GroupValuesColumn { /// The output schema schema: SchemaRef, @@ -62,6 +95,11 @@ pub struct GroupValuesColumn { /// values: (hash, group_index) map: RawTable<(u64, usize)>, + group_index_ctxs: Vec, + + /// Some + remaining_indices: Vec, + /// The size of `map` in bytes map_size: usize, @@ -94,6 +132,7 @@ impl GroupValuesColumn { Ok(Self { schema, map, + group_index_ctxs: Vec::new(), map_size: 0, group_values: vec![], hashes_buffer: Default::default(), @@ -160,13 +199,6 @@ macro_rules! instantiate_primitive { }; } -fn append_col_value(mut core: C, array: &ArrayRef, row: usize) -where - C: FnMut(&ArrayRef, usize), -{ - core(array, row); -} - impl GroupValues for GroupValuesColumn { fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { let n_rows = cols[0].len(); diff --git a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs index a6961835edef..95690fce596d 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs @@ -554,7 +554,7 @@ impl ByteViewGroupValueBuilder { all_non_null: bool, ) { let arr = array.as_byte_view::(); - + if all_non_null { self.nulls.append_n(rows.len(), false); for &row in rows {