From 1a8a9fc1b74e0defdcf2d1e7f69c923f962e876e Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 8 Jun 2024 09:29:53 +0200 Subject: [PATCH] More type safety --- .../kafka/consumer/internal/PartitionStreamControl.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index be924bc74..550bc4349 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -38,7 +38,7 @@ final class PartitionStreamControl private ( val tp: TopicPartition, stream: ZStream[Any, Throwable, ByteArrayCommittableRecord], dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]], - interruptionPromise: Promise[Throwable, Unit], + interruptionPromise: Promise[Throwable, Nothing], val completedPromise: Promise[Nothing, Option[Offset]], queueInfoRef: Ref[QueueInfo], maxPollInterval: Duration @@ -135,7 +135,7 @@ object PartitionStreamControl { for { _ <- ZIO.logDebug(s"Creating partition stream ${tp.toString}") - interruptionPromise <- Promise.make[Throwable, Unit] + interruptionPromise <- Promise.make[Throwable, Nothing] completedPromise <- Promise.make[Nothing, Option[Offset]] dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]] now <- Clock.nanoTime @@ -146,7 +146,7 @@ object PartitionStreamControl { _ <- diagnostics.emit(DiagnosticEvent.Request(tp)) taken <- dataQueue .takeBetween(1, Int.MaxValue) - .race(interruptionPromise.await.as(Chunk.empty)) + .race(interruptionPromise.await) } yield taken stream = ZStream.logAnnotate(