From 33a6d82ad97144ee720cce2e75d3086f64a58894 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Mon, 20 May 2024 15:58:57 +0200 Subject: [PATCH] This works with timeout --- .../zio/kafka/consumer/ConsumerSpec.scala | 1 - .../scala/zio/kafka/consumer/Consumer.scala | 18 ++++++++---------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index aefc216e7..d395183c5 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -512,7 +512,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .forkScoped } _ <- stream1Interrupted.await - _ <- ZIO.logInfo("Producing second batch topic1") _ <- produceMany(topic1, kvs) _ <- stream1Done.await .tapErrorCause(c => ZIO.logErrorCause("Stream 1 await failed", c)) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 81bfa30b3..9439fedab 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -728,22 +728,20 @@ private[consumer] final class ConsumerLive private[consumer] ( control <- streamControl fib <- (withStream(control.stream) - .onInterrupt( - ZIO.logError("withStream in runWithGracefulShutdown interrupted, this should not happen") - )) - .tapErrorCause(cause => ZIO.logErrorCause("Error in withStream fiber in runWithGracefulShutdown", cause)) - .forkScoped + .onInterrupt(ZIO.logError("withStream in runWithGracefulShutdown was interrupted, this should not happen"))) + .tapErrorCause(cause => + ZIO.logErrorCause("Error in withStream fiber in runWithGracefulShutdown", cause) *> ZIO.getFiberRefs.debug + ) + .forkDaemon // Does not work with forkScoped, this Fiber would then be interrupted unintended sometimes result <- fib.join.onInterrupt( - ZIO.fiberIdWith(id => ZIO.logInfo(s"Interrupting from ${id.toString}")) *> - control.stop *> ZIO.logInfo("Control stopped") *> + control.stop *> fib.join - /// TODO this still gives errors.. -// .timeout(shutdownTimeout) + .timeout(shutdownTimeout) .tapErrorCause(cause => ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause) ) - .tap(_ => ZIO.logInfo("Join done")) + .interruptible // Not having this here results in errors. Also, onInterrupt is run interruptibly .ignore ) } yield result