[SPARK-50838][SQL] Performs additional checks inside recursive CTEs to throw an error if a forbidden case is encountered #49518

Closed
wants to merge 31 commits into from
Changes from 2 commits
Commits
7f71cdc
Add checkAnalysis and corresponding errors
milanisvet Jan 15, 2025
23f65d5
Adjust optimizer in case UnionLoop is encountered, and disables inlin…
milanisvet Jan 15, 2025
9f4a84a
Merge branch 'master' into checkRecursion
milanisvet Jan 17, 2025
05cb795
Revert "Adjust optimizer in case UnionLoop is encountered, and disabl…
milanisvet Jan 20, 2025
c455756
Split recursive function inside CTERelationDef into hasRecursiveCTERe…
milanisvet Jan 20, 2025
f46823b
Add condition that recursive cteDefs should not be inlined if necessary
milanisvet Jan 20, 2025
8f3cfa8
Add checkNumberOfSelfReferences function to check if we have more tha…
milanisvet Jan 20, 2025
5d70004
Merge branch 'master' into checkRecursion
milanisvet Jan 20, 2025
c6dbae5
Rewrite error message. Corrects some typos.
milanisvet Jan 20, 2025
b9ebdff
Removes checkRecursion
milanisvet Jan 20, 2025
0da1043
Deletes debugging print inside checkNumberOfSelfReferences
milanisvet Jan 20, 2025
0a885a6
Corrects wrong error message
milanisvet Jan 20, 2025
ec8c8c3
Add check if anchor and recursion term have the same data type
milanisvet Jan 20, 2025
b7af544
Add check to throw error if self reference is placed at forbidden cases
milanisvet Jan 20, 2025
d4af85a
Rewrite comments
milanisvet Jan 20, 2025
3f11993
Rewrite comments
milanisvet Jan 20, 2025
ffa8aa2
Move "data type" and "place" checks back to checkRecursion, and rewri…
milanisvet Jan 21, 2025
879df0a
Move check to CTESubstitution that there are no multiple references p…
milanisvet Jan 21, 2025
d073e0a
Small fixes from reviewers' comments
milanisvet Jan 22, 2025
43a7460
Remove data type check
milanisvet Jan 22, 2025
45fdff9
Rewrites the name of hasRecursiveCTERelationRef and hasItsOwnUnionLoopRef
milanisvet Jan 22, 2025
7c08882
Rewrites throwing exceptions
milanisvet Jan 22, 2025
6f99924
Add exception if self-reference is within a subquery
milanisvet Jan 22, 2025
a1ce987
Add passing down recursiveCTERelation to subqueries to substitute rec…
milanisvet Jan 23, 2025
07046d9
Add check for self reference within subquery to ResolveWithCTE
milanisvet Jan 23, 2025
38fa2ef
Removes check number of self references from CTESubstitution
milanisvet Jan 23, 2025
1ab39aa
Removes check for self ref in subquery; adds check number of self ref…
milanisvet Jan 23, 2025
99fafaf
Add check for self ref in subquery to CheckAnalysis
milanisvet Jan 23, 2025
0c1c107
Sort error conditions in correct order
milanisvet Jan 23, 2025
48b21cb
Set the subquery error message to be actually the place error message
milanisvet Jan 24, 2025
236f202
Rewrite checkIfSelfReferenceIsPlacedCorrectly to have better complexity
milanisvet Jan 24, 2025
23 changes: 23 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -3024,6 +3024,29 @@
     ],
     "sqlState" : "42602"
   },
+  "INVALID_RECURSIVE_REFERENCE" : {
+    "message" : [
+      "Invalid recursive reference found."
+    ],
+    "subClass" : {
+      "DATA_TYPE" : {
+        "message" : [
+          "The data type of recursive references cannot change during resolution. Originally it was <fromDataType> but after resolution is <toDataType>."
+        ]
+      },
+      "NUMBER" : {
+        "message" : [
+          "Recursive references cannot be used multiple times."
+        ]
+      },
+      "PLACE" : {
+        "message" : [
+          "Recursive references cannot be used on the right side of left outer/semi/anti joins, on the left side of right outer joins, in full outer joins and in aggregates."
+        ]
+      }
+    },
+    "sqlState" : "42836"
+  },
   "INVALID_QUERY_MIXED_QUERY_PARAMETERS" : {
     "message" : [
       "Parameterized query must either use positional, or named parameters, but not both."
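The `DATA_TYPE` message above carries two `<name>` placeholders that are filled from the `messageParameters` map passed to `failAnalysis`. As a minimal sketch of that substitution only: the `render` helper below is hypothetical and is not Spark's error-formatting code, which this diff does not show.

```scala
object ErrorMessageSketch {
  // Template text copied from the diff above.
  val mainMessage: String = "Invalid recursive reference found."
  val dataTypeMessage: String =
    "The data type of recursive references cannot change during resolution. " +
      "Originally it was <fromDataType> but after resolution is <toDataType>."

  // Hypothetical helper: replaces each <key> placeholder with its value.
  // Placeholders without a matching key are left untouched.
  def render(template: String, params: Map[String, String]): String =
    params.foldLeft(template) { case (msg, (key, value)) =>
      msg.replace(s"<$key>", value)
    }
}
```

For example, `ErrorMessageSketch.render(ErrorMessageSketch.dataTypeMessage, Map("fromDataType" -> "INT", "toDataType" -> "BIGINT"))` yields the message with both placeholders replaced, while `NUMBER` and `PLACE` take an empty map because their templates have no placeholders.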
@@ -932,6 +932,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
         case _ => // Analysis successful!
       }
     }
+    checkRecursion(plan)
     checkCollectedMetrics(plan)
     extendedCheckRules.foreach(_(plan))
     plan.foreachUp {
@@ -1042,6 +1043,75 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     if (Utils.isTesting) scrubOutIds(result) else result
   }

+  /**
+   * Recursion, according to SQL standard, comes with several limitations:
+   * 1. Recursive term can contain one recursive reference only.
+   * 2. Recursive reference can't be used in some kinds of joins and aggregations.
+   * This rule checks that these restrictions are not violated.
+   */
+  private def checkRecursion(
+      plan: LogicalPlan,
+      references: mutable.Map[Long, (Int, Seq[DataType])] = mutable.Map.empty): Unit = {
+    plan match {
+      // The map is filled with UnionLoop id as key and 0 (number of Ref occasions) and datatype
+      // as value
+      case UnionLoop(id, anchor, recursion, _) =>
+        checkRecursion(anchor, references)
+        checkRecursion(recursion, references += id -> (0, anchor.output.map(_.dataType)))
+        references -= id
+      case r @ UnionLoopRef(loopId, output, false) =>
+        // If we encounter a recursive reference, it has to be present in the map
+        if (!references.contains(loopId)) {
+          r.failAnalysis(
+            errorClass = "INVALID_RECURSIVE_REFERENCE.PLACE",
+            messageParameters = Map.empty
+          )
+        }
+        val (count, dataType) = references(loopId)
+        if (count > 0) {
+          r.failAnalysis(
+            errorClass = "INVALID_RECURSIVE_REFERENCE.NUMBER",
+            messageParameters = Map.empty
+          )
+        }
+        val originalDataType = r.output.map(_.dataType)
+        if (!originalDataType.zip(dataType).forall {
+          case (odt, dt) => DataType.equalsStructurally(odt, dt, true)
+        }) {
+          r.failAnalysis(
+            errorClass = "INVALID_RECURSIVE_REFERENCE.DATA_TYPE",
+            messageParameters = Map(
+              "fromDataType" -> originalDataType.map(toSQLType).mkString(", "),
+              "toDataType" -> dataType.map(toSQLType).mkString(", ")
+            )
+          )
+        }
+        references(loopId) = (count + 1, dataType)
+      case Join(left, right, Inner, _, _) =>
+        checkRecursion(left, references)
+        checkRecursion(right, references)
+      // Reference not allowed in the right part of LeftOuter join
+      case Join(left, right, LeftOuter, _, _) =>
+        checkRecursion(left, references)
+        checkRecursion(right, mutable.Map.empty)
+      case Join(left, right, RightOuter, _, _) =>
+        checkRecursion(left, mutable.Map.empty)
+        checkRecursion(right, references)
+      case Join(left, right, LeftSemi, _, _) =>
+        checkRecursion(left, references)
+        checkRecursion(right, mutable.Map.empty)
+      case Join(left, right, LeftAnti, _, _) =>
+        checkRecursion(left, references)
+        checkRecursion(right, mutable.Map.empty)
+      case Join(left, right, _, _, _) =>
+        checkRecursion(left, mutable.Map.empty)
+        checkRecursion(right, mutable.Map.empty)
+      // Reference not allowed below an aggregate node
+      case Aggregate(_, _, child, _) => checkRecursion(child, mutable.Map.empty)
+      case o => o.children.foreach(checkRecursion(_, references))
+    }
+  }
+
   /**
    * Validates subquery expressions in the plan. Upon failure, returns an user facing error.
    */
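The traversal in `checkRecursion` is easier to follow on a small model. The following is a minimal, self-contained sketch of the same idea, under stated assumptions: every type here (`Plan`, `Loop`, `Ref`, `Join`, and so on) is a hypothetical stand-in rather than a Spark class, the data type comparison is left out, semi and anti joins are omitted, and violations are returned as strings instead of being thrown through `failAnalysis`.

```scala
object RecursionCheckSketch {
  import scala.collection.mutable

  // Toy join types and plan nodes (illustrative only, not Spark's classes).
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  sealed trait Plan
  case class Leaf(name: String) extends Plan
  case class Loop(id: Long, anchor: Plan, recursion: Plan) extends Plan
  case class Ref(loopId: Long) extends Plan
  case class Join(left: Plan, right: Plan, joinType: JoinType) extends Plan
  case class Aggregate(child: Plan) extends Plan
  case class Project(child: Plan) extends Plan

  /** Returns the first violation ("PLACE" or "NUMBER"), or None if the plan is valid. */
  def check(
      plan: Plan,
      counts: mutable.Map[Long, Int] = mutable.Map.empty): Option[String] = plan match {
    case Leaf(_) => None
    case Loop(id, anchor, recursion) =>
      // The anchor is checked before the loop id is registered,
      // so a self-reference inside the anchor is reported as misplaced.
      val result = check(anchor, counts).orElse {
        counts(id) = 0
        check(recursion, counts)
      }
      counts -= id
      result
    case Ref(id) =>
      counts.get(id) match {
        case None => Some("PLACE") // loop id not visible from this position
        case Some(n) if n > 0 => Some("NUMBER") // second reference to the same loop
        case Some(n) =>
          counts(id) = n + 1
          None
      }
    // A fresh empty map hides the loop id on the side where references are forbidden.
    case Join(l, r, Inner) => check(l, counts).orElse(check(r, counts))
    case Join(l, r, LeftOuter) => check(l, counts).orElse(check(r, mutable.Map.empty))
    case Join(l, r, RightOuter) => check(l, mutable.Map.empty).orElse(check(r, counts))
    case Join(l, r, FullOuter) =>
      check(l, mutable.Map.empty).orElse(check(r, mutable.Map.empty))
    case Aggregate(c) => check(c, mutable.Map.empty)
    case Project(c) => check(c, counts)
  }
}
```

In this sketch, `check(Loop(1, Leaf("a"), Join(Ref(1), Ref(1), Inner)))` reports `NUMBER`, while moving a single `Ref(1)` to the right side of a `LeftOuter` join or below an `Aggregate` reports `PLACE`. The design choice mirrored from the diff is that placement is enforced by what the map contains, not by tracking the path to the reference.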
@@ -61,7 +61,8 @@ case class InlineCTE(
     // 1) It is fine to inline a CTE if it references another CTE that is non-deterministic;
     // 2) Any `CTERelationRef` that contains `OuterReference` would have been inlined first.
     refCount == 1 ||
-      cteDef.deterministic ||
+      // Don't inline recursive CTEs if not necessary as recursion is very costly.
+      (cteDef.deterministic && !cteDef.recursive) ||
       cteDef.child.exists(_.expressions.exists(_.isInstanceOf[OuterReference]))
   }
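To make the effect of the changed predicate concrete, here is a minimal sketch of the new inlining decision. `CteInfo` and `shouldInline` are hypothetical names introduced for illustration; Spark's `CTERelationDef` and `InlineCTE` carry far more state than this.

```scala
object InlineDecisionSketch {
  // Hypothetical summary of the three properties the predicate reads.
  case class CteInfo(deterministic: Boolean, recursive: Boolean, hasOuterReference: Boolean)

  // Mirrors the shape of the condition in the hunk above.
  def shouldInline(cte: CteInfo, refCount: Int): Boolean =
    refCount == 1 ||
      (cte.deterministic && !cte.recursive) ||
      cte.hasOuterReference
}
```

Under this sketch a deterministic recursive CTE referenced twice is no longer inlined, while one referenced exactly once still is, because the `refCount == 1` branch is unchanged.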

@@ -845,6 +845,11 @@ object LimitPushDown extends Rule[LogicalPlan] {
     case LocalLimit(exp, u: Union) =>
       LocalLimit(exp, u.copy(children = u.children.map(maybePushLocalLimit(exp, _))))

+    case l @ LocalLimit(IntegerLiteral(limit), p @ Project(_, u: UnionLoop)) =>
+      l.copy(child = p.copy(child = u.copy(limit = Some(limit))))
+    case l @ LocalLimit(IntegerLiteral(limit), u: UnionLoop) =>
+      l.copy(child = u.copy(limit = Some(limit)))
+
     // Add extra limits below JOIN:
     // 1. For LEFT OUTER and RIGHT OUTER JOIN, we push limits to the left and right sides
     // respectively if join condition is not empty.
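The two new `LocalLimit` cases record the limit on the `UnionLoop` node itself, either directly or through one intervening `Project`, and keep the outer `LocalLimit` in place. A minimal sketch of that rewrite on a toy plan follows. The node types are hypothetical stand-ins, and how Spark later consumes the recorded limit is not shown in this diff.

```scala
object LimitIntoLoopSketch {
  // Toy plan nodes (illustrative only, not Spark's classes).
  sealed trait Plan
  case class Scan(name: String) extends Plan
  case class Loop(anchor: Plan, recursion: Plan, limit: Option[Int] = None) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan
  case class LocalLimit(n: Int, child: Plan) extends Plan

  // Copies the limit into the loop node; every other plan shape is returned unchanged.
  def pushLimit(plan: Plan): Plan = plan match {
    case l @ LocalLimit(n, p @ Project(_, u: Loop)) =>
      l.copy(child = p.copy(child = u.copy(limit = Some(n))))
    case l @ LocalLimit(n, u: Loop) =>
      l.copy(child = u.copy(limit = Some(n)))
    case other => other
  }
}
```

Keeping the outer limit means the result stays correct even if the loop produces more rows than requested; the copy on the loop node is only extra information for whatever executes it.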
@@ -1028,6 +1033,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
       } else {
         p
       }
+    // TODO: Pruning `UnionLoop`s needs to take into account both the outer `Project` and the inner
+    // `UnionLoopRef` nodes.
+    case p @ Project(_, _: UnionLoop) => p

     // Prune unnecessary window expressions
     case p @ Project(_, w: Window) if !w.windowOutputSet.subsetOf(p.references) =>