Skip to content

Commit

Permalink
Attempt to fix flaky test (2)
Browse files Browse the repository at this point in the history
Also: switch to nanoTime to prevent clock shifts
  • Loading branch information
erikvanoosten committed Oct 14, 2023
1 parent 5c03caf commit 6ec678b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,15 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
`max.poll.records` = 2
)
consumer <- Consumer.make(settings.withPollTimeout(100.millis))
_ <- scheduledProduce(topic1, Schedule.fixed(100.millis).jittered).runDrain.forkScoped
_ <- scheduledProduce(topic2, Schedule.fixed(100.millis).jittered).runDrain.forkScoped
_ <- scheduledProduce(topic1, Schedule.fixed(50.millis).jittered).runDrain.forkScoped
_ <- scheduledProduce(topic2, Schedule.fixed(200.millis).jittered).runDrain.forkScoped
// The slow consumer:
c1 <- consumer
.plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
.tap(r => ZIO.sleep(5.seconds).when(r.key == "key3"))
.take(100) // Because of chunking, we need to pull a bit more before the interrupt kicks in.
// Use `take` to ensure the test ends quickly, even when the interrupt fails to occur.
// Because of chunking, we need to pull more than 3 records before the interrupt kicks in.
.take(100)
.runDrain
.exit
.fork
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.stream.{ Take, ZStream }
import zio.{ Chunk, Clock, Duration, LogAnnotation, Promise, Queue, Ref, UIO, ZIO }

import java.time.Instant
import java.util.concurrent.TimeoutException
import scala.math.Ordered.orderingToOrdered
import scala.util.control.NoStackTrace

final class PartitionStreamControl private (
Expand All @@ -21,6 +19,8 @@ final class PartitionStreamControl private (
queueInfoRef: Ref[QueueInfo],
maxPollInterval: Duration
) {
private val maxPollIntervalNanos = maxPollInterval.toNanos

private val logAnnotate = ZIO.logAnnotate(
LogAnnotation("topic", tp.topic()),
LogAnnotation("partition", tp.partition().toString)
Expand All @@ -29,8 +29,8 @@ final class PartitionStreamControl private (
/** Offer new data for the stream to process. */
private[internal] def offerRecords(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] =
for {
now <- Clock.instant
newPullDeadline = now.plus(maxPollInterval)
now <- Clock.nanoTime
newPullDeadline = now + maxPollIntervalNanos
_ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size))
_ <- dataQueue.offer(Take.chunk(data))
} yield ()
Expand All @@ -44,7 +44,7 @@ final class PartitionStreamControl private (
*/
private[internal] def maxPollIntervalExceeded: UIO[Boolean] =
for {
now <- Clock.instant
now <- Clock.nanoTime
queueInfo <- queueInfoRef.get
} yield queueInfo.deadlineExceeded(now)

Expand Down Expand Up @@ -88,10 +88,12 @@ object PartitionStreamControl {
diagnostics: Diagnostics,
maxPollInterval: Duration
): UIO[PartitionStreamControl] = {
val maxPollIntervalNanos = maxPollInterval.toNanos

def registerPull(queueInfo: Ref[QueueInfo], recordCount: Int): UIO[Unit] =
for {
now <- Clock.instant
newPullDeadline = now.plus(maxPollInterval)
now <- Clock.nanoTime
newPullDeadline = now + maxPollIntervalNanos
_ <- queueInfo.update(_.withPull(newPullDeadline, recordCount))
} yield ()

Expand All @@ -100,7 +102,7 @@ object PartitionStreamControl {
interruptionPromise <- Promise.make[Throwable, Unit]
completedPromise <- Promise.make[Nothing, Unit]
dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]]
now <- Clock.instant
now <- Clock.nanoTime
queueInfo <- Ref.make(QueueInfo(now, 0))
requestAndAwaitData =
for {
Expand Down Expand Up @@ -137,14 +139,14 @@ object PartitionStreamControl {

// The `pullDeadline` is only relevant when `size > 0`. We initialize `pullDeadline` as soon as size goes above 0.
// (Note that theoretically `size` can go below 0 when the update operations are reordered.)
private final case class QueueInfo(pullDeadline: Instant, size: Int) {
def withOffer(newPullDeadline: Instant, recordCount: Int): QueueInfo =
private final case class QueueInfo(pullDeadline: Long, size: Int) {
def withOffer(newPullDeadline: Long, recordCount: Int): QueueInfo =
QueueInfo(if (size <= 0) newPullDeadline else pullDeadline, size + recordCount)

def withPull(newPullDeadline: Instant, recordCount: Int): QueueInfo =
def withPull(newPullDeadline: Long, recordCount: Int): QueueInfo =
QueueInfo(newPullDeadline, size - recordCount)

def deadlineExceeded(now: Instant): Boolean =
def deadlineExceeded(now: Long): Boolean =
size > 0 && pullDeadline <= now
}

Expand Down

0 comments on commit 6ec678b

Please sign in to comment.