Skip to content

Commit

Permalink
[CORE] Add custom cost evaluator for optimize buildSide of shuffled h…
Browse files Browse the repository at this point in the history
…ash join
  • Loading branch information
zml1206 committed Jun 19, 2024
1 parent 7ac9983 commit 7798a81
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
}
conf.set(SPARK_SESSION_EXTS_KEY, extensions)

// adaptive custom cost evaluator class
if (GlutenConfig.getConf.enableGluten && GlutenConfig.getConf.enableGlutenCostEvaluator) {
val costEvaluator = "org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator"
conf.set(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS.key, costEvaluator)
}

// check memory off-heap enabled and size
val minOffHeapSize = "1MB"
if (
Expand Down
15 changes: 15 additions & 0 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {

/** Returns the boolean value of the COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED entry read from `conf`. */
def enableCastAvgAggregateFunction: Boolean = conf.getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED)

/**
 * Returns the boolean value of the COST_EVALUATOR_ENABLED entry read from `conf`, i.e. whether
 * the Gluten custom cost evaluator should be used for adaptive execution.
 */
def enableGlutenCostEvaluator: Boolean = conf.getConf(COST_EVALUATOR_ENABLED)

/** Returns the boolean value of the DYNAMIC_OFFHEAP_SIZING_ENABLED entry read from `conf`. */
def dynamicOffHeapSizingEnabled: Boolean =
conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)

Expand Down Expand Up @@ -595,6 +597,8 @@ object GlutenConfig {
// Configuration key string for the dynamic off-heap sizing memory fraction setting.
val GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION =
"spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction"

// Configuration key string behind the COST_EVALUATOR_ENABLED entry (toggle for the Gluten
// adaptive cost evaluator).
val GLUTEN_COST_EVALUATOR_ENABLED = "spark.gluten.sql.execution.adaptive.costEvaluator.enabled"

// Mutable holder for a GlutenConfig instance; `_` leaves it at the default value (null) until
// something assigns it.
// NOTE(review): presumably a cached/override instance consulted by getConf - confirm against
// the full getConf body, which is not fully visible here.
var ins: GlutenConfig = _

def getConf: GlutenConfig = {
Expand Down Expand Up @@ -1962,6 +1966,17 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)

// Internal boolean entry (default true) keyed by GLUTEN_COST_EVALUATOR_ENABLED. It controls
// whether Gluten installs its own adaptive cost evaluator class. The doc text names the Spark
// setting that applies when the toggle is off, rather than repeating the Gluten class name.
val COST_EVALUATOR_ENABLED =
  buildConf(GlutenConfig.GLUTEN_COST_EVALUATOR_ENABLED)
    .internal()
    .doc(
      "If true and gluten enabled, use " +
        "org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator as custom cost " +
        "evaluator class, else follow the configuration " +
        "spark.sql.adaptive.customCostEvaluatorClass.")
    .booleanConf
    .createWithDefault(true)

val DYNAMIC_OFFHEAP_SIZING_ENABLED =
buildConf(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED)
.internal()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SparkPlan

/**
 * A [[Cost]] that remembers the plan it was created for and the [[CostEvaluator]] used to rank
 * that plan against others.
 *
 * Comparison rules, in order:
 *  - the very same plan instance compares as equal;
 *  - structurally equal plans are ordered by plan id, the larger id comparing as lower cost;
 *  - different plans are ranked by the costs the delegated evaluator computes for them;
 *  - any other [[Cost]] implementation cannot be compared and raises an error.
 *
 * @param eval evaluator that produces the underlying cost when two plans differ
 * @param plan plan this cost belongs to
 */
class GlutenCost(val eval: CostEvaluator, val plan: SparkPlan) extends Cost {
  override def compare(that: Cost): Int = that match {
    case other: GlutenCost if plan eq other.plan =>
      0
    case other: GlutenCost if plan == other.plan =>
      // Plans are identical. Considers the newer one (larger id) as having lower cost.
      // Integer.compare keeps the sign of the original subtraction without any overflow risk.
      Integer.compare(other.plan.id, plan.id)
    case other: GlutenCost =>
      // Plans are different. Use the delegated cost evaluator; both sides must share it.
      assert(eval == other.eval)
      eval.evaluateCost(plan).compare(eval.evaluateCost(other.plan))
    case _ =>
      throw QueryExecutionErrors.cannotCompareCostWithTargetCostError(that.toString)
  }

  // Hashing is not supported: equality is defined through compare, so instances must not be
  // used as keys of hash-based collections.
  override def hashCode(): Int = throw new UnsupportedOperationException()

  // Only another GlutenCost can be equal. Other Cost implementations yield false instead of
  // propagating the "cannot compare" error out of equals.
  override def equals(obj: Any): Boolean = obj match {
    case other: GlutenCost => compare(other) == 0
    case _ => false
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.execution.SparkPlan

/**
 * A [[CostEvaluator]] whose purpose is to force the use of the new physical plan when costs
 * are equal. Every plan is wrapped in a [[GlutenCost]] backed by [[SimpleCostEvaluator]].
 */
case class GlutenCostEvaluator() extends CostEvaluator {
  override def evaluateCost(plan: SparkPlan): Cost = {
    val delegate = SimpleCostEvaluator
    new GlutenCost(delegate, plan)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf

/**
 * A [[CostEvaluator]] whose purpose is to force the use of the new physical plan when costs
 * are equal. Every plan is wrapped in a [[GlutenCost]] backed by a [[SimpleCostEvaluator]]
 * built from the current ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN setting.
 */
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
  override def evaluateCost(plan: SparkPlan): Cost = {
    // The setting is read on every call, so the delegate reflects the active SQLConf.
    val delegate = SimpleCostEvaluator(conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN))
    new GlutenCost(delegate, plan)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf

/**
 * A [[CostEvaluator]] whose purpose is to force the use of the new physical plan when costs
 * are equal. Every plan is wrapped in a [[GlutenCost]] backed by a [[SimpleCostEvaluator]]
 * built from the current ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN setting.
 */
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
  override def evaluateCost(plan: SparkPlan): Cost = {
    // The setting is read on every call, so the delegate reflects the active SQLConf.
    val delegate = SimpleCostEvaluator(conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN))
    new GlutenCost(delegate, plan)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf

/**
 * A [[CostEvaluator]] whose purpose is to force the use of the new physical plan when costs
 * are equal. Every plan is wrapped in a [[GlutenCost]] backed by a [[SimpleCostEvaluator]]
 * built from the current ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN setting.
 */
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
  override def evaluateCost(plan: SparkPlan): Cost = {
    // The setting is read on every call, so the delegate reflects the active SQLConf.
    val delegate = SimpleCostEvaluator(conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN))
    new GlutenCost(delegate, plan)
  }
}

0 comments on commit 7798a81

Please sign in to comment.