From 8d3c5211f55eadc1ad2f50f033afe884509a6e28 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 28 Oct 2023 12:11:41 +0200 Subject: [PATCH 1/8] Make `partitionPreFetchBufferLimit` value `0` disable prefetching Also: - add ConsumerSettings.withoutPartitionPreFetching - attempt to make test `a consumer timeout interrupts the stream and shuts down the consumer` stable by disabling prefetching --- docs/consumer-tuning.md | 4 ++-- .../zio/kafka/consumer/ConsumerSpec.scala | 2 +- .../QueueSizeBasedFetchStrategySpec.scala | 8 +++---- .../zio/kafka/consumer/ConsumerSettings.scala | 20 ++++++++++++----- .../kafka/consumer/fetch/FetchStrategy.scala | 22 +++++++++++++------ 5 files changed, 36 insertions(+), 20 deletions(-) diff --git a/docs/consumer-tuning.md b/docs/consumer-tuning.md index a08666c2c..65460e14b 100644 --- a/docs/consumer-tuning.md +++ b/docs/consumer-tuning.md @@ -29,8 +29,8 @@ The most important settings for tuning throughput and latency are: * kafka's [configuration `max.poll.records`](https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records) — The maximum number of records a poll will return. Kafka defaults this to `500`. You can set this higher for more throughput, or lower for lower latency. * zio-kafka's fetch-strategy `partitionPreFetchBufferLimit` — when the number of records in a partition queue is - below this value, zio-kafka will start to pre-fetch and buffer more records from Kafka. The default value for this - parameter is `1024`; 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2. + at or below this value, zio-kafka will start to pre-fetch and buffer more records from Kafka. The default value for + this parameter is `1024`; 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2. Zio-kafka provides 2 methods that set these settings for 2 common use cases: `ConsumerSettings.tuneForHighThroughput` and `ConsumerSettings.tuneForLowLatency`. 
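As an illustration of the tuning knobs discussed in docs/consumer-tuning.md above, a minimal sketch; the broker address and the limit of 512 are placeholders, `withoutPartitionPreFetching` is the shorthand introduced by this patch, and `tuneForHighThroughput`/`tuneForLowLatency` are the presets the doc refers to:

import zio.kafka.consumer.ConsumerSettings

// Placeholder bootstrap server; only the tuning calls matter here.
val base = ConsumerSettings(List("localhost:9092"))

// Start pre-fetching while a partition's queue holds 512 records or fewer.
val custom = base.withPartitionPreFetchBufferLimit(512)

// With this patch, a limit of 0 disables pre-fetching; withoutPartitionPreFetching is shorthand for that.
val noPrefetch = base.withoutPartitionPreFetching

// Preset profiles mentioned in docs/consumer-tuning.md.
val highThroughput = base.tuneForHighThroughput
val lowLatency     = base.tuneForLowLatency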
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 63cfc269d..4d2c4bea6 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -336,7 +336,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { maxPollInterval = 1.second, `max.poll.records` = 2 ) - .map(_.withPollTimeout(50.millis)) + .map(_.withPollTimeout(50.millis).withPartitionPreFetchBufferLimit(0)) consumer <- Consumer.make(settings) _ <- scheduledProduce(topic1, Schedule.fixed(50.millis).jittered).runDrain.forkScoped _ <- scheduledProduce(topic2, Schedule.fixed(200.millis).jittered).runDrain.forkScoped diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/QueueSizeBasedFetchStrategySpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/QueueSizeBasedFetchStrategySpec.scala index 4d5480657..758f0f941 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/QueueSizeBasedFetchStrategySpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/QueueSizeBasedFetchStrategySpec.scala @@ -18,19 +18,19 @@ object QueueSizeBasedFetchStrategySpec extends ZIOSpecDefaultSlf4j { override def spec: Spec[TestEnvironment with Scope, Any] = suite("QueueSizeBasedFetchStrategySpec")( test("stream with queue size above limit is paused") { - val streams = Chunk(newStream(tp10, currentQueueSize = 100)) + val streams = Chunk(newStream(tp10, currentQueueSize = partitionPreFetchBufferLimit + 1)) for { result <- fetchStrategy.selectPartitionsToFetch(streams) } yield assertTrue(result.isEmpty) }, - test("stream with queue size equal to limit is paused") { + test("stream with queue size equal to limit may resume") { val streams = Chunk(newStream(tp10, currentQueueSize = partitionPreFetchBufferLimit)) for { result <- fetchStrategy.selectPartitionsToFetch(streams) - } yield assertTrue(result.isEmpty) + } yield assertTrue(result == Set(tp10)) }, test("stream with queue size below limit may resume") { - val streams = Chunk(newStream(tp10, currentQueueSize = 10)) + val streams = Chunk(newStream(tp10, currentQueueSize = partitionPreFetchBufferLimit - 1)) for { result <- fetchStrategy.selectPartitionsToFetch(streams) } yield assertTrue(result == Set(tp10)) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index 8e083d0fa..73159cd8b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -159,19 +159,27 @@ final case class ConsumerSettings( /** * @param partitionPreFetchBufferLimit - * The queue size below which more records are fetched and buffered (per partition). This buffer improves throughput - * and supports varying downstream message processing time, while maintaining some backpressure. Large values - * effectively disable backpressure at the cost of high memory usage, low values will effectively disable - * prefetching in favour of low memory consumption. The number of records that is fetched on every poll is - * controlled by the `max.poll.records` setting, the number of records fetched for every partition is somewhere - * between 0 and `max.poll.records`. A value that is a power of 2 offers somewhat better queueing performance. 
+ * The queue size at or below which more records are fetched and buffered (per partition). This buffer improves + * throughput and supports varying downstream message processing time, while maintaining some backpressure. Large + * values effectively disable backpressure at the cost of high memory usage, low values will effectively disable + * prefetching in favor of low memory consumption. The number of records that is fetched on every poll is controlled + * by the `max.poll.records` setting, the number of records fetched for every partition is somewhere between 0 and + * `max.poll.records`. A value that is a power of 2 offers somewhat better queueing performance. * * The default value for this parameter is 1024. It is calculated by taking 2 * the default `max.poll.records` of 500, * rounded to the nearest power of 2. + * + * The value `0` disables pre-fetching. */ def withPartitionPreFetchBufferLimit(partitionPreFetchBufferLimit: Int): ConsumerSettings = copy(fetchStrategy = QueueSizeBasedFetchStrategy(partitionPreFetchBufferLimit)) + /** + * Disables partition record pre-fetching. + */ + def withoutPartitionPreFetching(): ConsumerSettings = + withPartitionPreFetchBufferLimit(0) + @deprecated("Use withPartitionPreFetchBufferLimit instead", "2.6.0") def withMaxPartitionQueueSize(partitionPreFetchBufferLimit: Int): ConsumerSettings = withPartitionPreFetchBufferLimit(partitionPreFetchBufferLimit) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala index 7894bbc87..72567687f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala @@ -25,24 +25,32 @@ trait FetchStrategy { } /** - * A fetch strategy that allows a stream to fetch data when its queue size is below `partitionPreFetchBufferLimit`. + * A fetch strategy that allows a stream to fetch data when its queue size is at or below + * `partitionPreFetchBufferLimit`. * * @param partitionPreFetchBufferLimit - * The queue size below which more records are fetched and buffered (per partition). This buffer improves throughput - * and supports varying downstream message processing time, while maintaining some backpressure. Large values - * effectively disable backpressure at the cost of high memory usage, low values will effectively disable prefetching - * in favour of low memory consumption. The number of records that is fetched on every poll is controlled by the - * `max.poll.records` setting, the number of records fetched for every partition is somewhere between 0 and + * The queue size at or below which more records are fetched and buffered (per partition). This buffer improves + * throughput and supports varying downstream message processing time, while maintaining some backpressure. Large + * values effectively disable backpressure at the cost of high memory usage, low values will effectively disable + * prefetching in favor of low memory consumption. The number of records that is fetched on every poll is controlled + * by the `max.poll.records` setting, the number of records fetched for every partition is somewhere between 0 and * `max.poll.records`. A value that is a power of 2 offers somewhat better queueing performance. * * The default value for this parameter is 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2. + * + * The value `0` disables pre-fetching. 
*/ final case class QueueSizeBasedFetchStrategy(partitionPreFetchBufferLimit: Int = 1024) extends FetchStrategy { + require( + partitionPreFetchBufferLimit >= 0, + s"partitionPreFetchBufferLimit must be at least 0, got $partitionPreFetchBufferLimit" + ) + override def selectPartitionsToFetch(streams: Chunk[PartitionStream]): ZIO[Any, Nothing, Set[TopicPartition]] = ZIO .foldLeft(streams)(mutable.ArrayBuilder.make[TopicPartition]) { case (acc, stream) => stream.queueSize.map { queueSize => - if (queueSize < partitionPreFetchBufferLimit) acc += stream.tp else acc + if (queueSize <= partitionPreFetchBufferLimit) acc += stream.tp else acc } } .map(_.result().toSet) From 6eb68307af5c694e41c0ec1177917a97319d1bce Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sun, 29 Oct 2023 14:24:35 +0100 Subject: [PATCH 2/8] Another attempt to fix the test Also: remove parenthesis from `withoutPartitionPreFetching` --- .../zio/kafka/consumer/ConsumerSpec.scala | 30 ++++++++++++------- .../zio/kafka/consumer/ConsumerSettings.scala | 2 +- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 4d2c4bea6..69c2c8ca8 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -322,33 +322,43 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } yield assert(offset.map(_.offset))(isSome(equalTo(9L))) }, test("a consumer timeout interrupts the stream and shuts down the consumer") { + // Setup of this test: + // Set the max poll interval very low: a couple of seconds. + // Consumer 1 consumes very slowly; each chunk takes more than the max poll interval. + // Consumer 2 is fast. + // We assert that consumer 1 is interrupted and that consumer 2 is stopped. ZIO.scoped { for { topic1 <- randomTopic topic2 <- randomTopic group <- randomGroup clientId <- randomClient - _ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic1)) + _ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic1, partitions = 1)) _ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic2)) settings <- consumerSettings( clientId = clientId, groupId = Some(group), - maxPollInterval = 1.second, - `max.poll.records` = 2 + maxPollInterval = 2.seconds, + `max.poll.records` = 1 ) - .map(_.withPollTimeout(50.millis).withPartitionPreFetchBufferLimit(0)) + .map(_.withoutPartitionPreFetching) consumer <- Consumer.make(settings) - _ <- scheduledProduce(topic1, Schedule.fixed(50.millis).jittered).runDrain.forkScoped + _ <- scheduledProduce(topic1, Schedule.fixed(100.millis).jittered).runDrain.forkScoped _ <- scheduledProduce(topic2, Schedule.fixed(200.millis).jittered).runDrain.forkScoped // The slow consumer: c1 <- consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) - // The runner for GitHub Actions is a bit underpowered. The machine is so busy that the logic - // that detects the timeout doesn't get the chance to execute quickly enough. To compensate we - // sleep a huge amount of time: - .tap(r => ZIO.sleep(20.seconds).when(r.key == "key3")) + // consumer timeout detection is done per chunk, sleep for some seconds to simulate + // a consumer that is stuck. Note: we only sleep for the very first chunk. 
+ .chunksWith( + _.tap { c => + ZIO.logDebug(s"chunk of ${c.size} elements") *> + ZIO.sleep(4.seconds).when(c.head.key == "key0") + } + ) // Use `take` to ensure the test ends quickly, even when the interrupt fails to occur. - // Because of chunking, we need to pull more than 3 records before the interrupt kicks in. + // Because of a race condition in ZStream.interruptWhen, we need to pull a lot of + // chunks before the interrupt kicks in. .take(100) .runDrain .exit diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index 73159cd8b..ea5402666 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -177,7 +177,7 @@ final case class ConsumerSettings( /** * Disables partition record pre-fetching. */ - def withoutPartitionPreFetching(): ConsumerSettings = + def withoutPartitionPreFetching: ConsumerSettings = withPartitionPreFetchBufferLimit(0) @deprecated("Use withPartitionPreFetchBufferLimit instead", "2.6.0") From 00fe6e4190dd2569a19d06b7a036bf2e91f9ec55 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sun, 29 Oct 2023 15:34:38 +0100 Subject: [PATCH 3/8] Another attempt to fix the flaky test --- .../zio/kafka/consumer/ConsumerSpec.scala | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 69c2c8ca8..df76960ae 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -323,10 +323,12 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { }, test("a consumer timeout interrupts the stream and shuts down the consumer") { // Setup of this test: - // Set the max poll interval very low: a couple of seconds. - // Consumer 1 consumes very slowly; each chunk takes more than the max poll interval. - // Consumer 2 is fast. - // We assert that consumer 1 is interrupted and that consumer 2 is stopped. + // - Set the max poll interval very low: a couple of seconds. + // - Continuously produce records so that data is always available. We produce more than the consumers + // can consume due to the max-poll-records and poll-timeout configurations. + // - Consumer 1 consumes very slowly; each chunk takes more than the max poll interval. + // - Consumer 2 is fast. + // - We assert that consumer 1 is interrupted and that consumer 2 is stopped. 
ZIO.scoped { for { topic1 <- randomTopic @@ -341,10 +343,10 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { maxPollInterval = 2.seconds, `max.poll.records` = 1 ) - .map(_.withoutPartitionPreFetching) + .map(_.withoutPartitionPreFetching.withPollTimeout(1.second)) consumer <- Consumer.make(settings) - _ <- scheduledProduce(topic1, Schedule.fixed(100.millis).jittered).runDrain.forkScoped - _ <- scheduledProduce(topic2, Schedule.fixed(200.millis).jittered).runDrain.forkScoped + _ <- scheduledProduce(topic1, Schedule.fixed(500.millis).jittered).runDrain.forkScoped + _ <- scheduledProduce(topic2, Schedule.fixed(500.millis).jittered).runDrain.forkScoped // The slow consumer: c1 <- consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) @@ -358,8 +360,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { ) // Use `take` to ensure the test ends quickly, even when the interrupt fails to occur. // Because of a race condition in ZStream.interruptWhen, we need to pull a lot of - // chunks before the interrupt kicks in. - .take(100) + // chunks before the stream sees that the underlying stream has been interrupted. + .take(200) .runDrain .exit .fork @@ -369,7 +371,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .runDrain .forkScoped c1Exit <- c1.join - subscriptions <- consumer.subscription.delay(100.millis) + subscriptions <- consumer.subscription.delay(500.millis) } yield assertTrue( c1Exit.isFailure, subscriptions.isEmpty From c64a16ef886fc21e031213d1a107acb88d1339bf Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sun, 29 Oct 2023 15:50:17 +0100 Subject: [PATCH 4/8] Another attempt to fix the flaky test --- .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index df76960ae..171dc01b7 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -377,7 +377,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { subscriptions.isEmpty ) } - }, + } @@ flaky(3), test("a slow producer doesnot interrupt the stream") { ZIO.scoped { for { From ab3c17d7f3c521c6653d375bd35d6f998a5d9f4d Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sun, 29 Oct 2023 16:58:32 +0100 Subject: [PATCH 5/8] Another attempt to fix the flaky test --- .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 171dc01b7..624104691 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -351,11 +351,11 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { c1 <- consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) // consumer timeout detection is done per chunk, sleep for some seconds to simulate - // a consumer that is stuck. Note: we only sleep for the very first chunk. + // a consumer that is stuck. 
.chunksWith( _.tap { c => ZIO.logDebug(s"chunk of ${c.size} elements") *> - ZIO.sleep(4.seconds).when(c.head.key == "key0") + ZIO.sleep(4.seconds) } ) // Use `take` to ensure the test ends quickly, even when the interrupt fails to occur. From beb1817e3e47067c88b10f2e957dd3840a3296bf Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sun, 29 Oct 2023 18:27:26 +0100 Subject: [PATCH 6/8] Fix the flaky test --- .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 2 +- .../zio/kafka/consumer/internal/PartitionStreamControl.scala | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 624104691..f0dcc25a0 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -377,7 +377,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { subscriptions.isEmpty ) } - } @@ flaky(3), + }, test("a slow producer doesnot interrupt the stream") { ZIO.scoped { for { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index f1ab25db1..7866e542a 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -154,7 +154,10 @@ object PartitionStreamControl { dataQueue.takeAll.flatMap(data => if (data.isEmpty) requestAndAwaitData else ZIO.succeed(data)) }.flattenTake .chunksWith(_.tap(records => registerPull(queueInfo, records))) - .interruptWhen(interruptionPromise) + // Due to https://github.com/zio/zio/issues/8515 we cannot use Zstream.interruptWhen. + .mapZIO { chunk => + interruptionPromise.await.whenZIO(interruptionPromise.isDone).as(chunk) + } } yield new PartitionStreamControl( tp, stream, From 82696d5869720decbc3170a650b35c45ab65c392 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sun, 29 Oct 2023 19:44:00 +0100 Subject: [PATCH 7/8] Retain chunking structure Also: correct docs about queueing and powers of 2 --- .../test/scala/zio/kafka/consumer/ConsumerSpec.scala | 8 +++----- .../scala/zio/kafka/consumer/ConsumerSettings.scala | 2 +- .../zio/kafka/consumer/fetch/FetchStrategy.scala | 2 +- .../consumer/internal/PartitionStreamControl.scala | 11 +++++------ 4 files changed, 10 insertions(+), 13 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index f0dcc25a0..46bad86a6 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -351,17 +351,15 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { c1 <- consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) // consumer timeout detection is done per chunk, sleep for some seconds to simulate - // a consumer that is stuck. + // a consumer that is stuck (only sleep for the first chunk). .chunksWith( _.tap { c => ZIO.logDebug(s"chunk of ${c.size} elements") *> - ZIO.sleep(4.seconds) + ZIO.sleep(4.seconds).when(c.head.key == "key0") } ) // Use `take` to ensure the test ends quickly, even when the interrupt fails to occur. 
- // Because of a race condition in ZStream.interruptWhen, we need to pull a lot of - // chunks before the stream sees that the underlying stream has been interrupted. - .take(200) + .take(8) .runDrain .exit .fork diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index ea5402666..741990f39 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -164,7 +164,7 @@ final case class ConsumerSettings( * values effectively disable backpressure at the cost of high memory usage, low values will effectively disable * prefetching in favor of low memory consumption. The number of records that is fetched on every poll is controlled * by the `max.poll.records` setting, the number of records fetched for every partition is somewhere between 0 and - * `max.poll.records`. A value that is a power of 2 offers somewhat better queueing performance. + * `max.poll.records`. * * The default value for this parameter is 1024. It is calculated by taking 2 * the default `max.poll.records` of 500, * rounded to the nearest power of 2. diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala index 72567687f..1b0f98d82 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala @@ -34,7 +34,7 @@ trait FetchStrategy { * values effectively disable backpressure at the cost of high memory usage, low values will effectively disable * prefetching in favor of low memory consumption. The number of records that is fetched on every poll is controlled * by the `max.poll.records` setting, the number of records fetched for every partition is somewhere between 0 and - * `max.poll.records`. A value that is a power of 2 offers somewhat better queueing performance. + * `max.poll.records`. * * The default value for this parameter is 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2. * diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 7866e542a..b602521c9 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -152,12 +152,11 @@ object PartitionStreamControl { // First try to take all records that are available right now. // When no data is available, request more data and await its arrival. dataQueue.takeAll.flatMap(data => if (data.isEmpty) requestAndAwaitData else ZIO.succeed(data)) - }.flattenTake - .chunksWith(_.tap(records => registerPull(queueInfo, records))) - // Due to https://github.com/zio/zio/issues/8515 we cannot use Zstream.interruptWhen. - .mapZIO { chunk => - interruptionPromise.await.whenZIO(interruptionPromise.isDone).as(chunk) - } + }.flattenTake.chunksWith { s => + s.tap(records => registerPull(queueInfo, records)) + // Due to https://github.com/zio/zio/issues/8515 we cannot use Zstream.interruptWhen. 
+ .mapZIO(chunk => interruptionPromise.await.whenZIO(interruptionPromise.isDone).as(chunk)) + } } yield new PartitionStreamControl( tp, stream, From 74ef80c7e0c89d5f5defdc0175c21dd3e29c266e Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sun, 29 Oct 2023 20:34:53 +0100 Subject: [PATCH 8/8] Attempt to fix flaky test Also: make detection more performant by only fetching nano time once per run-loop --- .../zio/kafka/consumer/ConsumerSpec.scala | 92 +++++++++---------- .../internal/PartitionStreamControl.scala | 13 ++- .../zio/kafka/consumer/internal/Runloop.scala | 4 +- 3 files changed, 53 insertions(+), 56 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 46bad86a6..cada18e17 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -324,57 +324,53 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { test("a consumer timeout interrupts the stream and shuts down the consumer") { // Setup of this test: // - Set the max poll interval very low: a couple of seconds. - // - Continuously produce records so that data is always available. We produce more than the consumers - // can consume due to the max-poll-records and poll-timeout configurations. + // - Continuously produce records so that data is always available. // - Consumer 1 consumes very slowly; each chunk takes more than the max poll interval. // - Consumer 2 is fast. // - We assert that consumer 1 is interrupted and that consumer 2 is stopped. - ZIO.scoped { - for { - topic1 <- randomTopic - topic2 <- randomTopic - group <- randomGroup - clientId <- randomClient - _ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic1, partitions = 1)) - _ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic2)) - settings <- consumerSettings( - clientId = clientId, - groupId = Some(group), - maxPollInterval = 2.seconds, - `max.poll.records` = 1 - ) - .map(_.withoutPartitionPreFetching.withPollTimeout(1.second)) - consumer <- Consumer.make(settings) - _ <- scheduledProduce(topic1, Schedule.fixed(500.millis).jittered).runDrain.forkScoped - _ <- scheduledProduce(topic2, Schedule.fixed(500.millis).jittered).runDrain.forkScoped - // The slow consumer: - c1 <- consumer - .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) - // consumer timeout detection is done per chunk, sleep for some seconds to simulate - // a consumer that is stuck (only sleep for the first chunk). - .chunksWith( - _.tap { c => - ZIO.logDebug(s"chunk of ${c.size} elements") *> - ZIO.sleep(4.seconds).when(c.head.key == "key0") - } - ) - // Use `take` to ensure the test ends quickly, even when the interrupt fails to occur. 
- .take(8) - .runDrain - .exit - .fork - // Another consumer: - _ <- consumer - .plainStream(Subscription.topics(topic2), Serde.string, Serde.string) - .runDrain - .forkScoped - c1Exit <- c1.join - subscriptions <- consumer.subscription.delay(500.millis) - } yield assertTrue( - c1Exit.isFailure, - subscriptions.isEmpty - ) - } + for { + topic1 <- randomTopic + topic2 <- randomTopic + group <- randomGroup + clientId <- randomClient + _ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic1, partitions = 1)) + _ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic2)) + settings <- consumerSettings( + clientId = clientId, + groupId = Some(group), + maxPollInterval = 2.seconds, + `max.poll.records` = 2 + ) + .map(_.withoutPartitionPreFetching.withPollTimeout(100.millis)) + consumer <- Consumer.make(settings) + _ <- scheduledProduce(topic1, Schedule.fixed(500.millis).jittered).runDrain.forkScoped + _ <- scheduledProduce(topic2, Schedule.fixed(500.millis).jittered).runDrain.forkScoped + // The slow consumer: + c1 <- consumer + .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) + // Consumer timeout detection is done per chunk. From here on, work per chunk + .chunks + // Sleep for some seconds to simulate a consumer that is stuck (only sleep for the first chunk). + // Because slow consumers are only detected once every run-loop, detection can take many seconds. + .tap { c => + ZIO.sleep(10.seconds).when(c.head.key == "key0") + } + // Use `take` to ensure the test ends quickly, even when the interrupt fails to occur. + .take(8) + .runDrain + .exit + .fork + // Another consumer: + _ <- consumer + .plainStream(Subscription.topics(topic2), Serde.string, Serde.string) + .runDrain + .forkScoped + c1Exit <- c1.join + subscriptions <- consumer.subscription.delay(500.millis) + } yield assertTrue( + c1Exit.isFailure, + subscriptions.isEmpty + ) }, test("a slow producer doesnot interrupt the stream") { ZIO.scoped { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index b602521c9..85f58b044 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -3,7 +3,7 @@ package zio.kafka.consumer.internal import org.apache.kafka.common.TopicPartition import zio.kafka.consumer.Offset import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } -import zio.kafka.consumer.internal.PartitionStreamControl.QueueInfo +import zio.kafka.consumer.internal.PartitionStreamControl.{ NanoTime, QueueInfo } import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord import zio.stream.{ Take, ZStream } import zio.{ Chunk, Clock, Duration, LogAnnotation, Promise, Queue, Ref, UIO, ZIO } @@ -62,15 +62,14 @@ final class PartitionStreamControl private ( def queueSize: UIO[Int] = queueInfoRef.get.map(_.size) /** + * @param now + * the time as given by `Clock.nanoTime` * @return * `true` when the stream has data available, but none has been pulled for more than `maxPollInterval` (since data * became available), `false` otherwise */ - private[internal] def maxPollIntervalExceeded: UIO[Boolean] = - for { - now <- Clock.nanoTime - queueInfo <- queueInfoRef.get - } yield queueInfo.deadlineExceeded(now) + private[internal] def maxPollIntervalExceeded(now: NanoTime): UIO[Boolean] = + queueInfoRef.get.map(_.deadlineExceeded(now)) /** To be invoked when the 
partition was lost. */ private[internal] def lost: UIO[Boolean] = @@ -106,7 +105,7 @@ final class PartitionStreamControl private ( object PartitionStreamControl { - private type NanoTime = Long + type NanoTime = Long private[internal] def newPartitionStream( tp: TopicPartition, diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index c7aeb6659..586bacff7 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -405,8 +405,10 @@ private[consumer] final class Runloop private ( */ private def checkStreamPollInterval(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] = for { + now <- Clock.nanoTime anyExceeded <- ZIO.foldLeft(streams)(false) { case (acc, stream) => - stream.maxPollIntervalExceeded + stream + .maxPollIntervalExceeded(now) .tap(exceeded => if (exceeded) stream.halt else ZIO.unit) .map(acc || _) }
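As an aside on the selection rule that `QueueSizeBasedFetchStrategy` applies after this change, a standalone sketch using a hypothetical `PartitionStreamStub` (the real `PartitionStream` is internal to zio-kafka): a partition is selected for fetching while its queue size is at or below the limit, so a limit of `0` fetches only when a partition's queue is completely empty, which is what "disables pre-fetching" means here.

import org.apache.kafka.common.TopicPartition
import zio._

// Hypothetical stand-in for zio-kafka's internal PartitionStream: just enough to show the rule.
final case class PartitionStreamStub(tp: TopicPartition, queueSize: UIO[Int])

// Same shape as QueueSizeBasedFetchStrategy.selectPartitionsToFetch in this patch:
// keep a partition when its queue size is at or below partitionPreFetchBufferLimit.
def selectPartitionsToFetch(
  streams: Chunk[PartitionStreamStub],
  partitionPreFetchBufferLimit: Int
): UIO[Set[TopicPartition]] =
  ZIO.foldLeft(streams)(Set.empty[TopicPartition]) { (acc, stream) =>
    stream.queueSize.map(size => if (size <= partitionPreFetchBufferLimit) acc + stream.tp else acc)
  }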
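The `PartitionStreamControl` change above checks the interruption promise once per chunk with `await.whenZIO(isDone)` instead of `ZStream#interruptWhen` (see zio/zio#8515). A minimal sketch of that pattern in isolation, with illustrative names, timings, and error message:

import zio._
import zio.stream._

object PromiseInterruptionSketch extends ZIOAppDefault {
  override def run: ZIO[Any, Throwable, Unit] =
    for {
      interruption <- Promise.make[Throwable, Nothing]
      // A stream that emits an element every 100 ms; each element first checks the promise,
      // mirroring `interruptionPromise.await.whenZIO(interruptionPromise.isDone).as(chunk)` above.
      consumer <- ZStream
                    .repeatZIOWithSchedule(Clock.nanoTime, Schedule.fixed(100.millis))
                    .mapZIO(t => interruption.await.whenZIO(interruption.isDone).as(t))
                    .runDrain
                    .fork
      // Simulate the run loop deciding that the stream must be halted.
      _    <- interruption.fail(new RuntimeException("max.poll.interval.ms exceeded")).delay(1.second)
      exit <- consumer.join.exit
      _    <- ZIO.debug(s"stream ended with: $exit")
    } yield ()
}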