Skip to content

Commit

Permalink
More type safety
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten committed Jun 8, 2024
1 parent 1b16bbd commit 1a8a9fc
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ final class PartitionStreamControl private (
val tp: TopicPartition,
stream: ZStream[Any, Throwable, ByteArrayCommittableRecord],
dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]],
interruptionPromise: Promise[Throwable, Unit],
interruptionPromise: Promise[Throwable, Nothing],
val completedPromise: Promise[Nothing, Option[Offset]],
queueInfoRef: Ref[QueueInfo],
maxPollInterval: Duration
Expand Down Expand Up @@ -135,7 +135,7 @@ object PartitionStreamControl {

for {
_ <- ZIO.logDebug(s"Creating partition stream ${tp.toString}")
interruptionPromise <- Promise.make[Throwable, Unit]
interruptionPromise <- Promise.make[Throwable, Nothing]
completedPromise <- Promise.make[Nothing, Option[Offset]]
dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]]
now <- Clock.nanoTime
Expand All @@ -146,7 +146,7 @@ object PartitionStreamControl {
_ <- diagnostics.emit(DiagnosticEvent.Request(tp))
taken <- dataQueue
.takeBetween(1, Int.MaxValue)
.race(interruptionPromise.await.as(Chunk.empty))
.race(interruptionPromise.await)
} yield taken

stream = ZStream.logAnnotate(
Expand Down

0 comments on commit 1a8a9fc

Please sign in to comment.