diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 39570b7c7..6d8d9178f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -23,7 +23,7 @@ private[consumer] final class Runloop private ( sameThreadRuntime: Runtime[Any], consumer: ConsumerAccess, commandQueue: Queue[RunloopCommand], - commitAvailableQueue: Queue[Boolean], + commitAvailable: Queue[Boolean], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], diagnostics: Diagnostics, maxStreamPullInterval: Duration, @@ -489,7 +489,7 @@ private[consumer] final class Runloop private ( ZStream .fromQueue(commandQueue) - .merge(ZStream.fromQueue(commitAvailableQueue).as(RunloopCommand.CommitAvailable)) + .merge(ZStream.fromQueue(commitAvailable).as(RunloopCommand.CommitAvailable)) .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { @@ -583,10 +583,11 @@ object Runloop { partitionsHub: Hub[Take[Throwable, PartitionAssignment]] ): URIO[Scope, Runloop] = for { - _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) - commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) - commitAvailableQueue <- ZIO.acquireRelease(Queue.dropping[Boolean](1))(_.shutdown) - lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) + _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) + commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) + // A one-element dropping queue used to signal between two fibers that new commits are pending and we should poll + commitAvailable <- ZIO.acquireRelease(Queue.dropping[Boolean](1))(_.shutdown) + lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState) sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) @@ -597,7 +598,7 @@ object Runloop { settings.commitTimeout, diagnostics, metrics, - commitAvailableQueue.offer(true).unit, + commitAvailable.offer(true).unit, sameThreadRuntime ) rebalanceCoordinator = new RebalanceCoordinator( @@ -614,7 +615,7 @@ object Runloop { sameThreadRuntime = sameThreadRuntime, consumer = consumer, commandQueue = commandQueue, - commitAvailableQueue = commitAvailableQueue, + commitAvailable = commitAvailable, partitionsHub = partitionsHub, diagnostics = diagnostics, maxStreamPullInterval = maxStreamPullInterval,