Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Binary outbox pattern #146

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@

import com.mongodb.kafka.connect.source.MongoSourceConfig.StartupConfig.StartupMode;
import com.mongodb.kafka.connect.source.json.formatter.JsonWriterSettingsProvider;
import com.mongodb.kafka.connect.source.producer.BinaryOutboxSchemaAndValueProducer;
import com.mongodb.kafka.connect.source.schema.AvroSchema;
import com.mongodb.kafka.connect.source.topic.mapping.BinaryOutboxTopicMapper;
import com.mongodb.kafka.connect.source.topic.mapping.TopicMapper;
import com.mongodb.kafka.connect.util.ConfigHelper;
import com.mongodb.kafka.connect.util.ConnectConfigException;
Expand Down Expand Up @@ -113,7 +115,8 @@ public class MongoSourceConfig extends AbstractConfig {
"The output format of the data produced by the connector for the value. Supported formats are:\n"
+ " * `json` - Raw Json strings\n"
+ " * `bson` - Bson byte array\n"
+ " * `schema` - Schema'd output\n";
+ " * `schema` - Schema'd output\n"
+ " * `binary` - Binary output\n";

public static final String OUTPUT_JSON_FORMATTER_CONFIG = "output.json.formatter";
private static final String OUTPUT_JSON_FORMATTER_DEFAULT =
Expand Down Expand Up @@ -714,7 +717,8 @@ boolean allowDiskUse() {
public enum OutputFormat {
JSON,
BSON,
SCHEMA
SCHEMA,
BINARY_OUTBOX
}

public enum ErrorTolerance {
Expand Down Expand Up @@ -1236,6 +1240,42 @@ public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
ConfigDef.Width.MEDIUM,
OUTPUT_JSON_FORMATTER_DISPLAY);

configDef.define(
BinaryOutboxTopicMapper.TOPIC_CONFIG,
Type.STRING,
BinaryOutboxTopicMapper.TOPIC_CONFIG_DEFAULT,
null,
Importance.LOW,
BinaryOutboxTopicMapper.TOPIC_CONFIG_DOC,
"BinaryOutboxTopicMapper",
0,
Width.MEDIUM,
BinaryOutboxTopicMapper.TOPIC_CONFIG_DISPLAY);

configDef.define(
BinaryOutboxSchemaAndValueProducer.KEY_CONFIG,
Type.STRING,
BinaryOutboxSchemaAndValueProducer.KEY_CONFIG_DEFAULT,
null,
Importance.LOW,
BinaryOutboxSchemaAndValueProducer.KEY_CONFIG_DOC,
"BinaryOutboxSchemaAndValueProducer",
0,
Width.MEDIUM,
BinaryOutboxSchemaAndValueProducer.KEY_CONFIG_DISPLAY);

configDef.define(
BinaryOutboxSchemaAndValueProducer.VALUE_CONFIG,
Type.STRING,
BinaryOutboxSchemaAndValueProducer.VALUE_CONFIG_DEFAULT,
null,
Importance.LOW,
BinaryOutboxSchemaAndValueProducer.VALUE_CONFIG_DOC,
"BinaryOutboxSchemaAndValueProducer",
0,
Width.MEDIUM,
BinaryOutboxSchemaAndValueProducer.VALUE_CONFIG_DISPLAY);

group = "Startup";
orderInGroup = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ private List<SourceRecord> pollInternal() {
}

BsonDocument keyDocument =
sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA
(sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA
|| sourceConfig.getKeyOutputFormat()
== MongoSourceConfig.OutputFormat.BINARY_OUTBOX)
? changeStreamDocument
: new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.kafka.connect.source.producer;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.bson.BsonDocument;

/**
* topic.mapper = "com.mongodb.kafka.connect.source.topic.mapping.BinaryTopicMapper"
* output.format.value = "BINARY_OUTBOX" output.format.key = "BINARY_OUTBOX"
* binary_outbox.document.topic = "topic" binary_outbox.document.key = "key"
* binary_outbox.document.value = "value"
*/
public class BinaryOutboxSchemaAndValueProducer implements SchemaAndValueProducer {

private static final Logger LOGGER =
LoggerFactory.getLogger(BinaryOutboxSchemaAndValueProducer.class);

public static final String KEY_CONFIG = "binary_outbox.document.key";
public static final String KEY_CONFIG_DEFAULT = "key";
public static final String KEY_CONFIG_DOC = "binary_outbox.document.key";
public static final String KEY_CONFIG_DISPLAY = "binary_outbox.document.key";

public static final String VALUE_CONFIG = "binary_outbox.document.value";
public static final String VALUE_CONFIG_DEFAULT = "message";
public static final String VALUE_CONFIG_DOC = "binary_outbox.document.value";
public static final String VALUE_CONFIG_DISPLAY = "binary_outbox.document.value";

private final boolean isValue;

private final String fieldName;

BinaryOutboxSchemaAndValueProducer(final String fieldName, final boolean isValue) {
this.fieldName = fieldName;
this.isValue = isValue;
}

@Override
public SchemaAndValue get(final BsonDocument changeStreamDocument) {
if (!changeStreamDocument.containsKey("fullDocument")) {
LOGGER.error(
"document does not contain a topic field named 'fullDocument': {}",
changeStreamDocument.toJson());
return new SchemaAndValue(Schema.BYTES_SCHEMA, null);
}

BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument");

if (!fullDocument.containsKey(fieldName)) {
LOGGER.error("document does not contain a field named '{}'", fieldName);
return new SchemaAndValue(Schema.BYTES_SCHEMA, null);
}

if (isValue) {
LOGGER.info("value '{}' = {}", fieldName, fullDocument.getBinary(fieldName).getData());
return new SchemaAndValue(Schema.BYTES_SCHEMA, fullDocument.getBinary(fieldName).getData());
} else {
LOGGER.info("key '{}' = {}", fieldName, fullDocument.getString(fieldName).getValue());
return new SchemaAndValue(Schema.STRING_SCHEMA, fullDocument.getString(fieldName).getValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ private static SchemaAndValueProducer createSchemaAndValueProvider(
return new InferSchemaAndValueProducer(config.getJsonWriterSettings());
}
return new AvroSchemaAndValueProducer(jsonSchema, config.getJsonWriterSettings());
case BINARY_OUTBOX:
String fieldName =
isValue
? config.getString(BinaryOutboxSchemaAndValueProducer.VALUE_CONFIG)
: config.getString(BinaryOutboxSchemaAndValueProducer.KEY_CONFIG);
return new BinaryOutboxSchemaAndValueProducer(fieldName, isValue);
default:
throw new ConnectException("Unsupported key output format" + config.getKeyOutputFormat());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.kafka.connect.source.topic.mapping;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.bson.BsonDocument;
import org.bson.BsonValue;

import com.mongodb.kafka.connect.source.MongoSourceConfig;

/**
* topic.mapper = "com.mongodb.kafka.connect.source.topic.mapping.BinaryOutboxTopicMapper"
* output.format.value = "BINARY_OUTBOX" output.format.key = "BINARY_OUTBOX"
* binary_outbox.document.topic = "topic" binary_outbox.document.key = "key"
* binary_outbox.document.value = "value"
*/
public class BinaryOutboxTopicMapper implements TopicMapper {

private static final Logger LOGGER = LoggerFactory.getLogger(BinaryOutboxTopicMapper.class);

public static final String TOPIC_CONFIG = "binary_outbox.document.topic";
public static final String TOPIC_CONFIG_DEFAULT = "topic";
public static final String TOPIC_CONFIG_DOC = "property name of topic in Mongo collection";
public static final String TOPIC_CONFIG_DISPLAY = "property name of topic in Mongo collection";

private String topicName;

@Override
public void configure(final MongoSourceConfig configuration) {
topicName = configuration.getString(TOPIC_CONFIG);
LOGGER.info("topic field is '{}'", topicName);
}

@Override
public String getTopic(final BsonDocument changeStreamDocument) {
if (!changeStreamDocument.containsKey("fullDocument")) {
LOGGER.error("document does not contain a topic field named 'fullDocument'");
return "";
}

BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument");

if (!fullDocument.containsKey(topicName)) {
LOGGER.error("document does not contain a topic field named '{}'", topicName);
return "";
}

BsonValue topicNameValue = fullDocument.get(topicName);
if (!topicNameValue.isString() || topicNameValue.isNull()) {
LOGGER.error("topic field is not of type String");
return "";
}

LOGGER.info("topic name is: {}", topicNameValue.asString().getValue());
return topicNameValue.asString().getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ void testOutputFormat() {
assertEquals(
OutputFormat.SCHEMA,
createSourceConfig(OUTPUT_FORMAT_VALUE_CONFIG, "schema").getValueOutputFormat()),
() ->
assertEquals(
OutputFormat.BINARY_OUTBOX,
createSourceConfig(OUTPUT_FORMAT_KEY_CONFIG, "binary_outbox").getKeyOutputFormat()),
() ->
assertEquals(
OutputFormat.BINARY_OUTBOX,
createSourceConfig(OUTPUT_FORMAT_VALUE_CONFIG, "binary_outbox")
.getValueOutputFormat()),
() -> assertInvalid(OUTPUT_FORMAT_KEY_CONFIG, "avro"),
() -> assertInvalid(OUTPUT_FORMAT_VALUE_CONFIG, "avro"),
() -> assertInvalid(OUTPUT_FORMAT_KEY_CONFIG, "[]"),
Expand Down