Skip to content

Commit

Permalink
[MINOR]: Extract aggregate topk function to aggregate_topk.slt (apa…
Browse files Browse the repository at this point in the history
…che#8948)

* Add new orders to make possible result unique

* create a new file for topk aggregate tests

* Move topk aggregate tests

* Open flag
  • Loading branch information
mustafasrepo authored Jan 23, 2024
1 parent 084fdfb commit 04e147b
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 192 deletions.
192 changes: 0 additions & 192 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2515,198 +2515,6 @@ false
true
NULL

# TopK aggregation
statement ok
CREATE TABLE traces(trace_id varchar, timestamp bigint, other bigint) AS VALUES
(NULL, 0, 0),
('a', NULL, NULL),
('a', 1, 1),
('a', -1, -1),
('b', 0, 0),
('c', 1, 1),
('c', 2, 2),
('b', 3, 3);

statement ok
set datafusion.optimizer.enable_topk_aggregation = false;

query TT
explain select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4;
----
logical_plan
Limit: skip=0, fetch=4
--Sort: MAX(traces.timestamp) DESC NULLS FIRST, fetch=4
----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MAX(traces.timestamp)]]
------TableScan: traces projection=[trace_id, timestamp]
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]


query TI
select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4;
----
b 3
c 2
a 1
NULL 0

query TI
select trace_id, MIN(timestamp) from traces group by trace_id order by MIN(timestamp) asc limit 4;
----
a -1
NULL 0
b 0
c 1

query TII
select trace_id, other, MIN(timestamp) from traces group by trace_id, other order by MIN(timestamp) asc limit 4;
----
a -1 -1
b 0 0
NULL 0 0
c 1 1

query TII
select trace_id, MIN(other), MIN(timestamp) from traces group by trace_id order by MIN(timestamp), MIN(other) limit 4;
----
a -1 -1
NULL 0 0
b 0 0
c 1 1

statement ok
set datafusion.optimizer.enable_topk_aggregation = true;

query TT
explain select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4;
----
logical_plan
Limit: skip=0, fetch=4
--Sort: MAX(traces.timestamp) DESC NULLS FIRST, fetch=4
----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MAX(traces.timestamp)]]
------TableScan: traces projection=[trace_id, timestamp]
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
----------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
explain select trace_id, MIN(timestamp) from traces group by trace_id order by MIN(timestamp) desc limit 4;
----
logical_plan
Limit: skip=0, fetch=4
--Sort: MIN(traces.timestamp) DESC NULLS FIRST, fetch=4
----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MIN(traces.timestamp)]]
------TableScan: traces projection=[trace_id, timestamp]
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MIN(traces.timestamp)@1 DESC], fetch=4
----SortExec: TopK(fetch=4), expr=[MIN(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
explain select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) asc limit 4;
----
logical_plan
Limit: skip=0, fetch=4
--Sort: MAX(traces.timestamp) ASC NULLS LAST, fetch=4
----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MAX(traces.timestamp)]]
------TableScan: traces projection=[trace_id, timestamp]
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 ASC NULLS LAST], fetch=4
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
explain select trace_id, MAX(timestamp) from traces group by trace_id order by trace_id asc limit 4;
----
logical_plan
Limit: skip=0, fetch=4
--Sort: traces.trace_id ASC NULLS LAST, fetch=4
----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MAX(traces.timestamp)]]
------TableScan: traces projection=[trace_id, timestamp]
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [trace_id@0 ASC NULLS LAST], fetch=4
----SortExec: TopK(fetch=4), expr=[trace_id@0 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]

query TI
select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4;
----
b 3
c 2
a 1
NULL 0

query TI
select trace_id, MIN(timestamp) from traces group by trace_id order by MIN(timestamp) asc limit 4;
----
a -1
NULL 0
b 0
c 1

query TI
select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 3;
----
b 3
c 2
a 1

query TI
select trace_id, MIN(timestamp) from traces group by trace_id order by MIN(timestamp) asc limit 3;
----
a -1
NULL 0
b 0

query TII
select trace_id, other, MIN(timestamp) from traces group by trace_id, other order by MIN(timestamp) asc limit 4;
----
a -1 -1
b 0 0
NULL 0 0
c 1 1

query TII
select trace_id, MIN(other), MIN(timestamp) from traces group by trace_id order by MIN(timestamp), MIN(other) limit 4;
----
a -1 -1
NULL 0 0
b 0 0
c 1 1

#
# Push limit into distinct group-by aggregation tests
#
Expand Down
Loading

0 comments on commit 04e147b

Please sign in to comment.