[GLUTEN-6026][VL] Add Support for HiveFileFormat parquet write for Spark 3.4+ #6062

Merged · 16 commits · Jun 14, 2024
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@@ -182,6 +183,29 @@ object VeloxBackendSettings extends BackendSettingsApi {
bucketSpec: Option[BucketSpec],
options: Map[String, String]): ValidationResult = {

// Validate if HiveFileFormat write is supported based on output file type
def validateHiveFileFormat(hiveFileFormat: HiveFileFormat): Option[String] = {
// Reflect to get access to fileSinkConf which contains the output file format
val fileSinkConfField = format.getClass.getDeclaredField("fileSinkConf")
fileSinkConfField.setAccessible(true)
val fileSinkConf = fileSinkConfField.get(hiveFileFormat)
val getTableInfoMethod = fileSinkConf.getClass.getDeclaredMethod("getTableInfo")
val tableInfo = getTableInfoMethod.invoke(fileSinkConf)
val getOutputFileFormatClassNameMethod = tableInfo.getClass
.getDeclaredMethod("getOutputFileFormatClassName")
val outputFileFormatClassName = getOutputFileFormatClassNameMethod.invoke(tableInfo)

// Match based on the output file format class name
outputFileFormatClassName match {
case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" =>
None
case _ =>
Some(
"HiveFileFormat is supported only with Parquet as the output file type"
) // Unsupported format
}
}

def validateCompressionCodec(): Option[String] = {
// Velox doesn't support brotli and lzo.
val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw")
@@ -194,7 +218,7 @@
}

// Validate if all types are supported.
- def validateDateTypes(): Option[String] = {
+ def validateDataTypes(): Option[String] = {
val unsupportedTypes = fields.flatMap {
field =>
field.dataType match {
@@ -222,8 +246,13 @@

def validateFileFormat(): Option[String] = {
format match {
- case _: ParquetFileFormat => None
- case _: FileFormat => Some("Only parquet fileformat is supported in Velox backend.")
+ case _: ParquetFileFormat => None // Parquet is directly supported
+ case h: HiveFileFormat if GlutenConfig.getConf.enableHiveFileFormatWriter =>
+   validateHiveFileFormat(h) // Parquet via Hive SerDe
+ case _ =>
+   Some(
+     "Only ParquetFileFormat and HiveFileFormat are supported."
+   ) // Unsupported format
}
}

@@ -250,7 +279,7 @@
validateCompressionCodec()
.orElse(validateFileFormat())
.orElse(validateFieldMetadata())
- .orElse(validateDateTypes())
+ .orElse(validateDataTypes())
.orElse(validateWriteFilesOptions())
.orElse(validateBucketSpec()) match {
case Some(reason) => ValidationResult.notOk(reason)
@@ -139,11 +139,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
withTable("t") {
spark.sql("CREATE TABLE t (c int) STORED AS PARQUET")
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
if (isSparkVersionGE("3.4")) {
checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = false)
} else {
checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = true)
}
checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = true)
}
checkAnswer(spark.table("t"), Row(1))
}
4 changes: 4 additions & 0 deletions docs/velox-backend-limitations.md
@@ -118,6 +118,10 @@ spark.range(100).toDF("id")
.saveAsTable("velox_ctas")
```

#### HiveFileFormat write

Gluten supports `HiveFileFormat` writes only when the output file type is `parquet`.
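
For illustration, a minimal sketch of a write that goes through `HiveFileFormat` (the table name is hypothetical; the scenario mirrors the one exercised in `VeloxParquetWriteForHiveSuite`):

```scala
// A Hive parquet table: with spark.sql.hive.convertMetastoreParquet disabled,
// the insert is planned through HiveFileFormat and can be offloaded by Gluten.
spark.sql("CREATE TABLE hive_parquet_t (c INT) STORED AS PARQUET")
spark.sql("INSERT OVERWRITE TABLE hive_parquet_t SELECT 1 AS c")
```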

#### NaN support
Velox does NOT support NaN. So unexpected results can be obtained in a few cases, e.g., comparing a number with NaN.

12 changes: 12 additions & 0 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -430,6 +430,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {

def dynamicOffHeapSizingEnabled: Boolean =
conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)

def enableHiveFileFormatWriter: Boolean = conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
}

object GlutenConfig {
@@ -1563,6 +1565,16 @@ object GlutenConfig {
.booleanConf
.createOptional

val NATIVE_HIVEFILEFORMAT_WRITER_ENABLED =
buildConf("spark.gluten.sql.native.hive.writer.enabled")
.internal()
.doc(
"Whether to enable the native columnar writer for HiveFileFormat. " +
"Currently only Parquet is supported as the output file type.")
.booleanConf
.createWithDefault(true)
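
As an illustrative usage sketch (assuming the flag can be supplied like any other Spark conf at session startup), the native Hive writer can be opted out of:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: setting the flag at session build time is sufficient. It
// defaults to true, so this is only needed to fall back to Spark's
// row-based Hive writer.
val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.gluten.sql.native.hive.writer.enabled", "false")
  .getOrCreate()
```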

val NATIVE_ARROW_READER_ENABLED =
buildConf("spark.gluten.sql.native.arrow.reader.enabled")
.internal()