diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala index 1f901c89c..c505dc237 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala @@ -188,9 +188,9 @@ object CommitterSpec extends ZIOSpecDefault { _ <- commitAvailable.await consumer <- createMockConsumer(offsets => ZIO.succeed(offsets)) _ <- committer.processQueuedCommits(consumer) + _ <- commitFiber.join _ <- committer.keepCommitsForPartitions(Set.empty) committedOffsets <- committer.getCommittedOffsets - _ <- commitFiber.join } yield assertTrue(committedOffsets.offsets.isEmpty) } ) @@ TestAspect.withLiveClock @@ TestAspect.nonFlaky(100) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala index 9e2af07ca..f80041ad0 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala @@ -58,29 +58,8 @@ private[consumer] final class LiveCommitter( _ <- pendingCommits.update(_ ++ commits) startTime <- ZIO.clockWith(_.nanoTime) getCommitResults <- commitAsyncZIO(consumer, offsetsWithMetaData) - _ <- getCommitResults - .zipLeft( - for { - endTime <- ZIO.clockWith(_.nanoTime) - latency = (endTime - startTime).nanoseconds - offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits)) - _ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty) - } yield () - ) - .zipLeft(ZIO.foreachDiscard(commits)(_.cont.done(Exit.unit))) - .tap(offsetsWithMetaData => diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))) - .catchAllCause { - case Cause.Fail(_: RebalanceInProgressException, _) => - for { - _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried") - _ <- commitQueue.offerAll(commits) - _ <- onCommitAvailable - } yield () - case c => - ZIO.foreachDiscard(commits)(_.cont.done(Exit.fail(c.squash))) <* diagnostics.emit( - DiagnosticEvent.Commit.Failure(offsets, c.squash) - ) - } + _ <- getCommitResults.exit + .flatMap(handleCommitCompletion(commits, offsetsWithMetaData, startTime, _)) // We don't wait for the completion of the commit here, because it // will only complete once we poll again. .forkDaemon @@ -88,6 +67,39 @@ private[consumer] final class LiveCommitter( } } yield () + private def handleCommitCompletion( + commits: Chunk[Commit], + offsets: Map[TopicPartition, OffsetAndMetadata], + startTime: NanoTime, + commitResults: Exit[Throwable, Map[TopicPartition, OffsetAndMetadata]] + ): Task[Unit] = + ZIO + .from(commitResults) + .unexit + .zipLeft( + for { + endTime <- ZIO.clockWith(_.nanoTime) + latency = (endTime - startTime).nanoseconds + offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits)) + _ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty) + } yield () + ) + .zipLeft(ZIO.foreachDiscard(commits)(_.cont.done(Exit.unit))) + .tap(offsetsWithMetaData => diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))) + .catchAllCause { + case Cause.Fail(_: RebalanceInProgressException, _) => + for { + _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried") + _ <- commitQueue.offerAll(commits) + _ <- onCommitAvailable + } yield () + case c => + ZIO.foreachDiscard(commits)(_.cont.done(Exit.fail(c.squash))) <* diagnostics.emit( + DiagnosticEvent.Commit.Failure(offsets, c.squash) + ) + } + .unit + private def mergeCommitOffsets(commits: Chunk[Commit]): Map[TopicPartition, OffsetAndMetadata] = commits .foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) => diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index e275d65e9..704c116ee 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -484,7 +484,7 @@ private[consumer] final class Runloop private ( * - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after * initialization and rebalancing * - * Note that this method is executed on a dedicated single-thread blocking exector + * Note that this method is executed on a dedicated single-thread Executor */ private def run(initialState: State, commitExecutor: Executor): ZIO[Scope, Throwable, Any] = { import Runloop.StreamOps