diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/RelationalColumnCustomConverter.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/RelationalColumnCustomConverter.kt index 67367a93126f..ea2bec28a545 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/RelationalColumnCustomConverter.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/RelationalColumnCustomConverter.kt @@ -7,10 +7,10 @@ package io.airbyte.cdk.read.cdc import io.debezium.spi.converter.CustomConverter import io.debezium.spi.converter.RelationalColumn import java.util.Properties -import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder /** Used by Debezium to transform record values into their expected format. */ -interface RelationalColumnCustomConverter : CustomConverter { +interface RelationalColumnCustomConverter : CustomConverter { /** A nice name for use in Debezium properties. */ val debeziumPropertiesKey: String @@ -18,30 +18,29 @@ interface RelationalColumnCustomConverter : CustomConverter - data class Handler( + interface Handler { /** Predicate to match the column by. */ - val predicate: (RelationalColumn) -> Boolean, + fun matches(column: RelationalColumn): Boolean + /** Schema of the output values. */ - val outputSchema: Schema, + fun outputSchemaBuilder(): SchemaBuilder + /** Partial conversion functions, applied in sequence until conversion occurs. */ val partialConverters: List - ) + } override fun configure(props: Properties?) {} override fun converterFor( column: RelationalColumn?, - registration: CustomConverter.ConverterRegistration? + registration: CustomConverter.ConverterRegistration? ) { if (column == null || registration == null) { return } - for (handler in handlers) { - if (!handler.predicate(column)) continue - val converter: CustomConverter.Converter = - ConverterFactory(javaClass).build(column, handler.partialConverters) - registration.register(handler.outputSchema, converter) - return - } + val handler: Handler = handlers.find { it.matches(column) } ?: return + val converter: CustomConverter.Converter = + ConverterFactory(javaClass).build(column, handler.partialConverters) + registration.register(handler.outputSchemaBuilder(), converter) } } diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/jdbc/converters/DataTypeUtils.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/jdbc/converters/DataTypeUtils.kt deleted file mode 100644 index 2bb7ab4c4f2c..000000000000 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/jdbc/converters/DataTypeUtils.kt +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.cdk.jdbc.converters - -import java.time.* -import java.time.format.DateTimeFormatter - -/** - * TODO : Replace all the DateTime related logic of this class with - * [io.airbyte.cdk.db.jdbc.DateTimeConverter] - */ -object DataTypeUtils { - val TIME_FORMATTER: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS") - val TIMESTAMP_FORMATTER: DateTimeFormatter = - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS") - val TIMETZ_FORMATTER: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSSXXX") - val TIMESTAMPTZ_FORMATTER: DateTimeFormatter = - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX") - val DATE_FORMATTER: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd") -} diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/jdbc/converters/DateTimeConverter.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/jdbc/converters/DateTimeConverter.kt deleted file mode 100644 index 04eed03be47e..000000000000 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/jdbc/converters/DateTimeConverter.kt +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.cdk.jdbc.converters - -import io.github.oshai.kotlinlogging.KotlinLogging -import java.sql.* -import java.time.* -import java.time.chrono.IsoEra -import java.time.format.DateTimeFormatter -import java.util.concurrent.* -import kotlin.math.abs -import kotlin.math.min - -private val LOGGER = KotlinLogging.logger {} - -object DateTimeConverter { - private val ONE_CE: Date = Date.valueOf("0001-01-01") - val TIME_WITH_TIMEZONE_FORMATTER: DateTimeFormatter = - DateTimeFormatter.ofPattern( - "HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]" - ) - private var loggedUnknownTimeWithTimeZoneClass = false - private var loggedUnknownTimeClass = false - private var loggedUnknownTimestampWithTimeZoneClass = false - private var loggedUnknownTimestampClass = false - private var loggedUnknownDateClass = false - - @JvmStatic - fun convertToTimeWithTimezone(time: Any): String { - if (time is OffsetTime) { - return if (hasZeroSecondsAndNanos(time.toLocalTime())) - time.format(DataTypeUtils.TIMETZ_FORMATTER) - else time.toString() - } else { - if (!loggedUnknownTimeWithTimeZoneClass) { - LOGGER.info { "Unknown class for Time with timezone data type ${time.javaClass}" } - loggedUnknownTimeWithTimeZoneClass = true - } - val timetz = OffsetTime.parse(time.toString(), TIME_WITH_TIMEZONE_FORMATTER) - return if (hasZeroSecondsAndNanos(timetz.toLocalTime())) - timetz.format(DataTypeUtils.TIMETZ_FORMATTER) - else timetz.toString() - } - } - - @JvmStatic - fun convertToTimestampWithTimezone(timestamp: Any): String { - if (timestamp is Timestamp) { - // In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ - // type. - // Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually - // mangles the - // value for ancient dates, because leap years weren't applied consistently in ye olden - // days. - // Additionally, toInstant() (and toLocalDateTime()) actually lose the era indicator, so - // we can't - // rely on their getEra() methods. - // So we have special handling for this case, which sidesteps the toInstant conversion. - val timestamptz: ZonedDateTime = timestamp.toLocalDateTime().atZone(ZoneOffset.UTC) - val value = timestamptz.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER) - return resolveEra(timestamp, value) - } else if (timestamp is OffsetDateTime) { - return resolveEra( - timestamp.toLocalDate(), - timestamp.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER) - ) - } else if (timestamp is ZonedDateTime) { - return resolveEra( - timestamp.toLocalDate(), - timestamp.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER) - ) - } else if (timestamp is Instant) { - val offsetDateTime = OffsetDateTime.ofInstant(timestamp, ZoneOffset.UTC) - val timestamptz = ZonedDateTime.from(offsetDateTime) - val localDate = timestamptz.toLocalDate() - val value = timestamptz.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER) - return resolveEra(localDate, value) - } else { - if (!loggedUnknownTimestampWithTimeZoneClass) { - LOGGER.info { - "Unknown class for Timestamp with time zone data type ${timestamp.javaClass}" - } - loggedUnknownTimestampWithTimeZoneClass = true - } - val instant = Instant.parse(timestamp.toString()) - val offsetDateTime = OffsetDateTime.ofInstant(instant, ZoneOffset.UTC) - val timestamptz = ZonedDateTime.from(offsetDateTime) - val localDate = timestamptz.toLocalDate() - val value = timestamptz.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER) - return resolveEra(localDate, value) - } - } - - /** See [.convertToTimestampWithTimezone] for explanation of the weird things happening here. */ - @JvmStatic - fun convertToTimestamp(timestamp: Any): String { - if (timestamp is Timestamp) { - // Snapshot mode - val localDateTime: LocalDateTime = timestamp.toLocalDateTime() - return resolveEra( - timestamp, - if (hasZeroSecondsAndNanos(localDateTime.toLocalTime())) - localDateTime.format(DataTypeUtils.TIMESTAMP_FORMATTER) - else localDateTime.toString() - ) - } else if (timestamp is Instant) { - // Incremental mode - return resolveEra( - timestamp.atZone(ZoneOffset.UTC).toLocalDate(), - timestamp - .atOffset(ZoneOffset.UTC) - .toLocalDateTime() - .format(DataTypeUtils.TIMESTAMP_FORMATTER) - ) - } else if (timestamp is LocalDateTime) { - val date: LocalDate = timestamp.toLocalDate() - return resolveEra( - date, - if (hasZeroSecondsAndNanos(timestamp.toLocalTime())) - timestamp.format(DataTypeUtils.TIMESTAMP_FORMATTER) - else timestamp.toString() - ) - } else { - if (!loggedUnknownTimestampClass) { - LOGGER.info { "Unknown class for Timestamp data type ${timestamp.javaClass}" } - loggedUnknownTimestampClass = true - } - val localDateTime = LocalDateTime.parse(timestamp.toString()) - val date = localDateTime.toLocalDate() - return resolveEra( - date, - if (hasZeroSecondsAndNanos(localDateTime.toLocalTime())) - localDateTime.format(DataTypeUtils.TIMESTAMP_FORMATTER) - else localDateTime.toString() - ) - } - } - - /** See [.convertToTimestampWithTimezone] for explanation of the weird things happening here. */ - @JvmStatic - fun convertToDate(date: Any): String { - if (date is Date) { - // Snapshot mode - val localDate = date.toLocalDate() - return resolveEra(date, localDate.format(DataTypeUtils.DATE_FORMATTER)) - } else if (date is LocalDate) { - // Incremental mode - return resolveEra(date, date.format(DataTypeUtils.DATE_FORMATTER)) - } else if (date is Int) { - // Incremental mode - return LocalDate.ofEpochDay(date.toLong()).format(DataTypeUtils.DATE_FORMATTER) - } else { - if (!loggedUnknownDateClass) { - LOGGER.info { "Unknown class for Date data type${date.javaClass}" } - loggedUnknownDateClass = true - } - val localDate = LocalDate.parse(date.toString()) - return resolveEra(localDate, localDate.format(DataTypeUtils.DATE_FORMATTER)) - } - } - - @JvmStatic - fun convertToTime(time: Any): String { - if (time is Time) { - return formatTime(time.toLocalTime()) - } else if (time is LocalTime) { - return formatTime(time) - } else if (time is Duration) { - val value = time.toNanos() - if (value >= 0 && value < TimeUnit.DAYS.toNanos(1)) { - return formatTime(LocalTime.ofNanoOfDay(value)) - } else { - val updatedValue = - min(abs(value.toDouble()), LocalTime.MAX.toNanoOfDay().toDouble()).toLong() - LOGGER.debug { - "Time values must use number of nanoseconds greater than 0 and less than 86400000000000 but its $value, converting to $updatedValue " - } - return formatTime(LocalTime.ofNanoOfDay(updatedValue)) - } - } else { - if (!loggedUnknownTimeClass) { - LOGGER.info { "Unknown class for Time data type ${time.javaClass}" } - loggedUnknownTimeClass = true - } - - val valueAsString = time.toString() - if (valueAsString.startsWith("24")) { - LOGGER.debug { - "Time value ${valueAsString} is above range, converting to 23:59:59" - } - return LocalTime.MAX.toString() - } - return formatTime(LocalTime.parse(valueAsString)) - } - } - - @JvmStatic - private fun formatTime(localTime: LocalTime): String { - return if (hasZeroSecondsAndNanos(localTime)) localTime.format(DataTypeUtils.TIME_FORMATTER) - else localTime.toString() - } - - @JvmStatic - fun hasZeroSecondsAndNanos(localTime: LocalTime): Boolean { - return (localTime.second == 0 && localTime.nano == 0) - } - - /** - * Modifies a string representation of a date/timestamp and normalizes its era indicator. - * Specifically, if this is a BCE value: - * - * * The leading negative sign will be removed if present - * * The "BC" suffix will be appended, if not already present - * - * You most likely would prefer to call one of the overloaded methods, which accept temporal - * types. - */ - fun resolveEra(isBce: Boolean, value: String): String { - var mangledValue = value - if (isBce) { - if (mangledValue.startsWith("-")) { - mangledValue = mangledValue.substring(1) - } - if (!mangledValue.endsWith(" BC")) { - mangledValue += " BC" - } - } - return mangledValue - } - - fun isBce(date: LocalDate): Boolean { - return date.era == IsoEra.BCE - } - - @JvmStatic - fun resolveEra(date: LocalDate, value: String): String { - return resolveEra(isBce(date), value) - } - - /** - * java.sql.Date objects don't properly represent their era (for example, using toLocalDate() - * always returns an object in CE). So to determine the era, we just check whether the date is - * before 1 AD. - * - * This is technically kind of sketchy due to ancient timestamps being weird (leap years, etc.), - * but my understanding is that [.ONE_CE] has the same weirdness, so it cancels out. - */ - @JvmStatic - fun resolveEra(date: Date, value: String): String { - return resolveEra(date.before(ONE_CE), value) - } - - /** See [.resolveEra] for explanation. */ - @JvmStatic - fun resolveEra(timestamp: Timestamp, value: String): String { - return resolveEra(timestamp.before(ONE_CE), value) - } -} diff --git a/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt b/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt index cf46192004c5..76c52d3cb95d 100644 --- a/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt +++ b/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt @@ -40,6 +40,4 @@ class DevNullBasicFunctionalityIntegrationTest : } @Test @Disabled("File transfer is not supported") override fun testBasicWriteFile() {} - - @Test @Disabled("DevNull does not support Unknown types") override fun testUnknownTypes() {} } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index fed78ffa76dd..21083abca4fa 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -92,9 +92,6 @@ class IcebergGlueWriteTest : ) ) ) { - @Test - @Disabled("dest iceberge-v2 doesn't support unknown types") - override fun testUnknownTypes() {} @Test override fun testBasicWrite() { diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index 008277f47aac..40175176fa1a 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -9,7 +9,7 @@ application { airbyteBulkConnector { core = 'extract' toolkits = ['extract-jdbc', 'extract-cdc'] - cdk = '0.249' + cdk = '0.255' } dependencies { diff --git a/airbyte-integrations/connectors/source-mysql/metadata.yaml b/airbyte-integrations/connectors/source-mysql/metadata.yaml index 5b9b06bbdfab..5d5187ef2f87 100644 --- a/airbyte-integrations/connectors/source-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mysql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad - dockerImageTag: 3.10.0-rc.5 + dockerImageTag: 3.10.0-rc.8 dockerRepository: airbyte/source-mysql documentationUrl: https://docs.airbyte.com/integrations/sources/mysql githubIssueLabel: source-mysql @@ -17,6 +17,8 @@ data: license: ELv2 maxSecondsBetweenMessages: 7200 name: MySQL + connectorBuildOptions: + baseImage: docker.io/airbyte/java-connector-base:1.0.0@sha256:be86e5684e1e6d9280512d3d8071b47153698fe08ad990949c8eeff02803201a registryOverrides: cloud: enabled: true diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt index 0f9f04cdccd1..b29fbcdea228 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcBooleanConverter.kt @@ -9,27 +9,27 @@ import io.airbyte.cdk.read.cdc.NoConversion import io.airbyte.cdk.read.cdc.NullFallThrough import io.airbyte.cdk.read.cdc.PartialConverter import io.airbyte.cdk.read.cdc.RelationalColumnCustomConverter +import io.debezium.spi.converter.RelationalColumn import org.apache.kafka.connect.data.SchemaBuilder class MySqlSourceCdcBooleanConverter : RelationalColumnCustomConverter { override val debeziumPropertiesKey: String = "boolean" - override val handlers: List = listOf(tinyint1Handler) - - companion object { - val tinyint1Handler = - RelationalColumnCustomConverter.Handler( - predicate = { - it.typeName().equals("TINYINT", ignoreCase = true) && - it.length().isPresent && - it.length().asInt == 1 - }, - outputSchema = SchemaBuilder.bool(), - partialConverters = - listOf( - NullFallThrough, - PartialConverter { if (it is Number) Converted(it != 0) else NoConversion } - ) + override val handlers: List = listOf(TinyInt1Handler) + + data object TinyInt1Handler : RelationalColumnCustomConverter.Handler { + + override fun matches(column: RelationalColumn): Boolean = + column.typeName().equals("TINYINT", ignoreCase = true) && + column.length().isPresent && + column.length().asInt == 1 + + override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.bool() + + override val partialConverters: List = + listOf( + NullFallThrough, + PartialConverter { if (it is Number) Converted(it != 0) else NoConversion } ) } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt index cecdeed92ec6..ded4475e2c2e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCdcTemporalConverter.kt @@ -13,6 +13,7 @@ import io.airbyte.cdk.read.cdc.NoConversion import io.airbyte.cdk.read.cdc.NullFallThrough import io.airbyte.cdk.read.cdc.PartialConverter import io.airbyte.cdk.read.cdc.RelationalColumnCustomConverter +import io.debezium.spi.converter.RelationalColumn import java.time.Duration import java.time.Instant import java.time.LocalDate @@ -30,159 +31,166 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter { override val handlers: List = listOf( - datetimeMillisHandler, - datetimeMicrosHandler, - dateHandler, - timeHandler, - timestampHandler + DatetimeMillisHandler, + DatetimeMicrosHandler, + DateHandler, + TimeHandler, + TimestampHandler ) - companion object { + data object DatetimeMillisHandler : RelationalColumnCustomConverter.Handler { - val datetimeMillisHandler = - RelationalColumnCustomConverter.Handler( - predicate = { - it.typeName().equals("DATETIME", ignoreCase = true) && - it.length().orElse(0) <= 3 + override fun matches(column: RelationalColumn): Boolean = + column.typeName().equals("DATETIME", ignoreCase = true) && + column.length().orElse(0) <= 3 + + override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string() + + override val partialConverters: List = + listOf( + NullFallThrough, + PartialConverter { + if (it is LocalDateTime) { + Converted(it.format(LocalDateTimeCodec.formatter)) + } else { + NoConversion + } }, - outputSchema = SchemaBuilder.string(), - partialConverters = - listOf( - NullFallThrough, - PartialConverter { - if (it is LocalDateTime) { - Converted(it.format(LocalDateTimeCodec.formatter)) - } else { - NoConversion - } - }, - PartialConverter { - // Required for default values. - if (it is Number) { - val delta: Duration = Duration.ofMillis(it.toLong()) - val instant: Instant = Instant.EPOCH.plus(delta) - val localDateTime: LocalDateTime = - LocalDateTime.ofInstant(instant, ZoneOffset.UTC) - Converted(localDateTime.format(LocalDateTimeCodec.formatter)) - } else { - NoConversion - } - } - ) + PartialConverter { + // Required for default values. + if (it is Number) { + val delta: Duration = Duration.ofMillis(it.toLong()) + val instant: Instant = Instant.EPOCH.plus(delta) + val localDateTime: LocalDateTime = + LocalDateTime.ofInstant(instant, ZoneOffset.UTC) + Converted(localDateTime.format(LocalDateTimeCodec.formatter)) + } else { + NoConversion + } + } ) + } + + data object DatetimeMicrosHandler : RelationalColumnCustomConverter.Handler { + + override fun matches(column: RelationalColumn): Boolean = + column.typeName().equals("DATETIME", ignoreCase = true) && column.length().orElse(0) > 3 - val datetimeMicrosHandler = - RelationalColumnCustomConverter.Handler( - predicate = { - it.typeName().equals("DATETIME", ignoreCase = true) && it.length().orElse(0) > 3 + override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string() + + override val partialConverters: List = + listOf( + NullFallThrough, + PartialConverter { + if (it is LocalDateTime) { + Converted(it.format(LocalDateTimeCodec.formatter)) + } else { + NoConversion + } }, - outputSchema = SchemaBuilder.string(), - partialConverters = - listOf( - NullFallThrough, - PartialConverter { - if (it is LocalDateTime) { - Converted(it.format(LocalDateTimeCodec.formatter)) - } else { - NoConversion - } - }, - PartialConverter { - // Required for default values. - if (it is Number) { - val delta: Duration = Duration.of(it.toLong(), ChronoUnit.MICROS) - val instant: Instant = Instant.EPOCH.plus(delta) - val localDateTime: LocalDateTime = - LocalDateTime.ofInstant(instant, ZoneOffset.UTC) - Converted(localDateTime.format(LocalDateTimeCodec.formatter)) - } else { - NoConversion - } - } - ) + PartialConverter { + // Required for default values. + if (it is Number) { + val delta: Duration = Duration.of(it.toLong(), ChronoUnit.MICROS) + val instant: Instant = Instant.EPOCH.plus(delta) + val localDateTime: LocalDateTime = + LocalDateTime.ofInstant(instant, ZoneOffset.UTC) + Converted(localDateTime.format(LocalDateTimeCodec.formatter)) + } else { + NoConversion + } + } ) + } + + data object DateHandler : RelationalColumnCustomConverter.Handler { + + override fun matches(column: RelationalColumn): Boolean = + column.typeName().equals("DATE", ignoreCase = true) + + override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string() - val dateHandler = - RelationalColumnCustomConverter.Handler( - predicate = { it.typeName().equals("DATE", ignoreCase = true) }, - outputSchema = SchemaBuilder.string(), - partialConverters = - listOf( - NullFallThrough, - PartialConverter { - if (it is LocalDate) { - Converted(it.format(LocalDateCodec.formatter)) - } else { - NoConversion - } - }, - PartialConverter { - // Required for default values. - if (it is Number) { - val localDate: LocalDate = LocalDate.ofEpochDay(it.toLong()) - Converted(localDate.format(LocalDateCodec.formatter)) - } else { - NoConversion - } - } - ), + override val partialConverters: List = + listOf( + NullFallThrough, + PartialConverter { + if (it is LocalDate) { + Converted(it.format(LocalDateCodec.formatter)) + } else { + NoConversion + } + }, + PartialConverter { + // Required for default values. + if (it is Number) { + val localDate: LocalDate = LocalDate.ofEpochDay(it.toLong()) + Converted(localDate.format(LocalDateCodec.formatter)) + } else { + NoConversion + } + } ) + } + + data object TimeHandler : RelationalColumnCustomConverter.Handler { + + override fun matches(column: RelationalColumn): Boolean = + column.typeName().equals("TIME", ignoreCase = true) + + override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string() - val timeHandler = - RelationalColumnCustomConverter.Handler( - predicate = { it.typeName().equals("TIME", ignoreCase = true) }, - outputSchema = SchemaBuilder.string(), - partialConverters = - listOf( - NullFallThrough, - PartialConverter { - if (it is Duration) { - val localTime: LocalTime = LocalTime.MIDNIGHT.plus(it) - Converted(localTime.format(LocalTimeCodec.formatter)) - } else { - NoConversion - } - }, - PartialConverter { - // Required for default values. - if (it is Number) { - val delta: Duration = Duration.of(it.toLong(), ChronoUnit.MICROS) - val localTime: LocalTime = LocalTime.ofNanoOfDay(delta.toNanos()) - Converted(localTime.format(LocalTimeCodec.formatter)) - } else { - NoConversion - } - } - ), + override val partialConverters: List = + listOf( + NullFallThrough, + PartialConverter { + if (it is Duration) { + val localTime: LocalTime = LocalTime.MIDNIGHT.plus(it) + Converted(localTime.format(LocalTimeCodec.formatter)) + } else { + NoConversion + } + }, + PartialConverter { + // Required for default values. + if (it is Number) { + val delta: Duration = Duration.of(it.toLong(), ChronoUnit.MICROS) + val localTime: LocalTime = LocalTime.ofNanoOfDay(delta.toNanos()) + Converted(localTime.format(LocalTimeCodec.formatter)) + } else { + NoConversion + } + } ) + } + + data object TimestampHandler : RelationalColumnCustomConverter.Handler { + override fun matches(column: RelationalColumn): Boolean = + column.typeName().equals("TIMESTAMP", ignoreCase = true) - val timestampHandler = - RelationalColumnCustomConverter.Handler( - predicate = { it.typeName().equals("TIMESTAMP", ignoreCase = true) }, - outputSchema = SchemaBuilder.string(), - partialConverters = - listOf( - NullFallThrough, - PartialConverter { - if (it is ZonedDateTime) { - val offsetDateTime: OffsetDateTime = it.toOffsetDateTime() - Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter)) - } else { - NoConversion - } - }, - PartialConverter { - // Required for default values. - if (it is String) { - val instant: Instant = Instant.parse(it) - val offsetDateTime: OffsetDateTime = - OffsetDateTime.ofInstant(instant, ZoneOffset.UTC) - Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter)) - } else { - NoConversion - } - } - ), + override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string() + + override val partialConverters: List = + listOf( + NullFallThrough, + PartialConverter { + if (it is ZonedDateTime) { + val offsetDateTime: OffsetDateTime = it.toOffsetDateTime() + Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter)) + } else { + NoConversion + } + }, + PartialConverter { + // Required for default values. + if (it is String) { + val instant: Instant = Instant.parse(it) + val offsetDateTime: OffsetDateTime = + OffsetDateTime.ofInstant(instant, ZoneOffset.UTC) + Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter)) + } else { + NoConversion + } + } ) } } diff --git a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt index a3a2f3c70398..dc7930ea91bc 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt +++ b/airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceOperations.kt @@ -208,13 +208,16 @@ class MySqlSourceOperations : // chance of a row getting picked. This comes at a price of bias to the beginning // of table on very large tables ( > 100s million of rows) val greatestRate: String = 0.00005.toString() + // We only do a full count in case information schema contains no row count. + // This is the case for views. + val fullCount = "SELECT COUNT(*) FROM `$namespace`.`$name`" // Quick approximation to "select count(*) from table" which doesn't require // full table scan. However, note this could give delayed summary info about a table // and thus a new table could be treated as empty despite we recently added rows. // To prevent that from happening and resulted for skipping the table altogether, // the minimum count is set to 10. val quickCount = - "SELECT GREATEST(10, table_rows) FROM information_schema.tables WHERE table_schema = '$namespace' AND table_name = '$name'" + "SELECT GREATEST(10, COALESCE(table_rows, ($fullCount))) FROM information_schema.tables WHERE table_schema = '$namespace' AND table_name = '$name'" val greatest = "GREATEST($greatestRate, $sampleSize / ($quickCount))" // Rand returns a value between 0 and 1 val where = "WHERE RAND() < $greatest " diff --git a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt index b011ba01dda1..5e2f9267fc0a 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt +++ b/airbyte-integrations/connectors/source-mysql/src/test/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceCursorBasedIntegrationTest.kt @@ -159,6 +159,24 @@ class MySqlSourceCursorBasedIntegrationTest { assertTrue(run2.records().isEmpty()) } + @Test + fun testCursorBasedViewRead() { + provisionView(connectionFactory) + val catalog = getConfiguredCatalog() + catalog.streams[0].stream.name = viewName + val run1: BufferingOutputConsumer = CliRunner.source("read", config, catalog).run() + val lastStateMessageFromRun1 = run1.states().last() + val lastStreamStateFromRun1 = lastStateMessageFromRun1.stream.streamState + + assertEquals("20", lastStreamStateFromRun1.get("cursor").textValue()) + assertEquals(2, lastStreamStateFromRun1.get("version").intValue()) + assertEquals("cursor_based", lastStreamStateFromRun1.get("state_type").asText()) + assertEquals(viewName, lastStreamStateFromRun1.get("stream_name").asText()) + assertEquals(listOf("k"), lastStreamStateFromRun1.get("cursor_field").map { it.asText() }) + assertEquals("test", lastStreamStateFromRun1.get("stream_namespace").asText()) + assertEquals(0, lastStreamStateFromRun1.get("cursor_record_count").asInt()) + } + companion object { val log = KotlinLogging.logger {} val dbContainer: MySQLContainer<*> = MySqlContainerFactory.shared(imageName = "mysql:8.0") @@ -206,6 +224,17 @@ class MySqlSourceCursorBasedIntegrationTest { } } } + + lateinit var viewName: String + fun provisionView(targetConnectionFactory: JdbcConnectionFactory) { + viewName = "$tableName-view" + targetConnectionFactory.get().use { connection: Connection -> + connection.isReadOnly = false + connection.createStatement().use { stmt: Statement -> + stmt.execute("CREATE VIEW test.`$viewName` AS SELECT * FROM test.`$tableName`") + } + } + } } val V1_STATE: String = """ diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 9b29602d9206..2cf4586bcff3 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -226,8 +226,11 @@ Any database or table encoding combination of charset and collation is supported | Version | Date | Pull Request | Subject | |:------------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.10.0-rc.8 | 2025-01-07 | [50965](https://github.com/airbytehq/airbyte/pull/50965) | Fix bug introduced in 3.10.0-rc.3. | +| 3.10.0-rc.7 | 2024-12-27 | [50437](https://github.com/airbytehq/airbyte/pull/50437) | Compatibility with MySQL Views. | +| 3.10.0-rc.6 | 2024-12-18 | [49892](https://github.com/airbytehq/airbyte/pull/49892) | Use a base image: airbyte/java-connector-base:1.0.0 | | 3.10.0-rc.5 | 2025-01-03 | [50868](https://github.com/airbytehq/airbyte/pull/50868) | Fix exception handling rules declaration. | -| 3.10.0-rc.4 | 2024-12-23 | [48587](https://github.com/airbytehq/airbyte/pull/48587) | Fix minor state counting mechanism. | +| 3.10.0-rc.4 | 2024-12-23 | [48587](https://github.com/airbytehq/airbyte/pull/48587) | Fix minor state counting mechanism. | | 3.10.0-rc.3 | 2024-12-20 | [49918](https://github.com/airbytehq/airbyte/pull/49918) | Fix minor datatype handling and conversion bugs, maintain big number precision. | | 3.10.0-rc.2 | 2024-12-20 | [49950](https://github.com/airbytehq/airbyte/pull/49950) | Remove unused configuration field, streamline SSL certificate key store logic. | | 3.10.0-rc.1 | 2024-12-20 | [49948](https://github.com/airbytehq/airbyte/pull/49948) | Pin Bulk CDK version to 231, adopt required changes. |