[CORE] Creates vanilla plan when the join operators fall back #6093
Thanks for opening a pull request! Could you open an issue for this pull request on Github Issues? https://github.com/apache/incubator-gluten/issues Then could you also rename commit message and pull request title in the following format?
See also: |
Run Gluten Clickhouse CI
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
+- ^ RegularHashAggregateExecTransformer (38)
+- ^ ProjectExecTransformer (37)
+- ^ ShuffledHashJoinExecTransformer Inner BuildLeft (36)
:- ^ SortExecTransformer (26)
Failing to drop SortExecTransformer when RAS is enabled is a known issue, see #6011. @zhztheplayer
@ulysses-you @zhztheplayer If you have time, could you take another look? Thank you.
if (isReOptimize && existsShuffledJoin(plan)) {
  cost -= 1
}
isReOptimize = !isReOptimize
I understand how this statement works, but it is too caller-sensitive to rely on.
I see Cost is a trait that can be extended. Could we store the plan's id, which is incremented in instantiation order, in a customized Cost structure, then use the id during cost comparison to find out which plan was created later?
// sql adaptive customCostEvaluatorClass
conf.set(
  SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS.key,
  "org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator")
Let's add a check that raises an error when the user has already set another cost evaluator.
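A minimal sketch of what such a check could look like, with the Spark configuration modelled as a plain Map so the snippet is self-contained (that modelling is an assumption, not how the PR does it). The object and method names are hypothetical; only the configuration key and the evaluator class name are taken from this thread.

```scala
object CostEvaluatorGuard {
  // Key behind SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS, as named later in this thread.
  val Key = "spark.sql.adaptive.customCostEvaluatorClass"
  val GlutenClass = "org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator"

  /** Returns a config with the Gluten evaluator set, or fails if the user chose another one. */
  def withGlutenEvaluator(conf: Map[String, String]): Map[String, String] =
    conf.get(Key) match {
      case Some(other) if other != GlutenClass =>
        // Hypothetical error message; the real wording is up to the PR author.
        throw new IllegalStateException(
          s"$Key is already set to $other; refusing to override it with $GlutenClass")
      case _ =>
        // Unset, or already pointing at the Gluten evaluator.
        conf.updated(Key, GlutenClass)
    }
}
```

Failing fast here avoids silently replacing an evaluator the user configured on purpose.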
val simpleCost =
  SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan).asInstanceOf[SimpleCost]
var cost = simpleCost.value * 2
if (isReOptimize && plan.exists(_.isInstanceOf[ShuffledJoin])) {
We should only change the plan when there is actually a better build side.
I considered that, but it would be expensive to do. First, we would need an extra rule to add tags. Second, when the plan contains multiple joins, it is hard to tell which join is about to be executed. So I only changed the equal-cost case: if the costs are equal, the new physical plan is used, which doesn't seem to hurt.
> If the costs are equal, it is forced to use new physics plan
Could you confirm why the newer plan is always better than the older one? For example, does the newer one carry more up-to-date statistics, so that the build-side recalculation in the columnar rule is more reliable?
If we can make sure the newer one is better, then using it sounds reasonable.
Additionally, I assume cost(reoptimizePlan) == cost(plan) happens frequently during AQE execution. If so, we should measure planner performance too.
Spark's default condition for using newPhysicalPlan is newCost < origCost || (newCost == origCost && currentPhysicalPlan != newPhysicalPlan). This PR just adds one more case, newCost == origCost && currentPhysicalPlan == newPhysicalPlan && !(currentPhysicalPlan eq newPhysicalPlan), so I think the performance has not changed.
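The two conditions can be written out as predicates to make the difference visible. This is only a sketch: Plan is a hypothetical stand-in for a physical plan (a case class, so == is structural and eq is reference identity), and the function names are made up for illustration.

```scala
object ReplanDecision {
  // Hypothetical stand-in for a physical plan.
  final case class Plan(desc: String)

  /** The condition quoted above as Spark's default. */
  def sparkDefault(newCost: Long, origCost: Long, current: Plan, next: Plan): Boolean =
    newCost < origCost || (newCost == origCost && current != next)

  /** The default plus the extra case: equal cost, structurally equal, but a different instance. */
  def withExtraCase(newCost: Long, origCost: Long, current: Plan, next: Plan): Boolean =
    sparkDefault(newCost, origCost, current, next) ||
      (newCost == origCost && current == next && !(current eq next))
}
```

With two separately built but structurally equal plans at the same cost, sparkDefault keeps the current plan while withExtraCase switches to the new one; when both references point at the same object, neither switches.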
case class GlutenCost(value: Long, planId: Int) extends Cost {
  override def compare(that: Cost): Int = that match {
    case GlutenCost(thatValue, thatId) =>
      if (value < thatValue || (value == thatValue && planId != thatId)) -1
The planId is increasing, but a parent node may be removed (for example by OptimizeOneRowPlan), which makes the id smaller, so I use != here.
This part is comparatively tricky and important in this PR, so let's code it carefully. planId != thatId is still a condition that relies on caller code.
Could you check whether we can use something like this:
class GlutenCost(val eval: CostEvaluator, val plan: SparkPlan) extends Cost {
  override def compare(that: Cost): Int = that match {
    case that: GlutenCost if plan eq that.plan =>
      0
    case that: GlutenCost if plan == that.plan =>
      // Plans are identical. Considers the newer one as having lower cost.
      -(plan.id - that.plan.id)
    case that: GlutenCost =>
      // Plans are different. Use the delegated cost evaluator.
      assert(eval == that.eval)
      eval.evaluateCost(plan).compare(eval.evaluateCost(that.plan))
    case _ =>
      throw QueryExecutionErrors.cannotCompareCostWithTargetCostError(that.toString)
  }

  override def hashCode(): Int = throw new UnsupportedOperationException()

  override def equals(obj: Any): Boolean = obj match {
    case that: Cost => compare(that) == 0
    case _ => false
  }
}
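To make the intended ordering concrete, here is a self-contained sketch of the same three cases using stand-in types. Plan and PlanCost are hypothetical simplifications, not Spark or Gluten classes: the cost value is stored directly instead of being delegated to an evaluator, and ids come from a local counter that only imitates ids assigned in instantiation order.

```scala
import java.util.concurrent.atomic.AtomicInteger

object CostCompareSketch {
  // Hypothetical stand-in for SparkPlan. `id` is not a constructor parameter,
  // so it does not take part in the case-class ==.
  final case class Plan(desc: String) {
    val id: Int = Plan.ids.getAndIncrement()
  }
  object Plan {
    private val ids = new AtomicInteger(0)
  }

  final class PlanCost(val value: Long, val plan: Plan) {
    def compare(that: PlanCost): Int =
      if (plan eq that.plan) 0 // same instance: equal cost
      else if (plan == that.plan) Integer.compare(that.plan.id, plan.id) // newer (larger id) is cheaper
      else java.lang.Long.compare(value, that.value) // different plans: plain cost comparison
  }
}
```

The sign convention matches -(plan.id - that.plan.id) in the proposal: the plan created later compares as lower cost when the two plans are structurally equal.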
> eval.evaluateCost(plan).compare(eval.evaluateCost(that.plan))
There seems to be a codependency issue here.
How about this:
class GlutenCost(value: Long, plan: SparkPlan) extends SimpleCost(value) {
  override def compare(that: Cost): Int = that match {
    case that: GlutenCost if plan eq that.plan =>
      0
    case that: GlutenCost if plan == that.plan =>
      // Plans are identical. Considers the newer one as having lower cost.
      -(plan.id - that.plan.id)
    case _ =>
      // Plans are different. Use the default cost compare.
      super.compare(that)
  }

  override def hashCode(): Int = throw new UnsupportedOperationException()

  override def equals(obj: Any): Boolean = obj match {
    case that: Cost => compare(that) == 0
    case _ => false
  }
}

/** This [[CostEvaluator]] is to force use the new physical plan when cost is equal. */
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
  override def evaluateCost(plan: SparkPlan): Cost = {
    val forceOptimizeSkewedJoin = conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
    val simpleCost = SimpleCostEvaluator(forceOptimizeSkewedJoin)
      .evaluateCost(plan)
      .asInstanceOf[SimpleCost]
    new GlutenCost(simpleCost.value, plan)
  }
}
> class GlutenCost(value: Long, plan: SparkPlan) extends SimpleCost(value)
I'm not in favor of this approach. Class inheritance (even leaving aside that this inherits from a case class) should be used only when there is no other way.
> There seems to be a codependency issue here.
There should be no codependency. val eval: CostEvaluator is the evaluator used by Spark (SimpleCostEvaluator), not GlutenCostEvaluator. E.g.,
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
  override def evaluateCost(plan: SparkPlan): Cost = {
    val forceOptimizeSkewedJoin = conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
    new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
  }
}
SimpleCostEvaluator.evaluateCost returns SimpleCost, not GlutenCost.
> SimpleCostEvaluator.evaluateCost return SimpleCost, not GlutenCost
It's intentional. Gluten only chooses the newest plan when plans are equal. Otherwise we just compare against SimpleCost.
class GlutenCost(val eval: CostEvaluator, val plan: SparkPlan) extends Cost {
  override def compare(that: Cost): Int = that match {
    case that: GlutenCost if plan eq that.plan =>
      0
    case that: GlutenCost if plan == that.plan =>
      // Plans are identical. Considers the newer one as having lower cost.
      -(plan.id - that.plan.id)
    case that: GlutenCost =>
      // Plans are different. Use the delegated cost evaluator.
      assert(eval == that.eval)
      eval.evaluateCost(plan).compare(eval.evaluateCost(that.plan))
    case _ =>
      throw QueryExecutionErrors.cannotCompareCostWithTargetCostError(that.toString)
  }

  override def hashCode(): Int = throw new UnsupportedOperationException()

  override def equals(obj: Any): Boolean = obj match {
    case that: Cost => compare(that) == 0
    case _ => false
  }
}

/** This [[CostEvaluator]] is to force use the new physical plan when cost is equal. */
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
  override def evaluateCost(plan: SparkPlan): Cost = {
    val forceOptimizeSkewedJoin = conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
    new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
  }
}
smj.joinType,
getBuildSide(smj),
smj.condition,
dropPartialSort(smj.left),
Stripping the local sort conflicts somewhat with pulling out the project: a pre-project leaks if we pull out expressions from the sort first and the sort is stripped later. Previously an SMJ could never appear here, since we planned the join on the strategy side.
I don’t understand. Which rule is it?
see PullOutPreProject
case class GlutenCost(value: Long, planId: Int) extends Cost {
  override def compare(that: Cost): Int = that match {
    case GlutenCost(thatValue, thatId) =>
      if (value < thatValue || (value == thatValue && planId != thatId)) -1
Is planId != thatId needed? When would two plans have the same planId?
val simpleCost = SimpleCostEvaluator(forceOptimizeSkewedJoin)
  .evaluateCost(plan)
  .asInstanceOf[SimpleCost]
GlutenCost(simpleCost.value, plan.id)
Can we add one more config to control this feature? If it is disabled, do nothing.
How about adding a GlutenConfig spark.gluten.sql.execution.adaptive.costEvaluator.enabled? If it is true, use GlutenCostEvaluator; otherwise follow the configuration spark.sql.adaptive.customCostEvaluatorClass.
Sounds good. Besides, we had better respect spark.gluten.enabled even when costEvaluator is enabled. People may turn spark.gluten.enabled off at runtime.
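One way to read the proposal, sketched with the configuration modelled as a plain Map. The three keys and the evaluator class name come from this thread; the object and method names and the default values (Gluten on, cost evaluator switch off) are assumptions made for illustration.

```scala
object CostEvaluatorChoice {
  val GlutenEnabled = "spark.gluten.enabled"
  val GlutenCostEvaluatorEnabled = "spark.gluten.sql.execution.adaptive.costEvaluator.enabled"
  val CustomCostEvaluatorClass = "spark.sql.adaptive.customCostEvaluatorClass"
  val GlutenCostEvaluatorClass = "org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator"

  // Reads a boolean flag; the defaults passed in below are assumptions.
  private def flag(conf: Map[String, String], key: String, default: Boolean): Boolean =
    conf.get(key).map(_.trim.toBoolean).getOrElse(default)

  /** Class name of the evaluator to use, or None to fall back to Spark's built-in one. */
  def chooseEvaluator(conf: Map[String, String]): Option[String] =
    if (flag(conf, GlutenEnabled, default = true) &&
      flag(conf, GlutenCostEvaluatorEnabled, default = false)) {
      Some(GlutenCostEvaluatorClass)
    } else {
      // Feature off, or Gluten turned off at runtime: follow the user's own setting, if any.
      conf.get(CustomCostEvaluatorClass)
    }
}
```

Because the flags are read on every call, turning spark.gluten.enabled off at runtime takes effect the next time an evaluator is chosen.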
% a rebase. Thanks.
private val nodeNameMap = Map(
  "SortMergeJoin" -> "ShuffledHashJoin",
  "SortMergeJoin(skew=true)" -> "ShuffledHashJoin(skew=true)")
Why is this mapping needed? It looks like an exception compared to the other rewrites.
Yeah, the other rules add nodes, while rewriteJoin replaces nodes.
cc @ulysses-you
Using nodeName to get the original plan back is a bit strict. It seems the only added node is ProjectExec. We can skip all ProjectExec nodes and make sure there is only one target rewritten plan, e.g.:
-  private def getTransformHintBack(
-      origin: SparkPlan,
-      rewrittenPlan: SparkPlan): Option[TransformHint] = {
-    // The rewritten plan may contain more nodes than origin, here use the node name to get it back
+  private def getTransformHintBack(rewrittenPlan: SparkPlan): Option[TransformHint] = {
+    // The rewritten plan may contain more nodes than origin, for now it should only be
+    // `ProjectExec`.
     val target = rewrittenPlan.collect {
-      case p if p.nodeName == origin.nodeName => p
+      case p if !p.isInstanceOf[ProjectExec] => p
Updated. It also excludes RewrittenNodeWall.
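The idea of skipping wrapper nodes and requiring a single remaining target can be sketched on a toy tree. Everything here is hypothetical: Node, Project, Join and Wall merely stand in for SparkPlan, ProjectExec, a join operator and RewrittenNodeWall, and Wall is modelled as a leaf purely for simplicity.

```scala
object RewriteTargetSketch {
  sealed trait Node { def children: Seq[Node] }
  final case class Project(child: Node) extends Node { val children: Seq[Node] = Seq(child) }
  final case class Join(name: String, left: Node, right: Node) extends Node {
    val children: Seq[Node] = Seq(left, right)
  }
  final case class Wall(label: String) extends Node { val children: Seq[Node] = Nil }

  // Pre-order collect over the tree, in the spirit of a tree-node collect method.
  def collect[A](root: Node)(pf: PartialFunction[Node, A]): Seq[A] =
    pf.lift(root).toList ++ root.children.flatMap(c => collect(c)(pf))

  /** Skips Project and Wall nodes and insists that exactly one node remains. */
  def rewriteTarget(rewritten: Node): Node = {
    val targets = collect(rewritten) {
      case n if !n.isInstanceOf[Project] && !n.isInstanceOf[Wall] => n
    }
    require(targets.size == 1, s"expected exactly one rewritten node, found ${targets.size}")
    targets.head
  }
}
```

Requiring exactly one match turns an unexpected rewrite shape into an immediate failure instead of a silently wrong hint.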
@zml1206 It seems the failed ClickHouse test is related. Jenkins account:
case smj: SortMergeJoinExec if GlutenConfig.getConf.forceShuffledHashJoin =>
  getBuildSide(smj.joinType) match {
    case Some(buildSide) =>
      ShuffledHashJoinExec(
Can we move the code that drops the local sort into this rule? The sort is part of the SMJ, so we should handle them together. We don't need to introduce SortUtils; just move the code into this rule.
RewriteSparkPlanRulesManager currently handles a single node, so simply moving the code is not possible.
Could this be considered as an optimization for a future PR?
Done. I kept SortUtils so it can be used later for sortAgg.
}

override def rewrite(plan: SparkPlan): SparkPlan = plan match {
  case smj: SortMergeJoinExec if GlutenConfig.getConf.forceShuffledHashJoin =>
nit: better to define val forceShuffledHashJoin = GlutenConfig.getConf.forceShuffledHashJoin as a class member variable.
RewriteJoin is an object. If forceShuffledHashJoin is defined as a val, it cannot pick up configuration changes dynamically.
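The point about val in an object can be shown with a small sketch. Conf is a hypothetical mutable stand-in for the session configuration, and CachedRule and LiveRule are made-up names; this is not the Gluten code.

```scala
object ValVsDefSketch {
  // Hypothetical stand-in for a configuration that can change at runtime.
  object Conf {
    @volatile var forceShuffledHashJoin: Boolean = true
  }

  object CachedRule {
    // Evaluated once, when the object is first initialized.
    val force: Boolean = Conf.forceShuffledHashJoin
  }

  object LiveRule {
    // Re-read on every call, so later configuration changes are visible.
    def force: Boolean = Conf.forceShuffledHashJoin
  }
}
```

After CachedRule has been initialized, flipping Conf.forceShuffledHashJoin changes what LiveRule.force returns but not CachedRule.force, which is why reading the config inside rewrite keeps the rule responsive to runtime changes.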
@@ -421,7 +395,7 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
plan.leftKeys,
plan.rightKeys,
plan.joinType,
TransformHints.getShuffleHashJoinBuildSide(plan),
Do we need to change it? The build side may change when we offload the SHJ, and we should validate with the target build side.
I thought about it carefully. The buildSide obtained here is already a legal buildSide for the corresponding backend. doValidate does not verify the buildSide, and it should not do so in the future either. If that changes, the corresponding supportHashBuildJoinTypeOnLeft and supportHashBuildJoinTypeOnRight should be changed.
The buildSide here is from vanilla Spark rather than Gluten backends.
Yeah, but I think there is no need to convert here, because doValidate does not require the buildSide.
We should still stay consistent to avoid risk. Updated.
We should match SortMergeJoinExec in mayNeedRewrite. The CI did not fail because a SortMergeJoinExec always has a SortExec.
mayNeedRewrite matches BaseJoinExec, which includes SortMergeJoinExec.
/Benchmark Velox TPCDS
There seems to be a problem with the benchmark workflow: it stays queued. @ulysses-you cc @zhztheplayer
/Benchmark Velox TPCDS
===== Performance report for TPCDS SF2000 with Velox backend, for reference only ====
fix fix fix fix ut fix ut update fix update fix fix fix update update golden file split add cost evaluator fix update update update fix refactor to convert SortMergeJoinExec to ShuffledHashJoinExec update fix fix update fix fix ck ut fix style move drop local sort into RewriteJoin update
Resolve conflicts
lgtm, thank you @zml1206
Thank you very much for your patient review. @ulysses-you @zhztheplayer
What changes were proposed in this pull request?
How was this patch tested?