Refactor GlutenV1WriteCommandSuite and GlutenInsertSuite so that spark-ut compiles with the spark-3.5 profile
baibaichen committed Jun 17, 2024
1 parent e4388e6 commit 1dd4ea9
Showing 5 changed files with 116 additions and 17 deletions.
44 changes: 44 additions & 0 deletions gluten-ut/spark35/pom.xml
@@ -63,6 +63,28 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/backends-clickhouse</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>backends-velox</id>
@@ -155,6 +177,28 @@
<log4j.version>2.19.0</log4j.version>
<clickhouse.lib.path></clickhouse.lib.path>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/backends-velox</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

New test source (backends-clickhouse): GlutenColumnarWriteTestSupport.scala
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten

import org.apache.spark.sql.execution.SparkPlan

trait GlutenColumnarWriteTestSupport {

def checkWriteFilesAndGetChild(sparkPlan: SparkPlan): SparkPlan = {
throw new UnsupportedOperationException("Clickhouse Backend does not support write files")
}
}
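For reference, a minimal sketch of how this stub behaves under the backends-clickhouse profile. The suite and test names below are illustrative only (not part of this commit), and it assumes ScalaTest's AnyFunSuite is available as in the other gluten-ut suites:

import org.apache.gluten.GlutenColumnarWriteTestSupport
import org.apache.spark.sql.execution.SparkPlan
import org.scalatest.funsuite.AnyFunSuite

class ClickHouseWriteSupportStubCheck extends AnyFunSuite with GlutenColumnarWriteTestSupport {
  test("ClickHouse stub rejects write-files inspection") {
    // The stub throws before it ever looks at the plan, so a null placeholder suffices here.
    val placeholderPlan: SparkPlan = null
    intercept[UnsupportedOperationException] {
      checkWriteFilesAndGetChild(placeholderPlan)
    }
  }
}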
New test source (backends-velox): GlutenColumnarWriteTestSupport.scala
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten

import org.apache.spark.sql.execution.{SparkPlan, VeloxColumnarWriteFilesExec}

trait GlutenColumnarWriteTestSupport {

def checkWriteFilesAndGetChild(sparkPlan: SparkPlan): SparkPlan = {
assert(sparkPlan.isInstanceOf[VeloxColumnarWriteFilesExec])
sparkPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child
}
}
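And a sketch of how a shared suite can rely on the trait without naming any backend class directly; SharedWritePlanChecks and assertFullyOffloadedWrite are illustrative names, not part of this commit:

import org.apache.gluten.GlutenColumnarWriteTestSupport
import org.apache.gluten.extension.GlutenPlan
import org.apache.spark.sql.execution.SparkPlan

class SharedWritePlanChecks extends GlutenColumnarWriteTestSupport {
  def assertFullyOffloadedWrite(executedWritePlan: SparkPlan): Unit = {
    // backends-velox: asserts the root is VeloxColumnarWriteFilesExec and returns its child.
    // backends-clickhouse: throws UnsupportedOperationException instead.
    val child = checkWriteFilesAndGetChild(executedWritePlan)
    // Mirrors the check in GlutenInsertSuite: every operator under the write
    // node should have been replaced by a Gluten (columnar) operator.
    assert(child.find(!_.isInstanceOf[GlutenPlan]).isEmpty)
  }
}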
Changed file: GlutenV1WriteCommandSuite.scala
@@ -16,12 +16,13 @@
*/
package org.apache.spark.sql.execution.datasources

+ import org.apache.gluten.GlutenColumnarWriteTestSupport
import org.apache.gluten.execution.SortExecTransformer

import org.apache.spark.sql.GlutenSQLTestsBaseTrait
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
- import org.apache.spark.sql.execution.{QueryExecution, SortExec, VeloxColumnarWriteFilesExec}
+ import org.apache.spark.sql.execution.{QueryExecution, SortExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StringType}
@@ -96,7 +97,8 @@ trait GlutenV1WriteCommandSuiteBase extends V1WriteCommandSuiteBase {
class GlutenV1WriteCommandSuite
extends V1WriteCommandSuite
with GlutenV1WriteCommandSuiteBase
- with GlutenSQLTestsBaseTrait {
+ with GlutenSQLTestsBaseTrait
+ with GlutenColumnarWriteTestSupport {

testGluten(
"SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column") {
@@ -122,8 +124,7 @@ class GlutenV1WriteCommandSuite
val executedPlan = FileFormatWriter.executedPlan.get

val plan = if (enabled) {
- assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec])
- executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child
+ checkWriteFilesAndGetChild(executedPlan)
} else {
executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan }
}
@@ -204,8 +205,7 @@ class GlutenV1WriteCommandSuite
val executedPlan = FileFormatWriter.executedPlan.get

val plan = if (enabled) {
- assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec])
- executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child
+ checkWriteFilesAndGetChild(executedPlan)
} else {
executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan }
}
Changed file: GlutenInsertSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.sources

+ import org.apache.gluten.GlutenColumnarWriteTestSupport
import org.apache.gluten.execution.SortExecTransformer
import org.apache.gluten.extension.GlutenPlan

@@ -24,7 +25,7 @@ import org.apache.spark.executor.OutputMetrics
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
- import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, VeloxColumnarWriteFilesExec}
+ import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -38,7 +39,8 @@ import java.io.{File, IOException}
class GlutenInsertSuite
extends InsertSuite
with GlutenSQLTestsBaseTrait
- with AdaptiveSparkPlanHelper {
+ with AdaptiveSparkPlanHelper
+ with GlutenColumnarWriteTestSupport {

override def sparkConf: SparkConf = {
super.sparkConf.set("spark.sql.leafNodeDefaultParallelism", "1")
@@ -60,13 +62,13 @@ class GlutenInsertSuite
super.afterAll()
}

- private def checkAndGetWriteFiles(df: DataFrame): VeloxColumnarWriteFilesExec = {
+ private def checkWriteFilesAndGetChild(df: DataFrame): (SparkPlan, SparkPlan) = {
val writeFiles = stripAQEPlan(
df.queryExecution.executedPlan
.asInstanceOf[CommandResultExec]
.commandPhysicalPlan).children.head
- assert(writeFiles.isInstanceOf[VeloxColumnarWriteFilesExec])
- writeFiles.asInstanceOf[VeloxColumnarWriteFilesExec]
+ val child = checkWriteFilesAndGetChild(writeFiles)
+ (writeFiles, child)
}

testGluten("insert partition table") {
@@ -97,7 +99,7 @@
val df =
spark.sql("INSERT INTO TABLE pt partition(pt='a') SELECT * FROM VALUES(1, 'a'),(2, 'b')")
spark.sparkContext.listenerBus.waitUntilEmpty()
- checkAndGetWriteFiles(df)
+ checkWriteFilesAndGetChild(df)

assert(taskMetrics.bytesWritten > 0)
assert(taskMetrics.recordsWritten == 2)
@@ -135,13 +137,13 @@
private def validateDynamicPartitionWrite(
df: DataFrame,
expectedPartitionNames: Set[String]): Unit = {
- val writeFiles = checkAndGetWriteFiles(df)
+ val (writeFiles, writeChild) = checkWriteFilesAndGetChild(df)
assert(
writeFiles
.find(_.isInstanceOf[SortExecTransformer])
.isEmpty)
// all operators should be transformed
- assert(writeFiles.child.find(!_.isInstanceOf[GlutenPlan]).isEmpty)
+ assert(writeChild.find(!_.isInstanceOf[GlutenPlan]).isEmpty)

val parts = spark.sessionState.catalog.listPartitionNames(TableIdentifier("pt")).toSet
assert(parts == expectedPartitionNames)
@@ -209,7 +211,7 @@
spark.sql("CREATE TABLE t (c1 int, c2 string) USING PARQUET")

val df = spark.sql("INSERT OVERWRITE TABLE t SELECT c1, c2 FROM source SORT BY c1")
- val writeFiles = checkAndGetWriteFiles(df)
+ val (writeFiles, _) = checkWriteFilesAndGetChild(df)
assert(writeFiles.find(x => x.isInstanceOf[SortExecTransformer]).isDefined)
checkAnswer(spark.sql("SELECT * FROM t"), spark.sql("SELECT * FROM source SORT BY c1"))
}
@@ -244,7 +246,7 @@
spark.sql("CREATE TABLE t1 USING PARQUET AS SELECT id as c1, id % 3 as c2 FROM range(10)")
spark.sql("CREATE TABLE t2 (c1 long, c2 long) USING PARQUET")
val df = spark.sql("INSERT INTO TABLE t2 SELECT c2, count(*) FROM t1 GROUP BY c2")
- checkAndGetWriteFiles(df)
+ checkWriteFilesAndGetChild(df)
}
}

@@ -257,7 +259,7 @@
spark.sql("INSERT INTO TABLE t1 VALUES(1, 1),(2, 2)")
spark.sql("CREATE TABLE t2 (c1 long, c2 long) USING PARQUET")
val df = spark.sql("INSERT INTO TABLE t2 SELECT * FROM t1")
- checkAndGetWriteFiles(df)
+ checkWriteFilesAndGetChild(df)
}
}

