Skip to content

Commit

Permalink
[SPARK-47739][SQL] Register logical avro type
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
In this pull request I propose that we register logical Avro types when we initialize `AvroUtils` and `AvroFileFormat`; otherwise the first schema discovery might return a wrong result on the very first execution after Spark starts.
<img width="1727" alt="image" src="https://github.com/apache/spark/assets/150366084/3eaba6e3-34ec-4ca9-ae89-d0259ce942ba">

example
```scala
val new_schema = """
     | {
     |   "type": "record",
     |   "name": "Entry",
     |   "fields": [
     |     {
     |       "name": "rate",
     |       "type": [
     |         "null",
     |         {
     |           "type": "long",
     |           "logicalType": "custom-decimal",
     |           "precision": 38,
     |           "scale": 9
     |         }
     |       ],
     |       "default": null
     |     }
     |   ]
     | }""".stripMargin
spark.read.format("avro").option("avroSchema", new_schema).load().printSchema // maps to long - WRONG
spark.read.format("avro").option("avroSchema", new_schema).load().printSchema // maps to Decimal - CORRECT
```

### Why are the changes needed?
To fix issue with resolving avro schema upon spark startup.

### Does this PR introduce _any_ user-facing change?
No, it's a bugfix.

### How was this patch tested?
Unit tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45895 from milastdbx/dev/milast/fixAvroLogicalTypeRegistration.

Lead-authored-by: milastdbx <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
milastdbx and dongjoon-hyun committed Apr 16, 2024
1 parent b7a729b commit fa2e9c7
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import org.apache.spark.util.SerializableConfiguration
private[sql] class AvroFileFormat extends FileFormat
with DataSourceRegister with Logging with Serializable {

AvroFileFormat.registerCustomAvroTypes()

override def equals(other: Any): Boolean = other match {
case _: AvroFileFormat => true
case _ => false
Expand Down Expand Up @@ -173,10 +175,17 @@ private[sql] class AvroFileFormat extends FileFormat
// NOTE(review): this span is a unified-diff rendering, not the final file.
// The inline LogicalTypes.register call below is the REMOVED (pre-change)
// code; the registerCustomAvroTypes() method and its invocation are the
// ADDED (post-change) code. Only one of the two registration paths exists
// in the actual source file — confirm against the repository.
private[avro] object AvroFileFormat {
// Hadoop/Avro configuration key: when set, files without the .avro
// extension are ignored during reads.
val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension"

// Register the customized decimal type backed by long.
LogicalTypes.register(CustomDecimal.TYPE_NAME, new LogicalTypes.LogicalTypeFactory {
override def fromSchema(schema: Schema): LogicalType = {
new CustomDecimal(schema)
}
})
/**
 * Register Spark defined custom Avro types.
 */
def registerCustomAvroTypes(): Unit = {
// Register the customized decimal type backed by long.
// Registration is idempotent in practice: re-registering the same factory
// under the same name simply replaces the previous entry.
LogicalTypes.register(CustomDecimal.TYPE_NAME, new LogicalTypes.LogicalTypeFactory {
override def fromSchema(schema: Schema): LogicalType = {
new CustomDecimal(schema)
}
})
}

// Run registration eagerly when this object is first initialized, so the
// very first schema resolution after startup already sees the custom type.
registerCustomAvroTypes()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.avro

import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.DecimalType

/**
 * Verifies that Spark's custom Avro logical types are registered before the
 * very first schema resolution after startup (SPARK-47739).
 *
 * These tests must run in an isolated suite: if another test runs first it may
 * already have triggered the registration, masking the bug and making the
 * result flaky.
 */
abstract class AvroLogicalTypeInitSuite
  extends QueryTest
  with SharedSparkSession {

  test("SPARK-47739: custom logical type registration test") {
    // Avro schema using the Spark-defined "custom-decimal" logical type,
    // which is backed by a long and should resolve to DecimalType(38, 9).
    val schemaJson =
      """
        |{
        | "type": "record",
        | "name": "Entry",
        | "fields": [
        | {
        | "name": "test_col",
        | "type": [
        | "null",
        | {
        | "type": "long",
        | "logicalType": "custom-decimal",
        | "precision": 38,
        | "scale": 9
        | }
        | ],
        | "default": null
        | }
        | ]
        |}
        |
      """.stripMargin

    // If registration happened too late, the field would resolve to LongType.
    val frame = spark.read.format("avro").option("avroSchema", schemaJson).load()
    val resolvedType = frame.schema.head.dataType
    assert(resolvedType == DecimalType(38, 9))
  }
}

// Runs the logical-type initialization test against the V1
// (FileFormat-based) Avro data source path.
class AvroV1LogicalTypeInitSuite extends AvroLogicalTypeInitSuite {
  override protected def sparkConf: SparkConf = {
    val conf = super.sparkConf
    conf.set(SQLConf.USE_V1_SOURCE_LIST, "avro")
  }
}

// Runs the logical-type initialization test against the V2 (DataSourceV2)
// Avro data source path, by clearing the V1 source list.
class AvroV2LogicalTypeInitSuite extends AvroLogicalTypeInitSuite {
  override protected def sparkConf: SparkConf = {
    val conf = super.sparkConf
    conf.set(SQLConf.USE_V1_SOURCE_LIST, "")
  }
}

0 comments on commit fa2e9c7

Please sign in to comment.