support null value in array
sfc-gh-bli committed Oct 30, 2024
Commit 793aa21 (1 parent: cc16590)
Showing 2 changed files with 39 additions and 0 deletions.
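In short: writing a Spark array column containing null elements through the connector's Parquet path previously failed, because parquet-avro defaults to the legacy two-level Parquet list structure, which cannot represent a null element. The commit switches the writer to the standard three-level list structure by setting parquet.avro.write-old-list-structure to false, and adds an integration test that round-trips arrays with null elements at different positions.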
src/it/scala/net/snowflake/spark/snowflake/ParquetSuite.scala (35 additions, 0 deletions)
@@ -536,6 +536,41 @@ class ParquetSuite extends IntegrationSuiteBase {
    }
  }

test("null value in array") {
val data: RDD[Row] = sc.makeRDD(
List(
Row(
Array(null, "one", "two", "three"),
),
Row(
Array("one", null, "two", "three"),
)
)
)

val schema = StructType(List(
StructField("ARRAY_STRING_FIELD",
ArrayType(StringType, containsNull = true), nullable = true)))
val df = sparkSession.createDataFrame(data, schema)
df.write
.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", test_array_map)
.option(Parameters.PARAM_USE_PARQUET_IN_WRITE, "true")
.mode(SaveMode.Overwrite)
.save()


val res = sparkSession.read
.format(SNOWFLAKE_SOURCE_NAME)
.options(connectorOptionsNoTable)
.option("dbtable", test_array_map)
.schema(schema)
.load().collect()
assert(res.head.getSeq(0) == Seq("null", "one", "two", "three"))
assert(res(1).getSeq(0) == Seq("one", "null", "two", "three"))
}
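(Note that the assertions expect the literal string "null": as written, a null element stored in the Snowflake ARRAY comes back through the connector as the string "null" rather than a Scala null, presumably because elements are rendered from the ARRAY's JSON text on read.)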

test("test error when column map does not match") {
jdbcUpdate(s"create or replace table $test_column_map_not_match (num int, str string)")
// auto map
Second changed file (4 additions, 0 deletions):

@@ -46,6 +46,7 @@ import net.snowflake.spark.snowflake.test.{TestHook, TestHookFlag}
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.{SparkContext, TaskContext}
@@ -674,9 +675,12 @@ sealed trait CloudStorage {
      format match {
        case SupportedFormat.PARQUET =>
          val rows = input.asInstanceOf[Iterator[GenericData.Record]].toSeq
          val config = new Configuration()
          // Write the standard three-level Parquet list structure instead of the
          // legacy two-level one, so that array elements are allowed to be null.
          config.setBoolean("parquet.avro.write-old-list-structure", false)
          val writer = AvroParquetWriter
            .builder[GenericData.Record](
              new ParquetUtils.StreamOutputFile(uploadStream))
            .withSchema(rows.head.getSchema)
            .withConf(config)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .build()
          rows.foreach(writer.write)
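For context, parquet.avro.write-old-list-structure defaults to true, and with that setting AvroParquetWriter rejects null list elements (the two-level encoding has no way to mark an element as absent). Below is a minimal standalone sketch of the flag's effect, outside the connector; the object name, schema, and output path are illustrative assumptions, not part of this commit:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.util.HadoopOutputFile

object NullArrayListStructureSketch extends App {
  // One record field: an array whose elements may be null ("null" in the union).
  val schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Row", "fields": [
      |  {"name": "arr", "type": {"type": "array", "items": ["null", "string"]}}
      |]}""".stripMargin)

  val record = new GenericData.Record(schema)
  val values = new java.util.ArrayList[String]()
  values.add(null) // null element; representable only in the three-level structure
  values.add("one")
  record.put("arr", values)

  val config = new Configuration()
  // With the default (true), parquet-avro writes the legacy two-level list
  // structure and fails on the null element above; false selects the standard
  // three-level structure, whose element field can be optional.
  config.setBoolean("parquet.avro.write-old-list-structure", false)

  val writer = AvroParquetWriter
    .builder[GenericData.Record](
      HadoopOutputFile.fromPath(new Path("/tmp/null-array-sketch.parquet"), config))
    .withSchema(schema)
    .withConf(config)
    .withCompressionCodec(CompressionCodecName.SNAPPY)
    .build()
  try writer.write(record)
  finally writer.close()
}

Note that the connector applies the flag per writer via withConf(config) rather than globally, so only the Parquet upload path is affected.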
