Skip to content

Commit

Permalink
Destination S3-V2: Parquet mappings propagate into converted unions
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Jan 3, 2025
1 parent 044d1e0 commit b8f3c26
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,16 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper {
value: AirbyteValue,
schema: UnionType,
context: Context
): Pair<AirbyteValue, Context> = value to context
): Pair<AirbyteValue, Context> {
schema.options.forEach {
try {
return mapInner(value, it, context)
} catch (e: Exception) {
// ignore
}
}
return nulledOut(schema, context)
}

open fun mapBoolean(value: AirbyteValue, context: Context): Pair<AirbyteValue, Context> =
value to context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,4 @@ class SchemalessValuesToJsonString : AirbyteValueIdentityMapper() {
value.toJson().serializeToString().let(::StringValue) to context
override fun mapUnknown(value: AirbyteValue, context: Context): Pair<AirbyteValue, Context> =
value.toJson().serializeToString().let(::StringValue) to context

override fun mapUnion(
value: AirbyteValue,
schema: UnionType,
context: Context
): Pair<AirbyteValue, Context> {
if (ObjectTypeWithEmptySchema in schema.options && value is ObjectValue) {
return mapObjectWithEmptySchema(value, ObjectTypeWithEmptySchema, context)
}

if (ObjectTypeWithoutSchema in schema.options && value is ObjectValue) {
return mapObjectWithoutSchema(value, ObjectTypeWithoutSchema, context)
}

if (ArrayTypeWithoutSchema in schema.options && value is ArrayValue) {
return mapArrayWithoutSchema(value, ArrayTypeWithoutSchema, context)
}

if (schema.options.any { it is UnknownType } && value is UnknownValue) {
return mapUnknown(value, context)
}

return super.mapUnion(value, schema, context)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ class UnionTypeToDisjointRecord : AirbyteSchemaIdentityMapper {
}
/* Create a schema of { "type": "string", "<typename(option1)>": <type(option1)>, etc... } */
val properties = linkedMapOf("type" to FieldType(StringType, nullable = false))
schema.options.forEach {
val name = typeName(it)
schema.options.forEach { unmappedType ->
/* Necessary because the type might contain a nested union that needs mapping to a disjoint record. */
val mappedType = map(unmappedType)
val name = typeName(mappedType)
if (name in properties) {
throw IllegalArgumentException("Union of types with same name: $name")
}
properties[typeName(it)] = FieldType(it, nullable = true)
properties[typeName(mappedType)] = FieldType(mappedType, nullable = true)
}
return ObjectType(properties)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.*
import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory
import io.mockk.every
import io.mockk.mockk
import org.junit.jupiter.api.Test

class ParquetMapperPipelineTest {
@Test
fun `test conversions nested in unions`() {
val stream = mockk<DestinationStream>()
val schema =
ObjectType(
linkedMapOf(
"id" to FieldType(StringType, true),
"plan" to
FieldType(
UnionType(
setOf(
ObjectType(
linkedMapOf(
"id" to FieldType(StringType, true),
"price" to FieldType(NumberType, true),
"tiers" to
FieldType(
UnionType(
setOf(
ObjectType(
linkedMapOf(
"up_to" to
FieldType(
IntegerType,
true
),
"name" to
FieldType(StringType, true),
)
),
StringType
)
),
true
),
"metadata" to FieldType(ObjectTypeWithEmptySchema, true)
)
),
StringType
)
),
true
)
)
)
every { stream.schema } returns schema
every { stream.syncId } returns 101L
every { stream.generationId } returns 202L

val record =
ObjectValue(
linkedMapOf(
"id" to StringValue("1"),
"plan" to
ObjectValue(
linkedMapOf(
"id" to StringValue("2"),
"price" to NumberValue(10.0.toBigDecimal()),
"tiers" to
ObjectValue(
linkedMapOf(
"up_to" to IntegerValue(10),
"name" to StringValue("tier1")
)
),
"metadata" to
ObjectValue(linkedMapOf("key" to StringValue("value")))
)
),
)
)
val pipeline = ParquetMapperPipelineFactory().create(stream)
val schemaMapped = pipeline.finalSchema as ObjectType
val (recordMapped, _) = pipeline.map(record)

val planSchema = schemaMapped.properties["plan"]?.type as ObjectType
val planObjectOption = planSchema.properties["object"]?.type as ObjectType
assert(planObjectOption.properties["tiers"]?.type is ObjectType) {
"Unions nested within converted unions should also be converted"
}

val planValue = (recordMapped as ObjectValue).values["plan"] as ObjectValue
val planObjectValue = planValue.values["object"] as ObjectValue
assert(planObjectValue.values["metadata"] is StringValue) {
"Schemaless types values nested within converted unions should be converted"
}
}
}

0 comments on commit b8f3c26

Please sign in to comment.