From 666e847c4e67cd65928c788a716bce7f3fd622e6 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Sun, 22 Dec 2024 17:34:34 -0800 Subject: [PATCH] =?UTF-8?q?Destination=20S3V2:=20Only=20fail=20on=20bad=20?= =?UTF-8?q?types=20for=20avro=20and=20parquet=20(parity=E2=80=A6=20(#49981?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../load/data/SchemalessTypesToJsonString.kt | 30 +++++ .../load/data/UnionTypeToDisjointRecord.kt | 2 +- .../load/data/json/JsonSchemaToAirbyteType.kt | 14 +- .../cdk/load/message/DestinationMessage.kt | 42 +++--- .../FailOnAllUnknownTypesExceptNullTest.kt | 46 +++++++ .../data/UnionTypeToDisjointRecordTest.kt | 4 +- .../BasicFunctionalityIntegrationTest.kt | 97 ++++++++++---- .../load/data/avro/AirbyteTypeToAvroSchema.kt | 105 ++++++++------- .../data/avro/AirbyteValueToAvroRecord.kt | 121 ++++++++++-------- .../data/avro/AvroMapperPipelineFactory.kt | 3 +- .../data/avro/AvroRecordToAirbyteValue.kt | 2 +- .../RecordToPartAccumulatorTest.kt | 1 - .../cdk/load/ObjectStorageDataDumper.kt | 5 +- .../parquet/ParquetMapperPipelineFactory.kt | 3 +- .../io/airbyte/cdk/load/file/s3/S3Client.kt | 8 +- ...atic_types_disjoint_union_messages_out.txt | 2 +- .../destination-s3-v2/metadata.yaml | 7 +- .../s3/S3V2FileTransferDestinationTest.kt | 2 +- .../destination/s3_v2/S3V2CheckTest.kt | 4 + .../destination/s3_v2/S3V2TestUtils.kt | 1 + .../destination/s3_v2/S3V2WriteTest.kt | 6 + 21 files changed, 334 insertions(+), 171 deletions(-) create mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/FailOnAllUnknownTypesExceptNullTest.kt diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt index 3d38a03c4516..263539acb521 100644 --- 
a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt @@ -7,6 +7,36 @@ package io.airbyte.cdk.load.data import io.airbyte.cdk.load.data.json.toJson import io.airbyte.cdk.load.util.serializeToString +/** + * Intended for Avro and Parquet Conversions and similar use cases. + * + * The contract is to serialize the values of schemaless and unknown types to a json string. + * + * Because there is no JsonBlob `AirbyteType`, we leave the types as-is and just serialize them. It + * is expected that the serializer will know to expect strings for each type. + * + * This means there's no need for a type mapper, unless you also want to support some subset of the + * Unknown types. + * + * For example, [FailOnAllUnknownTypesExceptNull] is used to add support for `{ "type": "null" }` + */ +class FailOnAllUnknownTypesExceptNull : AirbyteSchemaIdentityMapper { + override fun mapUnknown(schema: UnknownType) = + if ( + schema.schema.isObject && + ((schema.schema.get("type").isTextual && + schema.schema.get("type").textValue() == "null") || + (schema.schema.get("type").isArray && + schema.schema.get("type").elements().asSequence().all { + it.isTextual && it.textValue() == "null" + })) + ) { + schema + } else { + throw IllegalStateException("Unknown type: $schema") + } +} + class SchemalessValuesToJsonString : AirbyteValueIdentityMapper() { override fun mapObjectWithoutSchema( value: AirbyteValue, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt index 98c23feedf6c..3adefd152204 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt +++ 
b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecord.kt @@ -37,7 +37,7 @@ class UnionTypeToDisjointRecord : AirbyteSchemaIdentityMapper { is ObjectType -> "object" is ArrayTypeWithoutSchema, is ObjectTypeWithoutSchema, - is ObjectTypeWithEmptySchema -> "string" + is ObjectTypeWithEmptySchema -> "object" is UnionType -> "union" is UnknownType -> "unknown" } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt index 932382b89db3..4f9736282172 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteType.kt @@ -28,12 +28,7 @@ class JsonSchemaToAirbyteType { "array" -> fromArray(schema) "object" -> fromObject(schema) "null" -> null - else -> - throw IllegalArgumentException( - "Unknown type: ${ - schema.get("type").asText() - }" - ) + else -> UnknownType(schema) } } else if (schemaType.isArray) { // {"type": [...], ...} @@ -92,12 +87,7 @@ class JsonSchemaToAirbyteType { TimestampTypeWithTimezone } null -> StringType - else -> - throw IllegalArgumentException( - "Unknown string format: ${ - schema.get("format").asText() - }" - ) + else -> UnknownType(schema) } private fun fromNumber(schema: ObjectNode): AirbyteType = diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt index 715817d59a52..45592ea65ae0 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt @@ -392,24 +392,30 @@ class DestinationMessageFactory( name = 
message.record.stream, ) if (fileTransferEnabled) { - @Suppress("UNCHECKED_CAST") - val fileMessage = - message.record.additionalProperties["file"] as Map - - DestinationFile( - stream = stream.descriptor, - emittedAtMs = message.record.emittedAt, - serialized = serialized, - fileMessage = - DestinationFile.AirbyteRecordMessageFile( - fileUrl = fileMessage["file_url"] as String?, - bytes = toLong(fileMessage["bytes"], "message.record.bytes"), - fileRelativePath = fileMessage["file_relative_path"] as String?, - modified = - toLong(fileMessage["modified"], "message.record.modified"), - sourceFileUrl = fileMessage["source_file_url"] as String? - ) - ) + try { + @Suppress("UNCHECKED_CAST") + val fileMessage = + message.record.additionalProperties["file"] as Map + + DestinationFile( + stream = stream.descriptor, + emittedAtMs = message.record.emittedAt, + serialized = serialized, + fileMessage = + DestinationFile.AirbyteRecordMessageFile( + fileUrl = fileMessage["file_url"] as String?, + bytes = toLong(fileMessage["bytes"], "message.record.bytes"), + fileRelativePath = fileMessage["file_relative_path"] as String?, + modified = + toLong(fileMessage["modified"], "message.record.modified"), + sourceFileUrl = fileMessage["source_file_url"] as String? + ) + ) + } catch (e: Exception) { + throw IllegalArgumentException( + "Failed to construct file message: ${e.message}" + ) + } } else { DestinationRecord(stream.descriptor, message, serialized, stream.schema) } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/FailOnAllUnknownTypesExceptNullTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/FailOnAllUnknownTypesExceptNullTest.kt new file mode 100644 index 000000000000..1f31db5e7aa7 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/FailOnAllUnknownTypesExceptNullTest.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.cdk.load.data + +import com.fasterxml.jackson.databind.node.JsonNodeFactory +import io.airbyte.cdk.load.test.util.Root +import io.airbyte.cdk.load.test.util.SchemaRecordBuilder +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +class FailOnAllUnknownTypesExceptNullTest { + @Test + fun testBasicTypeBehavior() { + val nullType = JsonNodeFactory.instance.objectNode().put("type", "null") + val (inputSchema, expectedOutput) = + SchemaRecordBuilder() + .with(UnknownType(nullType)) + .with( + UnknownType( + JsonNodeFactory.instance + .objectNode() + .set( + "type", + JsonNodeFactory.instance.arrayNode().add("null").add("null") + ) + ) + ) + .build() + FailOnAllUnknownTypesExceptNull().map(inputSchema).let { + Assertions.assertEquals(expectedOutput, it) + } + } + + @Test + fun `test throws on non-null unknown types`() { + val (inputSchema, _) = + SchemaRecordBuilder() + .with(UnknownType(JsonNodeFactory.instance.objectNode().put("type", "whatever"))) + .build() + Assertions.assertThrows(IllegalStateException::class.java) { + FailOnAllUnknownTypesExceptNull().map(inputSchema) + } + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecordTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecordTest.kt index 05c0fc4d8891..d11ea13c3bc8 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecordTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/UnionTypeToDisjointRecordTest.kt @@ -33,8 +33,8 @@ class UnionTypeToDisjointRecordTest { fun testUnionOfTypesWithSameNameThrows() { val (inputSchema, _) = SchemaRecordBuilder() - // Both `StringType` and `ObjectWithoutSchema` are mapped to `string` - .with(UnionType.of(StringType, ObjectTypeWithoutSchema)) + // Both are mapped to `object` + .with(UnionType.of(ObjectTypeWithEmptySchema, 
ObjectTypeWithoutSchema)) .build() Assertions.assertThrows(IllegalArgumentException::class.java) { UnionTypeToDisjointRecord().map(inputSchema) diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index a956271ea616..dc0de057a626 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -68,6 +68,7 @@ import org.junit.jupiter.api.Assumptions.assumeTrue import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows sealed interface AllTypesBehavior @@ -126,6 +127,7 @@ abstract class BasicFunctionalityIntegrationTest( */ val commitDataIncrementally: Boolean, val allTypesBehavior: AllTypesBehavior, + val failOnUnknownTypes: Boolean = false, nullEqualsUnset: Boolean = false, ) : IntegrationTest(dataDumper, destinationCleaner, recordMangler, nameMapper, nullEqualsUnset) { val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configContents) @@ -1795,11 +1797,6 @@ abstract class BasicFunctionalityIntegrationTest( "schemaless_object" to FieldType(ObjectTypeWithoutSchema, nullable = true), "schematized_array" to FieldType(ArrayType(intType), nullable = true), "schemaless_array" to FieldType(ArrayTypeWithoutSchema, nullable = true), - "unknown" to - FieldType( - UnknownType(JsonNodeFactory.instance.textNode("test")), - nullable = true - ), ), ), generationId = 42, @@ -1820,8 +1817,7 @@ abstract class BasicFunctionalityIntegrationTest( "empty_object": {}, "schemaless_object": { "uuid": "38F52396-736D-4B23-B5B4-F504D8894B97", "probability": 1.5 }, "schematized_array": [10, null], - "schemaless_array": 
[ 10, "foo", null, { "bar": "qua" } ], - "unknown": {"foo": "bar"} + "schemaless_array": [ 10, "foo", null, { "bar": "qua" } ] }""".trimIndent(), emittedAtMs = 1602637589100, ), @@ -1835,8 +1831,7 @@ abstract class BasicFunctionalityIntegrationTest( "empty_object": {"extra": "stuff"}, "schemaless_object": { "address": { "street": "113 Hickey Rd", "zip": "37932" }, "flags": [ true, false, false ] }, "schematized_array": [], - "schemaless_array": [], - "unknown": {} + "schemaless_array": [] }""".trimIndent(), emittedAtMs = 1602637589200, ), @@ -1850,8 +1845,7 @@ abstract class BasicFunctionalityIntegrationTest( "empty_object": null, "schemaless_object": null, "schematized_array": null, - "schemaless_array": null, - "unknown": null + "schemaless_array": null }""".trimIndent(), emittedAtMs = 1602637589300, ), @@ -1885,12 +1879,6 @@ abstract class BasicFunctionalityIntegrationTest( } else { listOf(10, "foo", null, mapOf("bar" to "qua")) }, - "unknown" to - if (stringifySchemalessObjects) { - """{"foo":"bar"}""" - } else { - mapOf("foo" to "bar") - }, ), airbyteMeta = OutputRecord.Meta(syncId = 42), ), @@ -1927,12 +1915,6 @@ abstract class BasicFunctionalityIntegrationTest( } else { emptyList() }, - "unknown" to - if (stringifySchemalessObjects) { - """{}""" - } else { - emptyMap() - }, ), airbyteMeta = OutputRecord.Meta(syncId = 42), ), @@ -1947,7 +1929,6 @@ abstract class BasicFunctionalityIntegrationTest( "schemaless_object" to null, "schematized_array" to null, "schemaless_array" to null, - "unknown" to null, ), airbyteMeta = OutputRecord.Meta(syncId = 42), ), @@ -1962,6 +1943,74 @@ abstract class BasicFunctionalityIntegrationTest( ) } + @Test + open fun testUnknownTypes() { + val stream = + DestinationStream( + DestinationStream.Descriptor(randomizedNamespace, "problematic_types"), + Append, + ObjectType( + linkedMapOf( + "id" to + FieldType( + UnknownType( + JsonNodeFactory.instance.objectNode().put("type", "whatever") + ), + nullable = true + ), + ), + ), + 
generationId = 42, + minimumGenerationId = 0, + syncId = 42, + ) + runSync( + configContents, + stream, + listOf( + InputRecord( + randomizedNamespace, + "problematic_types", + """ + { + "id": "ex falso quodlibet" + }""".trimIndent(), + emittedAtMs = 1602637589100, + ) + ) + ) + + val expectedRecords: List = + listOf( + OutputRecord( + extractedAt = 1602637589100, + generationId = 42, + data = + mapOf( + "id" to "ex falso quodlibet", + ), + airbyteMeta = OutputRecord.Meta(syncId = 42), + ), + ) + + val dumpBlock = { + dumpAndDiffRecords( + parsedConfig, + expectedRecords, + stream, + primaryKey = listOf(listOf("id")), + cursor = null, + ) + } + if (failOnUnknownTypes) { + // Note: this will not catch assertion errors against data + // if the destination actually succeeds (by design). + assertThrows { dumpBlock() } + } else { + dumpBlock() + } + } + /** * This test verifies that destinations handle unions correctly. * diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt index 5480f4047f4b..5a7fdc93ef58 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt @@ -30,55 +30,62 @@ import org.apache.avro.SchemaBuilder class AirbyteTypeToAvroSchema { fun convert(airbyteSchema: AirbyteType, path: List): Schema { - return when (airbyteSchema) { - is ObjectType -> { - val recordName = Transformations.toAvroSafeName(path.last()) - val recordNamespace = path.take(path.size - 1).reversed().joinToString(".") - val namespaceMangled = Transformations.toAvroSafeNamespace(recordNamespace) - val builder = SchemaBuilder.record(recordName).namespace(namespaceMangled).fields() - airbyteSchema.properties.entries - .fold(builder) { 
acc, (name, field) -> - val converted = convert(field.type, path + name) - val propertySchema = maybeMakeNullable(field, converted) - val nameMangled = Transformations.toAlphanumericAndUnderscore(name) - acc.name(nameMangled).type(propertySchema).let { - if (field.nullable) { - it.withDefault(null) - } else { - it.noDefault() + return try { + when (airbyteSchema) { + is ObjectType -> { + val recordName = Transformations.toAvroSafeName(path.last()) + val recordNamespace = path.take(path.size - 1).reversed().joinToString(".") + val namespaceMangled = Transformations.toAvroSafeNamespace(recordNamespace) + val builder = + SchemaBuilder.record(recordName).namespace(namespaceMangled).fields() + airbyteSchema.properties.entries + .fold(builder) { acc, (name, field) -> + val converted = convert(field.type, path + name) + val propertySchema = maybeMakeNullable(field, converted) + val nameMangled = Transformations.toAlphanumericAndUnderscore(name) + acc.name(nameMangled).type(propertySchema).let { + if (field.nullable) { + it.withDefault(null) + } else { + it.noDefault() + } } } - } - .endRecord() - } - is ArrayType -> { - val converted = convert(airbyteSchema.items.type, path + "items") - val itemsSchema = maybeMakeNullable(airbyteSchema.items, converted) - SchemaBuilder.array().items(itemsSchema) - } - is BooleanType -> SchemaBuilder.builder().booleanType() - is IntegerType -> SchemaBuilder.builder().longType() - is NumberType -> SchemaBuilder.builder().doubleType() - is StringType -> SchemaBuilder.builder().stringType() - is UnknownType, - is ObjectTypeWithEmptySchema, - is ObjectTypeWithoutSchema, - is ArrayTypeWithoutSchema -> SchemaBuilder.builder().stringType() - is DateType -> { - val schema = SchemaBuilder.builder().intType() - LogicalTypes.date().addToSchema(schema) - } - is TimeTypeWithTimezone, - is TimeTypeWithoutTimezone -> { - val schema = SchemaBuilder.builder().longType() - LogicalTypes.timeMicros().addToSchema(schema) - } - is TimestampTypeWithTimezone, - 
is TimestampTypeWithoutTimezone -> { - val schema = SchemaBuilder.builder().longType() - LogicalTypes.timestampMicros().addToSchema(schema) + .endRecord() + } + is ArrayType -> { + val converted = convert(airbyteSchema.items.type, path + "items") + val itemsSchema = maybeMakeNullable(airbyteSchema.items, converted) + SchemaBuilder.array().items(itemsSchema) + } + is BooleanType -> SchemaBuilder.builder().booleanType() + is IntegerType -> SchemaBuilder.builder().longType() + is NumberType -> SchemaBuilder.builder().doubleType() + is StringType -> SchemaBuilder.builder().stringType() + + // HACK: After upstream validation, UnknownType is sentinel for NullType + is UnknownType -> SchemaBuilder.builder().nullType() + is ObjectTypeWithEmptySchema, + is ObjectTypeWithoutSchema, + is ArrayTypeWithoutSchema -> SchemaBuilder.builder().stringType() + is DateType -> { + val schema = SchemaBuilder.builder().intType() + LogicalTypes.date().addToSchema(schema) + } + is TimeTypeWithTimezone, + is TimeTypeWithoutTimezone -> { + val schema = SchemaBuilder.builder().longType() + LogicalTypes.timeMicros().addToSchema(schema) + } + is TimestampTypeWithTimezone, + is TimestampTypeWithoutTimezone -> { + val schema = SchemaBuilder.builder().longType() + LogicalTypes.timestampMicros().addToSchema(schema) + } + is UnionType -> Schema.createUnion(airbyteSchema.options.map { convert(it, path) }) } - is UnionType -> Schema.createUnion(airbyteSchema.options.map { convert(it, path) }) + } catch (e: Exception) { + throw IllegalArgumentException("Failed to convert $airbyteSchema at $path", e) } } @@ -87,7 +94,11 @@ class AirbyteTypeToAvroSchema { avroSchema: Schema, ): Schema = if (airbyteSchema.nullable && avroSchema.type != Schema.Type.UNION) { - SchemaBuilder.unionOf().nullType().and().type(avroSchema).endUnion() + if (avroSchema.type == Schema.Type.NULL) { + avroSchema + } else { + SchemaBuilder.unionOf().nullType().and().type(avroSchema).endUnion() + } } else if (airbyteSchema.nullable && 
avroSchema.type == Schema.Type.UNION) { avroSchema.types .fold(SchemaBuilder.unionOf().nullType()) { acc, type -> acc.and().type(type) } diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteValueToAvroRecord.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteValueToAvroRecord.kt index 7b3b58535ea4..a89697d93703 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteValueToAvroRecord.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteValueToAvroRecord.kt @@ -36,71 +36,82 @@ import org.apache.avro.generic.GenericRecord class AirbyteValueToAvroRecord { fun convert(airbyteValue: AirbyteValue, airbyteSchema: AirbyteType, schema: Schema): Any? { - if (airbyteValue == NullValue) { - return null - } + try { + if (airbyteValue == NullValue) { + return null + } - if ( - schema.isUnion && - schema.types.size == 2 && - schema.types.any { it.type == Schema.Type.NULL } - ) { - val nonNullSchema = schema.types.find { it.type != Schema.Type.NULL }!! - return convert(airbyteValue, airbyteSchema, nonNullSchema) - } + if ( + schema.isUnion && + schema.types.size == 2 && + schema.types.any { it.type == Schema.Type.NULL } + ) { + val nonNullSchema = schema.types.find { it.type != Schema.Type.NULL }!! 
+ return convert(airbyteValue, airbyteSchema, nonNullSchema) + } - when (airbyteSchema) { - is ObjectType -> { - val record = GenericData.Record(schema) - airbyteSchema.properties.forEach { (name, airbyteField) -> - val nameMangled = Transformations.toAvroSafeName(name) - schema.getField(nameMangled)?.let { avroField -> - val value = (airbyteValue as ObjectValue).values[name] - record.put( - nameMangled, - convert(value ?: NullValue, airbyteField.type, avroField.schema()) - ) + when (airbyteSchema) { + is ObjectType -> { + val record = GenericData.Record(schema) + airbyteSchema.properties.forEach { (name, airbyteField) -> + val nameMangled = Transformations.toAvroSafeName(name) + schema.getField(nameMangled)?.let { avroField -> + val value = (airbyteValue as ObjectValue).values[name] + record.put( + nameMangled, + convert(value ?: NullValue, airbyteField.type, avroField.schema()) + ) + } } + return record } - return record - } - is ArrayType -> { - val array = GenericData.Array((airbyteValue as ArrayValue).values.size, schema) - airbyteValue.values.forEach { value -> - array.add(convert(value, airbyteSchema.items.type, schema.elementType)) + is ArrayType -> { + val array = + GenericData.Array((airbyteValue as ArrayValue).values.size, schema) + airbyteValue.values.forEach { value -> + array.add(convert(value, airbyteSchema.items.type, schema.elementType)) + } + return array } - return array - } - BooleanType -> return (airbyteValue as BooleanValue).value - DateType -> return (airbyteValue as IntegerValue).value.toInt() - IntegerType -> return (airbyteValue as IntegerValue).value.toLong() - NumberType -> return (airbyteValue as NumberValue).value.toDouble() - StringType -> return (airbyteValue as StringValue).value + BooleanType -> return (airbyteValue as BooleanValue).value + DateType -> return (airbyteValue as IntegerValue).value.toInt() + IntegerType -> return (airbyteValue as IntegerValue).value.toLong() + NumberType -> return (airbyteValue as 
NumberValue).value.toDouble() + StringType -> return (airbyteValue as StringValue).value - // Converted to strings upstream - is UnknownType, - ObjectTypeWithEmptySchema, - ObjectTypeWithoutSchema, - ArrayTypeWithoutSchema -> return (airbyteValue as StringValue).value + // Upstream (FailOnAllUnknownTypesExceptNull) every unknown type other than + // {"type": "null"} is rejected, so only null-typed fields reach this branch + is UnknownType -> return null - // Converted to integrals upstream - TimeTypeWithTimezone, - TimeTypeWithoutTimezone, - TimestampTypeWithTimezone, - TimestampTypeWithoutTimezone -> return (airbyteValue as IntegerValue).value.toLong() - is UnionType -> { - for (optionType in airbyteSchema.options) { - try { - val optionSchema = matchingAvroType(optionType, schema) - return convert(airbyteValue, optionType, optionSchema) - } catch (e: Exception) { - continue + // Converted to strings upstream + ObjectTypeWithEmptySchema, + ObjectTypeWithoutSchema, + ArrayTypeWithoutSchema -> return (airbyteValue as StringValue).value + + // Converted to integrals upstream + TimeTypeWithTimezone, + TimeTypeWithoutTimezone, + TimestampTypeWithTimezone, + TimestampTypeWithoutTimezone -> return (airbyteValue as IntegerValue).value.toLong() + is UnionType -> { + for (optionType in airbyteSchema.options) { + try { + val optionSchema = matchingAvroType(optionType, schema) + return convert(airbyteValue, optionType, optionSchema) + } catch (e: Exception) { + continue + } } + throw IllegalArgumentException( + "No matching Avro type found for $airbyteSchema in $schema (airbyte value: ${airbyteValue.javaClass.simpleName})" + ) } - throw IllegalArgumentException( - "No matching Avro type found for $airbyteSchema in $schema (airbyte value: ${airbyteValue.javaClass.simpleName})" - ) } + } catch (e: Exception) { + throw RuntimeException( + "Failed to convert $airbyteSchema(${airbyteValue.javaClass.simpleName}) to $schema", + e + ) } } diff --git 
a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroMapperPipelineFactory.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroMapperPipelineFactory.kt index db7b18eff8b2..b47c596dd272 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroMapperPipelineFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroMapperPipelineFactory.kt @@ -7,6 +7,7 @@ package io.airbyte.cdk.load.data.avro import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.AirbyteSchemaNoopMapper import io.airbyte.cdk.load.data.AirbyteValueNoopMapper +import io.airbyte.cdk.load.data.FailOnAllUnknownTypesExceptNull import io.airbyte.cdk.load.data.MapperPipeline import io.airbyte.cdk.load.data.MapperPipelineFactory import io.airbyte.cdk.load.data.MergeUnions @@ -19,7 +20,7 @@ class AvroMapperPipelineFactory : MapperPipelineFactory { MapperPipeline( stream.schema, listOf( - AirbyteSchemaNoopMapper() to SchemalessValuesToJsonString(), + FailOnAllUnknownTypesExceptNull() to SchemalessValuesToJsonString(), AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(), AirbyteSchemaNoopMapper() to TimeStringToInteger(), MergeUnions() to AirbyteValueNoopMapper(), diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt index ed4e840ac50b..59fcb4feb0f1 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt @@ -72,7 +72,7 @@ class AvroRecordToAirbyteValue { ) is IntegerType -> return IntegerValue(avroValue as Long) is NumberType -> return 
NumberValue((avroValue as Double).toBigDecimal()) - is UnknownType, + is UnknownType -> return NullValue ArrayTypeWithoutSchema, ObjectTypeWithEmptySchema, ObjectTypeWithoutSchema, diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt index 8ef9e9aab9d9..9b1d24b7a99c 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt @@ -137,7 +137,6 @@ class RecordToPartAccumulatorTest { // should be second part and final (and not empty) when (val batch = acc.processRecords(makeRecords(1), 0L, false) as ObjectStorageBatch) { is LoadablePart -> { - println(batch.part.bytes.contentToString()) assert(batch.part.bytes.contentEquals(makeBytes(1))) assert(batch.part.partIndex == 2) assert(batch.part.fileNumber == 111L) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt index 86720448a900..71edac9b3bd5 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt @@ -48,9 +48,6 @@ class ObjectStorageDataDumper( private val formatConfig: ObjectStorageFormatConfiguration, private val compressionConfig: ObjectStorageCompressionConfiguration<*>? 
= null ) { - private val avroMapperPipeline = AvroMapperPipelineFactory().create(stream) - private val parquetMapperPipeline = ParquetMapperPipelineFactory().create(stream) - fun dump(): List { // Note: this is implicitly a test of the `streamConstant` final directory // and the path matcher, so a failure here might imply a bug in the metadata-based @@ -131,6 +128,7 @@ class ObjectStorageDataDumper( } } is AvroFormatConfiguration -> { + val avroMapperPipeline = AvroMapperPipelineFactory().create(stream) val finalSchema = avroMapperPipeline.finalSchema.withAirbyteMeta(wasFlattened) inputStream.toAvroReader(finalSchema.toAvroSchema(stream.descriptor)).use { reader -> @@ -145,6 +143,7 @@ class ObjectStorageDataDumper( } } is ParquetFormatConfiguration -> { + val parquetMapperPipeline = ParquetMapperPipelineFactory().create(stream) val finalSchema = parquetMapperPipeline.finalSchema.withAirbyteMeta(wasFlattened) inputStream.toParquetReader(finalSchema.toAvroSchema(stream.descriptor)).use { reader -> diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt index bbdd4c179de7..6e5870c9fc9f 100644 --- a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt @@ -7,6 +7,7 @@ package io.airbyte.cdk.load.data.parquet import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.AirbyteSchemaNoopMapper import io.airbyte.cdk.load.data.AirbyteValueNoopMapper +import io.airbyte.cdk.load.data.FailOnAllUnknownTypesExceptNull import io.airbyte.cdk.load.data.MapperPipeline import io.airbyte.cdk.load.data.MapperPipelineFactory import io.airbyte.cdk.load.data.MergeUnions @@ -21,7 
+22,7 @@ class ParquetMapperPipelineFactory : MapperPipelineFactory { MapperPipeline( stream.schema, listOf( - AirbyteSchemaNoopMapper() to SchemalessValuesToJsonString(), + FailOnAllUnknownTypesExceptNull() to SchemalessValuesToJsonString(), AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(), AirbyteSchemaNoopMapper() to TimeStringToInteger(), MergeUnions() to AirbyteValueNoopMapper(), diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt index a2c8bb79d5dd..08ad56398010 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3Client.kt @@ -207,7 +207,6 @@ class S3ClientFactory( @Singleton @Secondary fun make(): S3Client { - val credsProvider: CredentialsProvider = if (keyConfig.awsAccessKeyConfiguration.accessKeyId != null) { StaticCredentialsProvider { @@ -240,7 +239,12 @@ class S3ClientFactory( aws.sdk.kotlin.services.s3.S3Client { region = bucketConfig.s3BucketConfiguration.s3BucketRegion.name credentialsProvider = credsProvider - endpointUrl = bucketConfig.s3BucketConfiguration.s3Endpoint?.let { Url.parse(it) } + endpointUrl = + bucketConfig.s3BucketConfiguration.s3Endpoint?.let { + if (it.isNotBlank()) { + Url.parse(it) + } else null + } } return S3Client( diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v0/problematic_types_disjoint_union_messages_out.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v0/problematic_types_disjoint_union_messages_out.txt index 0194f28d3d9f..fc9e73e5de9c 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v0/problematic_types_disjoint_union_messages_out.txt +++ 
b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v0/problematic_types_disjoint_union_messages_out.txt @@ -1,3 +1,3 @@ -{"schemaless_object":"{\"uuid\":\"38F52396-736D-4B23-B5B4-F504D8894B97\",\"probability\":1.5}","schematized_object":{"id":1,"name":"Joe"},"combined_type":{"type":"string","string":"string1","integer":null},"union_type":{"type":"integer","string":null,"integer":10},"schemaless_array":"[10,\"foo\",null,{\"bar\":\"qua\"}]","mixed_array_integer_and_schemaless_object":[{"type":"integer","integer":15,"string":null},null,{"type":"string","integer":null,"string":"{\"hello\":\"world\"}"}],"array_of_union_integer_and_schemaless_array":[{"type":"integer","integer":25,"string":null},null,{"type":"string","integer":null,"string":"[\"goodbye\",\"cruel world\"]"}],"union_of_objects_with_properties_identical":{"id":10,"name":"Joe"},"union_of_objects_with_properties_overlapping":{"id":20,"name":"Jane","flagged":true},"union_of_objects_with_properties_nonoverlapping":{"id":30,"name":"Phil","flagged":false,"description":"Very Phil"}, "union_of_objects_with_properties_contradicting": { "id": {"type":"integer","integer":1,"string":null}, "name": "Jenny" }, "empty_object": "{}","object_with_null_properties": "{}", "combined_with_null": "foobar", "union_with_null":"barfoo", "combined_nulls": null, "compact_union": {"type": "object", "object": { "id": 10, "name": "Tyler" }, "integer": null } } 
+{"schemaless_object":"{\"uuid\":\"38F52396-736D-4B23-B5B4-F504D8894B97\",\"probability\":1.5}","schematized_object":{"id":1,"name":"Joe"},"combined_type":{"type":"string","string":"string1","integer":null},"union_type":{"type":"integer","string":null,"integer":10},"schemaless_array":"[10,\"foo\",null,{\"bar\":\"qua\"}]","mixed_array_integer_and_schemaless_object":[{"type":"integer","integer":15,"object":null},null,{"type":"object","integer":null,"object":"{\"hello\":\"world\"}"}],"array_of_union_integer_and_schemaless_array":[{"type":"integer","integer":25,"object":null},null,{"type":"object","integer":null,"object":"[\"goodbye\",\"cruel world\"]"}],"union_of_objects_with_properties_identical":{"id":10,"name":"Joe"},"union_of_objects_with_properties_overlapping":{"id":20,"name":"Jane","flagged":true},"union_of_objects_with_properties_nonoverlapping":{"id":30,"name":"Phil","flagged":false,"description":"Very Phil"},"union_of_objects_with_properties_contradicting":{"id":{"type":"integer","integer":1,"string":null},"name":"Jenny"},"empty_object":"{}","object_with_null_properties":"{}","combined_with_null":"foobar","union_with_null":"barfoo","combined_nulls":null,"compact_union":{"type":"object","object":{"id":10,"name":"Tyler"},"integer":null}} {"schemaless_object":"{\"address\":{\"street\":\"113 Hickey Rd\",\"zip\":\"37932\"},\"flags\":[true,false,false]}","schematized_object":{"id":2,"name":"Jane"},"combined_type":{"type":"integer","string":null,"integer":20},"union_type":{"type":"string","string":"string2","integer":null},"schemaless_array":"[]","mixed_array_integer_and_schemaless_object":[],"array_of_union_integer_and_schemaless_array":[],"union_of_objects_with_properties_identical":{"id":null,"name":null},"union_of_objects_with_properties_overlapping":{"id":null,"name":null,"flagged":null},"union_of_objects_with_properties_nonoverlapping":{"id":null,"name":null,"flagged":null,"description":null}, "union_of_objects_with_properties_contradicting": { "id": 
{"type":"string","integer":null,"string":"seal-one-hippity"}, "name": "James" }, "empty_object": "{\"extra\":\"stuff\"}", "object_with_null_properties": "{\"more\":{\"extra\":\"stuff\"}}", "combined_with_null": "foobar2", "union_with_null": "barfoo2", "combined_nulls": null, "compact_union": {"type":"integer","integer":4444,"object":null} } { "schemaless_object": null, "schematized_object": null, "combined_type": null, "union_type": null, "schemaless_array": null, "mixed_array_integer_and_schemaless_object": null, "array_of_union_integer_and_schemaless_array": null, "union_of_objects_with_properties_identical": null, "union_of_objects_with_properties_overlapping": null, "union_of_objects_with_properties_nonoverlapping": null, "union_of_objects_with_properties_contradicting": null, "empty_object": null, "object_with_null_properties": null, "combined_with_null": null, "union_with_null": null, "combined_nulls": null, "compact_union": null } \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index 2dfbda00c691..d4572456a265 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 - dockerImageTag: 0.3.4 + dockerImageTag: 0.3.5 dockerRepository: airbyte/destination-s3-v2 githubIssueLabel: destination-s3-v2 icon: s3.svg @@ -111,4 +111,9 @@ data: secretStore: type: GSM alias: airbyte-connector-testing-secret-store + - name: SECRET_DESTINATION-S3-V2-ENDPOINT_EMPTY_URL + fileName: s3_dest_v2_endpoint_empty_url_config.json + secretStore: + type: GSM + alias: airbyte-connector-testing-secret-store metadataSpecVersion: "1.0" diff --git 
a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2FileTransferDestinationTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2FileTransferDestinationTest.kt index 0dc65fca62ac..efd09916ba04 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2FileTransferDestinationTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2FileTransferDestinationTest.kt @@ -164,7 +164,7 @@ class S3V2FileTransferDestinationTest : S3BaseDestinationAcceptanceTest() { } catch (e: TestHarnessException) { assertContains( e.outputMessages!![0].trace.error.internalMessage, - "Failed to convert AirbyteMessage" + "Failed to construct file message" ) } } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt index c1d1b512d38e..01c6eeb3ce38 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2CheckTest.kt @@ -37,6 +37,10 @@ class S3V2CheckTest : Path.of(S3V2TestUtils.CSV_GZIP_CONFIG_PATH), setOf(FeatureFlag.AIRBYTE_CLOUD_DEPLOYMENT), ), + CheckTestConfig( + Path.of(S3V2TestUtils.ENDPOINT_EMPTY_URL_CONFIG_PATH), + setOf(FeatureFlag.AIRBYTE_CLOUD_DEPLOYMENT), + ) ), failConfigFilenamesAndFailureReasons = emptyMap() ) { diff --git 
a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt index 229e276883a7..713abb1d6b6f 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt @@ -22,5 +22,6 @@ object S3V2TestUtils { const val PARQUET_UNCOMPRESSED_CONFIG_PATH = "secrets/s3_dest_v2_parquet_config.json" const val PARQUET_SNAPPY_CONFIG_PATH = "secrets/s3_dest_v2_parquet_snappy_config.json" const val ENDPOINT_URL_CONFIG_PATH = "secrets/s3_dest_v2_endpoint_url_config.json" + const val ENDPOINT_EMPTY_URL_CONFIG_PATH = "secrets/s3_dest_v2_endpoint_empty_url_config.json" fun getConfig(configPath: String): String = Files.readString(Path.of(configPath)) } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index a4b3d56ab8b1..896a8da27a9f 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -25,6 +25,7 @@ abstract class S3V2WriteTest( commitDataIncrementally: Boolean = true, allTypesBehavior: AllTypesBehavior, nullEqualsUnset: Boolean = false, + failOnUnknownTypes: Boolean = false, ) : BasicFunctionalityIntegrationTest( S3V2TestUtils.getConfig(path), @@ -41,6 +42,7 @@ abstract class 
S3V2WriteTest( allTypesBehavior = allTypesBehavior, nullEqualsUnset = nullEqualsUnset, supportFileTransfer = true, + failOnUnknownTypes = failOnUnknownTypes, ) { @Disabled("Irrelevant for file destinations") @Test @@ -149,6 +151,7 @@ class S3V2WriteTestAvroUncompressed : preserveUndeclaredFields = false, allTypesBehavior = StronglyTyped(integerCanBeLarge = false), nullEqualsUnset = true, + failOnUnknownTypes = true, ) class S3V2WriteTestAvroBzip2 : @@ -159,6 +162,7 @@ class S3V2WriteTestAvroBzip2 : preserveUndeclaredFields = false, allTypesBehavior = StronglyTyped(integerCanBeLarge = false), nullEqualsUnset = true, + failOnUnknownTypes = true, ) class S3V2WriteTestParquetUncompressed : @@ -169,6 +173,7 @@ class S3V2WriteTestParquetUncompressed : preserveUndeclaredFields = false, allTypesBehavior = StronglyTyped(integerCanBeLarge = false), nullEqualsUnset = true, + failOnUnknownTypes = true, ) class S3V2WriteTestParquetSnappy : @@ -179,6 +184,7 @@ class S3V2WriteTestParquetSnappy : preserveUndeclaredFields = false, allTypesBehavior = StronglyTyped(integerCanBeLarge = false), nullEqualsUnset = true, + failOnUnknownTypes = true, ) class S3V2WriteTestEndpointURL :