Make partitionPreFetchBufferLimit value 0 disable prefetching #1091

Merged · 9 commits · Oct 29, 2023
docs/consumer-tuning.md: 2 additions & 2 deletions
@@ -29,8 +29,8 @@ The most important settings for tuning throughput and latency are:
* kafka's [configuration `max.poll.records`](https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records) — The maximum number of records a poll will return. Kafka defaults
this to `500`. You can set this higher for more throughput, or lower for lower latency.
* zio-kafka's fetch-strategy `partitionPreFetchBufferLimit` — when the number of records in a partition queue is
below this value, zio-kafka will start to pre-fetch and buffer more records from Kafka. The default value for this
parameter is `1024`; 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2.
at or below this value, zio-kafka will start to pre-fetch and buffer more records from Kafka. The default value for
this parameter is `1024`; 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2.

Zio-kafka provides 2 methods that set these settings for 2 common use cases: `ConsumerSettings.tuneForHighThroughput`
and `ConsumerSettings.tuneForLowLatency`.
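For reference, a minimal sketch of how these two settings could be applied together in application code (the broker address, group id, and chosen values are illustrative placeholders, not part of this change):

```scala
import zio.kafka.consumer.ConsumerSettings

// Hypothetical tuning values, for illustration only.
val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withGroupId("example-group")
    // Kafka client setting: maximum number of records returned by a single poll.
    .withProperty("max.poll.records", "1000")
    // zio-kafka fetch strategy: keep pre-fetching while a partition's queue holds
    // at most this many records; a value of 0 disables pre-fetching entirely.
    .withPartitionPreFetchBufferLimit(2048)
```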
@@ -336,7 +336,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
maxPollInterval = 1.second,
`max.poll.records` = 2
)
.map(_.withPollTimeout(50.millis))
.map(_.withPollTimeout(50.millis).withPartitionPreFetchBufferLimit(0))
consumer <- Consumer.make(settings)
_ <- scheduledProduce(topic1, Schedule.fixed(50.millis).jittered).runDrain.forkScoped
_ <- scheduledProduce(topic2, Schedule.fixed(200.millis).jittered).runDrain.forkScoped
@@ -18,19 +18,19 @@ object QueueSizeBasedFetchStrategySpec extends ZIOSpecDefaultSlf4j {
override def spec: Spec[TestEnvironment with Scope, Any] =
suite("QueueSizeBasedFetchStrategySpec")(
test("stream with queue size above limit is paused") {
val streams = Chunk(newStream(tp10, currentQueueSize = 100))
val streams = Chunk(newStream(tp10, currentQueueSize = partitionPreFetchBufferLimit + 1))
for {
result <- fetchStrategy.selectPartitionsToFetch(streams)
} yield assertTrue(result.isEmpty)
},
test("stream with queue size equal to limit is paused") {
test("stream with queue size equal to limit may resume") {
val streams = Chunk(newStream(tp10, currentQueueSize = partitionPreFetchBufferLimit))
for {
result <- fetchStrategy.selectPartitionsToFetch(streams)
} yield assertTrue(result.isEmpty)
} yield assertTrue(result == Set(tp10))
},
test("stream with queue size below limit may resume") {
val streams = Chunk(newStream(tp10, currentQueueSize = 10))
val streams = Chunk(newStream(tp10, currentQueueSize = partitionPreFetchBufferLimit - 1))
for {
result <- fetchStrategy.selectPartitionsToFetch(streams)
} yield assertTrue(result == Set(tp10))
@@ -159,19 +159,27 @@ final case class ConsumerSettings(

/**
* @param partitionPreFetchBufferLimit
* The queue size below which more records are fetched and buffered (per partition). This buffer improves throughput
* and supports varying downstream message processing time, while maintaining some backpressure. Large values
* effectively disable backpressure at the cost of high memory usage, low values will effectively disable
* prefetching in favour of low memory consumption. The number of records that is fetched on every poll is
* controlled by the `max.poll.records` setting, the number of records fetched for every partition is somewhere
* between 0 and `max.poll.records`. A value that is a power of 2 offers somewhat better queueing performance.
* The queue size at or below which more records are fetched and buffered (per partition). This buffer improves
* throughput and supports varying downstream message processing time, while maintaining some backpressure. Large
* values effectively disable backpressure at the cost of high memory usage; low values effectively disable
* prefetching in favor of low memory consumption. The number of records fetched on every poll is controlled by the
* `max.poll.records` setting; the number of records fetched for every partition is somewhere between 0 and
* `max.poll.records`. A value that is a power of 2 offers somewhat better queueing performance.
*
* The default value for this parameter is 1024. It is calculated by taking 2 * the default `max.poll.records` of 500,
* rounded to the nearest power of 2.
*
* The value `0` disables pre-fetching.
*/
def withPartitionPreFetchBufferLimit(partitionPreFetchBufferLimit: Int): ConsumerSettings =
copy(fetchStrategy = QueueSizeBasedFetchStrategy(partitionPreFetchBufferLimit))

/**
* Disables partition record pre-fetching.
*/
def withoutPartitionPreFetching(): ConsumerSettings =
withPartitionPreFetchBufferLimit(0)

@deprecated("Use withPartitionPreFetchBufferLimit instead", "2.6.0")
def withMaxPartitionQueueSize(partitionPreFetchBufferLimit: Int): ConsumerSettings =
withPartitionPreFetchBufferLimit(partitionPreFetchBufferLimit)
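A brief usage sketch for the setters shown above (bootstrap servers are placeholders; both forms configure a zero-limit fetch strategy and therefore disable pre-fetching):

```scala
import zio.kafka.consumer.ConsumerSettings

// Both settings end up with QueueSizeBasedFetchStrategy(0): a partition is only
// fetched again once its queue has been completely drained.
val explicit: ConsumerSettings =
  ConsumerSettings(List("localhost:9092")).withPartitionPreFetchBufferLimit(0)

val viaHelper: ConsumerSettings =
  ConsumerSettings(List("localhost:9092")).withoutPartitionPreFetching()
```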
@@ -25,24 +25,32 @@ trait FetchStrategy {
}

/**
* A fetch strategy that allows a stream to fetch data when its queue size is below `partitionPreFetchBufferLimit`.
* A fetch strategy that allows a stream to fetch data when its queue size is at or below
* `partitionPreFetchBufferLimit`.
*
* @param partitionPreFetchBufferLimit
* The queue size below which more records are fetched and buffered (per partition). This buffer improves throughput
* and supports varying downstream message processing time, while maintaining some backpressure. Large values
* effectively disable backpressure at the cost of high memory usage, low values will effectively disable prefetching
* in favour of low memory consumption. The number of records that is fetched on every poll is controlled by the
* `max.poll.records` setting, the number of records fetched for every partition is somewhere between 0 and
* The queue size at or below which more records are fetched and buffered (per partition). This buffer improves
* throughput and supports varying downstream message processing time, while maintaining some backpressure. Large
* values effectively disable backpressure at the cost of high memory usage; low values effectively disable
* prefetching in favor of low memory consumption. The number of records fetched on every poll is controlled by the
* `max.poll.records` setting; the number of records fetched for every partition is somewhere between 0 and
* `max.poll.records`. A value that is a power of 2 offers somewhat better queueing performance.
*
* The default value for this parameter is 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2.
*
* The value `0` disables pre-fetching.
*/
final case class QueueSizeBasedFetchStrategy(partitionPreFetchBufferLimit: Int = 1024) extends FetchStrategy {
require(
partitionPreFetchBufferLimit >= 0,
s"partitionPreFetchBufferLimit must be at least 0, got $partitionPreFetchBufferLimit"
)

override def selectPartitionsToFetch(streams: Chunk[PartitionStream]): ZIO[Any, Nothing, Set[TopicPartition]] =
ZIO
.foldLeft(streams)(mutable.ArrayBuilder.make[TopicPartition]) { case (acc, stream) =>
stream.queueSize.map { queueSize =>
if (queueSize < partitionPreFetchBufferLimit) acc += stream.tp else acc
if (queueSize <= partitionPreFetchBufferLimit) acc += stream.tp else acc
}
}
.map(_.result().toSet)
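To illustrate the boundary behaviour implemented above, a small sketch (the package `zio.kafka.consumer.fetch` is assumed; the values are examples only):

```scala
import zio.kafka.consumer.fetch.QueueSizeBasedFetchStrategy

// Default: keep pre-fetching a partition while its queue holds at most 1024 records.
val defaultStrategy = QueueSizeBasedFetchStrategy()

// Limit 0: `queueSize <= 0` only holds for an empty queue, so pre-fetching is
// disabled and records are fetched only after the partition queue is drained.
val noPrefetch = QueueSizeBasedFetchStrategy(partitionPreFetchBufferLimit = 0)

// Negative limits are rejected by the `require` check with an IllegalArgumentException:
// QueueSizeBasedFetchStrategy(-1) // throws
```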