-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Specialize single column primitive group values #7043
Changes from 5 commits
056fd7b
a7f6a33
98384dc
b5e8391
d015363
3750df6
6654876
daf408c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use arrow_array::{downcast_primitive, ArrayRef}; | ||
use arrow_schema::SchemaRef; | ||
use datafusion_common::Result; | ||
use datafusion_physical_expr::EmitTo; | ||
|
||
mod primitive; | ||
use primitive::GroupValuesPrimitive; | ||
|
||
mod row; | ||
use row::GroupValuesRows; | ||
|
||
/// An interning store for group keys | ||
pub trait GroupValues: Send { | ||
/// Calculates the `groups` for each input row of `cols` | ||
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()>; | ||
|
||
/// Returns the number of bytes used by this [`GroupValues`] | ||
fn size(&self) -> usize; | ||
|
||
/// Returns true if this [`GroupValues`] is empty | ||
fn is_empty(&self) -> bool; | ||
|
||
/// The number of values stored in this [`GroupValues`] | ||
fn len(&self) -> usize; | ||
|
||
/// Emits the group values | ||
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>; | ||
} | ||
|
||
pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> { | ||
if schema.fields.len() == 1 { | ||
let d = schema.fields[0].data_type(); | ||
|
||
macro_rules! downcast_helper { | ||
($t:ty, $d:ident) => { | ||
return Ok(Box::new(GroupValuesPrimitive::<$t>::new($d.clone()))) | ||
}; | ||
} | ||
|
||
// TODO: More primitives | ||
downcast_primitive! { | ||
d => (downcast_helper, d), | ||
_ => {} | ||
} | ||
} | ||
|
||
Ok(Box::new(GroupValuesRows::try_new(schema)?)) | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,191 @@ | ||||||||||||
// Licensed to the Apache Software Foundation (ASF) under one | ||||||||||||
// or more contributor license agreements. See the NOTICE file | ||||||||||||
// distributed with this work for additional information | ||||||||||||
// regarding copyright ownership. The ASF licenses this file | ||||||||||||
// to you under the Apache License, Version 2.0 (the | ||||||||||||
// "License"); you may not use this file except in compliance | ||||||||||||
// with the License. You may obtain a copy of the License at | ||||||||||||
// | ||||||||||||
// http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||
// | ||||||||||||
// Unless required by applicable law or agreed to in writing, | ||||||||||||
// software distributed under the License is distributed on an | ||||||||||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||||||||
// KIND, either express or implied. See the License for the | ||||||||||||
// specific language governing permissions and limitations | ||||||||||||
// under the License. | ||||||||||||
|
||||||||||||
use crate::physical_plan::aggregates::group_values::GroupValues; | ||||||||||||
use ahash::RandomState; | ||||||||||||
use arrow::array::BooleanBufferBuilder; | ||||||||||||
use arrow::buffer::NullBuffer; | ||||||||||||
use arrow::datatypes::i256; | ||||||||||||
use arrow_array::cast::AsArray; | ||||||||||||
use arrow_array::{ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, PrimitiveArray}; | ||||||||||||
use arrow_schema::DataType; | ||||||||||||
use datafusion_common::Result; | ||||||||||||
use datafusion_execution::memory_pool::proxy::VecAllocExt; | ||||||||||||
use datafusion_physical_expr::EmitTo; | ||||||||||||
use half::f16; | ||||||||||||
use hashbrown::raw::RawTable; | ||||||||||||
use std::sync::Arc; | ||||||||||||
|
||||||||||||
/// A trait to allow hashing of floating point numbers | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you explain why this doesn't use If it is important not to use
Here is an example of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can't use create_hashes as we are generating the hashes from the native values, not an array |
||||||||||||
trait HashValue { | ||||||||||||
fn hash(self, state: &RandomState) -> u64; | ||||||||||||
} | ||||||||||||
|
||||||||||||
macro_rules! hash_integer { | ||||||||||||
($($t:ty),+) => { | ||||||||||||
$(impl HashValue for $t { | ||||||||||||
fn hash(self, state: &RandomState) -> u64 { | ||||||||||||
state.hash_one(self) | ||||||||||||
} | ||||||||||||
})+ | ||||||||||||
}; | ||||||||||||
} | ||||||||||||
hash_integer!(i8, i16, i32, i64, i128, i256); | ||||||||||||
hash_integer!(u8, u16, u32, u64); | ||||||||||||
|
||||||||||||
macro_rules! hash_float { | ||||||||||||
($($t:ty),+) => { | ||||||||||||
$(impl HashValue for $t { | ||||||||||||
fn hash(self, state: &RandomState) -> u64 { | ||||||||||||
state.hash_one(self.to_bits()) | ||||||||||||
} | ||||||||||||
})+ | ||||||||||||
}; | ||||||||||||
} | ||||||||||||
|
||||||||||||
hash_float!(f16, f32, f64); | ||||||||||||
|
||||||||||||
/// A [`GroupValues`] storing raw primitive values | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
pub struct GroupValuesPrimitive<T: ArrowPrimitiveType> { | ||||||||||||
/// The data type of the output array | ||||||||||||
data_type: DataType, | ||||||||||||
/// Stores the group index based on the hash of its value | ||||||||||||
map: RawTable<usize>, | ||||||||||||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
/// The group index of the null value if any | ||||||||||||
null_group: Option<usize>, | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||||||||||||
/// The values for each group index | ||||||||||||
values: Vec<T::Native>, | ||||||||||||
/// The random state used to generate hashes | ||||||||||||
random_state: RandomState, | ||||||||||||
} | ||||||||||||
|
||||||||||||
impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> { | ||||||||||||
pub fn new(data_type: DataType) -> Self { | ||||||||||||
assert!(PrimitiveArray::<T>::is_compatible(&data_type)); | ||||||||||||
Self { | ||||||||||||
data_type, | ||||||||||||
map: RawTable::with_capacity(128), | ||||||||||||
values: Vec::with_capacity(128), | ||||||||||||
null_group: None, | ||||||||||||
random_state: Default::default(), | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
impl<T: ArrowPrimitiveType> GroupValues for GroupValuesPrimitive<T> | ||||||||||||
where | ||||||||||||
T::Native: HashValue, | ||||||||||||
{ | ||||||||||||
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> { | ||||||||||||
assert_eq!(cols.len(), 1); | ||||||||||||
groups.clear(); | ||||||||||||
|
||||||||||||
for v in cols[0].as_primitive::<T>() { | ||||||||||||
let group_id = match v { | ||||||||||||
None => *self.null_group.get_or_insert_with(|| { | ||||||||||||
let group_id = self.values.len(); | ||||||||||||
self.values.push(Default::default()); | ||||||||||||
group_id | ||||||||||||
}), | ||||||||||||
Some(key) => { | ||||||||||||
let state = &self.random_state; | ||||||||||||
let hash = key.hash(state); | ||||||||||||
let insert = self.map.find_or_find_insert_slot( | ||||||||||||
hash, | ||||||||||||
|g| unsafe { self.values.get_unchecked(*g).is_eq(key) }, | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is awesome 🚀 |
||||||||||||
|g| unsafe { self.values.get_unchecked(*g).hash(state) }, | ||||||||||||
); | ||||||||||||
|
||||||||||||
// SAFETY: No mutation occurred since find_or_find_insert_slot | ||||||||||||
unsafe { | ||||||||||||
match insert { | ||||||||||||
Ok(v) => *v.as_ref(), | ||||||||||||
Err(slot) => { | ||||||||||||
let g = self.values.len(); | ||||||||||||
self.map.insert_in_slot(hash, slot, g); | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should still track the allocated memory (like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is accounted in the size method |
||||||||||||
self.values.push(key); | ||||||||||||
g | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
}; | ||||||||||||
groups.push(group_id) | ||||||||||||
} | ||||||||||||
Ok(()) | ||||||||||||
} | ||||||||||||
|
||||||||||||
fn size(&self) -> usize { | ||||||||||||
self.map.capacity() * std::mem::size_of::<usize>() + self.values.allocated_size() | ||||||||||||
} | ||||||||||||
|
||||||||||||
fn is_empty(&self) -> bool { | ||||||||||||
self.values.is_empty() | ||||||||||||
} | ||||||||||||
|
||||||||||||
fn len(&self) -> usize { | ||||||||||||
self.values.len() | ||||||||||||
} | ||||||||||||
|
||||||||||||
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> { | ||||||||||||
fn build_primitive<T: ArrowPrimitiveType>( | ||||||||||||
values: Vec<T::Native>, | ||||||||||||
null_idx: Option<usize>, | ||||||||||||
) -> PrimitiveArray<T> { | ||||||||||||
let nulls = null_idx.map(|null_idx| { | ||||||||||||
let mut buffer = BooleanBufferBuilder::new(values.len()); | ||||||||||||
buffer.append_n(values.len(), true); | ||||||||||||
buffer.set_bit(null_idx, false); | ||||||||||||
unsafe { NullBuffer::new_unchecked(buffer.finish(), 1) } | ||||||||||||
}); | ||||||||||||
PrimitiveArray::<T>::new(values.into(), nulls) | ||||||||||||
} | ||||||||||||
|
||||||||||||
let array: PrimitiveArray<T> = match emit_to { | ||||||||||||
EmitTo::All => { | ||||||||||||
self.map.clear(); | ||||||||||||
build_primitive(std::mem::take(&mut self.values), self.null_group.take()) | ||||||||||||
} | ||||||||||||
EmitTo::First(n) => { | ||||||||||||
// SAFETY: self.map outlives iterator and is not modified concurrently | ||||||||||||
unsafe { | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this code is largely replicated from the row version. I wonder if it could be refactored into a (templated) common function (with appropriate documentation)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There isn't an easy way to make this generic, as one stores tuples and one isn't... I at least can't see a way that doesn't just obfuscate the code |
||||||||||||
for bucket in self.map.iter() { | ||||||||||||
// Decrement group index by n | ||||||||||||
match bucket.as_ref().checked_sub(n) { | ||||||||||||
// Group index was >= n, shift value down | ||||||||||||
Some(sub) => *bucket.as_mut() = sub, | ||||||||||||
// Group index was < n, so remove from table | ||||||||||||
None => self.map.erase(bucket), | ||||||||||||
} | ||||||||||||
} | ||||||||||||
} | ||||||||||||
let null_group = match &mut self.null_group { | ||||||||||||
Some(v) if *v >= n => { | ||||||||||||
*v -= n; | ||||||||||||
None | ||||||||||||
} | ||||||||||||
Some(_) => self.null_group.take(), | ||||||||||||
None => None, | ||||||||||||
}; | ||||||||||||
let mut split = self.values.split_off(n); | ||||||||||||
std::mem::swap(&mut self.values, &mut split); | ||||||||||||
build_primitive(split, null_group) | ||||||||||||
} | ||||||||||||
}; | ||||||||||||
Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))]) | ||||||||||||
} | ||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still relevant?