Skip to content

Commit

Permalink
Fix slow rebalanceSafeCommits behavior (#1358)
Browse files Browse the repository at this point in the history
After consumer 1 is shutdown (using stopConsumption), rebalances happen
and partitions from consumer 2 are assigned. These streams are never
started, so the finalizer completing completedPromise is never called.
Waiting for these to complete takes 3 minutes (default
maxRebalanceDuration).

In case that streams were assigned and no record was ever put in their
queues, there's no use in waiting for the stream to complete by
committing some offset.
  • Loading branch information
svroonland authored Nov 2, 2024
1 parent 9f316f9 commit d59e8f6
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ final class PartitionStreamControl private (

def queueSize: UIO[Int] = queueInfoRef.get.map(_.size)

def lastPulledOffset: UIO[Option[Offset]] = queueInfoRef.get.map(_.lastPulledOffset)

/**
* @return
* the number of polls there are records idling in the queue. It is increased on every poll (when the queue is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ private[consumer] final class Runloop private (
streamResults <-
ZIO.foreach(streamsToEnd) { stream =>
for {
isDone <- stream.completedPromise.isDone
endOffset <- if (isDone) stream.completedPromise.await else ZIO.none
} yield (isDone, endOffset)
isDone <- stream.completedPromise.isDone
lastPulledOffset <- stream.lastPulledOffset
endOffset <- if (isDone) stream.completedPromise.await else ZIO.none
} yield (isDone || lastPulledOffset.isEmpty, endOffset)
}
committedOffsets <- committedOffsetsRef.get
} yield {
Expand Down

0 comments on commit d59e8f6

Please sign in to comment.