From 566b0f3a5a98de129e8dedf6c14fc3e5e4da7eae Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sat, 9 Nov 2024 15:31:39 +0100 Subject: [PATCH] Add some logging of failures --- .../scala/zio/kafka/consumer/internal/ConsumerAccess.scala | 5 ++++- .../main/scala/zio/kafka/consumer/internal/Runloop.scala | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala index 54690dc47..d2526dc59 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala @@ -60,7 +60,10 @@ private[consumer] object ConsumerAccess { ) } } { consumer => - ZIO.blocking(access.withPermit(ZIO.attempt(consumer.close(settings.closeTimeout)))).orDie + ZIO + .blocking(access.withPermit(ZIO.attempt(consumer.close(settings.closeTimeout)))) + .tapErrorCause(c => ZIO.logErrorCause("Error closing Runloop", c)) + .orDie } } yield new ConsumerAccess(consumer, access) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 7159f774e..5fd49187f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -733,7 +733,7 @@ private[consumer] final class Runloop private ( * - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after * initialization and rebalancing */ - private def run(initialState: State): ZIO[Scope, Throwable, Any] = { + private def run(initialState: State): ZIO[Any, Throwable, Any] = { import Runloop.StreamOps ZStream @@ -905,12 +905,12 @@ object Runloop { // Run the entire loop on a dedicated thread to avoid executor shifts executor <- RunloopExecutor.newInstance fiber <- ZIO.onExecutor(executor)(runloop.run(initialState)).forkScoped - waitForRunloopStop = fiber.join.orDie + waitForRunloopStop = fiber.join _ <- ZIO.addFinalizer( ZIO.logDebug("Shutting down Runloop") *> runloop.shutdown *> - waitForRunloopStop <* + waitForRunloopStop.tapErrorCause(c => ZIO.logErrorCause("Error waiting for Runloop stop", c)).orDie <* ZIO.logDebug("Shut down Runloop") ) } yield runloop