Skip to content

Commit

Permalink
Tests for PartitionStreamControl + fix issue with interruptionPromise (
Browse files Browse the repository at this point in the history
…#1371)

The issue became apparent from the tests. Unfortunately, it means that
#1251 did not fix the blocking that occurs when partitions are lost while
there is no more data in the queue.

* Fix: race should be raceFirst, because interruptionPromise will only
ever fail (relates to #1251)
* Reset queue size on lost
* Abstract requesting data for testing purposes
  • Loading branch information
svroonland authored Nov 10, 2024
1 parent 0f46a92 commit 727e7d7
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.consumer.CommittableRecord
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.test._

import java.util.concurrent.TimeoutException

/**
 * Tests for [[PartitionStreamControl]], grouped into queue bookkeeping, stream control, poll deadline and offset
 * tracking.
 *
 * The whole suite runs with the live clock and a one-minute timeout aspect.
 */
object PartitionStreamControlSpec extends ZIOSpecDefault {

  // Topic and partition shared by the control under test and by the generated records, so both always agree.
  private val TestTopic: String  = "test-topic"
  private val TestPartition: Int = 0

  // Max poll interval handed to every control under test. The poll-deadline test derives its "future" timestamp from
  // this value instead of hard-coding a second, silently coupled number.
  private val MaxPollInterval: Duration = Duration.fromSeconds(30)

  override def spec: Spec[Any, Throwable] = suite("PartitionStreamControl")(
    suite("Queue info")(
      test("offerRecords updates queue size correctly") {
        for {
          control <- createTestControl
          records  = createTestRecords(3)
          _       <- control.offerRecords(records)
          size    <- control.queueSize
        } yield assertTrue(size == 3)
      },
      test("empty offerRecords updates outstandingPolls") {
        for {
          control <- createTestControl
          _       <- control.offerRecords(Chunk.empty)
          _       <- control.offerRecords(Chunk.empty)
          polls   <- control.outstandingPolls
        } yield assertTrue(polls == 2)
      },
      test("multiple offers accumulate correctly") {
        for {
          control <- createTestControl
          _       <- control.offerRecords(createTestRecords(2))
          _       <- control.offerRecords(createTestRecords(3))
          size    <- control.queueSize
        } yield assertTrue(size == 5)
      }
    ),
    // Stream Control Tests
    suite("Stream control")(
      test("offered records end up in the stream") {
        for {
          control       <- createTestControl
          records        = createTestRecords(3)
          _             <- control.offerRecords(records)
          stream         = control.stream
          // Only take what was offered; the stream itself is not ended in this test.
          pulledRecords <- stream.take(3).runCollect
        } yield assertTrue(records == pulledRecords)
      },
      test("end will end the stream") {
        for {
          control       <- createTestControl
          records        = createTestRecords(3)
          _             <- control.offerRecords(records)
          _             <- control.end
          pulledRecords <- control.stream.runCollect
        } yield assertTrue(records == pulledRecords)
      },
      test("halt completes the stream with a TimeoutException") {
        for {
          control <- createTestControl
          _       <- control.halt
          result  <- control.stream.runCollect.exit
        } yield assertTrue(result.isFailure, result.causeOrNull.squash.isInstanceOf[TimeoutException])
      },
      test("lost clears queue and ends stream") {
        for {
          control   <- createTestControl
          _         <- control.offerRecords(createTestRecords(5))
          _         <- control.lost
          // Must terminate on its own; otherwise the suite-level timeout fails the test.
          _         <- control.stream.runCollect
          size      <- control.queueSize
          completed <- control.isCompleted
        } yield assertTrue(size == 0, completed)
      },
      test("finalizing the stream will set isCompleted") {
        for {
          control            <- createTestControl
          initialIsCompleted <- control.isCompleted
          records             = createTestRecords(3)
          _                  <- control.offerRecords(records)
          _                  <- control.end
          _                  <- control.stream.runCollect
          finalIsCompleted   <- control.isCompleted
        } yield assertTrue(!initialIsCompleted, finalIsCompleted)
      },
      test("pulling from the stream will request data") {
        ZIO.scoped {
          for {
            // Completed by the `requestData` effect given to the control.
            requested <- Promise.make[Nothing, Unit]
            control   <- createTestControlWithRequestData(requested.succeed(()))
            // Nothing is offered, so the stream is run in a scoped fiber rather than awaited.
            _         <- control.stream.runCollect.forkScoped
            _         <- requested.await
          } yield assertCompletes
        }
      },
      test("pulling from the stream when there are records in the queue will request additional data") {
        ZIO.scoped {
          for {
            // Completed by the `requestData` effect given to the control.
            requested <- Promise.make[Nothing, Unit]
            control   <- createTestControlWithRequestData(requested.succeed(()))
            records    = createTestRecords(3)
            _         <- control.offerRecords(records)
            _         <- control.stream.runCollect.forkScoped
            _         <- requested.await
          } yield assertCompletes
        }
      }
    ),
    suite("Poll deadline")(
      test("maxPollIntervalExceeded returns false initially") {
        for {
          control  <- createTestControl
          now      <- Clock.nanoTime
          exceeded <- control.maxPollIntervalExceeded(now)
        } yield assertTrue(!exceeded)
      },
      test("maxPollIntervalExceeded returns true after timeout") {
        for {
          control   <- createTestControl
          _         <- control.offerRecords(createTestRecords(1))
          now       <- Clock.nanoTime
          // One second past the max poll interval the control was created with.
          futureTime = now + MaxPollInterval.plusSeconds(1).toNanos
          exceeded  <- control.maxPollIntervalExceeded(futureTime)
        } yield assertTrue(exceeded)
      }
    ),
    suite("Offset Tracking")(
      test("lastPulledOffset updates correctly after each pull") {
        for {
          control       <- createTestControl
          records        = createTestRecords(6)
          _             <- control.offerRecords(records.take(3))
          // Not run here: this effect is executed from inside the stream, once per pulled chunk.
          offerNextBatch = control.offerRecords(records.slice(3, 6))

          offsetsAfterChunks <- Ref.make(Chunk.empty[Option[Long]])
          _ <- {
            // Appends the control's current last pulled offset (if any) to `offsetsAfterChunks`.
            def updateLastPulledOffsets =
              control.lastPulledOffset.flatMap(offset => offsetsAfterChunks.update(_ :+ offset.map(_.offset)))

            // Record once before pulling anything, then once per pulled chunk.
            updateLastPulledOffsets *> control.stream
              .mapChunksZIO(updateLastPulledOffsets *> offerNextBatch.as(_))
              .take(6)
              .runCollect
          }
          lastPulledOffsets <- offsetsAfterChunks.get
        } yield assertTrue(lastPulledOffsets == Chunk(None, Some(3L), Some(6L)))
      }
    )
  ) @@ TestAspect.withLiveClock @@ TestAspect.timeout(1.minute)

  /** Creates a control whose data requests do nothing. */
  private def createTestControl: ZIO[Any, Nothing, PartitionStreamControl] =
    createTestControlWithRequestData(ZIO.unit)

  /**
   * Creates a control for the test topic-partition with no-op diagnostics and [[MaxPollInterval]].
   *
   * @param requestData
   *   effect passed to the control as its "request data" action; its result is discarded
   */
  private def createTestControlWithRequestData(requestData: UIO[Any]): ZIO[Any, Nothing, PartitionStreamControl] = {
    val tp          = new TopicPartition(TestTopic, TestPartition)
    val diagnostics = Diagnostics.NoOp
    PartitionStreamControl.newPartitionStream(
      tp,
      requestData.unit,
      diagnostics,
      MaxPollInterval
    )
  }

  /**
   * Builds `count` records for the test topic-partition with offsets `1` to `count`, empty keys and values, a no-op
   * commit handle and no consumer group metadata.
   */
  private def createTestRecords(count: Int): Chunk[ByteArrayCommittableRecord] =
    Chunk.fromIterable(
      (1 to count).map(i =>
        CommittableRecord(
          record = new ConsumerRecord[Array[Byte], Array[Byte]](
            TestTopic,
            TestPartition,
            i.toLong,
            Array[Byte](),
            Array[Byte]()
          ),
          commitHandle = _ => ZIO.unit,
          consumerGroupMetadata = None
        )
      )
    )
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ abstract class PartitionStream {
*/
final class PartitionStreamControl private (
val tp: TopicPartition,
stream: ZStream[Any, Throwable, ByteArrayCommittableRecord],
val stream: ZStream[Any, Throwable, ByteArrayCommittableRecord],
dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]],
interruptionPromise: Promise[Throwable, Nothing],
val completedPromise: Promise[Nothing, Option[Offset]],
Expand Down Expand Up @@ -85,12 +85,12 @@ final class PartitionStreamControl private (
queueInfoRef.get.map(_.deadlineExceeded(now))

/** To be invoked when the stream is no longer processing. */
private[internal] def halt: UIO[Boolean] = {
private[internal] def halt: UIO[Unit] = {
val timeOutMessage = s"No records were polled for more than $maxPollInterval for topic partition $tp. " +
"Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " +
"needs more time."
val consumeTimeout = new TimeoutException(timeOutMessage) with NoStackTrace
interruptionPromise.fail(consumeTimeout)
interruptionPromise.fail(consumeTimeout).unit
}

/** To be invoked when the partition was lost. It clears the queue and ends the stream. */
Expand All @@ -100,6 +100,7 @@ final class PartitionStreamControl private (
_ <- ZIO.logDebug(s"Partition ${tp.toString} lost")
taken <- dataQueue.takeAll.map(_.size)
_ <- dataQueue.offer(Take.end)
_ <- queueInfoRef.update(_.withQueueClearedOnLost)
_ <- ZIO.logDebug(s"Ignored ${taken} records on lost partition").when(taken != 0)
} yield ()
}
Expand Down Expand Up @@ -127,7 +128,7 @@ object PartitionStreamControl {

private[internal] def newPartitionStream(
tp: TopicPartition,
commandQueue: Queue[RunloopCommand],
requestData: UIO[Unit],
diagnostics: Diagnostics,
maxPollInterval: Duration
): UIO[PartitionStreamControl] = {
Expand All @@ -149,11 +150,11 @@ object PartitionStreamControl {
queueInfo <- Ref.make(QueueInfo(now, 0, None, 0))
requestAndAwaitData =
for {
_ <- commandQueue.offer(RunloopCommand.Request(tp))
_ <- requestData
_ <- diagnostics.emit(DiagnosticEvent.Request(tp))
taken <- dataQueue
.takeBetween(1, Int.MaxValue)
.race(interruptionPromise.await)
.raceFirst(interruptionPromise.await)
} yield taken

stream = ZStream.logAnnotate(
Expand Down Expand Up @@ -216,6 +217,9 @@ object PartitionStreamControl {
outstandingPolls = 0
)

/** Returns a copy of this info with `size` reset to 0; every other field is left unchanged. */
def withQueueClearedOnLost: QueueInfo =
copy(size = 0)

/**
 * Whether the pull deadline has passed at time `now`.
 *
 * Only true when `size` is positive and `pullDeadline` is at or before `now`; with `size == 0` this is always false.
 */
def deadlineExceeded(now: NanoTime): Boolean =
size > 0 && pullDeadline <= now
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ private[consumer] final class Runloop private (
private val consumerMetrics = new ZioConsumerMetrics(settings.metricLabels)

private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] =
PartitionStreamControl.newPartitionStream(tp, commandQueue, diagnostics, maxPollInterval)
PartitionStreamControl.newPartitionStream(
tp,
commandQueue.offer(RunloopCommand.Request(tp)).unit,
diagnostics,
maxPollInterval
)

def stopConsumption: UIO[Unit] =
ZIO.logDebug("stopConsumption called") *>
Expand Down

0 comments on commit 727e7d7

Please sign in to comment.