From 8ad3f1826e47954af20cb35f8f0d6a8d8cc8fe90 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Wed, 4 Oct 2023 10:14:35 +0200 Subject: [PATCH] Small cleanup * Use a for-comprehension for more readability. * Hide `State` from the rest of the package. * Add note to flaky test. --- .../zio/kafka/consumer/ConsumerSpec.scala | 5 +- .../zio/kafka/consumer/internal/Runloop.scala | 75 +++++++++---------- 2 files changed, 39 insertions(+), 41 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index d68b93a8a..48a494665 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -351,7 +351,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { }, test("handle rebalancing by completing topic-partition streams") { val nrMessages = 50 - val nrPartitions = 6 + val nrPartitions = 6 // Must be even and strictly positive for { // Produce messages on several partitions @@ -1142,6 +1142,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { test( "it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously" ) { + // NOTE: + // When this test fails with the message `100000 was not less than 100000`, it's because + // your computer is so fast that the first consumer already consumed all 100000 messages. 
val numberOfMessages: Int = 100000 val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i")) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 13d718803..bc04dee73 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -10,6 +10,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.fetch.FetchStrategy import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer +import zio.kafka.consumer.internal.Runloop._ import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment import zio.stream._ @@ -86,26 +87,20 @@ private[consumer] final class Runloop private ( ZIO.fail(new IllegalStateException(s"Multiple onAssigned calls on rebalance listener")) }, onRevoked = (_, _) => - ZIO.logDebug("Rebalancing started") *> - currentStateRef.get.flatMap { state => - // End all streams - endRevokedPartitions( - state.pendingRequests, - state.assignedStreams, - isRevoked = _ => true - ).flatMap { result => - lastRebalanceEvent.updateZIO { - case None => - ZIO.some(Runloop.RebalanceEvent.Revoked(result)) - case _ => - ZIO.fail( - new IllegalStateException( - s"onRevoked called on rebalance listener with pending assigned event" - ) - ) - } - } - } + for { + _ <- ZIO.logDebug("Rebalancing started") + state <- currentStateRef.get + // End all streams + result <- endRevokedPartitions(state.pendingRequests, state.assignedStreams, isRevoked = _ => true) + _ <- lastRebalanceEvent.updateZIO { + case None => + ZIO.some(Runloop.RebalanceEvent.Revoked(result)) + case _ => + ZIO.fail( + new IllegalStateException(s"onRevoked called on rebalance listener with pending assigned event") + ) + } + } yield () ) if 
(restartStreamsOnRebalancing) { @@ -483,7 +478,7 @@ private[consumer] final class Runloop private ( * - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after * initialization and rebalancing */ - def run(initialState: State): ZIO[Scope, Throwable, Any] = { + private def run(initialState: State): ZIO[Scope, Throwable, Any] = { import Runloop.StreamOps ZStream @@ -606,26 +601,26 @@ private[consumer] object Runloop { ZIO.logDebug("Shut down Runloop") ) } yield runloop -} -private[internal] final case class State( - pendingRequests: Chunk[RunloopCommand.Request], - pendingCommits: Chunk[RunloopCommand.Commit], - assignedStreams: Chunk[PartitionStreamControl], - subscriptionState: SubscriptionState -) { - def addCommit(c: RunloopCommand.Commit): State = copy(pendingCommits = pendingCommits :+ c) - def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) + private final case class State( + pendingRequests: Chunk[RunloopCommand.Request], + pendingCommits: Chunk[RunloopCommand.Commit], + assignedStreams: Chunk[PartitionStreamControl], + subscriptionState: SubscriptionState + ) { + def addCommit(c: RunloopCommand.Commit): State = copy(pendingCommits = pendingCommits :+ c) + def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) - def shouldPoll: Boolean = - subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty) -} + def shouldPoll: Boolean = + subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty) + } -object State { - val initial: State = State( - pendingRequests = Chunk.empty, - pendingCommits = Chunk.empty, - assignedStreams = Chunk.empty, - subscriptionState = SubscriptionState.NotSubscribed - ) + private object State { + val initial: State = State( + pendingRequests = Chunk.empty, + pendingCommits = Chunk.empty, + assignedStreams 
= Chunk.empty, + subscriptionState = SubscriptionState.NotSubscribed + ) + } }