diff --git a/src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java b/src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java index 79ac9128..ef631944 100644 --- a/src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java +++ b/src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java @@ -71,7 +71,9 @@ import com.mongodb.kafka.connect.source.MongoSourceConfig.StartupConfig.StartupMode; import com.mongodb.kafka.connect.source.json.formatter.JsonWriterSettingsProvider; +import com.mongodb.kafka.connect.source.producer.BinaryOutboxSchemaAndValueProducer; import com.mongodb.kafka.connect.source.schema.AvroSchema; +import com.mongodb.kafka.connect.source.topic.mapping.BinaryOutboxTopicMapper; import com.mongodb.kafka.connect.source.topic.mapping.TopicMapper; import com.mongodb.kafka.connect.util.ConfigHelper; import com.mongodb.kafka.connect.util.ConnectConfigException; @@ -113,7 +115,8 @@ public class MongoSourceConfig extends AbstractConfig { "The output format of the data produced by the connector for the value. Supported formats are:\n" + " * `json` - Raw Json strings\n" + " * `bson` - Bson byte array\n" - + " * `schema` - Schema'd output\n"; + + " * `schema` - Schema'd output\n" + + " * `binary` - Binary output\n"; public static final String OUTPUT_JSON_FORMATTER_CONFIG = "output.json.formatter"; private static final String OUTPUT_JSON_FORMATTER_DEFAULT = @@ -714,7 +717,8 @@ boolean allowDiskUse() { public enum OutputFormat { JSON, BSON, - SCHEMA + SCHEMA, + BINARY_OUTBOX } public enum ErrorTolerance { @@ -1236,6 +1240,42 @@ public Map validateAll(final Map props) { ConfigDef.Width.MEDIUM, OUTPUT_JSON_FORMATTER_DISPLAY); + configDef.define( + BinaryOutboxTopicMapper.TOPIC_CONFIG, + Type.STRING, + BinaryOutboxTopicMapper.TOPIC_CONFIG_DEFAULT, + null, + Importance.LOW, + BinaryOutboxTopicMapper.TOPIC_CONFIG_DOC, + "BinaryOutboxTopicMapper", + 0, + Width.MEDIUM, + BinaryOutboxTopicMapper.TOPIC_CONFIG_DISPLAY); + + configDef.define( + BinaryOutboxSchemaAndValueProducer.KEY_CONFIG, + Type.STRING, + BinaryOutboxSchemaAndValueProducer.KEY_CONFIG_DEFAULT, + null, + Importance.LOW, + BinaryOutboxSchemaAndValueProducer.KEY_CONFIG_DOC, + "BinaryOutboxSchemaAndValueProducer", + 0, + Width.MEDIUM, + BinaryOutboxSchemaAndValueProducer.KEY_CONFIG_DISPLAY); + + configDef.define( + BinaryOutboxSchemaAndValueProducer.VALUE_CONFIG, + Type.STRING, + BinaryOutboxSchemaAndValueProducer.VALUE_CONFIG_DEFAULT, + null, + Importance.LOW, + BinaryOutboxSchemaAndValueProducer.VALUE_CONFIG_DOC, + "BinaryOutboxSchemaAndValueProducer", + 0, + Width.MEDIUM, + BinaryOutboxSchemaAndValueProducer.VALUE_CONFIG_DISPLAY); + group = "Startup"; orderInGroup = 0; diff --git a/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java b/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java index b0df1b1d..4e9b11eb 100644 --- a/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java +++ b/src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java @@ -248,7 +248,9 @@ private List pollInternal() { } BsonDocument keyDocument = - sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA + (sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA + || sourceConfig.getKeyOutputFormat() + == MongoSourceConfig.OutputFormat.BINARY_OUTBOX) ? changeStreamDocument : new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD)); diff --git a/src/main/java/com/mongodb/kafka/connect/source/producer/BinaryOutboxSchemaAndValueProducer.java b/src/main/java/com/mongodb/kafka/connect/source/producer/BinaryOutboxSchemaAndValueProducer.java new file mode 100644 index 00000000..f3347aac --- /dev/null +++ b/src/main/java/com/mongodb/kafka/connect/source/producer/BinaryOutboxSchemaAndValueProducer.java @@ -0,0 +1,80 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.kafka.connect.source.producer; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.bson.BsonDocument; + +/** + * topic.mapper = "com.mongodb.kafka.connect.source.topic.mapping.BinaryTopicMapper" + * output.format.value = "BINARY_OUTBOX" output.format.key = "BINARY_OUTBOX" + * binary_outbox.document.topic = "topic" binary_outbox.document.key = "key" + * binary_outbox.document.value = "value" + */ +public class BinaryOutboxSchemaAndValueProducer implements SchemaAndValueProducer { + + private static final Logger LOGGER = + LoggerFactory.getLogger(BinaryOutboxSchemaAndValueProducer.class); + + public static final String KEY_CONFIG = "binary_outbox.document.key"; + public static final String KEY_CONFIG_DEFAULT = "key"; + public static final String KEY_CONFIG_DOC = "binary_outbox.document.key"; + public static final String KEY_CONFIG_DISPLAY = "binary_outbox.document.key"; + + public static final String VALUE_CONFIG = "binary_outbox.document.value"; + public static final String VALUE_CONFIG_DEFAULT = "message"; + public static final String VALUE_CONFIG_DOC = "binary_outbox.document.value"; + public static final String VALUE_CONFIG_DISPLAY = "binary_outbox.document.value"; + + private final boolean isValue; + + private final String fieldName; + + BinaryOutboxSchemaAndValueProducer(final String fieldName, final boolean isValue) { + this.fieldName = fieldName; + this.isValue = isValue; + } + + @Override + public SchemaAndValue get(final BsonDocument changeStreamDocument) { + if (!changeStreamDocument.containsKey("fullDocument")) { + LOGGER.error( + "document does not contain a topic field named 'fullDocument': {}", + changeStreamDocument.toJson()); + return new SchemaAndValue(Schema.BYTES_SCHEMA, null); + } + + BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument"); + + if (!fullDocument.containsKey(fieldName)) { + LOGGER.error("document does not contain a field named '{}'", fieldName); + return new SchemaAndValue(Schema.BYTES_SCHEMA, null); + } + + if (isValue) { + LOGGER.info("value '{}' = {}", fieldName, fullDocument.getBinary(fieldName).getData()); + return new SchemaAndValue(Schema.BYTES_SCHEMA, fullDocument.getBinary(fieldName).getData()); + } else { + LOGGER.info("key '{}' = {}", fieldName, fullDocument.getString(fieldName).getValue()); + return new SchemaAndValue(Schema.STRING_SCHEMA, fullDocument.getString(fieldName).getValue()); + } + } +} diff --git a/src/main/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducers.java b/src/main/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducers.java index 77268ec6..98b7329b 100644 --- a/src/main/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducers.java +++ b/src/main/java/com/mongodb/kafka/connect/source/producer/SchemaAndValueProducers.java @@ -55,6 +55,12 @@ private static SchemaAndValueProducer createSchemaAndValueProvider( return new InferSchemaAndValueProducer(config.getJsonWriterSettings()); } return new AvroSchemaAndValueProducer(jsonSchema, config.getJsonWriterSettings()); + case BINARY_OUTBOX: + String fieldName = + isValue + ? config.getString(BinaryOutboxSchemaAndValueProducer.VALUE_CONFIG) + : config.getString(BinaryOutboxSchemaAndValueProducer.KEY_CONFIG); + return new BinaryOutboxSchemaAndValueProducer(fieldName, isValue); default: throw new ConnectException("Unsupported key output format" + config.getKeyOutputFormat()); } diff --git a/src/main/java/com/mongodb/kafka/connect/source/topic/mapping/BinaryOutboxTopicMapper.java b/src/main/java/com/mongodb/kafka/connect/source/topic/mapping/BinaryOutboxTopicMapper.java new file mode 100644 index 00000000..ec741e85 --- /dev/null +++ b/src/main/java/com/mongodb/kafka/connect/source/topic/mapping/BinaryOutboxTopicMapper.java @@ -0,0 +1,73 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.kafka.connect.source.topic.mapping; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.bson.BsonDocument; +import org.bson.BsonValue; + +import com.mongodb.kafka.connect.source.MongoSourceConfig; + +/** + * topic.mapper = "com.mongodb.kafka.connect.source.topic.mapping.BinaryOutboxTopicMapper" + * output.format.value = "BINARY_OUTBOX" output.format.key = "BINARY_OUTBOX" + * binary_outbox.document.topic = "topic" binary_outbox.document.key = "key" + * binary_outbox.document.value = "value" + */ +public class BinaryOutboxTopicMapper implements TopicMapper { + + private static final Logger LOGGER = LoggerFactory.getLogger(BinaryOutboxTopicMapper.class); + + public static final String TOPIC_CONFIG = "binary_outbox.document.topic"; + public static final String TOPIC_CONFIG_DEFAULT = "topic"; + public static final String TOPIC_CONFIG_DOC = "property name of topic in Mongo collection"; + public static final String TOPIC_CONFIG_DISPLAY = "property name of topic in Mongo collection"; + + private String topicName; + + @Override + public void configure(final MongoSourceConfig configuration) { + topicName = configuration.getString(TOPIC_CONFIG); + LOGGER.info("topic field is '{}'", topicName); + } + + @Override + public String getTopic(final BsonDocument changeStreamDocument) { + if (!changeStreamDocument.containsKey("fullDocument")) { + LOGGER.error("document does not contain a topic field named 'fullDocument'"); + return ""; + } + + BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument"); + + if (!fullDocument.containsKey(topicName)) { + LOGGER.error("document does not contain a topic field named '{}'", topicName); + return ""; + } + + BsonValue topicNameValue = fullDocument.get(topicName); + if (!topicNameValue.isString() || topicNameValue.isNull()) { + LOGGER.error("topic field is not of type String"); + return ""; + } + + LOGGER.info("topic name is: {}", topicNameValue.asString().getValue()); + return topicNameValue.asString().getValue(); + } +} diff --git a/src/test/java/com/mongodb/kafka/connect/source/MongoSourceConfigTest.java b/src/test/java/com/mongodb/kafka/connect/source/MongoSourceConfigTest.java index 90c44127..0b6d9b6d 100644 --- a/src/test/java/com/mongodb/kafka/connect/source/MongoSourceConfigTest.java +++ b/src/test/java/com/mongodb/kafka/connect/source/MongoSourceConfigTest.java @@ -149,6 +149,15 @@ void testOutputFormat() { assertEquals( OutputFormat.SCHEMA, createSourceConfig(OUTPUT_FORMAT_VALUE_CONFIG, "schema").getValueOutputFormat()), + () -> + assertEquals( + OutputFormat.BINARY_OUTBOX, + createSourceConfig(OUTPUT_FORMAT_KEY_CONFIG, "binary_outbox").getKeyOutputFormat()), + () -> + assertEquals( + OutputFormat.BINARY_OUTBOX, + createSourceConfig(OUTPUT_FORMAT_VALUE_CONFIG, "binary_outbox") + .getValueOutputFormat()), () -> assertInvalid(OUTPUT_FORMAT_KEY_CONFIG, "avro"), () -> assertInvalid(OUTPUT_FORMAT_VALUE_CONFIG, "avro"), () -> assertInvalid(OUTPUT_FORMAT_KEY_CONFIG, "[]"),