Skip to content

Commit

Permalink
Refactor commit completion
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed Nov 26, 2024
1 parent 60a67ea commit 7050acc
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ object CommitterSpec extends ZIOSpecDefault {
_ <- commitAvailable.await
consumer <- createMockConsumer(offsets => ZIO.succeed(offsets))
_ <- committer.processQueuedCommits(consumer)
_ <- commitFiber.join
_ <- committer.keepCommitsForPartitions(Set.empty)
committedOffsets <- committer.getCommittedOffsets
_ <- commitFiber.join
} yield assertTrue(committedOffsets.offsets.isEmpty)
}
) @@ TestAspect.withLiveClock @@ TestAspect.nonFlaky(100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,36 +58,48 @@ private[consumer] final class LiveCommitter(
_ <- pendingCommits.update(_ ++ commits)
startTime <- ZIO.clockWith(_.nanoTime)
getCommitResults <- commitAsyncZIO(consumer, offsetsWithMetaData)
_ <- getCommitResults
.zipLeft(
for {
endTime <- ZIO.clockWith(_.nanoTime)
latency = (endTime - startTime).nanoseconds
offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits))
_ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty)
} yield ()
)
.zipLeft(ZIO.foreachDiscard(commits)(_.cont.done(Exit.unit)))
.tap(offsetsWithMetaData => diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData)))
.catchAllCause {
case Cause.Fail(_: RebalanceInProgressException, _) =>
for {
_ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried")
_ <- commitQueue.offerAll(commits)
_ <- onCommitAvailable
} yield ()
case c =>
ZIO.foreachDiscard(commits)(_.cont.done(Exit.fail(c.squash))) <* diagnostics.emit(
DiagnosticEvent.Commit.Failure(offsets, c.squash)
)
}
_ <- getCommitResults.exit
.flatMap(handleCommitCompletion(commits, offsetsWithMetaData, startTime, _))
// We don't wait for the completion of the commit here, because it
// will only complete once we poll again.
.forkDaemon
} yield ()
}
} yield ()

/**
 * Handles the outcome of an asynchronous Kafka commit once the underlying
 * `commitAsync` callback has fired.
 *
 * Success path: records commit latency and the offset increase in the consumer
 * metrics, folds the commits into `committedOffsetsRef`, completes each commit's
 * continuation promise with `Exit.unit`, and emits a `Commit.Success` diagnostic.
 *
 * Failure with [[RebalanceInProgressException]]: the commits are put back on
 * `commitQueue` for retry and `onCommitAvailable` is signalled; the continuation
 * promises are deliberately left open so callers keep waiting for the retried commit.
 *
 * Any other failure: each commit's continuation promise is failed with the squashed
 * cause and a `Commit.Failure` diagnostic is emitted.
 *
 * @param commits       the commits whose continuations must be completed or retried
 * @param offsets       the offsets (with metadata) that were submitted for commit
 * @param startTime     nano-time captured just before the commit was issued, used
 *                      to compute commit latency for metrics
 * @param commitResults the `Exit` of the commit: the committed offsets on success,
 *                      or the failure cause
 */
private def handleCommitCompletion(
  commits: Chunk[Commit],
  offsets: Map[TopicPartition, OffsetAndMetadata],
  startTime: NanoTime,
  commitResults: Exit[Throwable, Map[TopicPartition, OffsetAndMetadata]]
): Task[Unit] =
  ZIO
    // Lift the Exit into an effect so a commit failure flows into catchAllCause below.
    // NOTE(review): `.from(exit).unexit` — presumably needed to unwrap the Exit-as-ZIO
    // encoding into a plain failing/succeeding effect; confirm against the ZIO version in use.
    .from(commitResults)
    .unexit
    .zipLeft(
      // Success-only bookkeeping: observe latency and offset increase, and record
      // the newly committed offsets. Skipped entirely when the commit failed.
      for {
        endTime <- ZIO.clockWith(_.nanoTime)
        latency = (endTime - startTime).nanoseconds
        offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits))
        // Avoid emitting a zero-sized aggregated-commit metric for empty commit batches.
        _ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty)
      } yield ()
    )
    // Wake up every caller awaiting one of these commits.
    .zipLeft(ZIO.foreachDiscard(commits)(_.cont.done(Exit.unit)))
    .tap(offsetsWithMetaData => diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData)))
    .catchAllCause {
      case Cause.Fail(_: RebalanceInProgressException, _) =>
        // A rebalance interrupted the commit; re-queue the commits so they are retried.
        // The continuation promises stay open — callers keep waiting for the retry.
        for {
          _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried")
          _ <- commitQueue.offerAll(commits)
          _ <- onCommitAvailable
        } yield ()
      case c =>
        // Non-retryable failure: fail every waiting caller, then emit the diagnostic.
        ZIO.foreachDiscard(commits)(_.cont.done(Exit.fail(c.squash))) <* diagnostics.emit(
          DiagnosticEvent.Commit.Failure(offsets, c.squash)
        )
    }
    .unit

private def mergeCommitOffsets(commits: Chunk[Commit]): Map[TopicPartition, OffsetAndMetadata] =
commits
.foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ private[consumer] final class Runloop private (
* - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after
* initialization and rebalancing
*
* Note that this method is executed on a dedicated single-thread blocking exector
* Note that this method is executed on a dedicated single-thread Executor
*/
private def run(initialState: State, commitExecutor: Executor): ZIO[Scope, Throwable, Any] = {
import Runloop.StreamOps
Expand Down

0 comments on commit 7050acc

Please sign in to comment.