Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always end streams in rebalance listener, support lost partitions #1089

Merged
merged 14 commits into from
Nov 4, 2023

Conversation

erikvanoosten
Copy link
Collaborator

@erikvanoosten erikvanoosten commented Oct 27, 2023

Previously, streams would only be ended in the rebalance listener for revoked partitions. Now, they are ended there even when restartStreamsOnRebalancing is used.

Lost partitions are no longer treated as being revoked. With this change, streams of lost partitions are interrupted. Interrupting them prevents these streams from processing and committing more data.

A nice side effect is that Zio-kafka is now faster when the rebalance listener was not called; the 'fast track'.

The main reason for this change is to prepare awaiting commits from within the rebalance listener which will prevent duplicate consuming of records (see #1098).

Also: fix test restartStreamsOnRebalancing mode closes all partition streams so that it detects rebalances properly on fast computers.

@erikvanoosten erikvanoosten requested review from svroonland, vigoo and guizmaii and removed request for vigoo October 27, 2023 07:39
Copy link
Collaborator

@svroonland svroonland left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, looks familiar from #591 of course. Good to have this in a separate PR.

@erikvanoosten erikvanoosten force-pushed the rebalance-event branch 2 times, most recently from 1db802e to 1ad29ef Compare October 28, 2023 08:51
Previously, streams would only be ended in the rebalance listener for revoked partitions. Now, they are ended there even when `restartStreamsOnRebalancing` is used.

Lost partitions are no longer treated as being revoked. With this change, streams of lost partitions are interrupted. Interrupting them prevents these streams from processing and committing more data.

A nice side effect is that Zio-kafka is now faster when the rebalance listener was _not_ called; the 'fast track'.

The main reason for this change is to prepare awaiting commits from within the rebalance listener which will prevent duplicate consuming of records (see #830).

Also: fix test `restartStreamsOnRebalancing mode closes all partition streams` so that it detects rebalances properly on fast computers.
In test `restartStreamsOnRebalancing mode closes all partition streams` consumer 1 is expected to receive at least 1 message. However, consumer 2 might grab all them through pre-fetching. Fix this by disabling pre-fetching for consumer 2.
@erikvanoosten
Copy link
Collaborator Author

erikvanoosten commented Nov 1, 2023

@svroonland @guizmaii For many days I have been trying to patch up the test that fails on Github action runner but passes on my laptop: restartStreamsOnRebalancing mode closes all partition streams. Yesterday I was fed up and decided to take a better look at what the test does and why it could fail. I discovered some interesting things:

  • The test was complete rubbish; it didn't test at all what it was supposed to test.
  • The test was completely doomed; it is impossible to observe (black box test) the restartStreamsOnRebalance feature because (as far as I can tell) the default partition assignor externally behaves the same as the restartStreamsOnRebalance feature.

(Did you notice I like drama? 😉) Anyways, I wrote a new test that actually tests whether streams are restarted.

But now I really wonder if we have more tests like this one...

Anyways, this PR could use some review love. Thanks in advance!

Copy link
Collaborator

@svroonland svroonland left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! A few thinks to consider

@erikvanoosten erikvanoosten merged commit 21361c1 into master Nov 4, 2023
13 checks passed
@erikvanoosten erikvanoosten deleted the rebalance-event branch November 4, 2023 13:55
erikvanoosten added a commit that referenced this pull request Nov 5, 2023
Fixes #590 "Many records duplicately processed after rebalancing"

In this change we introduce a new mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition, have been committed.

### Motivation

Here is a common (single partition) scenario around rebalances:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100)
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
1. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
1. _at the same time,_ another consumer on another instance, starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100

Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable.

After merging this change, the scenario will unfold as follows:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
   * the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
   * inside the onRevoked callback, zio-kafka continues to process commit commands from the user
   * zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes
   * the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition
1. another consumer on another instance, starts consuming from the last committed offset (which is now 100, problem solved!)

### Commit queue

Because both the main runloop, and the rebalance listener need to process (and thus receive) commits commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced.

### Complications

1. The chosen solution is not suitable for all consumers.
   - There are use cases where not all messages are read from the stream. For example, some want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we can let the user tell zio-kafka that a given commit is the last one.
   - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores though. As a workaround, these users can commit to both places.
1. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete.

### Same thread executor

The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) to the java consumer happens from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations such as `ZIO.timeout` and anything with `Schedules` will still shift work to another thread. We work around this by using blocking time.

### Collateral

This change also:
- fixes order of `private` and `final`
- removes some completely useless tests

### Related

The same issue is present in:
- f2s-kafka: fd4s/fs2-kafka#1200
- alpakka-kafka: akka/alpakka-kafka#1038

In fact, every program that does polls and commits asynchronously is likely affected.

### Non-goals

This change does not try to solve the following goals. However, these can be addressed in future PRs.

- Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087).
- Support consumers that want to commit only a portion of the given messages.
- Support transactional consumer/producer.
- Support external commits.

This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073 #1086, #1089 and #1097.
erikvanoosten added a commit that referenced this pull request Nov 16, 2023
Fixes #590 "Many records duplicately processed after rebalancing"

In this change we introduce a new experimental mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition, have been committed.

### Motivation

Here is a common (single partition) scenario around rebalances:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100)
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
1. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
1. _at the same time,_ another consumer on another instance, starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100

Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable.

After merging this change, the scenario will unfold as follows:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
   * the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
   * inside the onRevoked callback, zio-kafka continues to process commit commands from the user
   * zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes
   * the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition
1. another consumer on another instance, starts consuming from the last committed offset (which is now 100, problem solved!)

### Commit queue

Because both the main runloop, and the rebalance listener need to process (and thus receive) commits commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced.

### Complications

1. The chosen solution is not suitable for all consumers.
   - There are use cases where not all messages are read from the stream. For example, some want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we can let the user tell zio-kafka that a given commit is the last one.
   - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores though. As a workaround, these users can commit to both places.
1. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete.

### Same thread executor

The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) to the java consumer happens from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations such as `ZIO.timeout` and anything with `Schedules` will still shift work to another thread. We work around this by using blocking time.

### Experimental

Because holding up the rebalance may have unforeseen consequences, this feature is marked as experimental. This allows us to collect experiences before we recommend this mode to all users.

### Collateral

This change also:
- fixes order of `private` and `final`
- removes some completely useless tests

### Related

The same issue is present in:
- f2s-kafka: fd4s/fs2-kafka#1200
- alpakka-kafka: akka/alpakka-kafka#1038

In fact, every program that does polls and commits asynchronously is likely affected.

### Non-goals

This change does not try to solve the following goals. However, these can be addressed in future PRs.

- Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087).
- Support consumers that want to commit only a portion of the given messages.
- Support transactional consumer/producer.
- Support external commits.

This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073 #1086, #1089 and #1097.
erikvanoosten added a commit that referenced this pull request Nov 16, 2023
Fixes #590 "Many records duplicately processed after rebalancing"

In this change we introduce a new experimental mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition, have been committed.

### Motivation

Here is a common (single partition) scenario around rebalances:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100)
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
1. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
1. _at the same time,_ another consumer on another instance, starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100

Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable.

After merging this change, the scenario will unfold as follows:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
   * the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
   * inside the onRevoked callback, zio-kafka continues to process commit commands from the user
   * zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes
   * the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition
1. another consumer on another instance, starts consuming from the last committed offset (which is now 100, problem solved!)

### Commit queue

Because both the main runloop, and the rebalance listener need to process (and thus receive) commits commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced.

### Complications

1. The chosen solution is not suitable for all consumers.
   - There are use cases where not all messages are read from the stream. For example, some want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we can let the user tell zio-kafka that a given commit is the last one.
   - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores though. As a workaround, these users can commit to both places.
1. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete.

### Same thread executor

The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) to the java consumer happens from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations such as `ZIO.timeout` and anything with `Schedules` will still shift work to another thread. We work around this by using blocking time.

### Experimental

Because holding up the rebalance may have unforeseen consequences, this feature is marked as experimental. This allows us to collect experiences before we recommend this mode to all users.

### Collateral

This change also:
- fixes order of `private` and `final`
- removes some completely useless tests

### Related

The same issue is present in:
- f2s-kafka: fd4s/fs2-kafka#1200
- alpakka-kafka: akka/alpakka-kafka#1038

In fact, every program that does polls and commits asynchronously is likely affected.

### Non-goals

This change does not try to solve the following goals. However, these can be addressed in future PRs.

- Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087).
- Support consumers that want to commit only a portion of the given messages.
- Support transactional consumer/producer.
- Support external commits.

This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073 #1086, #1089 and #1097.
erikvanoosten added a commit that referenced this pull request Nov 16, 2023
Fixes #590 "Many records duplicately processed after rebalancing"

In this change we introduce a new experimental mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition, have been committed.

### Motivation

Here is a common (single partition) scenario around rebalances:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100)
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
1. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
1. _at the same time,_ another consumer on another instance, starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100

Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable.

After merging this change, the scenario will unfold as follows:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
   * the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
   * inside the onRevoked callback, zio-kafka continues to process commit commands from the user
   * zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes
   * the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition
1. another consumer on another instance, starts consuming from the last committed offset (which is now 100, problem solved!)

### Commit queue

Because both the main runloop, and the rebalance listener need to process (and thus receive) commits commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced.

### Complications

1. The chosen solution is not suitable for all consumers.
   - There are use cases where not all messages are read from the stream. For example, some want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we can let the user tell zio-kafka that a given commit is the last one.
   - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores though. As a workaround, these users can commit to both places.
1. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete.

### Same thread executor

The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) to the java consumer happens from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations such as `ZIO.timeout` and anything with `Schedules` will still shift work to another thread. We work around this by using blocking time.

### Experimental

Because holding up the rebalance may have unforeseen consequences, this feature is marked as experimental. This allows us to collect experiences before we recommend this mode to all users.

### Collateral

This change also:
- fixes order of `private` and `final`
- removes some completely useless tests

### Related

The same issue is present in:
- f2s-kafka: fd4s/fs2-kafka#1200
- alpakka-kafka: akka/alpakka-kafka#1038

In fact, every program that does polls and commits asynchronously is likely affected.

### Non-goals

This change does not try to solve the following goals. However, these can be addressed in future PRs.

- Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087).
- Support consumers that want to commit only a portion of the given messages.
- Support transactional consumer/producer.
- Support external commits.

This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073 #1086, #1089 and #1097.
erikvanoosten added a commit that referenced this pull request Nov 18, 2023
Fixes #590 "Many records duplicately processed after rebalancing"

In this change we introduce a new experimental mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition, have been committed.

### Motivation

Here is a common (single partition) scenario around rebalances:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100)
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
1. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
1. _at the same time,_ another consumer on another instance, starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100

Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable.

After merging this change, the scenario will unfold as follows:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
   * the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
   * inside the onRevoked callback, zio-kafka continues to process commit commands from the user
   * zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes
   * the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition
1. another consumer on another instance, starts consuming from the last committed offset (which is now 100, problem solved!)

### Commit queue

Because both the main runloop, and the rebalance listener need to process (and thus receive) commits commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced.

### Complications

1. The chosen solution is not suitable for all consumers.
   - There are use cases where not all messages are read from the stream. For example, some want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we can let the user tell zio-kafka that a given commit is the last one.
   - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores though. As a workaround, these users can commit to both places.
1. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete.

### Same thread executor

The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) to the java consumer happens from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations such as `ZIO.timeout` and anything with `Schedules` will still shift work to another thread. We work around this by using blocking time.

### Experimental

Because holding up the rebalance may have unforeseen consequences, this feature is marked as experimental. This allows us to collect experiences before we recommend this mode to all users.

### Collateral

This change also:
- fixes order of `private` and `final`
- removes some completely useless tests

### Related

The same issue is present in:
- f2s-kafka: fd4s/fs2-kafka#1200
- alpakka-kafka: akka/alpakka-kafka#1038

In fact, every program that does polls and commits asynchronously is likely affected.

### Non-goals

This change does not try to solve the following goals. However, these can be addressed in future PRs.

- Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087).
- Support consumers that want to commit only a portion of the given messages.
- Support transactional consumer/producer.
- Support external commits.

This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073 #1086, #1089 and #1097.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants