Commit
Move test to gluten-ut

# This is a combination of 2 commits.
fix
liujiayi771 committed Dec 8, 2023
1 parent 54aee9c commit 68055d2
Showing 14 changed files with 416 additions and 234 deletions.

This file was deleted.

4 changes: 2 additions & 2 deletions ep/build-clickhouse/src/build_clickhouse.sh
@@ -38,7 +38,7 @@ done

echo ${GLUTEN_SOURCE}

-export CC=${CC:-clang-15}
-export CXX=${CXX:-clang++-15}
+export CC=${CC:-clang-16}
+export CXX=${CXX:-clang++-16}
cmake -G Ninja -S ${GLUTEN_SOURCE}/cpp-ch -B ${GLUTEN_SOURCE}/cpp-ch/build_ch -DCH_SOURCE_DIR=${CH_SOURCE_DIR} "-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}"
cmake --build ${GLUTEN_SOURCE}/cpp-ch/build_ch --target build_ch
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.optimizer.CollapseProjectShim
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

-object CollapseProjectTransformer extends Rule[SparkPlan] {
+object CollapseProjectExecTransformer extends Rule[SparkPlan] {

override def apply(plan: SparkPlan): SparkPlan = {
if (!GlutenConfig.getConf.enableColumnarProjectCollapse) {
@@ -51,7 +51,7 @@ object CollapseProjectTransformer extends Rule[SparkPlan] {

/**
 * In Velox, CreateNamedStruct will generate a special output named obj; we cannot collapse such
- * ProjectTransformer, otherwise it will result in a bind reference failure.
+ * a project transformer, otherwise it will result in a bind reference failure.
*/
private def containsNamedStructAlias(projectList: Seq[NamedExpression]): Boolean = {
projectList.exists {
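The diff truncates the body of containsNamedStructAlias here. A minimal sketch of what such a guard could look like, using the standard Catalyst expression classes (an illustration only, not the committed implementation):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, CreateNamedStruct, NamedExpression}

// Hypothetical sketch: true if any project output aliases a
// CreateNamedStruct -- the special "obj" output the comment above
// says must not be collapsed.
def containsNamedStructAliasSketch(projectList: Seq[NamedExpression]): Boolean =
  projectList.exists {
    case Alias(_: CreateNamedStruct, _) => true
    case _ => false
  }
```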
@@ -799,7 +799,7 @@ case class ColumnarOverrideRules(session: SparkSession)
(_: SparkSession) => TransformPreOverrides(isAdaptiveContext),
(spark: SparkSession) => RewriteTransformer(spark),
(_: SparkSession) => EnsureLocalSortRequirements,
-(_: SparkSession) => CollapseProjectTransformer
+(_: SparkSession) => CollapseProjectExecTransformer
) :::
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPreRules() :::
SparkRuleUtil.extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPreRules)
@@ -18,7 +18,8 @@ package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.util.{sideBySide, stackTraceToString}
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}

import org.apache.commons.io.FileUtils
import org.apache.commons.math3.util.Precision
@@ -75,6 +76,52 @@ trait GlutenSQLTestsTrait extends QueryTest with GlutenSQLTestsBaseTrait {

GlutenQueryTest.checkAnswer(analyzedDF, expectedAnswer)
}

+  /**
+   * Get all the child plans of the given plans, recursively flattened.
+   *
+   * @param plans
+   *   the input plans.
+   * @return
+   *   all child plans, with shuffle query stages unwrapped and WholeStageCodegen wrappers skipped.
+   */
+  def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = {
+    if (plans.isEmpty) {
+      return Seq()
+    }
+
+    val inputPlans: Seq[SparkPlan] = plans.map {
+      case stage: ShuffleQueryStageExec => stage.plan
+      case plan => plan
+    }
+
+    var newChildren: Seq[SparkPlan] = Seq()
+    inputPlans.foreach {
+      plan =>
+        newChildren = newChildren ++ getChildrenPlan(plan.children)
+        // To avoid duplication of WholeStageCodegenXXX and its children.
+        if (!plan.nodeName.startsWith("WholeStageCodegen")) {
+          newChildren = newChildren :+ plan
+        }
+    }
+    newChildren
+  }
+
+  /**
+   * Get the executed plan of a data frame.
+   *
+   * @param df
+   *   the input dataframe.
+   * @return
+   *   a sequence of executed plans.
+   */
+  def getExecutedPlan(df: DataFrame): Seq[SparkPlan] = {
+    df.queryExecution.executedPlan match {
+      case exec: AdaptiveSparkPlanExec =>
+        getChildrenPlan(Seq(exec.executedPlan))
+      case plan =>
+        getChildrenPlan(Seq(plan))
+    }
+  }
}
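With these helpers in the shared trait, any suite mixing in GlutenSQLTestsTrait can assert directly on the flattened physical plan, even under adaptive execution. A hypothetical usage sketch (the test name and query are illustrative, not from this commit):

```scala
// Inside a suite extending GlutenSQLTestsTrait:
// getExecutedPlan unwraps AdaptiveSparkPlanExec and flattens the tree,
// so a simple exists() checks whether an operator was offloaded.
test("plan contains a ProjectExecTransformer") {
  val df = sql("SELECT l_orderkey + 1 FROM lineitem")
  assert(getExecutedPlan(df).exists(_.isInstanceOf[ProjectExecTransformer]))
}
```

(ProjectExecTransformer here is io.glutenproject.execution.ProjectExecTransformer, as imported by the new suite below.)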

object GlutenQueryTest extends Assertions {
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2.GlutenFileTableSuite
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
-import org.apache.spark.sql.extension.{GlutenCustomerExpressionTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
+import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenCustomerExpressionTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite}

@@ -1078,6 +1078,7 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-33687: analyze all tables in a specific database")
enableSuite[FallbackStrategiesSuite]
enableSuite[GlutenHiveSQLQuerySuite]
+enableSuite[GlutenCollapseProjectExecTransformerSuite]

override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
}
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.extension

import io.glutenproject.GlutenConfig
import io.glutenproject.execution.ProjectExecTransformer

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.Row

class GlutenCollapseProjectExecTransformerSuite extends GlutenSQLTestsTrait {

import testImplicits._

test("Support ProjectExecTransformer collapse") {
val query =
"""
|SELECT
| o_orderpriority
|FROM
| orders
|WHERE
| o_shippriority >= 0
| AND EXISTS (
| SELECT
| *
| FROM
| lineitem
| WHERE
| l_orderkey = o_orderkey
| AND l_linenumber < 10
| )
|ORDER BY
| o_orderpriority
|LIMIT
| 100;
|""".stripMargin

val ordersData = Seq[(Int, Int, String)](
(30340, 1, "3-MEDIUM"),
(31140, 1, "1-URGENT"),
(31940, 1, "2-HIGH"),
(32740, 1, "3-MEDIUM"),
(33540, 1, "5-LOW"),
(34340, 1, "2-HIGH"),
(35140, 1, "3-MEDIUM"),
(35940, 1, "1-URGENT"),
(36740, 1, "3-MEDIUM"),
(37540, 1, "4-NOT SPECIFIED")
)
val lineitemData = Seq[(Int, Int, String)](
(30340, 1, "F"),
(31140, 4, "F"),
(31940, 7, "O"),
(32740, 6, "O"),
(33540, 2, "F"),
(34340, 3, "F"),
(35140, 1, "O"),
(35940, 2, "F"),
(36740, 3, "F"),
(37540, 5, "O")
)
withTable("orders", "lineitem") {
ordersData
.toDF("o_orderkey", "o_shippriority", "o_orderpriority")
.write
.format("parquet")
.saveAsTable("orders")
lineitemData
.toDF("l_orderkey", "l_linenumber", "l_linestatus")
.write
.format("parquet")
.saveAsTable("lineitem")
Seq(true, false).foreach {
collapsed =>
withSQLConf(
GlutenConfig.ENABLE_COLUMNAR_PROJECT_COLLAPSE.key -> collapsed.toString,
"spark.sql.autoBroadcastJoinThreshold" -> "-1") {
val df = sql(query)
checkAnswer(
df,
Seq(
Row("1-URGENT"),
Row("1-URGENT"),
Row("2-HIGH"),
Row("2-HIGH"),
Row("3-MEDIUM"),
Row("3-MEDIUM"),
Row("3-MEDIUM"),
Row("3-MEDIUM"),
Row("4-NOT SPECIFIED"),
Row("5-LOW")
)
)
assert(
getExecutedPlan(df).exists {
case _ @ProjectExecTransformer(_, _: ProjectExecTransformer) => true
case _ => false
} == !collapsed
)
}
}
}
}
}
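For intuition about what this suite verifies: when the collapse rule is enabled, adjacent project transformers merge into one, so the nested ProjectExecTransformer pattern the assert looks for disappears. A toy, self-contained sketch of that shape rewrite (the Toy* types are hypothetical stand-ins, not Gluten or Spark classes):

```scala
sealed trait ToyPlan
case class ToyProject(exprs: Seq[String], child: ToyPlan) extends ToyPlan
case class ToyScan(table: String) extends ToyPlan

// Merge adjacent projections bottom-up, keeping the outer output list.
// (The real rule composes expression lists and skips unsafe cases such
// as the CreateNamedStruct aliases noted earlier.)
def collapse(plan: ToyPlan): ToyPlan = plan match {
  case ToyProject(outer, ToyProject(_, child)) => collapse(ToyProject(outer, child))
  case ToyProject(exprs, child) => ToyProject(exprs, collapse(child))
  case other => other
}

// collapse(ToyProject(Seq("o_orderpriority"),
//   ToyProject(Seq("o_orderkey", "o_orderpriority"), ToyScan("orders"))))
// == ToyProject(Seq("o_orderpriority"), ToyScan("orders"))
```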
@@ -25,8 +25,6 @@ import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}

@@ -98,44 +96,6 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
* : the input plans.
* @return
*/
-  def getChildrenPlan(plans: Seq[SparkPlan]): Seq[SparkPlan] = {
-    if (plans.isEmpty) {
-      return Seq()
-    }
-
-    val inputPlans: Seq[SparkPlan] = plans.map {
-      case stage: ShuffleQueryStageExec => stage.plan
-      case plan => plan
-    }
-
-    var newChildren: Seq[SparkPlan] = Seq()
-    inputPlans.foreach {
-      plan =>
-        newChildren = newChildren ++ getChildrenPlan(plan.children)
-        // To avoid duplication of WholeStageCodegenXXX and its children.
-        if (!plan.nodeName.startsWith("WholeStageCodegen")) {
-          newChildren = newChildren :+ plan
-        }
-    }
-    newChildren
-  }
-
-  /**
-   * Get the executed plan of a data frame.
-   *
-   * @param df
-   *   : dataframe.
-   * @return
-   *   A sequence of executed plans.
-   */
-  def getExecutedPlan(df: DataFrame): Seq[SparkPlan] = {
-    df.queryExecution.executedPlan match {
-      case exec: AdaptiveSparkPlanExec =>
-        getChildrenPlan(Seq(exec.executedPlan))
-      case plan =>
-        getChildrenPlan(Seq(plan))
-    }
-  }

def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag: ClassTag[T]): Unit = {
val executedPlan = getExecutedPlan(df)
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.text.{GlutenTextV1Suite, Glute
import org.apache.spark.sql.execution.datasources.v2.{GlutenDataSourceV2StrategySuite, GlutenFileTableSuite, GlutenV2PredicateSuite}
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
-import org.apache.spark.sql.extension.{GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
+import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite}
@@ -1148,6 +1148,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenFallbackSuite]
enableSuite[GlutenHiveSQLQuerySuite]
enableSuite[GlutenImplicitsTest]
+enableSuite[GlutenCollapseProjectExecTransformerSuite]

override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
}
