From b8f3c26ee885c20c115c16b9c148defbfe693201 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Fri, 3 Jan 2025 14:26:24 -0800 Subject: [PATCH] Destination S3-V2: Parquet mappings propagate into converted unions --- .../load/data/AirbyteValueIdentityMapper.kt | 11 +- .../load/data/SchemalessTypesToJsonString.kt | 24 ----- .../load/data/UnionTypeToDisjointRecord.kt | 8 +- .../test/kotlin/ParquetMapperPipelineTest.kt | 100 ++++++++++++++++++ 4 files changed, 115 insertions(+), 28 deletions(-) create mode 100644 airbyte-cdk/bulk/toolkits/load-parquet/src/test/kotlin/ParquetMapperPipelineTest.kt diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt index ade174a12b7a..bb43c1f870ce 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt @@ -152,7 +152,16 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper { value: AirbyteValue, schema: UnionType, context: Context - ): Pair = value to context + ): Pair { + schema.options.forEach { + try { + return mapInner(value, it, context) + } catch (e: Exception) { + // ignore + } + } + return nulledOut(schema, context) + } open fun mapBoolean(value: AirbyteValue, context: Context): Pair = value to context diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt index 795477d241f9..eb61fb562c5b 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt @@ -58,28 +58,4 @@ class SchemalessValuesToJsonString : AirbyteValueIdentityMapper() { value.toJson().serializeToString().let(::StringValue) to context override fun mapUnknown(value: AirbyteValue, context: Context): Pair = value.toJson().serializeToString().let(::StringValue) to context - - override fun mapUnion( - value: AirbyteValue, - schema: UnionType, - context: Context - ): Pair { - if (ObjectTypeWithEmptySchema in schema.options && value is ObjectValue) { - return mapObjectWithEmptySchema(value, ObjectTypeWithEmptySchema, context) - } - - if (ObjectTypeWithoutSchema in schema.options && value is ObjectValue) { - return mapObjectWithoutSchema(value, ObjectTypeWithoutSchema, context) - } - - if (ArrayTypeWithoutSchema in schema.options && value is ArrayValue) { - return mapArrayWithoutSchema(value, ArrayTypeWithoutSchema, context) - } - - if (schema.options.any { it is UnknownType } && value is UnknownValue) { - return mapUnknown(value, context) - } - - return super.mapUnion(value, schema, context) - } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt index 3adefd152204..cbbbba37caa2 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt @@ -11,12 +11,14 @@ class UnionTypeToDisjointRecord : AirbyteSchemaIdentityMapper { } /* Create a schema of { "type": "string", "": , etc... } */ val properties = linkedMapOf("type" to FieldType(StringType, nullable = false)) - schema.options.forEach { - val name = typeName(it) + schema.options.forEach { unmappedType -> + /* Necessary because the type might contain a nested union that needs mapping to a disjoint record. */ + val mappedType = map(unmappedType) + val name = typeName(mappedType) if (name in properties) { throw IllegalArgumentException("Union of types with same name: $name") } - properties[typeName(it)] = FieldType(it, nullable = true) + properties[typeName(mappedType)] = FieldType(mappedType, nullable = true) } return ObjectType(properties) } diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/src/test/kotlin/ParquetMapperPipelineTest.kt b/airbyte-cdk/bulk/toolkits/load-parquet/src/test/kotlin/ParquetMapperPipelineTest.kt new file mode 100644 index 000000000000..a390507429df --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-parquet/src/test/kotlin/ParquetMapperPipelineTest.kt @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.data.* +import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory +import io.mockk.every +import io.mockk.mockk +import org.junit.jupiter.api.Test + +class ParquetMapperPipelineTest { + @Test + fun `test conversions nested in unions`() { + val stream = mockk() + val schema = + ObjectType( + linkedMapOf( + "id" to FieldType(StringType, true), + "plan" to + FieldType( + UnionType( + setOf( + ObjectType( + linkedMapOf( + "id" to FieldType(StringType, true), + "price" to FieldType(NumberType, true), + "tiers" to + FieldType( + UnionType( + setOf( + ObjectType( + linkedMapOf( + "up_to" to + FieldType( + IntegerType, + true + ), + "name" to + FieldType(StringType, true), + ) + ), + StringType + ) + ), + true + ), + "metadata" to FieldType(ObjectTypeWithEmptySchema, true) + ) + ), + StringType + ) + ), + true + ) + ) + ) + every { stream.schema } returns schema + every { stream.syncId } returns 101L + every { stream.generationId } returns 202L + + val record = + ObjectValue( + linkedMapOf( + "id" to StringValue("1"), + "plan" to + ObjectValue( + linkedMapOf( + "id" to StringValue("2"), + "price" to NumberValue(10.0.toBigDecimal()), + "tiers" to + ObjectValue( + linkedMapOf( + "up_to" to IntegerValue(10), + "name" to StringValue("tier1") + ) + ), + "metadata" to + ObjectValue(linkedMapOf("key" to StringValue("value"))) + ) + ), + ) + ) + val pipeline = ParquetMapperPipelineFactory().create(stream) + val schemaMapped = pipeline.finalSchema as ObjectType + val (recordMapped, _) = pipeline.map(record) + + val planSchema = schemaMapped.properties["plan"]?.type as ObjectType + val planObjectOption = planSchema.properties["object"]?.type as ObjectType + assert(planObjectOption.properties["tiers"]?.type is ObjectType) { + "Unions nested within converted unions should also be converted" + } + + val planValue = (recordMapped as ObjectValue).values["plan"] as ObjectValue + val planObjectValue = planValue.values["object"] as ObjectValue + assert(planObjectValue.values["metadata"] is StringValue) { + "Schemaless types values nested within converted unions should be converted" + } + } +}