Skip to content

Commit

Permalink
Prevent unlimited enqueueing of CommitAvailable commands
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed Nov 16, 2024
1 parent b8a4e06 commit 5e37292
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ private[consumer] final class Runloop private (
maxStreamPullInterval: Duration,
maxRebalanceDuration: Duration,
currentStateRef: Ref[State],
committedOffsetsRef: Ref[CommitOffsets]
committedOffsetsRef: Ref[CommitOffsets],
commitAvailable: Ref[Boolean]
) {
private val commitTimeout = settings.commitTimeout
private val commitTimeoutNanos = settings.commitTimeout.toNanos
Expand Down Expand Up @@ -327,10 +328,11 @@ private[consumer] final class Runloop private (
for {
p <- Promise.make[Throwable, Unit]
startTime = java.lang.System.nanoTime()
_ <- commitQueue.offer(Runloop.Commit(java.lang.System.nanoTime(), offsets, p))
_ <- commandQueue.offer(RunloopCommand.CommitAvailable)
_ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
_ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
_ <- commitQueue.offer(Runloop.Commit(java.lang.System.nanoTime(), offsets, p))
commitAvailable <- commitAvailable.getAndSet(true)
_ <- commandQueue.offer(RunloopCommand.CommitAvailable).unless(commitAvailable)
_ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
_ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
endTime = java.lang.System.nanoTime()
latency = (endTime - startTime).nanoseconds
_ <- consumerMetrics.observeCommit(latency)
Expand Down Expand Up @@ -819,6 +821,7 @@ private[consumer] final class Runloop private (
.takeWhile(_ != RunloopCommand.StopRunloop)
.runFoldChunksDiscardZIO(initialState) { (state, commands) =>
for {
_ <- commitAvailable.set(false)
commitCommands <- commitQueue.takeAll
_ <- ZIO.logDebug(
s"Processing ${commitCommands.size} commits," +
Expand Down Expand Up @@ -958,6 +961,7 @@ object Runloop {
lastRebalanceEvent <- Ref.Synchronized.make[Runloop.RebalanceEvent](Runloop.RebalanceEvent.None)
initialState = State.initial
currentStateRef <- Ref.make(initialState)
commitAvailable <- Ref.make(false)
committedOffsetsRef <- Ref.make(CommitOffsets.empty)
sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer)
executor <- ZIO.executor
Expand All @@ -974,7 +978,8 @@ object Runloop {
maxStreamPullInterval = maxStreamPullInterval,
maxRebalanceDuration = maxRebalanceDuration,
currentStateRef = currentStateRef,
committedOffsetsRef = committedOffsetsRef
committedOffsetsRef = committedOffsetsRef,
commitAvailable = commitAvailable
)
_ <- ZIO.logDebug("Starting Runloop")

Expand Down

0 comments on commit 5e37292

Please sign in to comment.