KAFKA-18056: Fixed bug in handling commitAsync responses #17909
Thanks for the PR.
@@ -421,6 +421,10 @@ public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception ex
Set<Long> mergedOffsets = new HashSet<>();
mergedOffsets.addAll(oldOffsets);
mergedOffsets.addAll(newOffsets);
if (mergedOffsets.size() < (oldOffsets.size() + newOffsets.size())) {
I think this is quite a dangerous thing to leave in here. In principle, a test could rely upon repeated delivery of a record (timeout for example) which could actually call the callback twice. I'd remove this because I predict it will be the source of a flaky test later on.
Ah right makes sense, sometimes genuinely we can have the callback called twice. I have removed it now, thanks.
...ts/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
lgtm
@ShivsundarR Thanks for the PR. LGTM
What
There was a bug in handling the `ShareAcknowledgeResponse` for `commitAsync()`. Currently, after we receive a response, we send a background event to the application thread to update the acknowledgement commit callbacks for EVERY `TopicIdPartition`. The map that was sent was not cleared after sending the event, so each subsequent event also included partitions that had already been sent in a previous event. This resulted in duplicate calls to the callback.
The PR fixes the bug and adds a unit test for the same.
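
For illustration only, here is a minimal sketch of the failure mode described above. It is not the actual `ShareConsumeRequestManager` code: the class `AckEventSketch`, its fields, and the use of plain strings for partitions are all hypothetical stand-ins. It only shows how a result map that is not cleared after each event causes earlier partitions to be reported to the callback again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch; names do not come from the Kafka code base.
class AckEventSketch {
    // Stand-in for the per-partition results collected from responses.
    private final Map<String, Set<Long>> pending = new HashMap<>();
    // Records each partition for which the (simulated) callback fired.
    private final List<String> callbackLog = new ArrayList<>();
    private final boolean clearAfterSend;

    AckEventSketch(boolean clearAfterSend) {
        this.clearAfterSend = clearAfterSend;
    }

    // Simulates receiving one response and sending one background event.
    void onResponse(String partition, Set<Long> offsets) {
        pending.put(partition, offsets);
        // The event carries a snapshot of everything currently in the map.
        Map<String, Set<Long>> event = new TreeMap<>(pending);
        if (clearAfterSend) {
            pending.clear(); // the fix: do not carry entries into the next event
        }
        // The application thread invokes the callback once per partition in the event.
        for (String tp : event.keySet()) {
            callbackLog.add(tp);
        }
    }

    // Feeds two responses for two different partitions and returns the callback log.
    static List<String> run(boolean clearAfterSend) {
        AckEventSketch sketch = new AckEventSketch(clearAfterSend);
        sketch.onResponse("tp0", Set.of(1L, 2L));
        sketch.onResponse("tp1", Set.of(7L));
        return sketch.callbackLog;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [tp0, tp0, tp1]
        System.out.println(run(true));  // prints [tp0, tp1]
    }
}
```

Without the `clear()`, the second event still contains `tp0`, so its callback fires twice. With it, each partition is reported exactly once.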