From ad6e527b863e819fcb21d4a73bca804d0dde9b1a Mon Sep 17 00:00:00 2001
From: Oliver Puetter
Date: Thu, 4 Mar 2021 15:52:19 +0100
Subject: [PATCH 1/4] Unit test for issue #14

---
 .../kafka/connect/issues/Issue14Test.kt       | 45 +++++++++++++++++++
 1 file changed, 45 insertions(+)
 create mode 100644 src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt

diff --git a/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt b/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt
new file mode 100644
index 0000000..0d8511b
--- /dev/null
+++ b/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt
@@ -0,0 +1,45 @@
+package com.github.thake.logminer.kafka.connect.issues
+
+import com.github.thake.logminer.kafka.connect.*
+import com.github.thake.logminer.kafka.connect.logminer.LogminerSource
+import io.kotest.matchers.nulls.shouldBeNull
+import io.kotest.matchers.nulls.shouldNotBeNull
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+import java.sql.Connection
+import java.sql.Types
+
+
+class Issue14Test : AbstractCdcSourceIntegrationTest() {
+
+    private fun performInsertBeforeChange(conn: Connection, cdcSource: LogminerSource){
+        val insertedId = 1
+        conn.insertRow(insertedId)
+        val results = cdcSource.getResults(conn)
+        assertContainsOnlySpecificOperationForIds(results, insertedId.rangeTo(insertedId), Operation.INSERT)
+        assertAllAfterColumnsContained(results)
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    fun testUpdateColumnToNull(dictionarySource: LogminerDictionarySource) {
+        val conn = openConnection()
+        val cdcSource = getCdcSource(dictionarySource)
+
+        val insertedId = 1
+        conn.insertRow(insertedId)
+
+        conn.prepareStatement("UPDATE ${STANDARD_TABLE.fullName} SET STRING = ?").use { stmt ->
+            stmt.setNull(1,Types.NVARCHAR)
+            stmt.executeUpdate()
+        }
+        val results = cdcSource.getResults(conn)
+        assertContainsOnlySpecificOperationForIds(results, insertedId.rangeTo(insertedId), Operation.UPDATE)
+        assertAllAfterColumnsContained(results)
+        results.forEach {
+            val after = it.cdcRecord.after
+            after.shouldNotBeNull()
+            after[Columns.STRING.name].shouldBeNull()
+        }
+    }
+}
\ No newline at end of file

From f82fc67b9cc8796d80095bb8a273f3785c3f2ad7 Mon Sep 17 00:00:00 2001
From: Thorsten Hake
Date: Thu, 4 Mar 2021 17:21:18 +0100
Subject: [PATCH 2/4] Fixed errors in test

---
 .../thake/logminer/kafka/connect/issues/Issue14Test.kt | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt b/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt
index 0d8511b..e9f7948 100644
--- a/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt
+++ b/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt
@@ -12,8 +12,7 @@ import java.sql.Types
 
 class Issue14Test : AbstractCdcSourceIntegrationTest() {
 
-    private fun performInsertBeforeChange(conn: Connection, cdcSource: LogminerSource){
-        val insertedId = 1
+    private fun performInsertBeforeChange(insertedId : Int, conn: Connection, cdcSource: LogminerSource){
         conn.insertRow(insertedId)
         val results = cdcSource.getResults(conn)
         assertContainsOnlySpecificOperationForIds(results, insertedId.rangeTo(insertedId), Operation.INSERT)
@@ -27,15 +26,13 @@ class Issue14Test : AbstractCdcSourceIntegrationTest() {
         val cdcSource = getCdcSource(dictionarySource)
 
         val insertedId = 1
-        conn.insertRow(insertedId)
-
+        performInsertBeforeChange(insertedId, conn, cdcSource)
         conn.prepareStatement("UPDATE ${STANDARD_TABLE.fullName} SET STRING = ?").use { stmt ->
             stmt.setNull(1,Types.NVARCHAR)
             stmt.executeUpdate()
         }
         val results = cdcSource.getResults(conn)
         assertContainsOnlySpecificOperationForIds(results, insertedId.rangeTo(insertedId), Operation.UPDATE)
-        assertAllAfterColumnsContained(results)
         results.forEach {
             val after = it.cdcRecord.after
             after.shouldNotBeNull()

From 01285f6172c632623c038ae6bfff7b48d34d0b20 Mon Sep 17 00:00:00 2001
From: Oliver Puetter
Date: Mon, 8 Mar 2021 14:54:31 +0100
Subject: [PATCH 3/4] Possible fix

---
 .../kafka/connect/ConnectSchemaFactory.kt     |   6 +-
 .../kafka/connect/issues/Issue14Test.kt       | 138 ++++++++++++++----
 2 files changed, 117 insertions(+), 27 deletions(-)

diff --git a/src/main/kotlin/com/github/thake/logminer/kafka/connect/ConnectSchemaFactory.kt b/src/main/kotlin/com/github/thake/logminer/kafka/connect/ConnectSchemaFactory.kt
index bcc2b1a..61dab14 100644
--- a/src/main/kotlin/com/github/thake/logminer/kafka/connect/ConnectSchemaFactory.kt
+++ b/src/main/kotlin/com/github/thake/logminer/kafka/connect/ConnectSchemaFactory.kt
@@ -96,7 +96,9 @@ class ConnectSchemaFactory(
             if (operation == Operation.UPDATE && updatedAfter != null && before != null) {
                 //Enrich the after state with values from the before data set
                 val enrichedAfter = updatedAfter.toMutableMap()
-                before.forEach { enrichedAfter.putIfAbsent(it.key, it.value) }
+                before.forEach {
+                    if (!updatedAfter!!.containsKey(it.key))
+                        enrichedAfter.putIfAbsent(it.key, it.value) }
                 updatedAfter = enrichedAfter
             }
             before?.let {
@@ -154,7 +156,7 @@ class ConnectSchemaFactory(
 
     private fun convertDataToStruct(dataSchema: Schema, values: Map<String, Any?>): Struct {
         return Struct(dataSchema).apply {
-            values.forEach { this.put(it.key, it.value) }
+            values.keys.forEach { this.put(it, values.get(it)) }
         }
     }
 }
\ No newline at end of file
diff --git a/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt b/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt
index e9f7948..a73206d 100644
--- a/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt
+++ b/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt
@@ -1,42 +1,130 @@
 package com.github.thake.logminer.kafka.connect.issues
 
 import com.github.thake.logminer.kafka.connect.*
-import com.github.thake.logminer.kafka.connect.logminer.LogminerSource
+import io.kotest.matchers.collections.shouldBeEmpty
+import io.kotest.matchers.collections.shouldHaveSize
 import io.kotest.matchers.nulls.shouldBeNull
-import io.kotest.matchers.nulls.shouldNotBeNull
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.EnumSource
+import org.apache.kafka.connect.data.Struct
+import org.apache.kafka.connect.source.SourceRecord
+import org.apache.kafka.connect.source.SourceTaskContext
+import org.apache.kafka.connect.storage.OffsetStorageReader
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.slf4j.LoggerFactory
+import org.testcontainers.junit.jupiter.Testcontainers
 import java.sql.Connection
 import java.sql.Types
+import java.util.*
 
+@Testcontainers
+class Issue14Test : AbstractIntegrationTest() {
+    private lateinit var sourceTask: SourceTask
+    private lateinit var offsetManager: MockOffsetStorageReader
+    private lateinit var defaultConfig: Map<String, String>
+    private val log = LoggerFactory.getLogger(Issue14Test::class.java)
+
+    private class TestSourceTaskContext(
+        val configs: Map<String, String>,
+        private val storageReader: OffsetStorageReader = MockOffsetStorageReader()
+    ) : SourceTaskContext {
+
+        override fun configs(): MutableMap<String, String> {
+            return this.configs.toMutableMap()
+        }
+
+        override fun offsetStorageReader(): OffsetStorageReader {
+            return storageReader
+        }
 
-class Issue14Test : AbstractCdcSourceIntegrationTest() {
+    }
 
-    private fun performInsertBeforeChange(insertedId : Int, conn: Connection, cdcSource: LogminerSource){
-        conn.insertRow(insertedId)
-        val results = cdcSource.getResults(conn)
-        assertContainsOnlySpecificOperationForIds(results, insertedId.rangeTo(insertedId), Operation.INSERT)
-        assertAllAfterColumnsContained(results)
+    private class MockOffsetStorageReader : OffsetStorageReader {
+        private var currentOffset = mutableMapOf<String, Any>()
+        fun updateOffset(offset: MutableMap<String, Any>) {
+            currentOffset = offset
+        }
+
+        override fun <T> offsets(partitions: MutableCollection<MutableMap<String, T>>?): MutableMap<MutableMap<String, T>, MutableMap<String, Any>> {
+            return Collections.emptyMap()
+        }
+
+        override fun <T> offset(partition: MutableMap<String, T>?): MutableMap<String, Any> {
+            return currentOffset
+        }
+
+    }
 
-    @ParameterizedTest
-    @EnumSource
-    fun testUpdateColumnToNull(dictionarySource: LogminerDictionarySource) {
-        val conn = openConnection()
-        val cdcSource = getCdcSource(dictionarySource)
+    @BeforeEach
+    fun setup() {
+        defaultConfig =
+            with(SourceConnectorConfig.Companion) {
+                mapOf(
+                    BATCH_SIZE to "1000",
+                    DB_NAME to "test",
+                    DB_FETCH_SIZE to "10000",
+                    DB_SID to oracle.sid,
+                    DB_HOST to oracle.containerIpAddress,
+                    DB_PORT to oracle.oraclePort.toString(),
+                    DB_USERNAME to oracle.username,
+                    DB_PASSWORD to oracle.password,
+                    START_SCN to "0",
+                    MONITORED_TABLES to STANDARD_TABLE.fullName + ", " + SECOND_TABLE.fullName
+                )
+            }
+        sourceTask = SourceTask()
+        offsetManager = MockOffsetStorageReader()
+        sourceTask.initialize(TestSourceTaskContext(defaultConfig, offsetManager))
+        //Wait for tables to correctly initialize
+        Thread.sleep(5000)
+    }
+
+    private fun createConfiguration(map: Map<String, String>? = null): Map<String, String> {
+        return defaultConfig.toMutableMap().apply { map?.let { putAll(it) } }
+    }
+
+    @AfterEach
+    fun tearDown() {
+        sourceTask.stop()
+    }
 
-        val insertedId = 1
-        performInsertBeforeChange(insertedId, conn, cdcSource)
-        conn.prepareStatement("UPDATE ${STANDARD_TABLE.fullName} SET STRING = ?").use { stmt ->
-            stmt.setNull(1,Types.NVARCHAR)
+    @Test
+    fun testUpdateColumnToNull() {
+        sourceTask.start(
+            createConfiguration(
+                mapOf(
+                    SourceConnectorConfig.BATCH_SIZE to "10"
+                )
+            )
+        )
+        val modifyingConnection = openConnection()
+        //Initial state
+        (0 until 1).forEach { modifyingConnection.insertRow(it) }
+        var result = sourceTask.poll().toMutableList()
+        assertTrue(result.isNotEmpty())
+
+        modifyingConnection.prepareStatement("UPDATE ${STANDARD_TABLE.fullName} SET STRING = ?").use { stmt ->
+            stmt.setNull(1, Types.NVARCHAR)
             stmt.executeUpdate()
         }
-        val results = cdcSource.getResults(conn)
-        assertContainsOnlySpecificOperationForIds(results, insertedId.rangeTo(insertedId), Operation.UPDATE)
-        results.forEach {
-            val after = it.cdcRecord.after
-            after.shouldNotBeNull()
-            after[Columns.STRING.name].shouldBeNull()
+
+        result = sourceTask.readAllSourceRecords() as MutableList<SourceRecord>
+        assertTrue(result.size == 1)
+        ((result.get(0).value() as Struct).get("after")as Struct).getString("STRING").shouldBeNull()
+    }
+
+    private fun SourceTask.readAllSourceRecords(): List<SourceRecord> {
+        val result = mutableListOf<SourceRecord>()
+        while (true) {
+            val currentResult = poll()
+            if (currentResult.isEmpty()) {
+                break
+            } else {
+                result.addAll(currentResult)
+            }
         }
+        return result
     }
 }
\ No newline at end of file

From 1bbefcb85e3a60ebcc774d9ee0084b423b926749 Mon Sep 17 00:00:00 2001
From: Thorsten Hake
Date: Tue, 9 Mar 2021 21:10:36 +0100
Subject: [PATCH 4/4] Minor changes

---
 .../kafka/connect/ConnectSchemaFactory.kt     | 57 ++++++++++---------
 .../kafka/connect/issues/Issue14Test.kt       |  4 +-
 2 files changed, 31 insertions(+), 30 deletions(-)

diff --git a/src/main/kotlin/com/github/thake/logminer/kafka/connect/ConnectSchemaFactory.kt b/src/main/kotlin/com/github/thake/logminer/kafka/connect/ConnectSchemaFactory.kt
index 61dab14..d3165dc 100644
--- a/src/main/kotlin/com/github/thake/logminer/kafka/connect/ConnectSchemaFactory.kt
+++ b/src/main/kotlin/com/github/thake/logminer/kafka/connect/ConnectSchemaFactory.kt
@@ -28,7 +28,8 @@ object SourceRecordFields {
     private const val OWNER = "schema"
     private const val TABLE = "table"
     private const val CHANGE_USER = "user"
-    val sourceSchema: Schema = SchemaBuilder.struct().name(LogminerSourceConnector::class.java.`package`.name + ".Source")
+    val sourceSchema: Schema =
+        SchemaBuilder.struct().name(LogminerSourceConnector::class.java.`package`.name + ".Source")
         .field(VERSION, Schema.STRING_SCHEMA)
         .field(CONNECTOR, Schema.STRING_SCHEMA)
         .field(RECORD_TIMESTAMP, Timestamp.SCHEMA)
@@ -41,20 +42,20 @@ object SourceRecordFields {
 
     fun convert(cdcRecord: CdcRecord): Struct {
         return Struct(sourceSchema)
-            .put(VERSION, LogminerSourceConnector.version)
-            .put(CONNECTOR, LogminerSourceConnector.name)
-            .put(RECORD_TIMESTAMP, cdcRecord.timestamp)
-            .put(TRANSACTION, cdcRecord.transaction)
-            .put(SCN, cdcRecord.scn)
-            .put(OWNER, cdcRecord.table.owner)
-            .put(TABLE, cdcRecord.table.table)
-            .put(CHANGE_USER, cdcRecord.username)
+                .put(VERSION, LogminerSourceConnector.version)
+                .put(CONNECTOR, LogminerSourceConnector.name)
+                .put(RECORD_TIMESTAMP, cdcRecord.timestamp)
+                .put(TRANSACTION, cdcRecord.transaction)
+                .put(SCN, cdcRecord.scn)
+                .put(OWNER, cdcRecord.table.owner)
+                .put(TABLE, cdcRecord.table.table)
+                .put(CHANGE_USER, cdcRecord.username)
     }
 }
 
 class ConnectSchemaFactory(
     private val nameService: ConnectNameService,
-    private val isEmittingTombstones : Boolean
+    private val isEmittingTombstones: Boolean
 ) {
 
 
@@ -77,28 +78,28 @@ class ConnectSchemaFactory(
 
         val recordConnectSchema = record.dataSchema.valueSchema
         val valueSchema = SchemaBuilder.struct()
-            .name(name)
-            .field(CdcRecordFields.OPERATION, Schema.STRING_SCHEMA)
-            .field(CdcRecordFields.BEFORE, recordConnectSchema)
-            .field(CdcRecordFields.AFTER, recordConnectSchema)
-            .field(CdcRecordFields.SOURCE, sourceSchema)
-            .field(CdcRecordFields.PUBLISH_TIMESTAMP, Timestamp.SCHEMA)
-            .optional()
-            .build()
+                .name(name)
+                .field(CdcRecordFields.OPERATION, Schema.STRING_SCHEMA)
+                .field(CdcRecordFields.BEFORE, recordConnectSchema)
+                .field(CdcRecordFields.AFTER, recordConnectSchema)
+                .field(CdcRecordFields.SOURCE, sourceSchema)
+                .field(CdcRecordFields.PUBLISH_TIMESTAMP, Timestamp.SCHEMA)
+                .optional()
+                .build()
         val struct = with(record) {
             var updatedAfter = after
             val sourceStruct = SourceRecordFields.convert(record)
             val recordStruct = Struct(valueSchema)
-                .put(CdcRecordFields.OPERATION, operation.stringRep)
-                .put(CdcRecordFields.SOURCE, sourceStruct)
-                .put(CdcRecordFields.PUBLISH_TIMESTAMP, java.util.Date())
+                    .put(CdcRecordFields.OPERATION, operation.stringRep)
+                    .put(CdcRecordFields.SOURCE, sourceStruct)
+                    .put(CdcRecordFields.PUBLISH_TIMESTAMP, java.util.Date())
 
             if (operation == Operation.UPDATE && updatedAfter != null && before != null) {
+                val originalAfter = updatedAfter
                 //Enrich the after state with values from the before data set
-                val enrichedAfter = updatedAfter.toMutableMap()
-                before.forEach {
-                    if (!updatedAfter!!.containsKey(it.key))
-                        enrichedAfter.putIfAbsent(it.key, it.value) }
+                val enrichedAfter = originalAfter.toMutableMap()
+                //Set the after value with the before value if the key does not exist in the after map.
+                enrichedAfter.putAll(before.filter { it.key !in originalAfter })
                 updatedAfter = enrichedAfter
             }
             before?.let {
@@ -125,7 +126,7 @@ class ConnectSchemaFactory(
 
         val value = createValue(record)
         val keyStruct = createKeyStruct(record)
-        val normalSourceRecord =  SourceRecord(
+        val normalSourceRecord = SourceRecord(
             partition,
             pollResult.offset.map,
             topic,
@@ -134,20 +135,20 @@ class ConnectSchemaFactory(
             value.first,
             value.second
         )
-        return if(isEmittingTombstones && pollResult.cdcRecord.operation == Operation.DELETE){
+        return if (isEmittingTombstones && pollResult.cdcRecord.operation == Operation.DELETE) {
            val deleteRecord = SourceRecord(
                 partition,
                 pollResult.offset.map,
                 topic,
                 keyStruct?.schema(),
                 keyStruct,
                 null,
                 null
             )
             listOf(
                 normalSourceRecord,
                 deleteRecord
             )
-        }else{
+        } else {
             listOf(normalSourceRecord)
         }
 
diff --git a/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt b/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt
index a73206d..891ed78 100644
--- a/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt
+++ b/src/test/kotlin/com/github/thake/logminer/kafka/connect/issues/Issue14Test.kt
@@ -101,7 +101,7 @@ class Issue14Test : AbstractIntegrationTest() {
         )
         val modifyingConnection = openConnection()
         //Initial state
-        (0 until 1).forEach { modifyingConnection.insertRow(it) }
+        modifyingConnection.insertRow(1)
         var result = sourceTask.poll().toMutableList()
         assertTrue(result.isNotEmpty())
 
@@ -112,7 +112,7 @@ class Issue14Test : AbstractIntegrationTest() {
 
         result = sourceTask.readAllSourceRecords() as MutableList<SourceRecord>
         assertTrue(result.size == 1)
-        ((result.get(0).value() as Struct).get("after")as Struct).getString("STRING").shouldBeNull()
+        ((result[0].value() as Struct).get("after") as Struct).getString("STRING").shouldBeNull()
Struct).getString("STRING").shouldBeNull() } private fun SourceTask.readAllSourceRecords(): List {