Make partitionPreFetchBufferLimit value 0 disable prefetching (#1091)

Previously, `ConsumerSettings.partitionPreFetchBufferLimit(1)` was needed to disable pre-fetching. After this change you can disable pre-fetching with `ConsumerSettings.partitionPreFetchBufferLimit(0)`, or simply with `ConsumerSettings.withoutPartitionPreFetching`.
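A minimal sketch of the new usage (the broker address is illustrative; the `with...` setters are the builder-style methods shown in this diff, and the `ConsumerSettings(bootstrapServers)` constructor is assumed):

```scala
import zio.kafka.consumer.ConsumerSettings

val settings = ConsumerSettings(List("localhost:9092")) // illustrative broker

// After this change, both of the following disable pre-fetching entirely:
val explicit = settings.withPartitionPreFetchBufferLimit(0)
val shortcut = settings.withoutPartitionPreFetching
```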

Also: 
 - fix the flaky test `a consumer timeout interrupts the stream and shuts down the consumer` by no longer using `ZStream.interruptWhen`,
 - detect slow consumers slightly more efficiently by requesting the current time less often,
 - remove a statement about powers of 2 from the documentation that is no longer true.
erikvanoosten authored Oct 29, 2023
1 parent 9b27a8b commit a4ec8e0
Showing 7 changed files with 99 additions and 74 deletions.
4 changes: 2 additions & 2 deletions docs/consumer-tuning.md
@@ -29,8 +29,8 @@ The most important settings for tuning throughput and latency are:
* kafka's [configuration `max.poll.records`](https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records) — The maximum number of records a poll will return. Kafka defaults
this to `500`. You can set this higher for more throughput, or lower for lower latency.
* zio-kafka's fetch-strategy `partitionPreFetchBufferLimit` — when the number of records in a partition queue is
below this value, zio-kafka will start to pre-fetch and buffer more records from Kafka. The default value for this
parameter is `1024`; 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2.
at or below this value, zio-kafka will start to pre-fetch and buffer more records from Kafka. The default value for
this parameter is `1024`; 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2.

Zio-kafka provides 2 methods that set these settings for 2 common use cases: `ConsumerSettings.tuneForHighThroughput`
and `ConsumerSettings.tuneForLowLatency`.
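A hedged sketch of how the tuning options described in this documentation passage could be combined (the broker address, group id, and chosen values are illustrative, not recommendations):

```scala
import zio.kafka.consumer.ConsumerSettings

val base = ConsumerSettings(List("localhost:9092")) // illustrative broker
  .withGroupId("my-group")

// Either start from one of the presets mentioned above...
val highThroughput = base.tuneForHighThroughput
val lowLatency     = base.tuneForLowLatency

// ...or tune the individual settings directly:
val custom = base
  .withProperty("max.poll.records", "1000") // more records per poll
  .withPartitionPreFetchBufferLimit(2048)   // buffer more per partition
```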
92 changes: 49 additions & 43 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -322,49 +322,55 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assert(offset.map(_.offset))(isSome(equalTo(9L)))
},
test("a consumer timeout interrupts the stream and shuts down the consumer") {
ZIO.scoped {
for {
topic1 <- randomTopic
topic2 <- randomTopic
group <- randomGroup
clientId <- randomClient
_ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic1))
_ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic2))
settings <- consumerSettings(
clientId = clientId,
groupId = Some(group),
maxPollInterval = 1.second,
`max.poll.records` = 2
)
.map(_.withPollTimeout(50.millis))
consumer <- Consumer.make(settings)
_ <- scheduledProduce(topic1, Schedule.fixed(50.millis).jittered).runDrain.forkScoped
_ <- scheduledProduce(topic2, Schedule.fixed(200.millis).jittered).runDrain.forkScoped
// The slow consumer:
c1 <- consumer
.plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
// The runner for GitHub Actions is a bit underpowered. The machine is so busy that the logic
// that detects the timeout doesn't get the chance to execute quickly enough. To compensate we
// sleep a huge amount of time:
.tap(r => ZIO.sleep(20.seconds).when(r.key == "key3"))
// Use `take` to ensure the test ends quickly, even when the interrupt fails to occur.
// Because of chunking, we need to pull more than 3 records before the interrupt kicks in.
.take(100)
.runDrain
.exit
.fork
// Another consumer:
_ <- consumer
.plainStream(Subscription.topics(topic2), Serde.string, Serde.string)
.runDrain
.forkScoped
c1Exit <- c1.join
subscriptions <- consumer.subscription.delay(100.millis)
} yield assertTrue(
c1Exit.isFailure,
subscriptions.isEmpty
)
}
// Setup of this test:
// - Set the max poll interval very low: a couple of seconds.
// - Continuously produce records so that data is always available.
// - Consumer 1 consumes very slowly; each chunk takes more than the max poll interval.
// - Consumer 2 is fast.
// - We assert that consumer 1 is interrupted and that consumer 2 is stopped.
for {
topic1 <- randomTopic
topic2 <- randomTopic
group <- randomGroup
clientId <- randomClient
_ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic1, partitions = 1))
_ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic2))
settings <- consumerSettings(
clientId = clientId,
groupId = Some(group),
maxPollInterval = 2.seconds,
`max.poll.records` = 2
)
.map(_.withoutPartitionPreFetching.withPollTimeout(100.millis))
consumer <- Consumer.make(settings)
_ <- scheduledProduce(topic1, Schedule.fixed(500.millis).jittered).runDrain.forkScoped
_ <- scheduledProduce(topic2, Schedule.fixed(500.millis).jittered).runDrain.forkScoped
// The slow consumer:
c1 <- consumer
.plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
// Consumer timeout detection is done per chunk. From here on, work per chunk
.chunks
// Sleep for some seconds to simulate a consumer that is stuck (only sleep for the first chunk).
// Because slow consumers are only detected once every run-loop, detection can take many seconds.
.tap { c =>
ZIO.sleep(10.seconds).when(c.head.key == "key0")
}
// Use `take` to ensure the test ends quickly, even when the interrupt fails to occur.
.take(8)
.runDrain
.exit
.fork
// Another consumer:
_ <- consumer
.plainStream(Subscription.topics(topic2), Serde.string, Serde.string)
.runDrain
.forkScoped
c1Exit <- c1.join
subscriptions <- consumer.subscription.delay(500.millis)
} yield assertTrue(
c1Exit.isFailure,
subscriptions.isEmpty
)
},
test("a slow producer doesnot interrupt the stream") {
ZIO.scoped {
@@ -18,19 +18,19 @@ object QueueSizeBasedFetchStrategySpec extends ZIOSpecDefaultSlf4j {
override def spec: Spec[TestEnvironment with Scope, Any] =
suite("QueueSizeBasedFetchStrategySpec")(
test("stream with queue size above limit is paused") {
val streams = Chunk(newStream(tp10, currentQueueSize = 100))
val streams = Chunk(newStream(tp10, currentQueueSize = partitionPreFetchBufferLimit + 1))
for {
result <- fetchStrategy.selectPartitionsToFetch(streams)
} yield assertTrue(result.isEmpty)
},
test("stream with queue size equal to limit is paused") {
test("stream with queue size equal to limit may resume") {
val streams = Chunk(newStream(tp10, currentQueueSize = partitionPreFetchBufferLimit))
for {
result <- fetchStrategy.selectPartitionsToFetch(streams)
} yield assertTrue(result.isEmpty)
} yield assertTrue(result == Set(tp10))
},
test("stream with queue size below limit may resume") {
val streams = Chunk(newStream(tp10, currentQueueSize = 10))
val streams = Chunk(newStream(tp10, currentQueueSize = partitionPreFetchBufferLimit - 1))
for {
result <- fetchStrategy.selectPartitionsToFetch(streams)
} yield assertTrue(result == Set(tp10))
20 changes: 14 additions & 6 deletions zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
@@ -159,19 +159,27 @@ final case class ConsumerSettings(

/**
* @param partitionPreFetchBufferLimit
* The queue size below which more records are fetched and buffered (per partition). This buffer improves throughput
* and supports varying downstream message processing time, while maintaining some backpressure. Large values
* effectively disable backpressure at the cost of high memory usage, low values will effectively disable
* prefetching in favour of low memory consumption. The number of records that is fetched on every poll is
* controlled by the `max.poll.records` setting, the number of records fetched for every partition is somewhere
* between 0 and `max.poll.records`. A value that is a power of 2 offers somewhat better queueing performance.
* The queue size at or below which more records are fetched and buffered (per partition). This buffer improves
* throughput and supports varying downstream message processing time, while maintaining some backpressure. Large
* values effectively disable backpressure at the cost of high memory usage, low values will effectively disable
* prefetching in favor of low memory consumption. The number of records that is fetched on every poll is controlled
* by the `max.poll.records` setting, the number of records fetched for every partition is somewhere between 0 and
* `max.poll.records`.
*
* The default value for this parameter is 1024. It is calculated by taking 2 * the default `max.poll.records` of 500,
* rounded to the nearest power of 2.
*
* The value `0` disables pre-fetching.
*/
def withPartitionPreFetchBufferLimit(partitionPreFetchBufferLimit: Int): ConsumerSettings =
copy(fetchStrategy = QueueSizeBasedFetchStrategy(partitionPreFetchBufferLimit))

/**
* Disables partition record pre-fetching.
*/
def withoutPartitionPreFetching: ConsumerSettings =
withPartitionPreFetchBufferLimit(0)

@deprecated("Use withPartitionPreFetchBufferLimit instead", "2.6.0")
def withMaxPartitionQueueSize(partitionPreFetchBufferLimit: Int): ConsumerSettings =
withPartitionPreFetchBufferLimit(partitionPreFetchBufferLimit)
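Putting the settings documented above together; a sketch only (broker, group, and the scope handling are illustrative), showing the new setter next to its deprecated predecessor:

```scala
import zio.ZIO
import zio.kafka.consumer.{ Consumer, ConsumerSettings }

val settings =
  ConsumerSettings(List("localhost:9092"))  // illustrative broker
    .withGroupId("my-group")
    .withPartitionPreFetchBufferLimit(1024) // the default; 0 disables pre-fetching
    // .withMaxPartitionQueueSize(1024)     // deprecated alias for the same setting

// Consumer.make is scoped: the consumer is closed when the scope ends.
val program: ZIO[Any, Throwable, Unit] =
  ZIO.scoped {
    Consumer.make(settings).flatMap(_ => ZIO.unit /* use the consumer here */)
  }
```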
@@ -25,24 +25,32 @@ trait FetchStrategy {
}

/**
* A fetch strategy that allows a stream to fetch data when its queue size is below `partitionPreFetchBufferLimit`.
* A fetch strategy that allows a stream to fetch data when its queue size is at or below
* `partitionPreFetchBufferLimit`.
*
* @param partitionPreFetchBufferLimit
* The queue size below which more records are fetched and buffered (per partition). This buffer improves throughput
* and supports varying downstream message processing time, while maintaining some backpressure. Large values
* effectively disable backpressure at the cost of high memory usage, low values will effectively disable prefetching
* in favour of low memory consumption. The number of records that is fetched on every poll is controlled by the
* `max.poll.records` setting, the number of records fetched for every partition is somewhere between 0 and
* `max.poll.records`. A value that is a power of 2 offers somewhat better queueing performance.
* The queue size at or below which more records are fetched and buffered (per partition). This buffer improves
* throughput and supports varying downstream message processing time, while maintaining some backpressure. Large
* values effectively disable backpressure at the cost of high memory usage, low values will effectively disable
* prefetching in favor of low memory consumption. The number of records that is fetched on every poll is controlled
* by the `max.poll.records` setting, the number of records fetched for every partition is somewhere between 0 and
* `max.poll.records`.
*
* The default value for this parameter is 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2.
*
* The value `0` disables pre-fetching.
*/
final case class QueueSizeBasedFetchStrategy(partitionPreFetchBufferLimit: Int = 1024) extends FetchStrategy {
require(
partitionPreFetchBufferLimit >= 0,
s"partitionPreFetchBufferLimit must be at least 0, got $partitionPreFetchBufferLimit"
)

override def selectPartitionsToFetch(streams: Chunk[PartitionStream]): ZIO[Any, Nothing, Set[TopicPartition]] =
ZIO
.foldLeft(streams)(mutable.ArrayBuilder.make[TopicPartition]) { case (acc, stream) =>
stream.queueSize.map { queueSize =>
if (queueSize < partitionPreFetchBufferLimit) acc += stream.tp else acc
if (queueSize <= partitionPreFetchBufferLimit) acc += stream.tp else acc
}
}
.map(_.result().toSet)
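As an illustration of the extension point, a custom strategy could implement the same trait with a different policy. The sketch below is not part of this change; the package path and the idea of a global limit are assumptions. It caps the total number of buffered records across all partitions instead of per partition:

```scala
import org.apache.kafka.common.TopicPartition
import zio.{ Chunk, ZIO }
import zio.kafka.consumer.fetch.{ FetchStrategy, PartitionStream } // assumed package path

// Sketch: fetch from all partitions only while the records buffered across
// every partition queue, summed together, stay at or below `totalLimit`.
final case class TotalQueueSizeFetchStrategy(totalLimit: Int = 8192) extends FetchStrategy {
  override def selectPartitionsToFetch(
    streams: Chunk[PartitionStream]
  ): ZIO[Any, Nothing, Set[TopicPartition]] =
    ZIO
      .foreach(streams)(stream => stream.queueSize.map(stream.tp -> _))
      .map { sizes =>
        if (sizes.map(_._2).sum <= totalLimit) sizes.map(_._1).toSet
        else Set.empty[TopicPartition]
      }
}
```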
@@ -3,7 +3,7 @@ package zio.kafka.consumer.internal
import org.apache.kafka.common.TopicPartition
import zio.kafka.consumer.Offset
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.internal.PartitionStreamControl.QueueInfo
import zio.kafka.consumer.internal.PartitionStreamControl.{ NanoTime, QueueInfo }
import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.stream.{ Take, ZStream }
import zio.{ Chunk, Clock, Duration, LogAnnotation, Promise, Queue, Ref, UIO, ZIO }
@@ -62,15 +62,14 @@ final class PartitionStreamControl private (
def queueSize: UIO[Int] = queueInfoRef.get.map(_.size)

/**
* @param now
* the time as given by `Clock.nanoTime`
* @return
* `true` when the stream has data available, but none has been pulled for more than `maxPollInterval` (since data
* became available), `false` otherwise
*/
private[internal] def maxPollIntervalExceeded: UIO[Boolean] =
for {
now <- Clock.nanoTime
queueInfo <- queueInfoRef.get
} yield queueInfo.deadlineExceeded(now)
private[internal] def maxPollIntervalExceeded(now: NanoTime): UIO[Boolean] =
queueInfoRef.get.map(_.deadlineExceeded(now))

/** To be invoked when the partition was lost. */
private[internal] def lost: UIO[Boolean] =
@@ -106,7 +105,7 @@ final class PartitionStreamControl private (

object PartitionStreamControl {

private type NanoTime = Long
type NanoTime = Long

private[internal] def newPartitionStream(
tp: TopicPartition,
@@ -152,9 +151,11 @@ object PartitionStreamControl {
// First try to take all records that are available right now.
// When no data is available, request more data and await its arrival.
dataQueue.takeAll.flatMap(data => if (data.isEmpty) requestAndAwaitData else ZIO.succeed(data))
}.flattenTake
.chunksWith(_.tap(records => registerPull(queueInfo, records)))
.interruptWhen(interruptionPromise)
}.flattenTake.chunksWith { s =>
s.tap(records => registerPull(queueInfo, records))
// Due to https://github.com/zio/zio/issues/8515 we cannot use ZStream.interruptWhen.
.mapZIO(chunk => interruptionPromise.await.whenZIO(interruptionPromise.isDone).as(chunk))
}
} yield new PartitionStreamControl(
tp,
stream,
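The replacement for `ZStream.interruptWhen` can be illustrated in isolation. A self-contained sketch (the names are made up) of the same pattern, here polling a `Promise` per element instead of per chunk:

```scala
import zio._
import zio.stream.ZStream

// Sketch: surface the promise's failure as soon as it is completed.
// `await` only runs when `isDone` is true, so on the happy path this is a
// cheap check rather than a blocking wait.
def failWhenDone[A](
  stream: ZStream[Any, Throwable, A],
  interruption: Promise[Throwable, Nothing]
): ZStream[Any, Throwable, A] =
  stream.mapZIO(a => interruption.await.whenZIO(interruption.isDone).as(a))
```

In the diff the same check runs inside `chunksWith`, so it is evaluated once per pulled chunk rather than once per record.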
Expand Up @@ -405,8 +405,10 @@ private[consumer] final class Runloop private (
*/
private def checkStreamPollInterval(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] =
for {
now <- Clock.nanoTime
anyExceeded <- ZIO.foldLeft(streams)(false) { case (acc, stream) =>
stream.maxPollIntervalExceeded
stream
.maxPollIntervalExceeded(now)
.tap(exceeded => if (exceeded) stream.halt else ZIO.unit)
.map(acc || _)
}
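The change above reads `Clock.nanoTime` once per check instead of once per stream. A hedged sketch of that pattern with made-up per-stream state (the `deadlineNanos` field is assumed for illustration):

```scala
import zio._

// Hypothetical per-stream bookkeeping, for illustration only.
final case class StreamState(deadlineNanos: Long)

// Read the clock once and reuse `now` for every stream.
def anyDeadlineExceeded(streams: Chunk[StreamState]): UIO[Boolean] =
  Clock.nanoTime.map(now => streams.exists(_.deadlineNanos <= now))
```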
