Bump the connector to 1.2.0 and route bad records to the errant record reporter.

Review notes (text before the first "diff --git" line is ignored by git apply):
* Line structure and generic type arguments were restored from a collapsed copy
  of this patch; hunk line counts match the restored content.
* start(): the errantRecordReporter() lookup is guarded so the task still starts
  on Connect runtimes older than Kafka 2.6, where the method does not exist.
* put(): points are grouped in a LinkedHashMap so they are upserted in record
  arrival order, and the interrupt flag is restored when upsert is interrupted.
* The "index" header of QdrantSinkTask.java is omitted because the post-image
  hash changed with the review fixes; git apply does not require it.

diff --git a/build.gradle b/build.gradle
index 5d432b4..012e3df 100644
--- a/build.gradle
+++ b/build.gradle
@@ -11,7 +11,7 @@ plugins {
 }
 
 group = 'io.qdrant'
-version = '1.1.2'
+version = '1.2.0'
 description = 'Kafka Sink Connector for Qdrant.'
 java.sourceCompatibility = JavaVersion.VERSION_1_8
 java.targetCompatibility = JavaVersion.VERSION_1_8
diff --git a/src/main/java/io/qdrant/kafka/QdrantSinkTask.java b/src/main/java/io/qdrant/kafka/QdrantSinkTask.java
--- a/src/main/java/io/qdrant/kafka/QdrantSinkTask.java
+++ b/src/main/java/io/qdrant/kafka/QdrantSinkTask.java
@@ -1,9 +1,12 @@
 package io.qdrant.kafka;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.protobuf.InvalidProtocolBufferException;
 import io.qdrant.client.grpc.Points.PointStruct;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.slf4j.Logger;
@@ -14,10 +17,11 @@ public class QdrantSinkTask extends SinkTask {
   private static final Logger log = LoggerFactory.getLogger(QdrantSinkTask.class);
   private QdrantSinkConfig config;
   private QdrantGrpc qdrantGrpc;
+  private ErrantRecordReporter reporter;
 
   @Override
   public String version() {
-    return "1.0.0";
+    return "1.2.0";
   }
 
   @Override
@@ -28,6 +32,15 @@ public void start(Map<String, String> props) {
   protected void start(Map<String, String> props, QdrantGrpc qdrantGrpc) {
     this.config = new QdrantSinkConfig(props);
     this.qdrantGrpc = qdrantGrpc == null ? new QdrantGrpc(config) : qdrantGrpc;
+    try {
+      this.reporter = context.errantRecordReporter();
+    } catch (NoSuchMethodError | NoClassDefFoundError e) {
+      // Connect runtimes older than Kafka 2.6 have no errantRecordReporter().
+      this.reporter = null;
+    }
+    if (reporter == null) {
+      log.warn("Errant record reporter is not configured.");
+    }
     log.info("Starting QdrantSinkTask at " + config.getGrpcUrl());
   }
 
@@ -36,34 +49,46 @@ public void put(Collection<SinkRecord> records) {
     if (records.isEmpty()) {
       return;
     }
+    Map<String, Map<PointStruct, SinkRecord>> pointsWithRecords = new HashMap<>();
 
-    try {
-
-      Map<String, List<PointStruct>> points = new HashMap<>();
-      for (SinkRecord record : records) {
+    for (SinkRecord record : records) {
+      try {
         if (record.value() == null) {
           log.warn("Record value is null. Skipping.");
           continue;
         }
         ValueExtractor e = new ValueExtractor(record.value());
         e.validateOptions();
-
-        points
-            .computeIfAbsent(e.getCollectionName(), k -> new ArrayList<>())
-            .add(e.getPointStruct());
+        // Insertion-ordered so points are upserted in the order their records arrived.
+        pointsWithRecords
+            .computeIfAbsent(e.getCollectionName(), k -> new LinkedHashMap<>())
+            .put(e.getPointStruct(), record);
+      } catch (InvalidProtocolBufferException | JsonProcessingException e) {
+        if (reporter == null) throw new DataException("Invalid sink record", e);
+        reporter.report(record, e);
       }
-
-      points.forEach(
-          (collectionName, pointsList) -> {
-            try {
-              qdrantGrpc.upsert(collectionName, pointsList, null);
-            } catch (InterruptedException | ExecutionException e) {
-              throw new DataException("Qdrant server exception.", e);
-            }
-          });
-    } catch (Exception e) {
-      throw new DataException("Failed to put record.", e);
     }
+
+    pointsWithRecords.forEach(
+        (collectionName, pointsMap) -> {
+          List<PointStruct> pointsList = new ArrayList<>(pointsMap.keySet());
+          try {
+            qdrantGrpc.upsert(collectionName, pointsList, null);
+          } catch (InterruptedException | ExecutionException e) {
+            if (e instanceof InterruptedException) {
+              // Restore the interrupt flag so the caller can still observe it.
+              Thread.currentThread().interrupt();
+            }
+            pointsMap
+                .values()
+                .forEach(
+                    record -> {
+                      if (reporter == null)
+                        throw new DataException("Qdrant server exception during upsert.", e);
+                      reporter.report(record, e);
+                    });
+          }
+        });
   }
 
   @Override