diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index ea2735824..be924bc74 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -142,9 +142,11 @@ object PartitionStreamControl { queueInfo <- Ref.make(QueueInfo(now, 0, None, 0)) requestAndAwaitData = for { - _ <- commandQueue.offer(RunloopCommand.Request(tp)) - _ <- diagnostics.emit(DiagnosticEvent.Request(tp)) - taken <- dataQueue.takeBetween(1, Int.MaxValue) + _ <- commandQueue.offer(RunloopCommand.Request(tp)) + _ <- diagnostics.emit(DiagnosticEvent.Request(tp)) + taken <- dataQueue + .takeBetween(1, Int.MaxValue) + .race(interruptionPromise.await.as(Chunk.empty)) } yield taken stream = ZStream.logAnnotate(