diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt index 4fe37446a559d..e3f6dead93fb8 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt @@ -12,10 +12,12 @@ import io.airbyte.cdk.load.data.NullValue import io.airbyte.cdk.load.data.NumberValue import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.StringValue -import io.airbyte.cdk.load.data.TimeValue -import io.airbyte.cdk.load.data.TimestampValue +import io.airbyte.cdk.load.data.TimeWithTimezoneValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue import io.airbyte.cdk.load.data.UnknownValue -import io.airbyte.cdk.load.util.TimeStringUtility +import java.time.ZoneOffset import org.apache.iceberg.Schema import org.apache.iceberg.data.GenericRecord import org.apache.iceberg.types.Type @@ -60,24 +62,32 @@ class AirbyteValueToIcebergRecord { return array } is BooleanValue -> return airbyteValue.value - is DateValue -> return TimeStringUtility.toLocalDate(airbyteValue.value) + is DateValue -> return airbyteValue.value is IntegerValue -> return airbyteValue.value.toLong() is NullValue -> return null is NumberValue -> return airbyteValue.value.toDouble() is StringValue -> return airbyteValue.value - is TimeValue -> + is TimeWithTimezoneValue -> return when (type.typeId()) { - Type.TypeID.TIME -> TimeStringUtility.toOffset(airbyteValue.value) + Type.TypeID.TIME -> airbyteValue.value.toLocalTime() else -> throw IllegalArgumentException( "${type.typeId()} type is not allowed for TimeValue" ) } - is TimestampValue -> + is TimeWithoutTimezoneValue -> + return when (type.typeId()) { + Type.TypeID.TIME -> airbyteValue.value + else -> + throw IllegalArgumentException( + "${type.typeId()} type is not allowed for TimeValue" + ) + } + is TimestampWithTimezoneValue -> return when (type.typeId()) { Type.TypeID.TIMESTAMP -> { val timestampType = type as TimestampType - val offsetDateTime = TimeStringUtility.toOffsetDateTime(airbyteValue.value) + val offsetDateTime = airbyteValue.value if (timestampType.shouldAdjustToUTC()) { offsetDateTime @@ -90,6 +100,23 @@ class AirbyteValueToIcebergRecord { "${type.typeId()} type is not allowed for TimestampValue" ) } + is TimestampWithoutTimezoneValue -> + return when (type.typeId()) { + Type.TypeID.TIMESTAMP -> { + val timestampType = type as TimestampType + val localDateTime = airbyteValue.value + + if (timestampType.shouldAdjustToUTC()) { + localDateTime.atOffset(ZoneOffset.UTC) + } else { + localDateTime + } + } + else -> + throw IllegalArgumentException( + "${type.typeId()} type is not allowed for TimestampValue" + ) + } is UnknownValue -> throw IllegalArgumentException("Unknown type is not supported") } } diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/IcebergParquetPipelineFactory.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/IcebergParquetPipelineFactory.kt index 4ea15d75cd242..dc4ddd7d2da9a 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/IcebergParquetPipelineFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/IcebergParquetPipelineFactory.kt @@ -6,11 +6,13 @@ package io.airbyte.cdk.load.data.iceberg.parquet import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.AirbyteSchemaNoopMapper +import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper import io.airbyte.cdk.load.data.AirbyteValueNoopMapper import io.airbyte.cdk.load.data.MapperPipeline import io.airbyte.cdk.load.data.MapperPipelineFactory import io.airbyte.cdk.load.data.MergeUnions import io.airbyte.cdk.load.data.NullOutOfRangeIntegers +import io.airbyte.cdk.load.data.SchemalessTypesToStringType import io.airbyte.cdk.load.data.SchemalessValuesToJsonString import io.airbyte.cdk.load.data.UnionTypeToDisjointRecord import io.airbyte.cdk.load.data.UnionValueToDisjointRecord @@ -21,9 +23,10 @@ class IcebergParquetPipelineFactory : MapperPipelineFactory { stream.schema, listOf( // TODO not sure why base parquet was doing this as a noop - SchemalessTypesToStringType() to SchemalessValuesToJsonString(), - AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(), + SchemalessTypesToStringType to SchemalessValuesToJsonString(), MergeUnions() to AirbyteValueNoopMapper(), + AirbyteSchemaNoopMapper() to AirbyteValueDeepCoercingMapper(), + AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(), UnionTypeToDisjointRecord() to UnionValueToDisjointRecord(), ), ) diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt index e7f255e38e631..5bba3951666e8 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/data/icerberg/parquet/AirbyteValueToIcebergRecordTest.kt @@ -12,13 +12,20 @@ import io.airbyte.cdk.load.data.NullValue import io.airbyte.cdk.load.data.NumberValue import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.StringValue -import io.airbyte.cdk.load.data.TimeValue -import io.airbyte.cdk.load.data.TimestampValue +import io.airbyte.cdk.load.data.TimeWithTimezoneValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue import io.airbyte.cdk.load.data.UnknownValue import io.airbyte.cdk.load.data.iceberg.parquet.AirbyteValueToIcebergRecord import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord import io.airbyte.protocol.models.Jsons import java.math.BigDecimal +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime +import java.time.OffsetDateTime +import java.time.OffsetTime import org.apache.iceberg.Schema import org.apache.iceberg.data.GenericRecord import org.apache.iceberg.types.Types @@ -81,9 +88,9 @@ class AirbyteValueToIcebergRecordTest { @Test fun `convert throws exception for DateValue`() { - assertThrows { - converter.convert(DateValue("2024-11-18"), Types.DateType.get()) - } + val result = + converter.convert(DateValue(LocalDate.parse("2024-11-18")), Types.DateType.get()) + assertEquals(LocalDate.parse("2024-11-18"), result) } @Test @@ -112,20 +119,44 @@ class AirbyteValueToIcebergRecordTest { } @Test - fun `convert throws exception for TimeValue`() { - assertThrows { - converter.convert(TimeValue("12:34:56"), Types.TimeType.get()) - } + fun `convert handles TimeNtzValue`() { + val result = + converter.convert( + TimeWithoutTimezoneValue(LocalTime.parse("12:34:56")), + Types.TimeType.get() + ) + assertEquals(LocalTime.parse("12:34:56"), result) } @Test - fun `convert throws exception for TimestampValue`() { - assertThrows { + fun `convert handles TimeTzValue`() { + val result = converter.convert( - TimestampValue("2024-11-18T12:34:56Z"), + TimeWithTimezoneValue(OffsetTime.parse("12:34:56Z")), + Types.TimeType.get() + ) + // Note LocalTime here. Iceberg+Parquet doesn't have a dedicated timetz type. + assertEquals(LocalTime.parse("12:34:56"), result) + } + + @Test + fun `convert handles TimestampNtzValue`() { + val result = + converter.convert( + TimestampWithoutTimezoneValue(LocalDateTime.parse("2024-11-18T12:34:56")), + Types.TimestampType.withoutZone() + ) + assertEquals(LocalDateTime.parse("2024-11-18T12:34:56"), result) + } + + @Test + fun `convert handles TimestampTzValue`() { + val result = + converter.convert( + TimestampWithTimezoneValue(OffsetDateTime.parse("2024-11-18T12:34:56Z")), Types.TimestampType.withZone() ) - } + assertEquals(OffsetDateTime.parse("2024-11-18T12:34:56Z"), result) } @Test @@ -144,7 +175,7 @@ class AirbyteValueToIcebergRecordTest { NestedField.required( 3, "meta", - Types.StructType.of( + StructType.of( NestedField.required(4, "sync_id", Types.IntegerType.get()), NestedField.required( 5, diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergExpectedRecordMapper.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergExpectedRecordMapper.kt new file mode 100644 index 0000000000000..418c1e9d55fcc --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergExpectedRecordMapper.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.v2 + +import io.airbyte.cdk.load.data.AirbyteType +import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.ArrayValue +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.TimeWithTimezoneValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue +import io.airbyte.cdk.load.test.util.ExpectedRecordMapper +import io.airbyte.cdk.load.test.util.OutputRecord + +/** + * Iceberg doesn't have a TimeWithTimezone type. So map expectedRecords' TimeWithTimezone to + * TimeWithoutTimezone. + */ +object IcebergExpectedRecordMapper : ExpectedRecordMapper { + override fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord { + val mappedData = mapTimeTzToTimeNtz(expectedRecord.data) + return expectedRecord.copy(data = mappedData as ObjectValue) + } + + private fun mapTimeTzToTimeNtz(value: AirbyteValue): AirbyteValue = + when (value) { + is TimeWithTimezoneValue -> TimeWithoutTimezoneValue(value.value.toLocalTime()) + is ArrayValue -> ArrayValue(value.values.map { mapTimeTzToTimeNtz(it) }) + is ObjectValue -> + ObjectValue( + value.values.mapValuesTo(linkedMapOf()) { (_, v) -> mapTimeTzToTimeNtz(v) } + ) + else -> value + } +} diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index d09cb90af109c..3d7911beb683c 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -8,7 +8,6 @@ import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper import io.airbyte.cdk.load.test.util.DestinationCleaner import io.airbyte.cdk.load.test.util.NoopDestinationCleaner -import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest import io.airbyte.cdk.load.write.StronglyTyped import java.nio.file.Files @@ -29,21 +28,23 @@ abstract class IcebergV2WriteTest( IcebergV2Specification::class.java, IcebergV2DataDumper, destinationCleaner, - NoopExpectedRecordMapper, + IcebergExpectedRecordMapper, isStreamSchemaRetroactive = true, - supportsDedup = false, + supportsDedup = true, stringifySchemalessObjects = true, promoteUnionToObject = true, preserveUndeclaredFields = false, commitDataIncrementally = false, supportFileTransfer = false, - allTypesBehavior = StronglyTyped(), + allTypesBehavior = StronglyTyped(integerCanBeLarge = false), nullEqualsUnset = true, ) { @Test - @Disabled("bad values handling for timestamps is currently broken") - override fun testBasicTypes() { - super.testBasicTypes() + @Disabled( + "failing because we have an extra _pos column - that's probably fine, but problem for a different day" + ) + override fun testDedup() { + super.testDedup() } @Test @@ -71,6 +72,12 @@ abstract class IcebergV2WriteTest( override fun testAppendSchemaEvolution() { super.testAppendSchemaEvolution() } + + @Test + @Disabled("This is expected (dest-iceberg-v2 doesn't yet support schema evolution)") + override fun testDedupChangeCursor() { + super.testDedupChangeCursor() + } } class IcebergGlueWriteTest :