From b4f95d31e1ff9adc33e0d1c129ced051bdcc087c Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Tue, 29 Oct 2024 11:12:16 +0100 Subject: [PATCH 1/8] Remove lost partitions from assigned streams Fixes #1288 --- .../main/scala/zio/kafka/consumer/internal/Runloop.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 32d0b162c..6f2aa27ec 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -236,7 +236,7 @@ private[consumer] final class Runloop private ( state <- currentStateRef.get lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp)) _ <- ZIO.foreachDiscard(lostStreams)(_.lost) - _ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps)) + _ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps, lostStreams)) _ <- ZIO.logTrace(s"onLost done") } yield () ) @@ -830,11 +830,12 @@ object Runloop { endedStreams = this.endedStreams ++ endedStreams ) - def onLost(lost: Set[TopicPartition]): RebalanceEvent = + def onLost(lost: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = copy( wasInvoked = true, assignedTps = assignedTps -- lost, - lostTps = lostTps ++ lost + lostTps = lostTps ++ lost, + endedStreams = this.endedStreams ++ endedStreams ) } From e7ad36702c98ea9c6cd065e3f966b0e77103fd34 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Tue, 29 Oct 2024 14:52:28 +0100 Subject: [PATCH 2/8] Log a warning --- .../main/scala/zio/kafka/consumer/internal/Runloop.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 6f2aa27ec..c98203cc9 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -560,6 +560,13 @@ private[consumer] final class Runloop private ( ended = endedStreams.map(_.tp).toSet ) ) + // Ensure that all assigned partitions have a stream and no streams are present for unassigned streams + _ <- + ZIO + .logWarning( + s"Not all assigned partitions have a stream or vice versa. Assigned: ${assignedTps.mkString(",")}, streams: ${state.assignedStreams.map(_.tp).mkString(",")}" + ) + .when(assignedTps != state.assignedStreams.map(_.tp).toSet) } yield Runloop.PollResult( records = polledRecords, ignoreRecordsForTps = ignoreRecordsForTps, From fded40a418b6694693c55d04e9b579cae57bb1b7 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Tue, 29 Oct 2024 14:53:37 +0100 Subject: [PATCH 3/8] Extend condition --- .../main/scala/zio/kafka/consumer/internal/Runloop.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index c98203cc9..f90b76b7a 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -564,9 +564,13 @@ private[consumer] final class Runloop private ( _ <- ZIO .logWarning( - s"Not all assigned partitions have a stream or vice versa. Assigned: ${assignedTps.mkString(",")}, streams: ${state.assignedStreams.map(_.tp).mkString(",")}" + s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${assignedTps.mkString(",")}, streams: ${state.assignedStreams.map(_.tp).mkString(",")}" + ) + .when( + assignedTps != state.assignedStreams + .map(_.tp) + .toSet || assignedTps.size != state.assignedStreams.size ) - .when(assignedTps != state.assignedStreams.map(_.tp).toSet) } yield Runloop.PollResult( records = polledRecords, ignoreRecordsForTps = ignoreRecordsForTps, From b61789d9083b3642ff833711851b7f1229936954 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Tue, 29 Oct 2024 14:58:44 +0100 Subject: [PATCH 4/8] Use updated state --- .../main/scala/zio/kafka/consumer/internal/Runloop.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index f90b76b7a..8984db4b8 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -564,12 +564,12 @@ private[consumer] final class Runloop private ( _ <- ZIO .logWarning( - s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${assignedTps.mkString(",")}, streams: ${state.assignedStreams.map(_.tp).mkString(",")}" + s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${assignedTps.mkString(",")}, streams: ${updatedAssignedStreams.map(_.tp).mkString(",")}" ) .when( - assignedTps != state.assignedStreams + assignedTps != updatedAssignedStreams .map(_.tp) - .toSet || assignedTps.size != state.assignedStreams.size + .toSet || assignedTps.size != updatedAssignedStreams.size ) } yield Runloop.PollResult( records = polledRecords, From 869e77947e41ec87d36f3c9543895c1fc5e41995 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Tue, 29 Oct 2024 15:01:13 +0100 Subject: [PATCH 5/8] Faster order of check --- .../main/scala/zio/kafka/consumer/internal/Runloop.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 8984db4b8..4f3e81f49 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -567,9 +567,10 @@ private[consumer] final class Runloop private ( s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${assignedTps.mkString(",")}, streams: ${updatedAssignedStreams.map(_.tp).mkString(",")}" ) .when( - assignedTps != updatedAssignedStreams - .map(_.tp) - .toSet || assignedTps.size != updatedAssignedStreams.size + assignedTps.size != updatedAssignedStreams.size || + assignedTps != updatedAssignedStreams + .map(_.tp) + .toSet ) } yield Runloop.PollResult( records = polledRecords, From 0df325e104e37996e3962bb69afc9d48dbee8fc2 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Tue, 29 Oct 2024 18:07:44 +0100 Subject: [PATCH 6/8] Fix check --- .../main/scala/zio/kafka/consumer/internal/Runloop.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 4f3e81f49..ed06b70da 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -564,13 +564,12 @@ private[consumer] final class Runloop private ( _ <- ZIO .logWarning( - s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${assignedTps.mkString(",")}, streams: ${updatedAssignedStreams.map(_.tp).mkString(",")}" + s"Not all assigned partitions have a (single) stream or vice versa. Assigned: ${currentAssigned.mkString(",")}, streams: ${updatedAssignedStreams.map(_.tp).mkString(",")}" ) .when( - assignedTps.size != updatedAssignedStreams.size || - assignedTps != updatedAssignedStreams - .map(_.tp) - .toSet + currentAssigned != updatedAssignedStreams + .map(_.tp) + .toSet || currentAssigned.size != updatedAssignedStreams.size ) } yield Runloop.PollResult( records = polledRecords, From 6af06ece658edebd1e9f14e76d87ef541eff2a45 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Tue, 29 Oct 2024 19:08:24 +0100 Subject: [PATCH 7/8] Test case + enable logging --- .../kafka/consumer/internal/RunloopSpec.scala | 68 +++++++++++++++++-- 1 file changed, 63 insertions(+), 5 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala index 126ed41fa..be5ec19d4 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala @@ -1,20 +1,27 @@ package zio.kafka.consumer.internal -import org.apache.kafka.clients.consumer.{ ConsumerRecord, MockConsumer, OffsetResetStrategy } +import org.apache.kafka.clients.consumer.{ + ConsumerRebalanceListener, + ConsumerRecord, + MockConsumer, + OffsetResetStrategy +} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException } import zio._ -import zio.kafka.consumer.{ ConsumerSettings, Subscription } +import zio.kafka.ZIOSpecDefaultSlf4j import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment +import zio.kafka.consumer.{ ConsumerSettings, Subscription } import zio.metrics.{ MetricState, Metrics } import zio.stream.{ Take, ZStream } import zio.test.TestAspect.withLiveClock import zio.test._ +import java.util import scala.jdk.CollectionConverters._ -object RunloopSpec extends ZIOSpecDefault { +object RunloopSpec extends ZIOSpecDefaultSlf4j { private type BinaryMockConsumer = MockConsumer[Array[Byte], Array[Byte]] private type PartitionsHub = Hub[Take[Throwable, PartitionAssignment]] @@ -93,6 +100,55 @@ object RunloopSpec extends ZIOSpecDefault { } } }, + test( + "runloop continues polling after a lost partition" + ) { + Diagnostics.SlidingQueue.make(100).flatMap { diagnostics => + var rebalanceListener: ConsumerRebalanceListener = null + + // Catches the rebalance listener so we can + val mockConsumer: BinaryMockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) { + override def subscribe( + topics: util.Collection[String], + listener: ConsumerRebalanceListener + ): Unit = { + rebalanceListener = listener + super.subscribe(topics, listener) + } + } + + withRunloop(diagnostics, mockConsumer) { (mockConsumer, partitionsHub, runloop) => + mockConsumer.schedulePollTask { () => + mockConsumer.updateEndOffsets(Map(tp10 -> Long.box(0L), tp11 -> Long.box(0L)).asJava) + mockConsumer.rebalance(Seq(tp10, tp11).asJava) + } + mockConsumer.schedulePollTask { () => + rebalanceListener.onPartitionsLost(Seq(tp10, tp11).asJava) + } + mockConsumer.schedulePollTask { () => + mockConsumer.rebalance(Seq.empty.asJava) + mockConsumer.rebalance(Seq(tp10, tp11).asJava) + } + + for { + streamStream <- ZStream.fromHubScoped(partitionsHub) + _ <- runloop.addSubscription(Subscription.Topics(Set(tp10, tp11).map(_.topic()))) + result <- streamStream + .map(_.exit) + .flattenExitOption + .flattenChunks + .take(3) + .mapZIO { case (_, stream) => + stream.runHead + } + .runDrain + .timeout(10.seconds) + } yield assertTrue( + result.isDefined + ) // Test will not finish if polling did not continue after partitions lost + } + } + }, test("runloop retries poll upon AuthorizationException and AuthenticationException") { withRunloop() { (mockConsumer, partitionsHub, runloop) => mockConsumer.schedulePollTask { () => @@ -131,11 +187,13 @@ object RunloopSpec extends ZIOSpecDefault { } ) @@ withLiveClock - private def withRunloop(diagnostics: Diagnostics = Diagnostics.NoOp)( + private def withRunloop( + diagnostics: Diagnostics = Diagnostics.NoOp, + mockConsumer: BinaryMockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) + )( f: (BinaryMockConsumer, PartitionsHub, Runloop) => ZIO[Scope, Throwable, TestResult] ): ZIO[Scope, Throwable, TestResult] = ZIO.scoped { - val mockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) for { consumerAccess <- ConsumerAccess.make(mockConsumer) consumerScope <- ZIO.scope From 6c8d287162cbd84941315a53352a00b95ae07165 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Tue, 29 Oct 2024 19:42:34 +0100 Subject: [PATCH 8/8] Finish comment --- .../test/scala/zio/kafka/consumer/internal/RunloopSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala index be5ec19d4..e5a4ac71d 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala @@ -106,7 +106,7 @@ object RunloopSpec extends ZIOSpecDefaultSlf4j { Diagnostics.SlidingQueue.make(100).flatMap { diagnostics => var rebalanceListener: ConsumerRebalanceListener = null - // Catches the rebalance listener so we can + // Catches the rebalance listener so we can use it val mockConsumer: BinaryMockConsumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) { override def subscribe( topics: util.Collection[String],