From 26a8c2da3f54e2cec31070519dbec6f991df9368 Mon Sep 17 00:00:00 2001 From: Andrew Belu Date: Thu, 14 Mar 2024 23:59:38 -0400 Subject: [PATCH] Adding DeleteExactKeyStrategy Allows for the Mongo filter opreation to be set to the exact key coming from the key doc rather than having a superfluous _id key. --- .../strategy/DeleteExactKeyStrategy.java | 44 +++++++++++++++++++ .../connect/sink/MongoSinkConfigTest.java | 3 ++ .../strategy/WriteModelStrategyTest.java | 28 +++++++++++- 3 files changed, 73 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteExactKeyStrategy.java diff --git a/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteExactKeyStrategy.java b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteExactKeyStrategy.java new file mode 100644 index 00000000..48698cd4 --- /dev/null +++ b/src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteExactKeyStrategy.java @@ -0,0 +1,44 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Original Work: Apache License, Version 2.0, Copyright 2017 Hans-Peter Grahsl. + */ + +package com.mongodb.kafka.connect.sink.writemodel.strategy; + +import org.apache.kafka.connect.errors.DataException; + +import org.bson.BsonDocument; + +import com.mongodb.client.model.DeleteOneModel; +import com.mongodb.client.model.WriteModel; + +import com.mongodb.kafka.connect.sink.converter.SinkDocument; + +public class DeleteExactKeyStrategy implements WriteModelStrategy { + + @Override + public WriteModel createWriteModel(final SinkDocument document) { + BsonDocument kd = + document + .getKeyDoc() + .orElseThrow( + () -> + new DataException( + "Could not build the WriteModel,the key document was missing unexpectedly")); + + return new DeleteOneModel<>(kd); + } +} diff --git a/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java b/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java index cb3e20f0..dd0c7d21 100644 --- a/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java +++ b/src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java @@ -106,6 +106,7 @@ import com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.CustomDeleteWriteModelStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy; +import com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteExactKeyStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy; import com.mongodb.kafka.connect.sink.writemodel.strategy.InsertOneDefaultStrategy; @@ -712,6 +713,7 @@ Collection testGetSingleValidWriteModelStrategy() { UpdateOneBusinessKeyTimestampStrategy.class.getName(), UpdateOneBusinessKeyTimestampStrategy.class); put(DeleteOneBusinessKeyStrategy.class.getName(), DeleteOneBusinessKeyStrategy.class); + put(DeleteExactKeyStrategy.class.getName(), DeleteExactKeyStrategy.class); } }; @@ -796,6 +798,7 @@ Collection testGetSingleValidDeleteWriteModelStrategy() { UpdateOneBusinessKeyTimestampStrategy.class.getName(), UpdateOneBusinessKeyTimestampStrategy.class); put(DeleteOneBusinessKeyStrategy.class.getName(), DeleteOneBusinessKeyStrategy.class); + put(DeleteExactKeyStrategy.class.getName(), DeleteExactKeyStrategy.class); } }; diff --git a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java index ec48d913..dfb1b071 100644 --- a/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java +++ b/src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java @@ -68,6 +68,8 @@ class WriteModelStrategyTest { private static final DeleteOneBusinessKeyStrategy DELETE_ONE_BUSINESS_KEY_STRATEGY = new DeleteOneBusinessKeyStrategy(); private static final DeleteOneBusinessKeyStrategy DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY; + private static final DeleteExactKeyStrategy DELETE_EXACT_KEY_STRATEGY = + new DeleteExactKeyStrategy(); private static final SinkDocument SINK_DOCUMENT_NULL_VALUE = new SinkDocument(new BsonDocument(), null); private static final SinkDocument SINK_DOCUMENT_NULL_KEY = @@ -403,6 +405,25 @@ void testDeleteOneBusinessKeyStrategyStrategyPartialWithValidSinkDocument() { assertEquals(BUSINESS_KEY_FLATTENED_FILTER, writeModel.getFilter()); } + @Test + @DisplayName("when sink document is valid for DeleteExactKeyStrategy then correct DeleteOneModel") + void testDeleteExactKeyStrategyWitValidSinkDocument() { + BsonDocument keyDoc = BsonDocument.parse("{id: 1234, id2: 4321}"); + + WriteModel result = + DELETE_EXACT_KEY_STRATEGY.createWriteModel(new SinkDocument(keyDoc, null)); + + assertTrue(result instanceof DeleteOneModel, "result expected to be of type DeleteOneModel"); + + DeleteOneModel writeModel = (DeleteOneModel) result; + + assertTrue( + writeModel.getFilter() instanceof BsonDocument, + "filter expected to be of type BsonDocument"); + + assertEquals(BsonDocument.parse("{id: 1234, id2: 4321}"), writeModel.getFilter()); + } + @Test @DisplayName("Test handling empty or missing sink document data") void testIEmptyOrMissingSinkDocumentData() { @@ -493,7 +514,10 @@ void testIEmptyOrMissingSinkDocumentData() { assertThrows( DataException.class, () -> - DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel( - SINK_DOCUMENT_EMPTY))); + DELETE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel(SINK_DOCUMENT_EMPTY)), + () -> + assertThrows( + DataException.class, + () -> DELETE_EXACT_KEY_STRATEGY.createWriteModel(SINK_DOCUMENT_NULL_KEY))); } }