[GLUTEN-6067][CH][Minor] Compile Spark-3.5 ut with backends-clickhouse #6114

Merged · 2 commits · Jun 17, 2024
44 changes: 44 additions & 0 deletions gluten-ut/spark35/pom.xml
@@ -63,6 +63,28 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/backends-clickhouse</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>backends-velox</id>
@@ -155,6 +177,28 @@
<log4j.version>2.19.0</log4j.version>
<clickhouse.lib.path></clickhouse.lib.path>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/backends-velox</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten

import org.apache.spark.sql.execution.SparkPlan

trait GlutenColumnarWriteTestSupport {

def checkWriteFilesAndGetChild(sparkPlan: SparkPlan): SparkPlan = {
throw new UnsupportedOperationException("Clickhouse Backend does not support write files")
}
}
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten

import org.apache.spark.sql.execution.{SparkPlan, VeloxColumnarWriteFilesExec}

trait GlutenColumnarWriteTestSupport {

def checkWriteFilesAndGetChild(sparkPlan: SparkPlan): SparkPlan = {
assert(sparkPlan.isInstanceOf[VeloxColumnarWriteFilesExec])
sparkPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child
}
}
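
Note: the two files above define the same fully qualified name, org.apache.gluten.GlutenColumnarWriteTestSupport; whichever test-source directory the active Maven profile adds (src/test/backends-clickhouse or src/test/backends-velox) is the one that gets compiled, so a suite written against the trait builds unchanged for either backend. A minimal usage sketch follows; the suite name and its verifyWrite helper are hypothetical and not part of this PR.

package org.apache.gluten.example

import org.apache.gluten.GlutenColumnarWriteTestSupport
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical suite: compiles against whichever backend-specific
// GlutenColumnarWriteTestSupport the active Maven profile adds.
class ExampleColumnarWriteSuite extends GlutenColumnarWriteTestSupport {
  def verifyWrite(executedPlan: SparkPlan): Unit = {
    // Velox: asserts VeloxColumnarWriteFilesExec and returns its child.
    // ClickHouse: throws UnsupportedOperationException.
    val child = checkWriteFilesAndGetChild(executedPlan)
    assert(child != null)
  }
}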
@@ -16,12 +16,13 @@
*/
package org.apache.spark.sql.execution.datasources

import org.apache.gluten.GlutenColumnarWriteTestSupport
import org.apache.gluten.execution.SortExecTransformer

import org.apache.spark.sql.GlutenSQLTestsBaseTrait
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
import org.apache.spark.sql.execution.{QueryExecution, SortExec, VeloxColumnarWriteFilesExec}
import org.apache.spark.sql.execution.{QueryExecution, SortExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StringType}
@@ -96,7 +97,8 @@ trait GlutenV1WriteCommandSuiteBase extends V1WriteCommandSuiteBase {
class GlutenV1WriteCommandSuite
extends V1WriteCommandSuite
with GlutenV1WriteCommandSuiteBase
with GlutenSQLTestsBaseTrait {
with GlutenSQLTestsBaseTrait
with GlutenColumnarWriteTestSupport {

testGluten(
"SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column") {
@@ -122,8 +124,7 @@ class GlutenV1WriteCommandSuite
val executedPlan = FileFormatWriter.executedPlan.get

val plan = if (enabled) {
assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec])
executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child
checkWriteFilesAndGetChild(executedPlan)
} else {
executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan }
}
@@ -204,8 +205,7 @@ class GlutenV1WriteCommandSuite
val executedPlan = FileFormatWriter.executedPlan.get

val plan = if (enabled) {
assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec])
executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child
checkWriteFilesAndGetChild(executedPlan)
} else {
executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan }
}
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.sources

import org.apache.gluten.GlutenColumnarWriteTestSupport
import org.apache.gluten.execution.SortExecTransformer
import org.apache.gluten.extension.GlutenPlan

@@ -24,7 +25,7 @@ import org.apache.spark.executor.OutputMetrics
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, VeloxColumnarWriteFilesExec}
import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -38,7 +39,8 @@ import java.io.{File, IOException}
class GlutenInsertSuite
extends InsertSuite
with GlutenSQLTestsBaseTrait
with AdaptiveSparkPlanHelper {
with AdaptiveSparkPlanHelper
with GlutenColumnarWriteTestSupport {

override def sparkConf: SparkConf = {
super.sparkConf.set("spark.sql.leafNodeDefaultParallelism", "1")
@@ -60,13 +62,13 @@ class GlutenInsertSuite
super.afterAll()
}

private def checkAndGetWriteFiles(df: DataFrame): VeloxColumnarWriteFilesExec = {
private def checkWriteFilesAndGetChild(df: DataFrame): (SparkPlan, SparkPlan) = {
val writeFiles = stripAQEPlan(
df.queryExecution.executedPlan
.asInstanceOf[CommandResultExec]
.commandPhysicalPlan).children.head
assert(writeFiles.isInstanceOf[VeloxColumnarWriteFilesExec])
writeFiles.asInstanceOf[VeloxColumnarWriteFilesExec]
val child = checkWriteFilesAndGetChild(writeFiles)
(writeFiles, child)
}

testGluten("insert partition table") {
@@ -97,7 +99,7 @@ class GlutenInsertSuite
val df =
spark.sql("INSERT INTO TABLE pt partition(pt='a') SELECT * FROM VALUES(1, 'a'),(2, 'b')")
spark.sparkContext.listenerBus.waitUntilEmpty()
checkAndGetWriteFiles(df)
checkWriteFilesAndGetChild(df)

assert(taskMetrics.bytesWritten > 0)
assert(taskMetrics.recordsWritten == 2)
@@ -135,13 +137,13 @@ class GlutenInsertSuite
private def validateDynamicPartitionWrite(
df: DataFrame,
expectedPartitionNames: Set[String]): Unit = {
val writeFiles = checkAndGetWriteFiles(df)
val (writeFiles, writeChild) = checkWriteFilesAndGetChild(df)
assert(
writeFiles
.find(_.isInstanceOf[SortExecTransformer])
.isEmpty)
// all operators should be transformed
assert(writeFiles.child.find(!_.isInstanceOf[GlutenPlan]).isEmpty)
assert(writeChild.find(!_.isInstanceOf[GlutenPlan]).isEmpty)

val parts = spark.sessionState.catalog.listPartitionNames(TableIdentifier("pt")).toSet
assert(parts == expectedPartitionNames)
@@ -209,7 +211,7 @@ class GlutenInsertSuite
spark.sql("CREATE TABLE t (c1 int, c2 string) USING PARQUET")

val df = spark.sql("INSERT OVERWRITE TABLE t SELECT c1, c2 FROM source SORT BY c1")
val writeFiles = checkAndGetWriteFiles(df)
val (writeFiles, _) = checkWriteFilesAndGetChild(df)
assert(writeFiles.find(x => x.isInstanceOf[SortExecTransformer]).isDefined)
checkAnswer(spark.sql("SELECT * FROM t"), spark.sql("SELECT * FROM source SORT BY c1"))
}
@@ -244,7 +246,7 @@ class GlutenInsertSuite
spark.sql("CREATE TABLE t1 USING PARQUET AS SELECT id as c1, id % 3 as c2 FROM range(10)")
spark.sql("CREATE TABLE t2 (c1 long, c2 long) USING PARQUET")
val df = spark.sql("INSERT INTO TABLE t2 SELECT c2, count(*) FROM t1 GROUP BY c2")
checkAndGetWriteFiles(df)
checkWriteFilesAndGetChild(df)
}
}

@@ -257,7 +259,7 @@ class GlutenInsertSuite
spark.sql("INSERT INTO TABLE t1 VALUES(1, 1),(2, 2)")
spark.sql("CREATE TABLE t2 (c1 long, c2 long) USING PARQUET")
val df = spark.sql("INSERT INTO TABLE t2 SELECT * FROM t1")
checkAndGetWriteFiles(df)
checkWriteFilesAndGetChild(df)
}
}

@@ -405,7 +407,7 @@ class GlutenInsertSuite
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
Seq((false)).toDF.write.insertInto("t")
Seq(false).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
@@ -420,12 +422,12 @@ class GlutenInsertSuite
val incompatibleDefault =
"Failed to execute ALTER TABLE ADD COLUMNS command because the destination " +
"table column `s` has a DEFAULT value"
Seq(Config("parquet"), Config("parquet", true)).foreach {
Seq(Config("parquet"), Config("parquet", useDataFrames = true)).foreach {
config =>
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
Seq((false)).toDF.write.insertInto("t")
Seq(false).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
@@ -452,7 +454,7 @@ class GlutenInsertSuite
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
Seq((false)).toDF.write.insertInto("t")
Seq(false).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
@@ -469,12 +471,12 @@ class GlutenInsertSuite
val incompatibleDefault =
"Failed to execute ALTER TABLE ADD COLUMNS command because the destination " +
"table column `s` has a DEFAULT value"
Seq(Config("parquet"), Config("parquet", true)).foreach {
Seq(Config("parquet"), Config("parquet", useDataFrames = true)).foreach {
config =>
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
Seq((false)).toDF.write.insertInto("t")
Seq(false).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
@@ -501,7 +503,7 @@ class GlutenInsertSuite
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
Seq((false)).toDF.write.insertInto("t")
Seq(false).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}
@@ -566,12 +568,12 @@ class GlutenInsertSuite
val incompatibleDefault =
"Failed to execute ALTER TABLE ADD COLUMNS command because the destination " +
"table column `s` has a DEFAULT value"
Seq(Config("parquet"), Config("parquet", true)).foreach {
Seq(Config("parquet"), Config("parquet", useDataFrames = true)).foreach {
config =>
withTable("t") {
sql(s"create table t(i boolean) using ${config.dataSource}")
if (config.useDataFrames) {
Seq((false)).toDF.write.insertInto("t")
Seq(false).toDF.write.insertInto("t")
} else {
sql("insert into t select false")
}