diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 679a0d582..defc9484e 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -1480,9 +1480,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { test( "it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously" ) { - // NOTE: - // When this test fails with the message `100000 was not less than 100000`, it's because - // your computer is so fast that the first consumer already consumed all 100000 messages. val numberOfMessages: Int = 100000 val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i")) @@ -1494,11 +1491,13 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { consumer <- Consumer.make(settings, diagnostics = diagnostics) _ <- produceMany(topic, kvs) // Starting a consumption session to start the Runloop. - fiber <- consumer - .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string) - .take(numberOfMessages.toLong) - .runCount - .forkScoped + fiber <- + consumer + .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string) + .tap(_ => ZIO.sleep(1.millisecond)) // sleep to avoid consuming all messages in under 200 millis + .take(numberOfMessages.toLong) + .runCount + .forkScoped _ <- ZIO.sleep(200.millis) _ <- consumer.stopConsumption consumed0 <- fiber.join