diff --git a/src/it/scala/net/snowflake/spark/snowflake/ParquetSuite.scala b/src/it/scala/net/snowflake/spark/snowflake/ParquetSuite.scala
index 81ffc29e..398ee41a 100644
--- a/src/it/scala/net/snowflake/spark/snowflake/ParquetSuite.scala
+++ b/src/it/scala/net/snowflake/spark/snowflake/ParquetSuite.scala
@@ -536,6 +536,41 @@ class ParquetSuite extends IntegrationSuiteBase {
     }
   }
 
+  test("null value in array") {
+    val data: RDD[Row] = sc.makeRDD(
+      List(
+        Row(
+          Array(null, "one", "two", "three"),
+        ),
+        Row(
+          Array("one", null, "two", "three"),
+        )
+      )
+    )
+
+    val schema = StructType(List(
+      StructField("ARRAY_STRING_FIELD",
+        ArrayType(StringType, containsNull = true), nullable = true)))
+    val df = sparkSession.createDataFrame(data, schema)
+    df.write
+      .format(SNOWFLAKE_SOURCE_NAME)
+      .options(connectorOptionsNoTable)
+      .option("dbtable", test_array_map)
+      .option(Parameters.PARAM_USE_PARQUET_IN_WRITE, "true")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+
+    val res = sparkSession.read
+      .format(SNOWFLAKE_SOURCE_NAME)
+      .options(connectorOptionsNoTable)
+      .option("dbtable", test_array_map)
+      .schema(schema)
+      .load().collect()
+    assert(res.head.getSeq(0) == Seq("null", "one", "two", "three"))
+    assert(res(1).getSeq(0) == Seq("one", "null", "two", "three"))
+  }
+
   test("test error when column map does not match") {
     jdbcUpdate(s"create or replace table $test_column_map_not_match (num int, str string)")
     // auto map
diff --git a/src/main/scala/net/snowflake/spark/snowflake/io/CloudStorageOperations.scala b/src/main/scala/net/snowflake/spark/snowflake/io/CloudStorageOperations.scala
index 18f4e2e1..23f31b93 100644
--- a/src/main/scala/net/snowflake/spark/snowflake/io/CloudStorageOperations.scala
+++ b/src/main/scala/net/snowflake/spark/snowflake/io/CloudStorageOperations.scala
@@ -46,6 +46,7 @@ import net.snowflake.spark.snowflake.test.{TestHook, TestHookFlag}
 import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
 import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.avro.AvroParquetWriter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.{SparkContext, TaskContext}
@@ -674,9 +675,12 @@ sealed trait CloudStorage {
         format match {
           case SupportedFormat.PARQUET =>
             val rows = input.asInstanceOf[Iterator[GenericData.Record]].toSeq
+            val config = new Configuration()
+            config.setBoolean("parquet.avro.write-old-list-structure", false)
             val writer = AvroParquetWriter.builder[GenericData.Record](
               new ParquetUtils.StreamOutputFile(uploadStream)
             ).withSchema(rows.head.getSchema)
+              .withConf(config)
               .withCompressionCodec(CompressionCodecName.SNAPPY)
               .build()
             rows.foreach(writer.write)
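
Background for the CloudStorageOperations.scala change above: by default, parquet-avro writes Avro arrays with the legacy two-level list structure, which cannot represent null elements, so a row such as Array(null, "one", "two", "three") fails at write time; setting "parquet.avro.write-old-list-structure" to false switches the writer to the three-level list encoding, which allows nullable elements. The following standalone Scala sketch (not connector code; the object name, output path, and schema are illustrative assumptions) shows the same flag applied to a plain AvroParquetWriter:

// Standalone sketch: writing an array with a null element via AvroParquetWriter.
// The flag below mirrors the one set in CloudStorageOperations.scala; with the
// default (old two-level list structure) the null element is expected to fail.
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

object NullableArrayParquetSketch {
  def main(args: Array[String]): Unit = {
    // Avro record with one field: an array of nullable strings (illustrative schema).
    val schema = new Schema.Parser().parse(
      """{
        |  "type": "record",
        |  "name": "example",
        |  "fields": [
        |    {"name": "ARRAY_STRING_FIELD",
        |     "type": {"type": "array", "items": ["null", "string"]}}
        |  ]
        |}""".stripMargin)

    val record = new GenericData.Record(schema)
    record.put("ARRAY_STRING_FIELD",
      java.util.Arrays.asList[String](null, "one", "two", "three"))

    val config = new Configuration()
    // Use the three-level list encoding so null array elements can be written.
    config.setBoolean("parquet.avro.write-old-list-structure", false)

    val writer = AvroParquetWriter.builder[GenericData.Record](
        new Path("/tmp/nullable_array_sketch.parquet")) // hypothetical output path
      .withSchema(schema)
      .withConf(config)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
    try {
      writer.write(record)
    } finally {
      writer.close()
    }
  }
}

The connector change is the same idea: the Configuration carrying this flag is passed to the writer through withConf, so the Parquet files staged for Snowflake can hold arrays with null entries, which is what the new "null value in array" test exercises end to end.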