
Prevent OOM by introducing a max on total buffer capacity #944

Closed
erikvanoosten wants to merge 7 commits into master from global-queue-limit

Conversation

erikvanoosten (Collaborator)

No description provided.

// By shuffling the streams we prevent read-starvation for streams at the end of the list.
val streams =
if (maxTotalQueueSize == Int.MaxValue) state.assignedStreams
else scala.util.Random.shuffle(state.assignedStreams)
erikvanoosten (Collaborator, Author) commented on Jun 24, 2023

Instead of shuffling, we can also sort the streams by queueSize (smallest first) 🤔
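For reference, a rough sketch of that alternative (illustrative only; it assumes the surrounding Runloop context, i.e. state.assignedStreams and a queueSize: UIO[Int] accessor on each stream, and is not the actual code):

// Hypothetical alternative to shuffling: order the assigned streams by their
// current queue size, smallest first, so the emptiest partitions are resumed first.
val streamsBySize =
  ZIO
    .foreach(state.assignedStreams)(stream => stream.queueSize.map(stream -> _))
    .map(_.sortBy { case (_, size) => size }.map { case (stream, _) => stream })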

erikvanoosten (Collaborator, Author) commented on Jun 24, 2023

Never mind: when we do that, behavior becomes unpredictable again. I would not be surprised if sorting by queueSize caused read-starvation instead of preventing it.

Base automatically changed from review_803 to master June 24, 2023 15:57
svroonland (Collaborator) left a comment

Could you add motivation for why this cannot be controlled by reducing maxPartitionQueueSize such that mPQS * number of partitions < maxTotalQueueSize?

erikvanoosten (Collaborator, Author)

> Could you add motivation for why this cannot be controlled by reducing maxPartitionQueueSize such that mPQS * number of partitions < maxTotalQueueSize?

Of course!

Suppose there are 2000 partitions to consume from and messages are around 30 kB. With maxPartitionQueueSize set to the default of 1024, that means we need up to 2000 * 1024 * 30 kB ≈ 63 GB of heap. Let's say we want to stay below 2 GB for all queues in total. With your suggestion we would need to set maxPartitionQueueSize to 2 GB / 2000 / 30 kB ≈ 35.

35 is so low that once a queue reaches this size, the remaining messages will be processed well before a new poll is executed. This leads to low throughput.

The introduction of a global maximum allows a higher maxPartitionQueueSize (the default of 1024 is fine). For example, to stay within 2 GB we can set maxTotalQueueSize to 2 GB / 30 kB ≈ 70k. This allows roughly 70k / 1024 ≈ 70 partitions to get high throughput. At every poll a different selection of partitions benefits, so overall there is always high-throughput progress.
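To make the arithmetic easier to follow, here are the same numbers as a small worked example (rounded, binary units):

// Numbers from the comment above.
val partitions            = 2000
val recordBytes           = 30L * 1024                              // ~30 kB per record
val maxPartitionQueueSize = 1024
val heapBudget            = 2L * 1024 * 1024 * 1024                 // 2 GB for all queues together

// Worst case with only the per-partition limit:
val worstCase = partitions * maxPartitionQueueSize * recordBytes    // ≈ 63 GB

// With a global limit instead:
val maxTotalQueueSize = heapBudget / recordBytes                    // ≈ 70k records
val fastPartitions    = maxTotalQueueSize / maxPartitionQueueSize   // ≈ 68 partitions at full prefetch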

Note: the numbers in this comment are not made up; they are taken from a service we deploy at my company.

svroonland (Collaborator) commented on Jun 28, 2023

> 35 is so low that once a queue reaches this size, the remaining messages will be processed well before a new poll is executed. This leads to low throughput.

If you have 2000 partitions, that's 70,000 messages to be processed before the next poll, right? So aren't there plenty of messages in the queue then? Or are you assuming an unequal number of messages on the partitions?

erikvanoosten (Collaborator, Author)

> 35 is so low that once a queue reaches this size, the remaining messages will be processed well before a new poll is executed. This leads to low throughput.
>
> If you have 2000 partitions, that's 70,000 messages to be processed before the next poll, right? So aren't there plenty of messages in the queue then? Or are you assuming an unequal number of messages on the partitions?

Indeed, in our application there is a huge imbalance between the partitions. Some partitions receive thousands of messages per second while others receive a handful per day. Currently, when we need to process a backlog, those high-throughput partitions take a long time to catch up.

@@ -245,17 +246,23 @@ private[consumer] final class Runloop private (
s"Starting poll with ${state.pendingRequests.size} pending requests and ${state.pendingCommits.size} pending commits"
)
_ <- currentStateRef.set(state)
partitionsToFetch <-
partitionsToFetch <- {
Collaborator

So we do a random shuffling of the partitions to resume. To me that sounds like it becomes very unpredictable when to expect new messages on some partition; that's treading into dangerous / unpredictable territory if you ask me.

At the very least we'd need some good unit tests to validate behavior in some scenarios.

Collaborator Author

Without shuffling, the partitions at the beginning of the list have a higher chance of getting data than partitions at the end of the list. By shuffling at each poll, each partition gets (on average) an equal chance to get data. Because there are many polls, and we shuffle differently every time, the average behavior is very predictable.

Note that we still resume a lot of partitions (roughly 70 in the example above), so it is still up to the broker to select which partitions to send data for.
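A minimal sketch of the idea, with hypothetical names (not the PR's actual Runloop code): shuffle the assigned partitions, then resume partitions until the total buffered-record budget is used up.

import org.apache.kafka.common.TopicPartition

// Hypothetical helper: choose which partitions to resume for the next poll,
// keeping the total number of buffered records under maxTotalQueueSize.
// Shuffling makes the selection fair on average over many polls.
def partitionsToFetch(
  queueSizes: List[(TopicPartition, Int)],
  maxPartitionQueueSize: Int,
  maxTotalQueueSize: Int
): Set[TopicPartition] = {
  val alreadyQueued = queueSizes.map(_._2.toLong).sum
  var budget        = maxTotalQueueSize - alreadyQueued
  val picked        = Set.newBuilder[TopicPartition]
  scala.util.Random.shuffle(queueSizes).foreach { case (tp, size) =>
    // Only resume partitions that still have room, and only while there is
    // budget left for the records a poll could add to them.
    val headroom = (maxPartitionQueueSize - size).toLong
    if (headroom > 0 && budget >= headroom) {
      picked += tp
      budget -= headroom
    }
  }
  picked.result()
}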

What do you think might go wrong?

Collaborator

I don't know what can go wrong. Can you write a unit test that validates the on-average fairness in throughput for the partitions and that the total queue size is always smaller than the config setting?

Collaborator Author

That should be possible (though not easy). I did not want to make the effort of writing unit tests if no one likes the idea to begin with. But if I can convince you with a test, I will write it :)
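As a starting point, such a test could be a plain simulation over many polls that checks both properties: the total stays under the limit, and every partition is resumed about equally often. A sketch with made-up parameters (hypothetical model, not actual zio-kafka test code):

import scala.util.Random

// Simulate many polls: assert the total buffered-record count never exceeds
// maxTotalQueueSize, and that partitions are resumed roughly equally often.
object FairnessSimulation extends App {
  val partitions            = 200
  val maxPartitionQueueSize = 100
  val maxTotalQueueSize     = 5000
  val polls                 = 10000
  val rnd                   = new Random(42)

  val queues       = Array.fill(partitions)(0)   // buffered records per partition
  val resumeCounts = Array.fill(partitions)(0L)  // how often each partition was resumed

  for (_ <- 1 to polls) {
    // Select partitions to resume under the total budget, in random order.
    var budget = maxTotalQueueSize - queues.sum
    val resumed = rnd.shuffle((0 until partitions).toList).filter { p =>
      val headroom = maxPartitionQueueSize - queues(p)
      val take     = headroom > 0 && budget >= headroom
      if (take) budget -= headroom
      take
    }
    // "Poll": each resumed partition receives up to its headroom in new records.
    resumed.foreach { p =>
      resumeCounts(p) += 1
      queues(p) += rnd.nextInt(maxPartitionQueueSize - queues(p) + 1)
    }
    assert(queues.sum <= maxTotalQueueSize, "total queue size exceeded the limit")
    // "Processing": every partition drains some of its buffered records.
    for (p <- 0 until partitions) queues(p) -= math.min(queues(p), rnd.nextInt(101))
  }

  // Loose fairness check: every partition got resumed, and no partition got
  // resumed more than twice as often as the least-resumed one. A real test
  // would use a tighter, statistically motivated tolerance.
  assert(resumeCounts.min > 0, "some partition was never resumed")
  assert(resumeCounts.max < 2 * resumeCounts.min, s"uneven resumes: ${resumeCounts.min} .. ${resumeCounts.max}")
  println(s"OK: resume counts between ${resumeCounts.min} and ${resumeCounts.max}")
}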

Collaborator

Yeah, well, it is quite an extreme use case to have that many partitions for one consumer, so I would prefer a mechanism outside of zio-kafka, or at least an optional component outside the Runloop.

Collaborator Author

I do not agree that 2000 partitions is extreme. It is well within the capabilities of a Kafka cluster.

Do you have an idea on how else to build this feature?

Collaborator Author

BTW, even with 200 partitions and 30 kB messages the memory requirements are quite large (~6 GB).

Collaborator

We could create a trait PrefetchStrategy that the user can supply an instance for.
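A rough idea of what such a trait could look like (hypothetical signatures, shown only to make the suggestion concrete; not an existing zio-kafka API):

import org.apache.kafka.common.TopicPartition
import zio._

// Pluggable strategy: given the current queue size of every assigned
// partition, decide which partitions to resume before the next poll.
trait PrefetchStrategy {
  def partitionsToFetch(queueSizes: Map[TopicPartition, Int]): UIO[Set[TopicPartition]]
}

// The per-partition behavior discussed above, expressed as a strategy:
// resume every partition whose queue is below the per-partition threshold.
final case class PerPartitionPrefetch(maxPartitionQueueSize: Int) extends PrefetchStrategy {
  override def partitionsToFetch(queueSizes: Map[TopicPartition, Int]): UIO[Set[TopicPartition]] =
    ZIO.succeed(queueSizes.filter { case (_, size) => size < maxPartitionQueueSize }.keySet)
}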

Collaborator

That would also be good for testing the prefetch strategy in unit tests.

erikvanoosten (Collaborator, Author)

Replaced by #970.

erikvanoosten deleted the global-queue-limit branch on July 10, 2023 at 20:00