Commit
Merge branch 'main' into feature/fix-6091
baibaichen authored Jun 14, 2024
2 parents 8c3a1bb + 284b304 commit 9e5ad48
Showing 8 changed files with 142 additions and 89 deletions.
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@@ -182,6 +183,30 @@ object VeloxBackendSettings extends BackendSettingsApi {
bucketSpec: Option[BucketSpec],
options: Map[String, String]): ValidationResult = {

// Validate if HiveFileFormat write is supported based on output file type
def validateHiveFileFormat(hiveFileFormat: HiveFileFormat): Option[String] = {
// Reflect to get access to fileSinkConf which contains the output file format
val fileSinkConfField = format.getClass.getDeclaredField("fileSinkConf")
fileSinkConfField.setAccessible(true)
val fileSinkConf = fileSinkConfField.get(hiveFileFormat)
val tableInfoField = fileSinkConf.getClass.getDeclaredField("tableInfo")
tableInfoField.setAccessible(true)
val tableInfo = tableInfoField.get(fileSinkConf)
val getOutputFileFormatClassNameMethod = tableInfo.getClass
.getDeclaredMethod("getOutputFileFormatClassName")
val outputFileFormatClassName = getOutputFileFormatClassNameMethod.invoke(tableInfo)

// Match based on the output file format class name
outputFileFormatClassName match {
case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" =>
None
case _ =>
Some(
"HiveFileFormat is supported only with Parquet as the output file type"
) // Unsupported format
}
}

def validateCompressionCodec(): Option[String] = {
// Velox doesn't support brotli and lzo.
val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw")
@@ -194,7 +219,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
}

// Validate if all types are supported.
def validateDateTypes(): Option[String] = {
def validateDataTypes(): Option[String] = {
val unsupportedTypes = fields.flatMap {
field =>
field.dataType match {
@@ -222,8 +247,13 @@ object VeloxBackendSettings extends BackendSettingsApi {

def validateFileFormat(): Option[String] = {
format match {
case _: ParquetFileFormat => None
case _: FileFormat => Some("Only parquet fileformat is supported in Velox backend.")
case _: ParquetFileFormat => None // Parquet is directly supported
case h: HiveFileFormat if GlutenConfig.getConf.enableHiveFileFormatWriter =>
validateHiveFileFormat(h) // Parquet via Hive SerDe
case _ =>
Some(
"Only ParquetFileFormat and HiveFileFormat are supported."
) // Unsupported format
}
}

@@ -250,7 +280,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
validateCompressionCodec()
.orElse(validateFileFormat())
.orElse(validateFieldMetadata())
.orElse(validateDateTypes())
.orElse(validateDataTypes())
.orElse(validateWriteFilesOptions())
.orElse(validateBucketSpec()) match {
case Some(reason) => ValidationResult.notOk(reason)
@@ -139,11 +139,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
withTable("t") {
spark.sql("CREATE TABLE t (c int) STORED AS PARQUET")
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
if (isSparkVersionGE("3.4")) {
checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = false)
} else {
checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = true)
}
checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = true)
}
checkAnswer(spark.table("t"), Row(1))
}
4 changes: 4 additions & 0 deletions docs/velox-backend-limitations.md
@@ -118,6 +118,10 @@ spark.range(100).toDF("id")
.saveAsTable("velox_ctas")
```

#### HiveFileFormat write

Gluten supports HiveFileFormat writes only when the output file type is `parquet`.
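
As a minimal sketch (mirroring the test added in this commit, with a placeholder table name), a write that the native HiveFileFormat writer is expected to handle; a table stored as another format, e.g. ORC, would fall back to vanilla Spark:

```scala
// Parquet-backed Hive table: the native writer is expected to apply.
spark.sql("CREATE TABLE t (c int) STORED AS PARQUET")
spark.sql("INSERT OVERWRITE TABLE t SELECT 1 AS c")
```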

#### NaN support
Velox does NOT support NaN, so unexpected results can be obtained in a few cases, e.g., when comparing a number with NaN.
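
For instance (a hedged illustration; `spark` is a placeholder session), vanilla Spark treats NaN as greater than any non-NaN value, so a comparison like the following may evaluate differently when offloaded:

```scala
// Spark's NaN semantics order NaN above all other double values; Velox does not
// guarantee the same convention, so the result of this predicate may differ.
spark.sql("SELECT CAST('NaN' AS DOUBLE) > 1.0").show()
```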

@@ -16,8 +16,44 @@
*/
package org.apache.gluten.extension.columnar

import org.apache.gluten.GlutenConfig
import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.gluten.utils.LogLevelUtil

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.execution.SparkPlan

trait ColumnarRuleApplier {
def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan
}

object ColumnarRuleApplier {
class Executor(phase: String, rules: Seq[Rule[SparkPlan]]) extends RuleExecutor[SparkPlan] {
private val batch: Batch =
Batch(s"Columnar (Phase [$phase])", Once, rules.map(r => new LoggedRule(r)): _*)

// TODO: Remove this exclusion and make the batch pass Spark's idempotence check.
override protected val excludedOnceBatches: Set[String] = Set(batch.name)

override protected def batches: Seq[Batch] = List(batch)
}

private class LoggedRule(delegate: Rule[SparkPlan])
extends Rule[SparkPlan]
with Logging
with LogLevelUtil {
// Columnar plan change logging added since https://github.com/apache/incubator-gluten/pull/456.
private val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel
override val ruleName: String = delegate.ruleName

override def apply(plan: SparkPlan): SparkPlan = GlutenTimeMetric.withMillisTime {
logOnLevel(
transformPlanLogLevel,
s"Preparing to apply rule $ruleName on plan:\n${plan.toString}")
val out = delegate.apply(plan)
logOnLevel(transformPlanLogLevel, s"Plan after applied rule $ruleName:\n${plan.toString}")
out
}(t => logOnLevel(transformPlanLogLevel, s"Applying rule $ruleName took $t ms."))
}
}
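
A minimal usage sketch of the new `ColumnarRuleApplier.Executor` (the `runPhase` helper below is hypothetical; the pattern mirrors how `EnumeratedApplier` and `HeuristicApplier` drive it later in this commit): each phase builds an `Executor` from its rules and runs it over the plan, so per-rule logging and timing live in one place.

```scala
import org.apache.gluten.extension.columnar.ColumnarRuleApplier
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical helper: apply one named phase of columnar rules to a plan.
def runPhase(phase: String, rules: Seq[Rule[SparkPlan]], plan: SparkPlan): SparkPlan = {
  val executor = new ColumnarRuleApplier.Executor(phase, rules)
  // execute() runs the single once-batch; LoggedRule logs each rule's input/output
  // plan and elapsed time at the configured transform-plan log level.
  executor.execute(plan)
}
```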
@@ -22,13 +22,12 @@ import org.apache.gluten.extension.columnar._
import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.columnar.util.AdaptiveContext
import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}

import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter, SparkPlan}
import org.apache.spark.util.SparkRuleUtil

@@ -47,41 +46,26 @@ class EnumeratedApplier(session: SparkSession)
with LogLevelUtil {
// An empirical value.
private val aqeStackTraceIndex = 16

private lazy val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel
private lazy val planChangeLogger = new PlanChangeLogger[SparkPlan]()

private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)

override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
PhysicalPlanSelector.maybe(session, plan) {
val transformed = transformPlan(transformRules(outputsColumnar), plan, "transform")
val transformed =
transformPlan("transform", transformRules(outputsColumnar).map(_(session)), plan)
val postPlan = maybeAqe {
transformPlan(postRules(), transformed, "post")
transformPlan("post", postRules().map(_(session)), transformed)
}
val finalPlan = transformPlan(finalRules(), postPlan, "final")
val finalPlan = transformPlan("final", finalRules().map(_(session)), postPlan)
finalPlan
}

private def transformPlan(
getRules: List[SparkSession => Rule[SparkPlan]],
plan: SparkPlan,
step: String) = GlutenTimeMetric.withMillisTime {
logOnLevel(
transformPlanLogLevel,
s"${step}ColumnarTransitions preOverriden plan:\n${plan.toString}")
val overridden = getRules.foldLeft(plan) {
(p, getRule) =>
val rule = getRule(session)
val newPlan = rule(p)
planChangeLogger.logRule(rule.ruleName, p, newPlan)
newPlan
}
logOnLevel(
transformPlanLogLevel,
s"${step}ColumnarTransitions afterOverriden plan:\n${overridden.toString}")
overridden
}(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took: $t ms."))
phase: String,
rules: Seq[Rule[SparkPlan]],
plan: SparkPlan): SparkPlan = {
val executor = new ColumnarRuleApplier.Executor(phase, rules)
executor.execute(plan)
}

private def maybeAqe[T](f: => T): T = {
adaptiveContext.setAdaptiveContext()
@@ -96,7 +80,7 @@ class EnumeratedApplier(session: SparkSession)
* Rules to let planner create a suggested Gluten plan being sent to `fallbackPolicies`, in which
* the plan will be broken down and a decision made on whether to fall back or not.
*/
private def transformRules(outputsColumnar: Boolean): List[SparkSession => Rule[SparkPlan]] = {
private def transformRules(outputsColumnar: Boolean): Seq[SparkSession => Rule[SparkPlan]] = {
List(
(_: SparkSession) => RemoveTransitions,
(spark: SparkSession) => FallbackOnANSIMode(spark),
@@ -126,7 +110,7 @@
* Rules applying to non-fallen-back Gluten plans, to do some post-cleanup work on the plan and
* make sure it is able to run and is compatible with Spark's execution engine.
*/
private def postRules(): List[SparkSession => Rule[SparkPlan]] =
private def postRules(): Seq[SparkSession => Rule[SparkPlan]] =
List(
(s: SparkSession) => RemoveTopmostColumnarToRow(s, adaptiveContext.isAdaptiveContext())) :::
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() :::
@@ -137,7 +121,7 @@
* Rules consistently applied to all input plans after all other rules have been applied, regardless
* of whether the input plan is fallen back or not.
*/
private def finalRules(): List[SparkSession => Rule[SparkPlan]] = {
private def finalRules(): Seq[SparkSession => Rule[SparkPlan]] = {
List(
// The rule is required despite whether the stage is fallen back or not. Since
// ColumnarCachedBatchSerializer is statically registered to Spark without a columnar rule
@@ -23,12 +23,11 @@ import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTable
import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.columnar.util.AdaptiveContext
import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter, SparkPlan}
import org.apache.spark.util.SparkRuleUtil

@@ -42,54 +41,39 @@ class HeuristicApplier(session: SparkSession)
with LogLevelUtil {
// This is an empirical value; it may need to be changed to support other versions of Spark.
private val aqeStackTraceIndex = 19

private lazy val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel
private lazy val planChangeLogger = new PlanChangeLogger[SparkPlan]()

private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)

override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
withTransformRules(transformRules(outputsColumnar)).apply(plan)
}

// Visible for testing.
def withTransformRules(transformRules: List[SparkSession => Rule[SparkPlan]]): Rule[SparkPlan] =
def withTransformRules(transformRules: Seq[SparkSession => Rule[SparkPlan]]): Rule[SparkPlan] =
plan =>
PhysicalPlanSelector.maybe(session, plan) {
val finalPlan = prepareFallback(plan) {
p =>
val suggestedPlan = transformPlan(transformRules, p, "transform")
transformPlan(fallbackPolicies(), suggestedPlan, "fallback") match {
val suggestedPlan = transformPlan("transform", transformRules.map(_(session)), p)
transformPlan("fallback", fallbackPolicies().map(_(session)), suggestedPlan) match {
case FallbackNode(fallbackPlan) =>
// we should use vanilla c2r rather than native c2r,
// and there should be no `GlutenPlan` any more,
// so skip the `postRules()`.
fallbackPlan
case plan =>
transformPlan(postRules(), plan, "post")
transformPlan("post", postRules().map(_(session)), plan)
}
}
transformPlan(finalRules(), finalPlan, "final")
transformPlan("final", finalRules().map(_(session)), finalPlan)
}

private def transformPlan(
getRules: List[SparkSession => Rule[SparkPlan]],
plan: SparkPlan,
step: String) = GlutenTimeMetric.withMillisTime {
logOnLevel(
transformPlanLogLevel,
s"${step}ColumnarTransitions preOverridden plan:\n${plan.toString}")
val overridden = getRules.foldLeft(plan) {
(p, getRule) =>
val rule = getRule(session)
val newPlan = rule(p)
planChangeLogger.logRule(rule.ruleName, p, newPlan)
newPlan
}
logOnLevel(
transformPlanLogLevel,
s"${step}ColumnarTransitions afterOverridden plan:\n${overridden.toString}")
overridden
}(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took: $t ms."))
phase: String,
rules: Seq[Rule[SparkPlan]],
plan: SparkPlan): SparkPlan = {
val executor = new ColumnarRuleApplier.Executor(phase, rules)
executor.execute(plan)
}

private def prepareFallback[T](plan: SparkPlan)(f: SparkPlan => T): T = {
adaptiveContext.setAdaptiveContext()
@@ -106,7 +90,7 @@ class HeuristicApplier(session: SparkSession)
* Rules to let planner create a suggested Gluten plan being sent to `fallbackPolicies`, in which
* the plan will be broken down and a decision made on whether to fall back or not.
*/
private def transformRules(outputsColumnar: Boolean): List[SparkSession => Rule[SparkPlan]] = {
private def transformRules(outputsColumnar: Boolean): Seq[SparkSession => Rule[SparkPlan]] = {
List(
(_: SparkSession) => RemoveTransitions,
(spark: SparkSession) => FallbackOnANSIMode(spark),
@@ -138,7 +122,7 @@
* Rules to add wrapper `FallbackNode`s on top of the input plan, as hints to make planner fall
* back the whole input plan to the original vanilla Spark plan.
*/
private def fallbackPolicies(): List[SparkSession => Rule[SparkPlan]] = {
private def fallbackPolicies(): Seq[SparkSession => Rule[SparkPlan]] = {
List(
(_: SparkSession) =>
ExpandFallbackPolicy(adaptiveContext.isAdaptiveContext(), adaptiveContext.originalPlan()))
@@ -148,7 +132,7 @@
* Rules applying to non-fallen-back Gluten plans, to do some post-cleanup work on the plan and
* make sure it is able to run and is compatible with Spark's execution engine.
*/
private def postRules(): List[SparkSession => Rule[SparkPlan]] =
private def postRules(): Seq[SparkSession => Rule[SparkPlan]] =
List(
(s: SparkSession) => RemoveTopmostColumnarToRow(s, adaptiveContext.isAdaptiveContext())) :::
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() :::
@@ -159,7 +143,7 @@
* Rules consistently applied to all input plans after all other rules have been applied, regardless
* of whether the input plan is fallen back or not.
*/
private def finalRules(): List[SparkSession => Rule[SparkPlan]] = {
private def finalRules(): Seq[SparkSession => Rule[SparkPlan]] = {
List(
// The rule is required despite whether the stage is fallen back or not. Since
// ColumnarCachedBatchSerializer is statically registered to Spark without a columnar rule
12 changes: 12 additions & 0 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -438,6 +438,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {

def dynamicOffHeapSizingEnabled: Boolean =
conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)

def enableHiveFileFormatWriter: Boolean = conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
}

object GlutenConfig {
@@ -1578,6 +1580,16 @@ object GlutenConfig {
.booleanConf
.createOptional

val NATIVE_HIVEFILEFORMAT_WRITER_ENABLED =
buildConf("spark.gluten.sql.native.hive.writer.enabled")
.internal()
.doc(
"This is config to specify whether to enable the native columnar writer for " +
"HiveFileFormat. Currently only supports HiveFileFormat with Parquet as the output " +
"file type.")
.booleanConf
.createWithDefault(true)
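
A hedged usage sketch for the new flag (the session and table names are placeholders); with the flag disabled, HiveFileFormat writes fail native validation and fall back to vanilla Spark:

```scala
// Hypothetical usage: disable the native HiveFileFormat writer (enabled by default).
spark.conf.set("spark.gluten.sql.native.hive.writer.enabled", "false")
spark.sql("INSERT OVERWRITE TABLE t SELECT 1 AS c") // expected to use the vanilla Spark write path
```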

val NATIVE_ARROW_READER_ENABLED =
buildConf("spark.gluten.sql.native.arrow.reader.enabled")
.internal()