Skip to content

Commit

Permalink
Add some logging of failures
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed Nov 9, 2024
1 parent 0423f31 commit 566b0f3
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ private[consumer] object ConsumerAccess {
)
}
} { consumer =>
ZIO.blocking(access.withPermit(ZIO.attempt(consumer.close(settings.closeTimeout)))).orDie
ZIO
.blocking(access.withPermit(ZIO.attempt(consumer.close(settings.closeTimeout))))
.tapErrorCause(c => ZIO.logErrorCause("Error closing Runloop", c))
.orDie
}
} yield new ConsumerAccess(consumer, access)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ private[consumer] final class Runloop private (
* - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after
* initialization and rebalancing
*/
private def run(initialState: State): ZIO[Scope, Throwable, Any] = {
private def run(initialState: State): ZIO[Any, Throwable, Any] = {
import Runloop.StreamOps

ZStream
Expand Down Expand Up @@ -905,12 +905,12 @@ object Runloop {
// Run the entire loop on a dedicated thread to avoid executor shifts
executor <- RunloopExecutor.newInstance
fiber <- ZIO.onExecutor(executor)(runloop.run(initialState)).forkScoped
waitForRunloopStop = fiber.join.orDie
waitForRunloopStop = fiber.join

_ <- ZIO.addFinalizer(
ZIO.logDebug("Shutting down Runloop") *>
runloop.shutdown *>
waitForRunloopStop <*
waitForRunloopStop.tapErrorCause(c => ZIO.logErrorCause("Error waiting for Runloop stop", c)).orDie <*
ZIO.logDebug("Shut down Runloop")
)
} yield runloop
Expand Down

0 comments on commit 566b0f3

Please sign in to comment.