Skip to content

Commit

Permalink
Renames + comment
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed Nov 26, 2024
1 parent 137794f commit 6afa3ef
Showing 1 changed file with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ private[consumer] final class Runloop private (
sameThreadRuntime: Runtime[Any],
consumer: ConsumerAccess,
commandQueue: Queue[RunloopCommand],
commitAvailableQueue: Queue[Boolean],
commitAvailable: Queue[Boolean],
partitionsHub: Hub[Take[Throwable, PartitionAssignment]],
diagnostics: Diagnostics,
maxStreamPullInterval: Duration,
Expand Down Expand Up @@ -489,7 +489,7 @@ private[consumer] final class Runloop private (

ZStream
.fromQueue(commandQueue)
.merge(ZStream.fromQueue(commitAvailableQueue).as(RunloopCommand.CommitAvailable))
.merge(ZStream.fromQueue(commitAvailable).as(RunloopCommand.CommitAvailable))
.takeWhile(_ != RunloopCommand.StopRunloop)
.runFoldChunksDiscardZIO(initialState) { (state, commands) =>
for {
Expand Down Expand Up @@ -583,10 +583,11 @@ object Runloop {
partitionsHub: Hub[Take[Throwable, PartitionAssignment]]
): URIO[Scope, Runloop] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized))
commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown)
commitAvailableQueue <- ZIO.acquireRelease(Queue.dropping[Boolean](1))(_.shutdown)
lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None)
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized))
commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown)
// A one-element dropping queue used to signal between two fibers that new commits are pending and we should poll
commitAvailable <- ZIO.acquireRelease(Queue.dropping[Boolean](1))(_.shutdown)
lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None)
initialState = State.initial
currentStateRef <- Ref.make(initialState)
sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer)
Expand All @@ -597,7 +598,7 @@ object Runloop {
settings.commitTimeout,
diagnostics,
metrics,
commitAvailableQueue.offer(true).unit,
commitAvailable.offer(true).unit,
sameThreadRuntime
)
rebalanceCoordinator = new RebalanceCoordinator(
Expand All @@ -614,7 +615,7 @@ object Runloop {
sameThreadRuntime = sameThreadRuntime,
consumer = consumer,
commandQueue = commandQueue,
commitAvailableQueue = commitAvailableQueue,
commitAvailable = commitAvailable,
partitionsHub = partitionsHub,
diagnostics = diagnostics,
maxStreamPullInterval = maxStreamPullInterval,
Expand Down

0 comments on commit 6afa3ef

Please sign in to comment.