From 25d416ec49311b57b564eaae5e06ab2741790f65 Mon Sep 17 00:00:00 2001 From: Rick Hanton Date: Thu, 25 Jan 2024 23:08:09 -0700 Subject: [PATCH] KAFKA-398 - SimplifiedJson.java edit for binary UUID --- .../connect/FullDocumentRoundTripIntegrationTest.java | 3 +++ .../kafka/connect/source/MongoSourceConfig.java | 2 +- .../connect/source/json/formatter/SimplifiedJson.java | 10 ++++++++-- .../json/formatter/JsonWriterSettingsProviderTest.java | 3 +++ .../source/producer/SchemaAndValueProducerTest.java | 4 +++- 5 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/integrationTest/java/com/mongodb/kafka/connect/FullDocumentRoundTripIntegrationTest.java b/src/integrationTest/java/com/mongodb/kafka/connect/FullDocumentRoundTripIntegrationTest.java index 188bae05..3ea4be16 100644 --- a/src/integrationTest/java/com/mongodb/kafka/connect/FullDocumentRoundTripIntegrationTest.java +++ b/src/integrationTest/java/com/mongodb/kafka/connect/FullDocumentRoundTripIntegrationTest.java @@ -87,6 +87,7 @@ void tearDown() { + " \"B\": {\"$date\": {\"$numberLong\": \"1577863627000\"}}," + " \"C\": {\"$numberDecimal\": \"12345.6789\"}}," + " \"myArray\": [{\"$numberInt\": \"1\"}, {\"$numberInt\": \"2\"}, {\"$numberInt\": \"3\"}]," + + " \"myUuid\": {\"$binary\": {\"base64\": \"v7ESnpXpQ1iQNHXUWz4qcw==\", \"subType\": \"04\"}}," + " \"myBytes\": {\"$binary\": {\"base64\": \"S2Fma2Egcm9ja3Mh\", \"subType\": \"00\"}}," + " \"myDate\": {\"$date\": {\"$numberLong\": \"1234567890\"}}," + " \"myDecimal\": {\"$numberDecimal\": \"12345.6789\"}" @@ -100,6 +101,7 @@ void tearDown() { + "\"myDouble\": 20.21, " + "\"mySubDoc\": {\"A\": \"S2Fma2Egcm9ja3Mh\", \"B\": \"2020-01-01T07:27:07Z\", \"C\": \"12345.6789\"}, " + "\"myArray\": [1, 2, 3], " + + "\"myUuid\": \"bfb1129e-95e9-4358-9034-75d45b3e2a73\", " + "\"myBytes\": \"S2Fma2Egcm9ja3Mh\", " + "\"myDate\": \"1970-01-15T06:56:07.89Z\", " + "\"myDecimal\": \"12345.6789\"}"; @@ -122,6 +124,7 @@ void tearDown() { + " }" + " }, " + " {\"name\": \"myArray\", \"type\": {\"type\" : \"array\", \"items\" : \"int\"}}, " + + " {\"name\": \"myUuid\", \"type\": \"string\"}, " + " {\"name\": \"myBytes\", \"type\": \"string\"}, " + " {\"name\": \"myDate\", \"type\": \"string\"}, " + " {\"name\": \"myDecimal\", \"type\": \"string\"}" diff --git a/src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java b/src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java index 31efbab2..61ebee10 100644 --- a/src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java +++ b/src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java @@ -124,7 +124,7 @@ public class MongoSourceConfig extends AbstractConfig { + " * com.mongodb.kafka.connect.source.json.formatter.DefaultJson: The legacy strict json formatter.\n" + " * com.mongodb.kafka.connect.source.json.formatter.ExtendedJson: The fully type safe extended json formatter.\n" + " * com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson: Simplified Json, " - + "with ObjectId, Decimals, Dates and Binary values represented as strings.\n\n" + + "with ObjectId, UUIDs, Decimals, Dates and Binary values represented as strings.\n\n" + "Users can provide their own implementation of the com.mongodb.kafka.connect.source.json.formatter."; public static final String OUTPUT_SCHEMA_KEY_CONFIG = "output.schema.key"; diff --git a/src/main/java/com/mongodb/kafka/connect/source/json/formatter/SimplifiedJson.java b/src/main/java/com/mongodb/kafka/connect/source/json/formatter/SimplifiedJson.java index ca9effb9..92a12822 100644 --- a/src/main/java/com/mongodb/kafka/connect/source/json/formatter/SimplifiedJson.java +++ b/src/main/java/com/mongodb/kafka/connect/source/json/formatter/SimplifiedJson.java @@ -22,6 +22,7 @@ import java.time.format.DateTimeFormatter; import java.util.Base64; +import org.bson.BsonBinarySubType; import org.bson.json.JsonMode; import org.bson.json.JsonWriterSettings; @@ -32,8 +33,13 @@ public JsonWriterSettings getJsonWriterSettings() { return JsonWriterSettings.builder() .outputMode(JsonMode.RELAXED) .binaryConverter( - (value, writer) -> - writer.writeString(Base64.getEncoder().encodeToString(value.getData()))) + (value, writer) -> { + if (BsonBinarySubType.UUID_STANDARD.getValue() == value.getType()) { + writer.writeString(value.asUuid().toString()); + } else { + writer.writeString(Base64.getEncoder().encodeToString(value.getData())); + } + }) .dateTimeConverter( (value, writer) -> { ZonedDateTime zonedDateTime = Instant.ofEpochMilli(value).atZone(ZoneOffset.UTC); diff --git a/src/test/java/com/mongodb/kafka/connect/source/json/formatter/JsonWriterSettingsProviderTest.java b/src/test/java/com/mongodb/kafka/connect/source/json/formatter/JsonWriterSettingsProviderTest.java index 9ae43cc0..838fd71b 100644 --- a/src/test/java/com/mongodb/kafka/connect/source/json/formatter/JsonWriterSettingsProviderTest.java +++ b/src/test/java/com/mongodb/kafka/connect/source/json/formatter/JsonWriterSettingsProviderTest.java @@ -37,6 +37,7 @@ public class JsonWriterSettingsProviderTest { + " 'B': {'$date': {'$numberLong': '1577863627000'}}, 'C': {'$numberDecimal': '12345.6789'}}, " + "'myArray': [{'$binary': {'base64': 'S2Fma2Egcm9ja3Mh', 'subType': '00'}}, " + " {'$date': {'$numberLong': '1577863627000'}}, {'$numberDecimal': '12345.6789'}], " + + "'myUuid': {'$binary': {'base64': 'v7ESnpXpQ1iQNHXUWz4qcw==', 'subType': '04'}}, " + "'myBytes': {'$binary': {'base64': 'S2Fma2Egcm9ja3Mh', 'subType': '00'}}, " + "'myDate': {'$date': {'$numberLong': '1577863627000'}}, " + "'myDecimal': {'$numberDecimal': '12345.6789'}}"); @@ -51,6 +52,7 @@ public class JsonWriterSettingsProviderTest { + " 'B': {'$date': {'$numberLong': '1577863627000'}}, 'C': {'$numberDecimal': '12345.6789'}}, " + "'myArray': [{'$binary': {'base64': 'S2Fma2Egcm9ja3Mh', 'subType': '00'}}, " + " {'$date': {'$numberLong': '1577863627000'}}, {'$numberDecimal': '12345.6789'}], " + + "'myUuid': {'$binary': {'base64': 'v7ESnpXpQ1iQNHXUWz4qcw==', 'subType': '04'}}, " + "'myBytes': {'$binary': {'base64': 'S2Fma2Egcm9ja3Mh', 'subType': '00'}}, " + "'myDate': {'$date': {'$numberLong': '1577863627000'}}, " + "'myDecimal': {'$numberDecimal': '12345.6789'}}"); @@ -63,6 +65,7 @@ public class JsonWriterSettingsProviderTest { + "myDouble: 20.21, " + "mySubDoc: {A: 'S2Fma2Egcm9ja3Mh', B: '2020-01-01T07:27:07Z', C: '12345.6789'}," + "myArray: ['S2Fma2Egcm9ja3Mh', '2020-01-01T07:27:07Z', '12345.6789']," + + "myUuid: 'bfb1129e-95e9-4358-9034-75d45b3e2a73', " + "myBytes: 'S2Fma2Egcm9ja3Mh', " + "myDate: '2020-01-01T07:27:07Z', " + "myDecimal: '12345.6789'}"); diff --git a/src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java b/src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java index 4c1d5b01..e923f072 100644 --- a/src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java +++ b/src/test/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducerTest.java @@ -384,7 +384,9 @@ static String getLsidId(final boolean simplified) { static String getLsidId(final boolean simplified, final boolean quoted) { return simplified - ? quoted ? "\"c//SZESzTGmQ6OfR38A11A==\"" : "c//SZESzTGmQ6OfR38A11A==" + ? quoted + ? "\"73ffd264-44b3-4c69-90e8-e7d1dfc035d4\"" + : "73ffd264-44b3-4c69-90e8-e7d1dfc035d4" : "{\"$binary\": {\"base64\": \"c//SZESzTGmQ6OfR38A11A==\", \"subType\": \"04\"}}"; }