Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bulk-cdk-toolkit-extract-cdc: fix CustomConverter bug #50967

Merged
merged 1 commit into from
Jan 7, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,40 @@ package io.airbyte.cdk.read.cdc
import io.debezium.spi.converter.CustomConverter
import io.debezium.spi.converter.RelationalColumn
import java.util.Properties
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder

/** Used by Debezium to transform record values into their expected format. */
interface RelationalColumnCustomConverter : CustomConverter<Schema, RelationalColumn> {
interface RelationalColumnCustomConverter : CustomConverter<SchemaBuilder, RelationalColumn> {

/** A nice name for use in Debezium properties. */
val debeziumPropertiesKey: String

/** Fall-through list of handlers to try to match and register for each column. */
val handlers: List<Handler>

data class Handler(
interface Handler {
/** Predicate to match the column by. */
val predicate: (RelationalColumn) -> Boolean,
fun matches(column: RelationalColumn): Boolean

/** Schema of the output values. */
val outputSchema: Schema,
fun outputSchemaBuilder(): SchemaBuilder

/** Partial conversion functions, applied in sequence until conversion occurs. */
val partialConverters: List<PartialConverter>
)
}

override fun configure(props: Properties?) {}

override fun converterFor(
column: RelationalColumn?,
registration: CustomConverter.ConverterRegistration<Schema>?
registration: CustomConverter.ConverterRegistration<SchemaBuilder>?
) {
if (column == null || registration == null) {
return
}
for (handler in handlers) {
if (!handler.predicate(column)) continue
val converter: CustomConverter.Converter =
ConverterFactory(javaClass).build(column, handler.partialConverters)
registration.register(handler.outputSchema, converter)
return
}
val handler: Handler = handlers.find { it.matches(column) } ?: return
val converter: CustomConverter.Converter =
ConverterFactory(javaClass).build(column, handler.partialConverters)
registration.register(handler.outputSchemaBuilder(), converter)
}
}
Loading