From ecc04d4af85a29111a1598e615350fea84e60fcb Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Wed, 20 Nov 2024 06:59:21 -0500 Subject: [PATCH] feat: Support faster multi-column grouping ( `GroupColumn`) for `Date/Time/Timestamp` types (#13457) * feat: Add `GroupColumn` for `Date/Time/Timestamp` * Add tests --- .../src/aggregates/group_values/mod.rs | 28 +++ .../group_values/multi_group_by/mod.rs | 42 +++- .../sqllogictest/test_files/group_by.slt | 196 ++++++++++++++++++ 3 files changed, 263 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index a816203b6812..ae528daad53c 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -18,7 +18,13 @@ //! [`GroupValues`] trait for storing and interning group keys use arrow::record_batch::RecordBatch; +use arrow_array::types::{ + Date32Type, Date64Type, Time32MillisecondType, Time32SecondType, + Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, +}; use arrow_array::{downcast_primitive, ArrayRef}; +use arrow_schema::TimeUnit; use arrow_schema::{DataType, SchemaRef}; use datafusion_common::Result; @@ -142,6 +148,28 @@ pub(crate) fn new_group_values( } match d { + DataType::Date32 => { + downcast_helper!(Date32Type, d); + } + DataType::Date64 => { + downcast_helper!(Date64Type, d); + } + DataType::Time32(t) => match t { + TimeUnit::Second => downcast_helper!(Time32SecondType, d), + TimeUnit::Millisecond => downcast_helper!(Time32MillisecondType, d), + _ => {} + }, + DataType::Time64(t) => match t { + TimeUnit::Microsecond => downcast_helper!(Time64MicrosecondType, d), + TimeUnit::Nanosecond => downcast_helper!(Time64NanosecondType, d), + _ => {} + }, + DataType::Timestamp(t, _) => match t { + TimeUnit::Second => downcast_helper!(TimestampSecondType, d), + TimeUnit::Millisecond => downcast_helper!(TimestampMillisecondType, d), + TimeUnit::Microsecond => downcast_helper!(TimestampMicrosecondType, d), + TimeUnit::Nanosecond => downcast_helper!(TimestampNanosecondType, d), + }, DataType::Utf8 => { return Ok(Box::new(GroupValuesByes::::new(OutputType::Utf8))); } diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 83b0f9d77369..10b00cf74fdb 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -32,12 +32,14 @@ use ahash::RandomState; use arrow::compute::cast; use arrow::datatypes::{ BinaryViewType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, - Int32Type, Int64Type, Int8Type, StringViewType, UInt16Type, UInt32Type, UInt64Type, - UInt8Type, + Int32Type, Int64Type, Int8Type, StringViewType, Time32MillisecondType, + Time32SecondType, Time64MicrosecondType, Time64NanosecondType, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; use arrow::record_batch::RecordBatch; use arrow_array::{Array, ArrayRef}; -use arrow_schema::{DataType, Schema, SchemaRef}; +use arrow_schema::{DataType, Schema, SchemaRef, TimeUnit}; use datafusion_common::hash_utils::create_hashes; use datafusion_common::{not_impl_err, DataFusionError, Result}; use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt}; @@ -913,6 +915,38 @@ impl GroupValues for GroupValuesColumn { } &DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type), &DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type), + &DataType::Time32(t) => match t { + TimeUnit::Second => { + instantiate_primitive!(v, nullable, Time32SecondType) + } + TimeUnit::Millisecond => { + instantiate_primitive!(v, nullable, Time32MillisecondType) + } + _ => {} + }, + &DataType::Time64(t) => match t { + TimeUnit::Microsecond => { + instantiate_primitive!(v, nullable, Time64MicrosecondType) + } + TimeUnit::Nanosecond => { + instantiate_primitive!(v, nullable, Time64NanosecondType) + } + _ => {} + }, + &DataType::Timestamp(t, _) => match t { + TimeUnit::Second => { + instantiate_primitive!(v, nullable, TimestampSecondType) + } + TimeUnit::Millisecond => { + instantiate_primitive!(v, nullable, TimestampMillisecondType) + } + TimeUnit::Microsecond => { + instantiate_primitive!(v, nullable, TimestampMicrosecondType) + } + TimeUnit::Nanosecond => { + instantiate_primitive!(v, nullable, TimestampNanosecondType) + } + }, &DataType::Utf8 => { let b = ByteGroupValueBuilder::::new(OutputType::Utf8); v.push(Box::new(b) as _) @@ -1125,6 +1159,8 @@ fn supported_type(data_type: &DataType) -> bool { | DataType::LargeBinary | DataType::Date32 | DataType::Date64 + | DataType::Time32(_) + | DataType::Timestamp(_, _) | DataType::Utf8View | DataType::BinaryView ) diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 391f84836871..f74e1006f7f6 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -5272,6 +5272,201 @@ drop view t statement ok drop table source; +# Test multi group by int + Date32 +statement ok +create table source as values +(1, '2020-01-01'), +(1, '2020-01-01'), +(2, '2020-01-02'), +(2, '2020-01-03'), +(3, '2020-01-04'), +(3, '2020-01-04'), +(2, '2020-01-03'), +(null, null), +(null, '2020-01-01'), +(null, null), +(null, '2020-01-01'), +(2, '2020-01-02'), +(2, '2020-01-02'), +(1, null) +; + +statement ok +create view t as select column1 as a, arrow_cast(column2, 'Date32') as b from source; + +query IDI +select a, b, count(*) from t group by a, b order by a, b; +---- +1 2020-01-01 2 +1 NULL 1 +2 2020-01-02 3 +2 2020-01-03 2 +3 2020-01-04 2 +NULL 2020-01-01 2 +NULL NULL 2 + +statement ok +drop view t + +statement ok +drop table source; + +# Test multi group by int + Date64 +statement ok +create table source as values +(1, '2020-01-01'), +(1, '2020-01-01'), +(2, '2020-01-02'), +(2, '2020-01-03'), +(3, '2020-01-04'), +(3, '2020-01-04'), +(2, '2020-01-03'), +(null, null), +(null, '2020-01-01'), +(null, null), +(null, '2020-01-01'), +(2, '2020-01-02'), +(2, '2020-01-02'), +(1, null) +; + +statement ok +create view t as select column1 as a, arrow_cast(column2, 'Date64') as b from source; + +query IDI +select a, b, count(*) from t group by a, b order by a, b; +---- +1 2020-01-01T00:00:00 2 +1 NULL 1 +2 2020-01-02T00:00:00 3 +2 2020-01-03T00:00:00 2 +3 2020-01-04T00:00:00 2 +NULL 2020-01-01T00:00:00 2 +NULL NULL 2 + +statement ok +drop view t + +statement ok +drop table source; + +# Test multi group by int + Time32 +statement ok +create table source as values +(1, '12:34:56'), +(1, '12:34:56'), +(2, '13:00:00'), +(2, '14:15:00'), +(3, '23:59:59'), +(3, '23:59:59'), +(2, '14:15:00'), +(null, null), +(null, '12:00:00'), +(null, null), +(null, '12:00:00'), +(2, '13:00:00'), +(2, '13:00:00'), +(1, null) +; + +statement ok +create view t as select column1 as a, arrow_cast(column2, 'Time32(Second)') as b from source; + +query IDI +select a, b, count(*) from t group by a, b order by a, b; +---- +1 12:34:56 2 +1 NULL 1 +2 13:00:00 3 +2 14:15:00 2 +3 23:59:59 2 +NULL 12:00:00 2 +NULL NULL 2 + +statement ok +drop view t + +statement ok +drop table source; + +# Test multi group by int + Time64 +statement ok +create table source as values +(1, '12:34:56.123456'), +(1, '12:34:56.123456'), +(2, '13:00:00.000001'), +(2, '14:15:00.999999'), +(3, '23:59:59.500000'), +(3, '23:59:59.500000'), +(2, '14:15:00.999999'), +(null, null), +(null, '12:00:00.000000'), +(null, null), +(null, '12:00:00.000000'), +(2, '13:00:00.000001'), +(2, '13:00:00.000001'), +(1, null) +; + +statement ok +create view t as select column1 as a, arrow_cast(column2, 'Time64(Microsecond)') as b from source; + +query IDI +select a, b, count(*) from t group by a, b order by a, b; +---- +1 12:34:56.123456 2 +1 NULL 1 +2 13:00:00.000001 3 +2 14:15:00.999999 2 +3 23:59:59.500 2 +NULL 12:00:00 2 +NULL NULL 2 + +statement ok +drop view t + +statement ok +drop table source; + +# Test multi group by int + Timestamp +statement ok +create table source as values +(1, '2020-01-01 12:34:56'), +(1, '2020-01-01 12:34:56'), +(2, '2020-01-02 13:00:00'), +(2, '2020-01-03 14:15:00'), +(3, '2020-01-04 23:59:59'), +(3, '2020-01-04 23:59:59'), +(2, '2020-01-03 14:15:00'), +(null, null), +(null, '2020-01-01 12:00:00'), +(null, null), +(null, '2020-01-01 12:00:00'), +(2, '2020-01-02 13:00:00'), +(2, '2020-01-02 13:00:00'), +(1, null) +; + +statement ok +create view t as select column1 as a, arrow_cast(column2, 'Timestamp(Nanosecond, None)') as b from source; + +query IPI +select a, b, count(*) from t group by a, b order by a, b; +---- +1 2020-01-01T12:34:56 2 +1 NULL 1 +2 2020-01-02T13:00:00 3 +2 2020-01-03T14:15:00 2 +3 2020-01-04T23:59:59 2 +NULL 2020-01-01T12:00:00 2 +NULL NULL 2 + +statement ok +drop view t + +statement ok +drop table source; + # Test whether min, max accumulator produces NaN result when input is NaN. # See https://github.com/apache/datafusion/issues/13415 for rationale statement ok @@ -5287,3 +5482,4 @@ query RR SELECT max(input_table.x), min(input_table.x) from input_table GROUP BY input_table."row"; ---- NaN NaN +