[GLUTEN-8356][VL] Delta support / Hudi support as Gluten components #8282

Merged · 11 commits · Dec 20, 2024
Changes from all commits
@@ -25,6 +25,7 @@ import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.extension.columnar.transition.{Convention, ConventionFunc}
+import org.apache.gluten.substrait.rel.LocalFilesNode
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._

@@ -34,6 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources.FileFormat
@@ -198,6 +200,27 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}

+  override def getSubstraitReadFileFormatV1(
+      fileFormat: FileFormat): LocalFilesNode.ReadFileFormat = {
+    fileFormat.getClass.getSimpleName match {
+      case "OrcFileFormat" => ReadFileFormat.OrcReadFormat
+      case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+      case "DeltaParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+      case "DeltaMergeTreeFileFormat" => ReadFileFormat.MergeTreeReadFormat
+      case "CSVFileFormat" => ReadFileFormat.TextReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
+  override def getSubstraitReadFileFormatV2(scan: Scan): LocalFilesNode.ReadFileFormat = {
+    scan.getClass.getSimpleName match {
+      case "OrcScan" => ReadFileFormat.OrcReadFormat
+      case "ParquetScan" => ReadFileFormat.ParquetReadFormat
+      case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
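These two hooks move the file-format mapping behind the backend API. A usage sketch for the DSv1 path (resolveV1Format is a hypothetical call site; BackendsApiManager.getSettings and scan.relation.fileFormat are used the same way elsewhere in this diff):

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.sql.execution.FileSourceScanExec

// Resolve the Substrait read format for a DSv1 scan; the active backend's
// settings (CH here, Velox below) decide the mapping.
def resolveV1Format(scan: FileSourceScanExec): ReadFileFormat =
  BackendsApiManager.getSettings.getSubstraitReadFileFormatV1(scan.relation.fileFormat)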
@@ -93,7 +93,6 @@ object CHRuleApi {

// Legacy: Post-transform rules.
injector.injectPostTransform(_ => PruneNestedColumnsInHiveTableScan)
-    injector.injectPostTransform(c => intercept(RewriteTransformer.apply(c.session)))
injector.injectPostTransform(_ => PushDownFilterToScan)
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectPostTransform(_ => EnsureLocalSortRequirements)
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gluten.component

import org.apache.gluten.backendsapi.velox.VeloxBackend
import org.apache.gluten.execution.OffloadDeltaScan
import org.apache.gluten.extension.DeltaPostTransformRules
import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector

import org.apache.spark.sql.execution.FileSourceScanExec

class VeloxDeltaComponent extends Component {
  override def name(): String = "velox-delta"
  override def buildInfo(): Component.BuildInfo =
    Component.BuildInfo("VeloxDelta", "N/A", "N/A", "N/A")
  override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil
  override def injectRules(injector: Injector): Unit = {
    val legacy = injector.gluten.legacy
    val ras = injector.gluten.ras
    legacy.injectTransform {
      c =>
        val offload = Seq(OffloadDeltaScan())
        HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload)
    }
    ras.injectRasRule {
      c =>
        RasOffload.Rule(
          RasOffload.from[FileSourceScanExec](OffloadDeltaScan()),
          Validators.newValidator(c.glutenConf),
          Nil)
    }
    DeltaPostTransformRules.rules.foreach {
      r =>
        legacy.injectPostTransform(_ => r)
        ras.injectPostTransform(_ => r)
    }
  }
}
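The component declares a dependency on VeloxBackend, which lets Gluten order its rule injection after the backend's own rules. Assuming discovery goes through Java's standard ServiceLoader (the concrete lookup mechanism in Gluten may differ), inspecting what was registered would look roughly like this:

import java.util.ServiceLoader

import scala.jdk.CollectionConverters._ // Scala 2.13 converters

// Hypothetical sketch: enumerate discovered components and their declared
// dependencies. VeloxDeltaComponent would appear once a service file for
// org.apache.gluten.component.Component lists it.
def describeComponents(): Seq[String] =
  ServiceLoader
    .load(classOf[org.apache.gluten.component.Component])
    .asScala
    .toSeq
    .map(c => s"${c.name()} -> depends on ${c.dependencies().map(_.getSimpleName).mkString(", ")}")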
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gluten.component

import org.apache.gluten.backendsapi.velox.VeloxBackend
import org.apache.gluten.execution.OffloadHudiScan
import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector

import org.apache.spark.sql.execution.FileSourceScanExec

class VeloxHudiComponent extends Component {
  override def name(): String = "velox-hudi"
  override def buildInfo(): Component.BuildInfo =
    Component.BuildInfo("VeloxHudi", "N/A", "N/A", "N/A")
  override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil
  override def injectRules(injector: Injector): Unit = {
    val legacy = injector.gluten.legacy
    val ras = injector.gluten.ras
    legacy.injectTransform {
      c =>
        val offload = Seq(OffloadHudiScan())
        HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload)
    }
    ras.injectRasRule {
      c =>
        RasOffload.Rule(
          RasOffload.from[FileSourceScanExec](OffloadHudiScan()),
          Validators.newValidator(c.glutenConf),
          Nil)
    }
  }
}
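VeloxHudiComponent mirrors VeloxDeltaComponent except for the offload rule and the absence of post-transform rules. A hedged refactoring sketch that factors out the shared shape (ScanOffloadInjection and injectScanOffload are invented names; the calls inside reuse only APIs appearing in this diff):

import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
import org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector

import org.apache.spark.sql.execution.FileSourceScanExec

object ScanOffloadInjection {
  // Inject one scan-offload rule into both the legacy and RAS planners.
  def injectScanOffload(injector: Injector, rule: OffloadSingleNode): Unit = {
    val legacy = injector.gluten.legacy
    val ras = injector.gluten.ras
    legacy.injectTransform {
      c =>
        val offload = Seq(rule)
        HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload)
    }
    ras.injectRasRule {
      c =>
        RasOffload.Rule(
          RasOffload.from[FileSourceScanExec](rule),
          Validators.newValidator(c.glutenConf),
          Nil)
    }
  }
}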
@@ -27,6 +27,7 @@ import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.extension.columnar.transition.{Convention, ConventionFunc}
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.LocalFilesNode
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat}
import org.apache.gluten.utils._
@@ -36,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, De
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, Percentile}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.{ColumnarCachedBatchSerializer, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -173,7 +175,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
validateTypes(typeValidator)
}
case _ => Some(s"Unsupported file format for $format.")
case _ => Some(s"Unsupported file format $format.")
}
}

@@ -194,6 +196,26 @@ object VeloxBackendSettings extends BackendSettingsApi {
.toSeq
}

+  override def getSubstraitReadFileFormatV1(
+      fileFormat: FileFormat): LocalFilesNode.ReadFileFormat = {
+    fileFormat.getClass.getSimpleName match {
+      case "OrcFileFormat" => ReadFileFormat.OrcReadFormat
+      case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
+      case "DwrfFileFormat" => ReadFileFormat.DwrfReadFormat
+      case "CSVFileFormat" => ReadFileFormat.TextReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
+  override def getSubstraitReadFileFormatV2(scan: Scan): LocalFilesNode.ReadFileFormat = {
+    scan.getClass.getSimpleName match {
+      case "OrcScan" => ReadFileFormat.OrcReadFormat
+      case "ParquetScan" => ReadFileFormat.ParquetReadFormat
+      case "DwrfScan" => ReadFileFormat.DwrfReadFormat
+      case _ => ReadFileFormat.UnknownFormat
+    }
+  }
+
override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
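Both backends now answer the same two questions and differ only in their name-to-format tables (for example, only the CH backend maps ClickHouseScan to MergeTreeReadFormat). A hedged sketch of a table-driven alternative (toSubstraitFormat and veloxV2Table are invented for illustration):

import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat

// Look up the Substrait format by simple class name, defaulting to
// UnknownFormat so unrecognized sources stay on the vanilla Spark path.
def toSubstraitFormat(table: Map[String, ReadFileFormat])(obj: AnyRef): ReadFileFormat =
  table.getOrElse(obj.getClass.getSimpleName, ReadFileFormat.UnknownFormat)

// The Velox DSv2 table from the hunk above, expressed as data.
val veloxV2Table: Map[String, ReadFileFormat] = Map(
  "OrcScan" -> ReadFileFormat.OrcReadFormat,
  "ParquetScan" -> ReadFileFormat.ParquetReadFormat,
  "DwrfScan" -> ReadFileFormat.DwrfReadFormat)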
@@ -85,7 +85,6 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => UnionTransformerRule())
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
-    injector.injectPostTransform(c => RewriteTransformer.apply(c.session))
injector.injectPostTransform(_ => PushDownFilterToScan)
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectPostTransform(_ => EnsureLocalSortRequirements)
@@ -163,7 +162,6 @@
injector.injectPostTransform(_ => UnionTransformerRule())
injector.injectPostTransform(c => PartialProjectRule.apply(c.session))
injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
-    injector.injectPostTransform(c => RewriteTransformer.apply(c.session))
injector.injectPostTransform(_ => PushDownFilterToScan)
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectPostTransform(_ => EnsureLocalSortRequirements)
@@ -38,7 +38,7 @@
/**
* A trivial memory consumer implementation used by Gluten.
*
- * @deprecated Use {@link TreeMemoryConsumers#shared()} instead.
+ * @deprecated Use {@link TreeMemoryConsumers} instead.
*/
@Deprecated
public class RegularMemoryConsumer extends MemoryConsumer

This file was deleted.

This file was deleted.

@@ -16,14 +16,15 @@
*/
package org.apache.gluten.execution

-import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode

-class HudiScanTransformerProvider extends DataSourceScanTransformerRegister {
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

-  override val scanClassName: String = "HoodieParquetFileFormat"
-
-  override def createDataSourceTransformer(
-      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
-    HudiScanTransformer(batchScan)
+case class OffloadDeltaScan() extends OffloadSingleNode {
+  override def offload(plan: SparkPlan): SparkPlan = plan match {
+    case scan: FileSourceScanExec
+        if scan.relation.fileFormat.getClass.getName == "org.apache.spark.sql.delta.DeltaParquetFileFormat" =>
+      DeltaScanTransformer(scan)
+    case other => other
  }
}
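OffloadSingleNode rules are plain plan-to-plan rewrites: anything that is not a Delta parquet scan passes through via the case other arm. A hedged generalization of the pattern (OffloadScanByFormatName and its parameters are invented; OffloadSingleNode, FileSourceScanExec, and SparkPlan come from this diff):

import org.apache.gluten.extension.columnar.offload.OffloadSingleNode

import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

// Hypothetical template: offload any FileSourceScanExec whose FileFormat
// class name satisfies `matches`, leaving every other node unchanged.
case class OffloadScanByFormatName(matches: String => Boolean)(
    transform: FileSourceScanExec => SparkPlan)
  extends OffloadSingleNode {
  override def offload(plan: SparkPlan): SparkPlan = plan match {
    case scan: FileSourceScanExec if matches(scan.relation.fileFormat.getClass.getName) =>
      transform(scan)
    case other => other
  }
}

OffloadDeltaScan corresponds to an exact-name match, while OffloadHudiScan below matches by suffix because Hudi ships several format classes.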
@@ -17,8 +17,7 @@
package org.apache.gluten.extension

import org.apache.gluten.execution.{DeltaFilterExecTransformer, DeltaProjectExecTransformer, DeltaScanTransformer, ProjectExecTransformer}
-import org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, filterRule, projectRule, pushDownInputFileExprRule}
-import org.apache.gluten.extension.columnar.RewriteTransformerRules
+import org.apache.gluten.extension.columnar.transition.RemoveTransitions

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName}
@@ -30,12 +29,10 @@ import org.apache.spark.sql.execution.datasources.FileFormat

import scala.collection.mutable.ListBuffer

-class DeltaRewriteTransformerRules extends RewriteTransformerRules {
-  override def rules: Seq[Rule[SparkPlan]] =
-    columnMappingRule :: filterRule :: projectRule :: pushDownInputFileExprRule :: Nil
-}
-
-object DeltaRewriteTransformerRules {
+object DeltaPostTransformRules {
+  def rules: Seq[Rule[SparkPlan]] =
+    RemoveTransitions :: columnMappingRule :: filterRule :: projectRule ::
+      pushDownInputFileExprRule :: Nil

private val COLUMN_MAPPING_RULE_TAG: TreeNodeTag[String] =
TreeNodeTag[String]("org.apache.gluten.delta.column.mapping")
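Post-transform rules apply in list order, so RemoveTransitions now runs before the Delta-specific rewrites. A minimal sketch of that sequencing (runPostTransform is a hypothetical driver; Rule[SparkPlan] is Spark's standard catalyst rule type):

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Fold the rule list over the plan: each rule sees the previous rule's output.
def runPostTransform(plan: SparkPlan, rules: Seq[Rule[SparkPlan]]): SparkPlan =
  rules.foldLeft(plan)((p, rule) => rule.apply(p))

// e.g. runPostTransform(plan, DeltaPostTransformRules.rules)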

This file was deleted.

@@ -14,16 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
-
-import org.apache.spark.sql.execution.FileSourceScanExec
+package org.apache.gluten.execution

-class DeltaScanTransformerProvider extends DataSourceScanTransformerRegister {
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode

-  override val scanClassName: String = "org.apache.spark.sql.delta.DeltaParquetFileFormat"
+import org.apache.spark.sql.execution.SparkPlan

-  override def createDataSourceTransformer(
-      batchScan: FileSourceScanExec): FileSourceScanExecTransformerBase = {
-    DeltaScanTransformer(batchScan)
+/** Since https://github.com/apache/incubator-gluten/pull/6049. */
+case class OffloadHudiScan() extends OffloadSingleNode {
+  override def offload(plan: SparkPlan): SparkPlan = {
+    plan match {
+      // Hudi has multiple file format definitions whose names end with "HoodieParquetFileFormat".
+      case scan: org.apache.spark.sql.execution.FileSourceScanExec
+          if scan.relation.fileFormat.getClass.getName.endsWith("HoodieParquetFileFormat") =>
+        HudiScanTransformer(scan)
+      case other => other
+    }
  }
}
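Unlike Delta's exact fully-qualified-name comparison, the Hudi rule matches by suffix because Hudi ships several format classes. A quick illustration (both class names here are hypothetical stand-ins):

// Any FileFormat whose fully qualified class name ends with the suffix is
// offloaded, regardless of its package.
val hudiLikeNames = Seq(
  "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat",
  "org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat")
assert(hudiLikeNames.forall(_.endsWith("HoodieParquetFileFormat")))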
@@ -19,11 +19,13 @@ package org.apache.gluten.backendsapi
import org.apache.gluten.GlutenConfig
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.substrait.rel.LocalFilesNode
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.types.StructField
@@ -39,6 +41,10 @@ trait BackendSettingsApi {
rootPaths: Seq[String],
properties: Map[String, String]): ValidationResult = ValidationResult.succeeded

+  def getSubstraitReadFileFormatV1(fileFormat: FileFormat): LocalFilesNode.ReadFileFormat
+
+  def getSubstraitReadFileFormatV2(scan: Scan): LocalFilesNode.ReadFileFormat
+
def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
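Every backend must now provide both mappings: V1 receives a DSv1 FileFormat, V2 a DSv2 Scan. A minimal compliant sketch (ParquetOnlySettings is a toy illustration of the new contract, not a real Gluten backend):

import org.apache.gluten.substrait.rel.LocalFilesNode
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.FileFormat

// Recognizes only Parquet; everything else falls back to UnknownFormat.
trait ParquetOnlySettings {
  def getSubstraitReadFileFormatV1(fileFormat: FileFormat): LocalFilesNode.ReadFileFormat =
    fileFormat.getClass.getSimpleName match {
      case "ParquetFileFormat" => ReadFileFormat.ParquetReadFormat
      case _ => ReadFileFormat.UnknownFormat
    }

  def getSubstraitReadFileFormatV2(scan: Scan): LocalFilesNode.ReadFileFormat =
    scan.getClass.getSimpleName match {
      case "ParquetScan" => ReadFileFormat.ParquetReadFormat
      case _ => ReadFileFormat.UnknownFormat
    }
}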
@@ -175,13 +175,8 @@ abstract class BatchScanExecTransformerBase(
@transient protected lazy val filteredFlattenPartitions: Seq[InputPartition] =
filteredPartitions.flatten

-  @transient override lazy val fileFormat: ReadFileFormat = scan.getClass.getSimpleName match {
-    case "OrcScan" => ReadFileFormat.OrcReadFormat
-    case "ParquetScan" => ReadFileFormat.ParquetReadFormat
-    case "DwrfScan" => ReadFileFormat.DwrfReadFormat
-    case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
-    case _ => ReadFileFormat.UnknownFormat
-  }
+  @transient override lazy val fileFormat: ReadFileFormat =
+    BackendsApiManager.getSettings.getSubstraitReadFileFormatV2(scan)

override def simpleString(maxFields: Int): String = {
val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
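With the mapping behind BackendSettingsApi, this transformer no longer hard-codes backend-specific scans such as ClickHouseScan. A small spot-check sketch of the fallback behavior (it assumes VeloxBackendSettings lives at the import path below; an anonymous Scan has no recognized simple name, so it resolves to UnknownFormat and stays on the vanilla Spark path):

import org.apache.gluten.backendsapi.velox.VeloxBackendSettings
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.types.StructType

object ReadFormatFallbackCheck {
  def main(args: Array[String]): Unit = {
    // readSchema() is Scan's only abstract method, so a stub suffices here.
    val unknownScan: Scan = new Scan {
      override def readSchema(): StructType = new StructType()
    }
    assert(
      VeloxBackendSettings.getSubstraitReadFileFormatV2(unknownScan) ==
        ReadFileFormat.UnknownFormat)
  }
}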