Skip to content

Commit

Permalink
Fix issue that unsupported join type in BNLJ is not fallback (#7569)
Browse files Browse the repository at this point in the history
  • Loading branch information
ccat3z authored Nov 22, 2024
1 parent c940e68 commit ca09a60
Showing 1 changed file with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.gluten.utils.SubstraitUtil

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType, LeftExistence, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.BaseJoinExec
Expand Down Expand Up @@ -79,6 +79,10 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
left.output ++ right.output.map(_.withNullability(true))
case RightOuter =>
left.output.map(_.withNullability(true)) ++ right.output
case LeftExistence(_) =>
left.output
case FullOuter =>
left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
case x =>
throw new IllegalArgumentException(s"${getClass.getSimpleName} not take $x as the JoinType")
}
Expand Down Expand Up @@ -145,17 +149,32 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
inputBuildOutput)
}

private def validateJoinTypeAndBuildSide(): ValidationResult = {
val result = joinType match {
case _: InnerLike | LeftOuter | RightOuter => ValidationResult.ok
case _ =>
ValidationResult.notOk(s"$joinType join is not supported with BroadcastNestedLoopJoin")
}
if (!result.isValid) {
return result
}
(joinType, buildSide) match {
case (LeftOuter, BuildLeft) | (RightOuter, BuildRight) =>
ValidationResult.notOk(s"$joinType join is not supported with $buildSide")
case _ => ValidationResult.ok // continue
}
}

override protected def doValidateInternal(): ValidationResult = {
if (!BackendsApiManager.getSettings.supportBroadcastNestedLoopJoinExec()) {
return ValidationResult.notOk("Broadcast Nested Loop join is not supported in this backend")
}
if (substraitJoinType == CrossRel.JoinType.UNRECOGNIZED) {
return ValidationResult.notOk(s"$joinType join is not supported with BroadcastNestedLoopJoin")
}
(joinType, buildSide) match {
case (LeftOuter, BuildLeft) | (RightOuter, BuildRight) =>
return ValidationResult.notOk(s"$joinType join is not supported with $buildSide")
case _ => // continue
val validateResult = validateJoinTypeAndBuildSide()
if (!validateResult.isValid) {
return validateResult
}
val substraitContext = new SubstraitContext

Expand Down

0 comments on commit ca09a60

Please sign in to comment.