[VL] Delta support / Hudi support as Gluten components (#8282)
zhztheplayer authored Dec 20, 2024
1 parent 4e0aab0 commit c2bf8f0
Showing 24 changed files with 206 additions and 226 deletions.
@@ -25,6 +25,7 @@ import org.apache.gluten.execution.WriteFilesExecTransformer
 import org.apache.gluten.expression.WindowFunctionsBuilder
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.extension.columnar.transition.{Convention, ConventionFunc}
+import org.apache.gluten.substrait.rel.LocalFilesNode
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._

@@ -34,6 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.datasources.FileFormat
@@ -198,6 +200,27 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
     }
   }
 
+  override def getSubstraitReadFileFormatV1(
+      fileFormat: FileFormat): LocalFilesNode.ReadFileFormat = {
+    fileFormat.getClass.getSimpleName match {
+      case "OrcFileFormat" => ReadFileFormat.OrcReadFormat
+      case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+      case "DeltaParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+      case "DeltaMergeTreeFileFormat" => ReadFileFormat.MergeTreeReadFormat
+      case "CSVFileFormat" => ReadFileFormat.TextReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
+  override def getSubstraitReadFileFormatV2(scan: Scan): LocalFilesNode.ReadFileFormat = {
+    scan.getClass.getSimpleName match {
+      case "OrcScan" => ReadFileFormat.OrcReadFormat
+      case "ParquetScan" => ReadFileFormat.ParquetReadFormat
+      case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
   override def supportWriteFilesExec(
       format: FileFormat,
       fields: Array[StructField],
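Both new hooks dispatch on class simple names; note that Delta's DeltaParquetFileFormat deliberately maps to the plain Parquet read format, while DeltaMergeTreeFileFormat maps to ClickHouse's MergeTree layout. A minimal usage sketch (the ParquetFileFormat instance is illustrative, not taken from this diff):

    import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

    // Vanilla Parquet and Delta's Parquet wrapper resolve to the same
    // Substrait read format on the CH backend:
    val fmt = CHBackendSettings.getSubstraitReadFileFormatV1(new ParquetFileFormat())
    assert(fmt == ReadFileFormat.ParquetReadFormat)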
@@ -93,7 +93,6 @@ object CHRuleApi {
 
     // Legacy: Post-transform rules.
     injector.injectPostTransform(_ => PruneNestedColumnsInHiveTableScan)
-    injector.injectPostTransform(c => intercept(RewriteTransformer.apply(c.session)))
     injector.injectPostTransform(_ => PushDownFilterToScan)
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gluten.component

import org.apache.gluten.backendsapi.velox.VeloxBackend
import org.apache.gluten.execution.OffloadDeltaScan
import org.apache.gluten.extension.DeltaPostTransformRules
import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector

import org.apache.spark.sql.execution.FileSourceScanExec

class VeloxDeltaComponent extends Component {
  override def name(): String = "velox-delta"
  override def buildInfo(): Component.BuildInfo =
    Component.BuildInfo("VeloxDelta", "N/A", "N/A", "N/A")
  override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil
  override def injectRules(injector: Injector): Unit = {
    val legacy = injector.gluten.legacy
    val ras = injector.gluten.ras
    legacy.injectTransform {
      c =>
        val offload = Seq(OffloadDeltaScan())
        HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload)
    }
    ras.injectRasRule {
      c =>
        RasOffload.Rule(
          RasOffload.from[FileSourceScanExec](OffloadDeltaScan()),
          Validators.newValidator(c.glutenConf),
          Nil)
    }
    DeltaPostTransformRules.rules.foreach {
      r =>
        legacy.injectPostTransform(_ => r)
        ras.injectPostTransform(_ => r)
    }
  }
}
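Note the final loop: the Delta post-transform rules are now injected directly into both the legacy and RAS pipelines. This is what replaces the RewriteTransformer hook that this commit removes from CHRuleApi above and VeloxRuleApi below.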
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gluten.component

import org.apache.gluten.backendsapi.velox.VeloxBackend
import org.apache.gluten.execution.OffloadHudiScan
import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector

import org.apache.spark.sql.execution.FileSourceScanExec

class VeloxHudiComponent extends Component {
  override def name(): String = "velox-hudi"
  override def buildInfo(): Component.BuildInfo =
    Component.BuildInfo("VeloxHudi", "N/A", "N/A", "N/A")
  override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil
  override def injectRules(injector: Injector): Unit = {
    val legacy = injector.gluten.legacy
    val ras = injector.gluten.ras
    legacy.injectTransform {
      c =>
        val offload = Seq(OffloadHudiScan())
        HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload)
    }
    ras.injectRasRule {
      c =>
        RasOffload.Rule(
          RasOffload.from[FileSourceScanExec](OffloadHudiScan()),
          Validators.newValidator(c.glutenConf),
          Nil)
    }
  }
}
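Unlike the Delta component, the Hudi component injects only the scan offload; there are no Hudi-specific post-transform rules to carry over.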
@@ -27,6 +27,7 @@ import org.apache.gluten.expression.WindowFunctionsBuilder
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.extension.columnar.transition.{Convention, ConventionFunc}
 import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.LocalFilesNode
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat}
 import org.apache.gluten.utils._
@@ -36,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, De
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, Percentile}
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.{ColumnarCachedBatchSerializer, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -173,7 +175,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
         }
         validateTypes(typeValidator)
       }
-      case _ => Some(s"Unsupported file format for $format.")
+      case _ => Some(s"Unsupported file format $format.")
     }
   }

@@ -194,6 +196,26 @@ object VeloxBackendSettings extends BackendSettingsApi {
       .toSeq
   }
 
+  override def getSubstraitReadFileFormatV1(
+      fileFormat: FileFormat): LocalFilesNode.ReadFileFormat = {
+    fileFormat.getClass.getSimpleName match {
+      case "OrcFileFormat" => ReadFileFormat.OrcReadFormat
+      case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+      case "DwrfFileFormat" => ReadFileFormat.DwrfReadFormat
+      case "CSVFileFormat" => ReadFileFormat.TextReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
+  override def getSubstraitReadFileFormatV2(scan: Scan): LocalFilesNode.ReadFileFormat = {
+    scan.getClass.getSimpleName match {
+      case "OrcScan" => ReadFileFormat.OrcReadFormat
+      case "ParquetScan" => ReadFileFormat.ParquetReadFormat
+      case "DwrfScan" => ReadFileFormat.DwrfReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
   override def supportWriteFilesExec(
       format: FileFormat,
       fields: Array[StructField],
@@ -85,7 +85,6 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => UnionTransformerRule())
     injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
     injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
-    injector.injectPostTransform(c => RewriteTransformer.apply(c.session))
     injector.injectPostTransform(_ => PushDownFilterToScan)
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)
@@ -163,7 +162,6 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => UnionTransformerRule())
     injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
     injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
-    injector.injectPostTransform(c => RewriteTransformer.apply(c.session))
     injector.injectPostTransform(_ => PushDownFilterToScan)
     injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)
@@ -38,7 +38,7 @@
 /**
  * A trivial memory consumer implementation used by Gluten.
  *
- * @deprecated Use {@link TreeMemoryConsumers#shared()} instead.
+ * @deprecated Use {@link TreeMemoryConsumers} instead.
  */
 @Deprecated
 public class RegularMemoryConsumer extends MemoryConsumer

This file was deleted.

This file was deleted.

@@ -16,14 +16,15 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 
-class HudiScanTransformerProvider extends DataSourceScanTransformerRegister {
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 
-  override val scanClassName: String = "HoodieParquetFileFormat"
-
-  override def createDataSourceTransformer(
-      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
-    HudiScanTransformer(batchScan)
+case class OffloadDeltaScan() extends OffloadSingleNode {
+  override def offload(plan: SparkPlan): SparkPlan = plan match {
+    case scan: FileSourceScanExec
+        if scan.relation.fileFormat.getClass.getName == "org.apache.spark.sql.delta.DeltaParquetFileFormat" =>
+      DeltaScanTransformer(scan)
+    case other => other
   }
 }
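A sketch of how this OffloadSingleNode rule behaves once applied; `deltaScan` and `otherPlan` below are placeholder values, not part of this commit:

    val rule = OffloadDeltaScan()
    // `deltaScan` stands for a FileSourceScanExec whose relation reads
    // org.apache.spark.sql.delta.DeltaParquetFileFormat:
    rule.offload(deltaScan) // returns DeltaScanTransformer(deltaScan)
    rule.offload(otherPlan) // any other node passes through unchanged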
@@ -17,8 +17,7 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.execution.{DeltaFilterExecTransformer, DeltaProjectExecTransformer, DeltaScanTransformer, ProjectExecTransformer}
-import org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, filterRule, projectRule, pushDownInputFileExprRule}
-import org.apache.gluten.extension.columnar.RewriteTransformerRules
+import org.apache.gluten.extension.columnar.transition.RemoveTransitions
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName}
@@ -30,12 +29,10 @@ import org.apache.spark.sql.execution.datasources.FileFormat
 
 import scala.collection.mutable.ListBuffer
 
-class DeltaRewriteTransformerRules extends RewriteTransformerRules {
-  override def rules: Seq[Rule[SparkPlan]] =
-    columnMappingRule :: filterRule :: projectRule :: pushDownInputFileExprRule :: Nil
-}
-
-object DeltaRewriteTransformerRules {
+object DeltaPostTransformRules {
+  def rules: Seq[Rule[SparkPlan]] =
+    RemoveTransitions :: columnMappingRule :: filterRule :: projectRule ::
+      pushDownInputFileExprRule :: Nil
 
   private val COLUMN_MAPPING_RULE_TAG: TreeNodeTag[String] =
     TreeNodeTag[String]("org.apache.gluten.delta.column.mapping")
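RemoveTransitions is now prepended to the rule list, presumably because these rules run as standalone post-transform rules rather than under the deleted RewriteTransformerRules wrapper, and so must strip previously inserted columnar transitions themselves before matching on the plan.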

This file was deleted.

@@ -14,16 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.execution
 
-import org.apache.spark.sql.execution.FileSourceScanExec
+package org.apache.gluten.execution
 
-class DeltaScanTransformerProvider extends DataSourceScanTransformerRegister {
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 
-  override val scanClassName: String = "org.apache.spark.sql.delta.DeltaParquetFileFormat"
+import org.apache.spark.sql.execution.SparkPlan
 
-  override def createDataSourceTransformer(
-      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
-    DeltaScanTransformer(batchScan)
+/** Since https://github.com/apache/incubator-gluten/pull/6049. */
+case class OffloadHudiScan() extends OffloadSingleNode {
+  override def offload(plan: SparkPlan): SparkPlan = {
+    plan match {
+      // Hudi has multiple file format definitions whose names end with "HoodieParquetFileFormat".
+      case scan: org.apache.spark.sql.execution.FileSourceScanExec
+        if scan.relation.fileFormat.getClass.getName.endsWith("HoodieParquetFileFormat") =>
+        HudiScanTransformer(scan)
+      case other => other
+    }
   }
 }
@@ -19,11 +19,13 @@ package org.apache.gluten.backendsapi
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.substrait.rel.LocalFilesNode
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
 import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
 import org.apache.spark.sql.types.StructField
@@ -39,6 +41,10 @@ trait BackendSettingsApi {
       rootPaths: Seq[String],
       properties: Map[String, String]): ValidationResult = ValidationResult.succeeded
 
+  def getSubstraitReadFileFormatV1(fileFormat: FileFormat): LocalFilesNode.ReadFileFormat
+
+  def getSubstraitReadFileFormatV2(scan: Scan): LocalFilesNode.ReadFileFormat
+
   def supportWriteFilesExec(
       format: FileFormat,
       fields: Array[StructField],
@@ -175,13 +175,8 @@ abstract class BatchScanExecTransformerBase(
   @transient protected lazy val filteredFlattenPartitions: Seq[InputPartition] =
     filteredPartitions.flatten
 
-  @transient override lazy val fileFormat: ReadFileFormat = scan.getClass.getSimpleName match {
-    case "OrcScan" => ReadFileFormat.OrcReadFormat
-    case "ParquetScan" => ReadFileFormat.ParquetReadFormat
-    case "DwrfScan" => ReadFileFormat.DwrfReadFormat
-    case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
-    case _ => ReadFileFormat.UnknownFormat
-  }
+  @transient override lazy val fileFormat: ReadFileFormat =
+    BackendsApiManager.getSettings.getSubstraitReadFileFormatV2(scan)
 
   override def simpleString(maxFields: Int): String = {
     val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
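With this change the shared BatchScanExecTransformerBase no longer hard-codes every backend's scan classes (ClickHouseScan for the CH backend, DwrfScan for Velox); that knowledge now lives in each backend's BackendSettingsApi implementation, alongside the new V1 hook for FileFormat-based scans.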