Skip to content

Commit

Permalink
Merge pull request #15 from Oli180369/Issue14
Browse files Browse the repository at this point in the history
Unit test for issue #14
  • Loading branch information
thake authored Mar 9, 2021
2 parents 281eba1 + 1bbefcb commit 6fbabed
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ object SourceRecordFields {
private const val OWNER = "schema"
private const val TABLE = "table"
private const val CHANGE_USER = "user"
val sourceSchema: Schema = SchemaBuilder.struct().name(LogminerSourceConnector::class.java.`package`.name + ".Source")
val sourceSchema: Schema =
SchemaBuilder.struct().name(LogminerSourceConnector::class.java.`package`.name + ".Source")
.field(VERSION, Schema.STRING_SCHEMA)
.field(CONNECTOR, Schema.STRING_SCHEMA)
.field(RECORD_TIMESTAMP, Timestamp.SCHEMA)
Expand All @@ -41,20 +42,20 @@ object SourceRecordFields {

fun convert(cdcRecord: CdcRecord): Struct {
return Struct(sourceSchema)
.put(VERSION, LogminerSourceConnector.version)
.put(CONNECTOR, LogminerSourceConnector.name)
.put(RECORD_TIMESTAMP, cdcRecord.timestamp)
.put(TRANSACTION, cdcRecord.transaction)
.put(SCN, cdcRecord.scn)
.put(OWNER, cdcRecord.table.owner)
.put(TABLE, cdcRecord.table.table)
.put(CHANGE_USER, cdcRecord.username)
.put(VERSION, LogminerSourceConnector.version)
.put(CONNECTOR, LogminerSourceConnector.name)
.put(RECORD_TIMESTAMP, cdcRecord.timestamp)
.put(TRANSACTION, cdcRecord.transaction)
.put(SCN, cdcRecord.scn)
.put(OWNER, cdcRecord.table.owner)
.put(TABLE, cdcRecord.table.table)
.put(CHANGE_USER, cdcRecord.username)
}
}

class ConnectSchemaFactory(
private val nameService: ConnectNameService,
private val isEmittingTombstones : Boolean
private val isEmittingTombstones: Boolean
) {


Expand All @@ -77,26 +78,28 @@ class ConnectSchemaFactory(
val recordConnectSchema = record.dataSchema.valueSchema

val valueSchema = SchemaBuilder.struct()
.name(name)
.field(CdcRecordFields.OPERATION, Schema.STRING_SCHEMA)
.field(CdcRecordFields.BEFORE, recordConnectSchema)
.field(CdcRecordFields.AFTER, recordConnectSchema)
.field(CdcRecordFields.SOURCE, sourceSchema)
.field(CdcRecordFields.PUBLISH_TIMESTAMP, Timestamp.SCHEMA)
.optional()
.build()
.name(name)
.field(CdcRecordFields.OPERATION, Schema.STRING_SCHEMA)
.field(CdcRecordFields.BEFORE, recordConnectSchema)
.field(CdcRecordFields.AFTER, recordConnectSchema)
.field(CdcRecordFields.SOURCE, sourceSchema)
.field(CdcRecordFields.PUBLISH_TIMESTAMP, Timestamp.SCHEMA)
.optional()
.build()
val struct = with(record) {
var updatedAfter = after

val sourceStruct = SourceRecordFields.convert(record)
val recordStruct = Struct(valueSchema)
.put(CdcRecordFields.OPERATION, operation.stringRep)
.put(CdcRecordFields.SOURCE, sourceStruct)
.put(CdcRecordFields.PUBLISH_TIMESTAMP, java.util.Date())
.put(CdcRecordFields.OPERATION, operation.stringRep)
.put(CdcRecordFields.SOURCE, sourceStruct)
.put(CdcRecordFields.PUBLISH_TIMESTAMP, java.util.Date())
if (operation == Operation.UPDATE && updatedAfter != null && before != null) {
val originalAfter = updatedAfter
//Enrich the after state with values from the before data set
val enrichedAfter = updatedAfter.toMutableMap()
before.forEach { enrichedAfter.putIfAbsent(it.key, it.value) }
val enrichedAfter = originalAfter.toMutableMap()
//Set the after value with the before value if the key does not exist in the after map.
enrichedAfter.putAll(before.filter { it.key !in originalAfter })
updatedAfter = enrichedAfter
}
before?.let {
Expand All @@ -123,7 +126,7 @@ class ConnectSchemaFactory(

val value = createValue(record)
val keyStruct = createKeyStruct(record)
val normalSourceRecord = SourceRecord(
val normalSourceRecord = SourceRecord(
partition,
pollResult.offset.map,
topic,
Expand All @@ -132,7 +135,7 @@ class ConnectSchemaFactory(
value.first,
value.second
)
return if(isEmittingTombstones && pollResult.cdcRecord.operation == Operation.DELETE){
return if (isEmittingTombstones && pollResult.cdcRecord.operation == Operation.DELETE) {
val deleteRecord = SourceRecord(
partition,
pollResult.offset.map,
Expand All @@ -146,15 +149,15 @@ class ConnectSchemaFactory(
normalSourceRecord,
deleteRecord
)
}else{
} else {
listOf(normalSourceRecord)
}

}

private fun convertDataToStruct(dataSchema: Schema, values: Map<String, Any?>): Struct {
return Struct(dataSchema).apply {
values.forEach { this.put(it.key, it.value) }
values.keys.forEach { this.put(it, values.get(it)) }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package com.github.thake.logminer.kafka.connect.issues

import com.github.thake.logminer.kafka.connect.*
import io.kotest.matchers.collections.shouldBeEmpty
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.nulls.shouldBeNull
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.source.SourceRecord
import org.apache.kafka.connect.source.SourceTaskContext
import org.apache.kafka.connect.storage.OffsetStorageReader
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.slf4j.LoggerFactory
import org.testcontainers.junit.jupiter.Testcontainers
import java.sql.Connection
import java.sql.Types
import java.util.*

@Testcontainers
class Issue14Test : AbstractIntegrationTest() {
private lateinit var sourceTask: SourceTask
private lateinit var offsetManager: MockOffsetStorageReader
private lateinit var defaultConfig: Map<String, String>
private val log = LoggerFactory.getLogger(Issue14Test::class.java)

private class TestSourceTaskContext(
val configs: Map<String, String>,
private val storageReader: OffsetStorageReader = MockOffsetStorageReader()
) : SourceTaskContext {

override fun configs(): MutableMap<String, String> {
return this.configs.toMutableMap()
}

override fun offsetStorageReader(): OffsetStorageReader {
return storageReader
}

}

private class MockOffsetStorageReader : OffsetStorageReader {
private var currentOffset = mutableMapOf<String, Any?>()
fun updateOffset(offset: MutableMap<String, Any?>) {
currentOffset = offset
}

override fun <T : Any?> offsets(partitions: MutableCollection<MutableMap<String, T>>?): MutableMap<MutableMap<String, T>, MutableMap<String, Any>> {
return Collections.emptyMap()
}

override fun <T : Any?> offset(partition: MutableMap<String, T>?): MutableMap<String, Any?> {
return currentOffset
}

}

@BeforeEach
fun setup() {
defaultConfig =
with(SourceConnectorConfig.Companion) {
mapOf(
BATCH_SIZE to "1000",
DB_NAME to "test",
DB_FETCH_SIZE to "10000",
DB_SID to oracle.sid,
DB_HOST to oracle.containerIpAddress,
DB_PORT to oracle.oraclePort.toString(),
DB_USERNAME to oracle.username,
DB_PASSWORD to oracle.password,
START_SCN to "0",
MONITORED_TABLES to STANDARD_TABLE.fullName + ", " + SECOND_TABLE.fullName
)
}
sourceTask = SourceTask()
offsetManager = MockOffsetStorageReader()
sourceTask.initialize(TestSourceTaskContext(defaultConfig, offsetManager))
//Wait for tables to correctly initialize
Thread.sleep(5000)
}

private fun createConfiguration(map: Map<String, String>? = null): Map<String, String> {
return defaultConfig.toMutableMap().apply { map?.let { putAll(it) } }
}

@AfterEach
fun tearDown() {
sourceTask.stop()
}

@Test
fun testUpdateColumnToNull() {
sourceTask.start(
createConfiguration(
mapOf(
SourceConnectorConfig.BATCH_SIZE to "10"
)
)
)
val modifyingConnection = openConnection()
//Initial state
modifyingConnection.insertRow(1)
var result = sourceTask.poll().toMutableList()
assertTrue(result.isNotEmpty())

modifyingConnection.prepareStatement("UPDATE ${STANDARD_TABLE.fullName} SET STRING = ?").use { stmt ->
stmt.setNull(1, Types.NVARCHAR)
stmt.executeUpdate()
}

result = sourceTask.readAllSourceRecords() as MutableList<SourceRecord>
assertTrue(result.size == 1)
((result[0].value() as Struct).get("after") as Struct).getString("STRING").shouldBeNull()
}

private fun SourceTask.readAllSourceRecords(): List<SourceRecord> {
val result = mutableListOf<SourceRecord>()
while (true) {
val currentResult = poll()
if (currentResult.isEmpty()) {
break
} else {
result.addAll(currentResult)
}
}
return result
}
}

0 comments on commit 6fbabed

Please sign in to comment.