
Consumer doesn't consume after onLost #1288

Closed
ya-at opened this issue Jul 19, 2024 · 9 comments · Fixed by #1350

Comments

ya-at commented Jul 19, 2024

When the broker goes down, the consumer loses its connection and tries to reconnect. Then onLost happens, and after that the runloop never calls poll() again, so there are no new events. Is this the desired behavior? If so, how can the consumer be restarted when onLost happens? (The last event was consumed at 20:08.) Also, the application has two consumer groups reading the same topic in parallel; one fails (onLost happens), while the other continues to work (onLost doesn't happen, since it is connected to a broker that doesn't go down).

Related #1250. Version: 2.8.0.

Logs (newest message first)
2024-07-19 20:10:37.441	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop   Offering partition assignment Set()   location: Runloop.scala:523
2024-07-19 20:10:37.438	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop.makeRebalanceListener   onLost done   location: Runloop.scala:240
2024-07-19 20:10:37.433	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.PartitionStreamControl   Partition sample-topic-3 lost   location: PartitionStreamControl.scala:98 partition: 3 topic: sample-topic
2024-07-19 20:10:37.428	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop.makeRebalanceListener   1 partitions are lost   location: Runloop.scala:234
2024-07-19 20:10:37.375	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop   Starting poll with 1 pending requests and 0 pending commits, resuming Set(sample-topic-3) partitions   location: Runloop.scala:466
2024-07-19 20:10:37.375	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop   Processing 0 commits, 1 commands: Poll   location: Runloop.scala:733
2024-07-19 20:10:37.325	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop   Starting poll with 1 pending requests and 0 pending commits, resuming Set(sample-topic-3) partitions   location: Runloop.scala:466
2024-07-19 20:10:37.325	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop   Processing 0 commits, 1 commands: Poll   location: Runloop.scala:733
2024-07-19 20:10:37.275	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop   Starting poll with 1 pending requests and 0 pending commits, resuming Set(sample-topic-3) partitions   location: Runloop.scala:466
2024-07-19 20:10:37.275	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop   Processing 0 commits, 1 commands: Poll   location: Runloop.scala:733
2024-07-19 20:10:37.225	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop   Starting poll with 1 pending requests and 0 pending commits, resuming Set(sample-topic-3) partitions   location: Runloop.scala:466
2024-07-19 20:10:37.225	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop   Processing 0 commits, 1 commands: Poll   location: Runloop.scala:733
2024-07-19 20:10:37.174	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop   Starting poll with 1 pending requests and 0 pending commits, resuming Set(sample-topic-3) partitions   location: Runloop.scala:466
2024-07-19 20:10:37.174	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop   Processing 0 commits, 1 commands: Poll   location: Runloop.scala:733
2024-07-19 20:10:37.124	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop   Starting poll with 1 pending requests and 0 pending commits, resuming Set(sample-topic-3) partitions   location: Runloop.scala:466
2024-07-19 20:10:37.124	zio-kafka-runloop-thread-3   zio.kafka.consumer.internal.Runloop   Processing 0 commits, 1 commands: Poll   location: Runloop.scala:733
ya-at changed the title from "Consumer doesn't consumer after onLost" to "Consumer doesn't consume after onLost" on Jul 19, 2024
erikvanoosten (Collaborator) commented Jul 20, 2024

Hello @ya-at. Thanks for your detailed bug report.
I think we can release 2.8.1, and then (due to #1252) a lost partition will no longer be considered fatal.

erikvanoosten (Collaborator) commented Jul 20, 2024

For now, your options are:

  • restart the consumer by re-subscribing
  • create a new consumer (a sketch of this follows below)
  • downgrade zio-kafka to 2.6.0
  • restart the application
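
A minimal sketch of the first two options combined, assuming a plain string consumer on `sample-topic`. The helper names `consumeOnce`/`consumeForever`, the inactivity timeout, and the restart delay are illustrative choices, not zio-kafka API:

```scala
import zio._
import zio.kafka.consumer.{ Consumer, ConsumerSettings, Subscription }
import zio.kafka.serde.Serde

// One consumer "session": create a fresh consumer, subscribe, and drain
// records until the stream fails or stops producing records for a while.
def consumeOnce(settings: ConsumerSettings): Task[Unit] =
  ZIO.scoped {
    Consumer.make(settings).flatMap { consumer =>
      consumer
        .plainStream(Subscription.topics("sample-topic"), Serde.string, Serde.string)
        .timeoutFail(new RuntimeException("No records for 5 minutes"))(5.minutes)
        .mapZIO(record => ZIO.logInfo(s"Consumed ${record.value}") *> record.offset.commit)
        .runDrain
    }
  }

// Tear the session down and recreate the consumer (and hence re-subscribe)
// whenever it ends, with a small delay in between.
def consumeForever(settings: ConsumerSettings): Task[Unit] =
  (consumeOnce(settings).ignore *> ZIO.sleep(5.seconds)).forever
```

The inactivity timeout is what turns the "no more polling" state into a stream failure, so the wrapper can notice it and rebuild the consumer.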

erikvanoosten (Collaborator) commented:

Correction: #1252 is already part of zio-kafka 2.8.0, so something else is going on.

erikvanoosten (Collaborator) commented:

The newest log line (first line) indicates that no partitions (Set()) are assigned to this consumer. That should not cause polling to stop! (See shouldPoll: subscriptionState.isSubscribed and assignedStreams.isEmpty should both be true in this case.)

Can you check the java consumer configurations?

ya-at (Author) commented Jul 20, 2024

> Can you check the java consumer configurations?

The settings are almost the defaults. The only things we changed are client.id, group.id, and metrics.reporter (these options are passed through ConsumerSettings). I don't think it's because of metrics.reporter, since every consumer has this property.
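
For reference, a configuration along those lines would look roughly like this (broker address, ids and the reporter class are placeholders; `metric.reporters` is the standard Kafka property name):

```scala
import zio.kafka.consumer.ConsumerSettings

// Mostly default settings; only the client id, group id and the metrics
// reporter are set explicitly, all through ConsumerSettings.
val settings: ConsumerSettings =
  ConsumerSettings(List("broker-1:9092"))
    .withClientId("sample-app")
    .withGroupId("sample-group")
    .withProperty("metric.reporters", "com.example.SampleMetricsReporter")
```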

josdirksen commented:

We're seeing the same thing again as well. In version 2.7.5, where onLost triggered a failure, everything was working. After the change in #1252 we are experiencing the same issue again as we had before the #1251 fix.

What we often see (especially with a low number of instances) is that all instances lose their partitions at the same time. Everything then stops processing, and no rebalances are triggered. We're moving back to version 2.7.5, since that has well-defined behaviour.

svroonland (Collaborator) commented Oct 29, 2024

def shouldPoll = subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty)

What if assignedStreams is not empty after all partitions are lost? There would be no more pending requests or commits, but there would still be assigned streams, so shouldPoll becomes false.

Are we sure we are clearing the lost partitions from assignedStreams? Looking at RebalanceEvent.onLost, we're not filling endedStreams, which would mean assignedStreams is not cleared of the lost partitions.
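
To make that concrete, here is a simplified stand-in for the runloop state (illustration only, not the real `Runloop.State`) showing how the suspected post-onLost state makes shouldPoll evaluate to false:

```scala
// Simplified model: strings instead of the real request/stream types.
final case class StateSketch(
  isSubscribed: Boolean,
  pendingRequests: List[String],
  pendingCommits: List[String],
  assignedStreams: Set[String]
) {
  def shouldPoll: Boolean =
    isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty)
}

val afterOnLost = StateSketch(
  isSubscribed    = true,
  pendingRequests = Nil,                  // nothing left to fulfil for the lost partition
  pendingCommits  = Nil,
  assignedStreams = Set("sample-topic-3") // the lost partition's stream was never removed
)
// afterOnLost.shouldPoll == false, so the runloop never calls poll() again
```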

WDYT @erikvanoosten

erikvanoosten (Collaborator) commented:

> Are we sure we are clearing the lost partitions from assignedStreams? Looking at RebalanceEvent.onLost, we're not filling endedStreams, which would mean assignedStreams is not cleared of the lost partitions.

Yes, that sounds extremely plausible! Good find!

svroonland added a commit that referenced this issue Oct 29, 2024
Fixes #1288. See also #1233 and #1250.

When all partitions are lost after some connection issue to the broker,
the streams for lost partitions are ended but polling stops, due to the
conditions in `Runloop.State#shouldPoll`. This PR fixes this by removing
the lost partition streams from the `assignedStreams` in the state,
thereby not disabling polling.

Also adds a warning that is logged whenever the assigned partitions
(according to the apache kafka consumer) are different from the assigned
streams, which helps to identify other issues or any future regressions
of this issue.

~Still needs a good test; the `MockConsumer` used in other tests unfortunately does not allow simulating lost partitions, and the exact behavior of the kafka client in this situation is hard to predict.~
Includes a test that fails when the change to Runloop is undone.
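
In a simplified sketch (placeholder types, not the actual PR code), the essence of the change is that the lost partitions' streams are now recorded as ended, so they get removed from `assignedStreams` and `shouldPoll` can stay true:

```scala
// Placeholder types: the real code works with TopicPartition and
// PartitionStreamControl values inside the rebalance listener.
final case class RebalanceEventSketch(endedStreams: Set[String])

def onLostSketch(lostPartitions: Set[String], assignedStreams: Set[String]): Set[String] = {
  // Before the fix, endedStreams was left empty for the lost case, so the
  // lost streams stayed in assignedStreams and polling stopped.
  val event = RebalanceEventSketch(endedStreams = lostPartitions)
  assignedStreams -- event.endedStreams // updated assignedStreams: polling continues
}
```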
svroonland (Collaborator) commented:

The issue of no more polling after all partitions were lost is (very likely, assuming our reproduction is fully representative of the issue) fixed in v2.8.3.
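
For anyone affected, picking up the fix should only require bumping the dependency (standard sbt coordinates for zio-kafka):

```scala
// build.sbt
libraryDependencies += "dev.zio" %% "zio-kafka" % "2.8.3"
```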
