diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 9ad55d91a68b..8ef139ae6123 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -651,6 +651,7 @@ impl Accumulator for LastValueAccumulator { // Either there is no existing value, or there is a newer (latest) // version in the new data: if !self.is_set + || self.requirement_satisfied || compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt() { // Update with last value in the state. Note that we should exclude the diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index cd62e5625342..ffc441d31765 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -6137,3 +6137,69 @@ SELECT v1 FROM t1 WHERE ((count(v1) % 1) << 1) > 0; statement ok DROP TABLE t1; + +# Test last function with merge batch +query II +with A as ( + select 1 as id, 10 as foo + UNION ALL + select 1, 10 + UNION ALL + select 1, 10 + UNION ALL + select 1, 10 + UNION ALL + select 1, 10 + ---- The order is non-deterministic, keep the value the same +) select last_value(a.foo), sum(distinct 1) from A a group by a.id; +---- +10 1 + +# It has only AggregateExec with FinalPartitioned mode, so `merge_batch` is used +# If the plan is changed, whether the `merge_batch` is used should be verified to ensure the test coverage +query TT +explain with A as ( + select 1 as id, 2 as foo + UNION ALL + select 1, 4 + UNION ALL + select 1, 5 + UNION ALL + select 1, 3 + UNION ALL + select 1, 2 +) select last_value(a.foo order by a.foo), sum(distinct 1) from A a group by a.id; +---- +logical_plan +01)Projection: last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1)) +02)--Aggregate: groupBy=[[a.id]], aggr=[[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))]] +03)----SubqueryAlias: a +04)------SubqueryAlias: a +05)--------Union +06)----------Projection: Int64(1) AS id, Int64(2) AS foo +07)------------EmptyRelation +08)----------Projection: Int64(1) AS id, Int64(4) AS foo +09)------------EmptyRelation +10)----------Projection: Int64(1) AS id, Int64(5) AS foo +11)------------EmptyRelation +12)----------Projection: Int64(1) AS id, Int64(3) AS foo +13)------------EmptyRelation +14)----------Projection: Int64(1) AS id, Int64(2) AS foo +15)------------EmptyRelation +physical_plan +01)ProjectionExec: expr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST]@1 as last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))@2 as sum(DISTINCT Int64(1))] +02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))], ordering_mode=Sorted +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=5 +05)--------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))], ordering_mode=Sorted +06)----------UnionExec +07)------------ProjectionExec: expr=[1 as id, 2 as foo] +08)--------------PlaceholderRowExec +09)------------ProjectionExec: expr=[1 as id, 4 as foo] +10)--------------PlaceholderRowExec +11)------------ProjectionExec: expr=[1 as id, 5 as foo] +12)--------------PlaceholderRowExec +13)------------ProjectionExec: expr=[1 as id, 3 as foo] +14)--------------PlaceholderRowExec +15)------------ProjectionExec: expr=[1 as id, 2 as foo] +16)--------------PlaceholderRowExec