Skip to content

Commit

Permalink
Adding option to interpret timestamps without timezone information.
Browse files Browse the repository at this point in the history
Also distinguished between TIMESTAMP, TIMESTAMP WITH LOCAL TIME ZONE and TIMESTAMP WITH TIME ZONE types.
  • Loading branch information
thake committed Sep 24, 2020
1 parent 1d55551 commit 9c2cc89
Show file tree
Hide file tree
Showing 14 changed files with 343 additions and 68 deletions.
6 changes: 6 additions & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ The following configuration parameter are available:
- Type: string
- Default: ONLINE
- Importance: low
- `db.timezone`
The timezone in which TIMESTAMP columns (without any timezone information) should be interpreted. Valid values are all values that can be passed to https://docs.oracle.com/javase/8/docs/api/java/time/ZoneId.html#of-java.lang.String-

- Type: string
- Default: UTC
- Importance: high

- `batch.size`
Batch size of rows that should be fetched in one batch
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package com.github.thake.logminer.kafka.connect

import com.github.thake.logminer.kafka.connect.SchemaType.TimeType.TimestampType
import org.apache.kafka.connect.data.Date
import org.apache.kafka.connect.data.Decimal
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Timestamp
import java.math.BigDecimal
import java.sql.ResultSet
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneId
import java.time.ZoneOffset
import java.time.*
import java.time.format.DateTimeFormatter
import java.util.*

const val NUMERIC_TYPE_SCALE_LOW = -84
val UNRESOLVABLE_DATE_TIME_EXPRESSIONS = arrayOf(
Expand Down Expand Up @@ -74,7 +73,8 @@ sealed class SchemaType<T> {
override fun convert(str: String): BigDecimal = str.toBigDecimal().setScale(scale)
override fun createSchemaBuilder(): SchemaBuilder = Decimal.builder(scale)
override fun toString(): String = "BigDecimal"
override fun extract(index: Int, resultSet: ResultSet): BigDecimal? = resultSet.getBigDecimal(index)?.let { it.setScale(scale) }
override fun extract(index: Int, resultSet: ResultSet): BigDecimal? = resultSet.getBigDecimal(index)
?.setScale(scale)
}
}

Expand Down Expand Up @@ -118,31 +118,79 @@ sealed class SchemaType<T> {
java.util.Date.from(it.toLocalDate().atStartOfDay(ZoneOffset.UTC).toInstant())
}
}
abstract class TimestampType(val fractionalSeconds: Int = 6) : TimeType(){
/**
 * Optional fractional-seconds section of the formatter pattern: `[.`, then
 * [fractionalSeconds] times the letter `S`, then `]`. Empty when
 * [fractionalSeconds] is zero or negative.
 */
val fractionalSecondsPart: String
    get() {
        if (fractionalSeconds <= 0) return ""
        return "[." + "S".repeat(fractionalSeconds) + "]"
    }

object TimestampType : TimeType() {
//Format: 2020-01-27 06:00:00
val localDateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSSSSS]")

override fun convert(str: String): java.util.Date {
return java.util.Date
.from(
LocalDateTime.parse(str, localDateTimeFormatter).atZone(
ZoneId.systemDefault()
).toInstant()
)
/** Pattern string handed to [DateTimeFormatter.ofPattern]; each concrete timestamp type supplies its own. */
abstract val pattern: String

/** Formatter built from [pattern] on first access and cached by the lazy delegate. */
val dateTimeFormatter: DateTimeFormatter by lazy { DateTimeFormatter.ofPattern(pattern) }

/** Converts [str] to a [java.util.Date] by delegating the parsing to [parse]. */
override fun convert(str: String): java.util.Date {
    val instant = parse(str)
    return java.util.Date.from(instant)
}

/** Parses [str] as a zoned date-time using [dateTimeFormatter]; open so subclasses can parse differently. */
protected open fun parse(str: String): Instant {
    val zoned = ZonedDateTime.parse(str, dateTimeFormatter)
    return zoned.toInstant()
}
/** Removes a surrounding `TIMESTAMP '` … `'` wrapper from [str] when both parts are present. */
override fun cleanDefaultStr(str: String) = str.removeSurrounding(prefix = "TIMESTAMP '", suffix = "'")

/** Schema builder for this type: the Connect [Timestamp] logical type. */
override fun createSchemaBuilder(): SchemaBuilder = Timestamp.builder()

override fun toString(): String = "Timestamp"

/** Reads column [index] of [resultSet] as a timestamp; returns `null` when the driver returns `null`. */
override fun extract(index: Int, resultSet: ResultSet): java.util.Date? {
    val sqlTimestamp = resultSet.getTimestamp(index) ?: return null
    return java.util.Date(sqlTimestamp.time)
}

/** Two timestamp types are equal when they agree on [fractionalSeconds] and [pattern]. */
override fun equals(other: Any?): Boolean =
    this === other ||
        (other is TimestampType &&
            fractionalSeconds == other.fractionalSeconds &&
            pattern == other.pattern)
/** Hash combining [fractionalSeconds] and [pattern], the same fields compared by [equals]. */
override fun hashCode(): Int = 31 * fractionalSeconds + pattern.hashCode()

/**
 * Timestamp type for values that carry no zone information.
 *
 * Parsed strings are interpreted as local date-times in [defaultTimeZone];
 * JDBC reads pass a calendar created for that same zone.
 */
class TimestampWithoutTimezone(val defaultTimeZone: ZoneId, fractionalSeconds: Int = 6) :
    TimestampType(fractionalSeconds) {
    // Calendar for defaultTimeZone, handed to ResultSet.getTimestamp on every extract call.
    private val cal = Calendar.getInstance(TimeZone.getTimeZone(defaultTimeZone))

    override val pattern: String
        get() = "yyyy-MM-dd HH:mm:ss${fractionalSecondsPart}"

    /** Parses [str] as a local date-time and places it in [defaultTimeZone]. */
    override fun parse(str: String): Instant {
        val localDateTime = LocalDateTime.parse(str, dateTimeFormatter)
        return localDateTime.atZone(defaultTimeZone).toInstant()
    }

    /** Reads column [index] using the zone calendar; returns `null` when the driver returns `null`. */
    override fun extract(index: Int, resultSet: ResultSet): java.util.Date? {
        val sqlTimestamp = resultSet.getTimestamp(index, cal) ?: return null
        return java.util.Date(sqlTimestamp.time)
    }

    override fun toString(): String = "Timestamp($fractionalSeconds) ($defaultTimeZone)"

    /** Equal when the base comparison holds and both instances use the same [defaultTimeZone]. */
    override fun equals(other: Any?): Boolean =
        this === other ||
            (other is TimestampWithoutTimezone &&
                super.equals(other) &&
                defaultTimeZone == other.defaultTimeZone)

    override fun hashCode(): Int = 31 * super.hashCode() + defaultTimeZone.hashCode()
}
/**
 * Timestamp type whose textual form names a zone id, optionally followed by a zone abbreviation.
 *
 * Example input: `2020-01-27 06:00:00.640000 US/Pacific PDT`
 */
class TimestampWithTimezone(fractionalSeconds: Int = 6) : TimestampType(fractionalSeconds) {
    override val pattern: String
        get() = "yyyy-MM-dd HH:mm:ss${fractionalSecondsPart} VV [zzz]"

    override fun toString(): String = "Timestamp(${fractionalSeconds}) with timezone"
}
/**
 * Timestamp type whose textual form ends in a numeric UTC offset.
 *
 * Example input: `2020-09-24 10:11:26.684000+00:00`
 */
class TimestampWithLocalTimezone(fractionalSeconds: Int = 6) : TimestampType(fractionalSeconds) {
    override val pattern: String
        get() = "yyyy-MM-dd HH:mm:ss${fractionalSecondsPart}xxx"

    // Includes the precision, like the sibling timestamp types, so that two instances
    // which differ in fractionalSeconds (and are therefore unequal) also print differently.
    override fun toString(): String = "Timestamp($fractionalSeconds) with local timezone"
}
}

}


companion object {
fun toSchemaType(columnDataType: ColumnDefinition): SchemaType<out Any> {
fun toSchemaType(columnDataType: ColumnDefinition, defaultZoneId: ZoneId): SchemaType<out Any> {
val scale = columnDataType.scale
val precision = columnDataType.precision
return when (columnDataType.type) {
Expand Down Expand Up @@ -187,7 +235,12 @@ sealed class SchemaType<T> {
"DATE" -> TimeType.DateType
else ->
if (columnDataType.type.startsWith("TIMESTAMP")) {
TimeType.TimestampType
val fractionalSeconds = columnDataType.scale ?: 6
when {
columnDataType.type.endsWith("WITH TIME ZONE") -> TimestampType.TimestampWithTimezone(fractionalSeconds)
columnDataType.type.endsWith("WITH LOCAL TIME ZONE") -> TimestampType.TimestampWithLocalTimezone(fractionalSeconds)
else -> TimestampType.TimestampWithoutTimezone(defaultZoneId,fractionalSeconds)
}
} else {
throw IllegalArgumentException("Type for column data type $columnDataType is currently not supported")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import mu.KotlinLogging
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import java.sql.Connection
import java.time.ZoneId

private val logger = KotlinLogging.logger {}

Expand All @@ -26,7 +27,10 @@ data class SchemaDefinition(
fun getColumnSchemaType(columnName: String) = columnTypes[columnName]
}

class SchemaService(private val nameService: ConnectNameService) {
class SchemaService(
private val nameService: ConnectNameService,
private val defaultZoneId : ZoneId
) {
private val cachedSchemas: MutableMap<TableId, SchemaDefinition> = mutableMapOf()

fun getSchema(dbConn: Connection, table: TableId) = cachedSchemas.getOrPut(table, { buildTableSchema(dbConn, table) })
Expand Down Expand Up @@ -83,7 +87,7 @@ class SchemaService(private val nameService: ConnectNameService) {
val doc = result.getString("COMMENTS")
val nullable = result.getString("NULLABLE") == "Y"
val columnDef = ColumnDefinition(name, type, scale, precision, defaultValue, nullable, doc)
val schemaType = SchemaType.toSchemaType(columnDef)
val schemaType = SchemaType.toSchemaType(columnDef,defaultZoneId)
columnTypes[name] = schemaType
val columnSchema = createColumnSchema(columnDef, schemaType)
valueSchemaBuilder.field(name, columnSchema)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package com.github.thake.logminer.kafka.connect

import mu.KotlinLogging
import org.apache.kafka.common.config.AbstractConfig
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.Importance
import java.sql.Connection
import java.sql.DriverManager
import java.sql.SQLException
import java.time.Duration
import java.time.ZoneId

private val logger = KotlinLogging.logger {}
/** Sealed base type of [TableSelector] and [SchemaSelector]. */
sealed class LogMinerSelector
/** Selector carrying an [owner] and a [tableName]. */
data class TableSelector(val owner: String, val tableName: String) : LogMinerSelector()
/** Selector carrying only an [owner]; presumably selects everything owned by it — confirm against usages. */
data class SchemaSelector(val owner: String) : LogMinerSelector()
Expand All @@ -19,6 +25,34 @@ class SourceConnectorConfig(
conf(),
parsedConfig
)
/**
 * JDBC connection to the configured database, opened on first access.
 *
 * Makes up to `dbAttempts` connection attempts and sleeps for `dbBackoff` before every
 * attempt after the first. Throws [SQLException] when no attempt succeeds; the failure of
 * the last attempt is attached as the cause so the underlying driver error is not lost.
 */
val connection by lazy {
    val dbUri = "${dbHostName}:${dbPort}/${dbSid}"

    var attempt = 0
    var established: Connection? = null
    // Remembered so the final exception can report why connecting failed.
    var lastFailure: SQLException? = null
    while (attempt < dbAttempts && established == null) {
        if (attempt > 0) {
            logger.info { "Waiting ${dbBackoff.toMillis()} ms before next attempt to acquire a connection" }
            Thread.sleep(dbBackoff.toMillis())
        }
        attempt++
        try {
            established = DriverManager.getConnection(
                "jdbc:oracle:thin:@$dbUri",
                dbUser, dbPassword
            )
            logger.info { "Connected to database at $dbUri" }
        } catch (e: SQLException) {
            lastFailure = e
            logger.error(e) { "Couldn't connect to database with url $dbUri. Attempt $attempt." }
        }
    }
    established ?: throw SQLException(
        "Couldn't initialize Connection to $dbUri after $dbAttempts attempts.",
        lastFailure
    )
}


val dbSid: String
Expand All @@ -39,6 +73,10 @@ class SourceConnectorConfig(
val dbName: String
get() = getString(DB_NAME)

/** Zone id obtained by passing the `db.timezone` setting to [ZoneId.of]; evaluated on every access. */
val dbZoneId: ZoneId
    get() {
        val configuredZone = getString(DB_TIMEZONE)
        return ZoneId.of(configuredZone)
    }


val logminerDictionarySource : LogminerDictionarySource
get() = LogminerDictionarySource.valueOf(getString(DB_LOGMINER_DICTIONARY))

Expand Down Expand Up @@ -84,13 +122,13 @@ class SourceConnectorConfig(
const val DB_ATTEMPTS = "db.attempts"
const val DB_BACKOFF_MS = "db.backoff.ms"
const val DB_LOGMINER_DICTIONARY = "db.logminer.dictionary"
const val DB_TIMEZONE = "db.timezone"
const val MONITORED_TABLES = "table.whitelist"
const val DB_FETCH_SIZE = "db.fetch.size"
const val START_SCN = "start.scn"
const val BATCH_SIZE = "batch.size"
const val POLL_INTERVAL_MS = "poll.interval.ms"


fun conf(): ConfigDef {
return ConfigDef()
.define(
Expand Down Expand Up @@ -136,6 +174,13 @@ class SourceConnectorConfig(
Importance.LOW,
"Type of logminer dictionary that should be used. Valid values: "+LogminerDictionarySource.values().joinToString { it.name }
)
.define(
DB_TIMEZONE,
ConfigDef.Type.STRING,
"UTC",
Importance.HIGH,
"The timezone in which TIMESTAMP columns (without any timezone information) should be interpreted as. Valid values are all values that can be passed to https://docs.oracle.com/javase/8/docs/api/java/time/ZoneId.html#of-java.lang.String-"
)
.define(
MONITORED_TABLES,
ConfigDef.Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import org.apache.kafka.connect.source.SourceRecord
import org.apache.kafka.connect.source.SourceTask
import org.apache.kafka.connect.source.SourceTaskContext
import java.sql.Connection
import java.sql.DriverManager
import java.sql.SQLException
import java.util.*

Expand All @@ -20,40 +19,12 @@ sealed class TaskState
object StoppedState : TaskState()
data class StartedState(val config: SourceConnectorConfig, val context: SourceTaskContext) : TaskState() {
val connection: Connection by lazy {
with(config) {
val dbUri = "${dbHostName}:${dbPort}/${dbSid}"
fun openConnection(): Connection {
return DriverManager.getConnection(
"jdbc:oracle:thin:@$dbUri",
dbUser, dbPassword
).also {
logger.info { "Connected to database at $dbUri" }
}
}

var currentAttempt = 0
var connection: Connection? = null
while (currentAttempt < dbAttempts && connection == null) {
if (currentAttempt > 0) {
logger.info { "Waiting ${dbBackoff.toMillis()} ms before next attempt to acquire a connection" }
Thread.sleep(dbBackoff.toMillis())
}
currentAttempt++
try {
connection = openConnection()
} catch (e: SQLException) {
logger.error(e) { "Couldn't connect to database with url $dbUri. Attempt $currentAttempt." }

}
}
connection ?: throw SQLException("Couldn't initialize Connection to $dbUri after $dbAttempts attempts.")

}
config.connection
}
var offset: Offset?
val nameService: ConnectNameService = SourceDatabaseNameService(config.dbName)
private val schemaService: SchemaService by lazy {
SchemaService(nameService)
SchemaService(nameService,config.dbZoneId)
}
private var source: Source<out Offset?>
private val sourcePartition = Collections.singletonMap(TaskConstants.LOG_MINER_OFFSET, config.dbName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import com.github.thake.logminer.kafka.connect.logminer.LogminerSource
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.testcontainers.junit.jupiter.Testcontainers
import java.time.ZoneId

@Testcontainers
abstract class AbstractCdcSourceIntegrationTest : AbstractIntegrationTest() {
private lateinit var cdcSource: LogminerSource

protected open val tableSelector: TableSelector
get() = TableSelector(OWNER, TABLE_NAME)

Expand All @@ -32,6 +34,6 @@ abstract class AbstractCdcSourceIntegrationTest : AbstractIntegrationTest() {
logminerDictionarySource = logminerDictionarySource
),
offset = offset,
schemaService = SchemaService(SourceDatabaseNameService("A"))
schemaService = SchemaService(SourceDatabaseNameService("A"),defaultZone)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import java.sql.Date
import java.sql.Timestamp
import java.time.Instant
import java.time.LocalDate
import java.time.ZoneId

enum class Columns {
ID, TIME, STRING, integer, long, date, BIG_DECIMAL
Expand All @@ -25,12 +26,13 @@ val STANDARD_TABLE = TableId(OWNER, TABLE_NAME)
val SECOND_TABLE = TableId(OWNER, "SECOND_TAB")

abstract class AbstractIntegrationTest {
protected open val defaultZone = ZoneId.of("Europe/Berlin")

@Container
protected open val oracle: OracleContainer =
OracleContainer("thake/oracle-xe-11g-archivelog").withInitScript("InitTestTable.sql").withReuse(false)

OracleContainer("thake/oracle-xe-11g-archivelog").withInitScript(getInitScript()).withReuse(false)

protected open fun getInitScript() = "InitTestTable.sql"
fun openConnection(): Connection = oracle.createConnection("")

protected fun Connection.executeUpdate(sql: String): Int {
Expand Down
Loading

0 comments on commit 9c2cc89

Please sign in to comment.