diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher.java index 007e749..ffd7253 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisher.java @@ -28,7 +28,6 @@ import org.axonframework.lifecycle.Phase; import org.axonframework.messaging.EventPublicationFailedException; import org.axonframework.messaging.unitofwork.CurrentUnitOfWork; -import org.axonframework.messaging.unitofwork.UnitOfWork; import org.axonframework.monitoring.MessageMonitor; import org.axonframework.monitoring.MessageMonitor.MonitorCallback; import org.axonframework.monitoring.NoOpMessageMonitor; @@ -137,7 +136,6 @@ public > void send(T event) { logger.debug("Skip publishing event for [{}] since topicFunction returned empty.", event.getPayloadType()); return; } - UnitOfWork uow = CurrentUnitOfWork.get(); MonitorCallback monitorCallback = messageMonitor.onMessageIngested(event); Producer producer = producerFactory.createProducer(); @@ -150,21 +148,20 @@ public > void send(T event) { // Sends event messages to Kafka and receive a future indicating the status. Future publishStatus = producer.send(messageConverter.createKafkaMessage(event, topic.get())); - uow.onPrepareCommit(u -> { - if (confirmationMode.isTransactional()) { - tryCommit(producer, monitorCallback); - } else if (confirmationMode.isWaitForAck()) { - waitForPublishAck(publishStatus, monitorCallback); - } - tryClose(producer); - }); - - uow.onRollback(u -> { + CurrentUnitOfWork.get().onRollback(u -> { + //provides a way to prevent duplicate messages on Kafka, in case there is a problem with the token store if (confirmationMode.isTransactional()) { tryRollback(producer); } tryClose(producer); }); + + if (confirmationMode.isTransactional()) { + tryCommit(producer, monitorCallback); + } else if (confirmationMode.isWaitForAck()) { + waitForPublishAck(publishStatus, monitorCallback); + } + tryClose(producer); } private void tryBeginTxn(Producer producer) {