[GLUTEN-5471][VL]feat: Support read Hudi COW table
xushiyan authored and yma11 committed Jun 11, 2024
1 parent d3ccd4a commit db6f75a
Showing 10 changed files with 491 additions and 13 deletions.
25 changes: 13 additions & 12 deletions .github/workflows/velox_docker.yml
@@ -30,6 +30,7 @@ on:
- 'gluten-data/**'
- 'gluten-delta/**'
- 'gluten-iceberg/**'
+ - 'gluten-hudi/**'
- 'gluten-ut/**'
- 'shims/**'
- 'tools/gluten-it/**'
@@ -664,8 +665,8 @@ jobs:
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
- $MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
- $MVN_CMD test -Pspark-3.2 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
+ $MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
+ $MVN_CMD test -Pspark-3.2 -Pbackends-velox -Piceberg -Pdelta -Phudi -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
uses: actions/upload-artifact@v4
@@ -727,7 +728,7 @@ jobs:
- name: Build and run unit test for Spark 3.2.2 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
- $MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
+ $MVN_CMD clean install -Pspark-3.2 -Pspark-ut -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark32/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
run-spark-test-spark33:
needs: build-native-lib-centos-8
@@ -794,8 +795,8 @@ jobs:
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
- $MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
- $MVN_CMD test -Pspark-3.3 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
+ $MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
+ $MVN_CMD test -Pspark-3.3 -Pbackends-velox -Piceberg -Pdelta -Phudi -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
uses: actions/upload-artifact@v4
@@ -852,7 +853,7 @@ jobs:
- name: Build and Run unit test for Spark 3.3.1 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
- $MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
+ $MVN_CMD clean install -Pspark-3.3 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark33/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
run-spark-test-spark34:
needs: build-native-lib-centos-8
@@ -919,8 +920,8 @@ jobs:
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
- $MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
- $MVN_CMD test -Pspark-3.4 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
+ $MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
+ $MVN_CMD test -Pspark-3.4 -Pbackends-velox -Piceberg -Pdelta -Phudi -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
uses: actions/upload-artifact@v4
@@ -977,7 +978,7 @@ jobs:
- name: Build and Run unit test for Spark 3.4.2 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
- $MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
+ $MVN_CMD clean install -Pspark-3.4 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark34/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
run-spark-test-spark35:
needs: build-native-lib-centos-8
@@ -1044,8 +1045,8 @@ jobs:
run: |
cd $GITHUB_WORKSPACE/
export SPARK_SCALA_VERSION=2.12
- $MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
- $MVN_CMD test -Pspark-3.5 -Pbackends-velox -Piceberg -Pdelta -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
+ $MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags && \
+ $MVN_CMD test -Pspark-3.5 -Pbackends-velox -Piceberg -Pdelta -Phudi -DtagsToExclude=None -DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload golden files
if: failure()
uses: actions/upload-artifact@v4
@@ -1107,4 +1108,4 @@ jobs:
- name: Build and Run unit test for Spark 3.5.1 (slow tests)
run: |
cd $GITHUB_WORKSPACE/
- $MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
+ $MVN_CMD clean install -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Phudi -Pspark-ut -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/" -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
17 changes: 17 additions & 0 deletions docs/get-started/Velox.md
@@ -307,6 +307,23 @@ Once built successfully, iceberg features will be included in gluten-velox-bundle

After the two steps, you can query Iceberg tables through Gluten/Velox without scan fallback.

## Hudi Support

Gluten with the Velox backend supports reading [Hudi](https://hudi.apache.org/) tables. Currently, only COW (Copy-On-Write) tables are supported.

### How to use

First, build the gluten-hudi module with the `hudi` Maven profile enabled:

```
mvn clean package -Pbackends-velox -Pspark-3.3 -Phudi -DskipTests
```

Then, add the resulting `gluten-hudi-XX-SNAPSHOT.jar` to the classpath (usually `$SPARK_HOME/jars`).
The jar is built into the `gluten-hudi/target` directory.

After these two steps, you can query Hudi tables through Gluten/Velox without scan fallback.
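
For a quick check, a minimal spark-shell session might look like the following sketch. The table path is hypothetical, and the Gluten settings shown are illustrative; it assumes the Gluten bundle, the gluten-hudi jar, and the Hudi Spark bundle are all on the classpath:

```
// Illustrative sketch only: the path is hypothetical, and exact Gluten
// configuration can vary by version.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("gluten-hudi-read")
  .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "2g")
  .getOrCreate()

// Read a Hudi COW table by path; with gluten-hudi on the classpath, the
// scan should be offloaded to Velox instead of falling back.
val df = spark.read.format("hudi").load("/path/to/hudi_cow_table")
df.filter(col("_hoodie_record_key").isNotNull).show()
```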

# Coverage

Spark 3.3 has 387 functions in total, of which roughly 240 are commonly used. Velox's functions fall into two categories: Presto and Spark. The Presto category has 124 functions implemented and the Spark category has 62. Spark-category functions are verified to return the same results as vanilla Spark; some Presto-category functions match vanilla Spark while others differ. Gluten prefers the Spark category: if a function is not in the Spark list but is implemented in Presto, we currently offload to the Presto one until a result mismatch is observed, at which point the function needs to be reimplemented in the Spark category. Gluten currently offloads 94 functions and 14 operators; for more details, refer to [Velox Backend's Supported Operators & Functions](../velox-backend-support-progress.md).
1 change: 1 addition & 0 deletions docs/get-started/build-guide.md
@@ -62,6 +62,7 @@ The following parameters can be set via `-P` for mvn.
| rss | Build Gluten with Remote Shuffle Service, only applicable for Velox backend. | disabled |
| delta | Build Gluten with Delta Lake support. | disabled |
| iceberg | Build Gluten with Iceberg support. | disabled |
| hudi | Build Gluten with Hudi support. | disabled |
| spark-3.2 | Build Gluten for Spark 3.2. | enabled |
| spark-3.3 | Build Gluten for Spark 3.3. | disabled |
| spark-3.4 | Build Gluten for Spark 3.4. | disabled |
165 changes: 165 additions & 0 deletions gluten-hudi/pom.xml
@@ -0,0 +1,165 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>gluten-parent</artifactId>
    <groupId>org.apache.gluten</groupId>
    <version>1.2.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
  <modelVersion>4.0.0</modelVersion>

  <artifactId>gluten-hudi</artifactId>
  <packaging>jar</packaging>
  <name>Gluten Hudi</name>

  <properties>
    <resource.dir>${project.basedir}/src/main/resources</resource.dir>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.gluten</groupId>
      <artifactId>gluten-core</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-spark${sparkbundle.version}-bundle_${scala.binary.version}</artifactId>
      <version>${hudi.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <scope>provided</scope>
    </dependency>

    <!-- For test -->
    <dependency>
      <groupId>org.apache.gluten</groupId>
      <artifactId>gluten-core</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.gluten</groupId>
      <artifactId>backends-velox</artifactId>
      <version>${project.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.gluten</groupId>
      <artifactId>backends-velox</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>${protobuf.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    <resources>
      <resource>
        <directory>${resource.dir}</directory>
      </resource>
    </resources>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.scalastyle</groupId>
        <artifactId>scalastyle-maven-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-checkstyle-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>com.diffplug.spotless</groupId>
        <artifactId>spotless-maven-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <configuration>
          <junitxml>.</junitxml>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <executions>
          <execution>
            <id>prepare-test-jar</id>
            <phase>test-compile</phase>
            <goals>
              <goal>test-jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-resources-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
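
Note that the `hudi-spark${sparkbundle.version}-bundle` dependency above is declared with `provided` scope, so the Hudi Spark bundle jar is expected on the Spark classpath at runtime; gluten-hudi does not ship it.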
1 change: 1 addition & 0 deletions — new service-loader registration file under gluten-hudi/src/main/resources/META-INF/services/
@@ -0,0 +1 @@
org.apache.gluten.execution.HudiScanTransformerProvider
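
This one-line file registers `HudiScanTransformerProvider` for discovery through Java's `ServiceLoader`, which is how Gluten can pick up the Hudi scan transformer at runtime without a compile-time dependency on gluten-hudi. A minimal sketch of the mechanism, where `ScanTransformerProvider` is a hypothetical stand-in for Gluten's actual provider interface:

```
import java.util.ServiceLoader

// Hypothetical stand-in for the real provider interface.
trait ScanTransformerProvider {
  def scanClassName: String
}

// Implementations listed under META-INF/services are discovered at runtime.
val loader = ServiceLoader.load(classOf[ScanTransformerProvider])
val it = loader.iterator()
while (it.hasNext) {
  println(it.next().scanClassName)
}
```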
97 changes: 97 additions & 0 deletions gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet

case class HudiScanTransformer(
    @transient override val relation: HadoopFsRelation,
    override val output: Seq[Attribute],
    override val requiredSchema: StructType,
    override val partitionFilters: Seq[Expression],
    override val optionalBucketSet: Option[BitSet],
    override val optionalNumCoalescedBuckets: Option[Int],
    override val dataFilters: Seq[Expression],
    override val tableIdentifier: Option[TableIdentifier],
    override val disableBucketedScan: Boolean = false)
  extends FileSourceScanExecTransformerBase(
    relation,
    output,
    requiredSchema,
    partitionFilters,
    optionalBucketSet,
    optionalNumCoalescedBuckets,
    dataFilters,
    tableIdentifier,
    disableBucketedScan
  ) {

  // Hudi COW tables store data as plain Parquet files, so the scan is
  // offloaded as a Parquet read.
  override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat

  override protected def doValidateInternal(): ValidationResult = {
    // Only offload scans whose required schema includes the Hudi meta column
    // `_hoodie_record_key`; otherwise the scan falls back to vanilla Spark.
    if (!requiredSchema.fields.exists(_.name == "_hoodie_record_key")) {
      return ValidationResult.notOk("Hudi meta field not present.")
    }

    super.doValidateInternal()
  }

  override def doCanonicalize(): HudiScanTransformer = {
    HudiScanTransformer(
      relation,
      output.map(QueryPlan.normalizeExpressions(_, output)),
      requiredSchema,
      QueryPlan.normalizePredicates(
        filterUnusedDynamicPruningExpressions(partitionFilters),
        output),
      optionalBucketSet,
      optionalNumCoalescedBuckets,
      QueryPlan.normalizePredicates(dataFilters, output),
      None,
      disableBucketedScan
    )
  }
}

object HudiScanTransformer {

  // Builds a transformer from a vanilla FileSourceScanExec, carrying over all
  // scan properties except the partition filters, which may be rewritten.
  def apply(
      scanExec: FileSourceScanExec,
      newPartitionFilters: Seq[Expression]): HudiScanTransformer = {
    new HudiScanTransformer(
      scanExec.relation,
      scanExec.output,
      scanExec.requiredSchema,
      newPartitionFilters,
      scanExec.optionalBucketSet,
      scanExec.optionalNumCoalescedBuckets,
      scanExec.dataFilters,
      scanExec.tableIdentifier,
      scanExec.disableBucketedScan
    )
  }
}
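
For context, here is a sketch of how a planner rule might swap a vanilla scan for this transformer. The `isHudiRelation` heuristic and the surrounding rule are hypothetical, and the validation API may differ between Gluten versions:

```
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

// Hypothetical heuristic: detect a Hudi relation from its file format class.
def isHudiRelation(scan: FileSourceScanExec): Boolean =
  scan.relation.fileFormat.getClass.getName.toLowerCase.contains("hudi")

// Sketch of a rewrite pass: replace eligible scans with the transformer,
// keeping the vanilla scan whenever validation fails.
def replaceHudiScans(plan: SparkPlan): SparkPlan = plan.transformUp {
  case scan: FileSourceScanExec if isHudiRelation(scan) =>
    val transformer = HudiScanTransformer(scan, scan.partitionFilters)
    if (transformer.doValidate().isValid) transformer else scan
}
```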