diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md
index aadc7c66adcb7..2f7dae4fb3dd4 100644
--- a/docs/get-started/Velox.md
+++ b/docs/get-started/Velox.md
@@ -321,7 +321,7 @@ About column mapping, see more [here](https://docs.delta.io/latest/delta-column-
## Iceberg Support
-Gluten with velox backend supports [Iceberg](https://iceberg.apache.org/) table. Currently, only reading COW (Copy-On-Write) tables is supported.
+Gluten with velox backend supports [Iceberg](https://iceberg.apache.org/) table. Currently, both reading COW (Copy-On-Write) and MOR (Merge-On-Read) tables are supported.
### How to use
@@ -333,7 +333,6 @@ mvn clean package -Pbackends-velox -Pspark-3.3 -Piceberg -DskipTests
Once built successfully, iceberg features will be included in gluten-velox-bundle-X jar. Then you can query iceberg table by gluten/velox without scan's fallback.
-
## Hudi Support
Gluten with velox backend supports [Hudi](https://hudi.apache.org/) table. Currently, only reading COW (Copy-On-Write) tables is supported.
@@ -346,7 +345,7 @@ First of all, compile gluten-hudi module by a `hudi` profile, as follows:
mvn clean package -Pbackends-velox -Pspark-3.3 -Phudi -DskipTests
```
-Once built successfully, hudi features will be included in gluten-velox-bundle-X jar. Then you can query hudi table by gluten/velox without scan's fallback.
+Once built successfully, hudi features will be included in gluten-velox-bundle-X jar. Then you can query hudi **COW** table by gluten/velox without scan's fallback.
# Coverage
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
index 5b46c23857d3d..ac6811f0ba2f7 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/DataSourceScanTransformerRegister.scala
@@ -29,13 +29,13 @@ trait DataSourceScanTransformerRegister {
/**
* The class name that used to identify what kind of datasource this is。
*
- * For DataSource V1, it should be the child class name of
- * [[org.apache.spark.sql.execution.datasources.FileIndex]].
+ * For DataSource V1, it should be relation.fileFormat like
+ * {{{
+ * override val scanClassName: String = "org.apache.spark.sql.delta.DeltaParquetFileFormat"
+ * }}}
*
* For DataSource V2, it should be the child class name of
- * [[org.apache.spark.sql.connector.read.Scan]].
- *
- * For example:
+ * [[org.apache.spark.sql.connector.read.Scan]]. For example:
* {{{
* override val scanClassName: String = "org.apache.iceberg.spark.source.SparkBatchQueryScan"
* }}}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index a05a5e72bfe1d..7d8c5aab6b87b 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -127,7 +127,7 @@ object ScanTransformerFactory {
.getOrElse(getClass.getClassLoader)
val serviceLoader = ServiceLoader.load(classOf[DataSourceScanTransformerRegister], loader)
serviceLoader.asScala
- .filter(_.scanClassName.equalsIgnoreCase(scanClassName))
+ .filter(service => scanClassName.contains(service.scanClassName))
.toList match {
case head :: Nil =>
// there is exactly one registered alias
diff --git a/gluten-hudi/pom.xml b/gluten-hudi/pom.xml
index e9aa342d4f1d5..be3d84b0167e2 100755
--- a/gluten-hudi/pom.xml
+++ b/gluten-hudi/pom.xml
@@ -63,14 +63,11 @@
test-jar
test
-
- org.apache.spark
- spark-core_${scala.binary.version}
-
org.apache.spark
spark-core_${scala.binary.version}
test-jar
+ test
org.apache.spark
diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
index d8fa461ea5897..76a818c96e37d 100644
--- a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
+++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
@@ -52,10 +52,9 @@ case class HudiScanTransformer(
override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat
override protected def doValidateInternal(): ValidationResult = {
- if (!requiredSchema.fields.exists(_.name == "_hoodie_record_key")) {
- return ValidationResult.notOk(s"Hudi meta field not present.")
+ if (requiredSchema.fields.exists(_.name.startsWith("_hoodie"))) {
+ return ValidationResult.failed(s"Hudi meta field not supported.")
}
-
super.doValidateInternal()
}
@@ -78,14 +77,12 @@ case class HudiScanTransformer(
object HudiScanTransformer {
- def apply(
- scanExec: FileSourceScanExec,
- newPartitionFilters: Seq[Expression]): HudiScanTransformer = {
+ def apply(scanExec: FileSourceScanExec): HudiScanTransformer = {
new HudiScanTransformer(
scanExec.relation,
scanExec.output,
scanExec.requiredSchema,
- newPartitionFilters,
+ scanExec.partitionFilters,
scanExec.optionalBucketSet,
scanExec.optionalNumCoalescedBuckets,
scanExec.dataFilters,
@@ -93,5 +90,4 @@ object HudiScanTransformer {
scanExec.disableBucketedScan
)
}
-
}
diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
index 2968220dc64eb..6c083107f79a8 100644
--- a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
+++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformerProvider.scala
@@ -16,16 +16,14 @@
*/
package org.apache.gluten.execution
-import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.FileSourceScanExec
class HudiScanTransformerProvider extends DataSourceScanTransformerRegister {
- override val scanClassName: String = "org.apache.hudi.HoodieFileIndex"
+ override val scanClassName: String = "HoodieParquetFileFormat"
override def createDataSourceTransformer(
- batchScan: FileSourceScanExec,
- newPartitionFilters: Seq[Expression]): FileSourceScanExecTransformerBase = {
- HudiScanTransformer(batchScan, newPartitionFilters)
+ batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
+ HudiScanTransformer(batchScan)
}
}
diff --git a/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala
index 942a94a3edff5..4f9bd896c2522 100644
--- a/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala
+++ b/gluten-hudi/src/test/scala/org/apache/gluten/execution/VeloxHudiSuite.scala
@@ -18,9 +18,6 @@ package org.apache.gluten.execution
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
-import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}
-
-import scala.collection.JavaConverters._
class VeloxHudiSuite extends WholeStageTransformerSuite {
@@ -31,8 +28,6 @@ class VeloxHudiSuite extends WholeStageTransformerSuite {
override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
- .set("spark.sql.files.maxPartitionBytes", "1g")
- .set("spark.sql.shuffle.partitions", "1")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
@@ -43,7 +38,7 @@ class VeloxHudiSuite extends WholeStageTransformerSuite {
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
}
- testWithSpecifiedSparkVersion("hudi: time travel", Some("3.3")) {
+ testWithSpecifiedSparkVersion("hudi: time travel", Some("3.2")) {
withTable("hudi_tm") {
spark.sql(s"""
|create table hudi_tm (id int, name string) using hudi
@@ -54,17 +49,65 @@ class VeloxHudiSuite extends WholeStageTransformerSuite {
spark.sql(s"""
|insert into hudi_tm values (3, "v3"), (4, "v4")
|""".stripMargin)
- val df1 = runQueryAndCompare("select * from hudi_tm VERSION AS OF 1") { _ => }
+ val df = spark.sql(" select _hoodie_commit_time from hudi_tm;")
+ val value = df.collectAsList().get(0).getAs[String](0)
+ val df1 = runQueryAndCompare("select id, name from hudi_tm timestamp AS OF " + value) {
+ checkGlutenOperatorMatch[HudiScanTransformer]
+ }
checkLengthAndPlan(df1, 2)
checkAnswer(df1, Row(1, "v1") :: Row(2, "v2") :: Nil)
- val df2 = runQueryAndCompare("select * from hudi_tm VERSION AS OF 2") { _ => }
- checkLengthAndPlan(df2, 4)
- checkAnswer(df2, Row(1, "v1") :: Row(2, "v2") :: Row(3, "v3") :: Row(4, "v4") :: Nil)
- val df3 = runQueryAndCompare("select name from hudi_tm VERSION AS OF 2 where id = 2") {
- _ =>
+ val df2 =
+ runQueryAndCompare("select name from hudi_tm timestamp AS OF " + value + " where id = 2") {
+ checkGlutenOperatorMatch[HudiScanTransformer]
+ }
+ checkLengthAndPlan(df2, 1)
+ checkAnswer(df2, Row("v2") :: Nil)
+ }
+ }
+
+ testWithSpecifiedSparkVersion("hudi: soft delete", Some("3.2")) {
+ withTable("hudi_pf") {
+ spark.sql(s"""
+ |create table hudi_pf (id int, name string) using hudi
+ |""".stripMargin)
+ spark.sql(s"""
+ |insert into hudi_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
+ |""".stripMargin)
+ spark.sql(s"""
+ |delete from hudi_pf where name = "v2"
+ |""".stripMargin)
+ val df1 = runQueryAndCompare("select id, name from hudi_pf") {
+ checkGlutenOperatorMatch[HudiScanTransformer]
}
- checkLengthAndPlan(df3, 1)
- checkAnswer(df3, Row("v2") :: Nil)
+ checkLengthAndPlan(df1, 2)
+ checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
+ }
+ }
+
+ // FIXME: flaky leaked file systems issue
+ ignore("hudi: mor", Some("3.2")) {
+ withTable("hudi_mor") {
+ spark.sql(s"""
+ |create table hudi_mor (id int, name string, ts bigint)
+ |using hudi
+ |tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ |)
+ |""".stripMargin)
+ spark.sql(s"""
+ |insert into hudi_mor values (1, "v1", 1000), (2, "v2", 2000),
+ | (3, "v1", 3000), (4, "v2", 4000)
+ |""".stripMargin)
+ spark.sql(s"""
+ |delete from hudi_mor where id = 1
+ |""".stripMargin)
+ val df1 =
+ runQueryAndCompare("select id, name from hudi_mor where name = 'v1'", true, false, false) {
+ _ =>
+ }
+ checkAnswer(df1, Row(3, "v1") :: Nil)
}
}
@@ -76,7 +119,7 @@ class VeloxHudiSuite extends WholeStageTransformerSuite {
spark.sql(s"""
|insert into hudi_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
|""".stripMargin)
- val df1 = runQueryAndCompare("select * from hudi_pf where name = 'v1'") { _ => }
+ val df1 = runQueryAndCompare("select id, name from hudi_pf where name = 'v1'") { _ => }
val hudiScanTransformer = df1.queryExecution.executedPlan.collect {
case f: HudiScanTransformer => f
}.head
@@ -86,5 +129,4 @@ class VeloxHudiSuite extends WholeStageTransformerSuite {
checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
}
}
-
}
diff --git a/package/pom.xml b/package/pom.xml
index f385a2a5a0586..ac0754c38e7e3 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -98,7 +98,17 @@
org.apache.gluten
- gluten-delta
+ gluten-delta
+ ${project.version}
+
+
+
+
+ hudi
+
+
+ org.apache.gluten
+ gluten-hudi
${project.version}
diff --git a/pom.xml b/pom.xml
index 3d0cc53f1255b..9381adc55b324 100644
--- a/pom.xml
+++ b/pom.xml
@@ -264,7 +264,7 @@
2.0.1
20
4.8
- 0.14.1
+ 0.15.0
@@ -280,7 +280,7 @@
2.3.0
23
4.8
- 0.14.1
+ 0.15.0
@@ -295,7 +295,7 @@
2.4.0
24
4.9.3
- 0.14.1
+ 0.15.0
@@ -308,7 +308,8 @@
1.5.0
delta-spark
3.2.0
- 32
+ 32
+ 0.15.0
2.15.1
3.3.4
4.9.3