Enable MongoDB sink with unique fields in MongoDB Sharded cluster #135

Open · wants to merge 1 commit into base: master
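This PR adds a MongoDbUniqueFieldHandler that, for Debezium oplog update events, builds the update filter from a `filter` field carried in the event value (which must contain `_id`) instead of from the record key's `_id` alone, so the filter can also include a unique or shard-key field. The sketch below shows the event shape the new handler expects, following the tests in this PR; the concrete values and the `email` field name are illustrative, not part of the change.

```java
import org.bson.BsonDocument;
import org.bson.BsonString;

import com.mongodb.kafka.connect.sink.converter.SinkDocument;

public class UniqueFieldEventSketch {
  public static SinkDocument sampleUpdateEvent() {
    // Debezium oplog-style update event: "op" and "patch" as before, plus the new
    // "filter" field holding a JSON filter that must include _id and may also carry
    // a unique/shard-key field (here "email", a placeholder name).
    BsonDocument keyDoc = BsonDocument.parse("{id: '1234'}");
    BsonDocument valueDoc =
        new BsonDocument("op", new BsonString("u"))
            .append("patch", new BsonString("{\"$set\": {\"last_name\": \"Hopper\"}}"))
            .append("filter", new BsonString("{_id: 1234, email: 'grace@example.com'}"));
    // With MongoDbUniqueFieldHandler configured, MongoDbUpdate(EventFormat.Oplog, true)
    // parses "filter" and issues an UpdateOneModel matching on both _id and email.
    return new SinkDocument(keyDoc, valueDoc);
  }
}
```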
ChangeStreamHandler.java
@@ -31,7 +31,9 @@ public class ChangeStreamHandler extends MongoDbHandler {
{
put(OperationType.CREATE, new MongoDbInsert());
put(OperationType.READ, new MongoDbInsert());
put(OperationType.UPDATE, new MongoDbUpdate(MongoDbUpdate.EventFormat.ChangeStream));
put(
OperationType.UPDATE,
new MongoDbUpdate(MongoDbUpdate.EventFormat.ChangeStream, false));
put(OperationType.DELETE, new MongoDbDelete());
}
};
MongoDbHandler.java
@@ -44,7 +44,7 @@ public class MongoDbHandler extends DebeziumCdcHandler {
{
put(OperationType.CREATE, new MongoDbInsert());
put(OperationType.READ, new MongoDbInsert());
put(OperationType.UPDATE, new MongoDbUpdate(MongoDbUpdate.EventFormat.Oplog));
put(OperationType.UPDATE, new MongoDbUpdate(MongoDbUpdate.EventFormat.Oplog, false));
put(OperationType.DELETE, new MongoDbDelete());
}
};
MongoDbUniqueFieldHandler.java (new file)
@@ -0,0 +1,24 @@
package com.mongodb.kafka.connect.sink.cdc.debezium.mongodb;

import java.util.HashMap;
import java.util.Map;

import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.cdc.CdcOperation;
import com.mongodb.kafka.connect.sink.cdc.debezium.OperationType;

public class MongoDbUniqueFieldHandler extends MongoDbHandler {
private static final Map<OperationType, CdcOperation> DEFAULT_OPERATIONS =
new HashMap<OperationType, CdcOperation>() {
{
put(OperationType.CREATE, new MongoDbInsert());
put(OperationType.READ, new MongoDbInsert());
put(OperationType.UPDATE, new MongoDbUpdate(MongoDbUpdate.EventFormat.Oplog, true));
put(OperationType.DELETE, new MongoDbDelete());
}
};

public MongoDbUniqueFieldHandler(final MongoSinkTopicConfig config) {
super(config, DEFAULT_OPERATIONS);
}
}
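
A sink connector would opt in by pointing the existing change.data.capture.handler property at the class above. The following is a rough sketch of such a configuration, expressed as a Java property map; the topic, connection URI, database, and collection values are placeholders and not part of this PR.

```java
import java.util.HashMap;
import java.util.Map;

public class UniqueFieldSinkConfigSketch {
  public static Map<String, String> sinkProps() {
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "com.mongodb.kafka.connect.MongoSinkConnector");
    props.put("topics", "dbserver1.mydb.users"); // placeholder topic
    props.put("connection.uri", "mongodb://mongos0:27017"); // placeholder mongos router
    props.put("database", "mydb"); // placeholder
    props.put("collection", "users"); // placeholder
    // Route update events through the handler added in this PR so the update filter
    // comes from the event's "filter" field (_id plus a unique field) rather than _id only.
    props.put(
        "change.data.capture.handler",
        "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUniqueFieldHandler");
    return props;
  }
}
```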
MongoDbUpdate.java
@@ -45,11 +45,14 @@ public enum EventFormat {

private static final String JSON_DOC_FIELD_AFTER = "after";
public static final String INTERNAL_OPLOG_FIELD_V = "$v";
public static final String JSON_DOC_FIELD_FILTER = "filter";

private final EventFormat eventFormat;
private final boolean isUseFilterInValueDoc;

public MongoDbUpdate(final EventFormat eventFormat) {
public MongoDbUpdate(final EventFormat eventFormat, final boolean isUseFilterInValueDoc) {
this.eventFormat = eventFormat;
this.isUseFilterInValueDoc = isUseFilterInValueDoc;
}

@Override
@@ -91,8 +94,11 @@ private WriteModel<BsonDocument> handleOplogEvent(final SinkDocument doc) {
return new ReplaceOneModel<>(filterDoc, updateDoc, REPLACE_OPTIONS);
}

BsonDocument filterDoc =
!isUseFilterInValueDoc ? getFilterDocByKeyId(doc) : getFilterDocByValue(doc);

// patch contains idempotent change only to update original document with
return new UpdateOneModel<>(getFilterDocByKeyId(doc), updateDoc);
return new UpdateOneModel<>(filterDoc, updateDoc);
}

private WriteModel<BsonDocument> handleChangeStreamEvent(final SinkDocument doc) {
@@ -119,6 +125,23 @@ private BsonDocument getFilterDocByKeyId(final SinkDocument doc) {
format("{%s: %s}", ID_FIELD, keyDoc.getString(JSON_ID_FIELD).getValue()));
}

private BsonDocument getFilterDocByValue(final SinkDocument doc) {
BsonDocument valueDoc = getDocumentValue(doc);

if (!valueDoc.containsKey(JSON_DOC_FIELD_FILTER)) {
throw new DataException(format("Update document missing `%s` field.", JSON_DOC_FIELD_FILTER));
}

BsonDocument filterDoc =
BsonDocument.parse(valueDoc.getString(JSON_DOC_FIELD_FILTER).getValue());

if (!filterDoc.containsKey(ID_FIELD)) {
throw new DataException(format("Filter document missing `%s` field.", ID_FIELD));
}

return filterDoc;
}

private BsonDocument getDocumentKey(final SinkDocument doc) {
return doc.getKeyDoc()
.orElseThrow(
MongoDbUniqueFieldHandlerTest.java (new file)
@@ -0,0 +1,205 @@
package com.mongodb.kafka.connect.sink.cdc.debezium.mongodb;

import static com.mongodb.kafka.connect.sink.SinkTestHelper.createTopicConfig;
import static java.util.Collections.emptyMap;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.DynamicTest.dynamicTest;

import java.util.Optional;
import java.util.stream.Stream;

import org.apache.kafka.connect.errors.DataException;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;

import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonNull;
import org.bson.BsonString;

import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;

import com.mongodb.kafka.connect.sink.cdc.debezium.OperationType;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;

class MongoDbUniqueFieldHandlerTest {

private static final MongoDbUniqueFieldHandler HANDLER_DEFAULT_MAPPING =
new MongoDbUniqueFieldHandler(createTopicConfig());

@Test
@DisplayName("verify existing default config from base class")
void testExistingDefaultConfig() {
assertAll(
() ->
assertNotNull(
HANDLER_DEFAULT_MAPPING.getConfig(), "default config for handler must not be null"),
() ->
assertNotNull(
new MongoDbHandler(createTopicConfig(), emptyMap()).getConfig(),
"default config for handler must not be null"));
}

@Test
@DisplayName("when key document missing then DataException")
void testMissingKeyDocument() {
assertThrows(
DataException.class, () -> HANDLER_DEFAULT_MAPPING.handle(new SinkDocument(null, null)));
}

@Test
@DisplayName("when key doc contains 'id' field but value is empty then null due to tombstone")
void testTombstoneEvent() {
assertEquals(
Optional.empty(),
HANDLER_DEFAULT_MAPPING.handle(
new SinkDocument(new BsonDocument("id", new BsonInt32(1234)), new BsonDocument())),
"tombstone event must result in Optional.empty()");
}

@Test
@DisplayName("when value doc contains unknown operation type then DataException")
void testUnknownCdcOperationType() {
SinkDocument cdcEvent =
new SinkDocument(
new BsonDocument("id", new BsonInt32(1234)),
new BsonDocument("op", new BsonString("x")));
assertThrows(DataException.class, () -> HANDLER_DEFAULT_MAPPING.handle(cdcEvent));
}

@Test
@DisplayName("when value doc contains unmapped operation type then DataException")
void testUnmappedCdcOperationType() {
SinkDocument cdcEvent =
new SinkDocument(
new BsonDocument("_id", new BsonInt32(1234)),
new BsonDocument("op", new BsonString("z"))
.append("after", new BsonString("{_id:1234,foo:\"blah\"}")));

assertThrows(DataException.class, () -> HANDLER_DEFAULT_MAPPING.handle(cdcEvent));
}

@Test
@DisplayName("when value doc contains operation type other than string then DataException")
void testInvalidCdcOperationType() {
SinkDocument cdcEvent =
new SinkDocument(
new BsonDocument("id", new BsonInt32(1234)),
new BsonDocument("op", new BsonInt32('c')));

assertThrows(DataException.class, () -> HANDLER_DEFAULT_MAPPING.handle(cdcEvent));
}

@Test
@DisplayName("when value doc is missing operation type then DataException")
void testMissingCdcOperationType() {
SinkDocument cdcEvent =
new SinkDocument(
new BsonDocument("id", new BsonInt32(1234)), new BsonDocument("po", BsonNull.VALUE));

assertThrows(DataException.class, () -> HANDLER_DEFAULT_MAPPING.handle(cdcEvent));
}

@TestFactory
@DisplayName("when valid CDC event then correct WriteModel")
Stream<DynamicTest> testValidCdcDocument() {

return Stream.of(
dynamicTest(
"test operation " + OperationType.CREATE,
() -> {
Optional<WriteModel<BsonDocument>> result =
HANDLER_DEFAULT_MAPPING.handle(
new SinkDocument(
new BsonDocument("_id", new BsonString("1234")),
new BsonDocument("op", new BsonString("c"))
.append("after", new BsonString("{_id:1234,foo:\"blah\"}"))));
assertTrue(result.isPresent());
assertTrue(
result.get() instanceof ReplaceOneModel,
"result expected to be of type ReplaceOneModel");
}),
dynamicTest(
"test operation " + OperationType.READ,
() -> {
Optional<WriteModel<BsonDocument>> result =
HANDLER_DEFAULT_MAPPING.handle(
new SinkDocument(
new BsonDocument("_id", new BsonString("1234")),
new BsonDocument("op", new BsonString("r"))
.append("after", new BsonString("{_id:1234,foo:\"blah\"}"))));
assertTrue(result.isPresent());
assertTrue(
result.get() instanceof ReplaceOneModel,
"result expected to be of type ReplaceOneModel");
}),
dynamicTest(
"test operation " + OperationType.UPDATE,
() -> {
Optional<WriteModel<BsonDocument>> result =
HANDLER_DEFAULT_MAPPING.handle(
new SinkDocument(
new BsonDocument("id", new BsonString("1234")),
new BsonDocument("op", new BsonString("u"))
.append("patch", new BsonString("{\"$set\":{foo:\"blah\"}}"))
.append("filter", new BsonString("{_id:1234,email:\"blah\"}"))));
assertTrue(result.isPresent());
assertTrue(
result.get() instanceof UpdateOneModel,
"result expected to be of type UpdateOneModel");
}),
dynamicTest(
"test operation " + OperationType.DELETE,
() -> {
Optional<WriteModel<BsonDocument>> result =
HANDLER_DEFAULT_MAPPING.handle(
new SinkDocument(
new BsonDocument("id", new BsonString("1234")),
new BsonDocument("op", new BsonString("d"))));
assertTrue(result.isPresent(), "write model result must be present");
assertTrue(
result.get() instanceof DeleteOneModel,
"result expected to be of type DeleteOneModel");
}));
}

@TestFactory
@DisplayName("when valid cdc operation type then correct MongoDB CdcOperation")
Stream<DynamicTest> testValidCdcOpertionTypes() {

return Stream.of(
dynamicTest(
"test operation " + OperationType.CREATE,
() ->
assertTrue(
HANDLER_DEFAULT_MAPPING.getCdcOperation(
new BsonDocument("op", new BsonString("c")))
instanceof MongoDbInsert)),
dynamicTest(
"test operation " + OperationType.READ,
() ->
assertTrue(
HANDLER_DEFAULT_MAPPING.getCdcOperation(
new BsonDocument("op", new BsonString("r")))
instanceof MongoDbInsert)),
dynamicTest(
"test operation " + OperationType.UPDATE,
() ->
assertTrue(
HANDLER_DEFAULT_MAPPING.getCdcOperation(
new BsonDocument("op", new BsonString("u")))
instanceof MongoDbUpdate)),
dynamicTest(
"test operation " + OperationType.DELETE,
() ->
assertTrue(
HANDLER_DEFAULT_MAPPING.getCdcOperation(
new BsonDocument("op", new BsonString("d")))
instanceof MongoDbDelete)));
}
}
MongoDbUpdateTest.java
@@ -38,11 +38,17 @@

class MongoDbUpdateTest {
private static final MongoDbUpdate OPLOG_UPDATE =
new MongoDbUpdate(MongoDbUpdate.EventFormat.Oplog);
new MongoDbUpdate(MongoDbUpdate.EventFormat.Oplog, false);
private static final MongoDbUpdate OPLOG_UPDATE_WITH_UNIQUE_FILTER =
new MongoDbUpdate(MongoDbUpdate.EventFormat.Oplog, true);

private static final MongoDbUpdate CHANGE_STREAM_UPDATE =
new MongoDbUpdate(MongoDbUpdate.EventFormat.ChangeStream);
new MongoDbUpdate(MongoDbUpdate.EventFormat.ChangeStream, false);
private static final BsonDocument FILTER_DOC = BsonDocument.parse("{_id: 1234}");
private static final BsonDocument FILTER_DOC_WITH_ID_AND_UNIQUE_FIELD =
BsonDocument.parse("{_id: 1234, username: 'unique'}");
private static final BsonDocument FILTER_DOC_WITH_UNIQUE_FIELD_ONLY =
BsonDocument.parse("{username: 'unique'}");
private static final BsonDocument REPLACEMENT_DOC =
BsonDocument.parse("{_id: 1234, first_name: 'Grace', last_name: 'Hopper'}");
private static final BsonDocument UPDATE_DOC =
@@ -119,6 +125,55 @@ public void testValidSinkDocumentWithInternalOploagFieldForUpdate() {
assertEquals(FILTER_DOC, writeModel.getFilter());
}

@Test
@DisplayName(
"when valid doc change cdc event containing the filter field then correct UpdateOneModel")
public void testValidSinkDocumentWithFilterFieldForUpdate() {
BsonDocument keyDoc = BsonDocument.parse("{id: '1234'}");
BsonDocument valueDoc =
new BsonDocument("op", new BsonString("u"))
.append("patch", new BsonString(UPDATE_DOC.toJson()))
.append("filter", new BsonString(FILTER_DOC_WITH_ID_AND_UNIQUE_FIELD.toJson()));

WriteModel<BsonDocument> result =
OPLOG_UPDATE_WITH_UNIQUE_FILTER.perform(new SinkDocument(keyDoc, valueDoc));
assertTrue(result instanceof UpdateOneModel, "result expected to be of type UpdateOneModel");

UpdateOneModel<BsonDocument> writeModel = (UpdateOneModel<BsonDocument>) result;
assertEquals(UPDATE_DOC, writeModel.getUpdate(), "update doc not matching what is expected");
assertTrue(
writeModel.getFilter() instanceof BsonDocument,
"filter expected to be of type BsonDocument");
assertEquals(FILTER_DOC_WITH_ID_AND_UNIQUE_FIELD, writeModel.getFilter());
}

@Test
@DisplayName("when missing filter document then DataException")
public void testMissingFilterDocument() {
BsonDocument keyDoc = BsonDocument.parse("{id: '1234'}");
BsonDocument valueDoc =
new BsonDocument("op", new BsonString("u"))
.append("patch", new BsonString(UPDATE_DOC.toJson()));

assertThrows(
DataException.class,
() -> OPLOG_UPDATE_WITH_UNIQUE_FILTER.perform(new SinkDocument(keyDoc, valueDoc)));
}

@Test
@DisplayName("when missing id doc in the filter document then DataException")
public void testMissingIdInFilterDocument() {
BsonDocument keyDoc = BsonDocument.parse("{id: '1234'}");
BsonDocument valueDoc =
new BsonDocument("op", new BsonString("u"))
.append("patch", new BsonString(UPDATE_DOC.toJson()))
.append("filter", new BsonString(FILTER_DOC_WITH_UNIQUE_FIELD_ONLY.toJson()));

assertThrows(
DataException.class,
() -> OPLOG_UPDATE_WITH_UNIQUE_FILTER.perform(new SinkDocument(keyDoc, valueDoc)));
}

@Test
@DisplayName("when valid doc replace change stream cdc event then correct ReplaceOneModel")
void testValidChangeStreamSinkDocumentForReplacement() {