
Commit

fix
zml1206 committed Jun 14, 2024
1 parent 482a4d8 commit bdbcf40
Showing 13 changed files with 132 additions and 229 deletions.
@@ -390,7 +390,7 @@ object ShuffledHashJoinExecTransformerBase {
if (rightSize <= leftSize) BuildRight else BuildLeft
// Only a ShuffledHashJoinExec generated directly in some Spark tests (such as OuterJoinSuite)
// is not linked to a logical plan.
- case _ => buildSide
+ case _ => Option(buildSide).getOrElse(BuildRight)
}
}
}
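The functional change in this hunk is a null-safe default: a ShuffledHashJoinExec built directly by some Spark tests has no logical-plan link, so the passed-in buildSide may be null, and the new code falls back to BuildRight instead of propagating the null. A minimal standalone sketch of the Option-based fallback (illustrative names, not Gluten code):

// Sketch only: Option(x) yields None for a null reference, so getOrElse supplies the default.
sealed trait BuildSideLike
case object BuildLeft extends BuildSideLike
case object BuildRight extends BuildSideLike

def resolveBuildSide(buildSide: BuildSideLike): BuildSideLike =
  Option(buildSide).getOrElse(BuildRight)

resolveBuildSide(null)      // BuildRight
resolveBuildSide(BuildLeft) // BuildLeft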
@@ -1121,9 +1121,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenJoinSuite]
// exclude as it check spark plan
.exclude("SPARK-36794: Ignore duplicated key when building relation for semi/anti hash join")
- // exclude as it check for SMJ node
- .exclude(
- "SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)")
enableSuite[GlutenMathFunctionsSuite]
enableSuite[GlutenMetadataCacheSuite]
.exclude("SPARK-16336,SPARK-27961 Suggest fixing FileNotFoundException")
@@ -55,14 +55,4 @@ class GlutenJoinSuite extends JoinSuite with GlutenSQLTestsTrait {
|""".stripMargin
checkAnswer(spark.sql(sql), Seq(Row(0, 1), Row(1, 2), Row(2, 3)))
}
-
- testGluten(
- "SPARK-43113: Full outer join with duplicate stream-side" +
- " references in condition (SHJ)") {
- def check(plan: SparkPlan): Unit = {
- assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
- }
-
- dupStreamSideColTest("MERGE", check)
- }
}
@@ -1138,9 +1138,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenJoinSuite]
// exclude as it check spark plan
.exclude("SPARK-36794: Ignore duplicated key when building relation for semi/anti hash join")
- // exclude as it check for SMJ node
- .exclude(
- "SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)")
enableSuite[GlutenMathFunctionsSuite]
enableSuite[GlutenMetadataCacheSuite]
.exclude("SPARK-16336,SPARK-27961 Suggest fixing FileNotFoundException")
@@ -17,7 +17,6 @@
package org.apache.spark.sql

import org.apache.spark.sql.execution.SparkPlan
- import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec

class GlutenJoinSuite extends JoinSuite with GlutenSQLTestsTrait {

@@ -57,14 +56,4 @@ class GlutenJoinSuite extends JoinSuite with GlutenSQLTestsTrait {
|""".stripMargin
checkAnswer(spark.sql(sql), Seq(Row(0, 1), Row(1, 2), Row(2, 3)))
}
-
- testGluten(
- "SPARK-43113: Full outer join with duplicate stream-side" +
- " references in condition (SHJ)") {
- def check(plan: SparkPlan): Unit = {
- assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
- }
-
- dupStreamSideColTest("MERGE", check)
- }
}

This file was deleted.

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.ShuffledJoin

/**
* This [[CostEvaluator]] forces AQE to adopt the new physical plan when the new plan
* contains [[ShuffledJoin]] and its cost ties with the current plan's.
*/
case class GlutenCostEvaluator() extends CostEvaluator {
override def evaluateCost(plan: SparkPlan): Cost = {
val simpleCost =
SimpleCostEvaluator.evaluateCost(plan).asInstanceOf[SimpleCost]
var cost = simpleCost.value * 2
if (existsShuffledJoin(plan)) {
plan.logicalLink.map(_.stats.isRuntime) match {
case Some(true) => cost -= 1
case _ =>
}
}
SimpleCost(cost)
}

def existsShuffledJoin(plan: SparkPlan): Boolean = if (plan.isInstanceOf[ShuffledJoin]) {
true
} else {
plan.children.exists(existsShuffledJoin(_))
}
}
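The doubling in evaluateCost is what keeps the tie-break safe: SimpleCostEvaluator's cost is an integer (roughly a count of shuffle exchanges), so doubling it and subtracting 1 only when the plan contains a ShuffledJoin backed by runtime statistics breaks exact ties in favour of the re-optimized plan, yet can never reorder plans whose simple costs already differ. A standalone sketch of the arithmetic (illustrative only, not the Gluten or Spark classes):

// Sketch of the tie-break arithmetic, assuming the simple cost is a shuffle count.
def adjustedCost(shuffleCount: Long, shuffledJoinWithRuntimeStats: Boolean): Long = {
  val doubled = shuffleCount * 2
  if (shuffledJoinWithRuntimeStats) doubled - 1 else doubled
}

adjustedCost(3, shuffledJoinWithRuntimeStats = false) // 6: original plan
adjustedCost(3, shuffledJoinWithRuntimeStats = true)  // 5: equal shuffle count, new plan wins the tie
adjustedCost(2, shuffledJoinWithRuntimeStats = false) // 4: a strictly cheaper plan still wins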

This file was deleted.

@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.catalyst.SQLConfHelper
- import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.ShuffledJoin
import org.apache.spark.sql.internal.SQLConf
@@ -33,13 +32,8 @@ case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan).asInstanceOf[SimpleCost]
var cost = simpleCost.value * 2
if (plan.exists(_.isInstanceOf[ShuffledJoin])) {
- // If the children of outer Join are all LogicalQueryStage, it means that the join has
- // been reOptimized.
- plan.logicalLink.map(_.collect { case j: Join => j }) match {
- case Some(joins)
- if joins(0).left.isInstanceOf[LogicalQueryStage]
- && joins(0).right.isInstanceOf[LogicalQueryStage] =>
- cost -= 1
+ plan.logicalLink.map(_.stats.isRuntime) match {
+ case Some(true) => cost -= 1
case _ =>
}
}

This file was deleted.

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.ShuffledJoin
import org.apache.spark.sql.internal.SQLConf

/**
* This [[CostEvaluator]] forces AQE to adopt the new physical plan when the new plan
* contains [[ShuffledJoin]] and its cost ties with the current plan's.
*/
case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
override def evaluateCost(plan: SparkPlan): Cost = {
val forceOptimizeSkewedJoin = conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
val simpleCost =
SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan).asInstanceOf[SimpleCost]
var cost = simpleCost.value * 2
if (plan.exists(_.isInstanceOf[ShuffledJoin])) {
plan.logicalLink.map(_.stats.isRuntime) match {
case Some(true) => cost -= 1
case _ =>
}
}
SimpleCost(cost)
}
}
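Gluten wires this evaluator into AQE through its own integration, which the diff does not show. As a hedged illustration only: recent Spark 3.x releases expose a spark.sql.adaptive.customCostEvaluatorClass setting that points AdaptiveSparkPlanExec at a custom CostEvaluator, and using it directly would look roughly like this:

// Hedged sketch: plugging a custom CostEvaluator into AQE via Spark's config.
// Whether Gluten relies on this key or injects the evaluator through its own
// extension points is outside the scope of this diff.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("gluten-cost-evaluator-sketch")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .config(
    "spark.sql.adaptive.customCostEvaluatorClass",
    "org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator")
  .getOrCreate()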

This file was deleted.
