Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vectorized hash grouping #6904

Merged
merged 32 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
54a5c95
Vectorized hash grouping
alamb Jul 10, 2023
1991a76
Prepare for merge to main
alamb Jul 10, 2023
8464816
Improve comments and update size calculations
alamb Jul 10, 2023
4bd3066
Implement test for accumulate_boolean
alamb Jul 10, 2023
7c97b24
Use resize instead of resize_with
alamb Jul 10, 2023
3ca27ac
fix avg size calculation
alamb Jul 10, 2023
e4a52f9
Simplify sum accumulator
alamb Jul 10, 2023
f9eaa68
Add comments explaining i64 as counts
alamb Jul 11, 2023
fc96b13
Clarify `aggreate_arguments`
alamb Jul 11, 2023
9db6f4b
Apply suggestions from code review
alamb Jul 11, 2023
edc8c43
Merge remote-tracking branch 'apache/main' into alamb/fast_gby_hash
alamb Jul 11, 2023
19b8981
Merge branch 'alamb/fast_gby_hash' of github.com:alamb/arrow-datafusi…
alamb Jul 11, 2023
90f8730
Clarify rationale for ScratchSpace being a field
alamb Jul 11, 2023
3369ec1
use slice syntax
alamb Jul 12, 2023
4124bfa
Merge remote-tracking branch 'apache/main' into alamb/fast_gby_hash
alamb Jul 12, 2023
58e3b6d
Update datafusion/physical-expr/src/aggregate/average.rs
alamb Jul 12, 2023
47135ba
Update datafusion/physical-expr/src/aggregate/count.rs
alamb Jul 12, 2023
744b4aa
Update datafusion/physical-expr/src/aggregate/groups_accumulator/adap…
alamb Jul 12, 2023
c3d5ff2
fix diagram
alamb Jul 12, 2023
92f6234
Update datafusion/physical-expr/src/aggregate/groups_accumulator/adap…
alamb Jul 12, 2023
f35f2ae
Merge branch 'alamb/fast_gby_hash' of github.com:alamb/arrow-datafusi…
alamb Jul 12, 2023
d19c41e
simplify the supported logic
alamb Jul 12, 2023
da911a3
Add a log message when using slow adapter
alamb Jul 12, 2023
de7b250
fmt
alamb Jul 12, 2023
b313278
Revert "chore(deps): update bigdecimal requirement from 0.3.0 to 0.4.…
alamb Jul 12, 2023
2bff155
Make FileScanConfig::project pub (#6931)
Dandandan Jul 12, 2023
453b71e
feat: add round trip test of physical plan in tpch unit tests (#6918)
r4ntix Jul 12, 2023
32ff16e
Use thiserror to implement the From trait for DFSqlLogicTestError (#6…
jonahgao Jul 12, 2023
54f96e6
parallel csv scan (#6801)
2010YOUY01 Jul 12, 2023
d96dfa2
Add additional test coverage for aggregaes using dates/times/timestam…
alamb Jul 12, 2023
a98b6a0
Support timestamp types for min/max
alamb Jul 13, 2023
5ab75b1
Fix aggregate nullability calculation
alamb Jul 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,4 @@ lto = false
opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false
rpath = false
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
Arc::new(Schema::new(group_fields))
}

/// returns physical expressions to evaluate against a batch
/// returns physical expressions for arguments to evaluate against a batch
/// The expressions are different depending on `mode`:
/// * Partial: AggregateExpr::expressions
/// * Final: columns of `AggregateExpr::state_fields()`
Expand Down
888 changes: 343 additions & 545 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions datafusion/execution/src/memory_pool/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ pub trait VecAllocExt {

/// [Push](Vec::push) new element to vector and store additional allocated bytes in `accounting` (additive).
fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);

/// Return the amount of memory allocated by this Vec (not
/// recursively counting any heap allocations contained within the
/// structure). Does not include the size of `self`
fn allocated_size(&self) -> usize;
}

impl<T> VecAllocExt for Vec<T> {
Expand All @@ -44,6 +49,9 @@ impl<T> VecAllocExt for Vec<T> {

self.push(x);
}
fn allocated_size(&self) -> usize {
std::mem::size_of::<T>() * self.capacity()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored this calculation into its own function as leaving it inline in the group code made it harder to follow in my opinion

}
}

/// Extension trait for [`RawTable`] to account for allocations.
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ indexmap = "2.0.0"
itertools = { version = "0.11", features = ["use_std"] }
lazy_static = { version = "^1.4.0" }
libc = "0.2.140"
log = "^0.4"
md-5 = { version = "^0.10.0", optional = true }
paste = "^1.0"
petgraph = "0.6.2"
Expand Down
241 changes: 238 additions & 3 deletions datafusion/physical-expr/src/aggregate/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

//! Defines physical expressions that can evaluated at runtime during query execution

use arrow::array::{AsArray, PrimitiveBuilder};
use log::debug;

use std::any::Any;
use std::convert::TryFrom;
use std::sync::Arc;

use crate::aggregate::groups_accumulator::accumulate::NullState;
use crate::aggregate::row_accumulator::{
is_row_accumulator_support_dtype, RowAccumulator,
};
Expand All @@ -29,19 +33,23 @@ use crate::aggregate::sum::sum_batch;
use crate::aggregate::utils::calculate_result_decimal_for_avg;
use crate::aggregate::utils::down_cast_any_ref;
use crate::expressions::format_state_name;
use crate::{AggregateExpr, PhysicalExpr};
use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr};
use arrow::compute;
use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, Decimal128Type, Float64Type, UInt64Type};
use arrow::{
array::{ArrayRef, UInt64Array},
datatypes::Field,
};
use arrow_array::Array;
use arrow_array::{
Array, ArrowNativeTypeOp, ArrowNumericType, ArrowPrimitiveType, PrimitiveArray,
};
use datafusion_common::{downcast_value, ScalarValue};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Accumulator;
use datafusion_row::accessor::RowAccessor;

use super::utils::{adjust_output_array, Decimal128Averager};

/// AVG aggregate expression
#[derive(Debug, Clone)]
pub struct Avg {
Expand Down Expand Up @@ -155,6 +163,50 @@ impl AggregateExpr for Avg {
&self.rt_data_type,
)?))
}

fn groups_accumulator_supported(&self) -> bool {
use DataType::*;

matches!(&self.rt_data_type, Float64 | Decimal128(_, _))
}

fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
use DataType::*;
// instantiate specialized accumulator based for the type
match (&self.sum_data_type, &self.rt_data_type) {
(Float64, Float64) => {
Ok(Box::new(AvgGroupsAccumulator::<Float64Type, _>::new(
&self.sum_data_type,
&self.rt_data_type,
|sum: f64, count: u64| Ok(sum / count as f64),
)))
}
(
Decimal128(_sum_precision, sum_scale),
Decimal128(target_precision, target_scale),
) => {
let decimal_averager = Decimal128Averager::try_new(
*sum_scale,
*target_precision,
*target_scale,
)?;

let avg_fn =
move |sum: i128, count: u64| decimal_averager.avg(sum, count as i128);

Ok(Box::new(AvgGroupsAccumulator::<Decimal128Type, _>::new(
&self.sum_data_type,
&self.rt_data_type,
avg_fn,
)))
}

_ => Err(DataFusionError::NotImplemented(format!(
"AvgGroupsAccumulator for ({} --> {})",
self.sum_data_type, self.rt_data_type,
))),
}
}
}

impl PartialEq<dyn Any> for Avg {
Expand Down Expand Up @@ -383,6 +435,189 @@ impl RowAccumulator for AvgRowAccumulator {
}
}

/// An accumulator to compute the average of `[PrimitiveArray<T>]`.
/// Stores values as native types, and does overflow checking
///
/// F: Function that calcuates the average value from a sum of
alamb marked this conversation as resolved.
Show resolved Hide resolved
/// T::Native and a total count
#[derive(Debug)]
struct AvgGroupsAccumulator<T, F>
where
T: ArrowNumericType + Send,
F: Fn(T::Native, u64) -> Result<T::Native> + Send,
{
/// The type of the internal sum
sum_data_type: DataType,

/// The type of the returned sum
return_data_type: DataType,

/// Count per group (use u64 to make UInt64Array)
counts: Vec<u64>,

/// Sums per group, stored as the native type
sums: Vec<T::Native>,

/// Track nulls in the input / filters
null_state: NullState,

/// Function that computes the final average (value / count)
avg_fn: F,
}

impl<T, F> AvgGroupsAccumulator<T, F>
where
T: ArrowNumericType + Send,
F: Fn(T::Native, u64) -> Result<T::Native> + Send,
{
pub fn new(sum_data_type: &DataType, return_data_type: &DataType, avg_fn: F) -> Self {
debug!(
"AvgGroupsAccumulator ({}, sum type: {sum_data_type:?}) --> {return_data_type:?}",
std::any::type_name::<T>()
);

Self {
return_data_type: return_data_type.clone(),
sum_data_type: sum_data_type.clone(),
counts: vec![],
sums: vec![],
null_state: NullState::new(),
avg_fn,
}
}
}

impl<T, F> GroupsAccumulator for AvgGroupsAccumulator<T, F>
where
T: ArrowNumericType + Send,
F: Fn(T::Native, u64) -> Result<T::Native> + Send,
{
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&arrow_array::BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 1, "single argument to update_batch");
let values = values.get(0).unwrap().as_primitive::<T>();
alamb marked this conversation as resolved.
Show resolved Hide resolved

// increment counts, update sums
self.counts.resize(total_num_groups, 0);
self.sums.resize(total_num_groups, T::default_value());
self.null_state.accumulate(
group_indices,
values,
opt_filter,
total_num_groups,
|group_index, new_value| {
let sum = &mut self.sums[group_index];
*sum = sum.add_wrapping(new_value);

self.counts[group_index] += 1;
},
);

Ok(())
}

fn merge_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&arrow_array::BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
assert_eq!(values.len(), 2, "two arguments to merge_batch");
// first batch is counts, second is partial sums
let partial_counts = values.get(0).unwrap().as_primitive::<UInt64Type>();
alamb marked this conversation as resolved.
Show resolved Hide resolved
let partial_sums = values.get(1).unwrap().as_primitive::<T>();
// update counts with partial counts
self.counts.resize(total_num_groups, 0);
self.null_state.accumulate(
group_indices,
partial_counts,
opt_filter,
total_num_groups,
|group_index, partial_count| {
self.counts[group_index] += partial_count;
},
);

// update sums
self.sums.resize(total_num_groups, T::default_value());
self.null_state.accumulate(
group_indices,
partial_sums,
opt_filter,
total_num_groups,
|group_index, new_value: <T as ArrowPrimitiveType>::Native| {
let sum = &mut self.sums[group_index];
*sum = sum.add_wrapping(new_value);
},
);

Ok(())
}

fn evaluate(&mut self) -> Result<ArrayRef> {
let counts = std::mem::take(&mut self.counts);
let sums = std::mem::take(&mut self.sums);
let nulls = self.null_state.build();

assert_eq!(counts.len(), sums.len());

// don't evaluate averages with null inputs to avoid errors on null vaues
alamb marked this conversation as resolved.
Show resolved Hide resolved
let array: PrimitiveArray<T> = if let Some(nulls) = nulls.as_ref() {
assert_eq!(nulls.len(), sums.len());
let mut builder = PrimitiveBuilder::<T>::with_capacity(nulls.len());
let iter = sums.into_iter().zip(counts.into_iter()).zip(nulls.iter());

for ((sum, count), is_valid) in iter {
if is_valid {
builder.append_value((self.avg_fn)(sum, count)?)
} else {
builder.append_null();
}
}
builder.finish()
} else {
let averages: Vec<T::Native> = sums
.into_iter()
.zip(counts.into_iter())
.map(|(sum, count)| (self.avg_fn)(sum, count))
.collect::<Result<Vec<_>>>()?;
PrimitiveArray::new(averages.into(), nulls) // no copy
};

// fix up decimal precision and scale for decimals
let array = adjust_output_array(&self.return_data_type, Arc::new(array))?;

Ok(array)
}

// return arrays for sums and counts
fn state(&mut self) -> Result<Vec<ArrayRef>> {
let nulls = self.null_state.build();
let counts = std::mem::take(&mut self.counts);
let counts = UInt64Array::from(counts); // zero copy

let sums = std::mem::take(&mut self.sums);
let sums = PrimitiveArray::<T>::new(sums.into(), nulls); // zero copy
let sums = adjust_output_array(&self.sum_data_type, Arc::new(sums))?;

Ok(vec![
Arc::new(counts) as ArrayRef,
Arc::new(sums) as ArrayRef,
])
}

fn size(&self) -> usize {
self.counts.capacity() * std::mem::size_of::<u64>()
+ self.sums.capacity() * std::mem::size_of::<T>()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading