From c381f7caa7acad3c26ab7c08fa3cb6f3d1012f16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Kr=C3=A1l?= <53821852+im-konge@users.noreply.github.com> Date: Thu, 23 Mar 2023 20:26:48 +0100 Subject: [PATCH] Fix race condition where the client will wait for acks, but all messages will be sent (#59) * fix race condition where the client will wait for acks, but all messages will be sent Signed-off-by: Lukas Kral * Maros' comment Signed-off-by: Lukas Kral --------- Signed-off-by: Lukas Kral --- .../src/main/java/io/strimzi/kafka/KafkaProducerClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/io/strimzi/kafka/KafkaProducerClient.java b/clients/src/main/java/io/strimzi/kafka/KafkaProducerClient.java index 0138ba1..bded095 100644 --- a/clients/src/main/java/io/strimzi/kafka/KafkaProducerClient.java +++ b/clients/src/main/java/io/strimzi/kafka/KafkaProducerClient.java @@ -124,7 +124,6 @@ public void sendMessages() { List records = configuration.getDelayMs() == 0 ? generateMessages() : Collections.singletonList(generateMessage(messageIndex)); int currentMsgIndex = configuration.getDelayMs() == 0 ? 0 : messageIndex; - messageIndex += records.size(); for (ProducerRecord record : records) { @@ -139,6 +138,8 @@ public void sendMessages() { messageSuccessfullySent++; } catch (Exception e) { LOGGER.error("Failed to send messages: {} due to: \n{}", record.toString(), e.getMessage()); + } finally { + messageIndex++; } if (configuration.isTransactionalProducer() && (currentMsgIndex + 1) % configuration.getMessagesPerTransaction() == 0) {