From 1b16bbd669802445bad5481a50b2a5be43e8634f Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Fri, 7 Jun 2024 19:56:40 +0200 Subject: [PATCH] Allow stream to be interrupted when there is no traffic When a partition is lost while there is no traffic, `PartitionStreamControl` is blocked waiting for data. We fix this by racing with the `interruptPromise`. (Thanks @josdirksen for the analysis!) Note: this situation can only occur with lost partitions. Timeouts (the other reason for interrupts) do not occur when there is no traffic. Currently, we have no way to test lost partitions. Therefore, there are no added tests. See #1250. --- .../kafka/consumer/internal/PartitionStreamControl.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index ea2735824..be924bc74 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -142,9 +142,11 @@ object PartitionStreamControl { queueInfo <- Ref.make(QueueInfo(now, 0, None, 0)) requestAndAwaitData = for { - _ <- commandQueue.offer(RunloopCommand.Request(tp)) - _ <- diagnostics.emit(DiagnosticEvent.Request(tp)) - taken <- dataQueue.takeBetween(1, Int.MaxValue) + _ <- commandQueue.offer(RunloopCommand.Request(tp)) + _ <- diagnostics.emit(DiagnosticEvent.Request(tp)) + taken <- dataQueue + .takeBetween(1, Int.MaxValue) + .race(interruptionPromise.await.as(Chunk.empty)) } yield taken stream = ZStream.logAnnotate(