From 056fd7b64eff621fffc695f069967714c79f0929 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 20 Jul 2023 11:35:42 -0400 Subject: [PATCH 1/8] Specialize primitive group values --- .../src/physical_plan/aggregates/row_hash.rs | 133 +++++++++++++++++- 1 file changed, 130 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index e3ac5c49a94b..cf2a9c9d72d8 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -39,12 +39,16 @@ use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput}; use crate::physical_plan::{aggregates, PhysicalExpr}; use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::*; +use arrow::buffer::NullBuffer; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; +use arrow_array::downcast_integer; +use arrow_schema::DataType; use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use hashbrown::raw::RawTable; +use hashbrown::HashMap; #[derive(Debug, Clone)] /// This object tracks the aggregation phase (input/output) @@ -77,6 +81,110 @@ trait GroupValues: Send { fn emit(&mut self, emit_to: EmitTo) -> Result>; } +/// A [`GroupValues`] storing raw primitive values +struct GroupValuesPrimitive { + data_type: DataType, + map: HashMap, + null_group: Option, + values: Vec, +} + +impl GroupValuesPrimitive { + fn new(data_type: DataType) -> Self { + assert!(PrimitiveArray::::is_compatible(&data_type)); + Self { + data_type, + map: HashMap::with_capacity(1024), + values: Vec::with_capacity(1024), + null_group: None, + } + } +} + +impl GroupValues for GroupValuesPrimitive +where + T::Native: std::hash::Hash + Eq, +{ + fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { + assert_eq!(cols.len(), 1); + groups.clear(); + + for v in cols[0].as_primitive::() { + let group_id = match v { + None => self.null_group.get_or_insert_with(|| { + let group_id = self.values.len(); + self.values.push(Default::default()); + group_id + }), + Some(key) => self.map.entry(key).or_insert_with(|| { + let group_id = self.values.len(); + self.values.push(key); + group_id + }), + }; + groups.push(*group_id) + } + Ok(()) + } + + fn size(&self) -> usize { + // This is an approximation + self.map.capacity() * std::mem::size_of::<(T::Native, usize)>() + + self.values.allocated_size() + } + + fn is_empty(&self) -> bool { + self.values.is_empty() + } + + fn len(&self) -> usize { + self.values.len() + } + + fn emit(&mut self, emit_to: EmitTo) -> Result> { + fn build_primitive( + values: Vec, + null_idx: Option, + ) -> PrimitiveArray { + let nulls = null_idx.map(|null_idx| { + let mut buffer = BooleanBufferBuilder::new(values.len()); + buffer.append_n(values.len(), true); + buffer.set_bit(null_idx, false); + unsafe { NullBuffer::new_unchecked(buffer.finish(), 1) } + }); + PrimitiveArray::::new(values.into(), nulls) + } + + let array: PrimitiveArray = match emit_to { + EmitTo::All => { + self.map.clear(); + build_primitive(std::mem::take(&mut self.values), self.null_group.take()) + } + EmitTo::First(n) => { + self.map.retain(|_, v| match v.checked_sub(n) { + Some(new) => { + *v = new; + true + } + None => false, + }); + let null_group = match &mut self.null_group { + Some(v) if *v >= n => { + *v -= n; + None + } + Some(_) => self.null_group.take(), + None => None, + }; + let mut split = self.values.split_off(n); + std::mem::swap(&mut self.values, &mut split); + build_primitive(split, null_group) + } + }; + Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))]) + } +} + /// A [`GroupValues`] making use of [`Rows`] struct GroupValuesRows { /// Converter for the group values @@ -234,6 +342,26 @@ impl GroupValues for GroupValuesRows { } } +fn group_values(schema: SchemaRef) -> Result> { + if schema.fields.len() == 1 { + let d = schema.fields[0].data_type(); + + macro_rules! downcast_helper { + ($t:ty, $d:ident) => { + return Ok(Box::new(GroupValuesPrimitive::<$t>::new($d.clone()))) + }; + } + + // TODO: More primitives + downcast_integer! { + d => (downcast_helper, d), + _ => {} + } + } + + Ok(Box::new(GroupValuesRows::try_new(schema)?)) +} + /// Hash based Grouping Aggregator /// /// # Design Goals @@ -416,8 +544,7 @@ impl GroupedHashAggregateStream { .transpose()? .unwrap_or(GroupOrdering::None); - let group = Box::new(GroupValuesRows::try_new(group_schema)?); - + let group_values = group_values(group_schema)?; timer.done(); let exec_state = ExecutionState::ReadingInput; @@ -431,7 +558,7 @@ impl GroupedHashAggregateStream { filter_expressions, group_by: agg_group_by, reservation, - group_values: group, + group_values, current_group_indices: Default::default(), exec_state, baseline_metrics, From a7f6a336a2dbd824d06422ae5f2dbc827244841a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 20 Jul 2023 12:25:12 -0400 Subject: [PATCH 2/8] Split module --- .../aggregates/group_values/mod.rs | 65 ++++ .../aggregates/group_values/primitive.rs | 132 ++++++++ .../aggregates/group_values/row.rs | 184 +++++++++++ .../core/src/physical_plan/aggregates/mod.rs | 1 + .../src/physical_plan/aggregates/row_hash.rs | 312 +----------------- 5 files changed, 385 insertions(+), 309 deletions(-) create mode 100644 datafusion/core/src/physical_plan/aggregates/group_values/mod.rs create mode 100644 datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs create mode 100644 datafusion/core/src/physical_plan/aggregates/group_values/row.rs diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs new file mode 100644 index 000000000000..bf197f723ba6 --- /dev/null +++ b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{downcast_integer, ArrayRef}; +use arrow_schema::SchemaRef; +use datafusion_common::Result; +use datafusion_physical_expr::EmitTo; + +mod primitive; +use primitive::GroupValuesPrimitive; + +mod row; +use row::GroupValuesRows; + +/// An interning store for group keys +pub trait GroupValues: Send { + /// Calculates the `groups` for each input row of `cols` + fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()>; + + /// Returns the number of bytes used by this [`GroupValues`] + fn size(&self) -> usize; + + /// Returns true if this [`GroupValues`] is empty + fn is_empty(&self) -> bool; + + /// The number of values stored in this [`GroupValues`] + fn len(&self) -> usize; + + /// Emits the group values + fn emit(&mut self, emit_to: EmitTo) -> Result>; +} + +pub fn new_group_values(schema: SchemaRef) -> Result> { + if schema.fields.len() == 1 { + let d = schema.fields[0].data_type(); + + macro_rules! downcast_helper { + ($t:ty, $d:ident) => { + return Ok(Box::new(GroupValuesPrimitive::<$t>::new($d.clone()))) + }; + } + + // TODO: More primitives + downcast_integer! { + d => (downcast_helper, d), + _ => {} + } + } + + Ok(Box::new(GroupValuesRows::try_new(schema)?)) +} diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs new file mode 100644 index 000000000000..d8a395504135 --- /dev/null +++ b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::physical_plan::aggregates::group_values::GroupValues; +use arrow::array::BooleanBufferBuilder; +use arrow::buffer::NullBuffer; +use arrow_array::{ArrayRef, ArrowPrimitiveType, PrimitiveArray}; +use arrow_schema::DataType; +use datafusion_common::Result; +use datafusion_physical_expr::EmitTo; +use std::collections::HashMap; +use std::sync::Arc; +use arrow_array::cast::AsArray; +use datafusion_execution::memory_pool::proxy::VecAllocExt; + +/// A [`GroupValues`] storing raw primitive values +pub struct GroupValuesPrimitive { + data_type: DataType, + map: HashMap, + null_group: Option, + values: Vec, +} + +impl GroupValuesPrimitive { + pub fn new(data_type: DataType) -> Self { + assert!(PrimitiveArray::::is_compatible(&data_type)); + Self { + data_type, + map: HashMap::with_capacity(1024), + values: Vec::with_capacity(1024), + null_group: None, + } + } +} + +impl GroupValues for GroupValuesPrimitive +where + T::Native: std::hash::Hash + Eq, +{ + fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { + assert_eq!(cols.len(), 1); + groups.clear(); + + for v in cols[0].as_primitive::() { + let group_id = match v { + None => self.null_group.get_or_insert_with(|| { + let group_id = self.values.len(); + self.values.push(Default::default()); + group_id + }), + Some(key) => self.map.entry(key).or_insert_with(|| { + let group_id = self.values.len(); + self.values.push(key); + group_id + }), + }; + groups.push(*group_id) + } + Ok(()) + } + + fn size(&self) -> usize { + // This is an approximation + self.map.capacity() * std::mem::size_of::<(T::Native, usize)>() + + self.values.allocated_size() + } + + fn is_empty(&self) -> bool { + self.values.is_empty() + } + + fn len(&self) -> usize { + self.values.len() + } + + fn emit(&mut self, emit_to: EmitTo) -> Result> { + fn build_primitive( + values: Vec, + null_idx: Option, + ) -> PrimitiveArray { + let nulls = null_idx.map(|null_idx| { + let mut buffer = BooleanBufferBuilder::new(values.len()); + buffer.append_n(values.len(), true); + buffer.set_bit(null_idx, false); + unsafe { NullBuffer::new_unchecked(buffer.finish(), 1) } + }); + PrimitiveArray::::new(values.into(), nulls) + } + + let array: PrimitiveArray = match emit_to { + EmitTo::All => { + self.map.clear(); + build_primitive(std::mem::take(&mut self.values), self.null_group.take()) + } + EmitTo::First(n) => { + self.map.retain(|_, v| match v.checked_sub(n) { + Some(new) => { + *v = new; + true + } + None => false, + }); + let null_group = match &mut self.null_group { + Some(v) if *v >= n => { + *v -= n; + None + } + Some(_) => self.null_group.take(), + None => None, + }; + let mut split = self.values.split_off(n); + std::mem::swap(&mut self.values, &mut split); + build_primitive(split, null_group) + } + }; + Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))]) + } +} diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/row.rs b/datafusion/core/src/physical_plan/aggregates/group_values/row.rs new file mode 100644 index 000000000000..4eb660d52590 --- /dev/null +++ b/datafusion/core/src/physical_plan/aggregates/group_values/row.rs @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::physical_plan::aggregates::group_values::GroupValues; +use ahash::RandomState; +use arrow::row::{RowConverter, Rows, SortField}; +use arrow_array::ArrayRef; +use arrow_schema::SchemaRef; +use datafusion_common::Result; +use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; +use datafusion_physical_expr::hash_utils::create_hashes; +use datafusion_physical_expr::EmitTo; +use hashbrown::raw::RawTable; + +/// A [`GroupValues`] making use of [`Rows`] +pub struct GroupValuesRows { + /// Converter for the group values + row_converter: RowConverter, + + /// Logically maps group values to a group_index in + /// [`Self::group_values`] and in each accumulator + /// + /// Uses the raw API of hashbrown to avoid actually storing the + /// keys (group values) in the table + /// + /// keys: u64 hashes of the GroupValue + /// values: (hash, group_index) + map: RawTable<(u64, usize)>, + + /// The size of `map` in bytes + map_size: usize, + + /// The actual group by values, stored in arrow [`Row`] format. + /// `group_values[i]` holds the group value for group_index `i`. + /// + /// The row format is used to compare group keys quickly and store + /// them efficiently in memory. Quick comparison is especially + /// important for multi-column group keys. + /// + /// [`Row`]: arrow::row::Row + group_values: Rows, + + // buffer to be reused to store hashes + hashes_buffer: Vec, + + /// Random state for creating hashes + random_state: RandomState, +} + +impl GroupValuesRows { + pub fn try_new(schema: SchemaRef) -> Result { + let row_converter = RowConverter::new( + schema + .fields() + .iter() + .map(|f| SortField::new(f.data_type().clone())) + .collect(), + )?; + + let map = RawTable::with_capacity(0); + let group_values = row_converter.empty_rows(0, 0); + + Ok(Self { + row_converter, + map, + map_size: 0, + group_values, + hashes_buffer: Default::default(), + random_state: Default::default(), + }) + } +} + +impl GroupValues for GroupValuesRows { + fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { + // Convert the group keys into the row format + // Avoid reallocation when https://github.com/apache/arrow-rs/issues/4479 is available + let group_rows = self.row_converter.convert_columns(cols)?; + let n_rows = group_rows.num_rows(); + + // tracks to which group each of the input rows belongs + groups.clear(); + + // 1.1 Calculate the group keys for the group values + let batch_hashes = &mut self.hashes_buffer; + batch_hashes.clear(); + batch_hashes.resize(n_rows, 0); + create_hashes(cols, &self.random_state, batch_hashes)?; + + for (row, &hash) in batch_hashes.iter().enumerate() { + let entry = self.map.get_mut(hash, |(_hash, group_idx)| { + // verify that a group that we are inserting with hash is + // actually the same key value as the group in + // existing_idx (aka group_values @ row) + group_rows.row(row) == self.group_values.row(*group_idx) + }); + + let group_idx = match entry { + // Existing group_index for this group value + Some((_hash, group_idx)) => *group_idx, + // 1.2 Need to create new entry for the group + None => { + // Add new entry to aggr_state and save newly created index + let group_idx = self.group_values.num_rows(); + self.group_values.push(group_rows.row(row)); + + // for hasher function, use precomputed hash value + self.map.insert_accounted( + (hash, group_idx), + |(hash, _group_index)| *hash, + &mut self.map_size, + ); + group_idx + } + }; + groups.push(group_idx); + } + + Ok(()) + } + + fn size(&self) -> usize { + self.row_converter.size() + + self.group_values.size() + + self.map_size + + self.hashes_buffer.allocated_size() + } + + fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn len(&self) -> usize { + self.group_values.num_rows() + } + + fn emit(&mut self, emit_to: EmitTo) -> Result> { + Ok(match emit_to { + EmitTo::All => { + // Eventually we may also want to clear the hash table here + self.row_converter.convert_rows(&self.group_values)? + } + EmitTo::First(n) => { + let groups_rows = self.group_values.iter().take(n); + let output = self.row_converter.convert_rows(groups_rows)?; + // Clear out first n group keys by copying them to a new Rows. + // TODO file some ticket in arrow-rs to make this more efficent? + let mut new_group_values = self.row_converter.empty_rows(0, 0); + for row in self.group_values.iter().skip(n) { + new_group_values.push(row); + } + std::mem::swap(&mut new_group_values, &mut self.group_values); + + // SAFETY: self.map outlives iterator and is not modified concurrently + unsafe { + for bucket in self.map.iter() { + // Decrement group index by n + match bucket.as_ref().1.checked_sub(n) { + // Group index was >= n, shift value down + Some(sub) => bucket.as_mut().1 = sub, + // Group index was < n, so remove from table + None => self.map.erase(bucket), + } + } + } + output + } + }) + } +} diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 9d8ced18dac3..4b338952837d 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -45,6 +45,7 @@ use std::any::Any; use std::collections::HashMap; use std::sync::Arc; +mod group_values; mod no_grouping; mod order; mod row_hash; diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index cf2a9c9d72d8..4613a2e46443 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -25,12 +25,10 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::vec; -use ahash::RandomState; -use arrow::row::{RowConverter, Rows, SortField}; -use datafusion_physical_expr::hash_utils::create_hashes; use futures::ready; use futures::stream::{Stream, StreamExt}; +use crate::physical_plan::aggregates::group_values::{new_group_values, GroupValues}; use crate::physical_plan::aggregates::{ evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode, PhysicalGroupBy, @@ -39,16 +37,11 @@ use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput}; use crate::physical_plan::{aggregates, PhysicalExpr}; use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::*; -use arrow::buffer::NullBuffer; use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; -use arrow_array::downcast_integer; -use arrow_schema::DataType; use datafusion_common::Result; -use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; +use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; -use hashbrown::raw::RawTable; -use hashbrown::HashMap; #[derive(Debug, Clone)] /// This object tracks the aggregation phase (input/output) @@ -63,305 +56,6 @@ pub(crate) enum ExecutionState { use super::order::GroupOrdering; use super::AggregateExec; -/// An interning store for group keys -trait GroupValues: Send { - /// Calculates the `groups` for each input row of `cols` - fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()>; - - /// Returns the number of bytes used by this [`GroupValues`] - fn size(&self) -> usize; - - /// Returns true if this [`GroupValues`] is empty - fn is_empty(&self) -> bool; - - /// The number of values stored in this [`GroupValues`] - fn len(&self) -> usize; - - /// Emits the group values - fn emit(&mut self, emit_to: EmitTo) -> Result>; -} - -/// A [`GroupValues`] storing raw primitive values -struct GroupValuesPrimitive { - data_type: DataType, - map: HashMap, - null_group: Option, - values: Vec, -} - -impl GroupValuesPrimitive { - fn new(data_type: DataType) -> Self { - assert!(PrimitiveArray::::is_compatible(&data_type)); - Self { - data_type, - map: HashMap::with_capacity(1024), - values: Vec::with_capacity(1024), - null_group: None, - } - } -} - -impl GroupValues for GroupValuesPrimitive -where - T::Native: std::hash::Hash + Eq, -{ - fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { - assert_eq!(cols.len(), 1); - groups.clear(); - - for v in cols[0].as_primitive::() { - let group_id = match v { - None => self.null_group.get_or_insert_with(|| { - let group_id = self.values.len(); - self.values.push(Default::default()); - group_id - }), - Some(key) => self.map.entry(key).or_insert_with(|| { - let group_id = self.values.len(); - self.values.push(key); - group_id - }), - }; - groups.push(*group_id) - } - Ok(()) - } - - fn size(&self) -> usize { - // This is an approximation - self.map.capacity() * std::mem::size_of::<(T::Native, usize)>() - + self.values.allocated_size() - } - - fn is_empty(&self) -> bool { - self.values.is_empty() - } - - fn len(&self) -> usize { - self.values.len() - } - - fn emit(&mut self, emit_to: EmitTo) -> Result> { - fn build_primitive( - values: Vec, - null_idx: Option, - ) -> PrimitiveArray { - let nulls = null_idx.map(|null_idx| { - let mut buffer = BooleanBufferBuilder::new(values.len()); - buffer.append_n(values.len(), true); - buffer.set_bit(null_idx, false); - unsafe { NullBuffer::new_unchecked(buffer.finish(), 1) } - }); - PrimitiveArray::::new(values.into(), nulls) - } - - let array: PrimitiveArray = match emit_to { - EmitTo::All => { - self.map.clear(); - build_primitive(std::mem::take(&mut self.values), self.null_group.take()) - } - EmitTo::First(n) => { - self.map.retain(|_, v| match v.checked_sub(n) { - Some(new) => { - *v = new; - true - } - None => false, - }); - let null_group = match &mut self.null_group { - Some(v) if *v >= n => { - *v -= n; - None - } - Some(_) => self.null_group.take(), - None => None, - }; - let mut split = self.values.split_off(n); - std::mem::swap(&mut self.values, &mut split); - build_primitive(split, null_group) - } - }; - Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))]) - } -} - -/// A [`GroupValues`] making use of [`Rows`] -struct GroupValuesRows { - /// Converter for the group values - row_converter: RowConverter, - - /// Logically maps group values to a group_index in - /// [`Self::group_values`] and in each accumulator - /// - /// Uses the raw API of hashbrown to avoid actually storing the - /// keys (group values) in the table - /// - /// keys: u64 hashes of the GroupValue - /// values: (hash, group_index) - map: RawTable<(u64, usize)>, - - /// The size of `map` in bytes - map_size: usize, - - /// The actual group by values, stored in arrow [`Row`] format. - /// `group_values[i]` holds the group value for group_index `i`. - /// - /// The row format is used to compare group keys quickly and store - /// them efficiently in memory. Quick comparison is especially - /// important for multi-column group keys. - /// - /// [`Row`]: arrow::row::Row - group_values: Rows, - - // buffer to be reused to store hashes - hashes_buffer: Vec, - - /// Random state for creating hashes - random_state: RandomState, -} - -impl GroupValuesRows { - fn try_new(schema: SchemaRef) -> Result { - let row_converter = RowConverter::new( - schema - .fields() - .iter() - .map(|f| SortField::new(f.data_type().clone())) - .collect(), - )?; - - let map = RawTable::with_capacity(0); - let group_values = row_converter.empty_rows(0, 0); - - Ok(Self { - row_converter, - map, - map_size: 0, - group_values, - hashes_buffer: Default::default(), - random_state: Default::default(), - }) - } -} - -impl GroupValues for GroupValuesRows { - fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { - // Convert the group keys into the row format - // Avoid reallocation when https://github.com/apache/arrow-rs/issues/4479 is available - let group_rows = self.row_converter.convert_columns(cols)?; - let n_rows = group_rows.num_rows(); - - // tracks to which group each of the input rows belongs - groups.clear(); - - // 1.1 Calculate the group keys for the group values - let batch_hashes = &mut self.hashes_buffer; - batch_hashes.clear(); - batch_hashes.resize(n_rows, 0); - create_hashes(cols, &self.random_state, batch_hashes)?; - - for (row, &hash) in batch_hashes.iter().enumerate() { - let entry = self.map.get_mut(hash, |(_hash, group_idx)| { - // verify that a group that we are inserting with hash is - // actually the same key value as the group in - // existing_idx (aka group_values @ row) - group_rows.row(row) == self.group_values.row(*group_idx) - }); - - let group_idx = match entry { - // Existing group_index for this group value - Some((_hash, group_idx)) => *group_idx, - // 1.2 Need to create new entry for the group - None => { - // Add new entry to aggr_state and save newly created index - let group_idx = self.group_values.num_rows(); - self.group_values.push(group_rows.row(row)); - - // for hasher function, use precomputed hash value - self.map.insert_accounted( - (hash, group_idx), - |(hash, _group_index)| *hash, - &mut self.map_size, - ); - group_idx - } - }; - groups.push(group_idx); - } - - Ok(()) - } - - fn size(&self) -> usize { - self.row_converter.size() - + self.group_values.size() - + self.map_size - + self.hashes_buffer.allocated_size() - } - - fn is_empty(&self) -> bool { - self.len() == 0 - } - - fn len(&self) -> usize { - self.group_values.num_rows() - } - - fn emit(&mut self, emit_to: EmitTo) -> Result> { - Ok(match emit_to { - EmitTo::All => { - // Eventually we may also want to clear the hash table here - self.row_converter.convert_rows(&self.group_values)? - } - EmitTo::First(n) => { - let groups_rows = self.group_values.iter().take(n); - let output = self.row_converter.convert_rows(groups_rows)?; - // Clear out first n group keys by copying them to a new Rows. - // TODO file some ticket in arrow-rs to make this more efficent? - let mut new_group_values = self.row_converter.empty_rows(0, 0); - for row in self.group_values.iter().skip(n) { - new_group_values.push(row); - } - std::mem::swap(&mut new_group_values, &mut self.group_values); - - // SAFETY: self.map outlives iterator and is not modified concurrently - unsafe { - for bucket in self.map.iter() { - // Decrement group index by n - match bucket.as_ref().1.checked_sub(n) { - // Group index was >= n, shift value down - Some(sub) => bucket.as_mut().1 = sub, - // Group index was < n, so remove from table - None => self.map.erase(bucket), - } - } - } - output - } - }) - } -} - -fn group_values(schema: SchemaRef) -> Result> { - if schema.fields.len() == 1 { - let d = schema.fields[0].data_type(); - - macro_rules! downcast_helper { - ($t:ty, $d:ident) => { - return Ok(Box::new(GroupValuesPrimitive::<$t>::new($d.clone()))) - }; - } - - // TODO: More primitives - downcast_integer! { - d => (downcast_helper, d), - _ => {} - } - } - - Ok(Box::new(GroupValuesRows::try_new(schema)?)) -} - /// Hash based Grouping Aggregator /// /// # Design Goals @@ -544,7 +238,7 @@ impl GroupedHashAggregateStream { .transpose()? .unwrap_or(GroupOrdering::None); - let group_values = group_values(group_schema)?; + let group_values = new_group_values(group_schema)?; timer.done(); let exec_state = ExecutionState::ReadingInput; From 98384dce9eda1cfe98accaedff2f5f0ea96e3dff Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 20 Jul 2023 12:30:41 -0400 Subject: [PATCH 3/8] RawTable --- .../aggregates/group_values/primitive.rs | 68 +++++++++++++------ 1 file changed, 46 insertions(+), 22 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs index d8a395504135..69338bbe8472 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs @@ -16,21 +16,23 @@ // under the License. use crate::physical_plan::aggregates::group_values::GroupValues; +use ahash::RandomState; use arrow::array::BooleanBufferBuilder; use arrow::buffer::NullBuffer; +use arrow_array::cast::AsArray; use arrow_array::{ArrayRef, ArrowPrimitiveType, PrimitiveArray}; use arrow_schema::DataType; use datafusion_common::Result; +use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_physical_expr::EmitTo; -use std::collections::HashMap; +use hashbrown::raw::RawTable; use std::sync::Arc; -use arrow_array::cast::AsArray; -use datafusion_execution::memory_pool::proxy::VecAllocExt; /// A [`GroupValues`] storing raw primitive values pub struct GroupValuesPrimitive { data_type: DataType, - map: HashMap, + map: RawTable, + random_state: RandomState, null_group: Option, values: Vec, } @@ -40,9 +42,10 @@ impl GroupValuesPrimitive { assert!(PrimitiveArray::::is_compatible(&data_type)); Self { data_type, - map: HashMap::with_capacity(1024), - values: Vec::with_capacity(1024), + map: RawTable::with_capacity(128), + values: Vec::with_capacity(128), null_group: None, + random_state: Default::default(), } } } @@ -57,26 +60,42 @@ where for v in cols[0].as_primitive::() { let group_id = match v { - None => self.null_group.get_or_insert_with(|| { + None => *self.null_group.get_or_insert_with(|| { let group_id = self.values.len(); self.values.push(Default::default()); group_id }), - Some(key) => self.map.entry(key).or_insert_with(|| { - let group_id = self.values.len(); - self.values.push(key); - group_id - }), + Some(key) => { + let hash = self.random_state.hash_one(key); + let insert = self.map.find_or_find_insert_slot( + hash, + |g| unsafe { *self.values.get_unchecked(*g) == key }, + |g| unsafe { + self.random_state.hash_one(*self.values.get_unchecked(*g)) + }, + ); + + // SAFETY: No mutation occurred since find_or_find_insert_slot + unsafe { + match insert { + Ok(v) => *v.as_ref(), + Err(slot) => { + let g = self.values.len(); + self.map.insert_in_slot(hash, slot, g); + self.values.push(key); + g + } + } + } + } }; - groups.push(*group_id) + groups.push(group_id) } Ok(()) } fn size(&self) -> usize { - // This is an approximation - self.map.capacity() * std::mem::size_of::<(T::Native, usize)>() - + self.values.allocated_size() + self.map.capacity() * std::mem::size_of::() + self.values.allocated_size() } fn is_empty(&self) -> bool { @@ -107,13 +126,18 @@ where build_primitive(std::mem::take(&mut self.values), self.null_group.take()) } EmitTo::First(n) => { - self.map.retain(|_, v| match v.checked_sub(n) { - Some(new) => { - *v = new; - true + // SAFETY: self.map outlives iterator and is not modified concurrently + unsafe { + for bucket in self.map.iter() { + // Decrement group index by n + match bucket.as_ref().checked_sub(n) { + // Group index was >= n, shift value down + Some(sub) => *bucket.as_mut() = sub, + // Group index was < n, so remove from table + None => self.map.erase(bucket), + } } - None => false, - }); + } let null_group = match &mut self.null_group { Some(v) if *v >= n => { *v -= n; From b5e839166e678c97073bd21dfa9852faadf35fe9 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 20 Jul 2023 14:11:31 -0400 Subject: [PATCH 4/8] Support all primitives --- datafusion/core/Cargo.toml | 1 + .../aggregates/group_values/mod.rs | 4 +- .../aggregates/group_values/primitive.rs | 44 ++++++++++++++++--- 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 862d41a889f3..542d8c032c78 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -73,6 +73,7 @@ flate2 = { version = "1.0.24", optional = true } futures = "0.3" glob = "0.3.0" hashbrown = { version = "0.14", features = ["raw"] } +half = { version = "2.1", default-features = false } indexmap = "2.0.0" itertools = "0.11" lazy_static = { version = "^1.4.0" } diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs index bf197f723ba6..af8872e93176 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use arrow_array::{downcast_integer, ArrayRef}; +use arrow_array::{downcast_primitive, ArrayRef}; use arrow_schema::SchemaRef; use datafusion_common::Result; use datafusion_physical_expr::EmitTo; @@ -55,7 +55,7 @@ pub fn new_group_values(schema: SchemaRef) -> Result> { } // TODO: More primitives - downcast_integer! { + downcast_primitive! { d => (downcast_helper, d), _ => {} } diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs index 69338bbe8472..a874d20479e5 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs @@ -19,15 +19,46 @@ use crate::physical_plan::aggregates::group_values::GroupValues; use ahash::RandomState; use arrow::array::BooleanBufferBuilder; use arrow::buffer::NullBuffer; +use arrow::datatypes::i256; use arrow_array::cast::AsArray; -use arrow_array::{ArrayRef, ArrowPrimitiveType, PrimitiveArray}; +use arrow_array::{ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, PrimitiveArray}; use arrow_schema::DataType; use datafusion_common::Result; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_physical_expr::EmitTo; +use half::f16; use hashbrown::raw::RawTable; use std::sync::Arc; +/// A trait to allow hashing of floating point numbers +trait HashValue { + fn hash(self, state: &RandomState) -> u64; +} + +macro_rules! hash_integer { + ($($t:ty),+) => { + $(impl HashValue for $t { + fn hash(self, state: &RandomState) -> u64 { + state.hash_one(self) + } + })+ + }; +} +hash_integer!(i8, i16, i32, i64, i128, i256); +hash_integer!(u8, u16, u32, u64); + +macro_rules! hash_float { + ($($t:ty),+) => { + $(impl HashValue for $t { + fn hash(self, state: &RandomState) -> u64 { + state.hash_one(self.to_bits()) + } + })+ + }; +} + +hash_float!(f16, f32, f64); + /// A [`GroupValues`] storing raw primitive values pub struct GroupValuesPrimitive { data_type: DataType, @@ -52,7 +83,7 @@ impl GroupValuesPrimitive { impl GroupValues for GroupValuesPrimitive where - T::Native: std::hash::Hash + Eq, + T::Native: HashValue, { fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { assert_eq!(cols.len(), 1); @@ -66,13 +97,12 @@ where group_id }), Some(key) => { - let hash = self.random_state.hash_one(key); + let state = &self.random_state; + let hash = key.hash(state); let insert = self.map.find_or_find_insert_slot( hash, - |g| unsafe { *self.values.get_unchecked(*g) == key }, - |g| unsafe { - self.random_state.hash_one(*self.values.get_unchecked(*g)) - }, + |g| unsafe { self.values.get_unchecked(*g).is_eq(key) }, + |g| unsafe { self.values.get_unchecked(*g).hash(state) }, ); // SAFETY: No mutation occurred since find_or_find_insert_slot From d015363c817513e1eda57f9f9ca9e0a7c2a9a740 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 20 Jul 2023 14:16:13 -0400 Subject: [PATCH 5/8] Add docs --- .../src/physical_plan/aggregates/group_values/primitive.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs index a874d20479e5..f93237ecdee1 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs @@ -61,11 +61,16 @@ hash_float!(f16, f32, f64); /// A [`GroupValues`] storing raw primitive values pub struct GroupValuesPrimitive { + /// The data type of the output array data_type: DataType, + /// Stores the group index based on the hash of its value map: RawTable, - random_state: RandomState, + /// The group index of the null value if any null_group: Option, + /// The values for each group index values: Vec, + /// The random state used to generate hashes + random_state: RandomState, } impl GroupValuesPrimitive { From 3750df678b7a82a69eba0d676f6f2e6493ac1ccf Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 20 Jul 2023 17:00:10 -0500 Subject: [PATCH 6/8] Update datafusion-cli cargo lock --- datafusion-cli/Cargo.lock | 155 ++++++++++++++++---------------------- 1 file changed, 65 insertions(+), 90 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index c94e81311c90..e6bd697623d1 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -56,9 +56,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56fc6cf8dc8c4158eed8649f9b8b0ea1518eb62b544fe9490d66fa0b349eafe9" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "android-tzdata" @@ -307,9 +307,9 @@ dependencies = [ [[package]] name = "assert_cmd" -version = "2.0.11" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86d6b683edf8d1119fe420a94f8a7e389239666aa72e65495d91c00462510151" +checksum = "88903cb14723e4d4003335bb7f8a14f27691649105346a0f0957466c096adfe6" dependencies = [ "anstyle", "bstr", @@ -346,7 +346,7 @@ checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -384,7 +384,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 1.9.0", "hex", "http", "hyper", @@ -404,7 +404,7 @@ checksum = "1fcdb2f7acbc076ff5ad05e7864bdb191ca70a6fd07668dc3a1a8bcd051de5ae" dependencies = [ "aws-smithy-async", "aws-smithy-types", - "fastrand", + "fastrand 1.9.0", "tokio", "tracing", "zeroize", @@ -550,7 +550,7 @@ dependencies = [ "aws-smithy-http-tower", "aws-smithy-types", "bytes", - "fastrand", + "fastrand 1.9.0", "http", "http-body", "hyper", @@ -1029,7 +1029,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f34ba9a9bcb8645379e9de8cb3ecfcf4d1c85ba66d90deb3259206fa5aa193b" dependencies = [ "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -1069,6 +1069,7 @@ dependencies = [ "flate2", "futures", "glob", + "half", "hashbrown 0.14.0", "indexmap 2.0.0", "itertools 0.11.0", @@ -1374,6 +1375,12 @@ dependencies = [ "instant", ] +[[package]] +name = "fastrand" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" + [[package]] name = "fd-lock" version = "3.0.13" @@ -1381,7 +1388,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef033ed5e9bad94e55838ca0ca906db0e043f517adda0c8b79c7a8c66c93c1b5" dependencies = [ "cfg-if", - "rustix 0.38.4", + "rustix", "windows-sys", ] @@ -1491,7 +1498,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -1806,17 +1813,6 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" -[[package]] -name = "io-lifetimes" -version = "1.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" -dependencies = [ - "hermit-abi 0.3.2", - "libc", - "windows-sys", -] - [[package]] name = "ipnet" version = "2.8.0" @@ -1843,9 +1839,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.8" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b02a5381cc465bd3041d84623d0fa3b66738b52b8e2fc3bab8ad63ab032f4a" +checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "jobserver" @@ -1957,12 +1953,6 @@ dependencies = [ "libc", ] -[[package]] -name = "linux-raw-sys" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" - [[package]] name = "linux-raw-sys" version = "0.4.3" @@ -2320,9 +2310,9 @@ dependencies = [ [[package]] name = "paste" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4b27ab7be369122c218afc2079489cdcb4b517c0a3fc386ff11e1fedfcc2b35" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" [[package]] name = "percent-encoding" @@ -2395,7 +2385,7 @@ checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -2485,9 +2475,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.64" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78803b62cbf1f46fde80d7c0e803111524b9877184cfe7c3033659490ac7a7da" +checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" dependencies = [ "unicode-ident", ] @@ -2504,9 +2494,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.29" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105" +checksum = "5fe8a65d69dd0808184ebb5f836ab526bb259db23c657efa38711b1072ee47f0" dependencies = [ "proc-macro2", ] @@ -2706,20 +2696,6 @@ dependencies = [ "semver", ] -[[package]] -name = "rustix" -version = "0.37.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" -dependencies = [ - "bitflags 1.3.2", - "errno", - "io-lifetimes", - "libc", - "linux-raw-sys 0.3.8", - "windows-sys", -] - [[package]] name = "rustix" version = "0.38.4" @@ -2729,7 +2705,7 @@ dependencies = [ "bitflags 2.3.3", "errno", "libc", - "linux-raw-sys 0.4.3", + "linux-raw-sys", "windows-sys", ] @@ -2790,9 +2766,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc31bd9b61a32c31f9650d18add92aa83a49ba979c143eefd27fe7177b05bd5f" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" [[package]] name = "rustyline" @@ -2819,9 +2795,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.14" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe232bdf6be8c8de797b22184ee71118d63780ea42ac85b61d1baa6d3b782ae9" +checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" [[package]] name = "same-file" @@ -2843,9 +2819,9 @@ dependencies = [ [[package]] name = "scopeguard" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "sct" @@ -2882,41 +2858,41 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" +checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" [[package]] name = "seq-macro" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63134939175b3131fe4d2c131b103fd42f25ccca89423d43b5e4f267920ccf03" +checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.171" +version = "1.0.173" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9" +checksum = "e91f70896d6720bc714a4a57d22fc91f1db634680e65c8efe13323f1fa38d53f" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.171" +version = "1.0.173" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" +checksum = "a6250dde8342e0232232be9ca3db7aa40aceb5a3e5dd9bddbc00d99a007cde49" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] name = "serde_json" -version = "1.0.102" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5062a995d481b2308b6064e9af76011f2921c35f97b0468811ed9f6cd91dfed" +checksum = "d03b412469450d4404fe8499a268edd7f8b79fecb074b0d812ad64ca21f4031b" dependencies = [ "itoa", "ryu", @@ -3088,7 +3064,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -3110,9 +3086,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.25" +version = "2.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15e3fc8c0c74267e2df136e5e5fb656a464158aa57624053375eb9c8c6e25ae2" +checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970" dependencies = [ "proc-macro2", "quote", @@ -3121,15 +3097,14 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.6.0" +version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c0432476357e58790aaa47a8efb0c5138f137343f3b5f23bd36a27e3b0a6d6" +checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998" dependencies = [ - "autocfg", "cfg-if", - "fastrand", + "fastrand 2.0.0", "redox_syscall 0.3.5", - "rustix 0.37.23", + "rustix", "windows-sys", ] @@ -3171,7 +3146,7 @@ checksum = "463fe12d7993d3b327787537ce8dd4dfa058de32fc2b195ef3cde03dc4771e8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -3262,7 +3237,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -3360,7 +3335,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", ] [[package]] @@ -3402,9 +3377,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" [[package]] name = "unicode-ident" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22049a19f4a68748a168c0fc439f9516686aa045927ff767eca0a85101fb6e73" +checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" [[package]] name = "unicode-normalization" @@ -3458,9 +3433,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d023da39d1fde5a8a3fe1f3e01ca9632ada0a63e9797de55a879d6e2236277be" +checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" dependencies = [ "getrandom", ] @@ -3532,7 +3507,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", "wasm-bindgen-shared", ] @@ -3566,7 +3541,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.25", + "syn 2.0.26", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3757,18 +3732,18 @@ checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" [[package]] name = "zstd" -version = "0.12.3+zstd.1.5.2" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806" +checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "6.0.5+zstd.1.5.4" +version = "6.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d56d9e60b4b1758206c238a10165fbcae3ca37b01744e394c463463f6529d23b" +checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" dependencies = [ "libc", "zstd-sys", From 6654876dbd3786a1d023ff7671d10604c04e7ec1 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 20 Jul 2023 17:31:06 -0500 Subject: [PATCH 7/8] Make Cargo.toml order 'just so' --- datafusion/core/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 542d8c032c78..e729a6056178 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -72,8 +72,8 @@ datafusion-sql = { path = "../sql", version = "27.0.0" } flate2 = { version = "1.0.24", optional = true } futures = "0.3" glob = "0.3.0" -hashbrown = { version = "0.14", features = ["raw"] } half = { version = "2.1", default-features = false } +hashbrown = { version = "0.14", features = ["raw"] } indexmap = "2.0.0" itertools = "0.11" lazy_static = { version = "^1.4.0" } From daf408c0d6d1f8f1d99a49c22c90e5e01231cb96 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 21 Jul 2023 11:51:06 -0400 Subject: [PATCH 8/8] Review feedback --- .../aggregates/group_values/mod.rs | 1 - .../aggregates/group_values/primitive.rs | 20 ++++++++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs index af8872e93176..46f372b6ad28 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/mod.rs @@ -54,7 +54,6 @@ pub fn new_group_values(schema: SchemaRef) -> Result> { }; } - // TODO: More primitives downcast_primitive! { d => (downcast_helper, d), _ => {} diff --git a/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs index f93237ecdee1..7b8691c67fdd 100644 --- a/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs +++ b/datafusion/core/src/physical_plan/aggregates/group_values/primitive.rs @@ -38,9 +38,15 @@ trait HashValue { macro_rules! hash_integer { ($($t:ty),+) => { $(impl HashValue for $t { + #[cfg(not(feature = "force_hash_collisions"))] fn hash(self, state: &RandomState) -> u64 { state.hash_one(self) } + + #[cfg(feature = "force_hash_collisions")] + fn hash(self, _state: &RandomState) -> u64 { + 0 + } })+ }; } @@ -50,20 +56,32 @@ hash_integer!(u8, u16, u32, u64); macro_rules! hash_float { ($($t:ty),+) => { $(impl HashValue for $t { + #[cfg(not(feature = "force_hash_collisions"))] fn hash(self, state: &RandomState) -> u64 { state.hash_one(self.to_bits()) } + + #[cfg(feature = "force_hash_collisions")] + fn hash(self, _state: &RandomState) -> u64 { + 0 + } })+ }; } hash_float!(f16, f32, f64); -/// A [`GroupValues`] storing raw primitive values +/// A [`GroupValues`] storing a single column of primitive values +/// +/// This specialization is significantly faster than using the more general +/// purpose `Row`s format pub struct GroupValuesPrimitive { /// The data type of the output array data_type: DataType, /// Stores the group index based on the hash of its value + /// + /// We don't store the hashes as hashing fixed width primitives + /// is fast enough for this not to benefit performance map: RawTable, /// The group index of the null value if any null_group: Option,