From 21361c14822932d837ea2032c677f151062e5cd3 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 4 Nov 2023 14:55:57 +0100 Subject: [PATCH] Always end streams in rebalance listener, support lost partitions (#1089) Previously, streams would only be ended in the rebalance listener for revoked partitions. Now, they are ended there even when `restartStreamsOnRebalancing` is used. Lost partitions are no longer treated as being revoked. With this change, streams of lost partitions are interrupted. Interrupting them prevents these streams from processing and committing more data. A nice side effect is that Zio-kafka is now faster when the rebalance listener was _not_ called; the 'fast track'. The main reason for this change is to prepare awaiting commits from within the rebalance listener which will prevent duplicate consuming of records (see #830). Also: prevent storing pending requests for streams that are no longer assigned. --- build.sbt | 6 +- .../zio/kafka/consumer/ConsumerSpec.scala | 200 ++++++------ .../internal/PartitionStreamControl.scala | 6 +- .../zio/kafka/consumer/internal/Runloop.scala | 297 ++++++++++-------- 4 files changed, 267 insertions(+), 242 deletions(-) diff --git a/build.sbt b/build.sbt index 97713a3ea..cc9d89d3d 100644 --- a/build.sbt +++ b/build.sbt @@ -3,9 +3,9 @@ import sbt.Def lazy val kafkaVersion = "3.6.0" lazy val embeddedKafkaVersion = "3.6.0" // Should be the same as kafkaVersion, except for the patch part -lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion -lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" -lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11" +lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion +lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" +lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11" enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index cada18e17..ee456d1ab 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -738,8 +738,30 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } yield assertCompletes }, test("restartStreamsOnRebalancing mode closes all partition streams") { + // Test plan: + // - Throughout the test, continuously produce to all partitions of a topic. + // - Start consumer 1: + // - track which partitions are assigned after each rebalance, + // - track which streams stopped. + // - Start consumer 2 but finish after just a few records. This results in 2 rebalances for consumer 1. + // - Verify that in the first rebalance, consumer 1 ends the streams for _all_ partitions, + // and then starts them again. + // + // NOTE: we need to use the cooperative sticky assignor. The default assignor `ConsumerPartitionAssignor`, + // revokes all partitions and re-assigns them on every rebalance. This means that all streams are restarted + // on every rebalance, exactly what `restartStreamOnRebalancing` would have caused. In other words, with the + // default assignor the externally visible behavior is the same, regardless of whether + // `restartStreamOnRebalancing` is `true` or `false`. + val nrPartitions = 5 - val nrMessages = 100 + val partitionIds = Chunk.fromIterable(0 until nrPartitions) + + def awaitRebalance[A](partitionAssignments: Ref[Chunk[A]], nr: Int): ZIO[Any, Nothing, Unit] = + partitionAssignments.get + .repeat( + Schedule.recurUntil((_: Chunk[A]).size >= nr) && Schedule.fixed(100.millis) + ) + .unit for { // Produce messages on several partitions @@ -750,116 +772,94 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { client2 <- randomClient _ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions)) - _ <- ZIO.foreachDiscard(1 to nrMessages) { i => - produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i")) - } - // Consume messages - messagesReceived <- - ZIO.foreach((0 until nrPartitions).toList)(i => Ref.make[Int](0).map(i -> _)).map(_.toMap) - drainCount <- Ref.make(0) - subscription = Subscription.topics(topic) - fib <- ZIO - .logAnnotate("consumer", "1") { - Consumer - .partitionedAssignmentStream(subscription, Serde.string, Serde.string) - .rechunk(1) - .mapZIO { partitions => - ZIO.logDebug(s"Got partition assignment ${partitions.map(_._1).mkString(",")}") *> - ZStream - .fromIterable(partitions) - .flatMapPar(Int.MaxValue) { case (tp, partitionStream) => - ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *> - partitionStream.mapChunksZIO { records => - OffsetBatch(records.map(_.offset)).commit *> messagesReceived(tp.partition) - .update(_ + records.size) - .as(records) - } - } - .runDrain - } - .mapZIO(_ => - drainCount.updateAndGet(_ + 1).flatMap { - case 2 => ZIO.logDebug("Stopping consumption") *> Consumer.stopConsumption - // 1: when consumer on fib2 starts - // 2: when consumer on fib2 stops, end of test - case _ => ZIO.unit - } - ) - .runDrain - .provideSomeLayer[Kafka]( - consumer( - client1, - Some(group), - clientInstanceId = Some("consumer1"), - restartStreamOnRebalancing = true, - properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "10") - ) - ) + // Continuously produce messages throughout the test + _ <- ZStream + .fromSchedule(Schedule.fixed(100.millis)) + .mapZIO { i => + ZIO.foreach(partitionIds) { p => + produceMany(topic, p, Seq((s"key.$p.$i", s"msg.$p.$i"))) } - .fork - // fib is running, consuming all the published messages from all partitions. - // Waiting until it recorded all messages - _ <- ZIO - .foreach(messagesReceived.values)(_.get) - .map(_.sum) - .repeat(Schedule.recurUntil((n: Int) => n == nrMessages) && Schedule.fixed(100.millis)) - - // Starting a new consumer that will stop after receiving 20 messages, - // causing two rebalancing events for fib1 consumers on start and stop - fib2 <- ZIO - .logAnnotate("consumer", "2") { + } + .runDrain + .forkScoped + + // Consumer 1 + streamsStarted <- Ref.make[Chunk[Set[Int]]](Chunk.empty) + streamsStopped <- Ref.make[Chunk[Int]](Chunk.empty) + consumer1Settings <- + consumerSettings( + client1, + Some(group), + restartStreamOnRebalancing = true + ).map { + _.withProperties( + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName + ) + } + fib1 <- ZIO + .logAnnotate("consumer", "1") { Consumer - .plainStream(subscription, Serde.string, Serde.string) - .take(20) + .partitionedAssignmentStream(Subscription.topics(topic), Serde.string, Serde.string) + .rechunk(1) + .mapZIO { assignments => + ZIO.logDebug(s"Got partition assignment ${assignments.map(_._1).mkString(",")}") *> + streamsStarted.update(_ :+ assignments.map(_._1.partition()).toSet) *> + ZStream + .fromIterable(assignments) + .flatMapPar(Int.MaxValue) { case (tp, partitionStream) => + ZStream.finalizer { + ZIO.logDebug(s"Stream for ${tp.toString} is done") *> + streamsStopped.update(_ :+ tp.partition()) + } *> + partitionStream.mapChunksZIO { records => + OffsetBatch(records.map(_.offset)).commit.as(records) + } + } + .runDrain + } .runDrain .provideSomeLayer[Kafka]( - consumer( - client2, - Some(group), - clientInstanceId = Some("consumer2"), - properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "10") - ) + ZLayer.succeed(consumer1Settings) >>> minimalConsumer() ) } .fork - // Waiting until fib1's partition streams got restarted because of the rebalancing - _ <- drainCount.get.repeat(Schedule.recurUntil((n: Int) => n == 1) && Schedule.fixed(100.millis)) - _ <- ZIO.logDebug("Consumer 1 finished rebalancing") + // Wait until consumer 1 was assigned some partitions + _ <- awaitRebalance(streamsStarted, 1) - // All messages processed, the partition streams of fib are still running. - // Saving the values and resetting the counters - messagesReceived0 <- - ZIO - .foreach((0 until nrPartitions).toList) { i => - messagesReceived(i).get.flatMap { v => - Ref.make(v).map(r => i -> r) - } <* messagesReceived(i).set(0) - } - .map(_.toMap) + // Consumer 2 + // Stop after receiving 20 messages, causing two rebalancing events for consumer 1. + consumer2Settings <- consumerSettings(client2, Some(group)) + _ <- ZIO + .logAnnotate("consumer", "2") { + Consumer + .plainStream(Subscription.topics(topic), Serde.string, Serde.string) + .take(20) + .runDrain + .provideSomeLayer[Kafka]( + ZLayer.succeed(consumer2Settings) >>> minimalConsumer() + ) + } + .forkScoped - // Publishing another N messages - now they will be distributed among the two consumers until - // fib2 stops after 20 messages - _ <- ZIO.foreachDiscard((nrMessages + 1) to (2 * nrMessages)) { i => - produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i")) - } - _ <- fib2.join - _ <- ZIO.logDebug("Consumer 2 done") - _ <- fib.join - _ <- ZIO.logDebug("Consumer 1 done") - // fib2 terminates after 20 messages, fib terminates after fib2 because of the rebalancing (drainCount==2) - messagesPerPartition0 <- - ZIO.foreach(messagesReceived0.values)(_.get) // counts from the first N messages (single consumer) - messagesPerPartition <- - ZIO.foreach(messagesReceived.values)(_.get) // counts from fib after the second consumer joined - - // The first set must contain all the produced messages - // The second set must have at least one and maximum N-20 (because fib2 stops after consuming 20) - - // the exact count cannot be known because fib2's termination triggers fib1's rebalancing asynchronously. - } yield assert(messagesPerPartition0)(forall(equalTo(nrMessages / nrPartitions))) && - assert(messagesPerPartition.view.sum)(isGreaterThan(0) && isLessThanEqualTo(nrMessages - 20)) - } @@ TestAspect.nonFlaky(3), + // Wait until consumer 1's partitions were revoked, and assigned again + _ <- awaitRebalance(streamsStarted, 3) + _ <- fib1.interrupt + + // The started streams after each rebalance + streamsStarted <- streamsStarted.get + _ <- ZIO.logDebug(s"partitions for started streams: $streamsStarted") + + streamsStopped <- streamsStopped.get + _ <- ZIO.logDebug(s"partitions for stopped streams: $streamsStopped") + } yield assertTrue( + // During the first rebalance, all partitions are stopped: + streamsStopped.take(nrPartitions).toSet == partitionIds.toSet, + // Some streams that were assigned at the beginning, are started after the first rebalance: + (streamsStarted(0) intersect streamsStarted(1)).nonEmpty + ) + }, test("handles RebalanceInProgressExceptions transparently") { val nrPartitions = 5 val nrMessages = 10000 diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 85f58b044..0e0b3882e 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -72,8 +72,10 @@ final class PartitionStreamControl private ( queueInfoRef.get.map(_.deadlineExceeded(now)) /** To be invoked when the partition was lost. */ - private[internal] def lost: UIO[Boolean] = - interruptionPromise.fail(new RuntimeException(s"Partition ${tp.toString} was lost")) + private[internal] def lost: UIO[Boolean] = { + val lostException = new RuntimeException(s"Partition ${tp.toString} was lost") with NoStackTrace + interruptionPromise.fail(lostException) + } /** To be invoked when the stream is no longer processing. */ private[internal] def halt: UIO[Boolean] = { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 586bacff7..09680ec05 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -28,7 +28,7 @@ private[consumer] final class Runloop private ( maxPollInterval: Duration, commitTimeout: Duration, commandQueue: Queue[RunloopCommand], - lastRebalanceEvent: Ref.Synchronized[Option[Runloop.RebalanceEvent]], + lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], diagnostics: Diagnostics, offsetRetrieval: OffsetRetrieval, @@ -71,45 +71,59 @@ private[consumer] final class Runloop private ( commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit private val rebalanceListener: RebalanceListener = { - val emitDiagnostics = RebalanceListener( - (assigned, _) => diagnostics.emit(DiagnosticEvent.Rebalance.Assigned(assigned)), - (revoked, _) => diagnostics.emit(DiagnosticEvent.Rebalance.Revoked(revoked)), - (lost, _) => diagnostics.emit(DiagnosticEvent.Rebalance.Lost(lost)) - ) - - def restartStreamsRebalancingListener = RebalanceListener( - onAssigned = (assigned, _) => - ZIO.logDebug("Rebalancing completed") *> - lastRebalanceEvent.updateZIO { - case None => - ZIO.some(Runloop.RebalanceEvent.Assigned(assigned)) - case Some(Runloop.RebalanceEvent.Revoked(revokeResult)) => - ZIO.some(Runloop.RebalanceEvent.RevokedAndAssigned(revokeResult, assigned)) - case Some(_) => - ZIO.fail(new IllegalStateException(s"Multiple onAssigned calls on rebalance listener")) - }, - onRevoked = (_, _) => + // During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times. + // We do not know the order in which the call-back methods are invoked. + // + // Ref `lastRebalanceEvent` is used to track what happens during the poll. Just before the poll the + // `RebalanceEvent.None` is stored. Then during the poll, inside each method of the rebalance listener, + // the ref is updated. + // + // Each method: + // - emits a diagnostic event + // - determines if this is the first method invoked during this poll (`rebalanceEvent.wasInvoked`) to + // make sure that the `restartStreamsOnRebalancing` feature is applied only once per poll + // - ends streams that need to be ended + // - updates `lastRebalanceEvent` + // + val recordRebalanceRebalancingListener = RebalanceListener( + onAssigned = (assignedTps, _) => for { - _ <- ZIO.logDebug("Rebalancing started") - state <- currentStateRef.get - // End all streams - result <- endRevokedPartitions(state.pendingRequests, state.assignedStreams, isRevoked = _ => true) - _ <- lastRebalanceEvent.updateZIO { - case None => - ZIO.some(Runloop.RebalanceEvent.Revoked(result)) - case _ => - ZIO.fail( - new IllegalStateException(s"onRevoked called on rebalance listener with pending assigned event") - ) - } + _ <- ZIO.logDebug(s"${assignedTps.size} partitions are assigned") + _ <- diagnostics.emit(DiagnosticEvent.Rebalance.Assigned(assignedTps)) + rebalanceEvent <- lastRebalanceEvent.get + state <- currentStateRef.get + streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams + else Chunk.empty + _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- lastRebalanceEvent.set(rebalanceEvent.onAssigned(assignedTps, endedStreams = streamsToEnd)) + _ <- ZIO.logTrace("onAssigned done") + } yield (), + onRevoked = (revokedTps, _) => + for { + _ <- ZIO.logDebug(s"${revokedTps.size} partitions are revoked") + _ <- diagnostics.emit(DiagnosticEvent.Rebalance.Revoked(revokedTps)) + rebalanceEvent <- lastRebalanceEvent.get + state <- currentStateRef.get + streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams + else state.assignedStreams.filter(control => revokedTps.contains(control.tp)) + _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- lastRebalanceEvent.set(rebalanceEvent.onRevoked(revokedTps, endedStreams = streamsToEnd)) + _ <- ZIO.logTrace("onRevoked done") + } yield (), + onLost = (lostTps, _) => + for { + _ <- ZIO.logDebug(s"${lostTps.size} partitions are lost") + _ <- diagnostics.emit(DiagnosticEvent.Rebalance.Lost(lostTps)) + rebalanceEvent <- lastRebalanceEvent.get + state <- currentStateRef.get + lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp)) + _ <- ZIO.foreachDiscard(lostStreams)(_.lost) + _ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps)) + _ <- ZIO.logTrace(s"onLost done") } yield () ) - if (restartStreamsOnRebalancing) { - emitDiagnostics ++ restartStreamsRebalancingListener ++ userRebalanceListener - } else { - emitDiagnostics ++ userRebalanceListener - } + recordRebalanceRebalancingListener ++ userRebalanceListener } /** This is the implementation behind the user facing api `Offset.commit`. */ @@ -284,12 +298,13 @@ private[consumer] final class Runloop private ( private def handlePoll(state: State): Task[State] = for { - _ <- - ZIO.logDebug( - s"Starting poll with ${state.pendingRequests.size} pending requests and ${state.pendingCommits.size} pending commits" - ) - _ <- currentStateRef.set(state) partitionsToFetch <- fetchStrategy.selectPartitionsToFetch(state.assignedStreams) + _ <- ZIO.logDebug( + s"Starting poll with ${state.pendingRequests.size} pending requests and" + + s" ${state.pendingCommits.size} pending commits," + + s" resuming ${partitionsToFetch} partitions" + ) + _ <- currentStateRef.set(state) pollResult <- consumer.runloopAccess { c => ZIO.suspend { @@ -302,101 +317,86 @@ private[consumer] final class Runloop private ( if (records eq null) ConsumerRecords.empty[Array[Byte], Array[Byte]]() else records } - val currentAssigned = c.assignment().asScala.toSet - val newlyAssigned = currentAssigned -- prevAssigned - - for { - ignoreRecordsForTps <- doSeekForNewPartitions(c, newlyAssigned) - - rebalanceEvent <- lastRebalanceEvent.getAndSet(None) - - revokeResult <- rebalanceEvent match { - case Some(Runloop.RebalanceEvent.Revoked(result)) => - // If we get here, `restartStreamsOnRebalancing == true` - // Use revoke result from endRevokedPartitions that was called previously in the rebalance listener - ZIO.succeed(result) - case Some(Runloop.RebalanceEvent.RevokedAndAssigned(result, _)) => - // If we get here, `restartStreamsOnRebalancing == true` - // Use revoke result from endRevokedPartitions that was called previously in the rebalance listener - ZIO.succeed(result) - case Some(Runloop.RebalanceEvent.Assigned(_)) => - // If we get here, `restartStreamsOnRebalancing == true` - // endRevokedPartitions was not called yet in the rebalance listener, - // and all partitions should be revoked - endRevokedPartitions( - state.pendingRequests, - state.assignedStreams, - isRevoked = _ => true - ) - case None => - // End streams for partitions that are no longer assigned - endRevokedPartitions( - state.pendingRequests, - state.assignedStreams, - isRevoked = (tp: TopicPartition) => !currentAssigned.contains(tp) - ) - } - - startingTps = rebalanceEvent match { - case Some(_) => - // If we get here, `restartStreamsOnRebalancing == true`, - // some partitions were revoked and/or assigned and - // all already assigned streams were ended. - // Therefore, all currently assigned tps are starting, - // either because they are restarting, or because they - // are new. - currentAssigned - case None => - newlyAssigned - } - - _ <- diagnostics.emit { - val providedTps = polledRecords.partitions().asScala.toSet - val requestedPartitions = state.pendingRequests.map(_.tp).toSet - - DiagnosticEvent.Poll( - tpRequested = requestedPartitions, - tpWithData = providedTps, - tpWithoutData = requestedPartitions -- providedTps - ) - } + diagnostics.emit { + val providedTps = polledRecords.partitions().asScala.toSet + val requestedPartitions = state.pendingRequests.map(_.tp).toSet + + DiagnosticEvent.Poll( + tpRequested = requestedPartitions, + tpWithData = providedTps, + tpWithoutData = requestedPartitions -- providedTps + ) + } *> + lastRebalanceEvent.getAndSet(RebalanceEvent.None).flatMap { + case RebalanceEvent(false, _, _, _, _) => + // The fast track, rebalance listener was not invoked: + // no assignment changes, only new records. + ZIO.succeed( + PollResult( + records = polledRecords, + ignoreRecordsForTps = Set.empty, + pendingRequests = state.pendingRequests, + assignedStreams = state.assignedStreams + ) + ) - } yield Runloop.PollResult( - startingTps = startingTps, - pendingRequests = revokeResult.pendingRequests, - assignedStreams = revokeResult.assignedStreams, - records = polledRecords, - ignoreRecordsForTps = ignoreRecordsForTps - ) + case RebalanceEvent(true, assignedTps, revokedTps, lostTps, endedStreams) => + // The slow track, the rebalance listener was invoked: + // some partitions were assigned, revoked or lost, + // some streams have ended. + + val currentAssigned = c.assignment().asScala.toSet + val endedTps = endedStreams.map(_.tp).toSet + for { + ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) + + // The topic partitions that need a new stream are: + // 1. Those that are freshly assigned + // 2. Those that are still assigned but were ended in the rebalance listener because + // of `restartStreamsOnRebalancing` being true + startingTps = assignedTps ++ (currentAssigned intersect endedTps) + + startingStreams <- + ZIO.foreach(Chunk.fromIterable(startingTps))(newPartitionStream).tap { newStreams => + ZIO.logDebug(s"Offering partition assignment $startingTps") *> + partitionsHub.publish( + Take.chunk(newStreams.map(_.tpStream)) + ) + } + + updatedAssignedStreams = + state.assignedStreams.filter(s => !endedTps.contains(s.tp)) ++ startingStreams + + // Remove pending requests for all streams that ended: + // 1. streams that were ended because the partition was lost + // 2. streams that were ended because the partition was revoked + // 3. streams that were ended because of `restartStreamsOnRebalancing` being true + updatedPendingRequests = + state.pendingRequests.filter { pendingRequest => + val tp = pendingRequest.tp + !(lostTps.contains(tp) || revokedTps.contains(tp) || endedStreams.exists(_.tp == tp)) + } + } yield Runloop.PollResult( + records = polledRecords, + ignoreRecordsForTps = ignoreRecordsForTps, + pendingRequests = updatedPendingRequests, + assignedStreams = updatedAssignedStreams + ) + } } } - startingStreams <- - if (pollResult.startingTps.isEmpty) { - ZIO.succeed(Chunk.empty[PartitionStreamControl]) - } else { - ZIO - .foreach(Chunk.fromIterable(pollResult.startingTps))(newPartitionStream) - .tap { newStreams => - ZIO.logDebug(s"Offering partition assignment ${pollResult.startingTps}") *> - partitionsHub.publish(Take.chunk(Chunk.fromIterable(newStreams.map(_.tpStream)))) - } - } - runningStreams <- ZIO.filter(pollResult.assignedStreams)(_.isRunning) - updatedStreams = runningStreams ++ startingStreams fulfillResult <- offerRecordsToStreams( - updatedStreams, + pollResult.assignedStreams, pollResult.pendingRequests, pollResult.ignoreRecordsForTps, pollResult.records ) updatedPendingCommits <- ZIO.filter(state.pendingCommits)(_.isPending) - // Using `runningStreams` instead of `updatedStreams` because starting streams cannot exceed - // their poll interval yet: - _ <- checkStreamPollInterval(runningStreams) + _ <- checkStreamPollInterval(pollResult.assignedStreams) } yield state.copy( pendingRequests = fulfillResult.pendingRequests, pendingCommits = updatedPendingCommits, - assignedStreams = updatedStreams + assignedStreams = pollResult.assignedStreams ) /** @@ -438,7 +438,12 @@ private[consumer] final class Runloop private ( } cmd match { - case req: RunloopCommand.Request => ZIO.succeed(state.addRequest(req)) + case req: RunloopCommand.Request => + // Ignore request from streams that were ended or lost. + ZIO.succeed( + if (state.assignedStreams.exists(_.tp == req.tp)) state.addRequest(req) + else state + ) case cmd @ RunloopCommand.AddSubscription(newSubscription, _) => state.subscriptionState match { case SubscriptionState.NotSubscribed => @@ -580,11 +585,10 @@ private[consumer] object Runloop { type ByteArrayCommittableRecord = CommittableRecord[Array[Byte], Array[Byte]] private final case class PollResult( - startingTps: Set[TopicPartition], - pendingRequests: Chunk[RunloopCommand.Request], - assignedStreams: Chunk[PartitionStreamControl], records: ConsumerRecords[Array[Byte], Array[Byte]], - ignoreRecordsForTps: Set[TopicPartition] + ignoreRecordsForTps: Set[TopicPartition], + pendingRequests: Chunk[RunloopCommand.Request], + assignedStreams: Chunk[PartitionStreamControl] ) private final case class RevokeResult( pendingRequests: Chunk[RunloopCommand.Request], @@ -594,14 +598,33 @@ private[consumer] object Runloop { pendingRequests: Chunk[RunloopCommand.Request] ) - private sealed trait RebalanceEvent + private final case class RebalanceEvent( + wasInvoked: Boolean, + assignedTps: Set[TopicPartition], + revokedTps: Set[TopicPartition], + lostTps: Set[TopicPartition], + endedStreams: Chunk[PartitionStreamControl] + ) { + def onAssigned(assigned: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = + copy( + wasInvoked = true, + assignedTps = assignedTps ++ assigned, + endedStreams = this.endedStreams ++ endedStreams + ) + + def onRevoked(revoked: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = + copy( + wasInvoked = true, + revokedTps = revokedTps ++ revoked, + endedStreams = this.endedStreams ++ endedStreams + ) + + def onLost(lost: Set[TopicPartition]): RebalanceEvent = + copy(wasInvoked = true, lostTps = lostTps ++ lost) + } + private object RebalanceEvent { - final case class Revoked(revokeResult: Runloop.RevokeResult) extends RebalanceEvent - final case class Assigned(newlyAssigned: Set[TopicPartition]) extends RebalanceEvent - final case class RevokedAndAssigned( - revokeResult: Runloop.RevokeResult, - newlyAssigned: Set[TopicPartition] - ) extends RebalanceEvent + val None: RebalanceEvent = RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty) } def make( @@ -620,7 +643,7 @@ private[consumer] object Runloop { for { _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) - lastRebalanceEvent <- Ref.Synchronized.make[Option[Runloop.RebalanceEvent]](None) + lastRebalanceEvent <- Ref.Synchronized.make[Runloop.RebalanceEvent](Runloop.RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState) runtime <- ZIO.runtime[Any]