[SPARK-28386][SQL] Cannot resolve ORDER BY columns with GROUP BY and HAVING

### What changes were proposed in this pull request?

This PR enhances the analyzer to properly handle the following plan pattern:

```
Sort
 - Filter
   - Aggregate
```
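
This shape arises whenever a query combines `GROUP BY`, `HAVING`, and `ORDER BY`: the `HAVING` predicate is planned as a `Filter` on top of the `Aggregate`, and the `ORDER BY` as a `Sort` on top of that `Filter`. A minimal sketch of such a query (hypothetical table `t`):

```
-- HAVING lowers to Filter(Aggregate); ORDER BY lowers to a Sort above it,
-- producing the Sort -> Filter -> Aggregate pattern this PR handles.
SELECT k, COUNT(*)
  FROM t
GROUP BY k
HAVING COUNT(*) > 1
ORDER BY k;
```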

### Why are the changes needed?

```
spark-sql (default)> CREATE TABLE t1 (flag BOOLEAN, dt STRING);

spark-sql (default)>   SELECT LENGTH(dt),
                   >          COUNT(t1.flag)
                   >     FROM t1
                   > GROUP BY LENGTH(dt)
                   >   HAVING COUNT(t1.flag) > 1
                   > ORDER BY LENGTH(dt);
[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `dt` cannot be resolved. Did you mean one of the following? [`length(dt)`, `count(flag)`].; line 6 pos 16;
'Sort ['LENGTH('dt) ASC NULLS FIRST], true
+- Filter (count(flag)#60L > cast(1 as bigint))
   +- Aggregate [length(dt#9)], [length(dt#9) AS length(dt)#59, count(flag#8) AS count(flag)#60L]
      +- SubqueryAlias spark_catalog.default.t1
         +- Relation spark_catalog.default.t1[flag#8,dt#9] parquet
```

The above session demonstrates the failure case: the query fails during the analysis phase when both `HAVING` and `ORDER BY` clauses are present, but succeeds when only one of them is present.
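
For comparison, dropping either clause gives a query that analyzes successfully against the same table:

```
-- HAVING without ORDER BY: resolves fine.
SELECT LENGTH(dt), COUNT(t1.flag)
  FROM t1
GROUP BY LENGTH(dt)
HAVING COUNT(t1.flag) > 1;

-- ORDER BY without HAVING: also resolves fine.
SELECT LENGTH(dt), COUNT(t1.flag)
  FROM t1
GROUP BY LENGTH(dt)
ORDER BY LENGTH(dt);
```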

### Does this PR introduce _any_ user-facing change?

Yes, this can be considered a bugfix.

### How was this patch tested?

New UTs are added.
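
The new cases in `having.sql` (full diff below) cover both a sort key that appears in the SELECT list and one that does not, using the `hav` test view seen in the analysis output below:

```
-- ORDER BY an aggregate that is also in the SELECT list:
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v);

-- ORDER BY an aggregate that is not in the SELECT list:
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v);
```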

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44352 from pan3793/SPARK-28386.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
pan3793 authored and yaooqinn committed Dec 20, 2023
1 parent f9e468e commit 9fdc375
Showing 10 changed files with 87 additions and 22 deletions.
@@ -2690,6 +2690,15 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
}
s.copy(order = newSortOrder, child = newChild)
})

+      case s @ Sort(_, _, f @ Filter(cond, agg: Aggregate))
+          if agg.resolved && cond.resolved && s.order.forall(_.resolved) =>
+        resolveOperatorWithAggregate(s.order.map(_.child), agg, (newExprs, newChild) => {
+          val newSortOrder = s.order.zip(newExprs).map {
+            case (sortOrder, expr) => sortOrder.copy(child = expr)
+          }
+          s.copy(order = newSortOrder, child = f.copy(child = newChild))
+        })
}

/**
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.SortOrder
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project, Sort}
import org.apache.spark.sql.connector.catalog.CatalogManager

/**
@@ -28,10 +28,11 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
* includes metadata columns as well.
* 2. Resolves the column to a literal function which is allowed to be invoked without braces, e.g.
* `SELECT col, current_date FROM t`.
- * 3. If the child plan is Aggregate, resolves the column to [[TempResolvedColumn]] with the output
- *    of Aggregate's child plan. This is to allow Sort to host grouping expressions and aggregate
- *    functions, which can be pushed down to the Aggregate later. For example,
- *    `SELECT max(a) FROM t GROUP BY b ORDER BY min(a)`.
+ * 3. If the child plan is Aggregate or Filter(_, Aggregate), resolves the column to
+ *    [[TempResolvedColumn]] with the output of Aggregate's child plan.
+ *    This is to allow Sort to host grouping expressions and aggregate functions, which can
+ *    be pushed down to the Aggregate later. For example,
+ *    `SELECT max(a) FROM t GROUP BY b HAVING max(a) > 1 ORDER BY min(a)`.
* 4. Resolves the column to [[AttributeReference]] with the output of a descendant plan node.
* Spark will propagate the missing attributes from the descendant plan node to the Sort node.
* This is to allow users to ORDER BY columns that are not in the SELECT clause, which is
@@ -51,7 +52,10 @@ class ResolveReferencesInSort(val catalogManager: CatalogManager)

def apply(s: Sort): LogicalPlan = {
val resolvedBasic = s.order.map(resolveExpressionByPlanOutput(_, s.child))
-    val resolvedWithAgg = resolvedBasic.map(resolveColWithAgg(_, s.child))
+    val resolvedWithAgg = s.child match {
+      case Filter(_, agg: Aggregate) => resolvedBasic.map(resolveColWithAgg(_, agg))
+      case _ => resolvedBasic.map(resolveColWithAgg(_, s.child))
+    }
val (missingAttrResolved, newChild) = resolveExprsAndAddMissingAttrs(resolvedWithAgg, s.child)
val orderByAllResolved = resolveOrderByAll(
s.global, newChild, missingAttrResolved.map(_.asInstanceOf[SortOrder]))
@@ -179,3 +179,32 @@ Filter (c1#x = 1)
+- Aggregate [c1#x], [c1#x]
+- SubqueryAlias t
+- LocalRelation [c1#x, c2#x]


-- !query
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v)
-- !query analysis
Sort [sum(v)#xL ASC NULLS FIRST], true
+- Filter (sum(v)#xL > cast(2 as bigint))
+- Aggregate [k#x], [k#x, sum(v#x) AS sum(v)#xL]
+- SubqueryAlias hav
+- View (`hav`, [k#x,v#x])
+- Project [cast(k#x as string) AS k#x, cast(v#x as int) AS v#x]
+- Project [k#x, v#x]
+- SubqueryAlias hav
+- LocalRelation [k#x, v#x]


-- !query
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v)
-- !query analysis
Project [k#x, sum(v)#xL]
+- Sort [avg(v#x)#x ASC NULLS FIRST], true
+- Filter (sum(v)#xL > cast(2 as bigint))
+- Aggregate [k#x], [k#x, sum(v#x) AS sum(v)#xL, avg(v#x) AS avg(v#x)#x]
+- SubqueryAlias hav
+- View (`hav`, [k#x,v#x])
+- Project [cast(k#x as string) AS k#x, cast(v#x as int) AS v#x]
+- Project [k#x, v#x]
+- SubqueryAlias hav
+- LocalRelation [k#x, v#x]
@@ -102,12 +102,11 @@ Project [udf(b)#x, udf(c)#x]
SELECT udf(b), udf(c) FROM test_having
GROUP BY b, c HAVING udf(b) = 3 ORDER BY udf(b), udf(c)
-- !query analysis
-Project [udf(b)#x, udf(c)#x]
-+- Sort [cast(udf(cast(b#x as string)) as int) ASC NULLS FIRST, cast(udf(cast(c#x as string)) as string) ASC NULLS FIRST], true
-   +- Filter (udf(b)#x = 3)
-      +- Aggregate [b#x, c#x], [cast(udf(cast(b#x as string)) as int) AS udf(b)#x, cast(udf(cast(c#x as string)) as string) AS udf(c)#x, b#x, c#x]
-         +- SubqueryAlias spark_catalog.default.test_having
-            +- Relation spark_catalog.default.test_having[a#x,b#x,c#x,d#x] parquet
+Sort [udf(b)#x ASC NULLS FIRST, udf(c)#x ASC NULLS FIRST], true
++- Filter (udf(b)#x = 3)
+   +- Aggregate [b#x, c#x], [cast(udf(cast(b#x as string)) as int) AS udf(b)#x, cast(udf(cast(c#x as string)) as string) AS udf(c)#x]
+      +- SubqueryAlias spark_catalog.default.test_having
+         +- Relation spark_catalog.default.test_having[a#x,b#x,c#x,d#x] parquet


-- !query
6 changes: 6 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/having.sql
@@ -33,3 +33,9 @@ SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY GROUPING SETS(t.c1) HAVING t.
SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY CUBE(t.c1) HAVING t.c1 = 1;
SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY ROLLUP(t.c1) HAVING t.c1 = 1;
SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY t.c1 HAVING t.c1 = 1;

-- SPARK-28386: Resolve ORDER BY agg function with HAVING clause, when the agg function is present in the SELECT list
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v);

-- SPARK-28386: Resolve ORDER BY agg function with HAVING clause, when the agg function is not present in the SELECT list
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v);
18 changes: 18 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/having.sql.out
@@ -134,3 +134,21 @@ SELECT c1 FROM VALUES (1, 2) as t(c1, c2) GROUP BY t.c1 HAVING t.c1 = 1
struct<c1:int>
-- !query output
1


-- !query
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY sum(v)
-- !query schema
struct<k:string,sum(v):bigint>
-- !query output
three 3
one 6


-- !query
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2 ORDER BY avg(v)
-- !query schema
struct<k:string,sum(v):bigint>
-- !query output
one 6
three 3
@@ -248,15 +248,15 @@ Input [2]: [ca_state#18, count#22]
Keys [1]: [ca_state#18]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#23]
-Results [3]: [ca_state#18 AS state#24, count(1)#23 AS cnt#25, ca_state#18]
+Results [2]: [ca_state#18 AS state#24, count(1)#23 AS cnt#25]

(44) Filter [codegen id : 14]
-Input [3]: [state#24, cnt#25, ca_state#18]
+Input [2]: [state#24, cnt#25]
Condition : (cnt#25 >= 10)

(45) TakeOrderedAndProject
-Input [3]: [state#24, cnt#25, ca_state#18]
-Arguments: 100, [cnt#25 ASC NULLS FIRST, ca_state#18 ASC NULLS FIRST], [state#24, cnt#25]
+Input [2]: [state#24, cnt#25]
+Arguments: 100, [cnt#25 ASC NULLS FIRST, state#24 ASC NULLS FIRST], [state#24, cnt#25]

===== Subqueries =====

@@ -1,4 +1,4 @@
-TakeOrderedAndProject [cnt,ca_state,state]
+TakeOrderedAndProject [cnt,state]
WholeStageCodegen (14)
Filter [cnt]
HashAggregate [ca_state,count] [count(1),state,cnt,count]
@@ -218,15 +218,15 @@ Input [2]: [ca_state#2, count#22]
Keys [1]: [ca_state#2]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#23]
-Results [3]: [ca_state#2 AS state#24, count(1)#23 AS cnt#25, ca_state#2]
+Results [2]: [ca_state#2 AS state#24, count(1)#23 AS cnt#25]

(38) Filter [codegen id : 8]
-Input [3]: [state#24, cnt#25, ca_state#2]
+Input [2]: [state#24, cnt#25]
Condition : (cnt#25 >= 10)

(39) TakeOrderedAndProject
-Input [3]: [state#24, cnt#25, ca_state#2]
-Arguments: 100, [cnt#25 ASC NULLS FIRST, ca_state#2 ASC NULLS FIRST], [state#24, cnt#25]
+Input [2]: [state#24, cnt#25]
+Arguments: 100, [cnt#25 ASC NULLS FIRST, state#24 ASC NULLS FIRST], [state#24, cnt#25]

===== Subqueries =====

@@ -1,4 +1,4 @@
-TakeOrderedAndProject [cnt,ca_state,state]
+TakeOrderedAndProject [cnt,state]
WholeStageCodegen (8)
Filter [cnt]
HashAggregate [ca_state,count] [count(1),state,cnt,count]
