From 6775ab93a831d5256990179df453fb05a6981282 Mon Sep 17 00:00:00 2001 From: Spencer Carlson Date: Thu, 18 Nov 2021 15:42:09 -0700 Subject: [PATCH 1/2] timestamp working --- .../connect/util/TimestampConverter.java | 122 ++++++++---------- .../connect/util/TimestampConverterTests.java | 119 ++++++++--------- 2 files changed, 113 insertions(+), 128 deletions(-) diff --git a/src/main/java/oryanmoshe/kafka/connect/util/TimestampConverter.java b/src/main/java/oryanmoshe/kafka/connect/util/TimestampConverter.java index 8c87252..db50781 100644 --- a/src/main/java/oryanmoshe/kafka/connect/util/TimestampConverter.java +++ b/src/main/java/oryanmoshe/kafka/connect/util/TimestampConverter.java @@ -5,7 +5,11 @@ import java.text.SimpleDateFormat; import java.time.Instant; -import java.util.Date; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoField; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; import java.util.Properties; @@ -33,33 +37,32 @@ public class TimestampConverter implements CustomConverter s.equalsIgnoreCase(column.typeName()))) { - boolean isTime = "time".equalsIgnoreCase(column.typeName()); // Use a new SchemaBuilder every time in order to avoid changing "Already set" options // in the schema builder between tables. - registration.register(SchemaBuilder.string().optional(), rawValue -> { + + // Building the schema for the payload + final SchemaBuilder builder = column.isOptional() ? SchemaBuilder.string().optional() : SchemaBuilder.string().required(); + registration.register(builder, rawValue -> { if (rawValue == null) { - // DEBUG - if (this.debug) { - System.out.printf("[TimestampConverter.converterFor] rawValue of %s is null.%n", column.name()); - } - - if (column.isOptional()) { - return null; - } - else if (column.hasDefaultValue()) { - return column.defaultValue(); - } - return rawValue; + if (this.debug) { System.out.printf("[TimestampConverter.converterFor] rawValue of %s is null.%n", column.name()); } + + if (column.isOptional()) { return null; } + else if (column.hasDefaultValue()) { return column.defaultValue(); } + return null; } - Long millis = getMillis(rawValue.toString(), isTime); - if (millis == null) - return rawValue.toString(); + final Long epoch = parseToEpoch(rawValue.toString()); + + if (this.debug) { System.out.printf("[TimestampConverter.converterFor] Parsed epoch: %d%n", epoch); } + if (epoch == null) { return rawValue.toString(); } - Instant instant = Instant.ofEpochMilli(millis); - Date dateObject = Date.from(instant); - if (this.debug) + final Instant instant = Instant.EPOCH.plus(epoch, ChronoUnit.MICROS); + + if (this.debug) { System.out.printf( - "[TimestampConverter.converterFor] Before returning conversion. column.name: %s, column.typeName: %s, millis: %d%n", - column.name(), column.typeName(), millis); + "[TimestampConverter.converterFor] Before returning conversion. column.name: %s, column.typeName: %s, epoch: %d%n", + column.name(), column.typeName(), epoch); + } switch (column.typeName().toLowerCase()) { case "time": - return this.simpleTimeFormatter.format(dateObject); + if (this.debug) { System.out.println("Using timeFormatter"); } + return this.timeFormatter.format(instant); case "date": - return this.simpleDateFormatter.format(dateObject); + if (this.debug) { System.out.println("Using dateFormatter"); } + return this.dateFormatter.format(instant); default: - return this.simpleDatetimeFormatter.format(dateObject); + if (this.debug) { System.out.println("Using dateTimeFormatter"); } + return this.dateTimeFormatter.format(instant); } }); } } - private Long getMillis(String timestamp, boolean isTime) { - if (timestamp.isBlank()) - return null; - + private Long parseToEpoch(final String timestamp) { + if (timestamp == null || timestamp.isBlank()) { return null; } if (timestamp.contains(":") || timestamp.contains("-")) { - return milliFromDateString(timestamp); + return epochFromDateString(timestamp); } + return Long.parseLong(timestamp); + } - int excessLength = timestamp.length() - MILLIS_LENGTH; - long longTimestamp = Long.parseLong(timestamp); - - if (isTime) - return longTimestamp; - - if (excessLength < 0) - return longTimestamp * 24 * 60 * 60 * 1000; + private Long epochFromDateString(final String timestamp) { + System.out.println("TIMESTAMP: " + timestamp); - long millis = longTimestamp / (long) Math.pow(10, excessLength); - return millis; - } +// final Instant instant = LocalDateTime.parse(timestamp, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")).atZone(ZoneOffset.UTC).toInstant(); +// return Long.parseLong(String.valueOf(instant.getEpochSecond()) + instant.getLong(ChronoField.MICRO_OF_SECOND)); - private Long milliFromDateString(String timestamp) { Matcher matches = regexPattern.matcher(timestamp); if (matches.find()) { @@ -146,13 +141,10 @@ private Long milliFromDateString(String timestamp) { String second = matches.group("second") != null ? matches.group("second") : "00"; String milli = matches.group("milli") != null ? matches.group("milli") : "000"; - if (milli.length() > 3) - milli = milli.substring(0, 3); - String dateStr = ""; dateStr += String.format("%s:%s:%s.%s", ("00".substring(hour.length()) + hour), ("00".substring(minute.length()) + minute), ("00".substring(second.length()) + second), - (milli + "000".substring(milli.length()))); + (milli + "000000".substring(milli.length()))); if (year != null) { if (month.length() > 2) @@ -164,10 +156,10 @@ private Long milliFromDateString(String timestamp) { dateStr = String.format("%s-%s-%sT%sZ", "2020", "01", "01", dateStr); } - Date dateObj = Date.from(Instant.parse(dateStr)); - return dateObj.getTime(); - } + final Instant instant = Instant.parse(dateStr); + return Long.parseLong(String.valueOf(instant.getEpochSecond()) + instant.getLong(ChronoField.MICRO_OF_SECOND)); + } return null; } } diff --git a/src/test/java/oryanmoshe/kafka/connect/util/TimestampConverterTests.java b/src/test/java/oryanmoshe/kafka/connect/util/TimestampConverterTests.java index 5a01f37..93384f3 100644 --- a/src/test/java/oryanmoshe/kafka/connect/util/TimestampConverterTests.java +++ b/src/test/java/oryanmoshe/kafka/connect/util/TimestampConverterTests.java @@ -1,6 +1,7 @@ package oryanmoshe.kafka.connect.util; import org.apache.kafka.connect.data.SchemaBuilder; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -8,12 +9,13 @@ import io.debezium.spi.converter.CustomConverter.Converter; import io.debezium.spi.converter.CustomConverter.ConverterRegistration; +import java.util.Map; import java.util.OptionalInt; import java.util.Properties; import static org.junit.jupiter.api.Assertions.assertEquals; public class TimestampConverterTests { - private class MockRegistration implements ConverterRegistration { + private static class MockRegistration implements ConverterRegistration { public Converter _converter; public S _schema; @@ -24,61 +26,64 @@ public void register(S fieldSchema, Converter converter) { } } - @ParameterizedTest - @CsvSource({ "YY-MM-dd, YY-MM-dd", "," + TimestampConverter.DEFAULT_DATE_FORMAT }) - void configureDateTest(final String inputFormat, final String expectedFormat) { - final TimestampConverter tsConverter = new TimestampConverter(); + @Test + void converterDateTimeStampTest() { + final String columnType = "timestamp"; + final String format = "yyyy-MM-dd HH:mm:ss.SSSSSS"; + final String input = "1637208584795307"; - Properties props = new Properties(); - if (inputFormat != null) - props.put("format.date", inputFormat); + final String expectedResult = "2021-11-18 04:09:44.795307"; - final String beforeConfig = tsConverter.strDateFormat; - assertEquals(null, beforeConfig, beforeConfig + " before configuration, should equal " + null); - System.out.println(beforeConfig); + final Properties props = new Properties(); + props.putAll(Map.of(String.format("format.%s", "datetime"),format, "debug", "true")); + final TimestampConverter tsConverter = new TimestampConverter(); tsConverter.configure(props); - final String actualResult = tsConverter.strDateFormat; - assertEquals(expectedFormat, actualResult, - actualResult + " after configuration, should equal " + expectedFormat); - System.out.println(actualResult); - } + final RelationalColumn mockColumn = getMockColumn(columnType); + final MockRegistration mockRegistration = new MockRegistration<>(); - @ParameterizedTest - @CsvSource({ "mm:ss.SSS, mm:ss.SSS", "," + TimestampConverter.DEFAULT_TIME_FORMAT }) - void configureTimeTest(final String inputFormat, final String expectedFormat) { - final TimestampConverter tsConverter = new TimestampConverter(); + tsConverter.converterFor(mockColumn, mockRegistration); - Properties props = new Properties(); - if (inputFormat != null) - props.put("format.time", inputFormat); + final Object actualResult = mockRegistration._converter.convert(input); - final String beforeConfig = tsConverter.strTimeFormat; - assertEquals(null, beforeConfig, beforeConfig + " before configuration, should equal " + null); - System.out.println(beforeConfig); + assertEquals(expectedResult, actualResult); - tsConverter.configure(props); - final String actualResult = tsConverter.strTimeFormat; - assertEquals(expectedFormat, actualResult, - actualResult + " after configuration, should equal " + expectedFormat); - System.out.println(actualResult); + // Convert from string back + final String columnType2 = "datetime"; + final RelationalColumn mockColumn2 = getMockColumn(columnType2); + tsConverter.converterFor(mockColumn2, mockRegistration); + + final Object actualResult2 = mockRegistration._converter.convert(expectedResult); + + assertEquals(actualResult2, expectedResult); } + @ParameterizedTest - @CsvSource({ "date, YYYY-MM-dd, 18368, 2020-04-16", "date,, 18368, 2020-04-16", "time, mm:ss.SSS, 2230, 00:02.230", - "time,, 2230, 00:00:02.230", "datetime, YYYY-MM-dd, 1587042000279, 2020-04-16", - "datetime,, 1587042000279, 2020-04-16T13:00:00.279Z", "timestamp, YYYY-MM-dd, 1587042000279, 2020-04-16", - "datetime2,, 1587042000279, 2020-04-16T13:00:00.279Z", "datetime2, YYYY-MM-dd, 1587042000279, 2020-04-16", - "timestamp,, 1587042000279, 2020-04-16T13:00:00.279Z", "date, YYYY-MM-dd, 2019-04-19, 2019-04-19", - "datetime,, 2019-04-19 15:13:20.345123, 2019-04-19T15:13:20.345Z", "time,, 15:13:20, 15:13:20.000", - "time,HH:mm:ss, 15:13:20, 15:13:20", "timestamp,, 2019-04-19 15:13:20, 2019-04-19T15:13:20.000Z", + @CsvSource({ "date, YYYY-MM-dd, 18368, 2020-04-16", + "date,, 18368, 2020-04-16", + "time, mm:ss.SSS, 2230, 00:02.230", + "time,, 2230, 00:00:02.230", + "datetime, YYYY-MM-dd, 1587042000279, 2020-04-16", + "datetime,, 1587042000279, 2020-04-16T13:00:00.279Z", + "timestamp, YYYY-MM-dd, 1587042000279, 2020-04-16", + "datetime2,, 1587042000279, 2020-04-16T13:00:00.279Z", + "datetime2, YYYY-MM-dd, 1587042000279, 2020-04-16", + "timestamp,, 1587042000279, 2020-04-16T13:00:00.279Z", + "date, YYYY-MM-dd, 2019-04-19, 2019-04-19", + "datetime,, 2019-04-19 15:13:20.345123, 2019-04-19T15:13:20.345Z", + "time,, 15:13:20, 15:13:20.000", + "time,HH:mm:ss, 15:13:20, 15:13:20", + "timestamp,, 2019-04-19 15:13:20, 2019-04-19T15:13:20.000Z", "datetime,, 19-Apr-2019 15:13:20.345123, 2019-04-19T15:13:20.345Z", "datetime,, 19/04/2019 15:13:20.345123, 2019-04-19T15:13:20.345Z", "datetime,, 2019-4-19 15:13:20.345123, 2019-04-19T15:13:20.345Z", "datetime2,, 2019-4-19 15:13:20.345123, 2019-04-19T15:13:20.345Z", - "datetime,, 2019-4-19 3:1:0.345123, 2019-04-19T03:01:00.345Z", "datetime,YYYY-MM-dd,,", "timestamp,,,", "date,,,"}) + "datetime,, 2019-4-19 3:1:0.345123, 2019-04-19T03:01:00.345Z", + "datetime,YYYY-MM-dd,,", + "timestamp,,,", "date,,,"}) void converterTest(final String columnType, final String format, final String input, final String expectedResult) { final TimestampConverter tsConverter = new TimestampConverter(); @@ -92,24 +97,12 @@ void converterTest(final String columnType, final String format, final String in tsConverter.configure(props); RelationalColumn mockColumn = getMockColumn(columnType); - MockRegistration mockRegistration = new MockRegistration(); + MockRegistration mockRegistration = new MockRegistration<>(); tsConverter.converterFor(mockColumn, mockRegistration); - System.out.println(mockRegistration._schema.name()); - Object actualResult = mockRegistration._converter.convert(input); - System.out.println(actualResult); - if (actualResult == null) { - assertEquals(actualResult, null, String.format( - "columnType: %s, format: %s, input: %s, actualTimeFormat: %s, actualDateFormat: %s, props: %s", - columnType, format, input, tsConverter.strTimeFormat, tsConverter.strDateFormat, props)); - } else { - assertEquals(expectedResult, actualResult, - String.format( - "columnType: %s, format: %s, input: %s, actualTimeFormat: %s, actualDateFormat: %s, props: %s", - columnType, format, input, tsConverter.strTimeFormat, tsConverter.strDateFormat, props)); - } + if (actualResult != null) { assertEquals(expectedResult, actualResult); } } RelationalColumn getMockColumn(String type) { @@ -154,7 +147,7 @@ public String typeExpression() { @Override public OptionalInt scale() { - return null; + return OptionalInt.empty(); } @Override @@ -164,7 +157,7 @@ public int nativeType() { @Override public OptionalInt length() { - return null; + return OptionalInt.empty(); } @Override @@ -214,7 +207,7 @@ public String typeExpression() { @Override public OptionalInt scale() { - return null; + return OptionalInt.empty(); } @Override @@ -224,7 +217,7 @@ public int nativeType() { @Override public OptionalInt length() { - return null; + return OptionalInt.empty(); } @Override @@ -274,7 +267,7 @@ public String typeExpression() { @Override public OptionalInt scale() { - return null; + return OptionalInt.empty(); } @Override @@ -284,7 +277,7 @@ public int nativeType() { @Override public OptionalInt length() { - return null; + return OptionalInt.empty(); } @Override @@ -334,7 +327,7 @@ public String typeExpression() { @Override public OptionalInt scale() { - return null; + return OptionalInt.empty(); } @Override @@ -344,7 +337,7 @@ public int nativeType() { @Override public OptionalInt length() { - return null; + return OptionalInt.empty(); } @Override @@ -394,7 +387,7 @@ public String typeExpression() { @Override public OptionalInt scale() { - return null; + return OptionalInt.empty(); } @Override @@ -404,7 +397,7 @@ public int nativeType() { @Override public OptionalInt length() { - return null; + return OptionalInt.empty(); } @Override From fe41998d7c9fc762cc5f3f055578c0e3e0b2c809 Mon Sep 17 00:00:00 2001 From: Spencer Carlson Date: Fri, 19 Nov 2021 08:15:34 -0700 Subject: [PATCH 2/2] fix tests --- .../connect/util/TimestampConverter.java | 164 +++++++++++------- .../connect/util/TimestampConverterTests.java | 128 +++++++++++++- 2 files changed, 227 insertions(+), 65 deletions(-) diff --git a/src/main/java/oryanmoshe/kafka/connect/util/TimestampConverter.java b/src/main/java/oryanmoshe/kafka/connect/util/TimestampConverter.java index db50781..556aed5 100644 --- a/src/main/java/oryanmoshe/kafka/connect/util/TimestampConverter.java +++ b/src/main/java/oryanmoshe/kafka/connect/util/TimestampConverter.java @@ -2,30 +2,24 @@ import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; +import org.apache.kafka.connect.data.SchemaBuilder; -import java.text.SimpleDateFormat; import java.time.Instant; -import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.time.temporal.ChronoField; import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.TimeZone; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.kafka.connect.data.SchemaBuilder; - public class TimestampConverter implements CustomConverter { private static final Map MONTH_MAP = Map.ofEntries(Map.entry("jan", "01"), Map.entry("feb", "02"), Map.entry("mar", "03"), Map.entry("apr", "04"), Map.entry("may", "05"), Map.entry("jun", "06"), Map.entry("jul", "07"), Map.entry("aug", "08"), Map.entry("sep", "09"), Map.entry("oct", "10"), Map.entry("nov", "11"), Map.entry("dec", "12")); - public static final int MILLIS_LENGTH = 13; public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; public static final String DEFAULT_DATE_FORMAT = "YYYY-MM-dd"; @@ -34,30 +28,30 @@ public class TimestampConverter implements CustomConverter SUPPORTED_DATA_TYPES = List.of("date", "time", "datetime", "timestamp", "datetime2"); - private static final String DATETIME_REGEX = "(?(?(?:(?\\d{4})-(?\\d{1,2})-(?\\d{1,2}))|(?:(?\\d{1,2})\\/(?\\d{1,2})\\/(?\\d{4}))|(?:(?\\d{1,2})-(?\\w{3})-(?\\d{4})))?(?:\\s?T?(?