Skip to content

Commit

Permalink
Fix race condition where the client will wait for acks, but all messa…
Browse files Browse the repository at this point in the history
…ges will be sent (#59)

* fix race condition where the client will wait for acks, but all messages will be sent

Signed-off-by: Lukas Kral <[email protected]>

* Maros' comment

Signed-off-by: Lukas Kral <[email protected]>

---------

Signed-off-by: Lukas Kral <[email protected]>
  • Loading branch information
im-konge committed Mar 24, 2023
1 parent d25a704 commit c381f7c
Showing 1 changed file with 2 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ public void sendMessages() {
List<ProducerRecord> records = configuration.getDelayMs() == 0 ? generateMessages() : Collections.singletonList(generateMessage(messageIndex));

int currentMsgIndex = configuration.getDelayMs() == 0 ? 0 : messageIndex;
messageIndex += records.size();

for (ProducerRecord record : records) {

Expand All @@ -139,6 +138,8 @@ public void sendMessages() {
messageSuccessfullySent++;
} catch (Exception e) {
LOGGER.error("Failed to send messages: {} due to: \n{}", record.toString(), e.getMessage());
} finally {
messageIndex++;
}

if (configuration.isTransactionalProducer() && (currentMsgIndex + 1) % configuration.getMessagesPerTransaction() == 0) {
Expand Down

0 comments on commit c381f7c

Please sign in to comment.