From db6f75a4752cc97212192188beceb0e00d636a0d Mon Sep 17 00:00:00 2001 From: Shiyan Xu Date: Sun, 21 Apr 2024 17:31:47 -0500 Subject: [PATCH] [GLUTEN-5471][VL]feat: Support read Hudi COW table --- .github/workflows/velox_docker.yml | 25 +-- docs/get-started/Velox.md | 17 ++ docs/get-started/build-guide.md | 1 + gluten-hudi/pom.xml | 165 ++++++++++++++++++ ...xecution.DataSourceScanTransformerRegister | 1 + .../execution/HudiScanTransformer.scala | 97 ++++++++++ .../HudiScanTransformerProvider.scala | 31 ++++ .../gluten/execution/VeloxHudiSuite.scala | 90 ++++++++++ .../gluten/execution/VeloxTPCHHudiSuite.scala | 63 +++++++ pom.xml | 14 +- 10 files changed, 491 insertions(+), 13 deletions(-) create mode 100755 gluten-hudi/pom.xml create mode 100644 gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister create mode 100644 gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala create mode 100644 gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala create mode 100644 gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala create mode 100644 gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala diff --git a/.github/workflows/velox_docker.yml b/.github/workflows/velox_docker.yml index 6c1be4344c71a..0124d14245ca3 100644 --- a/.github/workflows/velox_docker.yml +++ b/.github/workflows/velox_docker.yml @@ -30,6 +30,7 @@ on: - 'gluten-data/**' - 'gluten-delta/**' - 'gluten-iceberg/**' + - 'gluten-hudi/**' - 'gluten-ut/**' - 'shims/**' - 'tools/gluten-it/**' @@ -664,8 +665,8 @@ jobs: run: | cd $GITHUB_WORKSPACE/ export SPARK_SCALA_VERSION=2.12 - $MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ - $MVN_CMD test -Pspark-3.2 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest + $MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ + $MVN_CMD test -Pspark-3.2 -Pbackends-velox -Piceberg -Pdelta -Phudi -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest - name: Upload golden files if: failure() uses: actions/upload-artifact@v4 @@ -727,7 +728,7 @@ jobs: - name: Build and run unit test for Spark 3.2.2 (slow tests) run: | cd $GITHUB_WORKSPACE/ - $MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest + $MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest run-spark-test-spark33: needs: build-native-lib-centos-8 @@ -794,8 +795,8 @@ jobs: run: | cd $GITHUB_WORKSPACE/ export SPARK_SCALA_VERSION=2.12 - $MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ - $MVN_CMD test -Pspark-3.3 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest + $MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ + $MVN_CMD test -Pspark-3.3 -Pbackends-velox -Piceberg -Pdelta -Phudi -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest - name: Upload golden files if: failure() uses: actions/upload-artifact@v4 @@ -852,7 +853,7 @@ jobs: - name: Build and Run unit test for Spark 3.3.1 (slow tests) run: | cd $GITHUB_WORKSPACE/ - $MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest + $MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest run-spark-test-spark34: needs: build-native-lib-centos-8 @@ -919,8 +920,8 @@ jobs: run: | cd $GITHUB_WORKSPACE/ export SPARK_SCALA_VERSION=2.12 - $MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ - $MVN_CMD test -Pspark-3.4 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest + $MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ + $MVN_CMD test -Pspark-3.4 -Pbackends-velox -Piceberg -Pdelta -Phudi -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest - name: Upload golden files if: failure() uses: actions/upload-artifact@v4 @@ -977,7 +978,7 @@ jobs: - name: Build and Run unit test for Spark 3.4.2 (slow tests) run: | cd $GITHUB_WORKSPACE/ - $MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest + $MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest run-spark-test-spark35: needs: build-native-lib-centos-8 @@ -1044,8 +1045,8 @@ jobs: run: | cd $GITHUB_WORKSPACE/ export SPARK_SCALA_VERSION=2.12 - $MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ - $MVN_CMD test -Pspark-3.5 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest + $MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \ + $MVN_CMD test -Pspark-3.5 -Pbackends-velox -Piceberg -Pdelta -Phudi -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest - name: Upload golden files if: failure() uses: actions/upload-artifact@v4 @@ -1107,4 +1108,4 @@ jobs: - name: Build and Run unit test for Spark 3.5.1 (slow tests) run: | cd $GITHUB_WORKSPACE/ - $MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest + $MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index 964744bd5eb54..1805f39a38632 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -307,6 +307,23 @@ Once built successfully, iceberg features will be included in gluten-velox-bundl After the two steps, you can query iceberg table by gluten/velox without scan's fallback. +## Hudi Support + +Gluten with velox backend supports [Hudi](https://hudi.apache.org/) table. Currently, only reading COW (Copy-On-Write) tables is supported. + +### How to use + +First of all, compile gluten-hudi module by a `hudi` profile, as follows: + +``` +mvn clean package -Pbackends-velox -Pspark-3.3 -Phudi -DskipTests +``` + +Then, put the additional `gluten-hudi-XX-SNAPSHOT.jar` to the class path (usually it's `$SPARK_HOME/jars`). +The gluten-hudi jar is in `gluten-hudi/target` directory. + +After the two steps, you can query Hudi table by gluten/velox without scan's fallback. + # Coverage Spark3.3 has 387 functions in total. ~240 are commonly used. Velox's functions have two category, Presto and Spark. Presto has 124 functions implemented. Spark has 62 functions. Spark functions are verified to have the same result as Vanilla Spark. Some Presto functions have the same result as Vanilla Spark but some others have different. Gluten prefer to use Spark functions firstly. If it's not in Spark's list but implemented in Presto, we currently offload to Presto one until we noted some result mismatch, then we need to reimplement the function in Spark category. Gluten currently offloads 94 functions and 14 operators, more details refer to [Velox Backend's Supported Operators & Functions](../velox-backend-support-progress.md). diff --git a/docs/get-started/build-guide.md b/docs/get-started/build-guide.md index 3db2244ba229a..7658f8e5bfe89 100644 --- a/docs/get-started/build-guide.md +++ b/docs/get-started/build-guide.md @@ -62,6 +62,7 @@ The below parameters can be set via `-P` for mvn. | rss | Build Gluten with Remote Shuffle Service, only applicable for Velox backend. | disabled | | delta | Build Gluten with Delta Lake support. | disabled | | iceberg | Build Gluten with Iceberg support. | disabled | +| hudi | Build Gluten with Hudi support. | disabled | | spark-3.2 | Build Gluten for Spark 3.2. | enabled | | spark-3.3 | Build Gluten for Spark 3.3. | disabled | | spark-3.4 | Build Gluten for Spark 3.4. | disabled | diff --git a/gluten-hudi/pom.xml b/gluten-hudi/pom.xml new file mode 100755 index 0000000000000..e9aa342d4f1d5 --- /dev/null +++ b/gluten-hudi/pom.xml @@ -0,0 +1,165 @@ + + + + gluten-parent + org.apache.gluten + 1.2.0-SNAPSHOT + ../pom.xml + + 4.0.0 + + gluten-hudi + jar + Gluten Hudi + + + ${project.basedir}/src/main/resources + + + + + org.apache.gluten + gluten-core + ${project.version} + provided + + + org.apache.hudi + hudi-spark${sparkbundle.version}-bundle_${scala.binary.version} + ${hudi.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + provided + + + org.apache.spark + spark-sql_${scala.binary.version} + provided + + + + + org.apache.gluten + gluten-core + ${project.version} + test-jar + test + + + org.apache.gluten + backends-velox + ${project.version} + test + + + org.apache.gluten + backends-velox + ${project.version} + test-jar + test + + + org.apache.spark + spark-core_${scala.binary.version} + + + org.apache.spark + spark-core_${scala.binary.version} + test-jar + + + org.apache.spark + spark-sql_${scala.binary.version} + test-jar + + + org.apache.spark + spark-catalyst_${scala.binary.version} + test-jar + + + org.apache.spark + spark-hive_${scala.binary.version} + ${spark.version} + test + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + test + + + com.google.protobuf + protobuf-java + ${protobuf.version} + test + + + org.scalatest + scalatest_${scala.binary.version} + test + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + ${resource.dir} + + + + + net.alchim31.maven + scala-maven-plugin + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.scalastyle + scalastyle-maven-plugin + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + com.diffplug.spotless + spotless-maven-plugin + + + org.scalatest + scalatest-maven-plugin + + . + + + + org.apache.maven.plugins + maven-jar-plugin + + + prepare-test-jar + test-compile + + test-jar + + + + + + org.apache.maven.plugins + maven-resources-plugin + + + + diff --git a/gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister b/gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister new file mode 100644 index 0000000000000..ccfe1ada479e8 --- /dev/null +++ b/gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister @@ -0,0 +1 @@ +org.apache.gluten.execution.HudiScanTransformerProvider \ No newline at end of file diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala new file mode 100644 index 0000000000000..d8fa461ea5897 --- /dev/null +++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.extension.ValidationResult +import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.collection.BitSet + +case class HudiScanTransformer( + @transient override val relation: HadoopFsRelation, + override val output: Seq[Attribute], + override val requiredSchema: StructType, + override val partitionFilters: Seq[Expression], + override val optionalBucketSet: Option[BitSet], + override val optionalNumCoalescedBuckets: Option[Int], + override val dataFilters: Seq[Expression], + override val tableIdentifier: Option[TableIdentifier], + override val disableBucketedScan: Boolean = false) + extends FileSourceScanExecTransformerBase( + relation, + output, + requiredSchema, + partitionFilters, + optionalBucketSet, + optionalNumCoalescedBuckets, + dataFilters, + tableIdentifier, + disableBucketedScan + ) { + + override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat + + override protected def doValidateInternal(): ValidationResult = { + if (!requiredSchema.fields.exists(_.name == "_hoodie_record_key")) { + return ValidationResult.notOk(s"Hudi meta field not present.") + } + + super.doValidateInternal() + } + + override def doCanonicalize(): HudiScanTransformer = { + HudiScanTransformer( + relation, + output.map(QueryPlan.normalizeExpressions(_, output)), + requiredSchema, + QueryPlan.normalizePredicates( + filterUnusedDynamicPruningExpressions(partitionFilters), + output), + optionalBucketSet, + optionalNumCoalescedBuckets, + QueryPlan.normalizePredicates(dataFilters, output), + None, + disableBucketedScan + ) + } +} + +object HudiScanTransformer { + + def apply( + scanExec: FileSourceScanExec, + newPartitionFilters: Seq[Expression]): HudiScanTransformer = { + new HudiScanTransformer( + scanExec.relation, + scanExec.output, + scanExec.requiredSchema, + newPartitionFilters, + scanExec.optionalBucketSet, + scanExec.optionalNumCoalescedBuckets, + scanExec.dataFilters, + scanExec.tableIdentifier, + scanExec.disableBucketedScan + ) + } + +} diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala new file mode 100644 index 0000000000000..2968220dc64eb --- /dev/null +++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.FileSourceScanExec + +class HudiScanTransformerProvider extends DataSourceScanTransformerRegister { + + override val scanClassName: String = "org.apache.hudi.HoodieFileIndex" + + override def createDataSourceTransformer( + batchScan: FileSourceScanExec, + newPartitionFilters: Seq[Expression]): FileSourceScanExecTransformerBase = { + HudiScanTransformer(batchScan, newPartitionFilters) + } +} diff --git a/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala new file mode 100644 index 0000000000000..942a94a3edff5 --- /dev/null +++ b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.spark.SparkConf +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType} + +import scala.collection.JavaConverters._ + +class VeloxHudiSuite extends WholeStageTransformerSuite { + + protected val rootPath: String = getClass.getResource("/").getPath + override protected val resourcePath: String = "/tpch-data-parquet-velox" + override protected val fileFormat: String = "parquet" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.files.maxPartitionBytes", "1g") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.memory.offHeap.size", "2g") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set("spark.sql.sources.useV1SourceList", "avro") + .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") + .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") + .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + } + + testWithSpecifiedSparkVersion("hudi: time travel", Some("3.3")) { + withTable("hudi_tm") { + spark.sql(s""" + |create table hudi_tm (id int, name string) using hudi + |""".stripMargin) + spark.sql(s""" + |insert into hudi_tm values (1, "v1"), (2, "v2") + |""".stripMargin) + spark.sql(s""" + |insert into hudi_tm values (3, "v3"), (4, "v4") + |""".stripMargin) + val df1 = runQueryAndCompare("select * from hudi_tm VERSION AS OF 1") { _ => } + checkLengthAndPlan(df1, 2) + checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil) + val df2 = runQueryAndCompare("select * from hudi_tm VERSION AS OF 2") { _ => } + checkLengthAndPlan(df2, 4) + checkAnswer(df2, Row(1, "v1") :: Row(2, "v2") :: Row(3, "v3") :: Row(4, "v4") :: Nil) + val df3 = runQueryAndCompare("select name from hudi_tm VERSION AS OF 2 where id = 2") { + _ => + } + checkLengthAndPlan(df3, 1) + checkAnswer(df3, Row("v2") :: Nil) + } + } + + testWithSpecifiedSparkVersion("hudi: partition filters", Some("3.2")) { + withTable("hudi_pf") { + spark.sql(s""" + |create table hudi_pf (id int, name string) using hudi partitioned by (name) + |""".stripMargin) + spark.sql(s""" + |insert into hudi_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2") + |""".stripMargin) + val df1 = runQueryAndCompare("select * from hudi_pf where name = 'v1'") { _ => } + val hudiScanTransformer = df1.queryExecution.executedPlan.collect { + case f: HudiScanTransformer => f + }.head + // No data filters as only partition filters exist + assert(hudiScanTransformer.filterExprs().size == 0) + checkLengthAndPlan(df1, 2) + checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil) + } + } + +} diff --git a/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala new file mode 100644 index 0000000000000..ce6ec9bbcf304 --- /dev/null +++ b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.spark.SparkConf + +import java.io.File + +class VeloxTPCHHudiSuite extends VeloxTPCHSuite { + + protected val tpchBasePath: String = new File( + "../backends-velox/src/test/resources").getAbsolutePath + + override protected val resourcePath: String = + new File(tpchBasePath, "tpch-data-parquet-velox").getCanonicalPath + + override protected val veloxTPCHQueries: String = + new File(tpchBasePath, "tpch-queries-velox").getCanonicalPath + + override protected val queriesResults: String = + new File(tpchBasePath, "queries-output").getCanonicalPath + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.executor.memory", "4g") + .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") + .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") + .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + } + + override protected def createTPCHNotNullTables(): Unit = { + TPCHTables + .map(_.name) + .map { + table => + val tablePath = new File(resourcePath, table).getAbsolutePath + val tableDF = spark.read.format(fileFormat).load(tablePath) + tableDF.write.format("hudi").mode("append").saveAsTable(table) + (table, tableDF) + } + .toMap + } + + override protected def afterAll(): Unit = { + TPCHTables.map(_.name).foreach(table => spark.sql(s"DROP TABLE IF EXISTS $table")) + super.afterAll() + } +} diff --git a/pom.xml b/pom.xml index 88cbb724e0531..9f9d6ef4add8e 100644 --- a/pom.xml +++ b/pom.xml @@ -142,6 +142,7 @@ delta-core 2.0.1 20 + 0.14.1 @@ -155,7 +156,8 @@ 1.3.1 delta-core 2.3.0 - 23 + 23 + 0.14.1 @@ -169,6 +171,7 @@ delta-core 2.4.0 24 + 0.14.1 @@ -275,6 +278,15 @@ gluten-iceberg + + hudi + + false + + + gluten-hudi + + backends-velox