[SPARK-50838][SQL] Performs additional checks inside recursive CTEs to throw an error if a forbidden case is encountered #49518

Closed
Changes from 25 commits
31 commits
7f71cdc
Add checkAnalysis and corresponding errors
milanisvet Jan 15, 2025
23f65d5
Adjust optimizer in case UnionLoop is encountered, and disables inlin…
milanisvet Jan 15, 2025
9f4a84a
Merge branch 'master' into checkRecursion
milanisvet Jan 17, 2025
05cb795
Revert "Adjust optimizer in case UnionLoop is encountered, and disabl…
milanisvet Jan 20, 2025
c455756
Split recursive function inside CTERelationDef into hasRecursiveCTERe…
milanisvet Jan 20, 2025
f46823b
Add condition that recursive cteDefs should not be inlined if necessary
milanisvet Jan 20, 2025
8f3cfa8
Add checkNumberOfSelfReferences function to check if we have more tha…
milanisvet Jan 20, 2025
5d70004
Merge branch 'master' into checkRecursion
milanisvet Jan 20, 2025
c6dbae5
Rewrite error message. Corrects some typos.
milanisvet Jan 20, 2025
b9ebdff
Removes checkRecursion
milanisvet Jan 20, 2025
0da1043
Deletes debugging print inside checkNumberOfSelfReferences
milanisvet Jan 20, 2025
0a885a6
Corrects wrong error message
milanisvet Jan 20, 2025
ec8c8c3
Add check if anchor and recursion term have the same data type
milanisvet Jan 20, 2025
b7af544
Add check to throw error if self reference is placed at forbidden cases
milanisvet Jan 20, 2025
d4af85a
Rewrite comments
milanisvet Jan 20, 2025
3f11993
Rewrite comments
milanisvet Jan 20, 2025
ffa8aa2
Move "data type" and "place" checks back to checkRecursion, and rewri…
milanisvet Jan 21, 2025
879df0a
Move check to CTESubstitution that there are no multiple references p…
milanisvet Jan 21, 2025
d073e0a
Small fixes from reviewers' comments
milanisvet Jan 22, 2025
43a7460
Remove data type check
milanisvet Jan 22, 2025
45fdff9
Rewrites the name of hasRecursiveCTERelationRef and hasItsOwnUnionLoopRef
milanisvet Jan 22, 2025
7c08882
Rewrites throwing exceptions
milanisvet Jan 22, 2025
6f99924
Add exception if self-reference is within a subquery
milanisvet Jan 22, 2025
a1ce987
Add passing down recursiveCTERelation to subqueries to substitute rec…
milanisvet Jan 23, 2025
07046d9
Add check for self reference within subquery to ResolveWithCTE
milanisvet Jan 23, 2025
38fa2ef
Removes check number of self references from CTESubstitution
milanisvet Jan 23, 2025
1ab39aa
Removes check for self ref in subquery; adds check number of self ref…
milanisvet Jan 23, 2025
99fafaf
Add check for self ref in subquery to CheckAnalysis
milanisvet Jan 23, 2025
0c1c107
Sort error conditions in correct order
milanisvet Jan 23, 2025
48b21cb
Set the subquery error message to be actually the place error message
milanisvet Jan 24, 2025
236f202
Rewrite checkIfSelfReferenceIsPlacedCorrectly to have better complexity
milanisvet Jan 24, 2025
28 changes: 28 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -3117,6 +3117,34 @@
],
"sqlState" : "42602"
},
"INVALID_RECURSIVE_REFERENCE" : {
"message" : [
"Invalid recursive reference found inside WITH RECURSIVE clause."
],
"subClass" : {
"DATA_TYPE" : {
"message" : [
"The data type of recursive references cannot change during resolution. Check that anchor and recursion term actually output the same data type. Originally it was <fromDataType> but after resolution is <toDataType>."
]
},
"NUMBER" : {
"message" : [
"Multiple self-references to one recursive CTE are not allowed."
]
},
"PLACE" : {
"message" : [
"Recursive references cannot be used on the right side of left outer/semi/anti joins, on the left side of right outer joins, in full outer joins and in aggregates."
]
},
"SUBQUERY" : {
"message" : [
"Recursive reference must not appear within a subquery."
]
}
},
"sqlState" : "42836"
},
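The `DATA_TYPE` message above carries `<fromDataType>` and `<toDataType>` placeholders. A minimal sketch of how such a template could be filled in, assuming plain textual substitution; `RecursiveRefErrors` and `render` are hypothetical names for illustration and do not reproduce Spark's error framework:

```scala
object RecursiveRefErrors {
  // Message templates copied from the error-conditions.json entries above.
  val templates: Map[String, String] = Map(
    "INVALID_RECURSIVE_REFERENCE.NUMBER" ->
      "Multiple self-references to one recursive CTE are not allowed.",
    "INVALID_RECURSIVE_REFERENCE.DATA_TYPE" ->
      ("The data type of recursive references cannot change during resolution. " +
        "Check that anchor and recursion term actually output the same data type. " +
        "Originally it was <fromDataType> but after resolution is <toDataType>.")
  )

  // Hypothetical helper: replaces each <name> placeholder with its parameter value.
  def render(errorClass: String, params: Map[String, String]): String =
    params.foldLeft(templates(errorClass)) { case (msg, (name, value)) =>
      msg.replace("<" + name + ">", value)
    }
}
```

Conditions without placeholders, such as `NUMBER`, are rendered with an empty parameter map, which matches the `messageParameters = Map.empty` calls in the diff.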
"INVALID_QUERY_MIXED_QUERY_PARAMETERS" : {
"message" : [
"Parameterized query must either use positional, or named parameters, but not both."
@@ -87,6 +87,11 @@ object CTESubstitution extends Rule[LogicalPlan] {
case LegacyBehaviorPolicy.CORRECTED =>
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
}
// Recursive CTEs allow only one self-reference per CTE. Here, we perform this check and
// throw an error if it is not fulfilled
cteDefs.foreach { cteDef =>
checkNumberOfSelfReferences(cteDef)
}
if (cteDefs.isEmpty) {
substituted
} else if (substituted eq firstSubstituted.get) {
@@ -402,7 +407,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
case e: SubqueryExpression =>
e.withNewPlan(
apply(substituteCTE(e.plan, alwaysInline, cteRelations, None)))
apply(substituteCTE(e.plan, alwaysInline, cteRelations, recursiveCTERelation)))
}
}
}
@@ -423,4 +428,19 @@ object CTESubstitution extends Rule[LogicalPlan] {
case _ => WithCTE(p, cteDefs)
}
}

/**
* Counts number of self-references in a recursive CTE definition and throws an error
* if that number is bigger than 1.
*/
private def checkNumberOfSelfReferences(cteDef: CTERelationDef): Unit = {
val numOfSelfRef = cteDef.collectWithSubqueries {
case ref: CTERelationRef if ref.cteId == cteDef.id => ref
}.length
if (numOfSelfRef > 1) {
cteDef.failAnalysis(
errorClass = "INVALID_RECURSIVE_REFERENCE.NUMBER",
messageParameters = Map.empty)
}
}
}
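The counting check above can be illustrated on a toy plan tree. This is only a sketch under stated assumptions: `Plan`, `Ref`, `Scan`, `Union` and `Join` are simplified stand-ins rather than Catalyst classes, and the toy tree has no subquery expressions, whereas the diff uses `collectWithSubqueries`.

```scala
object SelfRefCount {
  // Simplified stand-ins for logical plan nodes (not the Catalyst classes).
  sealed trait Plan { def children: Seq[Plan] }
  case class Ref(cteId: Long) extends Plan { val children: Seq[Plan] = Nil }
  case class Scan(table: String) extends Plan { val children: Seq[Plan] = Nil }
  case class Union(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }
  case class Join(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }

  // Counts references to `cteId` anywhere under `plan`.
  def countSelfRefs(plan: Plan, cteId: Long): Int = plan match {
    case Ref(`cteId`) => 1
    case other => other.children.map(countSelfRefs(_, cteId)).sum
  }

  // Returns the error condition name when more than one self-reference exists.
  def check(plan: Plan, cteId: Long): Either[String, Unit] =
    if (countSelfRefs(plan, cteId) > 1) Left("INVALID_RECURSIVE_REFERENCE.NUMBER")
    else Right(())
}
```

For example, `Union(Scan("t"), Ref(1L))` passes, while a recursion term that joins the CTE with itself, `Union(Scan("t"), Join(Ref(1L), Ref(1L)))`, is rejected.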
@@ -22,6 +22,7 @@ import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.analysis.ResolveWithCTE.checkIfSelfReferenceIsPlacedCorrectly
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, ListAgg, Median, PercentileCont, PercentileDisc}
@@ -274,6 +275,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
checkTrailingCommaInSelect(proj)
case agg: Aggregate =>
checkTrailingCommaInSelect(agg)
case unionLoop: UnionLoop =>
// Recursive CTEs have already substituted Union to UnionLoop at this stage.
// Here we perform additional checks for them.
checkIfSelfReferenceIsPlacedCorrectly(unionLoop)

Contributor

nit: let's match CTERelationDef here and call checkForSelfReferenceInSubquery. The function should use the cte id to find recursive references in subquery expressions.

Contributor Author

But then we run into the problem we discussed: Union won't be substituted with UnionLoop if there is a self-reference in a subquery.
Should I just leave it in ResolveWithCTE? The part of ResolveWithCTE where I placed it will be executed at most once anyway.

case _ =>
}
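The subquery check discussed in the thread above, finding recursive references by CTE id inside subquery expressions, can be sketched on a toy tree. The node types here are assumptions made for illustration: `Filter` simply carries the plans of any subqueries embedded in its predicate, which is not how Catalyst models expressions.

```scala
object SubqueryRefCheck {
  // Simplified stand-ins: every node exposes child plans and embedded subquery plans.
  sealed trait Plan {
    def children: Seq[Plan]
    def subqueries: Seq[Plan]
  }
  case class Ref(cteId: Long) extends Plan {
    val children: Seq[Plan] = Nil
    val subqueries: Seq[Plan] = Nil
  }
  case class Scan(table: String) extends Plan {
    val children: Seq[Plan] = Nil
    val subqueries: Seq[Plan] = Nil
  }
  // A filter whose predicate may embed subquery plans (for example IN or EXISTS).
  case class Filter(child: Plan, subqueries: Seq[Plan]) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }

  // True if `plan`, including nested subqueries, references the CTE.
  def containsRef(plan: Plan, cteId: Long): Boolean = plan match {
    case Ref(`cteId`) => true
    case other => (other.children ++ other.subqueries).exists(containsRef(_, cteId))
  }

  // True if a reference to the CTE sits inside any subquery reachable from `plan`.
  def hasSelfRefInSubquery(plan: Plan, cteId: Long): Boolean =
    plan.subqueries.exists(containsRef(_, cteId)) ||
      plan.children.exists(hasSelfRefInSubquery(_, cteId))
}
```

A reference used directly as a filter's child is not flagged; only a reference reachable through a subquery plan is.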
@@ -21,6 +21,7 @@ import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{CTE, PLAN_EXPRESSION}
@@ -48,13 +49,16 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
case withCTE @ WithCTE(_, cteDefs) =>
// Check for self-reference in subqueries and throw an error if that is the case.
// We don't want to resolve any such reference, therefore we need to perform this check
// here.
cteDefs.foreach(checkForSelfReferenceInSubquery)
val newCTEDefs = cteDefs.map {
// `cteDef.recursive` means "presence of a recursive CTERelationRef under cteDef". The
// side effect of node substitution below is that after CTERelationRef substitution
// its cteDef is no more considered `recursive`. This code path is common for `cteDef`
// that were non-recursive from the get go, as well as those that are no more recursive
// due to node substitution.
case cteDef if !cteDef.recursive =>
// cteDef in the first case is either recursive and all the recursive CTERelationRefs
// are already substituted to UnionLoopRef in the previous pass, or it is not recursive
// at all. In both cases we need to put it in the map in case it is resolved.
// Second case is performing the substitution of recursive CTERelationRefs.
case cteDef if !cteDef.hasSelfReferenceAsCTERef =>
if (cteDef.resolved) {
cteDefMap.put(cteDef.id, cteDef)
}
@@ -183,4 +187,52 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
columnNames.map(UnresolvedSubqueryColumnAliases(_, ref)).getOrElse(ref)
}
}

/**
* Checks if there is any self-reference within subqueries and throws an error
* if that is the case.
*/
private def checkForSelfReferenceInSubquery(cteDef: CTERelationDef): Unit = {
cteDef.subqueriesAll.foreach { subquery =>
subquery.foreach {
case CTERelationRef(cteDef.id, _, _, _, _, true) =>
cteDef.failAnalysis(
errorClass = "INVALID_RECURSIVE_REFERENCE.SUBQUERY",
messageParameters = Map.empty)
case _ =>
}
}
}

/**
* Throws error if self-reference is placed in places which are not allowed:
* right side of left outer/semi/anti joins, left side of right outer joins,
* in full outer joins and in aggregates
*/
def checkIfSelfReferenceIsPlacedCorrectly(unionLoop: UnionLoop): Unit = {
def unionLoopRefNotAllowedUnderCurrentNode(currentNode: LogicalPlan) : Unit =
currentNode.foreach {
case UnionLoopRef(unionLoop.id, _, _) =>
throw new AnalysisException(
errorClass = "INVALID_RECURSIVE_REFERENCE.PLACE",
messageParameters = Map.empty)
case other =>
}
unionLoop.foreach {
Contributor

nit: let's not use .foreach here, which causes O(n^2) time complexity. When matching certain nodes we will call unionLoopRefNotAllowedUnderCurrentNode, which also traverses the tree.

how about this:

def checkIfSelfReferenceIsPlacedCorrectly(plan: LogicalPlan, allowRecursiveRef: Boolean = true) {
  case Join(left, right, LeftOuter, _, _) =>
    checkIfSelfReferenceIsPlacedCorrectly(left, allowRecursiveRef)
    checkIfSelfReferenceIsPlacedCorrectly(right, allowRecursiveRef = false)
  ...
  case _: UnionLoopRef if !allowRecursiveRef => fail ...
}

Contributor Author

That makes complete sense. It should be fixed now. Thanks a lot.

case Join(left, right, LeftOuter, _, _) =>
unionLoopRefNotAllowedUnderCurrentNode(right)
case Join(left, right, RightOuter, _, _) =>
unionLoopRefNotAllowedUnderCurrentNode(left)
case Join(left, right, LeftSemi, _, _) =>
unionLoopRefNotAllowedUnderCurrentNode(right)
case Join(left, right, LeftAnti, _, _) =>
unionLoopRefNotAllowedUnderCurrentNode(right)
case Join(left, right, _, _, _) =>
unionLoopRefNotAllowedUnderCurrentNode(left)
unionLoopRefNotAllowedUnderCurrentNode(right)
case Aggregate(_, _, child, _) =>
unionLoopRefNotAllowedUnderCurrentNode(child)
case other =>
}
}
}
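The single-pass approach suggested in the review thread above threads an "is a recursive reference allowed here" flag down the tree, so each node is visited once. The following is a self-contained sketch on toy node types, not the code that was eventually committed. Treating inner joins as allowed on both sides is an assumption read from the `PLACE` error message; note that the catch-all `Join` case in the diff above rejects references on both sides of every join type it does not list explicitly.

```scala
object PlacementCheck {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType
  case object FullOuter extends JoinType

  // Simplified stand-ins for logical plan nodes (not the Catalyst classes).
  sealed trait Plan
  case class LoopRef(loopId: Long) extends Plan
  case class Scan(table: String) extends Plan
  case class Join(left: Plan, right: Plan, joinType: JoinType) extends Plan
  case class Aggregate(child: Plan) extends Plan
  case class Project(child: Plan) extends Plan

  // True if every reference to `loopId` sits in an allowed position.
  // `allowed` is switched off when descending into a forbidden side.
  def placedCorrectly(plan: Plan, loopId: Long, allowed: Boolean = true): Boolean =
    plan match {
      case LoopRef(`loopId`) => allowed
      case _: LoopRef | _: Scan => true
      case Join(l, r, Inner) => // assumption: inner joins allow both sides
        placedCorrectly(l, loopId, allowed) && placedCorrectly(r, loopId, allowed)
      case Join(l, r, LeftOuter | LeftSemi | LeftAnti) =>
        placedCorrectly(l, loopId, allowed) && placedCorrectly(r, loopId, allowed = false)
      case Join(l, r, RightOuter) =>
        placedCorrectly(l, loopId, allowed = false) && placedCorrectly(r, loopId, allowed)
      case Join(l, r, FullOuter) =>
        placedCorrectly(l, loopId, allowed = false) &&
          placedCorrectly(r, loopId, allowed = false)
      case Aggregate(c) => placedCorrectly(c, loopId, allowed = false)
      case Project(c) => placedCorrectly(c, loopId, allowed)
    }
}
```

Because the flag is passed down rather than recomputed by a nested traversal, the work is linear in the number of plan nodes.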
@@ -61,7 +61,10 @@ case class InlineCTE(
// 1) It is fine to inline a CTE if it references another CTE that is non-deterministic;
// 2) Any `CTERelationRef` that contains `OuterReference` would have been inlined first.
refCount == 1 ||
cteDef.deterministic ||
// Don't inline recursive CTEs if not necessary as recursion is very costly.
// The check if cteDef is recursive is performed by checking if it contains
// a UnionLoopRef with the same ID.
(cteDef.deterministic && !cteDef.hasSelfReferenceAsUnionLoopRef) ||
cteDef.child.exists(_.expressions.exists(_.isInstanceOf[OuterReference]))
}
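The revised condition in the fragment above can be read as a plain boolean predicate. In this sketch `CteFacts` is a hypothetical flattened record of the facts the predicate consults; it is not a Spark type, and any conditions outside the lines shown in the hunk are not modelled.

```scala
object InlineDecision {
  // Hypothetical summary of the properties consulted by the predicate.
  case class CteFacts(
      refCount: Int,
      deterministic: Boolean,
      isRecursive: Boolean,      // stands in for hasSelfReferenceAsUnionLoopRef
      hasOuterReference: Boolean)

  // Mirrors the three disjuncts visible in the hunk above.
  def shouldInline(f: CteFacts): Boolean =
    f.refCount == 1 ||
      (f.deterministic && !f.isRecursive) ||
      f.hasOuterReference
}
```

With these disjuncts, a deterministic recursive CTE referenced twice is no longer inlined, while one referenced exactly once still satisfies the first disjunct.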

@@ -100,12 +100,14 @@ case class CTERelationDef(

override def output: Seq[Attribute] = if (resolved) child.output else Nil

lazy val recursive: Boolean = child.exists{
// If the reference is found inside the child, referencing to this CTE definition,
// and already marked as recursive, then this CTE definition is recursive.
lazy val hasSelfReferenceAsCTERef: Boolean = child.exists{
case CTERelationRef(this.id, _, _, _, _, true) => true
case _ => false
}
lazy val hasSelfReferenceAsUnionLoopRef: Boolean = child.exists{
case UnionLoopRef(this.id, _, _) => true
case _ => false
}
}

object CTERelationDef {
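The two flags above describe the same recursion at two stages: before the self-reference is substituted (still a `CTERelationRef`) and after (a `UnionLoopRef`). A toy illustration, in which the node types and the `substitute` function are assumptions standing in for the real resolution step:

```scala
object RecursionFlags {
  // Simplified stand-ins for logical plan nodes (not the Catalyst classes).
  sealed trait Plan { def children: Seq[Plan] }
  case class CteRef(cteId: Long, recursive: Boolean) extends Plan {
    val children: Seq[Plan] = Nil
  }
  case class LoopRef(loopId: Long) extends Plan { val children: Seq[Plan] = Nil }
  case class Scan(table: String) extends Plan { val children: Seq[Plan] = Nil }
  case class Union(children: Seq[Plan]) extends Plan

  // True if `p` holds for `plan` or any of its descendants.
  def existsNode(plan: Plan)(p: Plan => Boolean): Boolean =
    p(plan) || plan.children.exists(c => existsNode(c)(p))

  def hasSelfReferenceAsCTERef(plan: Plan, id: Long): Boolean =
    existsNode(plan) {
      case CteRef(`id`, true) => true
      case _ => false
    }

  def hasSelfReferenceAsUnionLoopRef(plan: Plan, id: Long): Boolean =
    existsNode(plan) {
      case LoopRef(`id`) => true
      case _ => false
    }

  // Hypothetical stand-in for the substitution performed during resolution.
  def substitute(plan: Plan, id: Long): Plan = plan match {
    case CteRef(`id`, true) => LoopRef(id)
    case Union(cs) => Union(cs.map(substitute(_, id)))
    case other => other
  }
}
```

Before substitution only the first flag is true; afterwards only the second is, which is why the InlineCTE condition consults `hasSelfReferenceAsUnionLoopRef`.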