Commit f9d90a9

Move test to gluten-ut

liujiayi771 committed Dec 7, 2023
1 parent b5c01cb commit f9d90a9
Showing 13 changed files with 416 additions and 235 deletions.

This file was deleted.

@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

-object CollapseProjectTransformer extends Rule[SparkPlan] {
+object CollapseProjectExecTransformer extends Rule[SparkPlan] {

override def apply(plan: SparkPlan): SparkPlan = {
if (!GlutenConfig.getConf.enableColumnarProjectCollapse) {
@@ -51,7 +51,7 @@ object CollapseProjectTransformer extends Rule[SparkPlan] {

/**
* In Velox, CreateNamedStruct will generate a special output named obj; we cannot collapse such
- * ProjectTransformer, otherwise it will result in a bind reference failure.
+ * project transformer, otherwise it will result in a bind reference failure.
*/
private def containsNamedStructAlias(projectList: Seq[NamedExpression]): Boolean = {
projectList.exists {
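The body of containsNamedStructAlias is truncated by the diff view above. As a hedged reconstruction of the idea it describes, the check matches project outputs that alias a CreateNamedStruct (the case Velox lowers to the special obj output); the pattern below is an assumption, not the exact source:

import org.apache.spark.sql.catalyst.expressions.{Alias, CreateNamedStruct, NamedExpression}

// Sketch: true if any project output aliases a CreateNamedStruct, i.e. the
// shape that must not be collapsed per the comment above.
def containsNamedStructAlias(projectList: Seq[NamedExpression]): Boolean =
  projectList.exists {
    case Alias(_: CreateNamedStruct, _) => true
    case _ => false
  }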
@@ -795,7 +795,7 @@ case class ColumnarOverrideRules(session: SparkSession)
(_: SparkSession) => TransformPreOverrides(isAdaptiveContext),
(spark: SparkSession) => RewriteTransformer(spark),
(_: SparkSession) => EnsureLocalSortRequirements,
-(_: SparkSession) => CollapseProjectTransformer
+(_: SparkSession) => CollapseProjectExecTransformer
) :::
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPreRules() :::
SparkRuleUtil.extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPreRules)
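For context, each builder in this list produces a Rule[SparkPlan] that is applied in order during the columnar pre-overrides. A minimal sketch of that dispatch, under the assumption that the chain is simply folded over the plan (illustrative, not Gluten's actual driver code):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Builds each rule for the session, then applies them to the plan in sequence.
def applyPreRules(
    session: SparkSession,
    builders: Seq[SparkSession => Rule[SparkPlan]],
    plan: SparkPlan): SparkPlan =
  builders.map(_(session)).foldLeft(plan)((p, rule) => rule.apply(p))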
@@ -18,7 +18,8 @@ package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.util.{sideBySide, stackTraceToString}
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}

import org.apache.commons.io.FileUtils
import org.apache.commons.math3.util.Precision
@@ -75,6 +76,52 @@ trait GlutenSQLTestsTrait extends QueryTest with GlutenSQLTestsBaseTrait {

GlutenQueryTest.checkAnswer(analyzedDF, expectedAnswer)
}

/**
* Recursively collect all child plans of the given plans.
*
* @param plans
*   the input plans.
* @return
*   every transitive child (including the input plans themselves), with shuffle query stages
*   unwrapped and WholeStageCodegen wrappers skipped.
*/
def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = {
if (plans.isEmpty) {
return Seq()
}

val inputPlans: Seq[SparkPlan] = plans.map {
case stage: ShuffleQueryStageExec => stage.plan
case plan => plan
}

var newChildren: Seq[SparkPlan] = Seq()
inputPlans.foreach {
plan =>
newChildren = newChildren ++ getChildrenPlan(plan.children)
// Skip WholeStageCodegen wrappers to avoid duplicating them and their children.
if (!plan.nodeName.startsWith("WholeStageCodegen")) {
newChildren = newChildren :+ plan
}
}
newChildren
}

/**
* Get the executed plan of a DataFrame, unwrapping the top-level AdaptiveSparkPlanExec if present.
*
* @param df
*   the input DataFrame.
* @return
*   a sequence of executed plans.
*/
def getExecutedPlan(df: DataFrame): Seq[SparkPlan] = {
df.queryExecution.executedPlan match {
case exec: AdaptiveSparkPlanExec =>
getChildrenPlan(Seq(exec.executedPlan))
case plan =>
getChildrenPlan(Seq(plan))
}
}
}

object GlutenQueryTest extends Assertions {
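With these helpers moved onto the shared trait, any suite extending GlutenSQLTestsTrait can assert directly on the physical plan. A hypothetical usage (the query and the expected operator are illustrative):

import io.glutenproject.execution.ProjectExecTransformer

// Inside a test of a suite extending GlutenSQLTestsTrait:
val df = sql("SELECT o_orderkey + 1 FROM orders")
assert(getExecutedPlan(df).exists(_.isInstanceOf[ProjectExecTransformer]))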
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2.GlutenFileTableSuite
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
-import org.apache.spark.sql.extension.{GlutenCustomerExpressionTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
+import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenCustomerExpressionTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite}

@@ -1078,6 +1078,7 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-33687: analyze all tables in a specific database")
enableSuite[FallbackStrategiesSuite]
enableSuite[GlutenHiveSQLQuerySuite]
+enableSuite[GlutenCollapseProjectExecTransformerSuite]

override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
}
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.extension

import io.glutenproject.GlutenConfig
import io.glutenproject.execution.ProjectExecTransformer

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.Row

class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {

import testImplicits._

test("Support ProjectExecTransformer collapse") {
val query =
"""
|SELECT
| o_orderpriority
|FROM
| orders
|WHERE
| o_shippriority >= 0
| AND EXISTS (
| SELECT
| *
| FROM
| lineitem
| WHERE
| l_orderkey = o_orderkey
| AND l_linenumber < 10
| )
|ORDER BY
| o_orderpriority
|LIMIT
| 100;
|""".stripMargin

val ordersData = Seq[(Int, Int, String)](
(30340, 1, "3-MEDIUM"),
(31140, 1, "1-URGENT"),
(31940, 1, "2-HIGH"),
(32740, 1, "3-MEDIUM"),
(33540, 1, "5-LOW"),
(34340, 1, "2-HIGH"),
(35140, 1, "3-MEDIUM"),
(35940, 1, "1-URGENT"),
(36740, 1, "3-MEDIUM"),
(37540, 1, "4-NOT SPECIFIED")
)
val lineitemData = Seq[(Int, Int, String)](
(30340, 1, "F"),
(31140, 4, "F"),
(31940, 7, "O"),
(32740, 6, "O"),
(33540, 2, "F"),
(34340, 3, "F"),
(35140, 1, "O"),
(35940, 2, "F"),
(36740, 3, "F"),
(37540, 5, "O")
)
withTable("orders", "lineitem") {
ordersData
.toDF("o_orderkey", "o_shippriority", "o_orderpriority")
.write
.format("parquet")
.saveAsTable("orders")
lineitemData
.toDF("l_orderkey", "l_linenumber", "l_linestatus")
.write
.format("parquet")
.saveAsTable("lineitem")
Seq(true, false).foreach {
collapsed =>
withSQLConf(
GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> collapsed.toString,
"spark.sql.autoBroadcastJoinThreshold" -> "-1") {
val df = sql(query)
checkAnswer(
df,
Seq(
Row("1-URGENT"),
Row("1-URGENT"),
Row("2-HIGH"),
Row("2-HIGH"),
Row("3-MEDIUM"),
Row("3-MEDIUM"),
Row("3-MEDIUM"),
Row("3-MEDIUM"),
Row("4-NOT SPECIFIED"),
Row("5-LOW")
)
)
assert(
getExecutedPlan(df).exists {
case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true
case _ => false
} == !collapsed
)
}
}
}
}
}
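The final assertion encodes the expected plan shape: with collapse disabled, a ProjectExecTransformer keeps another ProjectExecTransformer as its direct child; with collapse enabled, the pair is merged away. The same check written as a hypothetical standalone helper (assuming the (projectList, child) extractor shape used in the test):

import io.glutenproject.execution.ProjectExecTransformer
import org.apache.spark.sql.execution.SparkPlan

// Mirrors the suite's assertion: does any node nest a project directly
// under another project?
def hasNestedProject(plans: Seq[SparkPlan]): Boolean =
  plans.exists {
    case ProjectExecTransformer(_, _: ProjectExecTransformer) => true
    case _ => false
  }
// Expected: hasNestedProject(getExecutedPlan(df)) == !collapsed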
@@ -25,8 +25,6 @@ import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}

@@ -98,44 +96,6 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
* : the input plans.
* @return
*/
def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = {
if (plans.isEmpty) {
return Seq()
}

val inputPlans: Seq[SparkPlan] = plans.map {
case stage: ShuffleQueryStageExec => stage.plan
case plan => plan
}

var newChildren: Seq[SparkPlan] = Seq()
inputPlans.foreach {
plan =>
newChildren = newChildren ++ getChildrenPlan(plan.children)
// To avoid duplication of WholeStageCodegenXXX and its children.
if (!plan.nodeName.startsWith("WholeStageCodegen")) {
newChildren = newChildren :+ plan
}
}
newChildren
}

/**
* Get the executed plan of a data frame.
*
* @param df
* : dataframe.
* @return
* A sequence of executed plans.
*/
def getExecutedPlan(df: DataFrame): Seq[SparkPlan] = {
df.queryExecution.executedPlan match {
case exec: AdaptiveSparkPlanExec =>
getChildrenPlan(Seq(exec.executedPlan))
case plan =>
getChildrenPlan(Seq(plan))
}
}

def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag: ClassTag[T]): Unit = {
val executedPlan = getExecutedPlan(df)
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
-import org.apache.spark.sql.extension.{GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
+import org.apache.spark.sql.extension.{GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer, GlutenCollapseProjectExecTransformerSuite}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite}
@@ -1148,6 +1148,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenFallbackSuite]
enableSuite[GlutenHiveSQLQuerySuite]
enableSuite[GlutenImplicitsTest]
+enableSuite[GlutenCollapseProjectExecTransformerSuite]

override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
}
