diff --git a/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceTask.java b/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceTask.java index 13b2432..ece8571 100644 --- a/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceTask.java +++ b/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceTask.java @@ -23,6 +23,7 @@ import com.solacesystems.jcsmp.DeliveryMode; import com.solacesystems.jcsmp.JCSMPException; import com.solacesystems.jcsmp.JCSMPSession; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -34,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; @@ -45,8 +47,10 @@ public class SolaceSourceTask extends SourceTask { // implements XMLMessageListe private SolSessionHandler solSessionHandler = null; BlockingQueue ingressMessages = new LinkedBlockingQueue<>(); // LinkedBlockingQueue for any incoming message from PS+ topics and queue - BlockingQueue outstandingAckList - = new LinkedBlockingQueue<>(); // LinkedBlockingQueue for Solace Flow messages + Map pendingAcks = new ConcurrentHashMap<>(); // Pending acks for solace messages + Map sourceRecordAcks = new ConcurrentHashMap<>(); // Map record to solace message id + Map sourceRecordsLeftInAck = new ConcurrentHashMap<>(); // Number of created record from solace message + String skafkaTopic; SolaceSourceTopicListener topicListener = null; SolaceSourceQueueConsumer queueConsumer = null; @@ -132,37 +136,55 @@ public synchronized List poll() throws InterruptedException { } catch (Exception e) { if (connectorConfig.getBoolean(SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_IGNORE_ERROR)) { log.warn("================ Encountered exception in message processing....discarded.", e); - scheduleForAck(msg); + msg.ackMessage(); // Effective discard solace message discarded++; continue; } else { throw new ConnectException("Encountered exception in message processing", e); } } - Collections.addAll(records, processor.getRecords(skafkaTopic)); - scheduleForAck(msg); + SourceRecord[] processorRecords = processor.getRecords(skafkaTopic); + Collections.addAll(records, processorRecords); + scheduleForAck(msg, processorRecords); } log.debug("Processed {} records in this batch. Discarded {}", processedInThisBatch - discarded, discarded); return records; } - private synchronized void scheduleForAck(BytesXMLMessage msg) { + private synchronized void scheduleForAck(BytesXMLMessage msg, SourceRecord[] processorRecords) { if (msg.getDeliveryMode() == DeliveryMode.NON_PERSISTENT || msg.getDeliveryMode() == DeliveryMode.PERSISTENT) { - outstandingAckList.add(msg); // enqueue messages received from guaranteed messaging endpoint for later ack + sourceRecordsLeftInAck.put(msg.getAckMessageId(), processorRecords.length); // Set number of created records per solace message + for (SourceRecord processorRecord : processorRecords) { + sourceRecordAcks.put(processorRecord, msg.getAckMessageId()); // Map each record to solace message id + } + pendingAcks.put(msg.getAckMessageId(), msg); // enqueue messages received from guaranteed messaging endpoint for later ack } } - /** - * Kafka Connect method that write records to disk. - */ - public synchronized void commit() throws InterruptedException { - log.trace("Committing records"); - int currentLoad = outstandingAckList.size(); - int count = 0; - while (count != currentLoad) { - outstandingAckList.take().ackMessage(); - count++; + @Override + public synchronized void commitRecord(SourceRecord record, RecordMetadata metadata) { + Long ackMessageId = sourceRecordAcks.remove(record); + if (ackMessageId == null) { + return; + } + + if (!sourceRecordsLeftInAck.containsKey(ackMessageId)) { + log.error("Unable to find message counter for {}", ackMessageId); // Shouldn't happens + } + + sourceRecordsLeftInAck.computeIfPresent(ackMessageId, (k, o) -> o > 1 ? --o : null);// Reduce counter of records per message, remove on last + + if (!sourceRecordsLeftInAck.containsKey(ackMessageId)) {// Last record was commited in the group + BytesXMLMessage msg = pendingAcks.remove(ackMessageId); + + if (msg != null) { + // Acknowledge the message since all records were commited in kafka + msg.ackMessage(); + log.debug("Acknowledged message for messageId {}", ackMessageId); + } else { + log.error("Message for messageId {} was not found in pending ack map.", ackMessageId); + } } } @@ -183,6 +205,13 @@ public synchronized void stop() { } solSessionHandler = null; // At this point filling the ingress queue is stopped ingressMessages.clear(); // Remove all remaining ingressed messages, these will be no longer imported to Kafka + sourceRecordAcks.clear(); + sourceRecordsLeftInAck.clear(); + if (!pendingAcks.isEmpty()) { + log.warn("Potential duplicates might be spotted"); + pendingAcks.forEach((s, m) -> log.warn("CorrelationId: {}", m.getCorrelationId())); + pendingAcks.clear(); + } log.info("PubSub+ Source Connector stopped"); }