Skip to content

Commit

Permalink
This works with timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed May 20, 2024
1 parent c54b3e9 commit 33a6d82
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.forkScoped
}
_ <- stream1Interrupted.await
_ <- ZIO.logInfo("Producing second batch topic1")
_ <- produceMany(topic1, kvs)
_ <- stream1Done.await
.tapErrorCause(c => ZIO.logErrorCause("Stream 1 await failed", c))
Expand Down
18 changes: 8 additions & 10 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -728,22 +728,20 @@ private[consumer] final class ConsumerLive private[consumer] (
control <- streamControl
fib <-
(withStream(control.stream)
.onInterrupt(
ZIO.logError("withStream in runWithGracefulShutdown interrupted, this should not happen")
))
.tapErrorCause(cause => ZIO.logErrorCause("Error in withStream fiber in runWithGracefulShutdown", cause))
.forkScoped
.onInterrupt(ZIO.logError("withStream in runWithGracefulShutdown was interrupted, this should not happen")))
.tapErrorCause(cause =>
ZIO.logErrorCause("Error in withStream fiber in runWithGracefulShutdown", cause) *> ZIO.getFiberRefs.debug
)
.forkDaemon // Does not work with forkScoped, this Fiber would then be interrupted unintended sometimes
result <-
fib.join.onInterrupt(
ZIO.fiberIdWith(id => ZIO.logInfo(s"Interrupting from ${id.toString}")) *>
control.stop *> ZIO.logInfo("Control stopped") *>
control.stop *>
fib.join
/// TODO this still gives errors..
// .timeout(shutdownTimeout)
.timeout(shutdownTimeout)
.tapErrorCause(cause =>
ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause)
)
.tap(_ => ZIO.logInfo("Join done"))
.interruptible // Not having this here results in errors. Also, onInterrupt is run interruptibly
.ignore
)
} yield result
Expand Down

0 comments on commit 33a6d82

Please sign in to comment.