Skip to content

Commit

Permalink
[VL] Remove reselect build side in ShuffledHashJoinExecTransformer (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
zml1206 authored Jun 4, 2024
1 parent 785958e commit 2713509
Showing 1 changed file with 2 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ package org.apache.gluten.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution.{FilterExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.BuildSideRelation
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand All @@ -46,80 +45,6 @@ case class ShuffledHashJoinExecTransformer(
right,
isSkewJoin) {

// Used to specify the preferred build side in backend's real execution.
object PreferredBuildSide extends Serializable {
val LEFT = "left table"
val RIGHT = "right table"
val NON = "none"
}

/**
* Returns whether the plan matches the condition to be preferred as build side. Currently, filter
* and aggregation are preferred.
* @param plan
* the left or right plan of join
* @return
* whether the plan matches the condition
*/
private def matchCondition(plan: SparkPlan): Boolean =
plan.isInstanceOf[FilterExecTransformerBase] || plan.isInstanceOf[FilterExec] ||
plan.isInstanceOf[BaseAggregateExec]

/**
* Returns whether a plan is preferred as the build side. If this plan or its children match the
* condition, it will be preferred.
* @param plan
* the left or right plan of join
* @return
* whether the plan is preferred as the build side
*/
private def isPreferred(plan: SparkPlan): Boolean =
matchCondition(plan) || plan.children.exists(child => matchCondition(child))

// Returns the preferred build side with the consideration of preferring condition.
private lazy val preferredBuildSide: String =
if ((isPreferred(left) && isPreferred(right)) || (!isPreferred(left) && !isPreferred(right))) {
PreferredBuildSide.NON
} else if (isPreferred(left)) {
PreferredBuildSide.LEFT
} else {
PreferredBuildSide.RIGHT
}

/**
* Returns whether the build and stream table should be exchanged with consideration of build
* type, planned build side and the preferred build side.
*/
override lazy val needSwitchChildren: Boolean = hashJoinType match {
case LeftOuter | LeftSemi | ExistenceJoin(_) =>
joinBuildSide match {
case BuildLeft =>
// Exchange build and stream side when left side or none is preferred as the build side,
// and RightOuter or RightSemi wil be used.
!(preferredBuildSide == PreferredBuildSide.RIGHT)
case _ =>
// Do not exchange build and stream side when right side or none is preferred
// as the build side, and LeftOuter or LeftSemi wil be used.
preferredBuildSide == PreferredBuildSide.LEFT
}
case RightOuter =>
joinBuildSide match {
case BuildRight =>
// Do not exchange build and stream side when right side or none is preferred
// as the build side, and RightOuter will be used.
preferredBuildSide == PreferredBuildSide.LEFT
case _ =>
// Exchange build and stream side when left side or none is preferred as the build side,
// and LeftOuter will be used.
!(preferredBuildSide == PreferredBuildSide.RIGHT)
}
case _ =>
joinBuildSide match {
case BuildLeft => true
case BuildRight => false
}
}

override protected lazy val substraitJoinType: JoinRel.JoinType = joinType match {
case _: InnerLike =>
JoinRel.JoinType.JOIN_TYPE_INNER
Expand Down

0 comments on commit 2713509

Please sign in to comment.