diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 6bc7df98cca2..158be10f486c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.hive.execution.HiveFileFormat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -182,6 +183,30 @@ object VeloxBackendSettings extends BackendSettingsApi { bucketSpec: Option[BucketSpec], options: Map[String, String]): ValidationResult = { + // Validate if HiveFileFormat write is supported based on output file type + def validateHiveFileFormat(hiveFileFormat: HiveFileFormat): Option[String] = { + // Reflect to get access to fileSinkConf which contains the output file format + val fileSinkConfField = format.getClass.getDeclaredField("fileSinkConf") + fileSinkConfField.setAccessible(true) + val fileSinkConf = fileSinkConfField.get(hiveFileFormat) + val tableInfoField = fileSinkConf.getClass.getDeclaredField("tableInfo") + tableInfoField.setAccessible(true) + val tableInfo = tableInfoField.get(fileSinkConf) + val getOutputFileFormatClassNameMethod = tableInfo.getClass + .getDeclaredMethod("getOutputFileFormatClassName") + val outputFileFormatClassName = getOutputFileFormatClassNameMethod.invoke(tableInfo) + + // Match based on the output file format class name + outputFileFormatClassName match { + case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" => + None + case _ => + Some( + "HiveFileFormat is supported only with Parquet as the output file type" + ) // Unsupported format + } + } + def validateCompressionCodec(): Option[String] = { // Velox doesn't support brotli and lzo. val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw") @@ -194,7 +219,7 @@ object VeloxBackendSettings extends BackendSettingsApi { } // Validate if all types are supported. - def validateDateTypes(): Option[String] = { + def validateDataTypes(): Option[String] = { val unsupportedTypes = fields.flatMap { field => field.dataType match { @@ -222,8 +247,13 @@ object VeloxBackendSettings extends BackendSettingsApi { def validateFileFormat(): Option[String] = { format match { - case _: ParquetFileFormat => None - case _: FileFormat => Some("Only parquet fileformat is supported in Velox backend.") + case _: ParquetFileFormat => None // Parquet is directly supported + case h: HiveFileFormat if GlutenConfig.getConf.enableHiveFileFormatWriter => + validateHiveFileFormat(h) // Parquet via Hive SerDe + case _ => + Some( + "Only ParquetFileFormat and HiveFileFormat are supported." + ) // Unsupported format } } @@ -250,7 +280,7 @@ object VeloxBackendSettings extends BackendSettingsApi { validateCompressionCodec() .orElse(validateFileFormat()) .orElse(validateFieldMetadata()) - .orElse(validateDateTypes()) + .orElse(validateDataTypes()) .orElse(validateWriteFilesOptions()) .orElse(validateBucketSpec()) match { case Some(reason) => ValidationResult.notOk(reason) diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala index 9597e3110a10..731f5ef4845c 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala @@ -139,11 +139,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils { withTable("t") { spark.sql("CREATE TABLE t (c int) STORED AS PARQUET") withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") { - if (isSparkVersionGE("3.4")) { - checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = false) - } else { - checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = true) - } + checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = true) } checkAnswer(spark.table("t"), Row(1)) } diff --git a/docs/velox-backend-limitations.md b/docs/velox-backend-limitations.md index 75b52f38e17a..002bbb3c3017 100644 --- a/docs/velox-backend-limitations.md +++ b/docs/velox-backend-limitations.md @@ -118,6 +118,10 @@ spark.range(100).toDF("id") .saveAsTable("velox_ctas") ``` +#### HiveFileFormat write + +Gluten supports writes of HiveFileFormat when the output file type is of type `parquet` only + #### NaN support Velox does NOT support NaN. So unexpected result can be obtained for a few cases, e.g., comparing a number with NaN. diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 13ad8e47113b..a4e5a4425e3b 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -438,6 +438,8 @@ class GlutenConfig(conf: SQLConf) extends Logging { def dynamicOffHeapSizingEnabled: Boolean = conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED) + + def enableHiveFileFormatWriter: Boolean = conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED) } object GlutenConfig { @@ -1578,6 +1580,16 @@ object GlutenConfig { .booleanConf .createOptional + val NATIVE_HIVEFILEFORMAT_WRITER_ENABLED = + buildConf("spark.gluten.sql.native.hive.writer.enabled") + .internal() + .doc( + "This is config to specify whether to enable the native columnar writer for " + + "HiveFileFormat. Currently only supports HiveFileFormat with Parquet as the output " + + "file type.") + .booleanConf + .createWithDefault(true) + val NATIVE_ARROW_READER_ENABLED = buildConf("spark.gluten.sql.native.arrow.reader.enabled") .internal()