Skip to content

Commit

Permalink
Enhance LastValueAccumulator logic and add SQL logic tests for last_v…
Browse files Browse the repository at this point in the history
…alue function (#13980)

- Updated LastValueAccumulator to include requirement satisfaction check before updating the last value.
- Added SQL logic tests to verify the behavior of the last_value function with merge batches and ensure correct aggregation in various scenarios.
  • Loading branch information
jayzhan211 authored Jan 3, 2025
1 parent 33437f7 commit ab1de2c
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 0 deletions.
1 change: 1 addition & 0 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ impl Accumulator for LastValueAccumulator {
// Either there is no existing value, or there is a newer (latest)
// version in the new data:
if !self.is_set
|| self.requirement_satisfied
|| compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt()
{
// Update with last value in the state. Note that we should exclude the
Expand Down
66 changes: 66 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6137,3 +6137,69 @@ SELECT v1 FROM t1 WHERE ((count(v1) % 1) << 1) > 0;

statement ok
DROP TABLE t1;

# Test the last_value function on the `merge_batch` path
query II
with A as (
-- Five identical rows sharing id = 1, so they all fall into a single group
select 1 as id, 10 as foo
UNION ALL
select 1, 10
UNION ALL
select 1, 10
UNION ALL
select 1, 10
UNION ALL
select 1, 10
-- Row order is non-deterministic, so every row carries the same foo value
-- to keep the last_value result stable
) select last_value(a.foo), sum(distinct 1) from A a group by a.id;
----
10 1

# The plan below contains an AggregateExec in FinalPartitioned mode, so `merge_batch` is used.
# If the plan changes, re-verify that `merge_batch` is still exercised so this test keeps its coverage.
query TT
explain with A as (
-- Same id for every row; foo values are listed out of order (2, 4, 5, 3, 2)
select 1 as id, 2 as foo
UNION ALL
select 1, 4
UNION ALL
select 1, 5
UNION ALL
select 1, 3
UNION ALL
select 1, 2
) select last_value(a.foo order by a.foo), sum(distinct 1) from A a group by a.id;
----
logical_plan
01)Projection: last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))
02)--Aggregate: groupBy=[[a.id]], aggr=[[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))]]
03)----SubqueryAlias: a
04)------SubqueryAlias: a
05)--------Union
06)----------Projection: Int64(1) AS id, Int64(2) AS foo
07)------------EmptyRelation
08)----------Projection: Int64(1) AS id, Int64(4) AS foo
09)------------EmptyRelation
10)----------Projection: Int64(1) AS id, Int64(5) AS foo
11)------------EmptyRelation
12)----------Projection: Int64(1) AS id, Int64(3) AS foo
13)------------EmptyRelation
14)----------Projection: Int64(1) AS id, Int64(2) AS foo
15)------------EmptyRelation
physical_plan
01)ProjectionExec: expr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST]@1 as last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))@2 as sum(DISTINCT Int64(1))]
02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))], ordering_mode=Sorted
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=5
05)--------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))], ordering_mode=Sorted
06)----------UnionExec
07)------------ProjectionExec: expr=[1 as id, 2 as foo]
08)--------------PlaceholderRowExec
09)------------ProjectionExec: expr=[1 as id, 4 as foo]
10)--------------PlaceholderRowExec
11)------------ProjectionExec: expr=[1 as id, 5 as foo]
12)--------------PlaceholderRowExec
13)------------ProjectionExec: expr=[1 as id, 3 as foo]
14)--------------PlaceholderRowExec
15)------------ProjectionExec: expr=[1 as id, 2 as foo]
16)--------------PlaceholderRowExec

0 comments on commit ab1de2c

Please sign in to comment.