Skip to content

Commit

Permalink
fix iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Dec 20, 2024
1 parent f01cc09 commit e6612bd
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeValue
import io.airbyte.cdk.load.data.TimestampValue
import io.airbyte.cdk.load.data.TimeWithTimezoneValue
import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
import io.airbyte.cdk.load.data.UnknownValue
import io.airbyte.cdk.load.util.TimeStringUtility
import java.time.ZoneOffset
import org.apache.iceberg.Schema
import org.apache.iceberg.data.GenericRecord
import org.apache.iceberg.types.Type
Expand Down Expand Up @@ -60,24 +62,32 @@ class AirbyteValueToIcebergRecord {
return array
}
is BooleanValue -> return airbyteValue.value
is DateValue -> return TimeStringUtility.toLocalDate(airbyteValue.value)
is DateValue -> return airbyteValue.value
is IntegerValue -> return airbyteValue.value.toLong()
is NullValue -> return null
is NumberValue -> return airbyteValue.value.toDouble()
is StringValue -> return airbyteValue.value
is TimeValue ->
is TimeWithTimezoneValue ->
return when (type.typeId()) {
Type.TypeID.TIME -> TimeStringUtility.toOffset(airbyteValue.value)
Type.TypeID.TIME -> airbyteValue.value.toLocalTime()
else ->
throw IllegalArgumentException(
"${type.typeId()} type is not allowed for TimeValue"
)
}
is TimestampValue ->
is TimeWithoutTimezoneValue ->
return when (type.typeId()) {
Type.TypeID.TIME -> airbyteValue.value
else ->
throw IllegalArgumentException(
"${type.typeId()} type is not allowed for TimeValue"
)
}
is TimestampWithTimezoneValue ->
return when (type.typeId()) {
Type.TypeID.TIMESTAMP -> {
val timestampType = type as TimestampType
val offsetDateTime = TimeStringUtility.toOffsetDateTime(airbyteValue.value)
val offsetDateTime = airbyteValue.value

if (timestampType.shouldAdjustToUTC()) {
offsetDateTime
Expand All @@ -90,6 +100,23 @@ class AirbyteValueToIcebergRecord {
"${type.typeId()} type is not allowed for TimestampValue"
)
}
is TimestampWithoutTimezoneValue ->
return when (type.typeId()) {
Type.TypeID.TIMESTAMP -> {
val timestampType = type as TimestampType
val localDateTime = airbyteValue.value

if (timestampType.shouldAdjustToUTC()) {
localDateTime.atOffset(ZoneOffset.UTC)
} else {
localDateTime
}
}
else ->
throw IllegalArgumentException(
"${type.typeId()} type is not allowed for TimestampValue"
)
}
is UnknownValue -> throw IllegalArgumentException("Unknown type is not supported")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package io.airbyte.cdk.load.data.iceberg.parquet

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteSchemaNoopMapper
import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper
import io.airbyte.cdk.load.data.AirbyteValueNoopMapper
import io.airbyte.cdk.load.data.MapperPipeline
import io.airbyte.cdk.load.data.MapperPipelineFactory
import io.airbyte.cdk.load.data.MergeUnions
import io.airbyte.cdk.load.data.NullOutOfRangeIntegers
import io.airbyte.cdk.load.data.SchemalessTypesToStringType
import io.airbyte.cdk.load.data.SchemalessValuesToJsonString
import io.airbyte.cdk.load.data.UnionTypeToDisjointRecord
import io.airbyte.cdk.load.data.UnionValueToDisjointRecord
Expand All @@ -21,9 +23,10 @@ class IcebergParquetPipelineFactory : MapperPipelineFactory {
stream.schema,
listOf(
// TODO not sure why base parquet was doing this as a noop
SchemalessTypesToStringType() to SchemalessValuesToJsonString(),
AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(),
SchemalessTypesToStringType to SchemalessValuesToJsonString(),
MergeUnions() to AirbyteValueNoopMapper(),
AirbyteSchemaNoopMapper() to AirbyteValueDeepCoercingMapper(),
AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(),
UnionTypeToDisjointRecord() to UnionValueToDisjointRecord(),
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,20 @@ import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeValue
import io.airbyte.cdk.load.data.TimestampValue
import io.airbyte.cdk.load.data.TimeWithTimezoneValue
import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
import io.airbyte.cdk.load.data.UnknownValue
import io.airbyte.cdk.load.data.iceberg.parquet.AirbyteValueToIcebergRecord
import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord
import io.airbyte.protocol.models.Jsons
import java.math.BigDecimal
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import org.apache.iceberg.Schema
import org.apache.iceberg.data.GenericRecord
import org.apache.iceberg.types.Types
Expand Down Expand Up @@ -81,9 +88,9 @@ class AirbyteValueToIcebergRecordTest {

@Test
fun `convert throws exception for DateValue`() {
assertThrows<IllegalArgumentException> {
converter.convert(DateValue("2024-11-18"), Types.DateType.get())
}
val result =
converter.convert(DateValue(LocalDate.parse("2024-11-18")), Types.DateType.get())
assertEquals(LocalDate.parse("2024-11-18"), result)
}

@Test
Expand Down Expand Up @@ -112,20 +119,44 @@ class AirbyteValueToIcebergRecordTest {
}

@Test
fun `convert throws exception for TimeValue`() {
assertThrows<IllegalArgumentException> {
converter.convert(TimeValue("12:34:56"), Types.TimeType.get())
}
fun `convert handles TimeNtzValue`() {
val result =
converter.convert(
TimeWithoutTimezoneValue(LocalTime.parse("12:34:56")),
Types.TimeType.get()
)
assertEquals(LocalTime.parse("12:34:56"), result)
}

@Test
fun `convert throws exception for TimestampValue`() {
assertThrows<IllegalArgumentException> {
fun `convert handles TimeTzValue`() {
val result =
converter.convert(
TimestampValue("2024-11-18T12:34:56Z"),
TimeWithTimezoneValue(OffsetTime.parse("12:34:56Z")),
Types.TimeType.get()
)
// Note LocalTime here. Iceberg+Parquet doesn't have a dedicated timetz type.
assertEquals(LocalTime.parse("12:34:56"), result)
}

@Test
fun `convert handles TimestampNtzValue`() {
val result =
converter.convert(
TimestampWithoutTimezoneValue(LocalDateTime.parse("2024-11-18T12:34:56")),
Types.TimestampType.withoutZone()
)
assertEquals(LocalDateTime.parse("2024-11-18T12:34:56"), result)
}

@Test
fun `convert handles TimestampTzValue`() {
val result =
converter.convert(
TimestampWithTimezoneValue(OffsetDateTime.parse("2024-11-18T12:34:56Z")),
Types.TimestampType.withZone()
)
}
assertEquals(OffsetDateTime.parse("2024-11-18T12:34:56Z"), result)
}

@Test
Expand All @@ -144,7 +175,7 @@ class AirbyteValueToIcebergRecordTest {
NestedField.required(
3,
"meta",
Types.StructType.of(
StructType.of(
NestedField.required(4, "sync_id", Types.IntegerType.get()),
NestedField.required(
5,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.TimeWithTimezoneValue
import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
import io.airbyte.cdk.load.test.util.OutputRecord

/**
* Iceberg doesn't have a TimeWithTimezone type. So map expectedRecords' TimeWithTimezone to
* TimeWithoutTimezone.
*/
object IcebergExpectedRecordMapper : ExpectedRecordMapper {
override fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord {
val mappedData = mapTimeTzToTimeNtz(expectedRecord.data)
return expectedRecord.copy(data = mappedData as ObjectValue)
}

private fun mapTimeTzToTimeNtz(value: AirbyteValue): AirbyteValue =
when (value) {
is TimeWithTimezoneValue -> TimeWithoutTimezoneValue(value.value.toLocalTime())
is ArrayValue -> ArrayValue(value.values.map { mapTimeTzToTimeNtz(it) })
is ObjectValue ->
ObjectValue(
value.values.mapValuesTo(linkedMapOf()) { (_, v) -> mapTimeTzToTimeNtz(v) }
)
else -> value
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.StronglyTyped
import java.nio.file.Files
Expand All @@ -29,21 +28,23 @@ abstract class IcebergV2WriteTest(
IcebergV2Specification::class.java,
IcebergV2DataDumper,
destinationCleaner,
NoopExpectedRecordMapper,
IcebergExpectedRecordMapper,
isStreamSchemaRetroactive = true,
supportsDedup = false,
supportsDedup = true,
stringifySchemalessObjects = true,
promoteUnionToObject = true,
preserveUndeclaredFields = false,
commitDataIncrementally = false,
supportFileTransfer = false,
allTypesBehavior = StronglyTyped(),
allTypesBehavior = StronglyTyped(integerCanBeLarge = false),
nullEqualsUnset = true,
) {
@Test
@Disabled("bad values handling for timestamps is currently broken")
override fun testBasicTypes() {
super.testBasicTypes()
@Disabled(
"failing because we have an extra _pos column - that's probably fine, but problem for a different day"
)
override fun testDedup() {
super.testDedup()
}

@Test
Expand Down Expand Up @@ -71,6 +72,12 @@ abstract class IcebergV2WriteTest(
override fun testAppendSchemaEvolution() {
super.testAppendSchemaEvolution()
}

@Test
@Disabled("This is expected (dest-iceberg-v2 doesn't yet support schema evolution)")
override fun testDedupChangeCursor() {
super.testDedupChangeCursor()
}
}

class IcebergGlueWriteTest :
Expand Down

0 comments on commit e6612bd

Please sign in to comment.