diff --git a/.github/workflows/velox_backend.yml b/.github/workflows/velox_backend.yml
index fc375c666a4f..de6995673167 100644
--- a/.github/workflows/velox_backend.yml
+++ b/.github/workflows/velox_backend.yml
@@ -30,6 +30,7 @@ on:
       - 'gluten-data/**'
       - 'gluten-delta/**'
       - 'gluten-iceberg/**'
+      - 'gluten-hudi/**'
       - 'gluten-ut/**'
       - 'shims/**'
       - 'tools/gluten-it/**'
@@ -567,7 +568,7 @@ jobs:
           cd $GITHUB_WORKSPACE/
           export SPARK_SCALA_VERSION=2.12
           $MVN_CMD clean test -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg \
-            -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" \
+            -Pdelta -Phudi -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" \
             -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
       - name: Upload golden files
         if: failure()
@@ -617,7 +618,7 @@ jobs:
       - name: Build and run unit test for Spark 3.2.2 (slow tests)
         run: |
           cd $GITHUB_WORKSPACE/
-          $MVN_CMD clean test -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta \
+          $MVN_CMD clean test -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi \
             -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
 
   run-spark-test-spark33:
@@ -667,7 +668,7 @@ jobs:
         run: |
           cd $GITHUB_WORKSPACE/
           export SPARK_SCALA_VERSION=2.12
-          $MVN_CMD clean test -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+          $MVN_CMD clean test -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
             -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" \
             -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
       - name: Upload golden files
@@ -719,7 +720,7 @@ jobs:
       - name: Build and Run unit test for Spark 3.3.1 (slow tests)
         run: |
           cd $GITHUB_WORKSPACE/
-          $MVN_CMD clean test -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+          $MVN_CMD clean test -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
             -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" \
             -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
 
@@ -770,7 +771,7 @@ jobs:
         run: |
           cd $GITHUB_WORKSPACE/
           export SPARK_SCALA_VERSION=2.12
-          $MVN_CMD clean test -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+          $MVN_CMD clean test -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
             -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" \
             -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags
       - name: Upload golden files
@@ -822,7 +823,7 @@ jobs:
       - name: Build and Run unit test for Spark 3.4.2 (slow tests)
         run: |
           cd $GITHUB_WORKSPACE/
-          $MVN_CMD clean test -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+          $MVN_CMD clean test -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -Phudi \
             -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" \
             -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
 
@@ -873,7 +874,7 @@ jobs:
         run: |
           cd $GITHUB_WORKSPACE/
           export SPARK_SCALA_VERSION=2.12
-          $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \
+          $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \
-DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" \ -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags - name: Upload golden files @@ -975,7 +976,7 @@ jobs: - name: Build and Run unit test for Spark 3.5.1 (slow tests) run: | cd $GITHUB_WORKSPACE/ - $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \ + $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut \ -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" \ -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index 776736b2fff5..2f7dae4fb3dd 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -321,7 +321,7 @@ About column mapping, see more [here](https://docs.delta.io/latest/delta-column- ## Iceberg Support -Gluten with velox backend supports [Iceberg](https://iceberg.apache.org/) table. Currently, only reading COW (Copy-On-Write) tables is supported. +Gluten with velox backend supports [Iceberg](https://iceberg.apache.org/) table. Currently, both reading COW (Copy-On-Write) and MOR (Merge-On-Read) tables are supported. ### How to use @@ -333,6 +333,20 @@ mvn clean package -Pbackends-velox -Pspark-3.3 -Piceberg -DskipTests Once built successfully, iceberg features will be included in gluten-velox-bundle-X jar. Then you can query iceberg table by gluten/velox without scan's fallback. +## Hudi Support + +Gluten with velox backend supports [Hudi](https://hudi.apache.org/) table. Currently, only reading COW (Copy-On-Write) tables is supported. + +### How to use + +First of all, compile gluten-hudi module by a `hudi` profile, as follows: + +``` +mvn clean package -Pbackends-velox -Pspark-3.3 -Phudi -DskipTests +``` + +Once built successfully, hudi features will be included in gluten-velox-bundle-X jar. Then you can query hudi **COW** table by gluten/velox without scan's fallback. + # Coverage Spark3.3 has 387 functions in total. ~240 are commonly used. To get the support status of all Spark built-in functions, please refer to [Velox Backend's Supported Operators & Functions](../velox-backend-support-progress.md). diff --git a/docs/get-started/build-guide.md b/docs/get-started/build-guide.md index 05e6119ecd4f..d1b1533ad1e3 100644 --- a/docs/get-started/build-guide.md +++ b/docs/get-started/build-guide.md @@ -62,6 +62,7 @@ The below parameters can be set via `-P` for mvn. | uniffle | Build Gluten with Uniffle. | disabled | | delta | Build Gluten with Delta Lake support. | disabled | | iceberg | Build Gluten with Iceberg support. | disabled | +| hudi | Build Gluten with Hudi support. | disabled | | spark-3.2 | Build Gluten for Spark 3.2. | enabled | | spark-3.3 | Build Gluten for Spark 3.3. | disabled | | spark-3.4 | Build Gluten for Spark 3.4. 
diff --git a/docs/get-started/build-guide.md b/docs/get-started/build-guide.md
index 05e6119ecd4f..d1b1533ad1e3 100644
--- a/docs/get-started/build-guide.md
+++ b/docs/get-started/build-guide.md
@@ -62,6 +62,7 @@ The below parameters can be set via `-P` for mvn.
 | uniffle   | Build Gluten with Uniffle.            | disabled |
 | delta     | Build Gluten with Delta Lake support. | disabled |
 | iceberg   | Build Gluten with Iceberg support.    | disabled |
+| hudi      | Build Gluten with Hudi support.       | disabled |
 | spark-3.2 | Build Gluten for Spark 3.2.           | enabled  |
 | spark-3.3 | Build Gluten for Spark 3.3.           | disabled |
 | spark-3.4 | Build Gluten for Spark 3.4.           | disabled |
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
index 5b46c23857d3..ac6811f0ba2f 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
@@ -29,13 +29,13 @@ trait DataSourceScanTransformerRegister {
   /**
    * The class name that used to identify what kind of datasource this is。
    *
-   * For DataSource V1, it should be the child class name of
-   * [[org.apache.spark.sql.execution.datasources.FileIndex]].
+   * For DataSource V1, it should be the class name of `relation.fileFormat`, for example:
+   * {{{
+   * override val scanClassName: String = "org.apache.spark.sql.delta.DeltaParquetFileFormat"
+   * }}}
    *
    * For DataSource V2, it should be the child class name of
-   * [[org.apache.spark.sql.connector.read.Scan]].
-   *
-   * For example:
+   * [[org.apache.spark.sql.connector.read.Scan]]. For example:
+   * {{{
+   * override val scanClassName: String = "org.apache.iceberg.spark.source.SparkBatchQueryScan"
+   * }}}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index a05a5e72bfe1..7d8c5aab6b87 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -127,7 +127,7 @@ object ScanTransformerFactory {
       .getOrElse(getClass.getClassLoader)
     val serviceLoader = ServiceLoader.load(classOf[DataSourceScanTransformerRegister], loader)
     serviceLoader.asScala
-      .filter(_.scanClassName.equalsIgnoreCase(scanClassName))
+      .filter(service => scanClassName.contains(service.scanClassName))
       .toList match {
       case head :: Nil =>
         // there is exactly one registered alias
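Editor's note on the `ScanTransformerFactory` change above: the lookup used to require the registered `scanClassName` to equal the runtime scan class name (ignoring case); it now succeeds when the runtime class name merely contains the registered token. That is what lets gluten-hudi register the short name `HoodieParquetFileFormat` (see `HudiScanTransformerProvider` below) and still match Hudi's concrete `FileFormat` implementations regardless of package prefix. A toy sketch of the new rule follows; the class names are illustrative and not taken from this patch.

```scala
// Toy model of the matching rule, not the project's code.
final case class Register(scanClassName: String)

val registered = List(
  Register("org.apache.spark.sql.delta.DeltaParquetFileFormat"), // fully-qualified names still match
  Register("HoodieParquetFileFormat") // short token, as gluten-hudi registers
)

def lookup(runtimeClassName: String): Option[Register] =
  registered.filter(r => runtimeClassName.contains(r.scanClassName)) match {
    case head :: Nil => Some(head) // exactly one registered alias
    case _ => None
  }

// e.g. a relation whose fileFormat class name is "org.apache.hudi.HoodieParquetFileFormat"
// (example name only) resolves to the Hudi register:
assert(lookup("org.apache.hudi.HoodieParquetFileFormat").exists(_.scanClassName == "HoodieParquetFileFormat"))
```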
diff --git a/gluten-hudi/pom.xml b/gluten-hudi/pom.xml
new file mode 100755
index 000000000000..2faf53a07ea7
--- /dev/null
+++ b/gluten-hudi/pom.xml
@@ -0,0 +1,162 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>gluten-parent</artifactId>
+    <groupId>org.apache.gluten</groupId>
+    <version>1.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>gluten-hudi</artifactId>
+  <packaging>jar</packaging>
+  <name>Gluten Hudi</name>
+
+  <properties>
+    <resource.dir>${project.basedir}/src/main/resources</resource.dir>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-spark${sparkbundle.version}-bundle_${scala.binary.version}</artifactId>
+      <version>${hudi.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>backends-velox</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>backends-velox</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <resources>
+      <resource>
+        <directory>${resource.dir}</directory>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.scalastyle</groupId>
+        <artifactId>scalastyle-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>com.diffplug.spotless</groupId>
+        <artifactId>spotless-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <junitxml>.</junitxml>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>prepare-test-jar</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister b/gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
new file mode 100644
index 000000000000..ccfe1ada479e
--- /dev/null
+++ b/gluten-hudi/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister
@@ -0,0 +1 @@
+org.apache.gluten.execution.HudiScanTransformerProvider
\ No newline at end of file
diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
new file mode 100644
index 000000000000..76a818c96e37
--- /dev/null
+++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.collection.BitSet
+
+case class HudiScanTransformer(
+    @transient override val relation: HadoopFsRelation,
+    override val output: Seq[Attribute],
+    override val requiredSchema: StructType,
+    override val partitionFilters: Seq[Expression],
+    override val optionalBucketSet: Option[BitSet],
+    override val optionalNumCoalescedBuckets: Option[Int],
+    override val dataFilters: Seq[Expression],
+    override val tableIdentifier: Option[TableIdentifier],
+    override val disableBucketedScan: Boolean = false)
+  extends FileSourceScanExecTransformerBase(
+    relation,
+    output,
+    requiredSchema,
+    partitionFilters,
+    optionalBucketSet,
+    optionalNumCoalescedBuckets,
+    dataFilters,
+    tableIdentifier,
+    disableBucketedScan
+  ) {
+
+  override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat
+
+  override protected def doValidateInternal(): ValidationResult = {
+    if (requiredSchema.fields.exists(_.name.startsWith("_hoodie"))) {
+      return ValidationResult.failed(s"Hudi meta field not supported.")
+    }
+    super.doValidateInternal()
+  }
+
+  override def doCanonicalize(): HudiScanTransformer = {
+    HudiScanTransformer(
+      relation,
+      output.map(QueryPlan.normalizeExpressions(_, output)),
+      requiredSchema,
+      QueryPlan.normalizePredicates(
+        filterUnusedDynamicPruningExpressions(partitionFilters),
+        output),
+      optionalBucketSet,
+      optionalNumCoalescedBuckets,
+      QueryPlan.normalizePredicates(dataFilters, output),
+      None,
+      disableBucketedScan
+    )
+  }
+}
+
+object HudiScanTransformer {
+
+  def apply(scanExec: FileSourceScanExec): HudiScanTransformer = {
+    new HudiScanTransformer(
+      scanExec.relation,
+      scanExec.output,
+      scanExec.requiredSchema,
+      scanExec.partitionFilters,
+      scanExec.optionalBucketSet,
+      scanExec.optionalNumCoalescedBuckets,
+      scanExec.dataFilters,
+      scanExec.tableIdentifier,
+      scanExec.disableBucketedScan
+    )
+  }
+}
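Editor's note on `doValidateInternal` above: one behavioural consequence, sketched under the assumption that a Hudi table `hudi_demo` exists in the session (the example is illustrative and not part of the patch) — selecting any `_hoodie*` meta column fails validation, so that scan stays in vanilla Spark, while a projection of ordinary columns is offloaded.

```scala
// Falls back: _hoodie_commit_time trips the meta-field check in doValidateInternal.
spark.sql("select _hoodie_commit_time, id from hudi_demo").explain()

// Offloaded: only user columns are required, so HudiScanTransformer passes validation.
spark.sql("select id, name from hudi_demo").explain()
```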
diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
new file mode 100644
index 000000000000..6c083107f79a
--- /dev/null
+++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.sql.execution.FileSourceScanExec
+
+class HudiScanTransformerProvider extends DataSourceScanTransformerRegister {
+
+  override val scanClassName: String = "HoodieParquetFileFormat"
+
+  override def createDataSourceTransformer(
+      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
+    HudiScanTransformer(batchScan)
+  }
+}
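Editor's note: putting the new pieces together — the service file registers `HudiScanTransformerProvider`, `ScanTransformerFactory` matches its `scanClassName` token against the class name of the scan's `relation.fileFormat`, and the matching scan is rebuilt as a `HudiScanTransformer`. A simplified, hypothetical sketch of that substitution (this is not the planner code in this patch, and the Hudi format class name is illustrative):

```scala
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

// Replace a vanilla Hudi parquet scan with the Gluten transformer.
def replaceHudiScan(plan: SparkPlan): SparkPlan = plan.transform {
  case scan: FileSourceScanExec
      if scan.relation.fileFormat.getClass.getName.contains("HoodieParquetFileFormat") =>
    HudiScanTransformer(scan)
}
```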
diff --git a/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala
new file mode 100644
index 000000000000..4f9bd896c252
--- /dev/null
+++ b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+class VeloxHudiSuite extends WholeStageTransformerSuite {
+
+  protected val rootPath: String = getClass.getResource("/").getPath
+  override protected val resourcePath: String = "/tpch-data-parquet-velox"
+  override protected val fileFormat: String = "parquet"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.memory.offHeap.size", "2g")
+      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
+      .set("spark.sql.sources.useV1SourceList", "avro")
+      .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+      .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+      .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  }
+
+  testWithSpecifiedSparkVersion("hudi: time travel", Some("3.2")) {
+    withTable("hudi_tm") {
+      spark.sql(s"""
+                   |create table hudi_tm (id int, name string) using hudi
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into hudi_tm values (1, "v1"), (2, "v2")
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into hudi_tm values (3, "v3"), (4, "v4")
+                   |""".stripMargin)
+      val df = spark.sql(" select _hoodie_commit_time from hudi_tm;")
+      val value = df.collectAsList().get(0).getAs[String](0)
+      val df1 = runQueryAndCompare("select id, name from hudi_tm timestamp AS OF " + value) {
+        checkGlutenOperatorMatch[HudiScanTransformer]
+      }
+      checkLengthAndPlan(df1, 2)
+      checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)
+      val df2 =
+        runQueryAndCompare("select name from hudi_tm timestamp AS OF " + value + " where id = 2") {
+          checkGlutenOperatorMatch[HudiScanTransformer]
+        }
+      checkLengthAndPlan(df2, 1)
+      checkAnswer(df2, Row("v2") :: Nil)
+    }
+  }
+
+  testWithSpecifiedSparkVersion("hudi: soft delete", Some("3.2")) {
+    withTable("hudi_pf") {
+      spark.sql(s"""
+                   |create table hudi_pf (id int, name string) using hudi
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into hudi_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |delete from hudi_pf where name = "v2"
+                   |""".stripMargin)
+      val df1 = runQueryAndCompare("select id, name from hudi_pf") {
+        checkGlutenOperatorMatch[HudiScanTransformer]
+      }
+      checkLengthAndPlan(df1, 2)
+      checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
+    }
+  }
+
+  // FIXME: flaky leaked file systems issue
+  ignore("hudi: mor", Some("3.2")) {
+    withTable("hudi_mor") {
+      spark.sql(s"""
+                   |create table hudi_mor (id int, name string, ts bigint)
+                   |using hudi
+                   |tblproperties (
+                   |  type = 'mor',
+                   |  primaryKey = 'id',
+                   |  preCombineField = 'ts'
+                   |)
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into hudi_mor values (1, "v1", 1000), (2, "v2", 2000),
+                   | (3, "v1", 3000), (4, "v2", 4000)
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |delete from hudi_mor where id = 1
+                   |""".stripMargin)
+      val df1 =
+        runQueryAndCompare("select id, name from hudi_mor where name = 'v1'", true, false, false) {
+          _ =>
+        }
+      checkAnswer(df1, Row(3, "v1") :: Nil)
+    }
+  }
+
+  testWithSpecifiedSparkVersion("hudi: partition filters", Some("3.2")) {
+    withTable("hudi_pf") {
+      spark.sql(s"""
+                   |create table hudi_pf (id int, name string) using hudi partitioned by (name)
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into hudi_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
+                   |""".stripMargin)
+      val df1 = runQueryAndCompare("select id, name from hudi_pf where name = 'v1'") { _ => }
+      val hudiScanTransformer = df1.queryExecution.executedPlan.collect {
+        case f: HudiScanTransformer => f
+      }.head
+      // No data filters as only partition filters exist
+      assert(hudiScanTransformer.filterExprs().size == 0)
+      checkLengthAndPlan(df1, 2)
+      checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
+    }
+  }
+}
diff --git a/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala
new file mode 100644
index 000000000000..ce6ec9bbcf30
--- /dev/null
+++ b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxTPCHHudiSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+
+import java.io.File
+
+class VeloxTPCHHudiSuite extends VeloxTPCHSuite {
+
+  protected val tpchBasePath: String = new File(
+    "../backends-velox/src/test/resources").getAbsolutePath
+
+  override protected val resourcePath: String =
+    new File(tpchBasePath, "tpch-data-parquet-velox").getCanonicalPath
+
+  override protected val veloxTPCHQueries: String =
+    new File(tpchBasePath, "tpch-queries-velox").getCanonicalPath
+
+  override protected val queriesResults: String =
+    new File(tpchBasePath, "queries-output").getCanonicalPath
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.executor.memory", "4g")
+      .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+      .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+      .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  }
+
+  override protected def createTPCHNotNullTables(): Unit = {
+    TPCHTables
+      .map(_.name)
+      .map {
+        table =>
+          val tablePath = new File(resourcePath, table).getAbsolutePath
+          val tableDF = spark.read.format(fileFormat).load(tablePath)
+          tableDF.write.format("hudi").mode("append").saveAsTable(table)
+          (table, tableDF)
+      }
+      .toMap
+  }
+
+  override protected def afterAll(): Unit = {
+    TPCHTables.map(_.name).foreach(table => spark.sql(s"DROP TABLE IF EXISTS $table"))
+    super.afterAll()
+  }
+}
diff --git a/package/pom.xml b/package/pom.xml
index f385a2a5a058..ac0754c38e7e 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -98,7 +98,17 @@
         <dependency>
           <groupId>org.apache.gluten</groupId>
-          <artifactId>gluten-delta</artifactId>
+          <artifactId>gluten-delta</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hudi</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.gluten</groupId>
+          <artifactId>gluten-hudi</artifactId>
           <version>${project.version}</version>
         </dependency>
       </dependencies>
     </profile>
diff --git a/pom.xml b/pom.xml
index e95300744813..991ff835dcb4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -280,6 +280,7 @@
         <delta.version>2.0.1</delta.version>
         <delta.binary.version>20</delta.binary.version>
         <antlr4.version>4.8</antlr4.version>
+        <hudi.version>0.15.0</hudi.version>
       </properties>
     </profile>
     <profile>
@@ -295,6 +296,7 @@
         <delta.version>2.3.0</delta.version>
         <delta.binary.version>23</delta.binary.version>
         <antlr4.version>4.8</antlr4.version>
+        <hudi.version>0.15.0</hudi.version>
       </properties>
     </profile>
     <profile>
@@ -309,6 +311,7 @@
         <delta.version>2.4.0</delta.version>
         <delta.binary.version>24</delta.binary.version>
         <antlr4.version>4.9.3</antlr4.version>
+        <hudi.version>0.15.0</hudi.version>
       </properties>
     </profile>
     <profile>
@@ -321,7 +324,8 @@
         <iceberg.version>1.5.0</iceberg.version>
         <delta.package.name>delta-spark</delta.package.name>
         <delta.version>3.2.0</delta.version>
-        <delta.binary.version>32</delta.binary.version>
+        <delta.binary.version>32</delta.binary.version>
+        <hudi.version>0.15.0</hudi.version>
         <fasterxml.version>2.15.1</fasterxml.version>
         <hadoop.version>3.3.4</hadoop.version>
         <antlr4.version>4.9.3</antlr4.version>
@@ -430,6 +434,15 @@
       <module>gluten-iceberg</module>
     </modules>
   </profile>
+  <profile>
+    <id>hudi</id>
+    <activation>
+      <activeByDefault>false</activeByDefault>
+    </activation>
+    <modules>
+      <module>gluten-hudi</module>
+    </modules>
+  </profile>
   <profile>
     <id>backends-velox</id>