From f99a67aaf3c8f9c30822da75e800d0031745bed0 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 2 Jan 2025 16:01:37 -0800 Subject: [PATCH] Load CDK: Support strongly/weakly-typed mappers (#49959) --- .../MockBasicFunctionalityIntegrationTest.kt | 4 +- .../io/airbyte/cdk/load/data/AirbyteValue.kt | 100 ++---- .../data/AirbyteValueDeepCoercingMapper.kt | 231 ++++++++++++++ .../load/data/AirbyteValueIdentityMapper.kt | 46 ++- .../cdk/load/data/NullOutOfRangeIntegers.kt | 3 +- .../load/data/SchemalessTypesToJsonString.kt | 2 +- .../cdk/load/data/TimeStringToInteger.kt | 60 +--- .../cdk/load/data/json/AirbyteValueToJson.kt | 10 +- .../cdk/load/data/json/JsonToAirbyteValue.kt | 212 ++----------- .../cdk/load/message/DestinationMessage.kt | 45 ++- .../cdk/load/util/TimeStringUtility.kt | 21 +- .../AirbyteValueDeepCoercingMapperTest.kt | 290 ++++++++++++++++++ .../data/AirbyteValueIdentityMapperTest.kt | 24 +- .../cdk/load/data/MapperPipelineTest.kt | 4 +- .../data/SchemalessTypesToJsonStringTest.kt | 2 +- .../cdk/load/data/TimeStringToIntegerTest.kt | 142 +++------ .../load/data/json/AirbyteValueToJsonTest.kt | 63 +--- .../load/data/json/JsonToAirbyteValueTest.kt | 144 +-------- .../cdk/load/test/util/RecordDifferTest.kt | 6 +- .../cdk/load/util/TimeStringUtilityTest.kt | 26 +- .../airbyte/cdk/load/message/InputMessage.kt | 3 +- .../AirbyteValueWithMetaToOutputRecord.kt | 12 +- .../load/test/util/ExpectedRecordMapper.kt | 43 +++ .../cdk/load/test/util/RecordDiffer.kt | 50 +-- .../BasicFunctionalityIntegrationTest.kt | 33 +- .../data/avro/AvroMapperPipelineFactory.kt | 12 +- .../data/avro/AvroRecordToAirbyteValue.kt | 26 +- .../cdk/load/data/csv/AirbyteValueToCsvRow.kt | 12 +- .../cdk/load/data/csv/CsvRowToAirbyteValue.kt | 115 ++----- .../parquet/AirbyteTypeToIcebergSchema.kt | 10 +- .../parquet/AirbyteValueToIcebergRecord.kt | 43 ++- .../parquet/IcebergParquetPipelineFactory.kt | 14 +- .../parquet/SchemalessTypesToStringType.kt | 19 -- .../AirbyteValueToIcebergRecordTest.kt | 59 +++- .../cdk/load/ObjectStorageDataDumper.kt | 2 +- .../parquet/ParquetMapperPipelineFactory.kt | 14 +- .../iceberg/v2/IcebergExpectedRecordMapper.kt | 36 +++ .../iceberg/v2/IcebergV2WriteTest.kt | 27 +- .../iceberg/v2/io/IcebergUtilTest.kt | 5 +- .../destination/s3_v2/S3V2WriteTest.kt | 44 ++- 40 files changed, 1076 insertions(+), 938 deletions(-) create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueDeepCoercingMapper.kt create mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueDeepCoercingMapperTest.kt delete mode 100644 airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/SchemalessTypesToStringType.kt create mode 100644 airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergExpectedRecordMapper.kt diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt index 195ed81067f4..6e3891269cfc 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt @@ -5,8 +5,8 @@ package io.airbyte.cdk.load.mock_integration_test import io.airbyte.cdk.load.test.util.NoopDestinationCleaner -import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.test.util.NoopNameMapper +import io.airbyte.cdk.load.test.util.UncoercedExpectedRecordMapper import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.Untyped import org.junit.jupiter.api.Disabled @@ -18,7 +18,7 @@ class MockBasicFunctionalityIntegrationTest : MockDestinationSpecification::class.java, MockDestinationDataDumper, NoopDestinationCleaner, - NoopExpectedRecordMapper, + UncoercedExpectedRecordMapper, NoopNameMapper, isStreamSchemaRetroactive = false, supportsDedup = true, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt index b37dffac4cfa..6bf7069822cc 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt @@ -12,7 +12,6 @@ import java.time.LocalDateTime import java.time.LocalTime import java.time.OffsetDateTime import java.time.OffsetTime -import java.time.ZoneOffset sealed interface AirbyteValue { companion object { @@ -27,11 +26,11 @@ sealed interface AirbyteValue { is BigInteger -> IntegerValue(value) is Double -> NumberValue(BigDecimal.valueOf(value)) is BigDecimal -> NumberValue(value) - is LocalDate -> DateValue(value.toString()) - is OffsetDateTime, - is LocalDateTime -> TimestampValue(value.toString()) - is OffsetTime, - is LocalTime -> TimeValue(value.toString()) + is LocalDate -> DateValue(value) + is OffsetDateTime -> TimestampWithTimezoneValue(value) + is LocalDateTime -> TimestampWithoutTimezoneValue(value) + is OffsetTime -> TimeWithTimezoneValue(value) + is LocalTime -> TimeWithoutTimezoneValue(value) is Map<*, *> -> ObjectValue.from(@Suppress("UNCHECKED_CAST") (value as Map)) is List<*> -> ArrayValue.from(value) @@ -73,76 +72,37 @@ value class NumberValue(val value: BigDecimal) : AirbyteValue, Comparable { - override fun compareTo(other: DateValue): Int { - val thisDate = - try { - LocalDate.parse(value) - } catch (e: Exception) { - LocalDate.MIN - } - val otherDate = - try { - LocalDate.parse(other.value) - } catch (e: Exception) { - LocalDate.MIN - } - return thisDate.compareTo(otherDate) - } +value class DateValue(val value: LocalDate) : AirbyteValue, Comparable { + constructor(date: String) : this(LocalDate.parse(date)) + override fun compareTo(other: DateValue): Int = value.compareTo(other.value) } @JvmInline -value class TimestampValue(val value: String) : AirbyteValue, Comparable { - override fun compareTo(other: TimestampValue): Int { - // Do all comparisons using OffsetDateTime for convenience. - // First, try directly parsing as OffsetDateTime. - // If that fails, try parsing as LocalDateTime and assume UTC. - // We could maybe have separate value classes for these cases, - // but that comes with its own set of problems - // (mostly around sources declaring bad schemas). - val thisTimestamp = - try { - OffsetDateTime.parse(value) - } catch (e: Exception) { - LocalDateTime.parse(value).atOffset(ZoneOffset.UTC) - } catch (e: Exception) { - LocalDateTime.MIN.atOffset(ZoneOffset.UTC) - } - val otherTimestamp = - try { - OffsetDateTime.parse(other.value) - } catch (e: Exception) { - LocalDateTime.parse(other.value).atOffset(ZoneOffset.UTC) - } catch (e: Exception) { - LocalDateTime.MIN.atOffset(ZoneOffset.UTC) - } - return thisTimestamp.compareTo(otherTimestamp) - } +value class TimestampWithTimezoneValue(val value: OffsetDateTime) : + AirbyteValue, Comparable { + constructor(timestamp: String) : this(OffsetDateTime.parse(timestamp)) + override fun compareTo(other: TimestampWithTimezoneValue): Int = value.compareTo(other.value) } @JvmInline -value class TimeValue(val value: String) : AirbyteValue, Comparable { - override fun compareTo(other: TimeValue): Int { - // Similar to TimestampValue, try parsing with/without timezone, - // and do all comparisons using OffsetTime. - val thisTime = - try { - OffsetTime.parse(value) - } catch (e: Exception) { - LocalTime.parse(value).atOffset(ZoneOffset.UTC) - } catch (e: Exception) { - LocalTime.MIN.atOffset(ZoneOffset.UTC) - } - val otherTime = - try { - OffsetTime.parse(other.value) - } catch (e: Exception) { - LocalTime.parse(other.value).atOffset(ZoneOffset.UTC) - } catch (e: Exception) { - LocalTime.MIN.atOffset(ZoneOffset.UTC) - } - return thisTime.compareTo(otherTime) - } +value class TimestampWithoutTimezoneValue(val value: LocalDateTime) : + AirbyteValue, Comparable { + constructor(timestamp: String) : this(LocalDateTime.parse(timestamp)) + override fun compareTo(other: TimestampWithoutTimezoneValue): Int = value.compareTo(other.value) +} + +@JvmInline +value class TimeWithTimezoneValue(val value: OffsetTime) : + AirbyteValue, Comparable { + constructor(time: String) : this(OffsetTime.parse(time)) + override fun compareTo(other: TimeWithTimezoneValue): Int = value.compareTo(other.value) +} + +@JvmInline +value class TimeWithoutTimezoneValue(val value: LocalTime) : + AirbyteValue, Comparable { + constructor(time: String) : this(LocalTime.parse(time)) + override fun compareTo(other: TimeWithoutTimezoneValue): Int = value.compareTo(other.value) } @JvmInline diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueDeepCoercingMapper.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueDeepCoercingMapper.kt new file mode 100644 index 000000000000..1d4cdd2ab9fc --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueDeepCoercingMapper.kt @@ -0,0 +1,231 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data + +import io.airbyte.cdk.load.util.serializeToString +import java.math.BigDecimal +import java.math.BigInteger +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime +import java.time.OffsetDateTime +import java.time.OffsetTime +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter + +/** + * A mapper which coerces ALL values against the schema. This mapper MUST NOT be called after any + * mapper that returns non-native-JSON types (date, timestamp, etc.), or any mapper that causes the + * values to become misaligned with the schema (e.g. [AirbyteSchemaNoopMapper] + + * [SchemalessValuesToJsonString]). + * + * If this mapper is included in a [MapperPipeline], it SHOULD be preceded by a [MergeUnions] + * mapper. Not including this mapper may result in strange behavior when coercing union types. + * + * This mapper performs common-sense type coercions. For example, it will promote IntegerValue to + * NumberValue, or parse StringValue to TimestampValue. + */ +class AirbyteValueDeepCoercingMapper : AirbyteValueIdentityMapper() { + override fun mapObject( + value: AirbyteValue, + schema: ObjectType, + context: Context + ): Pair = + // force to object, and then use the superclass recursion + requireType(value, schema, context) { super.mapObject(it, schema, context) } + + override fun mapObjectWithEmptySchema( + value: AirbyteValue, + schema: ObjectTypeWithEmptySchema, + context: Context + ): Pair = requireType(value, schema, context) + + override fun mapObjectWithoutSchema( + value: AirbyteValue, + schema: ObjectTypeWithoutSchema, + context: Context + ): Pair = requireType(value, schema, context) + + override fun mapArray( + value: AirbyteValue, + schema: ArrayType, + context: Context + ): Pair = + // force to array, and then use the superclass recursion + requireType(value, schema, context) { super.mapArray(it, schema, context) } + + override fun mapArrayWithoutSchema( + value: AirbyteValue, + schema: ArrayTypeWithoutSchema, + context: Context + ): Pair = requireType(value, schema, context) + + override fun mapBoolean(value: AirbyteValue, context: Context): Pair = + requireType(value, BooleanType, context) + + override fun mapNumber(value: AirbyteValue, context: Context): Pair = + when (value) { + is NumberValue -> value to context + is IntegerValue -> NumberValue(value.value.toBigDecimal()) to context + is StringValue -> NumberValue(BigDecimal(value.value)) to context + else -> nulledOut(NumberType, context) + } + + override fun mapInteger(value: AirbyteValue, context: Context): Pair = + when (value) { + // Maybe we should truncate non-int values? + // But to match existing behavior, let's just null for now. + is NumberValue -> IntegerValue(value.value.toBigIntegerExact()) to context + is IntegerValue -> value to context + is StringValue -> IntegerValue(BigInteger(value.value)) to context + else -> nulledOut(IntegerType, context) + } + + override fun mapString(value: AirbyteValue, context: Context): Pair { + val stringified = + when (value) { + // this should never happen, because we handle `value is NullValue` + // in the top-level if statement + NullValue -> throw IllegalStateException("Unexpected NullValue") + is StringValue -> value.value + is NumberValue -> value.value.toString() + is IntegerValue -> value.value.toString() + is BooleanValue -> value.value.toString() + is ArrayValue, + is ObjectValue -> value.serializeToString() + // JsonToAirbyteValue never outputs these values, so don't handle them. + is DateValue, + is TimeWithTimezoneValue, + is TimeWithoutTimezoneValue, + is TimestampWithTimezoneValue, + is TimestampWithoutTimezoneValue, + is UnknownValue -> + throw IllegalArgumentException( + "Invalid value type ${value.javaClass.canonicalName}" + ) + } + return StringValue(stringified) to context + } + + override fun mapDate(value: AirbyteValue, context: Context): Pair = + requireType(value, DateType, context) { + DateValue(LocalDate.parse(it.value, DATE_TIME_FORMATTER)) to context + } + + override fun mapTimeWithTimezone( + value: AirbyteValue, + context: Context + ): Pair = + requireType(value, TimeTypeWithTimezone, context) { + val ot = + try { + OffsetTime.parse(it.value, TIME_FORMATTER) + } catch (e: Exception) { + LocalTime.parse(it.value, TIME_FORMATTER).atOffset(ZoneOffset.UTC) + } + TimeWithTimezoneValue(ot) to context + } + + override fun mapTimeWithoutTimezone( + value: AirbyteValue, + context: Context + ): Pair = + requireType(value, TimeTypeWithoutTimezone, context) { + TimeWithoutTimezoneValue(LocalTime.parse(it.value, TIME_FORMATTER)) to context + } + + override fun mapTimestampWithTimezone( + value: AirbyteValue, + context: Context + ): Pair = + requireType(value, TimestampTypeWithTimezone, context) { + TimestampWithTimezoneValue(offsetDateTime(it)) to context + } + + override fun mapTimestampWithoutTimezone( + value: AirbyteValue, + context: Context + ): Pair = + requireType(value, TimestampTypeWithoutTimezone, context) { + TimestampWithoutTimezoneValue(offsetDateTime(it).toLocalDateTime()) to context + } + + private fun offsetDateTime(it: StringValue): OffsetDateTime { + val odt = + try { + ZonedDateTime.parse(it.value, DATE_TIME_FORMATTER).toOffsetDateTime() + } catch (e: Exception) { + LocalDateTime.parse(it.value, DATE_TIME_FORMATTER).atOffset(ZoneOffset.UTC) + } + return odt + } + + override fun mapUnion( + value: AirbyteValue, + schema: UnionType, + context: Context + ): Pair = + if (schema.options.isEmpty()) { + nulledOut(schema, context) + } else { + val option = + schema.options.find { matchesStrictly(value, it) } + ?: schema.options.find { matchesPermissively(value, it) } + ?: throw IllegalArgumentException( + "No matching union option in ${schema.options} for ${value::class.java.canonicalName}", + ) + mapInner(value, option, context) + } + + private fun matchesStrictly(value: AirbyteValue, schema: AirbyteType): Boolean = + when (schema) { + is ArrayType, + is ArrayTypeWithoutSchema -> value is ArrayValue + is BooleanType -> value is BooleanValue + is DateType -> value is StringValue + is IntegerType -> value is IntegerValue + is NumberType -> value is NumberValue + is ObjectType, + is ObjectTypeWithoutSchema, + is ObjectTypeWithEmptySchema -> value is ObjectValue + is StringType -> value is StringValue + is TimeTypeWithTimezone, + is TimeTypeWithoutTimezone, + is TimestampTypeWithTimezone, + is TimestampTypeWithoutTimezone -> value is StringValue + is UnionType -> schema.options.any { matchesStrictly(value, it) } + is UnknownType -> false + } + + private fun matchesPermissively(value: AirbyteValue, schema: AirbyteType): Boolean { + val (mappedValue, _) = mapInner(value, schema, Context(nullable = true)) + return mappedValue !is NullValue + } + + private inline fun requireType( + value: AirbyteValue, + schema: AirbyteType, + context: Context, + f: (T) -> Pair = { value to context }, + ): Pair { + return if (value is T) { + f(value) + } else { + nulledOut(schema, context) + } + } + + companion object { + val DATE_TIME_FORMATTER: DateTimeFormatter = + DateTimeFormatter.ofPattern( + "[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d][[' '][G]][[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X][[' '][G]]]]" + ) + val TIME_FORMATTER: DateTimeFormatter = + DateTimeFormatter.ofPattern( + "HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]" + ) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt index 4fd7f4d3d037..ade174a12b7a 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt @@ -65,40 +65,35 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper { } else try { when (schema) { - is ObjectType -> mapObject(value as ObjectValue, schema, context) + is ObjectType -> mapObject(value, schema, context) is ObjectTypeWithoutSchema -> mapObjectWithoutSchema(value, schema, context) is ObjectTypeWithEmptySchema -> mapObjectWithEmptySchema(value, schema, context) - is ArrayType -> mapArray(value as ArrayValue, schema, context) + is ArrayType -> mapArray(value, schema, context) is ArrayTypeWithoutSchema -> mapArrayWithoutSchema(value, schema, context) is UnionType -> mapUnion(value, schema, context) - is BooleanType -> mapBoolean(value as BooleanValue, context) - is NumberType -> mapNumber(value as NumberValue, context) - is StringType -> mapString(value as StringValue, context) - is IntegerType -> mapInteger(value as IntegerValue, context) + is BooleanType -> mapBoolean(value, context) + is NumberType -> mapNumber(value, context) + is StringType -> mapString(value, context) + is IntegerType -> mapInteger(value, context) is DateType -> mapDate(value, context) - is TimeTypeWithTimezone -> - mapTimeWithTimezone( - value, - context, - ) - is TimeTypeWithoutTimezone -> - mapTimeWithoutTimezone( - value, - context, - ) + is TimeTypeWithTimezone -> mapTimeWithTimezone(value, context) + is TimeTypeWithoutTimezone -> mapTimeWithoutTimezone(value, context) is TimestampTypeWithTimezone -> mapTimestampWithTimezone(value, context) is TimestampTypeWithoutTimezone -> mapTimestampWithoutTimezone(value, context) - is UnknownType -> mapUnknown(value as UnknownValue, context) + is UnknownType -> mapUnknown(value, context) } } catch (e: Exception) { nulledOut(schema, context) } open fun mapObject( - value: ObjectValue, + value: AirbyteValue, schema: ObjectType, context: Context ): Pair { + if (value !is ObjectValue) { + return value to context + } val values = LinkedHashMap() schema.properties.forEach { (name, field) -> values[name] = @@ -125,10 +120,13 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper { ): Pair = value to context open fun mapArray( - value: ArrayValue, + value: AirbyteValue, schema: ArrayType, context: Context ): Pair { + if (value !is ArrayValue) { + return value to context + } val mapped = value.values.mapIndexed { index, element -> mapInner( @@ -156,16 +154,16 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper { context: Context ): Pair = value to context - open fun mapBoolean(value: BooleanValue, context: Context): Pair = + open fun mapBoolean(value: AirbyteValue, context: Context): Pair = value to context - open fun mapNumber(value: NumberValue, context: Context): Pair = + open fun mapNumber(value: AirbyteValue, context: Context): Pair = value to context - open fun mapString(value: StringValue, context: Context): Pair = + open fun mapString(value: AirbyteValue, context: Context): Pair = value to context - open fun mapInteger(value: IntegerValue, context: Context): Pair = + open fun mapInteger(value: AirbyteValue, context: Context): Pair = value to context /** @@ -197,6 +195,6 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper { open fun mapNull(context: Context): Pair = NullValue to context - open fun mapUnknown(value: UnknownValue, context: Context): Pair = + open fun mapUnknown(value: AirbyteValue, context: Context): Pair = value to context } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/NullOutOfRangeIntegers.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/NullOutOfRangeIntegers.kt index f89d5eb9799e..2186019ce46f 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/NullOutOfRangeIntegers.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/NullOutOfRangeIntegers.kt @@ -15,7 +15,8 @@ class NullOutOfRangeIntegers( private val minValue: BigInteger = Long.MIN_VALUE.toBigInteger(), private val maxValue: BigInteger = Long.MAX_VALUE.toBigInteger() ) : AirbyteValueIdentityMapper() { - override fun mapInteger(value: IntegerValue, context: Context): Pair { + override fun mapInteger(value: AirbyteValue, context: Context): Pair { + value as IntegerValue if (value.value < minValue || value.value > maxValue) { return nulledOut( IntegerType, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt index 263539acb521..795477d241f9 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt @@ -56,7 +56,7 @@ class SchemalessValuesToJsonString : AirbyteValueIdentityMapper() { context: Context ): Pair = value.toJson().serializeToString().let(::StringValue) to context - override fun mapUnknown(value: UnknownValue, context: Context): Pair = + override fun mapUnknown(value: AirbyteValue, context: Context): Pair = value.toJson().serializeToString().let(::StringValue) to context override fun mapUnion( diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringToInteger.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringToInteger.kt index a09419f7daf0..d2be041c8778 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringToInteger.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringToInteger.kt @@ -4,13 +4,11 @@ package io.airbyte.cdk.load.data -import java.time.LocalDate import java.time.LocalDateTime import java.time.LocalTime +import java.time.OffsetDateTime import java.time.OffsetTime import java.time.ZoneOffset -import java.time.ZonedDateTime -import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit /** @@ -18,81 +16,55 @@ import java.time.temporal.ChronoUnit * as with timezone, then fall back to without. But in theory we should be more strict. */ class TimeStringToInteger : AirbyteValueIdentityMapper() { - companion object { - val DATE_TIME_FORMATTER: DateTimeFormatter = - DateTimeFormatter.ofPattern( - "[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d][[' '][G]][[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X][[' '][G]]]]" - ) - val TIME_FORMATTER: DateTimeFormatter = - DateTimeFormatter.ofPattern( - "HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]" - ) - } override fun mapDate(value: AirbyteValue, context: Context): Pair { value as DateValue - val epochDay = LocalDate.parse(value.value, DATE_TIME_FORMATTER).toEpochDay() + val epochDay = value.value.toEpochDay() return IntegerValue(epochDay) to context } - private fun toMicrosOfDayWithTimezone(timeString: String): Long { - val time = OffsetTime.parse(timeString, TIME_FORMATTER) + private fun toMicrosOfDayWithTimezone(time: OffsetTime): Long { return time.withOffsetSameInstant(ZoneOffset.UTC).toLocalTime().toNanoOfDay() / 1_000 } - private fun toMicrosOfDayWithoutTimezone(timeString: String): Long { - val time = LocalTime.parse(timeString, TIME_FORMATTER) + private fun toMicrosOfDayWithoutTimezone(time: LocalTime): Long { return time.toNanoOfDay() / 1_000 } - private fun toMicrosOfDay(timeString: String): Long { - return try { - toMicrosOfDayWithTimezone(timeString) - } catch (e: Exception) { - toMicrosOfDayWithoutTimezone(timeString) - } - } - override fun mapTimeWithTimezone( value: AirbyteValue, context: Context ): Pair = - IntegerValue(toMicrosOfDay((value as TimeValue).value)) to context + IntegerValue(toMicrosOfDayWithTimezone((value as TimeWithTimezoneValue).value)) to context override fun mapTimeWithoutTimezone( value: AirbyteValue, context: Context ): Pair = - IntegerValue(toMicrosOfDay((value as TimeValue).value)) to context + IntegerValue(toMicrosOfDayWithoutTimezone((value as TimeWithoutTimezoneValue).value)) to + context - private fun toEpochMicrosWithTimezone(timestampString: String): Long { - val zdt = ZonedDateTime.parse(timestampString, DATE_TIME_FORMATTER) - return zdt.toInstant().truncatedTo(ChronoUnit.MICROS).toEpochMilli() * 1_000 + - zdt.toInstant().nano / 1_000 % 1_000 + private fun toEpochMicrosWithTimezone(odt: OffsetDateTime): Long { + return odt.toInstant().truncatedTo(ChronoUnit.MICROS).toEpochMilli() * 1_000 + + odt.toInstant().nano / 1_000 % 1_000 } - private fun toEpochMicrosWithoutTimezone(timestampString: String): Long { - val dt = LocalDateTime.parse(timestampString, DATE_TIME_FORMATTER) + private fun toEpochMicrosWithoutTimezone(dt: LocalDateTime): Long { val instant = dt.toInstant(ZoneOffset.UTC) return instant.epochSecond * 1_000_000 + instant.nano / 1_000 } - private fun toEpochMicros(timestampString: String): Long { - return try { - toEpochMicrosWithTimezone(timestampString) - } catch (e: Exception) { - toEpochMicrosWithoutTimezone(timestampString) - } - } - override fun mapTimestampWithTimezone( value: AirbyteValue, context: Context ): Pair = - IntegerValue(toEpochMicros((value as TimestampValue).value)) to context + IntegerValue(toEpochMicrosWithTimezone((value as TimestampWithTimezoneValue).value)) to + context override fun mapTimestampWithoutTimezone( value: AirbyteValue, context: Context ): Pair = - IntegerValue(toEpochMicros((value as TimestampValue).value)) to context + IntegerValue( + toEpochMicrosWithoutTimezone((value as TimestampWithoutTimezoneValue).value) + ) to context } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/AirbyteValueToJson.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/AirbyteValueToJson.kt index 28077dba1b05..88bc4050256f 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/AirbyteValueToJson.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/AirbyteValueToJson.kt @@ -14,7 +14,7 @@ class AirbyteValueToJson { is ArrayValue -> JsonNodeFactory.instance.arrayNode().addAll(value.values.map { convert(it) }) is BooleanValue -> JsonNodeFactory.instance.booleanNode(value.value) - is DateValue -> JsonNodeFactory.instance.textNode(value.value) + is DateValue -> JsonNodeFactory.instance.textNode(value.value.toString()) is IntegerValue -> JsonNodeFactory.instance.numberNode(value.value) is NullValue -> JsonNodeFactory.instance.nullNode() is NumberValue -> JsonNodeFactory.instance.numberNode(value.value) @@ -24,8 +24,12 @@ class AirbyteValueToJson { objNode } is StringValue -> JsonNodeFactory.instance.textNode(value.value) - is TimeValue -> JsonNodeFactory.instance.textNode(value.value) - is TimestampValue -> JsonNodeFactory.instance.textNode(value.value) + is TimeWithTimezoneValue -> JsonNodeFactory.instance.textNode(value.value.toString()) + is TimeWithoutTimezoneValue -> JsonNodeFactory.instance.textNode(value.value.toString()) + is TimestampWithTimezoneValue -> + JsonNodeFactory.instance.textNode(value.value.toString()) + is TimestampWithoutTimezoneValue -> + JsonNodeFactory.instance.textNode(value.value.toString()) is UnknownValue -> value.value } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt index 8f08f3017007..aa848f8444a2 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt @@ -5,202 +5,42 @@ package io.airbyte.cdk.load.data.json import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.JsonNodeType +import com.fasterxml.jackson.databind.node.ObjectNode import io.airbyte.cdk.load.data.* -import io.airbyte.cdk.load.util.serializeToString -import java.math.BigDecimal -import java.math.BigInteger /** - * Converts from json to airbyte value, performing the minimum validation necessary to marshal to a - * native type. For example, we enforce that an integer is either integral or something that can be - * reasonably converted to an integer, but we do not parse dates or timestamps, which can be - * reasonably left as strings. - * - * TODO: In the future, should we be more or less aggressive here? Which keeps parity with existing - * behavior? Which existing behavior should be preserved? + * Naively convert a json node to the equivalent AirbyteValue. Note that this does not match against + * a declared schema; it simply does the most obvious conversion. */ class JsonToAirbyteValue { - fun convert(json: JsonNode, schema: AirbyteType): AirbyteValue { - if (json.isNull) { - return NullValue - } - try { - return when (schema) { - is ArrayType -> toArray(json, schema.items.type) - is ArrayTypeWithoutSchema -> toArrayWithoutSchema(json) - is BooleanType -> toBoolean(json) - is DateType -> DateValue(json.asText()) - is IntegerType -> toInteger(json) - is NumberType -> toNumber(json) - is ObjectType -> toObject(json, schema) - is ObjectTypeWithoutSchema, - is ObjectTypeWithEmptySchema -> toObjectWithoutSchema(json) - is StringType -> toString(json) - is TimeTypeWithTimezone, - is TimeTypeWithoutTimezone -> TimeValue(json.asText()) - is TimestampTypeWithTimezone, - is TimestampTypeWithoutTimezone -> TimestampValue(json.asText()) - is UnionType -> toUnion(json, schema.options) - // If we fail to recognize the schema, just pass the json through directly. - // This enables us to more easily add new types, without breaking compatibility - // within existing connections. - is UnknownType -> fromJson(json) - } - } catch (t: Throwable) { - // In case of any failure, just pass the json through directly. - return fromJson(json) - } - } - - private fun toArray(json: JsonNode, schema: AirbyteType): ArrayValue { - if (!json.isArray) { - throw IllegalArgumentException("Could not convert $json to Array") - } - - return ArrayValue(json.map { convert(it, schema) }) - } - - private fun toArrayWithoutSchema(json: JsonNode): ArrayValue { - if (!json.isArray) { - throw IllegalArgumentException("Could not convert $json to Array") - } - - return ArrayValue(json.map { fromJson(it) }) - } - - private fun toString(json: JsonNode): StringValue { - return if (json.isTextual) { - StringValue(json.asText()) - } else { - StringValue(json.serializeToString()) - } - } - - private fun toBoolean(json: JsonNode): BooleanValue { - val boolVal = - when { - json.isBoolean -> json.asBoolean() - json.isIntegralNumber -> json.asLong() != 0L - json.isFloatingPointNumber -> json.asDouble() != 0.0 - json.isTextual -> json.asText().toBooleanStrict() - else -> throw IllegalArgumentException("Could not convert $json to Boolean") - } - return BooleanValue(boolVal) - } - - private fun toInteger(json: JsonNode): IntegerValue { - val longVal = - when { - json.isBoolean -> if (json.asBoolean()) BigInteger.ONE else BigInteger.ZERO - json.isIntegralNumber -> json.bigIntegerValue() - json.isFloatingPointNumber -> json.bigIntegerValue() - json.isTextual -> json.asText().toBigInteger() - else -> throw IllegalArgumentException("Could not convert $json to Integer") - } - return IntegerValue(longVal) - } - - private fun toNumber(json: JsonNode): NumberValue { - val numVal = - when { - json.isBoolean -> BigDecimal(if (json.asBoolean()) 1.0 else 0.0) - json.isIntegralNumber -> json.decimalValue() - json.isFloatingPointNumber -> json.decimalValue() - json.isTextual -> json.asText().toBigDecimal() - else -> throw IllegalArgumentException("Could not convert $json to Number") + fun convert(json: JsonNode): AirbyteValue { + return when (json.nodeType!!) { + JsonNodeType.NULL, + JsonNodeType.MISSING -> NullValue + JsonNodeType.BOOLEAN -> BooleanValue(json.booleanValue()) + JsonNodeType.NUMBER -> { + if (json.isIntegralNumber) { + IntegerValue(json.bigIntegerValue()) + } else { + NumberValue(json.decimalValue()) + } } - return NumberValue(numVal) - } - - private fun toObject(json: JsonNode, schema: ObjectType): ObjectValue { - if (!json.isObject) { - throw IllegalArgumentException("Could not convert $json to Object") - } - val objectProperties = LinkedHashMap() - json.fields().forEach { (key, value) -> - // TODO: Would it be more correct just to pass undeclared fields through as unknowns? - val type = schema.properties[key]?.type ?: UnknownType(value) - objectProperties[key] = convert(value, type) - } - - return ObjectValue(objectProperties) - } - - private fun toObjectWithoutSchema(json: JsonNode): ObjectValue { - if (!json.isObject) { - throw IllegalArgumentException("Could not convert $json to Object") - } - - return ObjectValue( - values = - json - .fields() - .asSequence() - .map { (name, value) -> name to fromJson(value) } - .toMap(LinkedHashMap()) - ) - } - - private fun toUnion(json: JsonNode, options: Set): AirbyteValue { - val option = - options.find { matchesStrictly(it, json) } - ?: options.find { matchesPermissively(it, json) } - ?: throw IllegalArgumentException( - "No matching union option in $options for $json" - ) - return convert(json, option) - } - - fun fromJson(json: JsonNode): AirbyteValue { - return when { - json.isBoolean -> toBoolean(json) - json.isIntegralNumber -> toInteger(json) - json.isFloatingPointNumber -> toNumber(json) - json.isTextual -> StringValue(json.asText()) - json.isArray -> ArrayValue(json.map { fromJson(it) }) - json.isObject -> + JsonNodeType.STRING -> StringValue(json.textValue()) + JsonNodeType.ARRAY -> ArrayValue(json.map { convert(it) }) + JsonNodeType.OBJECT -> ObjectValue( - json - .fields() - .asSequence() - .map { (name, value) -> name to fromJson(value) } - .toMap(LinkedHashMap()) + (json as ObjectNode).properties().associateTo(linkedMapOf()) { (k, v) -> + k to convert(v) + } ) - json.isNull -> NullValue - else -> UnknownValue(json) - } - } - - private fun matchesStrictly(schema: AirbyteType, json: JsonNode): Boolean { - return when (schema) { - is ArrayType, - is ArrayTypeWithoutSchema -> json.isArray - is BooleanType -> json.isBoolean - is DateType -> json.isTextual - is IntegerType -> json.isIntegralNumber - is NumberType -> json.isNumber - is ObjectType, - is ObjectTypeWithoutSchema, - is ObjectTypeWithEmptySchema -> json.isObject - is StringType -> json.isTextual - is TimeTypeWithTimezone, - is TimeTypeWithoutTimezone, - is TimestampTypeWithTimezone, - is TimestampTypeWithoutTimezone -> json.isTextual - is UnionType -> schema.options.any { matchesStrictly(it, json) } - is UnknownType -> false - } - } - - private fun matchesPermissively(schema: AirbyteType, json: JsonNode): Boolean { - return try { - convert(json, schema) !is UnknownValue - } catch (t: Throwable) { - false + JsonNodeType.POJO, + JsonNodeType.BINARY -> + throw NotImplementedError("Unsupported JsonNode type: ${json.nodeType}") } } } -fun JsonNode.toAirbyteValue(schema: AirbyteType): AirbyteValue { - return JsonToAirbyteValue().convert(this, schema) +fun JsonNode.toAirbyteValue(): AirbyteValue { + return JsonToAirbyteValue().convert(this) } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt index 45592ea65ae0..55efadc0d422 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt @@ -10,6 +10,10 @@ import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.AirbyteType import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper +import io.airbyte.cdk.load.data.IntegerValue +import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue import io.airbyte.cdk.load.data.json.toAirbyteValue import io.airbyte.cdk.load.message.CheckpointMessage.Checkpoint import io.airbyte.cdk.load.message.CheckpointMessage.Stats @@ -27,6 +31,8 @@ import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStre import io.airbyte.protocol.models.v0.AirbyteTraceMessage import io.micronaut.context.annotation.Value import jakarta.inject.Singleton +import java.math.BigInteger +import java.time.OffsetDateTime /** * Internal representation of destination messages. These are intended to be specialized for @@ -61,6 +67,43 @@ data class Meta( COLUMN_NAME_AB_META, COLUMN_NAME_AB_GENERATION_ID, ) + + fun getMetaValue(metaColumnName: String, value: String): AirbyteValue { + if (!COLUMN_NAMES.contains(metaColumnName)) { + throw IllegalArgumentException("Invalid meta column name: $metaColumnName") + } + fun toObjectValue(value: JsonNode): AirbyteValue { + if (value.isTextual) { + return toObjectValue(value.textValue().deserializeToNode()) + } + return value.toAirbyteValue() + } + return when (metaColumnName) { + COLUMN_NAME_AB_RAW_ID -> StringValue(value) + COLUMN_NAME_AB_EXTRACTED_AT -> { + // Some destinations represent extractedAt as a long epochMillis, + // and others represent it as a timestamp string. + // Handle both cases here. + try { + IntegerValue(BigInteger(value)) + } catch (e: Exception) { + TimestampWithTimezoneValue( + OffsetDateTime.parse( + value, + AirbyteValueDeepCoercingMapper.DATE_TIME_FORMATTER + ) + ) + } + } + COLUMN_NAME_AB_META -> toObjectValue(value.deserializeToNode()) + COLUMN_NAME_AB_GENERATION_ID -> IntegerValue(BigInteger(value)) + COLUMN_NAME_DATA -> toObjectValue(value.deserializeToNode()) + else -> + throw NotImplementedError( + "Column name $metaColumnName is not yet supported. This is probably a bug." + ) + } + } } fun asProtocolObject(): AirbyteRecordMessageMeta = @@ -91,7 +134,7 @@ data class DestinationRecord( fun asRecordMarshaledToAirbyteValue(): DestinationRecordAirbyteValue { return DestinationRecordAirbyteValue( stream, - message.record.data.toAirbyteValue(schema), + message.record.data.toAirbyteValue(), message.record.emittedAt, Meta( message.record.meta?.changes?.map { Meta.Change(it.field, it.change, it.reason) } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/util/TimeStringUtility.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/util/TimeStringUtility.kt index 910d1ff2dd05..e0f87e5cbd55 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/util/TimeStringUtility.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/util/TimeStringUtility.kt @@ -4,7 +4,7 @@ package io.airbyte.cdk.load.util -import io.airbyte.cdk.load.data.TimeStringToInteger +import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper import java.time.LocalDate import java.time.LocalDateTime import java.time.LocalTime @@ -17,11 +17,11 @@ import java.time.ZonedDateTime object TimeStringUtility { fun toLocalDate(dateString: String): LocalDate { - return LocalDate.parse(dateString, TimeStringToInteger.DATE_TIME_FORMATTER) + return LocalDate.parse(dateString, AirbyteValueDeepCoercingMapper.DATE_TIME_FORMATTER) } fun toLocalDateTime(dateString: String): LocalDateTime { - return LocalDateTime.parse(dateString, TimeStringToInteger.DATE_TIME_FORMATTER) + return LocalDateTime.parse(dateString, AirbyteValueDeepCoercingMapper.DATE_TIME_FORMATTER) } fun toOffset(timeString: String): LocalTime { @@ -33,11 +33,12 @@ object TimeStringUtility { } private fun toMicrosOfDayWithTimezone(timeString: String): LocalTime { - return OffsetTime.parse(timeString, TimeStringToInteger.TIME_FORMATTER).toLocalTime() + return OffsetTime.parse(timeString, AirbyteValueDeepCoercingMapper.TIME_FORMATTER) + .toLocalTime() } private fun toMicrosOfDayWithoutTimezone(timeString: String): LocalTime { - return LocalTime.parse(timeString, TimeStringToInteger.TIME_FORMATTER) + return LocalTime.parse(timeString, AirbyteValueDeepCoercingMapper.TIME_FORMATTER) } fun toOffsetDateTime(timestampString: String): OffsetDateTime { @@ -49,12 +50,18 @@ object TimeStringUtility { } private fun toOffsetDateTimeWithTimezone(timestampString: String): OffsetDateTime { - return ZonedDateTime.parse(timestampString, TimeStringToInteger.DATE_TIME_FORMATTER) + return ZonedDateTime.parse( + timestampString, + AirbyteValueDeepCoercingMapper.DATE_TIME_FORMATTER + ) .toOffsetDateTime() } private fun toOffsetDateTimeWithoutTimezone(timestampString: String): OffsetDateTime { - return LocalDateTime.parse(timestampString, TimeStringToInteger.DATE_TIME_FORMATTER) + return LocalDateTime.parse( + timestampString, + AirbyteValueDeepCoercingMapper.DATE_TIME_FORMATTER + ) .atOffset(ZoneOffset.UTC) } } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueDeepCoercingMapperTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueDeepCoercingMapperTest.kt new file mode 100644 index 000000000000..337992809718 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueDeepCoercingMapperTest.kt @@ -0,0 +1,290 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data + +import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper.Companion.DATE_TIME_FORMATTER +import io.airbyte.cdk.load.data.json.toAirbyteValue +import io.airbyte.cdk.load.message.Meta +import io.airbyte.cdk.load.util.Jsons +import java.math.BigDecimal +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime +import java.time.OffsetDateTime +import java.time.OffsetTime +import java.time.ZoneOffset +import java.time.ZonedDateTime +import org.junit.jupiter.api.Assertions.assertAll +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +class AirbyteValueDeepCoercingMapperTest { + private val mapper = AirbyteValueDeepCoercingMapper() + + @Test + fun testBasicCoerce() { + val (mappedValue, changes) = + mapper.map( + Jsons.readTree( + """ + { + "undeclared": 42, + "null": null, + "string": "foo", + "boolean": true, + "integer": 42, + "number": 42.1, + "date": "2024-01-23", + "timestamptz": "2024-01-23T01:23:45Z", + "timestampntz": "2024-01-23T01:23:45", + "timetz": "01:23:45Z", + "timentz": "01:23:45", + "array": [1, 2, 3], + "array_schemaless": [1, true, "foo"], + "object": {"foo": 42}, + "object_schemaless": {"foo": 42}, + "object_empty": {"foo": 42} + } + """.trimIndent() + ) + .toAirbyteValue(), + ObjectType( + linkedMapOf( + "null" to f(IntegerType), + "string" to f(StringType), + "boolean" to f(BooleanType), + "integer" to f(IntegerType), + "number" to f(NumberType), + "date" to f(DateType), + "timestamptz" to f(TimestampTypeWithTimezone), + "timestampntz" to f(TimestampTypeWithoutTimezone), + "timetz" to f(TimeTypeWithTimezone), + "timentz" to f(TimeTypeWithoutTimezone), + "array" to f(ArrayType(f(IntegerType))), + "array_schemaless" to f(ArrayTypeWithoutSchema), + "object" to f(ObjectType(linkedMapOf("foo" to f(IntegerType)))), + "object_schemaless" to f(ObjectTypeWithoutSchema), + "object_empty" to f(ObjectTypeWithEmptySchema), + ) + ), + ) + assertAll( + { + assertEquals( + ObjectValue( + linkedMapOf( + // note that the undeclared field is now gone + "null" to NullValue, + "string" to StringValue("foo"), + "boolean" to BooleanValue(true), + "integer" to IntegerValue(42), + "number" to NumberValue(BigDecimal("42.1")), + "date" to DateValue(LocalDate.parse("2024-01-23")), + "timestamptz" to + TimestampWithTimezoneValue( + OffsetDateTime.parse("2024-01-23T01:23:45Z") + ), + "timestampntz" to + TimestampWithoutTimezoneValue( + LocalDateTime.parse("2024-01-23T01:23:45") + ), + "timetz" to TimeWithTimezoneValue(OffsetTime.parse("01:23:45Z")), + "timentz" to TimeWithoutTimezoneValue(LocalTime.parse("01:23:45")), + "array" to + ArrayValue( + listOf(IntegerValue(1), IntegerValue(2), IntegerValue(3)) + ), + "array_schemaless" to + ArrayValue( + listOf(IntegerValue(1), BooleanValue(true), StringValue("foo")) + ), + "object" to ObjectValue(linkedMapOf("foo" to IntegerValue(42))), + "object_schemaless" to + ObjectValue(linkedMapOf("foo" to IntegerValue(42))), + "object_empty" to ObjectValue(linkedMapOf("foo" to IntegerValue(42))), + ) + ), + mappedValue + ) + }, + { assertEquals(emptyList(), changes) }, + ) + } + + @Test + fun testCoerceDate() { + listOf( + "2021-1-1", + "2021-01-01", + "2021/01/02", + "2021.01.03", + "2021 Jan 04", + "2021-1-1 BC", + ) + .map { it to LocalDate.parse(it, DATE_TIME_FORMATTER) } + .forEach { (input, localDate) -> + val (value, changes) = mapper.map(StringValue(input), DateType) + assertAll( + "Failed for input $input", + { assertEquals(DateValue(localDate), value) }, + { assertEquals(emptyList(), changes) } + ) + } + } + + private val timestampPairs: List> = + listOf( + "2018-09-15 12:00:00" to + LocalDateTime.parse("2018-09-15 12:00:00", DATE_TIME_FORMATTER) + .atOffset(ZoneOffset.UTC), + "2018-09-15 12:00:00.006542" to + LocalDateTime.parse("2018-09-15 12:00:00.006542", DATE_TIME_FORMATTER) + .atOffset(ZoneOffset.UTC), + "2018/09/15 12:00:00" to + LocalDateTime.parse("2018/09/15 12:00:00", DATE_TIME_FORMATTER) + .atOffset(ZoneOffset.UTC), + "2018.09.15 12:00:00" to + LocalDateTime.parse("2018.09.15 12:00:00", DATE_TIME_FORMATTER) + .atOffset(ZoneOffset.UTC), + "2018 Jul 15 12:00:00" to + LocalDateTime.parse("2018 Jul 15 12:00:00", DATE_TIME_FORMATTER) + .atOffset(ZoneOffset.UTC), + "2021-1-1 01:01:01" to + LocalDateTime.parse("2021-1-1 01:01:01", DATE_TIME_FORMATTER) + .atOffset(ZoneOffset.UTC), + "2021.1.1 01:01:01" to + LocalDateTime.parse("2021.1.1 01:01:01", DATE_TIME_FORMATTER) + .atOffset(ZoneOffset.UTC), + "2021/1/1 01:01:01" to + LocalDateTime.parse("2021/1/1 01:01:01", DATE_TIME_FORMATTER) + .atOffset(ZoneOffset.UTC), + "2021-01-01 01:01:01" to + LocalDateTime.parse("2021-01-01 01:01:01", DATE_TIME_FORMATTER) + .atOffset(ZoneOffset.UTC), + "2021-1-1 01:01:01 +01" to + ZonedDateTime.parse("2021-1-1 01:01:01 +01", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2018 Jul 15 12:00:00 GMT+08:00" to + ZonedDateTime.parse("2018 Jul 15 12:00:00 GMT+08:00", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2018 Jul 15 12:00:00GMT+07" to + ZonedDateTime.parse("2018 Jul 15 12:00:00GMT+07", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2021-01-01T01:01:01+01:00" to + ZonedDateTime.parse("2021-01-01T01:01:01+01:00", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2021-01-01T01:01:01.546+01:00" to + ZonedDateTime.parse("2021-01-01T01:01:01.546+01:00", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2021-01-01 01:01:01 +0000" to + ZonedDateTime.parse("2021-01-01 01:01:01 +0000", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2021/01/01 01:01:01 +0000" to + ZonedDateTime.parse("2021/01/01 01:01:01 +0000", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2021-01-01T01:01:01Z" to + ZonedDateTime.parse("2021-01-01T01:01:01Z", DATE_TIME_FORMATTER).toOffsetDateTime(), + "2021-01-01T01:01:01-01:00" to + ZonedDateTime.parse("2021-01-01T01:01:01-01:00", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2021-01-01T01:01:01+01:00" to + ZonedDateTime.parse("2021-01-01T01:01:01+01:00", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2021-01-01 01:01:01 UTC" to + ZonedDateTime.parse("2021-01-01 01:01:01 UTC", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2021-01-01T01:01:01 PST" to + ZonedDateTime.parse("2021-01-01T01:01:01 PST", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2021-01-01T01:01:01 +0000" to + ZonedDateTime.parse("2021-01-01T01:01:01 +0000", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2021-01-01T01:01:01+0000" to + ZonedDateTime.parse("2021-01-01T01:01:01+0000", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2021-01-01T01:01:01UTC" to + ZonedDateTime.parse("2021-01-01T01:01:01UTC", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2021-01-01T01:01:01+01" to + ZonedDateTime.parse("2021-01-01T01:01:01+01", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2022-01-23T01:23:45.678-11:30 BC" to + ZonedDateTime.parse("2022-01-23T01:23:45.678-11:30 BC", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + "2022-01-23T01:23:45.678-11:30" to + ZonedDateTime.parse("2022-01-23T01:23:45.678-11:30", DATE_TIME_FORMATTER) + .toOffsetDateTime(), + ) + + @Test + fun testCoerceTimestampWithTimezone() { + timestampPairs.forEach { (input, offsetDateTime) -> + val (value, changes) = mapper.map(StringValue(input), TimestampTypeWithTimezone) + + assertAll( + "Failed for input $input", + { assertEquals(TimestampWithTimezoneValue(offsetDateTime), value) }, + { assertEquals(emptyList(), changes) } + ) + } + } + + @Test + fun testCoerceTimestampWithoutTimezone() { + timestampPairs.forEach { (input, offsetDateTime) -> + val (value, changes) = mapper.map(StringValue(input), TimestampTypeWithoutTimezone) + + assertAll( + "Failed for input $input", + { + assertEquals( + TimestampWithoutTimezoneValue(offsetDateTime.toLocalDateTime()), + value + ) + }, + { assertEquals(emptyList(), changes) } + ) + } + } + + private val timePairs: List> = + listOf( + "01:01:01" to LocalTime.parse("01:01:01").atOffset(ZoneOffset.UTC), + "01:01" to LocalTime.parse("01:01").atOffset(ZoneOffset.UTC), + "12:23:01.541" to LocalTime.parse("12:23:01.541").atOffset(ZoneOffset.UTC), + "12:23:01.541214" to LocalTime.parse("12:23:01.541214").atOffset(ZoneOffset.UTC), + "12:00:00.000000+01:00" to OffsetTime.parse("12:00:00.000000+01:00"), + "10:00:00.000000-01:00" to OffsetTime.parse("10:00:00.000000-01:00"), + "03:30:00.000000+04:00" to OffsetTime.parse("03:30:00.000000+04:00"), + ) + + @Test + fun testCoerceTimeWithTimezone() { + timePairs.forEach { (input, offsetTime) -> + val (value, changes) = mapper.map(StringValue(input), TimeTypeWithTimezone) + + assertAll( + "Failed for input $input", + { assertEquals(TimeWithTimezoneValue(offsetTime), value) }, + { assertEquals(emptyList(), changes) } + ) + } + } + + @Test + fun testCoerceTimeWithoutTimezone() { + timePairs.forEach { (input, offsetTime) -> + val (value, changes) = mapper.map(StringValue(input), TimeTypeWithoutTimezone) + + assertAll( + "Failed for input $input", + { assertEquals(TimeWithoutTimezoneValue(offsetTime.toLocalTime()), value) }, + { assertEquals(emptyList(), changes) } + ) + } + } + + private fun f(type: AirbyteType) = FieldType(type, nullable = true) +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt index ea9f17d5e480..17a7bc1994bf 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt @@ -4,12 +4,13 @@ package io.airbyte.cdk.load.data +import io.airbyte.cdk.load.message.Meta import io.airbyte.cdk.load.test.util.Root import io.airbyte.cdk.load.test.util.SchemaRecordBuilder import io.airbyte.cdk.load.test.util.ValueTestBuilder -import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll class AirbyteValueIdentityMapperTest { @Test @@ -19,10 +20,13 @@ class AirbyteValueIdentityMapperTest { .with(StringValue("a"), StringType) .with(IntegerValue(1), IntegerType) .with(BooleanValue(true), BooleanType) - .with(TimestampValue("2021-01-01T12:00:00Z"), TimestampTypeWithTimezone) - .with(TimestampValue("2021-01-01T12:00:00"), TimestampTypeWithoutTimezone) - .with(TimeValue("12:00:00Z"), TimeTypeWithTimezone) - .with(TimeValue("12:00:00"), TimeTypeWithoutTimezone) + .with(TimestampWithTimezoneValue("2021-01-01T12:00:00Z"), TimestampTypeWithTimezone) + .with( + TimestampWithoutTimezoneValue("2021-01-01T12:00:00"), + TimestampTypeWithoutTimezone + ) + .with(TimeWithTimezoneValue("12:00:00Z"), TimeTypeWithTimezone) + .with(TimeWithoutTimezoneValue("12:00:00"), TimeTypeWithoutTimezone) .with(DateValue("2021-01-01"), DateType) .withRecord() .with( @@ -53,13 +57,9 @@ class AirbyteValueIdentityMapperTest { .build() val mapper = AirbyteValueIdentityMapper() val (values, changes) = mapper.map(inputValues, inputSchema) - Assertions.assertTrue(changes.isNotEmpty()) - Assertions.assertTrue((values as ObjectValue).values["bad"] is NullValue) - Assertions.assertTrue(changes[0].field == "bad") - Assertions.assertTrue(changes[0].change == AirbyteRecordMessageMetaChange.Change.NULLED) - Assertions.assertTrue( - changes[0].reason == - AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR + assertAll( + { Assertions.assertEquals(emptyList(), changes) }, + { Assertions.assertEquals(IntegerValue(1000), (values as ObjectValue).values["bad"]) }, ) } } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/MapperPipelineTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/MapperPipelineTest.kt index 2e22078cf96c..3d38c3691815 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/MapperPipelineTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/MapperPipelineTest.kt @@ -35,10 +35,10 @@ class MapperPipelineTest { class TurnIntegersIntoStrings : AirbyteValueIdentityMapper() { override fun mapInteger( - value: IntegerValue, + value: AirbyteValue, context: Context ): Pair { - if (value.value.toLong() == 2L) { + if ((value as IntegerValue).value.toLong() == 2L) { throw IllegalStateException("Arbitrarily reject 2") } return StringValue(value.value.toString()) to context diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonStringTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonStringTest.kt index 17652ce39f53..7d91ffc8db85 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonStringTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonStringTest.kt @@ -29,7 +29,7 @@ class SchemalessTypesToJsonStringTest { StringValue("""{"foo":"bar"}""") ) .with( - addressJson.deserializeToNode().toAirbyteValue(ObjectTypeWithoutSchema), + addressJson.deserializeToNode().toAirbyteValue(), ObjectTypeWithEmptySchema, StringValue(addressJson) ) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/TimeStringToIntegerTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/TimeStringToIntegerTest.kt index f51a006d7797..7c8fc1728aa0 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/TimeStringToIntegerTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/TimeStringToIntegerTest.kt @@ -12,127 +12,65 @@ class TimeStringToIntegerTest { @Test fun testMapDate() { val mapper = TimeStringToInteger() - listOf( - "2021-1-1" to 18628, - "2021-01-01" to 18628, - "2021/01/02" to 18629, - "2021.01.03" to 18630, - "2021 Jan 04" to 18631, - "2021-1-1 BC" to -1457318 - ) - .forEach { - Assertions.assertEquals( - IntegerValue(it.second.toLong()), - mapper.mapDate(DateValue(it.first), AirbyteValueIdentityMapper.Context()).first, - "Failed for ${it.first} to ${it.second}" - ) - } - } - - private val timestampPairs = - listOf( - "2018-09-15 12:00:00" to 1537012800000000, - "2018-09-15 12:00:00.006542" to 1537012800006542, - "2018/09/15 12:00:00" to 1537012800000000, - "2018.09.15 12:00:00" to 1537012800000000, - "2018 Jul 15 12:00:00" to 1531656000000000, - "2018 Jul 15 12:00:00 GMT+08:00" to 1531627200000000, - "2018 Jul 15 12:00:00GMT+07" to 1531630800000000, - "2021-1-1 01:01:01" to 1609462861000000, - "2021.1.1 01:01:01" to 1609462861000000, - "2021/1/1 01:01:01" to 1609462861000000, - "2021-1-1 01:01:01 +01" to 1609459261000000, - "2021-01-01T01:01:01+01:00" to 1609459261000000, - "2021-01-01T01:01:01.546+01:00" to 1609459261546000, - "2021-01-01 01:01:01" to 1609462861000000, - "2021-01-01 01:01:01 +0000" to 1609462861000000, - "2021/01/01 01:01:01 +0000" to 1609462861000000, - "2021-01-01T01:01:01Z" to 1609462861000000, - "2021-01-01T01:01:01-01:00" to 1609466461000000, - "2021-01-01T01:01:01+01:00" to 1609459261000000, - "2021-01-01 01:01:01 UTC" to 1609462861000000, - "2021-01-01T01:01:01 PST" to 1609491661000000, - "2021-01-01T01:01:01 +0000" to 1609462861000000, - "2021-01-01T01:01:01+0000" to 1609462861000000, - "2021-01-01T01:01:01UTC" to 1609462861000000, - "2021-01-01T01:01:01+01" to 1609459261000000, - "2022-01-23T01:23:45.678-11:30 BC" to -125941863974322000, - "2022-01-23T01:23:45.678-11:30" to 1642942425678000 + Assertions.assertEquals( + IntegerValue(18628), + mapper.mapDate(DateValue("2021-01-01"), AirbyteValueIdentityMapper.Context()).first, ) + } @Test fun testMapTimestampWithTimezone() { val mapper = TimeStringToInteger() - timestampPairs.forEach { - Assertions.assertEquals( - IntegerValue(it.second), - mapper - .mapTimestampWithTimezone( - TimestampValue(it.first), - AirbyteValueIdentityMapper.Context() - ) - .first, - "Failed for ${it.first} to ${it.second}" - ) - } + Assertions.assertEquals( + IntegerValue(1609462861000000), + mapper + .mapTimestampWithTimezone( + TimestampWithTimezoneValue("2021-01-01T01:01:01Z"), + AirbyteValueIdentityMapper.Context() + ) + .first, + ) } @Test fun testMapTimestampWithoutTimezone() { val mapper = TimeStringToInteger() - timestampPairs.forEach { - Assertions.assertEquals( - IntegerValue(it.second), - mapper - .mapTimestampWithoutTimezone( - TimestampValue(it.first), - AirbyteValueIdentityMapper.Context() - ) - .first, - "Failed for ${it.first} to ${it.second}" - ) - } - } - - private val timePairs = - listOf( - "01:01:01" to 3661000000, - "01:01" to 3660000000, - "12:23:01.541" to 44581541000, - "12:23:01.541214" to 44581541214, - "12:00:00.000000+01:00" to 39600000000, - "10:00:00.000000-01:00" to 39600000000, - "03:30:00.000000+04:00" to 84600000000 + Assertions.assertEquals( + IntegerValue(1537012800000000), + mapper + .mapTimestampWithoutTimezone( + TimestampWithoutTimezoneValue("2018-09-15T12:00:00"), + AirbyteValueIdentityMapper.Context() + ) + .first, ) + } @Test fun testTimeWithTimezone() { val mapper = TimeStringToInteger() - timePairs.forEach { - Assertions.assertEquals( - IntegerValue(it.second), - mapper - .mapTimeWithTimezone(TimeValue(it.first), AirbyteValueIdentityMapper.Context()) - .first, - "Failed for ${it.first} to ${it.second}" - ) - } + Assertions.assertEquals( + IntegerValue(39600000000), + mapper + .mapTimeWithTimezone( + TimeWithTimezoneValue("12:00:00.000000+01:00"), + AirbyteValueIdentityMapper.Context() + ) + .first, + ) } @Test fun testTimeWithoutTimezone() { val mapper = TimeStringToInteger() - timePairs.forEach { - Assertions.assertEquals( - IntegerValue(it.second), - mapper - .mapTimeWithoutTimezone( - TimeValue(it.first), - AirbyteValueIdentityMapper.Context() - ) - .first, - "Failed for ${it.first} to ${it.second}" - ) - } + Assertions.assertEquals( + IntegerValue(3661000000), + mapper + .mapTimeWithoutTimezone( + TimeWithoutTimezoneValue("01:01:01"), + AirbyteValueIdentityMapper.Context() + ) + .first, + ) } } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/AirbyteValueToJsonTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/AirbyteValueToJsonTest.kt index 2c15184c5546..17a7e7f43f34 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/AirbyteValueToJsonTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/AirbyteValueToJsonTest.kt @@ -4,28 +4,12 @@ package io.airbyte.cdk.load.data.json -import io.airbyte.cdk.load.data.ArrayType import io.airbyte.cdk.load.data.ArrayValue -import io.airbyte.cdk.load.data.BooleanType import io.airbyte.cdk.load.data.BooleanValue -import io.airbyte.cdk.load.data.DateType -import io.airbyte.cdk.load.data.DateValue -import io.airbyte.cdk.load.data.FieldType -import io.airbyte.cdk.load.data.IntegerType import io.airbyte.cdk.load.data.IntegerValue -import io.airbyte.cdk.load.data.NumberType import io.airbyte.cdk.load.data.NumberValue -import io.airbyte.cdk.load.data.ObjectType import io.airbyte.cdk.load.data.ObjectValue -import io.airbyte.cdk.load.data.StringType import io.airbyte.cdk.load.data.StringValue -import io.airbyte.cdk.load.data.TimeTypeWithTimezone -import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone -import io.airbyte.cdk.load.data.TimeValue -import io.airbyte.cdk.load.data.TimestampTypeWithTimezone -import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone -import io.airbyte.cdk.load.data.TimestampValue -import io.airbyte.cdk.load.data.UnionType import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test @@ -47,55 +31,10 @@ class AirbyteValueToJsonTest { "city" to StringValue("San Francisco") ) ), - "union" to StringValue("hello"), - "combined_denormalized" to - ObjectValue(linkedMapOf("name" to StringValue("hello"))), - "union_array" to ArrayValue(listOf(StringValue("hello"), IntegerValue(42))), - "date" to DateValue("2021-01-01"), - "time" to TimeValue("12:00:00"), - "timestamp" to TimestampValue("2021-01-01T12:00:00Z"), - "time_without_timezone" to TimeValue("12:00:00"), - "timestamp_without_timezone" to TimestampValue("2021-01-01T12:00:00") - ) - ) - val schema = - ObjectType( - linkedMapOf( - "name" to FieldType(StringType, true), - "age" to FieldType(IntegerType, false), - "is_cool" to FieldType(BooleanType, false), - "height" to FieldType(NumberType, false), - "friends" to FieldType(ArrayType(FieldType(StringType, true)), false), - "address" to - FieldType( - ObjectType( - linkedMapOf( - "street" to FieldType(StringType, true), - "city" to FieldType(StringType, true) - ) - ), - false - ), - "union" to FieldType(UnionType.of(StringType, IntegerType), true), - "combined_denormalized" to - FieldType( - ObjectType(linkedMapOf("name" to FieldType(StringType, true))), - false - ), - "union_array" to - FieldType( - ArrayType(FieldType(UnionType.of(StringType, IntegerType), true)), - true - ), - "date" to FieldType(DateType, false), - "time" to FieldType(TimeTypeWithoutTimezone, false), - "timestamp" to FieldType(TimestampTypeWithoutTimezone, false), - "time_without_timezone" to FieldType(TimeTypeWithTimezone, false), - "timestamp_without_timezone" to FieldType(TimestampTypeWithTimezone, false) ) ) val jsonValue = AirbyteValueToJson().convert(airbyteValue) - val roundTripValue = JsonToAirbyteValue().convert(jsonValue, schema) + val roundTripValue = JsonToAirbyteValue().convert(jsonValue) Assertions.assertEquals(airbyteValue, roundTripValue) } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValueTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValueTest.kt index 423b357b898a..dfddd5e9854b 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValueTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValueTest.kt @@ -5,30 +5,12 @@ package io.airbyte.cdk.load.data.json import com.fasterxml.jackson.databind.node.JsonNodeFactory -import io.airbyte.cdk.load.data.ArrayType -import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema import io.airbyte.cdk.load.data.ArrayValue -import io.airbyte.cdk.load.data.BooleanType import io.airbyte.cdk.load.data.BooleanValue -import io.airbyte.cdk.load.data.DateType -import io.airbyte.cdk.load.data.DateValue -import io.airbyte.cdk.load.data.FieldType -import io.airbyte.cdk.load.data.IntegerType import io.airbyte.cdk.load.data.IntegerValue -import io.airbyte.cdk.load.data.NumberType import io.airbyte.cdk.load.data.NumberValue -import io.airbyte.cdk.load.data.ObjectType -import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema -import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema import io.airbyte.cdk.load.data.ObjectValue -import io.airbyte.cdk.load.data.StringType import io.airbyte.cdk.load.data.StringValue -import io.airbyte.cdk.load.data.TimeTypeWithTimezone -import io.airbyte.cdk.load.data.TimeValue -import io.airbyte.cdk.load.data.TimestampTypeWithTimezone -import io.airbyte.cdk.load.data.TimestampValue -import io.airbyte.cdk.load.data.UnionType -import io.airbyte.cdk.load.util.deserializeToNode import java.math.BigDecimal import java.math.BigInteger import org.junit.jupiter.api.Assertions @@ -38,44 +20,37 @@ class JsonToAirbyteValueTest { @Test fun testString() { - val value = - JsonToAirbyteValue().convert(JsonNodeFactory.instance.textNode("hello"), StringType) + val value = JsonToAirbyteValue().convert(JsonNodeFactory.instance.textNode("hello")) Assertions.assertTrue(value is StringValue) Assertions.assertEquals("hello", (value as StringValue).value) } @Test fun testBoolean() { - val value = - JsonToAirbyteValue().convert(JsonNodeFactory.instance.booleanNode(true), BooleanType) + val value = JsonToAirbyteValue().convert(JsonNodeFactory.instance.booleanNode(true)) Assertions.assertTrue(value is BooleanValue) Assertions.assertEquals(true, (value as BooleanValue).value) } @Test fun testInteger() { - val value = - JsonToAirbyteValue().convert(JsonNodeFactory.instance.numberNode(42), IntegerType) + val value = JsonToAirbyteValue().convert(JsonNodeFactory.instance.numberNode(42)) Assertions.assertTrue(value is IntegerValue) Assertions.assertEquals(BigInteger.valueOf(42), (value as IntegerValue).value) } @Test fun testNumber() { - val value = - JsonToAirbyteValue().convert(JsonNodeFactory.instance.numberNode(42), NumberType) + val value = JsonToAirbyteValue().convert(JsonNodeFactory.instance.numberNode(42.1)) Assertions.assertTrue(value is NumberValue) - Assertions.assertEquals(BigDecimal(42), (value as NumberValue).value) + Assertions.assertEquals(BigDecimal("42.1"), (value as NumberValue).value) } @Test fun testArray() { val value = JsonToAirbyteValue() - .convert( - JsonNodeFactory.instance.arrayNode().add("hello").add("world"), - ArrayType(FieldType(StringType, true)) - ) + .convert(JsonNodeFactory.instance.arrayNode().add("hello").add("world")) Assertions.assertTrue(value is ArrayValue) val arrayValue = value as ArrayValue Assertions.assertEquals(2, arrayValue.values.size) @@ -85,119 +60,14 @@ class JsonToAirbyteValueTest { Assertions.assertEquals("world", (arrayValue.values[1] as StringValue).value) } - @Test - fun testArrayWithoutSchema() { - val value = - JsonToAirbyteValue() - .convert( - JsonNodeFactory.instance.arrayNode().add("hello").add("world"), - ArrayTypeWithoutSchema - ) - Assertions.assertTrue(value is ArrayValue, "Expected ArrayValue, got $value") - val arrayValue = value as ArrayValue - Assertions.assertEquals(2, arrayValue.values.size) - Assertions.assertTrue(arrayValue.values[0] is StringValue) - Assertions.assertEquals("hello", (arrayValue.values[0] as StringValue).value) - Assertions.assertTrue(arrayValue.values[1] is StringValue) - Assertions.assertEquals("world", (arrayValue.values[1] as StringValue).value) - } - @Test fun testObject() { val value = - JsonToAirbyteValue() - .convert( - JsonNodeFactory.instance.objectNode().put("name", "world"), - ObjectType(linkedMapOf("name" to FieldType(StringType, true))) - ) + JsonToAirbyteValue().convert(JsonNodeFactory.instance.objectNode().put("name", "world")) Assertions.assertTrue(value is ObjectValue) val objectValue = value as ObjectValue Assertions.assertEquals(1, objectValue.values.size) Assertions.assertTrue(objectValue.values["name"] is StringValue) Assertions.assertEquals("world", (objectValue.values["name"] as StringValue).value) } - - @Test - fun testObjectWithoutSchema() { - listOf(ObjectTypeWithoutSchema, ObjectTypeWithEmptySchema).forEach { - val value = - JsonToAirbyteValue() - .convert(JsonNodeFactory.instance.objectNode().put("name", "world"), it) - Assertions.assertTrue(value is ObjectValue) - val objectValue = value as ObjectValue - Assertions.assertEquals(1, objectValue.values.size) - Assertions.assertTrue(objectValue.values["name"] is StringValue) - Assertions.assertEquals("world", (objectValue.values["name"] as StringValue).value) - } - } - - @Test - fun testUnion() { - val stringValue = - JsonToAirbyteValue() - .convert( - JsonNodeFactory.instance.textNode("hello"), - UnionType.of(StringType, IntegerType) - ) - Assertions.assertTrue(stringValue is StringValue) - Assertions.assertEquals("hello", (stringValue as StringValue).value) - - val intValue = - JsonToAirbyteValue() - .convert( - JsonNodeFactory.instance.numberNode(42), - UnionType.of(StringType, IntegerType) - ) - Assertions.assertTrue(intValue is IntegerValue) - Assertions.assertEquals(BigInteger.valueOf(42), (intValue as IntegerValue).value) - } - - @Test - fun testDate() { - val value = - JsonToAirbyteValue().convert(JsonNodeFactory.instance.textNode("2021-01-01"), DateType) - Assertions.assertTrue(value is DateValue) - Assertions.assertEquals("2021-01-01", (value as DateValue).value) - } - - @Test - fun testTimestamp() { - val value = - JsonToAirbyteValue() - .convert( - JsonNodeFactory.instance.textNode("2021-01-01T00:00:00Z"), - TimestampTypeWithTimezone - ) - Assertions.assertTrue(value is TimestampValue) - Assertions.assertEquals("2021-01-01T00:00:00Z", (value as TimestampValue).value) - } - - @Test - fun testTime() { - val value = - JsonToAirbyteValue() - .convert(JsonNodeFactory.instance.textNode("00:00:00"), TimeTypeWithTimezone) - Assertions.assertTrue(value is TimeValue) - Assertions.assertEquals("00:00:00", (value as TimeValue).value) - } - - @Test - fun testMissingObjectField() { - val value = - JsonToAirbyteValue() - .convert( - """{"foo": 1}""".deserializeToNode(), - ObjectType( - properties = - linkedMapOf( - "foo" to FieldType(IntegerType, nullable = true), - "bar" to FieldType(IntegerType, nullable = true), - ) - ) - ) - Assertions.assertEquals( - ObjectValue(linkedMapOf("foo" to IntegerValue(1))), - value, - ) - } } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/RecordDifferTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/RecordDifferTest.kt index 5f567b00643d..bfae3c54bfbe 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/RecordDifferTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/RecordDifferTest.kt @@ -108,14 +108,14 @@ class RecordDifferTest { Assertions.assertEquals( """ - Missing record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampValue(value=1970-01-01T00:00Z), name=StringValue(value=alice), phone=StringValue(value=1234)}), airbyteMeta=null) - Incorrect record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00:02Z)): + Missing record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampWithTimezoneValue(value=1970-01-01T00:00Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampWithTimezoneValue(value=1970-01-01T00:00Z), name=StringValue(value=alice), phone=StringValue(value=1234)}), airbyteMeta=null) + Incorrect record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampWithTimezoneValue(value=1970-01-01T00:00:02Z)): generationId: Expected 42, got 41 airbyteMeta: Expected Meta(changes=[], syncId=42), got null phone: Expected StringValue(value=1234), but was StringValue(value=5678) email: Expected StringValue(value=charlie@example.com), but was address: Expected , but was StringValue(value=1234 charlie street) - Unexpected record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampValue(value=1970-01-01T00:00:03Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampValue(value=1970-01-01T00:00:03Z), name=StringValue(value=dana)}), airbyteMeta=null) + Unexpected record (pk=[IntegerValue(value=1), IntegerValue(value=100)], cursor=TimestampWithTimezoneValue(value=1970-01-01T00:00:03Z)): OutputRecord(rawId=null, extractedAt=1970-01-01T00:00:01.234Z, loadedAt=null, generationId=42, data=ObjectValue(values={id1=IntegerValue(value=1), id2=IntegerValue(value=100), updated_at=TimestampWithTimezoneValue(value=1970-01-01T00:00:03Z), name=StringValue(value=dana)}), airbyteMeta=null) """.trimIndent(), diff ) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/util/TimeStringUtilityTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/util/TimeStringUtilityTest.kt index 28e39a4219e8..676a66cc8d2a 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/util/TimeStringUtilityTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/util/TimeStringUtilityTest.kt @@ -4,7 +4,7 @@ package io.airbyte.cdk.load.util -import io.airbyte.cdk.load.data.TimeStringToInteger +import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper import java.time.LocalDate import java.time.LocalDateTime import java.time.LocalTime @@ -22,7 +22,7 @@ internal class TimeStringUtilityTest { val localDateString = "2024-11-18" val localDate = TimeStringUtility.toLocalDate(localDateString) assertEquals( - LocalDate.parse(localDateString, TimeStringToInteger.DATE_TIME_FORMATTER), + LocalDate.parse(localDateString, AirbyteValueDeepCoercingMapper.DATE_TIME_FORMATTER), localDate ) } @@ -40,7 +40,10 @@ internal class TimeStringUtilityTest { val localDateTimeString = "2024-11-18T12:34:56Z" val localDateTime = TimeStringUtility.toLocalDateTime(localDateTimeString) assertEquals( - LocalDateTime.parse(localDateTimeString, TimeStringToInteger.DATE_TIME_FORMATTER), + LocalDateTime.parse( + localDateTimeString, + AirbyteValueDeepCoercingMapper.DATE_TIME_FORMATTER + ), localDateTime ) } @@ -50,7 +53,10 @@ internal class TimeStringUtilityTest { val offsetWithTimezoneString = "12:34:56Z" val offsetWithTimezone = TimeStringUtility.toOffset(offsetWithTimezoneString) assertEquals( - OffsetTime.parse(offsetWithTimezoneString, TimeStringToInteger.TIME_FORMATTER) + OffsetTime.parse( + offsetWithTimezoneString, + AirbyteValueDeepCoercingMapper.TIME_FORMATTER + ) .toLocalTime(), offsetWithTimezone ) @@ -61,7 +67,10 @@ internal class TimeStringUtilityTest { val offsetWithoutTimezoneString = "12:34:56" val offsetWithoutTimezone = TimeStringUtility.toOffset(offsetWithoutTimezoneString) assertEquals( - LocalTime.parse(offsetWithoutTimezoneString, TimeStringToInteger.TIME_FORMATTER), + LocalTime.parse( + offsetWithoutTimezoneString, + AirbyteValueDeepCoercingMapper.TIME_FORMATTER + ), offsetWithoutTimezone ) } @@ -71,7 +80,10 @@ internal class TimeStringUtilityTest { val offsetWithTimezoneString = "2024-11-18T12:34:56Z" val offsetWithTimezone = TimeStringUtility.toOffsetDateTime(offsetWithTimezoneString) assertEquals( - ZonedDateTime.parse(offsetWithTimezoneString, TimeStringToInteger.DATE_TIME_FORMATTER) + ZonedDateTime.parse( + offsetWithTimezoneString, + AirbyteValueDeepCoercingMapper.DATE_TIME_FORMATTER + ) .toOffsetDateTime(), offsetWithTimezone ) @@ -84,7 +96,7 @@ internal class TimeStringUtilityTest { assertEquals( LocalDateTime.parse( offsetWithoutTimezoneString, - TimeStringToInteger.DATE_TIME_FORMATTER + AirbyteValueDeepCoercingMapper.DATE_TIME_FORMATTER ) .atOffset(ZoneOffset.UTC), offsetWithoutTimezone diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/message/InputMessage.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/message/InputMessage.kt index f6cbd25b63f0..322e750056f6 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/message/InputMessage.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/message/InputMessage.kt @@ -6,7 +6,6 @@ package io.airbyte.cdk.load.message import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.AirbyteValue -import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema import io.airbyte.cdk.load.data.json.JsonToAirbyteValue import io.airbyte.cdk.load.data.json.toJson import io.airbyte.cdk.load.message.CheckpointMessage.Checkpoint @@ -35,7 +34,7 @@ data class InputRecord( changes: MutableList = mutableListOf(), ) : this( stream = DestinationStream.Descriptor(namespace, name), - data = JsonToAirbyteValue().convert(data.deserializeToNode(), ObjectTypeWithoutSchema), + data = JsonToAirbyteValue().convert(data.deserializeToNode()), emittedAtMs = emittedAtMs, meta = Meta(changes), serialized = "", diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/AirbyteValueWithMetaToOutputRecord.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/AirbyteValueWithMetaToOutputRecord.kt index c348f4c3d24c..20c70534e0f9 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/AirbyteValueWithMetaToOutputRecord.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/AirbyteValueWithMetaToOutputRecord.kt @@ -9,11 +9,11 @@ import io.airbyte.cdk.load.data.ArrayValue import io.airbyte.cdk.load.data.IntegerValue import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue import io.airbyte.cdk.load.message.Meta import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange import java.time.Instant -import java.util.* -import kotlin.collections.LinkedHashMap +import java.util.UUID class AirbyteValueWithMetaToOutputRecord { fun convert(value: ObjectValue): OutputRecord { @@ -23,7 +23,13 @@ class AirbyteValueWithMetaToOutputRecord { UUID.fromString((value.values[Meta.COLUMN_NAME_AB_RAW_ID] as StringValue).value), extractedAt = Instant.ofEpochMilli( - (value.values[Meta.COLUMN_NAME_AB_EXTRACTED_AT] as IntegerValue).value.toLong() + value.values[Meta.COLUMN_NAME_AB_EXTRACTED_AT].let { v -> + when (v) { + is IntegerValue -> v.value.toLong() + is TimestampWithTimezoneValue -> v.value.toEpochSecond() + else -> throw IllegalArgumentException("Invalid extractedAt value: $v") + } + } ), loadedAt = null, data = value.values[Meta.COLUMN_NAME_DATA] as ObjectValue, diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/ExpectedRecordMapper.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/ExpectedRecordMapper.kt index a94c9643fff8..3320a78ad8c9 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/ExpectedRecordMapper.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/ExpectedRecordMapper.kt @@ -5,6 +5,16 @@ package io.airbyte.cdk.load.test.util import io.airbyte.cdk.load.data.AirbyteType +import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.ArrayValue +import io.airbyte.cdk.load.data.DateValue +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimeWithTimezoneValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue +import java.time.format.DateTimeFormatter fun interface ExpectedRecordMapper { fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord @@ -14,3 +24,36 @@ object NoopExpectedRecordMapper : ExpectedRecordMapper { override fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord = expectedRecord } + +/** + * Some destinations (e.g. JSONL files) don't have temporal types, we just write everything as + * string. So we map expected records' temporal values back to string. + */ +object UncoercedExpectedRecordMapper : ExpectedRecordMapper { + override fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord { + val mappedData = mapTemporalValuesToString(expectedRecord.data) + return expectedRecord.copy(data = mappedData as ObjectValue) + } + + private fun mapTemporalValuesToString(value: AirbyteValue): AirbyteValue = + when (value) { + is DateValue -> StringValue(value.value.toString()) + // Use specific formatters that match our integration test input. + is TimeWithTimezoneValue -> + StringValue(value.value.format(DateTimeFormatter.ISO_OFFSET_TIME)) + is TimeWithoutTimezoneValue -> + StringValue(value.value.format(DateTimeFormatter.ISO_LOCAL_TIME)) + is TimestampWithTimezoneValue -> + StringValue(value.value.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)) + is TimestampWithoutTimezoneValue -> + StringValue(value.value.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)) + is ArrayValue -> ArrayValue(value.values.map { mapTemporalValuesToString(it) }) + is ObjectValue -> + ObjectValue( + value.values.mapValuesTo(linkedMapOf()) { (_, v) -> + mapTemporalValuesToString(v) + } + ) + else -> value + } +} diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt index 49f275c17ead..227de6114934 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt @@ -6,19 +6,11 @@ package io.airbyte.cdk.load.test.util import io.airbyte.cdk.load.data.AirbyteValue import io.airbyte.cdk.load.data.ArrayValue -import io.airbyte.cdk.load.data.DateValue import io.airbyte.cdk.load.data.IntegerValue import io.airbyte.cdk.load.data.NullValue import io.airbyte.cdk.load.data.ObjectValue -import io.airbyte.cdk.load.data.TimeValue -import io.airbyte.cdk.load.data.TimestampValue import io.airbyte.cdk.load.data.UnknownValue import io.airbyte.cdk.load.data.json.JsonToAirbyteValue -import java.time.LocalDate -import java.time.LocalDateTime -import java.time.LocalTime -import java.time.OffsetDateTime -import java.time.OffsetTime import kotlin.reflect.jvm.jvmName class RecordDiffer( @@ -302,7 +294,7 @@ class RecordDiffer( private fun compare(v1: AirbyteValue, v2: AirbyteValue, nullEqualsUnset: Boolean): Int { if (v1 is UnknownValue) { return compare( - JsonToAirbyteValue().fromJson(v1.value), + JsonToAirbyteValue().convert(v1.value), v2, nullEqualsUnset, ) @@ -310,7 +302,7 @@ class RecordDiffer( if (v2 is UnknownValue) { return compare( v1, - JsonToAirbyteValue().fromJson(v2.value), + JsonToAirbyteValue().convert(v2.value), nullEqualsUnset, ) } @@ -321,45 +313,7 @@ class RecordDiffer( return if (v1::class != v2::class) { v1::class.jvmName.compareTo(v2::class.jvmName) } else { - // Handle temporal types specifically, because they require explicit parsing return when (v1) { - is DateValue -> - try { - LocalDate.parse(v1.value) - .compareTo(LocalDate.parse((v2 as DateValue).value)) - } catch (e: Exception) { - v1.value.compareTo((v2 as DateValue).value) - } - is TimeValue -> { - try { - val time1 = LocalTime.parse(v1.value) - val time2 = LocalTime.parse((v2 as TimeValue).value) - time1.compareTo(time2) - } catch (e: Exception) { - try { - val time1 = OffsetTime.parse(v1.value) - val time2 = OffsetTime.parse((v2 as TimeValue).value) - time1.compareTo(time2) - } catch (e: Exception) { - v1.value.compareTo((v2 as TimeValue).value) - } - } - } - is TimestampValue -> { - try { - val ts1 = LocalDateTime.parse(v1.value) - val ts2 = LocalDateTime.parse((v2 as TimestampValue).value) - ts1.compareTo(ts2) - } catch (e: Exception) { - try { - val ts1 = OffsetDateTime.parse(v1.value) - val ts2 = OffsetDateTime.parse((v2 as TimestampValue).value) - ts1.compareTo(ts2) - } catch (e: Exception) { - v1.value.compareTo((v2 as TimestampValue).value) - } - } - } is ObjectValue -> { fun objComp(a: ObjectValue, b: ObjectValue): Int { // objects aren't really comparable, so just do an equality check diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index 91a3d30d108a..dddfd73562d3 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -16,7 +16,6 @@ import io.airbyte.cdk.load.data.ArrayType import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema import io.airbyte.cdk.load.data.BooleanType import io.airbyte.cdk.load.data.DateType -import io.airbyte.cdk.load.data.DateValue import io.airbyte.cdk.load.data.FieldType import io.airbyte.cdk.load.data.IntegerType import io.airbyte.cdk.load.data.IntegerValue @@ -29,10 +28,9 @@ import io.airbyte.cdk.load.data.StringType import io.airbyte.cdk.load.data.StringValue import io.airbyte.cdk.load.data.TimeTypeWithTimezone import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone -import io.airbyte.cdk.load.data.TimeValue import io.airbyte.cdk.load.data.TimestampTypeWithTimezone import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone -import io.airbyte.cdk.load.data.TimestampValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue import io.airbyte.cdk.load.data.UnionType import io.airbyte.cdk.load.data.UnknownType import io.airbyte.cdk.load.message.DestinationFile @@ -647,7 +645,7 @@ abstract class BasicFunctionalityIntegrationTest( data = mapOf( "id" to id, - "updated_at" to OffsetDateTime.parse(updatedAt), + "updated_at" to TimestampWithTimezoneValue(updatedAt), "name" to "foo_${id}_$extractedAt", ), airbyteMeta = OutputRecord.Meta(syncId = syncId), @@ -816,7 +814,7 @@ abstract class BasicFunctionalityIntegrationTest( data = mapOf( "id" to id, - "updated_at" to OffsetDateTime.parse(updatedAt), + "updated_at" to TimestampWithTimezoneValue(updatedAt), "name" to "foo_${id}_$extractedAt", ), airbyteMeta = OutputRecord.Meta(syncId = syncId), @@ -937,7 +935,7 @@ abstract class BasicFunctionalityIntegrationTest( data = mapOf( "id" to id, - "updated_at" to OffsetDateTime.parse(updatedAt), + "updated_at" to TimestampWithTimezoneValue(updatedAt), "name" to "foo_${id}_$extractedAt", ), airbyteMeta = OutputRecord.Meta(syncId = syncId), @@ -1309,7 +1307,7 @@ abstract class BasicFunctionalityIntegrationTest( mapOf( "id1" to 1, "id2" to 200, - "updated_at" to OffsetDateTime.parse("2000-01-01T00:01:00Z"), + "updated_at" to TimestampWithTimezoneValue("2000-01-01T00:01:00Z"), "name" to "Alice2", "_ab_cdc_deleted_at" to null ), @@ -1322,7 +1320,7 @@ abstract class BasicFunctionalityIntegrationTest( mapOf( "id1" to 1, "id2" to 201, - "updated_at" to OffsetDateTime.parse("2000-01-01T00:02:00Z"), + "updated_at" to TimestampWithTimezoneValue("2000-01-01T00:02:00Z"), "name" to "Bob1" ), airbyteMeta = OutputRecord.Meta(syncId = 42), @@ -1367,7 +1365,7 @@ abstract class BasicFunctionalityIntegrationTest( mapOf( "id1" to 1, "id2" to 200, - "updated_at" to OffsetDateTime.parse("2000-01-02T00:00:00Z"), + "updated_at" to TimestampWithTimezoneValue("2000-01-02T00:00:00Z"), "name" to "Alice3", "_ab_cdc_deleted_at" to null ), @@ -1687,21 +1685,18 @@ abstract class BasicFunctionalityIntegrationTest( bigInt = BigInteger("99999999999999999999999999999999") bigIntChanges = emptyList() badValuesData = + // note that the values have different types than what's declared in the schema mapOf( "id" to 5, - "string" to StringValue("{}"), + "string" to ObjectValue(linkedMapOf()), "number" to "foo", "integer" to "foo", "boolean" to "foo", - // TODO this probably indicates that we should - // 1. actually parse time types - // 2. and just rely on the fallback to JsonToAirbyteValue.fromJson to return - // a StringValue - "timestamp_with_timezone" to TimestampValue("foo"), - "timestamp_without_timezone" to TimestampValue("foo"), - "time_with_timezone" to TimeValue("foo"), - "time_without_timezone" to TimeValue("foo"), - "date" to DateValue("foo"), + "timestamp_with_timezone" to "foo", + "timestamp_without_timezone" to "foo", + "time_with_timezone" to "foo", + "time_without_timezone" to "foo", + "date" to "foo", ) badValuesChanges = mutableListOf() } diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroMapperPipelineFactory.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroMapperPipelineFactory.kt index b47c596dd272..cae1f8a7759e 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroMapperPipelineFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroMapperPipelineFactory.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.data.avro import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.AirbyteSchemaNoopMapper +import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper import io.airbyte.cdk.load.data.AirbyteValueNoopMapper import io.airbyte.cdk.load.data.FailOnAllUnknownTypesExceptNull import io.airbyte.cdk.load.data.MapperPipeline @@ -20,10 +21,17 @@ class AvroMapperPipelineFactory : MapperPipelineFactory { MapperPipeline( stream.schema, listOf( - FailOnAllUnknownTypesExceptNull() to SchemalessValuesToJsonString(), + FailOnAllUnknownTypesExceptNull() to AirbyteValueNoopMapper(), + MergeUnions() to AirbyteValueNoopMapper(), + AirbyteSchemaNoopMapper() to AirbyteValueDeepCoercingMapper(), + // We need to maintain the original ObjectWithNoProperties/etc type. + // For example, if a stream declares no columns, we will (correctly) recognize + // the root schema as ObjectTypeWithEmptySchema. + // If we then map that root schema to StringType, then + // AirbyteTypeToAirbyteTypeWithMeta will crash on it. + AirbyteSchemaNoopMapper() to SchemalessValuesToJsonString(), AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(), AirbyteSchemaNoopMapper() to TimeStringToInteger(), - MergeUnions() to AirbyteValueNoopMapper(), ), ) } diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt index 59fcb4feb0f1..1c2136c13286 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt @@ -26,10 +26,12 @@ import io.airbyte.cdk.load.data.StringType import io.airbyte.cdk.load.data.StringValue import io.airbyte.cdk.load.data.TimeTypeWithTimezone import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone -import io.airbyte.cdk.load.data.TimeValue +import io.airbyte.cdk.load.data.TimeWithTimezoneValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue import io.airbyte.cdk.load.data.TimestampTypeWithTimezone import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone -import io.airbyte.cdk.load.data.TimestampValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue import io.airbyte.cdk.load.data.UnionType import io.airbyte.cdk.load.data.UnknownType import java.time.Instant @@ -68,7 +70,6 @@ class AvroRecordToAirbyteValue { ZoneOffset.UTC ) .toLocalDate() - .toString() ) is IntegerType -> return IntegerValue(avroValue as Long) is NumberType -> return NumberValue((avroValue as Double).toBigDecimal()) @@ -86,29 +87,28 @@ class AvroRecordToAirbyteValue { } ) is TimeTypeWithoutTimezone -> - return TimeValue( + return TimeWithoutTimezoneValue( Instant.ofEpochMilli((avroValue as Long) / 1000) .atOffset(ZoneOffset.UTC) .toLocalTime() - .toString() ) is TimeTypeWithTimezone -> - return TimeValue( + return TimeWithTimezoneValue( Instant.ofEpochMilli((avroValue as Long) / 1000) .atOffset(ZoneOffset.UTC) .toOffsetTime() - .toString() ) is TimestampTypeWithoutTimezone -> - return TimestampValue( + return TimestampWithoutTimezoneValue( LocalDateTime.ofInstant( - Instant.ofEpochMilli((avroValue as Long) / 1000), - ZoneOffset.UTC - ) - .toString() + Instant.ofEpochMilli((avroValue as Long) / 1000), + ZoneOffset.UTC + ) ) is TimestampTypeWithTimezone -> - return TimestampValue(Instant.ofEpochMilli((avroValue as Long) / 1000).toString()) + return TimestampWithTimezoneValue( + Instant.ofEpochMilli((avroValue as Long) / 1000).atOffset(ZoneOffset.UTC) + ) is UnionType -> return tryConvertUnion(avroValue, schema) else -> throw IllegalArgumentException("Unsupported schema type: $schema") } diff --git a/airbyte-cdk/bulk/toolkits/load-csv/src/main/kotlin/io/airbyte/cdk/load/data/csv/AirbyteValueToCsvRow.kt b/airbyte-cdk/bulk/toolkits/load-csv/src/main/kotlin/io/airbyte/cdk/load/data/csv/AirbyteValueToCsvRow.kt index 2097d977f062..f35cde846af1 100644 --- a/airbyte-cdk/bulk/toolkits/load-csv/src/main/kotlin/io/airbyte/cdk/load/data/csv/AirbyteValueToCsvRow.kt +++ b/airbyte-cdk/bulk/toolkits/load-csv/src/main/kotlin/io/airbyte/cdk/load/data/csv/AirbyteValueToCsvRow.kt @@ -13,8 +13,10 @@ import io.airbyte.cdk.load.data.NumberValue import io.airbyte.cdk.load.data.ObjectType import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.StringValue -import io.airbyte.cdk.load.data.TimeValue -import io.airbyte.cdk.load.data.TimestampValue +import io.airbyte.cdk.load.data.TimeWithTimezoneValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue import io.airbyte.cdk.load.data.UnknownValue import io.airbyte.cdk.load.data.json.toJson import io.airbyte.cdk.load.util.serializeToString @@ -29,10 +31,12 @@ fun ObjectValue.toCsvRecord(schema: ObjectType): List { is IntegerValue -> it.value is NumberValue -> it.value is NullValue -> "" - is TimestampValue -> it.value + is TimestampWithTimezoneValue -> it.value + is TimestampWithoutTimezoneValue -> it.value is BooleanValue -> it.value is DateValue -> it.value - is TimeValue -> it.value + is TimeWithTimezoneValue -> it.value + is TimeWithoutTimezoneValue -> it.value is UnknownValue -> "" } } diff --git a/airbyte-cdk/bulk/toolkits/load-csv/src/testFixtures/kotlin/io/airbyte/cdk/load/data/csv/CsvRowToAirbyteValue.kt b/airbyte-cdk/bulk/toolkits/load-csv/src/testFixtures/kotlin/io/airbyte/cdk/load/data/csv/CsvRowToAirbyteValue.kt index b2fde6a6425e..f140cf9142e4 100644 --- a/airbyte-cdk/bulk/toolkits/load-csv/src/testFixtures/kotlin/io/airbyte/cdk/load/data/csv/CsvRowToAirbyteValue.kt +++ b/airbyte-cdk/bulk/toolkits/load-csv/src/testFixtures/kotlin/io/airbyte/cdk/load/data/csv/CsvRowToAirbyteValue.kt @@ -6,34 +6,11 @@ package io.airbyte.cdk.load.data.csv import io.airbyte.cdk.load.data.AirbyteType import io.airbyte.cdk.load.data.AirbyteValue -import io.airbyte.cdk.load.data.ArrayType -import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema -import io.airbyte.cdk.load.data.ArrayValue -import io.airbyte.cdk.load.data.BooleanType -import io.airbyte.cdk.load.data.BooleanValue -import io.airbyte.cdk.load.data.DateType -import io.airbyte.cdk.load.data.DateValue -import io.airbyte.cdk.load.data.IntegerType -import io.airbyte.cdk.load.data.IntegerValue -import io.airbyte.cdk.load.data.NullValue -import io.airbyte.cdk.load.data.NumberType -import io.airbyte.cdk.load.data.NumberValue import io.airbyte.cdk.load.data.ObjectType -import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema -import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema import io.airbyte.cdk.load.data.ObjectValue -import io.airbyte.cdk.load.data.StringType import io.airbyte.cdk.load.data.StringValue -import io.airbyte.cdk.load.data.TimeTypeWithTimezone -import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone -import io.airbyte.cdk.load.data.TimeValue -import io.airbyte.cdk.load.data.TimestampTypeWithTimezone -import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone -import io.airbyte.cdk.load.data.TimestampValue -import io.airbyte.cdk.load.data.UnionType -import io.airbyte.cdk.load.data.UnknownType -import io.airbyte.cdk.load.data.UnknownValue import io.airbyte.cdk.load.data.json.toAirbyteValue +import io.airbyte.cdk.load.message.Meta import io.airbyte.cdk.load.util.deserializeToNode import org.apache.commons.csv.CSVRecord @@ -42,81 +19,25 @@ class CsvRowToAirbyteValue { if (schema !is ObjectType) { throw IllegalArgumentException("Only object types are supported") } - val asList = row.toList() - val properties = linkedMapOf() - schema.properties - .toList() - .zip(asList) - .map { (property, value) -> - property.first to convertInner(value, property.second.type) - } - .toMap(properties) - return ObjectValue(properties) - } - private fun convertInner(value: String, field: AirbyteType): AirbyteValue { - if (value.isBlank()) { - return NullValue - } - return try { - when (field) { - is ArrayType -> { - value - .deserializeToNode() - .elements() - .asSequence() - .map { it.toAirbyteValue(field.items.type) } - .toList() - .let(::ArrayValue) - } - is ArrayTypeWithoutSchema -> - value.deserializeToNode().toAirbyteValue(ArrayTypeWithoutSchema) - is BooleanType -> BooleanValue(value.toBooleanStrict()) - is IntegerType -> IntegerValue(value.toBigInteger()) - is NumberType -> NumberValue(value.toBigDecimal()) - is ObjectType -> { - val properties = linkedMapOf() - value - .deserializeToNode() - .fields() - .asSequence() - .map { entry -> - val type = - field.properties[entry.key]?.type - ?: UnknownType(value.deserializeToNode()) - entry.key to entry.value.toAirbyteValue(type) + return ObjectValue( + row.parser.headerNames.zip(row.toList()).associateTo(linkedMapOf()) { (key, valueString) + -> + val airbyteValue = + if (Meta.COLUMN_NAMES.contains(key)) { + Meta.getMetaValue(key, valueString) + } else { + try { + valueString.deserializeToNode().toAirbyteValue() + } catch (e: Exception) { + // boolean/number/object/array can deserialize cleanly + // but strings don't work, so just handle them here + StringValue(valueString) } - .toMap(properties) - ObjectValue(properties) - } - is ObjectTypeWithEmptySchema -> - value.deserializeToNode().toAirbyteValue(ObjectTypeWithEmptySchema) - is ObjectTypeWithoutSchema -> - value.deserializeToNode().toAirbyteValue(ObjectTypeWithoutSchema) - is StringType -> StringValue(value) - is UnionType -> { - // Use the options sorted with string last since it always works - field.options - .sortedBy { it is StringType } - .firstNotNullOfOrNull { option -> - try { - convertInner(value, option) - } catch (e: Exception) { - null - } - } - ?: NullValue - } - DateType -> DateValue(value) - TimeTypeWithTimezone, - TimeTypeWithoutTimezone -> TimeValue(value) - TimestampTypeWithTimezone, - TimestampTypeWithoutTimezone -> TimestampValue(value) - is UnknownType -> UnknownValue(value.deserializeToNode()) - } - } catch (e: Exception) { - StringValue(value) - } + } + key to airbyteValue + }, + ) } } diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteTypeToIcebergSchema.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteTypeToIcebergSchema.kt index b5224df1a8f2..344fcf56e85c 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteTypeToIcebergSchema.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteTypeToIcebergSchema.kt @@ -59,16 +59,14 @@ class AirbyteTypeToIcebergSchema { } return Types.ListType.ofRequired(UUID.randomUUID().hashCode(), convert) } - is ArrayTypeWithoutSchema -> - throw IllegalArgumentException("Array type without schema is not supported") is BooleanType -> Types.BooleanType.get() is DateType -> Types.DateType.get() is IntegerType -> Types.LongType.get() is NumberType -> Types.DoubleType.get() - is ObjectTypeWithEmptySchema -> - throw IllegalArgumentException("Object type with empty schema is not supported") - is ObjectTypeWithoutSchema -> - throw IllegalArgumentException("Object type without schema is not supported") + // Schemaless types are converted to string + is ArrayTypeWithoutSchema, + is ObjectTypeWithEmptySchema, + is ObjectTypeWithoutSchema -> Types.StringType.get() is StringType -> Types.StringType.get() is TimeTypeWithTimezone, is TimeTypeWithoutTimezone -> Types.TimeType.get() diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt index 4fe37446a559..e3f6dead93fb 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt @@ -12,10 +12,12 @@ import io.airbyte.cdk.load.data.NullValue import io.airbyte.cdk.load.data.NumberValue import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.StringValue -import io.airbyte.cdk.load.data.TimeValue -import io.airbyte.cdk.load.data.TimestampValue +import io.airbyte.cdk.load.data.TimeWithTimezoneValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue import io.airbyte.cdk.load.data.UnknownValue -import io.airbyte.cdk.load.util.TimeStringUtility +import java.time.ZoneOffset import org.apache.iceberg.Schema import org.apache.iceberg.data.GenericRecord import org.apache.iceberg.types.Type @@ -60,24 +62,32 @@ class AirbyteValueToIcebergRecord { return array } is BooleanValue -> return airbyteValue.value - is DateValue -> return TimeStringUtility.toLocalDate(airbyteValue.value) + is DateValue -> return airbyteValue.value is IntegerValue -> return airbyteValue.value.toLong() is NullValue -> return null is NumberValue -> return airbyteValue.value.toDouble() is StringValue -> return airbyteValue.value - is TimeValue -> + is TimeWithTimezoneValue -> return when (type.typeId()) { - Type.TypeID.TIME -> TimeStringUtility.toOffset(airbyteValue.value) + Type.TypeID.TIME -> airbyteValue.value.toLocalTime() else -> throw IllegalArgumentException( "${type.typeId()} type is not allowed for TimeValue" ) } - is TimestampValue -> + is TimeWithoutTimezoneValue -> + return when (type.typeId()) { + Type.TypeID.TIME -> airbyteValue.value + else -> + throw IllegalArgumentException( + "${type.typeId()} type is not allowed for TimeValue" + ) + } + is TimestampWithTimezoneValue -> return when (type.typeId()) { Type.TypeID.TIMESTAMP -> { val timestampType = type as TimestampType - val offsetDateTime = TimeStringUtility.toOffsetDateTime(airbyteValue.value) + val offsetDateTime = airbyteValue.value if (timestampType.shouldAdjustToUTC()) { offsetDateTime @@ -90,6 +100,23 @@ class AirbyteValueToIcebergRecord { "${type.typeId()} type is not allowed for TimestampValue" ) } + is TimestampWithoutTimezoneValue -> + return when (type.typeId()) { + Type.TypeID.TIMESTAMP -> { + val timestampType = type as TimestampType + val localDateTime = airbyteValue.value + + if (timestampType.shouldAdjustToUTC()) { + localDateTime.atOffset(ZoneOffset.UTC) + } else { + localDateTime + } + } + else -> + throw IllegalArgumentException( + "${type.typeId()} type is not allowed for TimestampValue" + ) + } is UnknownValue -> throw IllegalArgumentException("Unknown type is not supported") } } diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/IcebergParquetPipelineFactory.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/IcebergParquetPipelineFactory.kt index 4ea15d75cd24..979662621260 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/IcebergParquetPipelineFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/IcebergParquetPipelineFactory.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.data.iceberg.parquet import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.AirbyteSchemaNoopMapper +import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper import io.airbyte.cdk.load.data.AirbyteValueNoopMapper import io.airbyte.cdk.load.data.MapperPipeline import io.airbyte.cdk.load.data.MapperPipelineFactory @@ -20,10 +21,17 @@ class IcebergParquetPipelineFactory : MapperPipelineFactory { MapperPipeline( stream.schema, listOf( - // TODO not sure why base parquet was doing this as a noop - SchemalessTypesToStringType() to SchemalessValuesToJsonString(), - AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(), MergeUnions() to AirbyteValueNoopMapper(), + AirbyteSchemaNoopMapper() to AirbyteValueDeepCoercingMapper(), + // We need to maintain the original ObjectWithNoProperties/etc type. + // For example, if a stream declares no columns, we will (correctly) recognize + // the root schema as ObjectTypeWithEmptySchema. + // If we then map that root schema to StringType, then + // AirbyteTypeToAirbyteTypeWithMeta will crash on it. + // Furthermore, in UnionTypeToDisjointRecord, this enables us to write thes fields + // as "object" rather than as "string". + AirbyteSchemaNoopMapper() to SchemalessValuesToJsonString(), + AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(), UnionTypeToDisjointRecord() to UnionValueToDisjointRecord(), ), ) diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/SchemalessTypesToStringType.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/SchemalessTypesToStringType.kt deleted file mode 100644 index 9e48c9e95d42..000000000000 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/SchemalessTypesToStringType.kt +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.load.data.iceberg.parquet - -import io.airbyte.cdk.load.data.AirbyteSchemaIdentityMapper -import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema -import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema -import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema -import io.airbyte.cdk.load.data.StringType -import io.airbyte.cdk.load.data.UnknownType - -class SchemalessTypesToStringType : AirbyteSchemaIdentityMapper { - override fun mapArrayWithoutSchema(schema: ArrayTypeWithoutSchema) = StringType - override fun mapObjectWithEmptySchema(schema: ObjectTypeWithEmptySchema) = StringType - override fun mapObjectWithoutSchema(schema: ObjectTypeWithoutSchema) = StringType - override fun mapUnknown(schema: UnknownType) = StringType -} diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt index e7f255e38e63..5bba3951666e 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt @@ -12,13 +12,20 @@ import io.airbyte.cdk.load.data.NullValue import io.airbyte.cdk.load.data.NumberValue import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.StringValue -import io.airbyte.cdk.load.data.TimeValue -import io.airbyte.cdk.load.data.TimestampValue +import io.airbyte.cdk.load.data.TimeWithTimezoneValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue import io.airbyte.cdk.load.data.UnknownValue import io.airbyte.cdk.load.data.iceberg.parquet.AirbyteValueToIcebergRecord import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord import io.airbyte.protocol.models.Jsons import java.math.BigDecimal +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime +import java.time.OffsetDateTime +import java.time.OffsetTime import org.apache.iceberg.Schema import org.apache.iceberg.data.GenericRecord import org.apache.iceberg.types.Types @@ -81,9 +88,9 @@ class AirbyteValueToIcebergRecordTest { @Test fun `convert throws exception for DateValue`() { - assertThrows { - converter.convert(DateValue("2024-11-18"), Types.DateType.get()) - } + val result = + converter.convert(DateValue(LocalDate.parse("2024-11-18")), Types.DateType.get()) + assertEquals(LocalDate.parse("2024-11-18"), result) } @Test @@ -112,20 +119,44 @@ class AirbyteValueToIcebergRecordTest { } @Test - fun `convert throws exception for TimeValue`() { - assertThrows { - converter.convert(TimeValue("12:34:56"), Types.TimeType.get()) - } + fun `convert handles TimeNtzValue`() { + val result = + converter.convert( + TimeWithoutTimezoneValue(LocalTime.parse("12:34:56")), + Types.TimeType.get() + ) + assertEquals(LocalTime.parse("12:34:56"), result) } @Test - fun `convert throws exception for TimestampValue`() { - assertThrows { + fun `convert handles TimeTzValue`() { + val result = converter.convert( - TimestampValue("2024-11-18T12:34:56Z"), + TimeWithTimezoneValue(OffsetTime.parse("12:34:56Z")), + Types.TimeType.get() + ) + // Note LocalTime here. Iceberg+Parquet doesn't have a dedicated timetz type. + assertEquals(LocalTime.parse("12:34:56"), result) + } + + @Test + fun `convert handles TimestampNtzValue`() { + val result = + converter.convert( + TimestampWithoutTimezoneValue(LocalDateTime.parse("2024-11-18T12:34:56")), + Types.TimestampType.withoutZone() + ) + assertEquals(LocalDateTime.parse("2024-11-18T12:34:56"), result) + } + + @Test + fun `convert handles TimestampTzValue`() { + val result = + converter.convert( + TimestampWithTimezoneValue(OffsetDateTime.parse("2024-11-18T12:34:56Z")), Types.TimestampType.withZone() ) - } + assertEquals(OffsetDateTime.parse("2024-11-18T12:34:56Z"), result) } @Test @@ -144,7 +175,7 @@ class AirbyteValueToIcebergRecordTest { NestedField.required( 3, "meta", - Types.StructType.of( + StructType.of( NestedField.required(4, "sync_id", Types.IntegerType.get()), NestedField.required( 5, diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt index 17697e1f7fba..1a4b25070332 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt @@ -113,7 +113,7 @@ class ObjectStorageDataDumper( .map { line -> line .deserializeToNode() - .toAirbyteValue(stream.schema.withAirbyteMeta(wasFlattened)) + .toAirbyteValue() .maybeUnflatten(wasFlattened) .toOutputRecord() } diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt index 6e5870c9fc9f..605eb097385d 100644 --- a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.data.parquet import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.AirbyteSchemaNoopMapper +import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper import io.airbyte.cdk.load.data.AirbyteValueNoopMapper import io.airbyte.cdk.load.data.FailOnAllUnknownTypesExceptNull import io.airbyte.cdk.load.data.MapperPipeline @@ -22,10 +23,19 @@ class ParquetMapperPipelineFactory : MapperPipelineFactory { MapperPipeline( stream.schema, listOf( - FailOnAllUnknownTypesExceptNull() to SchemalessValuesToJsonString(), + FailOnAllUnknownTypesExceptNull() to AirbyteValueNoopMapper(), + MergeUnions() to AirbyteValueNoopMapper(), + AirbyteSchemaNoopMapper() to AirbyteValueDeepCoercingMapper(), + // We need to maintain the original ObjectWithNoProperties/etc type. + // For example, if a stream declares no columns, we will (correctly) recognize + // the root schema as ObjectTypeWithEmptySchema. + // If we then map that root schema to StringType, then + // AirbyteTypeToAirbyteTypeWithMeta will crash on it. + // Furthermore, in UnionTypeToDisjointRecord, this enables us to write thes fields + // as "object" rather than as "string". + AirbyteSchemaNoopMapper() to SchemalessValuesToJsonString(), AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(), AirbyteSchemaNoopMapper() to TimeStringToInteger(), - MergeUnions() to AirbyteValueNoopMapper(), UnionTypeToDisjointRecord() to UnionValueToDisjointRecord(), ), ) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergExpectedRecordMapper.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergExpectedRecordMapper.kt new file mode 100644 index 000000000000..418c1e9d55fc --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergExpectedRecordMapper.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.v2 + +import io.airbyte.cdk.load.data.AirbyteType +import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.ArrayValue +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.TimeWithTimezoneValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue +import io.airbyte.cdk.load.test.util.ExpectedRecordMapper +import io.airbyte.cdk.load.test.util.OutputRecord + +/** + * Iceberg doesn't have a TimeWithTimezone type. So map expectedRecords' TimeWithTimezone to + * TimeWithoutTimezone. + */ +object IcebergExpectedRecordMapper : ExpectedRecordMapper { + override fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord { + val mappedData = mapTimeTzToTimeNtz(expectedRecord.data) + return expectedRecord.copy(data = mappedData as ObjectValue) + } + + private fun mapTimeTzToTimeNtz(value: AirbyteValue): AirbyteValue = + when (value) { + is TimeWithTimezoneValue -> TimeWithoutTimezoneValue(value.value.toLocalTime()) + is ArrayValue -> ArrayValue(value.values.map { mapTimeTzToTimeNtz(it) }) + is ObjectValue -> + ObjectValue( + value.values.mapValuesTo(linkedMapOf()) { (_, v) -> mapTimeTzToTimeNtz(v) } + ) + else -> value + } +} diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index 997b6d833994..f2532207c27c 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -8,7 +8,6 @@ import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper import io.airbyte.cdk.load.test.util.DestinationCleaner import io.airbyte.cdk.load.test.util.NoopDestinationCleaner -import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.StronglyTyped import java.nio.file.Files @@ -30,22 +29,24 @@ abstract class IcebergV2WriteTest( IcebergV2Specification::class.java, IcebergV2DataDumper, destinationCleaner, - NoopExpectedRecordMapper, + IcebergExpectedRecordMapper, isStreamSchemaRetroactive = true, - supportsDedup = false, + supportsDedup = true, stringifySchemalessObjects = true, promoteUnionToObject = true, preserveUndeclaredFields = false, commitDataIncrementally = false, supportFileTransfer = false, envVars = envVars, - allTypesBehavior = StronglyTyped(), + allTypesBehavior = StronglyTyped(integerCanBeLarge = false), nullEqualsUnset = true, ) { @Test - @Disabled("bad values handling for timestamps is currently broken") - override fun testBasicTypes() { - super.testBasicTypes() + @Disabled( + "failing because we have an extra _pos column - that's probably fine, but problem for a different day" + ) + override fun testDedup() { + super.testDedup() } @Test @@ -73,6 +74,18 @@ abstract class IcebergV2WriteTest( override fun testAppendSchemaEvolution() { super.testAppendSchemaEvolution() } + + @Test + @Disabled("This is expected (dest-iceberg-v2 doesn't yet support schema evolution)") + override fun testDedupChangeCursor() { + super.testDedupChangeCursor() + } + + @Test + @Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11221") + override fun testUnknownTypes() { + super.testUnknownTypes() + } } class IcebergGlueWriteTest : diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt index d2dddc3e591a..ffda15b3911e 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt @@ -19,7 +19,7 @@ import io.airbyte.cdk.load.data.ObjectType import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.StringType import io.airbyte.cdk.load.data.StringValue -import io.airbyte.cdk.load.data.TimestampValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.Meta @@ -251,7 +251,8 @@ internal class IcebergUtilTest { linkedMapOf( "id" to IntegerValue(42L), "name" to StringValue("John Doe"), - AIRBYTE_CDC_DELETE_COLUMN to TimestampValue("2024-01-01T00:00:00Z"), + AIRBYTE_CDC_DELETE_COLUMN to + TimestampWithTimezoneValue("2024-01-01T00:00:00Z"), ) ), emittedAtMs = System.currentTimeMillis(), diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index 09cafb4b13e3..47fa340b0fe8 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -4,8 +4,10 @@ package io.airbyte.integrations.destination.s3_v2 +import io.airbyte.cdk.load.test.util.ExpectedRecordMapper import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper +import io.airbyte.cdk.load.test.util.UncoercedExpectedRecordMapper import io.airbyte.cdk.load.write.AllTypesBehavior import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.StronglyTyped @@ -18,6 +20,7 @@ import org.junit.jupiter.api.Timeout @Timeout(30, unit = TimeUnit.MINUTES) abstract class S3V2WriteTest( path: String, + expectedRecordMapper: ExpectedRecordMapper, stringifySchemalessObjects: Boolean, promoteUnionToObject: Boolean, preserveUndeclaredFields: Boolean, @@ -33,7 +36,7 @@ abstract class S3V2WriteTest( S3V2Specification::class.java, S3V2DataDumper, NoopDestinationCleaner, - NoopExpectedRecordMapper, + expectedRecordMapper, isStreamSchemaRetroactive = false, supportsDedup = false, stringifySchemalessObjects = stringifySchemalessObjects, @@ -62,45 +65,28 @@ abstract class S3V2WriteTest( class S3V2WriteTestJsonUncompressed : S3V2WriteTest( S3V2TestUtils.JSON_UNCOMPRESSED_CONFIG_PATH, + UncoercedExpectedRecordMapper, stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = true, allTypesBehavior = Untyped, - ) { - @Test - override fun testInterruptedTruncateWithPriorData() { - super.testInterruptedTruncateWithPriorData() - } - - @Test - override fun testBasicTypes() { - super.testBasicTypes() - } - - @Test - override fun testBasicWriteFile() { - super.testBasicWriteFile() - } -} + ) class S3V2WriteTestJsonRootLevelFlattening : S3V2WriteTest( S3V2TestUtils.JSON_ROOT_LEVEL_FLATTENING_CONFIG_PATH, + UncoercedExpectedRecordMapper, stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = true, allTypesBehavior = Untyped, - ) { - @Test - override fun testInterruptedTruncateWithPriorData() { - super.testInterruptedTruncateWithPriorData() - } -} + ) @Disabled("Un-disable once staging is re-enabled") class S3V2WriteTestJsonStaging : S3V2WriteTest( S3V2TestUtils.JSON_STAGING_CONFIG_PATH, + UncoercedExpectedRecordMapper, stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = true, @@ -115,6 +101,7 @@ class S3V2WriteTestJsonStaging : class S3V2WriteTestJsonGzip : S3V2WriteTest( S3V2TestUtils.JSON_GZIP_CONFIG_PATH, + UncoercedExpectedRecordMapper, stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = true, @@ -124,6 +111,7 @@ class S3V2WriteTestJsonGzip : class S3V2WriteTestCsvUncompressed : S3V2WriteTest( S3V2TestUtils.CSV_UNCOMPRESSED_CONFIG_PATH, + UncoercedExpectedRecordMapper, stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = true, @@ -138,6 +126,7 @@ class S3V2WriteTestCsvUncompressed : class S3V2WriteTestCsvRootLevelFlattening : S3V2WriteTest( S3V2TestUtils.CSV_ROOT_LEVEL_FLATTENING_CONFIG_PATH, + UncoercedExpectedRecordMapper, stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = false, @@ -149,6 +138,7 @@ class S3V2WriteTestCsvRootLevelFlattening : class S3V2WriteTestCsvGzip : S3V2WriteTest( S3V2TestUtils.CSV_GZIP_CONFIG_PATH, + UncoercedExpectedRecordMapper, stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = true, @@ -158,6 +148,7 @@ class S3V2WriteTestCsvGzip : class S3V2WriteTestAvroUncompressed : S3V2WriteTest( S3V2TestUtils.AVRO_UNCOMPRESSED_CONFIG_PATH, + NoopExpectedRecordMapper, stringifySchemalessObjects = true, promoteUnionToObject = false, preserveUndeclaredFields = false, @@ -169,6 +160,7 @@ class S3V2WriteTestAvroUncompressed : class S3V2WriteTestAvroBzip2 : S3V2WriteTest( S3V2TestUtils.AVRO_BZIP2_CONFIG_PATH, + NoopExpectedRecordMapper, stringifySchemalessObjects = true, promoteUnionToObject = false, preserveUndeclaredFields = false, @@ -180,6 +172,7 @@ class S3V2WriteTestAvroBzip2 : class S3V2WriteTestParquetUncompressed : S3V2WriteTest( S3V2TestUtils.PARQUET_UNCOMPRESSED_CONFIG_PATH, + NoopExpectedRecordMapper, stringifySchemalessObjects = true, promoteUnionToObject = true, preserveUndeclaredFields = false, @@ -191,6 +184,7 @@ class S3V2WriteTestParquetUncompressed : class S3V2WriteTestParquetSnappy : S3V2WriteTest( S3V2TestUtils.PARQUET_SNAPPY_CONFIG_PATH, + NoopExpectedRecordMapper, stringifySchemalessObjects = true, promoteUnionToObject = true, preserveUndeclaredFields = false, @@ -202,6 +196,8 @@ class S3V2WriteTestParquetSnappy : class S3V2WriteTestEndpointURL : S3V2WriteTest( S3V2TestUtils.ENDPOINT_URL_CONFIG_PATH, + // this test is writing to CSV + UncoercedExpectedRecordMapper, stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = false, @@ -212,6 +208,8 @@ class S3V2WriteTestEndpointURL : class S3V2AmbiguousFilepath : S3V2WriteTest( S3V2TestUtils.AMBIGUOUS_FILEPATH_CONFIG_PATH, + // this test is writing to CSV + UncoercedExpectedRecordMapper, stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = true,