Skip to content

Commit

Permalink
make as a query level config
Browse files Browse the repository at this point in the history
  • Loading branch information
zml1206 committed Jun 19, 2024
1 parent 51b4911 commit f75f346
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ object GlutenConfig {
val GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION =
"spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction"

val GLUTEN_COST_EVALUATOR_ENABLED = "spark.gluten.sql.execution.adaptive.costEvaluator.enabled"
val GLUTEN_COST_EVALUATOR_ENABLED = "spark.gluten.sql.adaptive.costEvaluator.enabled"

var ins: GlutenConfig = _

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@
*/
package org.apache.spark.sql.execution.adaptive

import org.apache.gluten.GlutenConfig

import org.apache.spark.sql.execution.SparkPlan

/** This [[CostEvaluator]] is to force use the new physical plan when cost is equal. */
case class GlutenCostEvaluator() extends CostEvaluator {
override def evaluateCost(plan: SparkPlan): Cost = {
new GlutenCost(SimpleCostEvaluator, plan)
if (GlutenConfig.getConf.enableGluten) {
new GlutenCost(SimpleCostEvaluator, plan)
} else {
SimpleCostEvaluator.evaluateCost(plan)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.adaptive

import org.apache.gluten.GlutenConfig

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -24,6 +26,10 @@ import org.apache.spark.sql.internal.SQLConf
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
override def evaluateCost(plan: SparkPlan): Cost = {
val forceOptimizeSkewedJoin = conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
if (GlutenConfig.getConf.enableGluten) {
new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
} else {
SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.adaptive

import org.apache.gluten.GlutenConfig

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -24,6 +26,10 @@ import org.apache.spark.sql.internal.SQLConf
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
override def evaluateCost(plan: SparkPlan): Cost = {
val forceOptimizeSkewedJoin = conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
if (GlutenConfig.getConf.enableGluten) {
new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
} else {
SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.adaptive

import org.apache.gluten.GlutenConfig

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -24,6 +26,10 @@ import org.apache.spark.sql.internal.SQLConf
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
override def evaluateCost(plan: SparkPlan): Cost = {
val forceOptimizeSkewedJoin = conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
if (GlutenConfig.getConf.enableGluten) {
new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
} else {
SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan)
}
}
}

0 comments on commit f75f346

Please sign in to comment.