Skip to content

Commit

Permalink
Add unit tests for QueueSizeBasedFetchStrategySpec and ManyPartitionsQueueSizeBasedFetchStrategySpec
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten committed Aug 9, 2023
1 parent 733a9aa commit deb81f3
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package zio.kafka.consumer.fetch

import org.apache.kafka.common.TopicPartition
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.internal.PartitionStreamControlLike
import zio.test.{ assertTrue, Spec, TestEnvironment }
import zio.{ Chunk, Scope, UIO, ZIO }

/**
 * Unit tests for [[ManyPartitionsQueueSizeBasedFetchStrategy]].
 *
 * The strategy under test is configured with a per-partition queue limit of 50 and a total queue budget of 80. Streams
 * are stubbed so that each one reports a fixed queue size.
 */
object ManyPartitionsQueueSizeBasedFetchStrategySpec extends ZIOSpecDefaultSlf4j {

  private val maxPartitionQueueSize = 50
  private val fetchStrategy = ManyPartitionsQueueSizeBasedFetchStrategy(
    maxPartitionQueueSize,
    maxTotalQueueSize = 80
  )

  private val tp10 = new TopicPartition("topic1", 0)
  private val tp11 = new TopicPartition("topic1", 1)
  private val tp20 = new TopicPartition("topic2", 0)
  private val tp21 = new TopicPartition("topic2", 1)
  private val tp22 = new TopicPartition("topic2", 2)

  override def spec: Spec[TestEnvironment with Scope, Any] =
    suite("ManyPartitionsQueueSizeBasedFetchStrategySpec")(
      test("stream with queue size above maxSize is paused") {
        val streams = Chunk(newStream(tp10, currentQueueSize = 100))
        for {
          result <- fetchStrategy.selectPartitionsToFetch(streams)
        } yield assertTrue(result.isEmpty)
      },
      test("stream with queue size below maxSize may resume when less-equal global max") {
        val streams = Chunk(newStream(tp10, currentQueueSize = 10))
        for {
          result <- fetchStrategy.selectPartitionsToFetch(streams)
        } yield assertTrue(result == Set(tp10))
      },
      test("all streams with queue size less-equal maxSize may resume when total is less-equal global max") {
        // 50 + 10 + 10 + 10 == 80, exactly the total budget, and the first stream sits exactly on the partition limit.
        val streams = Chunk(
          newStream(tp10, currentQueueSize = maxPartitionQueueSize),
          newStream(tp11, currentQueueSize = 10),
          newStream(tp20, currentQueueSize = 10),
          newStream(tp21, currentQueueSize = 10)
        )
        for {
          result <- fetchStrategy.selectPartitionsToFetch(streams)
        } yield assertTrue(result == Set(tp10, tp11, tp20, tp21))
      },
      test("not all streams with queue size less-equal maxSize may resume when total exceeds global max") {
        // 4 * 40 == 160 exceeds the total budget of 80, so only two streams fit. Which two depends on the shuffle,
        // hence only the size is asserted.
        val streams = Chunk(
          newStream(tp10, currentQueueSize = 40),
          newStream(tp11, currentQueueSize = 40),
          newStream(tp20, currentQueueSize = 40),
          newStream(tp21, currentQueueSize = 40)
        )
        for {
          result <- fetchStrategy.selectPartitionsToFetch(streams)
        } yield assertTrue(result.size == 2)
      },
      test("all streams with queue size less-equal maxSize may resume eventually") {
        // The topic1 streams are over the partition limit and must never be selected. Of the three topic2 streams
        // only two fit in the total budget per round.
        val streams = Chunk(
          newStream(tp10, currentQueueSize = 60),
          newStream(tp11, currentQueueSize = 60),
          newStream(tp20, currentQueueSize = 40),
          newStream(tp21, currentQueueSize = 40),
          newStream(tp22, currentQueueSize = 40)
        )
        // Every round leaves out one of the three eligible partitions at random. With just 5 rounds the same
        // partition is left out every time with probability 3 * (1/3)^5 = 1/81, which makes the final assertion
        // flaky. With 50 rounds that probability is negligible.
        val rounds = 50
        for {
          results <- ZIO.foreach(Chunk.fromIterable(1 to rounds))(_ => fetchStrategy.selectPartitionsToFetch(streams))
        } yield assertTrue(
          results.forall(_.size == 2),
          results.forall(_.forall(_.topic() == "topic2")),
          results.flatten.toSet.size == 3
        )
      }
    )

  /** Creates a stub stream for `topicPartition` that always reports `currentQueueSize` as its queue size. */
  private def newStream(topicPartition: TopicPartition, currentQueueSize: Int): PartitionStreamControlLike =
    new PartitionStreamControlLike {
      override def tp: TopicPartition  = topicPartition
      override def queueSize: UIO[Int] = ZIO.succeed(currentQueueSize)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package zio.kafka.consumer.fetch

import org.apache.kafka.common.TopicPartition
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.internal.PartitionStreamControlLike
import zio.test.{ assertTrue, Spec, TestEnvironment }
import zio.{ Chunk, Scope, UIO, ZIO }

/**
 * Unit tests for [[QueueSizeBasedFetchStrategy]].
 *
 * The strategy under test uses a per-partition queue limit of 50; streams are stubs that report a fixed queue size.
 */
object QueueSizeBasedFetchStrategySpec extends ZIOSpecDefaultSlf4j {

  private val maxPartitionQueueSize = 50
  private val fetchStrategy         = QueueSizeBasedFetchStrategy(maxPartitionQueueSize)

  private val tp10 = new TopicPartition("topic1", 0)
  private val tp11 = new TopicPartition("topic1", 1)
  private val tp20 = new TopicPartition("topic2", 0)

  override def spec: Spec[TestEnvironment with Scope, Any] =
    suite("QueueSizeBasedFetchStrategySpec")(
      test("stream with queue size above maxSize is paused") {
        val overfull = Chunk(newStream(tp10, currentQueueSize = 100))
        fetchStrategy
          .selectPartitionsToFetch(overfull)
          .map(selected => assertTrue(selected.isEmpty))
      },
      test("stream with queue size less-equal maxSize may resume") {
        val nearlyEmpty = Chunk(newStream(tp10, currentQueueSize = 10))
        fetchStrategy
          .selectPartitionsToFetch(nearlyEmpty)
          .map(selected => assertTrue(selected == Set(tp10)))
      },
      test("some streams may resume") {
        // One stream well below the limit, one exactly on it, and one above it.
        val mixed = Chunk(
          newStream(tp10, currentQueueSize = 10),
          newStream(tp11, currentQueueSize = maxPartitionQueueSize),
          newStream(tp20, currentQueueSize = 100)
        )
        fetchStrategy
          .selectPartitionsToFetch(mixed)
          .map(selected => assertTrue(selected == Set(tp10, tp11)))
      }
    )

  /** Builds a stub stream for the given partition whose queue size is always `currentQueueSize`. */
  private def newStream(topicPartition: TopicPartition, currentQueueSize: Int): PartitionStreamControlLike =
    new PartitionStreamControlLike {
      override val tp: TopicPartition  = topicPartition
      override val queueSize: UIO[Int] = ZIO.succeed(currentQueueSize)
    }
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package zio.kafka.consumer.fetch

import org.apache.kafka.common.TopicPartition
import zio.kafka.consumer.internal.PartitionStreamControl
import zio.kafka.consumer.internal.PartitionStreamControlLike
import zio.{ Chunk, ZIO }

import scala.collection.mutable
Expand All @@ -21,7 +21,7 @@ trait FetchStrategy {
* @return
* the partitions that may fetch in the next poll
*/
def selectPartitionsToFetch(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Set[TopicPartition]]
def selectPartitionsToFetch(streams: Chunk[PartitionStreamControlLike]): ZIO[Any, Nothing, Set[TopicPartition]]
}

/**
Expand All @@ -38,11 +38,13 @@ trait FetchStrategy {
* The default value for this parameter is 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2.
*/
final case class QueueSizeBasedFetchStrategy(maxPartitionQueueSize: Int = 1024) extends FetchStrategy {
override def selectPartitionsToFetch(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Set[TopicPartition]] =
override def selectPartitionsToFetch(
streams: Chunk[PartitionStreamControlLike]
): ZIO[Any, Nothing, Set[TopicPartition]] =
ZIO
.foldLeft(streams)(mutable.ArrayBuilder.make[TopicPartition]) { case (acc, stream) =>
stream.queueSize.map { queueSize =>
if (queueSize < maxPartitionQueueSize) acc += stream.tp else acc
if (queueSize <= maxPartitionQueueSize) acc += stream.tp else acc
}
}
.map(_.result().toSet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package zio.kafka.consumer.fetch

import org.apache.kafka.common.TopicPartition
import zio.{ Chunk, ZIO }
import zio.kafka.consumer.internal.PartitionStreamControl
import zio.kafka.consumer.internal.PartitionStreamControlLike

import scala.collection.mutable

Expand Down Expand Up @@ -32,15 +32,15 @@ final case class ManyPartitionsQueueSizeBasedFetchStrategy(
maxTotalQueueSize: Int = 20480
) extends FetchStrategy {
override def selectPartitionsToFetch(
streams: Chunk[PartitionStreamControl]
streams: Chunk[PartitionStreamControlLike]
): ZIO[Any, Nothing, Set[TopicPartition]] = {
// By shuffling the streams we prevent read-starvation for streams at the end of the list.
val shuffledStreams = scala.util.Random.shuffle(streams)
ZIO
.foldLeft(shuffledStreams)((mutable.ArrayBuilder.make[TopicPartition], maxTotalQueueSize)) {
case (acc @ (partitions, queueBudget), stream) =>
stream.queueSize.map { queueSize =>
if (queueSize < maxPartitionQueueSize && queueSize < queueBudget) {
if (queueSize <= maxPartitionQueueSize && queueSize <= queueBudget) {
(partitions += stream.tp, queueBudget - queueSize)
} else acc
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package zio.kafka.consumer.fetch

import org.apache.kafka.common.TopicPartition
import zio.kafka.consumer.internal.PartitionStreamControl
import zio.kafka.consumer.internal.PartitionStreamControlLike
import zio.{ Chunk, ZIO }

import scala.collection.mutable
Expand All @@ -26,9 +26,11 @@ final class PredictiveFetchStrategy(maxEstimatedPollCountsToFetch: Int = 1) exte
require(maxEstimatedPollCountsToFetch >= 1, s"`pollCount` must be at least 1, got $maxEstimatedPollCountsToFetch")
private val CleanupPollCount = 10
private var cleanupCountDown = CleanupPollCount
private val pollHistories = mutable.Map.empty[PartitionStreamControl, PollHistory]
private val pollHistories = mutable.Map.empty[PartitionStreamControlLike, PollHistory]

override def selectPartitionsToFetch(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Set[TopicPartition]] =
override def selectPartitionsToFetch(
streams: Chunk[PartitionStreamControlLike]
): ZIO[Any, Nothing, Set[TopicPartition]] =
ZIO.succeed {
if (cleanupCountDown == 0) {
pollHistories --= (pollHistories.keySet.toSet -- streams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.stream.{ Take, ZStream }
import zio.{ Chunk, LogAnnotation, Promise, Queue, Ref, UIO, ZIO }

/**
 * The view of a partition stream that a fetch strategy works with: which partition it is and how large its queue
 * currently is.
 *
 * `FetchStrategy.selectPartitionsToFetch` accepts a `Chunk` of these, `PartitionStreamControl` implements it, and
 * tests can implement it with a simple stub.
 */
trait PartitionStreamControlLike {
/** The topic-partition of this stream. */
def tp: TopicPartition
/** Effect that yields the current queue size of this stream; fetch strategies compare it against their limits. */
def queueSize: UIO[Int]
}

final class PartitionStreamControl private (
val tp: TopicPartition,
stream: ZStream[Any, Throwable, ByteArrayCommittableRecord],
dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]],
interruptionPromise: Promise[Throwable, Unit],
completedPromise: Promise[Nothing, Unit],
queueSizeRef: Ref[Int]
) {
) extends PartitionStreamControlLike {

private val logAnnotate = ZIO.logAnnotate(
LogAnnotation("topic", tp.topic()),
Expand Down

0 comments on commit deb81f3

Please sign in to comment.