Skip to content

Commit

Permalink
feat: Support faster multi-column grouping ( GroupColumn) for `Date…
Browse files Browse the repository at this point in the history
…/Time/Timestamp` types (#13457)

* feat: Add `GroupColumn` for `Date/Time/Timestamp`

* Add tests
  • Loading branch information
jonathanc-n authored Nov 20, 2024
1 parent 8ce4da6 commit ecc04d4
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 3 deletions.
28 changes: 28 additions & 0 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
//! [`GroupValues`] trait for storing and interning group keys
use arrow::record_batch::RecordBatch;
use arrow_array::types::{
Date32Type, Date64Type, Time32MillisecondType, Time32SecondType,
Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
use arrow_array::{downcast_primitive, ArrayRef};
use arrow_schema::TimeUnit;
use arrow_schema::{DataType, SchemaRef};
use datafusion_common::Result;

Expand Down Expand Up @@ -142,6 +148,28 @@ pub(crate) fn new_group_values(
}

match d {
DataType::Date32 => {
downcast_helper!(Date32Type, d);
}
DataType::Date64 => {
downcast_helper!(Date64Type, d);
}
DataType::Time32(t) => match t {
TimeUnit::Second => downcast_helper!(Time32SecondType, d),
TimeUnit::Millisecond => downcast_helper!(Time32MillisecondType, d),
_ => {}
},
DataType::Time64(t) => match t {
TimeUnit::Microsecond => downcast_helper!(Time64MicrosecondType, d),
TimeUnit::Nanosecond => downcast_helper!(Time64NanosecondType, d),
_ => {}
},
DataType::Timestamp(t, _) => match t {
TimeUnit::Second => downcast_helper!(TimestampSecondType, d),
TimeUnit::Millisecond => downcast_helper!(TimestampMillisecondType, d),
TimeUnit::Microsecond => downcast_helper!(TimestampMicrosecondType, d),
TimeUnit::Nanosecond => downcast_helper!(TimestampNanosecondType, d),
},
DataType::Utf8 => {
return Ok(Box::new(GroupValuesByes::<i32>::new(OutputType::Utf8)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ use ahash::RandomState;
use arrow::compute::cast;
use arrow::datatypes::{
BinaryViewType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type,
Int32Type, Int64Type, Int8Type, StringViewType, UInt16Type, UInt32Type, UInt64Type,
UInt8Type,
Int32Type, Int64Type, Int8Type, StringViewType, Time32MillisecondType,
Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use arrow::record_batch::RecordBatch;
use arrow_array::{Array, ArrayRef};
use arrow_schema::{DataType, Schema, SchemaRef};
use arrow_schema::{DataType, Schema, SchemaRef, TimeUnit};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
Expand Down Expand Up @@ -913,6 +915,38 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
}
&DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type),
&DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type),
&DataType::Time32(t) => match t {
TimeUnit::Second => {
instantiate_primitive!(v, nullable, Time32SecondType)
}
TimeUnit::Millisecond => {
instantiate_primitive!(v, nullable, Time32MillisecondType)
}
_ => {}
},
&DataType::Time64(t) => match t {
TimeUnit::Microsecond => {
instantiate_primitive!(v, nullable, Time64MicrosecondType)
}
TimeUnit::Nanosecond => {
instantiate_primitive!(v, nullable, Time64NanosecondType)
}
_ => {}
},
&DataType::Timestamp(t, _) => match t {
TimeUnit::Second => {
instantiate_primitive!(v, nullable, TimestampSecondType)
}
TimeUnit::Millisecond => {
instantiate_primitive!(v, nullable, TimestampMillisecondType)
}
TimeUnit::Microsecond => {
instantiate_primitive!(v, nullable, TimestampMicrosecondType)
}
TimeUnit::Nanosecond => {
instantiate_primitive!(v, nullable, TimestampNanosecondType)
}
},
&DataType::Utf8 => {
let b = ByteGroupValueBuilder::<i32>::new(OutputType::Utf8);
v.push(Box::new(b) as _)
Expand Down Expand Up @@ -1125,6 +1159,8 @@ fn supported_type(data_type: &DataType) -> bool {
| DataType::LargeBinary
| DataType::Date32
| DataType::Date64
| DataType::Time32(_)
| DataType::Timestamp(_, _)
| DataType::Utf8View
| DataType::BinaryView
)
Expand Down
196 changes: 196 additions & 0 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5272,6 +5272,201 @@ drop view t
statement ok
drop table source;

# Test multi group by int + Date32
statement ok
create table source as values
(1, '2020-01-01'),
(1, '2020-01-01'),
(2, '2020-01-02'),
(2, '2020-01-03'),
(3, '2020-01-04'),
(3, '2020-01-04'),
(2, '2020-01-03'),
(null, null),
(null, '2020-01-01'),
(null, null),
(null, '2020-01-01'),
(2, '2020-01-02'),
(2, '2020-01-02'),
(1, null)
;

statement ok
create view t as select column1 as a, arrow_cast(column2, 'Date32') as b from source;

query IDI
select a, b, count(*) from t group by a, b order by a, b;
----
1 2020-01-01 2
1 NULL 1
2 2020-01-02 3
2 2020-01-03 2
3 2020-01-04 2
NULL 2020-01-01 2
NULL NULL 2

statement ok
drop view t

statement ok
drop table source;

# Test multi group by int + Date64
statement ok
create table source as values
(1, '2020-01-01'),
(1, '2020-01-01'),
(2, '2020-01-02'),
(2, '2020-01-03'),
(3, '2020-01-04'),
(3, '2020-01-04'),
(2, '2020-01-03'),
(null, null),
(null, '2020-01-01'),
(null, null),
(null, '2020-01-01'),
(2, '2020-01-02'),
(2, '2020-01-02'),
(1, null)
;

statement ok
create view t as select column1 as a, arrow_cast(column2, 'Date64') as b from source;

query IDI
select a, b, count(*) from t group by a, b order by a, b;
----
1 2020-01-01T00:00:00 2
1 NULL 1
2 2020-01-02T00:00:00 3
2 2020-01-03T00:00:00 2
3 2020-01-04T00:00:00 2
NULL 2020-01-01T00:00:00 2
NULL NULL 2

statement ok
drop view t

statement ok
drop table source;

# Test multi group by int + Time32
statement ok
create table source as values
(1, '12:34:56'),
(1, '12:34:56'),
(2, '13:00:00'),
(2, '14:15:00'),
(3, '23:59:59'),
(3, '23:59:59'),
(2, '14:15:00'),
(null, null),
(null, '12:00:00'),
(null, null),
(null, '12:00:00'),
(2, '13:00:00'),
(2, '13:00:00'),
(1, null)
;

statement ok
create view t as select column1 as a, arrow_cast(column2, 'Time32(Second)') as b from source;

query IDI
select a, b, count(*) from t group by a, b order by a, b;
----
1 12:34:56 2
1 NULL 1
2 13:00:00 3
2 14:15:00 2
3 23:59:59 2
NULL 12:00:00 2
NULL NULL 2

statement ok
drop view t

statement ok
drop table source;

# Test multi group by int + Time64
statement ok
create table source as values
(1, '12:34:56.123456'),
(1, '12:34:56.123456'),
(2, '13:00:00.000001'),
(2, '14:15:00.999999'),
(3, '23:59:59.500000'),
(3, '23:59:59.500000'),
(2, '14:15:00.999999'),
(null, null),
(null, '12:00:00.000000'),
(null, null),
(null, '12:00:00.000000'),
(2, '13:00:00.000001'),
(2, '13:00:00.000001'),
(1, null)
;

statement ok
create view t as select column1 as a, arrow_cast(column2, 'Time64(Microsecond)') as b from source;

query IDI
select a, b, count(*) from t group by a, b order by a, b;
----
1 12:34:56.123456 2
1 NULL 1
2 13:00:00.000001 3
2 14:15:00.999999 2
3 23:59:59.500 2
NULL 12:00:00 2
NULL NULL 2

statement ok
drop view t

statement ok
drop table source;

# Test multi group by int + Timestamp
statement ok
create table source as values
(1, '2020-01-01 12:34:56'),
(1, '2020-01-01 12:34:56'),
(2, '2020-01-02 13:00:00'),
(2, '2020-01-03 14:15:00'),
(3, '2020-01-04 23:59:59'),
(3, '2020-01-04 23:59:59'),
(2, '2020-01-03 14:15:00'),
(null, null),
(null, '2020-01-01 12:00:00'),
(null, null),
(null, '2020-01-01 12:00:00'),
(2, '2020-01-02 13:00:00'),
(2, '2020-01-02 13:00:00'),
(1, null)
;

statement ok
create view t as select column1 as a, arrow_cast(column2, 'Timestamp(Nanosecond, None)') as b from source;

query IPI
select a, b, count(*) from t group by a, b order by a, b;
----
1 2020-01-01T12:34:56 2
1 NULL 1
2 2020-01-02T13:00:00 3
2 2020-01-03T14:15:00 2
3 2020-01-04T23:59:59 2
NULL 2020-01-01T12:00:00 2
NULL NULL 2

statement ok
drop view t

statement ok
drop table source;

# Test whether min, max accumulator produces NaN result when input is NaN.
# See https://github.com/apache/datafusion/issues/13415 for rationale
statement ok
Expand All @@ -5287,3 +5482,4 @@ query RR
SELECT max(input_table.x), min(input_table.x) from input_table GROUP BY input_table."row";
----
NaN NaN

0 comments on commit ecc04d4

Please sign in to comment.