From 1cafa2826e428479c0ccca94d7f16470ff5eea8a Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 14 Jun 2024 16:45:31 +0800 Subject: [PATCH 1/2] [VL] Minor refactors on ColumnarRuleApplier (#6086) --- .../columnar/ColumnarRuleApplier.scala | 36 +++++++++++++ .../enumerated/EnumeratedApplier.scala | 44 +++++----------- .../columnar/heuristic/HeuristicApplier.scala | 52 +++++++------------ .../apache/spark/sql/SparkQueryRunner.scala | 39 ++++++++------ 4 files changed, 91 insertions(+), 80 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala index 17bf017305f2..ee5bcd883e7e 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala @@ -16,8 +16,44 @@ */ package org.apache.gluten.extension.columnar +import org.apache.gluten.GlutenConfig +import org.apache.gluten.metrics.GlutenTimeMetric +import org.apache.gluten.utils.LogLevelUtil + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.execution.SparkPlan trait ColumnarRuleApplier { def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan } + +object ColumnarRuleApplier { + class Executor(phase: String, rules: Seq[Rule[SparkPlan]]) extends RuleExecutor[SparkPlan] { + private val batch: Batch = + Batch(s"Columnar (Phase [$phase])", Once, rules.map(r => new LoggedRule(r)): _*) + + // TODO Remove this exclusion then pass Spark's idempotence check. + override protected val excludedOnceBatches: Set[String] = Set(batch.name) + + override protected def batches: Seq[Batch] = List(batch) + } + + private class LoggedRule(delegate: Rule[SparkPlan]) + extends Rule[SparkPlan] + with Logging + with LogLevelUtil { + // Columnar plan change logging added since https://github.com/apache/incubator-gluten/pull/456. 
+    private val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel
+    override val ruleName: String = delegate.ruleName
+
+    override def apply(plan: SparkPlan): SparkPlan = GlutenTimeMetric.withMillisTime {
+      logOnLevel(
+        transformPlanLogLevel,
+        s"Preparing to apply rule $ruleName on plan:\n${plan.toString}")
+      val out = delegate.apply(plan)
+      logOnLevel(transformPlanLogLevel, s"Plan after applying rule $ruleName:\n${out.toString}")
+      out
+    }(t => logOnLevel(transformPlanLogLevel, s"Applying rule $ruleName took $t ms."))
+  }
+}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index 26201dc1baa3..d5260f66adba 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -22,13 +22,12 @@ import org.apache.gluten.extension.columnar._
 import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
 import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
 import org.apache.gluten.extension.columnar.util.AdaptiveContext
-import org.apache.gluten.metrics.GlutenTimeMetric
 import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter, SparkPlan}
 import org.apache.spark.util.SparkRuleUtil
 
@@ -47,41 +46,26 @@ class EnumeratedApplier(session: SparkSession)
   with LogLevelUtil {
   // An empirical value.
private val aqeStackTraceIndex = 16 - - private lazy val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel - private lazy val planChangeLogger = new PlanChangeLogger[SparkPlan]() - private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex) override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = PhysicalPlanSelector.maybe(session, plan) { - val transformed = transformPlan(transformRules(outputsColumnar), plan, "transform") + val transformed = + transformPlan("transform", transformRules(outputsColumnar).map(_(session)), plan) val postPlan = maybeAqe { - transformPlan(postRules(), transformed, "post") + transformPlan("post", postRules().map(_(session)), transformed) } - val finalPlan = transformPlan(finalRules(), postPlan, "final") + val finalPlan = transformPlan("final", finalRules().map(_(session)), postPlan) finalPlan } private def transformPlan( - getRules: List[SparkSession => Rule[SparkPlan]], - plan: SparkPlan, - step: String) = GlutenTimeMetric.withMillisTime { - logOnLevel( - transformPlanLogLevel, - s"${step}ColumnarTransitions preOverriden plan:\n${plan.toString}") - val overridden = getRules.foldLeft(plan) { - (p, getRule) => - val rule = getRule(session) - val newPlan = rule(p) - planChangeLogger.logRule(rule.ruleName, p, newPlan) - newPlan - } - logOnLevel( - transformPlanLogLevel, - s"${step}ColumnarTransitions afterOverriden plan:\n${overridden.toString}") - overridden - }(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took: $t ms.")) + phase: String, + rules: Seq[Rule[SparkPlan]], + plan: SparkPlan): SparkPlan = { + val executor = new ColumnarRuleApplier.Executor(phase, rules) + executor.execute(plan) + } private def maybeAqe[T](f: => T): T = { adaptiveContext.setAdaptiveContext() @@ -96,7 +80,7 @@ class EnumeratedApplier(session: SparkSession) * Rules to let planner create a suggested Gluten plan being sent to `fallbackPolicies` in which * the plan will be breakdown and decided to be fallen back or not. */ - private def transformRules(outputsColumnar: Boolean): List[SparkSession => Rule[SparkPlan]] = { + private def transformRules(outputsColumnar: Boolean): Seq[SparkSession => Rule[SparkPlan]] = { List( (_: SparkSession) => RemoveTransitions, (spark: SparkSession) => FallbackOnANSIMode(spark), @@ -126,7 +110,7 @@ class EnumeratedApplier(session: SparkSession) * Rules applying to non-fallen-back Gluten plans. To do some post cleanup works on the plan to * make sure it be able to run and be compatible with Spark's execution engine. */ - private def postRules(): List[SparkSession => Rule[SparkPlan]] = + private def postRules(): Seq[SparkSession => Rule[SparkPlan]] = List( (s: SparkSession) => RemoveTopmostColumnarToRow(s, adaptiveContext.isAdaptiveContext())) ::: BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() ::: @@ -137,7 +121,7 @@ class EnumeratedApplier(session: SparkSession) * Rules consistently applying to all input plans after all other rules have been applied, despite * whether the input plan is fallen back or not. */ - private def finalRules(): List[SparkSession => Rule[SparkPlan]] = { + private def finalRules(): Seq[SparkSession => Rule[SparkPlan]] = { List( // The rule is required despite whether the stage is fallen back or not. 
Since // ColumnarCachedBatchSerializer is statically registered to Spark without a columnar rule diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala index eb5c561bfa8d..ad68786e6579 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala @@ -23,12 +23,11 @@ import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTable import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions} import org.apache.gluten.extension.columnar.util.AdaptiveContext -import org.apache.gluten.metrics.GlutenTimeMetric import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter, SparkPlan} import org.apache.spark.util.SparkRuleUtil @@ -42,54 +41,39 @@ class HeuristicApplier(session: SparkSession) with LogLevelUtil { // This is an empirical value, may need to be changed for supporting other versions of spark. private val aqeStackTraceIndex = 19 - - private lazy val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel - private lazy val planChangeLogger = new PlanChangeLogger[SparkPlan]() - private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex) - override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = + override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = { withTransformRules(transformRules(outputsColumnar)).apply(plan) + } // Visible for testing. - def withTransformRules(transformRules: List[SparkSession => Rule[SparkPlan]]): Rule[SparkPlan] = + def withTransformRules(transformRules: Seq[SparkSession => Rule[SparkPlan]]): Rule[SparkPlan] = plan => PhysicalPlanSelector.maybe(session, plan) { val finalPlan = prepareFallback(plan) { p => - val suggestedPlan = transformPlan(transformRules, p, "transform") - transformPlan(fallbackPolicies(), suggestedPlan, "fallback") match { + val suggestedPlan = transformPlan("transform", transformRules.map(_(session)), p) + transformPlan("fallback", fallbackPolicies().map(_(session)), suggestedPlan) match { case FallbackNode(fallbackPlan) => // we should use vanilla c2r rather than native c2r, // and there should be no `GlutenPlan` any more, // so skip the `postRules()`. 
fallbackPlan case plan => - transformPlan(postRules(), plan, "post") + transformPlan("post", postRules().map(_(session)), plan) } } - transformPlan(finalRules(), finalPlan, "final") + transformPlan("final", finalRules().map(_(session)), finalPlan) } private def transformPlan( - getRules: List[SparkSession => Rule[SparkPlan]], - plan: SparkPlan, - step: String) = GlutenTimeMetric.withMillisTime { - logOnLevel( - transformPlanLogLevel, - s"${step}ColumnarTransitions preOverridden plan:\n${plan.toString}") - val overridden = getRules.foldLeft(plan) { - (p, getRule) => - val rule = getRule(session) - val newPlan = rule(p) - planChangeLogger.logRule(rule.ruleName, p, newPlan) - newPlan - } - logOnLevel( - transformPlanLogLevel, - s"${step}ColumnarTransitions afterOverridden plan:\n${overridden.toString}") - overridden - }(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took: $t ms.")) + phase: String, + rules: Seq[Rule[SparkPlan]], + plan: SparkPlan): SparkPlan = { + val executor = new ColumnarRuleApplier.Executor(phase, rules) + executor.execute(plan) + } private def prepareFallback[T](plan: SparkPlan)(f: SparkPlan => T): T = { adaptiveContext.setAdaptiveContext() @@ -106,7 +90,7 @@ class HeuristicApplier(session: SparkSession) * Rules to let planner create a suggested Gluten plan being sent to `fallbackPolicies` in which * the plan will be breakdown and decided to be fallen back or not. */ - private def transformRules(outputsColumnar: Boolean): List[SparkSession => Rule[SparkPlan]] = { + private def transformRules(outputsColumnar: Boolean): Seq[SparkSession => Rule[SparkPlan]] = { List( (_: SparkSession) => RemoveTransitions, (spark: SparkSession) => FallbackOnANSIMode(spark), @@ -138,7 +122,7 @@ class HeuristicApplier(session: SparkSession) * Rules to add wrapper `FallbackNode`s on top of the input plan, as hints to make planner fall * back the whole input plan to the original vanilla Spark plan. */ - private def fallbackPolicies(): List[SparkSession => Rule[SparkPlan]] = { + private def fallbackPolicies(): Seq[SparkSession => Rule[SparkPlan]] = { List( (_: SparkSession) => ExpandFallbackPolicy(adaptiveContext.isAdaptiveContext(), adaptiveContext.originalPlan())) @@ -148,7 +132,7 @@ class HeuristicApplier(session: SparkSession) * Rules applying to non-fallen-back Gluten plans. To do some post cleanup works on the plan to * make sure it be able to run and be compatible with Spark's execution engine. */ - private def postRules(): List[SparkSession => Rule[SparkPlan]] = + private def postRules(): Seq[SparkSession => Rule[SparkPlan]] = List( (s: SparkSession) => RemoveTopmostColumnarToRow(s, adaptiveContext.isAdaptiveContext())) ::: BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() ::: @@ -159,7 +143,7 @@ class HeuristicApplier(session: SparkSession) * Rules consistently applying to all input plans after all other rules have been applied, despite * whether the input plan is fallen back or not. */ - private def finalRules(): List[SparkSession => Rule[SparkPlan]] = { + private def finalRules(): Seq[SparkSession => Rule[SparkPlan]] = { List( // The rule is required despite whether the stage is fallen back or not. 
Since // ColumnarCachedBatchSerializer is statically registered to Spark without a columnar rule diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala index bb11a679f9eb..b68f74c1d5ed 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala @@ -18,13 +18,9 @@ package org.apache.spark.sql import org.apache.spark.{SparkContext, Success, TaskKilled} import org.apache.spark.executor.ExecutorMetrics -import org.apache.spark.scheduler.{ - SparkListener, - SparkListenerExecutorMetricsUpdate, - SparkListenerTaskEnd, - SparkListenerTaskStart -} +import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskEnd, SparkListenerTaskStart} import org.apache.spark.sql.KillTaskListener.INIT_WAIT_TIME_MS +import org.apache.spark.sql.catalyst.QueryPlanningTracker import com.google.common.base.Preconditions import org.apache.commons.lang3.RandomUtils @@ -50,7 +46,8 @@ object SparkQueryRunner { "ProcessTreePythonVMemory", "ProcessTreePythonRSSMemory", "ProcessTreeOtherVMemory", - "ProcessTreeOtherRSSMemory") + "ProcessTreeOtherRSSMemory" + ) def runQuery( spark: SparkSession, @@ -82,25 +79,33 @@ object SparkQueryRunner { println(s"Executing SQL query from resource path $queryPath...") try { + val tracker = new QueryPlanningTracker val sql = resourceToString(queryPath) val prev = System.nanoTime() val df = spark.sql(sql) - val rows = df.collect() + val rows = QueryPlanningTracker.withTracker(tracker) { + df.collect() + } if (explain) { df.explain(extended = true) } - val planMillis = - df.queryExecution.tracker.phases.values.map(p => p.endTimeMs - p.startTimeMs).sum + val sparkTracker = df.queryExecution.tracker + val sparkRulesMillis = + sparkTracker.rules.map(_._2.totalTimeNs).sum / 1000000L + val otherRulesMillis = + tracker.rules.map(_._2.totalTimeNs).sum / 1000000L + val planMillis = sparkRulesMillis + otherRulesMillis val totalMillis = (System.nanoTime() - prev) / 1000000L val collectedMetrics = metrics.map(name => (name, em.getMetricValue(name))).toMap RunResult(rows, planMillis, totalMillis - planMillis, collectedMetrics) } finally { sc.removeSparkListener(metricsListener) - killTaskListener.foreach(l => { - sc.removeSparkListener(l) - println(s"Successful kill rate ${"%.2f%%" - .format(100 * l.successfulKillRate())} during execution of app: ${sc.applicationId}") - }) + killTaskListener.foreach( + l => { + sc.removeSparkListener(l) + println(s"Successful kill rate ${"%.2f%%" + .format(100 * l.successfulKillRate())} during execution of app: ${sc.applicationId}") + }) sc.setJobDescription(null) } } @@ -166,7 +171,8 @@ class KillTaskListener(val sc: SparkContext) extends SparkListener { val total = Math.min( stageKillMaxWaitTimeLookup.computeIfAbsent(taskStart.stageId, _ => Long.MaxValue), stageKillWaitTimeLookup - .computeIfAbsent(taskStart.stageId, _ => INIT_WAIT_TIME_MS)) + .computeIfAbsent(taskStart.stageId, _ => INIT_WAIT_TIME_MS) + ) val elapsed = System.currentTimeMillis() - startMs val remaining = total - elapsed if (remaining <= 0L) { @@ -180,6 +186,7 @@ class KillTaskListener(val sc: SparkContext) extends SparkListener { } throw new IllegalStateException() } + val elapsed = wait() // We have 50% chance to kill the task. FIXME make it configurable? 
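The net effect of this patch is that both appliers now run each phase through the shared
`ColumnarRuleApplier.Executor` instead of hand-rolling a fold over the rules plus `PlanChangeLogger`
and timing code. The sketch below is illustration only, not part of the patch; it assumes the Gluten
and Spark modules touched above are on the classpath, and the names `PhaseSketch`, `IdentityRule`,
and `runPhase` are made up for the example.

```scala
import org.apache.gluten.extension.columnar.ColumnarRuleApplier
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

object PhaseSketch {
  // A no-op rule standing in for real rules such as RemoveTransitions or FallbackOnANSIMode.
  private object IdentityRule extends Rule[SparkPlan] {
    override def apply(plan: SparkPlan): SparkPlan = plan
  }

  // Every rule handed to Executor is wrapped in LoggedRule, so per-rule plan dumps and
  // timing are emitted at the configured transform-plan log level without extra code here.
  def runPhase(plan: SparkPlan): SparkPlan =
    new ColumnarRuleApplier.Executor("sketch", Seq(IdentityRule)).execute(plan)
}
```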
From 284b304a9aec28c506fbb69a3c8393125ff0bac2 Mon Sep 17 00:00:00 2001 From: Suraj Naik Date: Fri, 14 Jun 2024 14:19:19 +0530 Subject: [PATCH 2/2] [GLUTEN-6026][VL] Add Support for HiveFileFormat parquet write for Spark 3.4+ (#6062) --- .../backendsapi/velox/VeloxBackend.scala | 38 +++++++++++++++++-- .../VeloxParquetWriteForHiveSuite.scala | 6 +-- docs/velox-backend-limitations.md | 4 ++ .../org/apache/gluten/GlutenConfig.scala | 12 ++++++ 4 files changed, 51 insertions(+), 9 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 6bc7df98cca2..158be10f486c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.hive.execution.HiveFileFormat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -182,6 +183,30 @@ object VeloxBackendSettings extends BackendSettingsApi { bucketSpec: Option[BucketSpec], options: Map[String, String]): ValidationResult = { + // Validate if HiveFileFormat write is supported based on output file type + def validateHiveFileFormat(hiveFileFormat: HiveFileFormat): Option[String] = { + // Reflect to get access to fileSinkConf which contains the output file format + val fileSinkConfField = format.getClass.getDeclaredField("fileSinkConf") + fileSinkConfField.setAccessible(true) + val fileSinkConf = fileSinkConfField.get(hiveFileFormat) + val tableInfoField = fileSinkConf.getClass.getDeclaredField("tableInfo") + tableInfoField.setAccessible(true) + val tableInfo = tableInfoField.get(fileSinkConf) + val getOutputFileFormatClassNameMethod = tableInfo.getClass + .getDeclaredMethod("getOutputFileFormatClassName") + val outputFileFormatClassName = getOutputFileFormatClassNameMethod.invoke(tableInfo) + + // Match based on the output file format class name + outputFileFormatClassName match { + case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" => + None + case _ => + Some( + "HiveFileFormat is supported only with Parquet as the output file type" + ) // Unsupported format + } + } + def validateCompressionCodec(): Option[String] = { // Velox doesn't support brotli and lzo. val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw") @@ -194,7 +219,7 @@ object VeloxBackendSettings extends BackendSettingsApi { } // Validate if all types are supported. 
- def validateDateTypes(): Option[String] = { + def validateDataTypes(): Option[String] = { val unsupportedTypes = fields.flatMap { field => field.dataType match { @@ -222,8 +247,13 @@ object VeloxBackendSettings extends BackendSettingsApi { def validateFileFormat(): Option[String] = { format match { - case _: ParquetFileFormat => None - case _: FileFormat => Some("Only parquet fileformat is supported in Velox backend.") + case _: ParquetFileFormat => None // Parquet is directly supported + case h: HiveFileFormat if GlutenConfig.getConf.enableHiveFileFormatWriter => + validateHiveFileFormat(h) // Parquet via Hive SerDe + case _ => + Some( + "Only ParquetFileFormat and HiveFileFormat are supported." + ) // Unsupported format } } @@ -250,7 +280,7 @@ object VeloxBackendSettings extends BackendSettingsApi { validateCompressionCodec() .orElse(validateFileFormat()) .orElse(validateFieldMetadata()) - .orElse(validateDateTypes()) + .orElse(validateDataTypes()) .orElse(validateWriteFilesOptions()) .orElse(validateBucketSpec()) match { case Some(reason) => ValidationResult.notOk(reason) diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala index 9597e3110a10..731f5ef4845c 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala @@ -139,11 +139,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils { withTable("t") { spark.sql("CREATE TABLE t (c int) STORED AS PARQUET") withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") { - if (isSparkVersionGE("3.4")) { - checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = false) - } else { - checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = true) - } + checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = true) } checkAnswer(spark.table("t"), Row(1)) } diff --git a/docs/velox-backend-limitations.md b/docs/velox-backend-limitations.md index 75b52f38e17a..002bbb3c3017 100644 --- a/docs/velox-backend-limitations.md +++ b/docs/velox-backend-limitations.md @@ -118,6 +118,10 @@ spark.range(100).toDF("id") .saveAsTable("velox_ctas") ``` +#### HiveFileFormat write + +Gluten supports writes of HiveFileFormat when the output file type is of type `parquet` only + #### NaN support Velox does NOT support NaN. So unexpected result can be obtained for a few cases, e.g., comparing a number with NaN. 
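As a usage illustration of the HiveFileFormat write support documented above, here is a sketch
(not part of the patch) mirroring the updated VeloxParquetWriteForHiveSuite case; `spark` is
assumed to be a Hive-enabled SparkSession running with the Gluten/Velox plugin on Spark 3.4+.

```scala
// With convertMetastoreParquet disabled, the INSERT is planned through HiveFileFormat;
// after this patch it is still eligible for Velox's native parquet write.
spark.sql("CREATE TABLE t (c int) STORED AS PARQUET")
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.sql("INSERT OVERWRITE TABLE t SELECT 1 AS c")
spark.table("t").show() // expected: a single row containing 1
```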
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 13ad8e47113b..a4e5a4425e3b 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -438,6 +438,8 @@ class GlutenConfig(conf: SQLConf) extends Logging { def dynamicOffHeapSizingEnabled: Boolean = conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED) + + def enableHiveFileFormatWriter: Boolean = conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED) } object GlutenConfig { @@ -1578,6 +1580,16 @@ object GlutenConfig { .booleanConf .createOptional + val NATIVE_HIVEFILEFORMAT_WRITER_ENABLED = + buildConf("spark.gluten.sql.native.hive.writer.enabled") + .internal() + .doc( + "This is config to specify whether to enable the native columnar writer for " + + "HiveFileFormat. Currently only supports HiveFileFormat with Parquet as the output " + + "file type.") + .booleanConf + .createWithDefault(true) + val NATIVE_ARROW_READER_ENABLED = buildConf("spark.gluten.sql.native.arrow.reader.enabled") .internal()
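For completeness, a hedged configuration sketch (not part of the patch): the new flag defaults to
`true`, and setting it to `false` makes `validateFileFormat()` reject HiveFileFormat so the write
falls back to vanilla Spark. Other Gluten/Velox settings needed to enable the plugin are omitted.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("gluten-hive-writer-toggle")
  .enableHiveSupport()
  // Key introduced by this patch; leave at the default "true" to allow native Hive parquet writes.
  .config("spark.gluten.sql.native.hive.writer.enabled", "false")
  .getOrCreate()
```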