Skip to content

Commit

Permalink
This works
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed May 20, 2024
1 parent f977001 commit c54b3e9
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 36 deletions.
68 changes: 42 additions & 26 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -473,37 +473,53 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
_ <- produceMany(topic2, kvs)
_ <- ZIO.scoped {
for {
stream1Started <- Promise.make[Nothing, Unit]
stream1Fib <-
Consumer
.plainStreamWithGracefulShutdown(Subscription.topics(topic1), Serde.string, Serde.string) {
stream =>
stream
.tap(_ => stream1Started.succeed(()))
.zipWithIndex
.map(_._2)
.takeWhile(_ < 2 * kvs.size - 1)
.runDrain

}
.forkScoped
stream1Started <- Promise.make[Nothing, Unit]
stream1Done <- Promise.make[Nothing, Unit]
stream1Interrupted <- Promise.make[Nothing, Unit]
stream1Fib <- ZIO.logAnnotate("stream", "1") {
(Consumer
.plainStreamWithGracefulShutdown(
Subscription.topics(topic1),
Serde.string,
Serde.string
) { stream =>
stream
.tap(_ => stream1Started.succeed(()))
.zipWithIndex
.map(_._2)
.runDrain
}
.tapErrorCause(c => ZIO.logErrorCause("Stream 1 failed", c))
.ensuring(stream1Done.succeed(())))
.forkScoped
}
_ <- stream1Started.await
_ <-
Consumer
.plainStreamWithGracefulShutdown(Subscription.topics(topic2), Serde.string, Serde.string) {
stream =>
stream.zipWithIndex
.map(_._2)
.tap(count => stream1Fib.interrupt.when(count == 4))
.runDrain
}
.forkScoped
_ <- ZIO.logAnnotate("stream", "2") {
Consumer
.plainStreamWithGracefulShutdown(
Subscription.topics(topic2),
Serde.string,
Serde.string
) { stream =>
stream.zipWithIndex
.map(_._2)
.tap(count =>
(stream1Fib.interrupt <* stream1Interrupted.succeed(())).when(count == 4)
)
.runDrain
}
.tapErrorCause(c => ZIO.logErrorCause("Stream 2 failed", c))
.forkScoped
}
_ <- stream1Interrupted.await
_ <- ZIO.logInfo("Producing second batch topic1")
_ <- produceMany(topic1, kvs)
_ <- stream1Fib.join
_ <- stream1Done.await
.tapErrorCause(c => ZIO.logErrorCause("Stream 1 await failed", c))
} yield ()
}.provideSomeLayer[Kafka with Scope with Producer](consumer(client, Some(group)))
} yield assertCompletes
}
} @@ nonFlaky(10)
),
test("a consumer timeout interrupts the stream and shuts down the consumer") {
// Setup of this test:
Expand Down
29 changes: 19 additions & 10 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import zio.kafka.serde.{ Deserializer, Serde }
import zio.kafka.utils.SslHelper
import zio.stream._

import scala.annotation.unused
import scala.jdk.CollectionConverters._
import scala.util.control.NoStackTrace

Expand Down Expand Up @@ -718,24 +719,32 @@ private[consumer] final class ConsumerLive private[consumer] (
*/
private def runWithGracefulShutdown[StreamType <: ZStream[_, _, _], R, E](
streamControl: ZIO[Scope, E, SubscriptionStreamControl[StreamType]],
shutdownTimeout: Duration
@unused shutdownTimeout: Duration
)(
withStream: StreamType => ZIO[R, E, Any]
): ZIO[R, E, Any] =
ZIO.scoped[R] {
for {
control <- streamControl
fib <- withStream(control.stream)
.onInterrupt(ZIO.logError("withStream in runWithGracefulShutdown interrupted, this should not happen"))
.forkScoped
fib <-
(withStream(control.stream)
.onInterrupt(
ZIO.logError("withStream in runWithGracefulShutdown interrupted, this should not happen")
))
.tapErrorCause(cause => ZIO.logErrorCause("Error in withStream fiber in runWithGracefulShutdown", cause))
.forkScoped
result <-
fib.join.onInterrupt(
control.stop *> fib.join
.timeout(shutdownTimeout)
.tapErrorCause(cause =>
ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause)
)
.ignore
ZIO.fiberIdWith(id => ZIO.logInfo(s"Interrupting from ${id.toString}")) *>
control.stop *> ZIO.logInfo("Control stopped") *>
fib.join
/// TODO this still gives errors..
// .timeout(shutdownTimeout)
.tapErrorCause(cause =>
ZIO.logErrorCause("Error joining withStream fiber in runWithGracefulShutdown", cause)
)
.tap(_ => ZIO.logInfo("Join done"))
.ignore
)
} yield result
}
Expand Down

0 comments on commit c54b3e9

Please sign in to comment.