diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala new file mode 100644 index 0000000000..55813da67c --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala @@ -0,0 +1,87 @@ +package zio.kafka.consumer.fetch + +import org.apache.kafka.common.TopicPartition +import zio.kafka.ZIOSpecDefaultSlf4j +import zio.kafka.consumer.internal.PartitionStreamControlLike +import zio.test.{ assertTrue, Spec, TestEnvironment } +import zio.{ Chunk, Scope, UIO, ZIO } + +object ManyPartitionsQueueSizeBasedFetchStrategySpec extends ZIOSpecDefaultSlf4j { + + private val maxPartitionQueueSize = 50 + private val fetchStrategy = ManyPartitionsQueueSizeBasedFetchStrategy( + maxPartitionQueueSize, + maxTotalQueueSize = 80 + ) + + private val tp10 = new TopicPartition("topic1", 0) + private val tp11 = new TopicPartition("topic1", 1) + private val tp20 = new TopicPartition("topic2", 0) + private val tp21 = new TopicPartition("topic2", 1) + private val tp22 = new TopicPartition("topic2", 2) + + override def spec: Spec[TestEnvironment with Scope, Any] = + suite("ManyPartitionsQueueSizeBasedFetchStrategySpec")( + test("stream with queue size above maxSize is paused") { + val streams = Chunk(newStream(tp10, currentQueueSize = 100)) + for { + result <- fetchStrategy.selectPartitionsToFetch(streams) + } yield assertTrue(result.isEmpty) + }, + test("stream with queue size below maxSize may resume when less-equal global max") { + val streams = Chunk(newStream(tp10, currentQueueSize = 10)) + for { + result <- fetchStrategy.selectPartitionsToFetch(streams) + } yield assertTrue(result == Set(tp10)) + }, + test("all streams with queue size less-equal maxSize may resume when total is less-equal global max") { + val streams = Chunk( + newStream(tp10, currentQueueSize = maxPartitionQueueSize), + newStream(tp11, currentQueueSize = 10), + newStream(tp20, currentQueueSize = 10), + newStream(tp21, currentQueueSize = 10) + ) + for { + result <- fetchStrategy.selectPartitionsToFetch(streams) + } yield assertTrue(result == Set(tp10, tp11, tp20, tp21)) + }, + test("not all streams with queue size less-equal maxSize may resume when total is less-equal global max") { + val streams = Chunk( + newStream(tp10, currentQueueSize = 40), + newStream(tp11, currentQueueSize = 40), + newStream(tp20, currentQueueSize = 40), + newStream(tp21, currentQueueSize = 40) + ) + for { + result <- fetchStrategy.selectPartitionsToFetch(streams) + } yield assertTrue(result.size == 2) + }, + test("all streams with queue size less-equal maxSize may resume eventually") { + val streams = Chunk( + newStream(tp10, currentQueueSize = 60), + newStream(tp11, currentQueueSize = 60), + newStream(tp20, currentQueueSize = 40), + newStream(tp21, currentQueueSize = 40), + newStream(tp22, currentQueueSize = 40) + ) + for { + result1 <- fetchStrategy.selectPartitionsToFetch(streams) + result2 <- fetchStrategy.selectPartitionsToFetch(streams) + result3 <- fetchStrategy.selectPartitionsToFetch(streams) + result4 <- fetchStrategy.selectPartitionsToFetch(streams) + result5 <- fetchStrategy.selectPartitionsToFetch(streams) + result = Chunk(result1, result2, result3, result4, result5) + } yield assertTrue( + result.forall(_.size == 2), + result.forall(_.forall(_.topic() == "topic2")), + result.flatten.toSet.size == 3 + ) + } + ) + + private def newStream(topicPartition: TopicPartition, currentQueueSize: Int): PartitionStreamControlLike = + new PartitionStreamControlLike { + override def tp: TopicPartition = topicPartition + override def queueSize: UIO[Int] = ZIO.succeed(currentQueueSize) + } +} diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/QueueSizeBasedFetchStrategySpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/QueueSizeBasedFetchStrategySpec.scala new file mode 100644 index 0000000000..f06958a25a --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/QueueSizeBasedFetchStrategySpec.scala @@ -0,0 +1,49 @@ +package zio.kafka.consumer.fetch + +import org.apache.kafka.common.TopicPartition +import zio.kafka.ZIOSpecDefaultSlf4j +import zio.kafka.consumer.internal.PartitionStreamControlLike +import zio.test.{ assertTrue, Spec, TestEnvironment } +import zio.{ Chunk, Scope, UIO, ZIO } + +object QueueSizeBasedFetchStrategySpec extends ZIOSpecDefaultSlf4j { + + private val maxPartitionQueueSize = 50 + private val fetchStrategy = QueueSizeBasedFetchStrategy(maxPartitionQueueSize) + + private val tp10 = new TopicPartition("topic1", 0) + private val tp11 = new TopicPartition("topic1", 1) + private val tp20 = new TopicPartition("topic2", 0) + + override def spec: Spec[TestEnvironment with Scope, Any] = + suite("QueueSizeBasedFetchStrategySpec")( + test("stream with queue size above maxSize is paused") { + val streams = Chunk(newStream(tp10, currentQueueSize = 100)) + for { + result <- fetchStrategy.selectPartitionsToFetch(streams) + } yield assertTrue(result.isEmpty) + }, + test("stream with queue size less-equal maxSize may resume") { + val streams = Chunk(newStream(tp10, currentQueueSize = 10)) + for { + result <- fetchStrategy.selectPartitionsToFetch(streams) + } yield assertTrue(result == Set(tp10)) + }, + test("some streams may resume") { + val streams = Chunk( + newStream(tp10, currentQueueSize = 10), + newStream(tp11, currentQueueSize = maxPartitionQueueSize), + newStream(tp20, currentQueueSize = 100) + ) + for { + result <- fetchStrategy.selectPartitionsToFetch(streams) + } yield assertTrue(result == Set(tp10, tp11)) + } + ) + + private def newStream(topicPartition: TopicPartition, currentQueueSize: Int): PartitionStreamControlLike = + new PartitionStreamControlLike { + override def tp: TopicPartition = topicPartition + override def queueSize: UIO[Int] = ZIO.succeed(currentQueueSize) + } +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala index f208d169a4..79d02c8a8c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala @@ -1,7 +1,7 @@ package zio.kafka.consumer.fetch import org.apache.kafka.common.TopicPartition -import zio.kafka.consumer.internal.PartitionStreamControl +import zio.kafka.consumer.internal.PartitionStreamControlLike import zio.{ Chunk, ZIO } import scala.collection.mutable @@ -21,7 +21,7 @@ trait FetchStrategy { * @return * the partitions that may fetch in the next poll */ - def selectPartitionsToFetch(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Set[TopicPartition]] + def selectPartitionsToFetch(streams: Chunk[PartitionStreamControlLike]): ZIO[Any, Nothing, Set[TopicPartition]] } /** @@ -38,11 +38,13 @@ trait FetchStrategy { * The default value for this parameter is 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2. */ final case class QueueSizeBasedFetchStrategy(maxPartitionQueueSize: Int = 1024) extends FetchStrategy { - override def selectPartitionsToFetch(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Set[TopicPartition]] = + override def selectPartitionsToFetch( + streams: Chunk[PartitionStreamControlLike] + ): ZIO[Any, Nothing, Set[TopicPartition]] = ZIO .foldLeft(streams)(mutable.ArrayBuilder.make[TopicPartition]) { case (acc, stream) => stream.queueSize.map { queueSize => - if (queueSize < maxPartitionQueueSize) acc += stream.tp else acc + if (queueSize <= maxPartitionQueueSize) acc += stream.tp else acc } } .map(_.result().toSet) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala index 1a1c20962c..cdd43c5245 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala @@ -2,7 +2,7 @@ package zio.kafka.consumer.fetch import org.apache.kafka.common.TopicPartition import zio.{ Chunk, ZIO } -import zio.kafka.consumer.internal.PartitionStreamControl +import zio.kafka.consumer.internal.PartitionStreamControlLike import scala.collection.mutable @@ -32,7 +32,7 @@ final case class ManyPartitionsQueueSizeBasedFetchStrategy( maxTotalQueueSize: Int = 20480 ) extends FetchStrategy { override def selectPartitionsToFetch( - streams: Chunk[PartitionStreamControl] + streams: Chunk[PartitionStreamControlLike] ): ZIO[Any, Nothing, Set[TopicPartition]] = { // By shuffling the streams we prevent read-starvation for streams at the end of the list. val shuffledStreams = scala.util.Random.shuffle(streams) @@ -40,7 +40,7 @@ final case class ManyPartitionsQueueSizeBasedFetchStrategy( .foldLeft(shuffledStreams)((mutable.ArrayBuilder.make[TopicPartition], maxTotalQueueSize)) { case (acc @ (partitions, queueBudget), stream) => stream.queueSize.map { queueSize => - if (queueSize < maxPartitionQueueSize && queueSize < queueBudget) { + if (queueSize <= maxPartitionQueueSize && queueSize <= queueBudget) { (partitions += stream.tp, queueBudget - queueSize) } else acc } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/PredictiveFetchStrategy.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/PredictiveFetchStrategy.scala index 9304e2beac..980cb3adfd 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/PredictiveFetchStrategy.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/PredictiveFetchStrategy.scala @@ -1,7 +1,7 @@ package zio.kafka.consumer.fetch import org.apache.kafka.common.TopicPartition -import zio.kafka.consumer.internal.PartitionStreamControl +import zio.kafka.consumer.internal.PartitionStreamControlLike import zio.{ Chunk, ZIO } import scala.collection.mutable @@ -26,9 +26,11 @@ final class PredictiveFetchStrategy(maxEstimatedPollCountsToFetch: Int = 1) exte require(maxEstimatedPollCountsToFetch >= 1, s"`pollCount` must be at least 1, got $maxEstimatedPollCountsToFetch") private val CleanupPollCount = 10 private var cleanupCountDown = CleanupPollCount - private val pollHistories = mutable.Map.empty[PartitionStreamControl, PollHistory] + private val pollHistories = mutable.Map.empty[PartitionStreamControlLike, PollHistory] - override def selectPartitionsToFetch(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Set[TopicPartition]] = + override def selectPartitionsToFetch( + streams: Chunk[PartitionStreamControlLike] + ): ZIO[Any, Nothing, Set[TopicPartition]] = ZIO.succeed { if (cleanupCountDown == 0) { pollHistories --= (pollHistories.keySet.toSet -- streams) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 4928f7d685..e119ead790 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -6,6 +6,11 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord import zio.stream.{ Take, ZStream } import zio.{ Chunk, LogAnnotation, Promise, Queue, Ref, UIO, ZIO } +trait PartitionStreamControlLike { + def tp: TopicPartition + def queueSize: UIO[Int] +} + final class PartitionStreamControl private ( val tp: TopicPartition, stream: ZStream[Any, Throwable, ByteArrayCommittableRecord], @@ -13,7 +18,7 @@ final class PartitionStreamControl private ( interruptionPromise: Promise[Throwable, Unit], completedPromise: Promise[Nothing, Unit], queueSizeRef: Ref[Int] -) { +) extends PartitionStreamControlLike { private val logAnnotate = ZIO.logAnnotate( LogAnnotation("topic", tp.topic()),