New feature: CDC handler for 1 => N (multi-row) data handling #116

Open · wants to merge 1 commit into base: master
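In brief, this change introduces a change.data.capture.multi.row.handler sink option alongside the existing change.data.capture.handler, together with a new CdcMultiRowHandler base class and CdcMultiOperation interface, so that a single Kafka record / CDC event can be expanded into a list of write models instead of exactly one. A minimal, hypothetical connector configuration using the new option might look like the sketch below; the handler class name is a placeholder (a matching handler sketch follows the new CdcMultiRowHandler file further down), not something shipped by this PR.

# Hypothetical sink connector properties (sketch only, not part of this PR)
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
topics=example-topic
connection.uri=mongodb://localhost:27017
database=test
collection=example
# New option added by this PR; points at a user-supplied CdcMultiRowHandler subclass
change.data.capture.multi.row.handler=com.example.cdc.ExampleMultiRowHandler
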
@@ -16,6 +16,8 @@

package com.mongodb.kafka.connect.sink;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

@@ -40,15 +42,15 @@ final class MongoProcessedSinkRecordData {
private final MongoNamespace namespace;
private final SinkRecord sinkRecord;
private final SinkDocument sinkDocument;
private final WriteModel<BsonDocument> writeModel;
private final List<WriteModel<BsonDocument>> writeModelList;
private Exception exception;

MongoProcessedSinkRecordData(final SinkRecord sinkRecord, final MongoSinkConfig sinkConfig) {
this.sinkRecord = sinkRecord;
this.config = sinkConfig.getMongoSinkTopicConfig(sinkRecord.topic());
this.sinkDocument = SINK_CONVERTER.convert(sinkRecord);
this.namespace = createNamespace();
this.writeModel = createWriteModel();
this.writeModelList = createMultiRowWriteModel();
}

public MongoSinkTopicConfig getConfig() {
@@ -63,8 +65,8 @@ public SinkRecord getSinkRecord() {
return sinkRecord;
}

public WriteModel<BsonDocument> getWriteModel() {
return writeModel;
public List<WriteModel<BsonDocument>> getWriteModelList() {
return writeModelList;
}

public Exception getException() {
@@ -77,8 +79,21 @@ private MongoNamespace createNamespace() {
.orElse(null);
}

private WriteModel<BsonDocument> createWriteModel() {
return config.getCdcHandler().isPresent() ? buildWriteModelCDC() : buildWriteModel();
private List<WriteModel<BsonDocument>> createMultiRowWriteModel() {
return config.getCdcMultiRowHandler().isPresent()
? buildMultiRowWriteModelCDC()
: config.getCdcHandler().isPresent()
? Arrays.asList(buildWriteModelCDC())
: Arrays.asList(buildWriteModel());
}

private List<WriteModel<BsonDocument>> buildMultiRowWriteModelCDC() {
return tryProcess(
() ->
config
.getCdcMultiRowHandler()
.flatMap(cdcHandler -> cdcHandler.handle(sinkDocument)))
.orElse(null);
}

private WriteModel<BsonDocument> buildWriteModel() {
@@ -48,7 +48,8 @@ static List<List<MongoProcessedSinkRecordData>> orderedGroupByTopicAndNamespace(
if (processedData.getException() != null) {
errorReporter.report(processedData.getSinkRecord(), processedData.getException());
continue;
} else if (processedData.getNamespace() == null || processedData.getWriteModel() == null) {
} else if (processedData.getNamespace() == null
|| processedData.getWriteModelList() == null) {
// Some CDC events can be Noops (eg tombstone events)
continue;
}
@@ -50,6 +50,7 @@
import com.mongodb.client.model.TimeSeriesGranularity;

import com.mongodb.kafka.connect.sink.cdc.CdcHandler;
import com.mongodb.kafka.connect.sink.cdc.CdcMultiRowHandler;
import com.mongodb.kafka.connect.sink.namespace.mapping.NamespaceMapper;
import com.mongodb.kafka.connect.sink.processor.PostProcessors;
import com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy;
@@ -335,6 +336,8 @@ public String value() {

// Change Data Capture
public static final String CHANGE_DATA_CAPTURE_HANDLER_CONFIG = "change.data.capture.handler";
public static final String CHANGE_DATA_CAPTURE_MULTI_ROW_HANDLER_CONFIG =
"change.data.capture.multi.row.handler";
private static final String CHANGE_DATA_CAPTURE_HANDLER_DISPLAY = "The CDC handler";
private static final String CHANGE_DATA_CAPTURE_HANDLER_DOC =
"The class name of the CDC handler to use for processing";
@@ -417,7 +420,8 @@ public String value() {
MongoSinkTopicConfig::getWriteModelStrategy,
MongoSinkTopicConfig::getDeleteOneWriteModelStrategy,
MongoSinkTopicConfig::getRateLimitSettings,
MongoSinkTopicConfig::getCdcHandler);
MongoSinkTopicConfig::getCdcHandler,
MongoSinkTopicConfig::getCdcMultiRowHandler);

private final String topic;
private NamespaceMapper namespaceMapper;
@@ -427,6 +431,7 @@ public String value() {
private WriteModelStrategy deleteOneWriteModelStrategy;
private RateLimitSettings rateLimitSettings;
private CdcHandler cdcHandler;
private CdcMultiRowHandler cdcMultiRowHandler;

MongoSinkTopicConfig(final String topic, final Map<String, String> originals) {
this(topic, originals, true);
@@ -571,6 +576,27 @@ Optional<CdcHandler> getCdcHandler() {
return Optional.of(this.cdcHandler);
}

Optional<CdcMultiRowHandler> getCdcMultiRowHandler() {
String cdcMultiRowHandler = getString(CHANGE_DATA_CAPTURE_MULTI_ROW_HANDLER_CONFIG);
if (cdcMultiRowHandler.isEmpty()) {
return Optional.empty();
}

if (this.cdcMultiRowHandler == null) {
this.cdcMultiRowHandler =
createInstance(
CHANGE_DATA_CAPTURE_MULTI_ROW_HANDLER_CONFIG,
cdcMultiRowHandler,
CdcMultiRowHandler.class,
() ->
(CdcMultiRowHandler)
Class.forName(cdcMultiRowHandler)
.getConstructor(MongoSinkTopicConfig.class)
.newInstance(this));
}
return Optional.of(this.cdcMultiRowHandler);
}

RateLimitSettings getRateLimitSettings() {
if (rateLimitSettings == null) {
rateLimitSettings =
@@ -1083,6 +1109,18 @@ private static ConfigDef createConfigDef() {
ConfigDef.Width.MEDIUM,
CHANGE_DATA_CAPTURE_HANDLER_DISPLAY);

configDef.define(
CHANGE_DATA_CAPTURE_MULTI_ROW_HANDLER_CONFIG,
ConfigDef.Type.STRING,
CHANGE_DATA_CAPTURE_HANDLER_DEFAULT,
Validators.emptyString().or(Validators.matching(FULLY_QUALIFIED_CLASS_NAME)),
ConfigDef.Importance.LOW,
CHANGE_DATA_CAPTURE_HANDLER_DOC,
group,
++orderInGroup,
ConfigDef.Width.MEDIUM,
CHANGE_DATA_CAPTURE_HANDLER_DISPLAY);

group = "Time series";
orderInGroup = 0;
configDef.define(
@@ -138,7 +138,8 @@ private void bulkWriteBatch(final List<MongoProcessedSinkRecordData> batch) {

List<WriteModel<BsonDocument>> writeModels =
batch.stream()
.map(MongoProcessedSinkRecordData::getWriteModel)
.map(MongoProcessedSinkRecordData::getWriteModelList)
.flatMap(list -> list.stream())
.collect(Collectors.toList());
boolean bulkWriteOrdered = config.getBoolean(BULK_WRITE_ORDERED_CONFIG);

@@ -0,0 +1,32 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Original Work: Apache License, Version 2.0, Copyright 2017 Hans-Peter Grahsl.
*/

package com.mongodb.kafka.connect.sink.cdc;

import java.util.List;

import org.bson.BsonDocument;

import com.mongodb.client.model.WriteModel;

import com.mongodb.kafka.connect.sink.converter.SinkDocument;

public interface CdcMultiOperation {

List<WriteModel<BsonDocument>> perform(SinkDocument doc);
}
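
For context, CdcMultiOperation mirrors the existing single-result CdcOperation contract but returns a list of write models. A minimal, hypothetical implementation is sketched below; the UnwindArrayInsert class, the com.example.cdc package, and the "rows" array field are illustrative assumptions, not part of this PR.

package com.example.cdc; // hypothetical package, not part of this PR

import java.util.ArrayList;
import java.util.List;

import org.bson.BsonDocument;
import org.bson.BsonValue;

import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;

import com.mongodb.kafka.connect.sink.cdc.CdcMultiOperation;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;

// Sketch: expands an assumed "rows" array in the event value into one insert per element.
public class UnwindArrayInsert implements CdcMultiOperation {

  @Override
  public List<WriteModel<BsonDocument>> perform(final SinkDocument doc) {
    BsonDocument value =
        doc.getValueDoc()
            .orElseThrow(() -> new IllegalArgumentException("Missing value document"));
    List<WriteModel<BsonDocument>> models = new ArrayList<>();
    for (BsonValue row : value.getArray("rows")) {
      models.add(new InsertOneModel<>(row.asDocument()));
    }
    return models;
  }
}
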
@@ -0,0 +1,44 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Original Work: Apache License, Version 2.0, Copyright 2017 Hans-Peter Grahsl.
*/

package com.mongodb.kafka.connect.sink.cdc;

import java.util.List;
import java.util.Optional;

import org.bson.BsonDocument;

import com.mongodb.client.model.WriteModel;

import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;

public abstract class CdcMultiRowHandler {

private final MongoSinkTopicConfig config;

public CdcMultiRowHandler(final MongoSinkTopicConfig config) {
this.config = config;
}

public MongoSinkTopicConfig getConfig() {
return config;
}

public abstract Optional<List<WriteModel<BsonDocument>>> handle(SinkDocument doc);
}
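
To illustrate how the new pieces fit together, here is a hypothetical subclass of CdcMultiRowHandler that the change.data.capture.multi.row.handler option could point to. It reuses the UnwindArrayInsert sketch from above; the ExampleMultiRowHandler name and its behaviour are assumptions for illustration, not code added by this PR.

package com.example.cdc; // hypothetical package, not part of this PR

import java.util.List;
import java.util.Optional;

import org.bson.BsonDocument;

import com.mongodb.client.model.WriteModel;

import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.cdc.CdcMultiOperation;
import com.mongodb.kafka.connect.sink.cdc.CdcMultiRowHandler;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;

// Sketch of a user-supplied handler; the connector instantiates it reflectively
// through the (MongoSinkTopicConfig) constructor, as shown in getCdcMultiRowHandler() above.
public class ExampleMultiRowHandler extends CdcMultiRowHandler {

  private final CdcMultiOperation unwindInsert = new UnwindArrayInsert();

  public ExampleMultiRowHandler(final MongoSinkTopicConfig config) {
    super(config);
  }

  @Override
  public Optional<List<WriteModel<BsonDocument>>> handle(final SinkDocument doc) {
    // Returning empty yields a null write model list, which the record processor
    // skips in the same way as CDC no-ops such as tombstone events.
    if (!doc.getValueDoc().isPresent()) {
      return Optional.empty();
    }
    return Optional.of(unwindInsert.perform(doc));
  }
}
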
@@ -225,7 +225,7 @@ void testRenameIdHandling() {
new MongoProcessedSinkRecordData(sinkRecord, createSinkConfig(topicConfig));
assertNull(processedData.getException());
UpdateOneModel<BsonDocument> writeModel =
(UpdateOneModel<BsonDocument>) processedData.getWriteModel();
(UpdateOneModel<BsonDocument>) processedData.getWriteModelList().get(0);
assertTrue(writeModel.getOptions().isUpsert());
assertEquals(BsonDocument.parse("{'a': 'a', 'b': 'b', '_id': 'c'}"), writeModel.getFilter());

@@ -251,7 +251,7 @@ void assertWriteModel(
final ReplaceOneModel<BsonDocument> expectedWriteModel) {
assertNull(processedData.getException());
ReplaceOneModel<BsonDocument> writeModel =
(ReplaceOneModel<BsonDocument>) processedData.getWriteModel();
(ReplaceOneModel<BsonDocument>) processedData.getWriteModelList().get(0);
assertEquals(expectedWriteModel.getFilter(), writeModel.getFilter());
assertEquals(expectedWriteModel.getReplacement(), writeModel.getReplacement());
assertEquals(
@@ -22,26 +22,7 @@
import static com.mongodb.kafka.connect.sink.MongoSinkConfig.TOPICS_REGEX_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkConfig.TOPIC_OVERRIDE_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkConfig.createOverrideKey;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.CHANGE_DATA_CAPTURE_HANDLER_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.COLLECTION_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.DATABASE_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.FIELD_RENAMER_MAPPING_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.FIELD_RENAMER_REGEXP_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.KEY_PROJECTION_LIST_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.KEY_PROJECTION_TYPE_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.NAMESPACE_MAPPER_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.POST_PROCESSOR_CHAIN_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_EXPIRE_AFTER_SECONDS_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_GRANULARITY_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_METAFIELD_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_AUTO_CONVERSION_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_AUTO_CONVERSION_DATE_FORMAT_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TIMESERIES_TIMEFIELD_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.TOPIC_OVERRIDE_PREFIX;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.VALUE_PROJECTION_LIST_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.VALUE_PROJECTION_TYPE_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.WRITEMODEL_STRATEGY_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.*;
import static com.mongodb.kafka.connect.sink.SinkConfigSoftValidator.OBSOLETE_CONFIGS;
import static com.mongodb.kafka.connect.sink.SinkTestHelper.CLIENT_URI_AUTH_SETTINGS;
import static com.mongodb.kafka.connect.sink.SinkTestHelper.CLIENT_URI_DEFAULT_SETTINGS;
@@ -72,6 +53,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -83,10 +65,16 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;

import org.bson.BsonDocument;

import com.mongodb.client.model.WriteModel;

import com.mongodb.kafka.connect.sink.cdc.CdcMultiRowHandler;
import com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler;
import com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.RdbmsHandler;
import com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler;
import com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.postgres.PostgresHandler;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
import com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper;
import com.mongodb.kafka.connect.sink.processor.BlacklistValueProjector;
import com.mongodb.kafka.connect.sink.processor.DocumentIdAdder;
@@ -515,6 +503,47 @@ Collection<DynamicTest> testValidChangeDataCaptureHandlerNames() {
return tests;
}

public static class TestMultiRowHandler extends CdcMultiRowHandler {
public TestMultiRowHandler() {
super(null);
}

public TestMultiRowHandler(MongoSinkTopicConfig config) {
super(config);
}

@Override
public Optional<List<WriteModel<BsonDocument>>> handle(SinkDocument doc) {
return Optional.empty();
}
}

@TestFactory
@DisplayName("test valid change data capture multi row handler names")
Collection<DynamicTest> testValidChangeDataCaptureMultiRowHandlerNames() {
List<DynamicTest> tests = new ArrayList<>();
String json = "{'%s': '%s'}";
List<String> cdcHandlers = asList(TestMultiRowHandler.class.getName());
cdcHandlers.forEach(
s ->
tests.add(
dynamicTest(
"cdc multi row Handler for " + s,
() -> {
MongoSinkConfig cfg =
createSinkConfig(
format(json, CHANGE_DATA_CAPTURE_MULTI_ROW_HANDLER_CONFIG, s));
assertEquals(
cfg.getMongoSinkTopicConfig(TEST_TOPIC)
.getCdcMultiRowHandler()
.get()
.getClass()
.getName(),
s);
})));
return tests;
}

@Test
@DisplayName("test invalid change data capture handler names")
void testInvalidChangeDataCaptureHandlerNames() {