Skip to content

Commit

Permalink
[GLUTEN-4964][CORE]Fallback complex data type in parquet write for Sp…
Browse files Browse the repository at this point in the history
…ark32 & Spark33 (#5107)
  • Loading branch information
JkSelf authored Mar 28, 2024
1 parent a311ff8 commit 4456161
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,17 @@ object BackendSettings extends BackendSettingsApi {
}
}

override def supportNativeWrite(fields: Array[StructField]): Boolean = {
fields.map {
field =>
field.dataType match {
case _: TimestampType | _: StructType | _: ArrayType | _: MapType => return false
case _ =>
}
}
true
}

override def supportNativeMetadataColumns(): Boolean = true

override def supportExpandExec(): Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
super.sparkConf.set("spark.gluten.sql.native.writer.enabled", "true")
}

test("test Array(Struct) fallback") {
withTempPath {
f =>
val path = f.getCanonicalPath
val testAppender = new LogAppender("native write tracker")
withLogAppender(testAppender) {
spark.sql("select array(struct(1), null) as var1").write.mode("overwrite").save(path)
}
assert(
testAppender.loggingEvents.exists(
_.getMessage.toString.contains("Use Gluten parquet write for hive")) == false)
}
}

test("test write parquet with compression codec") {
// compression codec details see `VeloxParquetDatasource.cc`
Seq("snappy", "gzip", "zstd", "lz4", "none", "uncompressed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ trait BackendSettingsApi {
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
options: Map[String, String]): ValidationResult = ValidationResult.ok
def supportNativeWrite(fields: Array[StructField]): Boolean = true
def supportNativeMetadataColumns(): Boolean = false
def supportExpandExec(): Boolean = false
def supportSortExec(): Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ object GlutenWriterColumnarRules {
if write.getClass.getName == NOOP_WRITE &&
BackendsApiManager.getSettings.enableNativeWriteFiles() =>
injectFakeRowAdaptor(rc, rc.child)
case rc @ DataWritingCommandExec(cmd, child) =>
case rc @ DataWritingCommandExec(cmd, child)
if BackendsApiManager.getSettings.supportNativeWrite(child.output.toStructType.fields) =>
val format = getNativeFormat(cmd)
session.sparkContext.setLocalProperty(
"staticPartitionWriteOnly",
Expand Down

0 comments on commit 4456161

Please sign in to comment.