From f4a95c237542480b02d356c57c6d6e212c5bced9 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Sun, 15 Oct 2023 14:05:04 +0200
Subject: [PATCH] Fix flaky test (#1082)

Also: switch to nano clock to avoid the time jumps that come with the
regular system clock.
---
 .../zio/kafka/consumer/ConsumerSpec.scala          | 14 ++++++----
 .../internal/PartitionStreamControl.scala          | 28 +++++++++++--------
 2 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index 04335c09f..0d272b858 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -333,17 +333,19 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             settings <- consumerSettings(
                           clientId = clientId,
                           groupId = Some(group),
-                          maxPollInterval = 1.second
+                          maxPollInterval = 1.second,
+                          `max.poll.records` = 2
                         )
-            consumer <- Consumer.make(settings.withPollTimeout(500.millis))
-            _        <- scheduledProduce(topic1, Schedule.fixed(100.millis).jittered).runDrain.forkScoped
-            _        <- scheduledProduce(topic2, Schedule.fixed(100.millis).jittered).runDrain.forkScoped
+            consumer <- Consumer.make(settings.withPollTimeout(100.millis))
+            _        <- scheduledProduce(topic1, Schedule.fixed(50.millis).jittered).runDrain.forkScoped
+            _        <- scheduledProduce(topic2, Schedule.fixed(200.millis).jittered).runDrain.forkScoped
             // The slow consumer:
             c1 <- consumer
                     .plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
-                    .rechunk(5) // Time out detection is at the chunk level. We need at least 2 chunks.
                     .tap(r => ZIO.sleep(5.seconds).when(r.key == "key3"))
-                    .take(100) // Because of chunking, we need to pull a bit more before the interrupt kicks in.
+                    // Use `take` to ensure the test ends quickly, even when the interrupt fails to occur.
+                    // Because of chunking, we need to pull more than 3 records before the interrupt kicks in.
+                    .take(100)
                     .runDrain
                     .exit
                     .fork
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
index f61c521c8..8338427d1 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
@@ -7,9 +7,7 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
 import zio.stream.{ Take, ZStream }
 import zio.{ Chunk, Clock, Duration, LogAnnotation, Promise, Queue, Ref, UIO, ZIO }
 
-import java.time.Instant
 import java.util.concurrent.TimeoutException
-import scala.math.Ordered.orderingToOrdered
 import scala.util.control.NoStackTrace
 
 final class PartitionStreamControl private (
@@ -21,6 +19,8 @@ final class PartitionStreamControl private (
   queueInfoRef: Ref[QueueInfo],
   maxPollInterval: Duration
 ) {
+  private val maxPollIntervalNanos = maxPollInterval.toNanos
+
   private val logAnnotate = ZIO.logAnnotate(
     LogAnnotation("topic", tp.topic()),
     LogAnnotation("partition", tp.partition().toString)
@@ -29,8 +29,8 @@ final class PartitionStreamControl private (
   /** Offer new data for the stream to process. */
   private[internal] def offerRecords(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] =
     for {
-      now <- Clock.instant
-      newPullDeadline = now.plus(maxPollInterval)
+      now <- Clock.nanoTime
+      newPullDeadline = now + maxPollIntervalNanos
       _ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size))
       _ <- dataQueue.offer(Take.chunk(data))
     } yield ()
@@ -44,7 +44,7 @@
    */
   private[internal] def maxPollIntervalExceeded: UIO[Boolean] =
     for {
-      now <- Clock.instant
+      now <- Clock.nanoTime
       queueInfo <- queueInfoRef.get
     } yield queueInfo.deadlineExceeded(now)
 
@@ -82,16 +82,20 @@ object PartitionStreamControl {
 
+  private type NanoTime = Long
+
   private[internal] def newPartitionStream(
     tp: TopicPartition,
     commandQueue: Queue[RunloopCommand],
     diagnostics: Diagnostics,
     maxPollInterval: Duration
   ): UIO[PartitionStreamControl] = {
+    val maxPollIntervalNanos = maxPollInterval.toNanos
+
     def registerPull(queueInfo: Ref[QueueInfo], recordCount: Int): UIO[Unit] =
       for {
-        now <- Clock.instant
-        newPullDeadline = now.plus(maxPollInterval)
+        now <- Clock.nanoTime
+        newPullDeadline = now + maxPollIntervalNanos
         _ <- queueInfo.update(_.withPull(newPullDeadline, recordCount))
       } yield ()
@@ -100,7 +104,7 @@ object PartitionStreamControl {
       interruptionPromise <- Promise.make[Throwable, Unit]
       completedPromise    <- Promise.make[Nothing, Unit]
       dataQueue           <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]]
-      now                 <- Clock.instant
+      now                 <- Clock.nanoTime
       queueInfo           <- Ref.make(QueueInfo(now, 0))
 
       requestAndAwaitData = for {
@@ -137,14 +141,14 @@ object PartitionStreamControl {
 
   // The `pullDeadline` is only relevant when `size > 0`. We initialize `pullDeadline` as soon as size goes above 0.
   // (Note that theoretically `size` can go below 0 when the update operations are reordered.)
-  private final case class QueueInfo(pullDeadline: Instant, size: Int) {
-    def withOffer(newPullDeadline: Instant, recordCount: Int): QueueInfo =
+  private final case class QueueInfo(pullDeadline: NanoTime, size: Int) {
+    def withOffer(newPullDeadline: NanoTime, recordCount: Int): QueueInfo =
       QueueInfo(if (size <= 0) newPullDeadline else pullDeadline, size + recordCount)
 
-    def withPull(newPullDeadline: Instant, recordCount: Int): QueueInfo =
+    def withPull(newPullDeadline: NanoTime, recordCount: Int): QueueInfo =
       QueueInfo(newPullDeadline, size - recordCount)
 
-    def deadlineExceeded(now: Instant): Boolean =
+    def deadlineExceeded(now: NanoTime): Boolean =
       size > 0 && pullDeadline <= now
   }
 
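
Background on the clock switch: `Clock.instant` reads the wall clock, which can jump when the system time is adjusted (NTP, manual changes), whereas `Clock.nanoTime` is monotonic, so a deadline computed from it cannot be pushed into the past or future by such jumps. The following standalone sketch mirrors the nanoTime-based `QueueInfo` deadline bookkeeping from the diff; the `NanoDeadlineExample` app, its `maxPollInterval` value, and the simulated sleep are illustrative assumptions, only the `QueueInfo` logic is taken from the patch.

    import zio._

    object NanoDeadlineExample extends ZIOAppDefault {

      private type NanoTime = Long

      // Deadline bookkeeping as in the patch: only relevant while records are queued (size > 0).
      private final case class QueueInfo(pullDeadline: NanoTime, size: Int) {
        def withOffer(newPullDeadline: NanoTime, recordCount: Int): QueueInfo =
          QueueInfo(if (size <= 0) newPullDeadline else pullDeadline, size + recordCount)

        def withPull(newPullDeadline: NanoTime, recordCount: Int): QueueInfo =
          QueueInfo(newPullDeadline, size - recordCount)

        def deadlineExceeded(now: NanoTime): Boolean =
          size > 0 && pullDeadline <= now
      }

      // Illustrative value; the real interval comes from the consumer settings.
      private val maxPollInterval: Duration = 1.second
      private val maxPollIntervalNanos: Long = maxPollInterval.toNanos

      val run =
        for {
          start <- Clock.nanoTime // monotonic, unaffected by wall-clock jumps
          // Records are offered: the pull deadline is armed maxPollInterval from now.
          armed = QueueInfo(start, 0).withOffer(start + maxPollIntervalNanos, 5)
          _    <- ZIO.sleep(1500.millis) // simulate a consumer that stops pulling
          now  <- Clock.nanoTime
          _    <- Console.printLine(s"deadline exceeded: ${armed.deadlineExceeded(now)}")
        } yield ()
    }

Run against the live clock this prints `deadline exceeded: true`, since no pull happened within `maxPollInterval` of the offer; a timely `withPull` would have moved the deadline forward instead.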