diff --git a/gluten-ut/spark35/pom.xml b/gluten-ut/spark35/pom.xml index cf2129389a6e..2bf1c93a0052 100644 --- a/gluten-ut/spark35/pom.xml +++ b/gluten-ut/spark35/pom.xml @@ -63,6 +63,28 @@ test + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-sources + generate-sources + + add-test-source + + + + src/test/backends-clickhouse + + + + + + + backends-velox @@ -155,6 +177,28 @@ 2.19.0 + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-sources + generate-sources + + add-test-source + + + + src/test/backends-velox + + + + + + + diff --git a/gluten-ut/spark35/src/test/backends-clickhouse/org/apache/gluten/GlutenColumnarWriteTestSupport.scala b/gluten-ut/spark35/src/test/backends-clickhouse/org/apache/gluten/GlutenColumnarWriteTestSupport.scala new file mode 100644 index 000000000000..43b83afe9af3 --- /dev/null +++ b/gluten-ut/spark35/src/test/backends-clickhouse/org/apache/gluten/GlutenColumnarWriteTestSupport.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten + +import org.apache.spark.sql.execution.SparkPlan + +trait GlutenColumnarWriteTestSupport { + + def checkWriteFilesAndGetChild(sparkPlan: SparkPlan): SparkPlan = { + throw new UnsupportedOperationException("Clickhouse Backend does not support write files") + } +} diff --git a/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala b/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala new file mode 100644 index 000000000000..c7ad606bcf8d --- /dev/null +++ b/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten + +import org.apache.spark.sql.execution.{SparkPlan, VeloxColumnarWriteFilesExec} + +trait GlutenColumnarWriteTestSupport { + + def checkWriteFilesAndGetChild(sparkPlan: SparkPlan): SparkPlan = { + assert(sparkPlan.isInstanceOf[VeloxColumnarWriteFilesExec]) + sparkPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child + } +} diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala index 3d277b94cc3e..fcaf75a4d5c1 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala @@ -16,12 +16,13 @@ */ package org.apache.spark.sql.execution.datasources +import org.apache.gluten.GlutenColumnarWriteTestSupport import org.apache.gluten.execution.SortExecTransformer import org.apache.spark.sql.GlutenSQLTestsBaseTrait import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} -import org.apache.spark.sql.execution.{QueryExecution, SortExec, VeloxColumnarWriteFilesExec} +import org.apache.spark.sql.execution.{QueryExecution, SortExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StringType} @@ -96,7 +97,8 @@ trait GlutenV1WriteCommandSuiteBase extends V1WriteCommandSuiteBase { class GlutenV1WriteCommandSuite extends V1WriteCommandSuite with GlutenV1WriteCommandSuiteBase - with GlutenSQLTestsBaseTrait { + with GlutenSQLTestsBaseTrait + with GlutenColumnarWriteTestSupport { testGluten( "SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column") { @@ -122,8 +124,7 @@ class GlutenV1WriteCommandSuite val executedPlan = FileFormatWriter.executedPlan.get val plan = if (enabled) { - assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec]) - executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child + checkWriteFilesAndGetChild(executedPlan) } else { executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan } } @@ -204,8 +205,7 @@ class GlutenV1WriteCommandSuite val executedPlan = FileFormatWriter.executedPlan.get val plan = if (enabled) { - assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec]) - executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child + checkWriteFilesAndGetChild(executedPlan) } else { executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan } } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala index 2814c2e8cba6..084c2faa8c5c 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.sources +import org.apache.gluten.GlutenColumnarWriteTestSupport import org.apache.gluten.execution.SortExecTransformer import org.apache.gluten.extension.GlutenPlan @@ -24,7 +25,7 @@ import org.apache.spark.executor.OutputMetrics import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, VeloxColumnarWriteFilesExec} +import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, SparkPlan} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.metric.SQLMetric @@ -38,7 +39,8 @@ import java.io.{File, IOException} class GlutenInsertSuite extends InsertSuite with GlutenSQLTestsBaseTrait - with AdaptiveSparkPlanHelper { + with AdaptiveSparkPlanHelper + with GlutenColumnarWriteTestSupport { override def sparkConf: SparkConf = { super.sparkConf.set("spark.sql.leafNodeDefaultParallelism", "1") @@ -60,13 +62,13 @@ class GlutenInsertSuite super.afterAll() } - private def checkAndGetWriteFiles(df: DataFrame): VeloxColumnarWriteFilesExec = { + private def checkWriteFilesAndGetChild(df: DataFrame): (SparkPlan, SparkPlan) = { val writeFiles = stripAQEPlan( df.queryExecution.executedPlan .asInstanceOf[CommandResultExec] .commandPhysicalPlan).children.head - assert(writeFiles.isInstanceOf[VeloxColumnarWriteFilesExec]) - writeFiles.asInstanceOf[VeloxColumnarWriteFilesExec] + val child = checkWriteFilesAndGetChild(writeFiles) + (writeFiles, child) } testGluten("insert partition table") { @@ -97,7 +99,7 @@ class GlutenInsertSuite val df = spark.sql("INSERT INTO TABLE pt partition(pt='a') SELECT * FROM VALUES(1, 'a'),(2, 'b')") spark.sparkContext.listenerBus.waitUntilEmpty() - checkAndGetWriteFiles(df) + checkWriteFilesAndGetChild(df) assert(taskMetrics.bytesWritten > 0) assert(taskMetrics.recordsWritten == 2) @@ -135,13 +137,13 @@ class GlutenInsertSuite private def validateDynamicPartitionWrite( df: DataFrame, expectedPartitionNames: Set[String]): Unit = { - val writeFiles = checkAndGetWriteFiles(df) + val (writeFiles, writeChild) = checkWriteFilesAndGetChild(df) assert( writeFiles .find(_.isInstanceOf[SortExecTransformer]) .isEmpty) // all operators should be transformed - assert(writeFiles.child.find(!_.isInstanceOf[GlutenPlan]).isEmpty) + assert(writeChild.find(!_.isInstanceOf[GlutenPlan]).isEmpty) val parts = spark.sessionState.catalog.listPartitionNames(TableIdentifier("pt")).toSet assert(parts == expectedPartitionNames) @@ -209,7 +211,7 @@ class GlutenInsertSuite spark.sql("CREATE TABLE t (c1 int, c2 string) USING PARQUET") val df = spark.sql("INSERT OVERWRITE TABLE t SELECT c1, c2 FROM source SORT BY c1") - val writeFiles = checkAndGetWriteFiles(df) + val (writeFiles, _) = checkWriteFilesAndGetChild(df) assert(writeFiles.find(x => x.isInstanceOf[SortExecTransformer]).isDefined) checkAnswer(spark.sql("SELECT * FROM t"), spark.sql("SELECT * FROM source SORT BY c1")) } @@ -244,7 +246,7 @@ class GlutenInsertSuite spark.sql("CREATE TABLE t1 USING PARQUET AS SELECT id as c1, id % 3 as c2 FROM range(10)") spark.sql("CREATE TABLE t2 (c1 long, c2 long) USING PARQUET") val df = spark.sql("INSERT INTO TABLE t2 SELECT c2, count(*) FROM t1 GROUP BY c2") - checkAndGetWriteFiles(df) + checkWriteFilesAndGetChild(df) } } @@ -257,7 +259,7 @@ class GlutenInsertSuite spark.sql("INSERT INTO TABLE t1 VALUES(1, 1),(2, 2)") spark.sql("CREATE TABLE t2 (c1 long, c2 long) USING PARQUET") val df = spark.sql("INSERT INTO TABLE t2 SELECT * FROM t1") - checkAndGetWriteFiles(df) + checkWriteFilesAndGetChild(df) } } @@ -405,7 +407,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -420,12 +422,12 @@ class GlutenInsertSuite val incompatibleDefault = "Failed to execute ALTER TABLE ADD COLUMNS command because the destination " + "table column `s` has a DEFAULT value" - Seq(Config("parquet"), Config("parquet", true)).foreach { + Seq(Config("parquet"), Config("parquet", useDataFrames = true)).foreach { config => withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -452,7 +454,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -469,12 +471,12 @@ class GlutenInsertSuite val incompatibleDefault = "Failed to execute ALTER TABLE ADD COLUMNS command because the destination " + "table column `s` has a DEFAULT value" - Seq(Config("parquet"), Config("parquet", true)).foreach { + Seq(Config("parquet"), Config("parquet", useDataFrames = true)).foreach { config => withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -501,7 +503,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -566,12 +568,12 @@ class GlutenInsertSuite val incompatibleDefault = "Failed to execute ALTER TABLE ADD COLUMNS command because the destination " + "table column `s` has a DEFAULT value" - Seq(Config("parquet"), Config("parquet", true)).foreach { + Seq(Config("parquet"), Config("parquet", useDataFrames = true)).foreach { config => withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") }