From 04e147b18ca446f44232e72f4899e660285f6344 Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Tue, 23 Jan 2024 23:10:06 +0300 Subject: [PATCH] [MINOR]: Extract aggregate topk function to `aggregate_topk.slt` (#8948) * Add new orders to make possible result unique * create a new file for topk aggregate tests * Move topk aggregate tests * Open flag --- .../sqllogictest/test_files/aggregate.slt | 192 ---------------- .../test_files/aggregates_topk.slt | 212 ++++++++++++++++++ 2 files changed, 212 insertions(+), 192 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/aggregates_topk.slt diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index a098c8de0d3c..e9c92f53e0fa 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -2515,198 +2515,6 @@ false true NULL -# TopK aggregation -statement ok -CREATE TABLE traces(trace_id varchar, timestamp bigint, other bigint) AS VALUES -(NULL, 0, 0), -('a', NULL, NULL), -('a', 1, 1), -('a', -1, -1), -('b', 0, 0), -('c', 1, 1), -('c', 2, 2), -('b', 3, 3); - -statement ok -set datafusion.optimizer.enable_topk_aggregation = false; - -query TT -explain select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4; ----- -logical_plan -Limit: skip=0, fetch=4 ---Sort: MAX(traces.timestamp) DESC NULLS FIRST, fetch=4 -----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MAX(traces.timestamp)]] -------TableScan: traces projection=[trace_id, timestamp] -physical_plan -GlobalLimitExec: skip=0, fetch=4 ---SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4 -----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC] -------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] ---------CoalesceBatchesExec: target_batch_size=8192 -----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 -------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] -----------------MemoryExec: partitions=1, partition_sizes=[1] - - -query TI -select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4; ----- -b 3 -c 2 -a 1 -NULL 0 - -query TI -select trace_id, MIN(timestamp) from traces group by trace_id order by MIN(timestamp) asc limit 4; ----- -a -1 -NULL 0 -b 0 -c 1 - -query TII -select trace_id, other, MIN(timestamp) from traces group by trace_id, other order by MIN(timestamp) asc limit 4; ----- -a -1 -1 -b 0 0 -NULL 0 0 -c 1 1 - -query TII -select trace_id, MIN(other), MIN(timestamp) from traces group by trace_id order by MIN(timestamp), MIN(other) limit 4; ----- -a -1 -1 -NULL 0 0 -b 0 0 -c 1 1 - -statement ok -set datafusion.optimizer.enable_topk_aggregation = true; - -query TT -explain select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4; ----- -logical_plan -Limit: skip=0, fetch=4 ---Sort: MAX(traces.timestamp) DESC NULLS FIRST, fetch=4 -----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MAX(traces.timestamp)]] -------TableScan: traces projection=[trace_id, timestamp] -physical_plan -GlobalLimitExec: skip=0, fetch=4 ---SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4 -----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC] -------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4] ---------CoalesceBatchesExec: target_batch_size=8192 -----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 -------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4] -----------------MemoryExec: partitions=1, partition_sizes=[1] - -query TT -explain select trace_id, MIN(timestamp) from traces group by trace_id order by MIN(timestamp) desc limit 4; ----- -logical_plan -Limit: skip=0, fetch=4 ---Sort: MIN(traces.timestamp) DESC NULLS FIRST, fetch=4 -----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MIN(traces.timestamp)]] -------TableScan: traces projection=[trace_id, timestamp] -physical_plan -GlobalLimitExec: skip=0, fetch=4 ---SortPreservingMergeExec: [MIN(traces.timestamp)@1 DESC], fetch=4 -----SortExec: TopK(fetch=4), expr=[MIN(traces.timestamp)@1 DESC] -------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)] ---------CoalesceBatchesExec: target_batch_size=8192 -----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 -------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)] -----------------MemoryExec: partitions=1, partition_sizes=[1] - -query TT -explain select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) asc limit 4; ----- -logical_plan -Limit: skip=0, fetch=4 ---Sort: MAX(traces.timestamp) ASC NULLS LAST, fetch=4 -----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MAX(traces.timestamp)]] -------TableScan: traces projection=[trace_id, timestamp] -physical_plan -GlobalLimitExec: skip=0, fetch=4 ---SortPreservingMergeExec: [MAX(traces.timestamp)@1 ASC NULLS LAST], fetch=4 -----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 ASC NULLS LAST] -------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] ---------CoalesceBatchesExec: target_batch_size=8192 -----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 -------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] -----------------MemoryExec: partitions=1, partition_sizes=[1] - -query TT -explain select trace_id, MAX(timestamp) from traces group by trace_id order by trace_id asc limit 4; ----- -logical_plan -Limit: skip=0, fetch=4 ---Sort: traces.trace_id ASC NULLS LAST, fetch=4 -----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MAX(traces.timestamp)]] -------TableScan: traces projection=[trace_id, timestamp] -physical_plan -GlobalLimitExec: skip=0, fetch=4 ---SortPreservingMergeExec: [trace_id@0 ASC NULLS LAST], fetch=4 -----SortExec: TopK(fetch=4), expr=[trace_id@0 ASC NULLS LAST] -------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] ---------CoalesceBatchesExec: target_batch_size=8192 -----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 -------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] -----------------MemoryExec: partitions=1, partition_sizes=[1] - -query TI -select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4; ----- -b 3 -c 2 -a 1 -NULL 0 - -query TI -select trace_id, MIN(timestamp) from traces group by trace_id order by MIN(timestamp) asc limit 4; ----- -a -1 -NULL 0 -b 0 -c 1 - -query TI -select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 3; ----- -b 3 -c 2 -a 1 - -query TI -select trace_id, MIN(timestamp) from traces group by trace_id order by MIN(timestamp) asc limit 3; ----- -a -1 -NULL 0 -b 0 - -query TII -select trace_id, other, MIN(timestamp) from traces group by trace_id, other order by MIN(timestamp) asc limit 4; ----- -a -1 -1 -b 0 0 -NULL 0 0 -c 1 1 - -query TII -select trace_id, MIN(other), MIN(timestamp) from traces group by trace_id order by MIN(timestamp), MIN(other) limit 4; ----- -a -1 -1 -NULL 0 0 -b 0 0 -c 1 1 - # # Push limit into distinct group-by aggregation tests # diff --git a/datafusion/sqllogictest/test_files/aggregates_topk.slt b/datafusion/sqllogictest/test_files/aggregates_topk.slt new file mode 100644 index 000000000000..6b6204e09f40 --- /dev/null +++ b/datafusion/sqllogictest/test_files/aggregates_topk.slt @@ -0,0 +1,212 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +####### +# Setup test data table +####### + +# TopK aggregation +statement ok +CREATE TABLE traces(trace_id varchar, timestamp bigint, other bigint) AS VALUES +(NULL, 0, 0), +('a', NULL, NULL), +('a', 1, 1), +('a', -1, -1), +('b', 0, 0), +('c', 1, 1), +('c', 2, 2), +('b', 3, 3); + +statement ok +set datafusion.optimizer.enable_topk_aggregation = false; + +query TT +explain select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4; +---- +logical_plan +Limit: skip=0, fetch=4 +--Sort: MAX(traces.timestamp) DESC NULLS FIRST, fetch=4 +----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MAX(traces.timestamp)]] +------TableScan: traces projection=[trace_id, timestamp] +physical_plan +GlobalLimitExec: skip=0, fetch=4 +--SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4 +----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC] +------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] +--------CoalesceBatchesExec: target_batch_size=8192 +----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] +----------------MemoryExec: partitions=1, partition_sizes=[1] + + +query TI +select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4; +---- +b 3 +c 2 +a 1 +NULL 0 + +query TI +select trace_id, MIN(timestamp) from traces group by trace_id order by MIN(timestamp) asc limit 4; +---- +a -1 +NULL 0 +b 0 +c 1 + +query TII +select trace_id, other, MIN(timestamp) from traces group by trace_id, other order by MIN(timestamp) asc limit 4; +---- +a -1 -1 +b 0 0 +NULL 0 0 +c 1 1 + +query TII +select trace_id, MIN(other), MIN(timestamp) from traces group by trace_id order by MIN(timestamp), MIN(other) limit 4; +---- +a -1 -1 +NULL 0 0 +b 0 0 +c 1 1 + +statement ok +set datafusion.optimizer.enable_topk_aggregation = true; + +query TT +explain select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4; +---- +logical_plan +Limit: skip=0, fetch=4 +--Sort: MAX(traces.timestamp) DESC NULLS FIRST, fetch=4 +----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MAX(traces.timestamp)]] +------TableScan: traces projection=[trace_id, timestamp] +physical_plan +GlobalLimitExec: skip=0, fetch=4 +--SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4 +----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC] +------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4] +--------CoalesceBatchesExec: target_batch_size=8192 +----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4] +----------------MemoryExec: partitions=1, partition_sizes=[1] + +query TT +explain select trace_id, MIN(timestamp) from traces group by trace_id order by MIN(timestamp) desc limit 4; +---- +logical_plan +Limit: skip=0, fetch=4 +--Sort: MIN(traces.timestamp) DESC NULLS FIRST, fetch=4 +----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MIN(traces.timestamp)]] +------TableScan: traces projection=[trace_id, timestamp] +physical_plan +GlobalLimitExec: skip=0, fetch=4 +--SortPreservingMergeExec: [MIN(traces.timestamp)@1 DESC], fetch=4 +----SortExec: TopK(fetch=4), expr=[MIN(traces.timestamp)@1 DESC] +------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)] +--------CoalesceBatchesExec: target_batch_size=8192 +----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)] +----------------MemoryExec: partitions=1, partition_sizes=[1] + +query TT +explain select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) asc limit 4; +---- +logical_plan +Limit: skip=0, fetch=4 +--Sort: MAX(traces.timestamp) ASC NULLS LAST, fetch=4 +----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MAX(traces.timestamp)]] +------TableScan: traces projection=[trace_id, timestamp] +physical_plan +GlobalLimitExec: skip=0, fetch=4 +--SortPreservingMergeExec: [MAX(traces.timestamp)@1 ASC NULLS LAST], fetch=4 +----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 ASC NULLS LAST] +------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] +--------CoalesceBatchesExec: target_batch_size=8192 +----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] +----------------MemoryExec: partitions=1, partition_sizes=[1] + +query TT +explain select trace_id, MAX(timestamp) from traces group by trace_id order by trace_id asc limit 4; +---- +logical_plan +Limit: skip=0, fetch=4 +--Sort: traces.trace_id ASC NULLS LAST, fetch=4 +----Aggregate: groupBy=[[traces.trace_id]], aggr=[[MAX(traces.timestamp)]] +------TableScan: traces projection=[trace_id, timestamp] +physical_plan +GlobalLimitExec: skip=0, fetch=4 +--SortPreservingMergeExec: [trace_id@0 ASC NULLS LAST], fetch=4 +----SortExec: TopK(fetch=4), expr=[trace_id@0 ASC NULLS LAST] +------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] +--------CoalesceBatchesExec: target_batch_size=8192 +----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4 +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)] +----------------MemoryExec: partitions=1, partition_sizes=[1] + +query TI +select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 4; +---- +b 3 +c 2 +a 1 +NULL 0 + +query TI +select trace_id, MIN(timestamp) from traces group by trace_id order by MIN(timestamp) asc limit 4; +---- +a -1 +NULL 0 +b 0 +c 1 + +query TI +select trace_id, MAX(timestamp) from traces group by trace_id order by MAX(timestamp) desc limit 3; +---- +b 3 +c 2 +a 1 + +query TI +select trace_id, MIN(timestamp) from traces group by trace_id order by MIN(timestamp) asc limit 3; +---- +a -1 +NULL 0 +b 0 + +query TII +select trace_id, other, MIN(timestamp) from traces group by trace_id, other order by MIN(timestamp) asc limit 4; +---- +a -1 -1 +b 0 0 +NULL 0 0 +c 1 1 + +query TII +select trace_id, MIN(other), MIN(timestamp) from traces group by trace_id order by MIN(timestamp), MIN(other) limit 4; +---- +a -1 -1 +NULL 0 0 +b 0 0 +c 1 1