diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 90ca5e7fa..aefc216e7 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -473,37 +473,53 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- produceMany(topic2, kvs) _ <- ZIO.scoped { for { - stream1Started <- Promise.make[Nothing, Unit] - stream1Fib <- - Consumer - .plainStreamWithGracefulShutdown(Subscription.topics(topic1), Serde.string, Serde.string) { - stream => - stream - .tap(_ => stream1Started.succeed(())) - .zipWithIndex - .map(_._2) - .takeWhile(_ < 2 * kvs.size - 1) - .runDrain - - } - .forkScoped + stream1Started <- Promise.make[Nothing, Unit] + stream1Done <- Promise.make[Nothing, Unit] + stream1Interrupted <- Promise.make[Nothing, Unit] + stream1Fib <- ZIO.logAnnotate("stream", "1") { + (Consumer + .plainStreamWithGracefulShutdown( + Subscription.topics(topic1), + Serde.string, + Serde.string + ) { stream => + stream + .tap(_ => stream1Started.succeed(())) + .zipWithIndex + .map(_._2) + .runDrain + } + .tapErrorCause(c => ZIO.logErrorCause("Stream 1 failed", c)) + .ensuring(stream1Done.succeed(()))) + .forkScoped + } _ <- stream1Started.await - _ <- - Consumer - .plainStreamWithGracefulShutdown(Subscription.topics(topic2), Serde.string, Serde.string) { - stream => - stream.zipWithIndex - .map(_._2) - .tap(count => stream1Fib.interrupt.when(count == 4)) - .runDrain - } - .forkScoped + _ <- ZIO.logAnnotate("stream", "2") { + Consumer + .plainStreamWithGracefulShutdown( + Subscription.topics(topic2), + Serde.string, + Serde.string + ) { stream => + stream.zipWithIndex + .map(_._2) + .tap(count => + (stream1Fib.interrupt <* stream1Interrupted.succeed(())).when(count == 4) + ) + .runDrain + } + .tapErrorCause(c => ZIO.logErrorCause("Stream 2 failed", c)) + .forkScoped + } + _ <- stream1Interrupted.await + _ <- ZIO.logInfo("Producing second batch topic1") _ <- produceMany(topic1, kvs) - _ <- stream1Fib.join + _ <- stream1Done.await + .tapErrorCause(c => ZIO.logErrorCause("Stream 1 await failed", c)) } yield () }.provideSomeLayer[Kafka with Scope with Producer](consumer(client, Some(group))) } yield assertCompletes - } + } @@ nonFlaky(10) ), test("a consumer timeout interrupts the stream and shuts down the consumer") { // Setup of this test: diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index d085b664f..81bfa30b3 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -16,6 +16,7 @@ import zio.kafka.serde.{ Deserializer, Serde } import zio.kafka.utils.SslHelper import zio.stream._ +import scala.annotation.unused import scala.jdk.CollectionConverters._ import scala.util.control.NoStackTrace @@ -718,24 +719,32 @@ private[consumer] final class ConsumerLive private[consumer] ( */ private def runWithGracefulShutdown[StreamType <: ZStream[_, _, _], R, E]( streamControl: ZIO[Scope, E, SubscriptionStreamControl[StreamType]], - shutdownTimeout: Duration + @unused shutdownTimeout: Duration )( withStream: StreamType => ZIO[R, E, Any] ): ZIO[R, E, Any] = ZIO.scoped[R] { for { control <- streamControl - fib <- withStream(control.stream) - .onInterrupt(ZIO.logError("withStream in runWithGracefulShutdown interrupted, this should not happen")) - .forkScoped + fib <- + (withStream(control.stream) + .onInterrupt( + ZIO.logError("withStream in runWithGracefulShutdown interrupted, this should not happen") + )) + .tapErrorCause(cause => ZIO.logErrorCause("Error in withStream fiber in runWithGracefulShutdown", cause)) + .forkScoped result <- fib.join.onInterrupt( - control.stop *> fib.join - .timeout(shutdownTimeout) - .tapErrorCause(cause => - ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause) - ) - .ignore + ZIO.fiberIdWith(id => ZIO.logInfo(s"Interrupting from ${id.toString}")) *> + control.stop *> ZIO.logInfo("Control stopped") *> + fib.join + /// TODO this still gives errors.. +// .timeout(shutdownTimeout) + .tapErrorCause(cause => + ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause) + ) + .tap(_ => ZIO.logInfo("Join done")) + .ignore ) } yield result }