diff --git a/.github/workflows/profile.yml b/.github/workflows/profile.yml
index ab7ee46ff..5191dbe13 100644
--- a/.github/workflows/profile.yml
+++ b/.github/workflows/profile.yml
@@ -1,16 +1,9 @@
 # Copied from Flavio W. Brasil's work on Kyo: https://github.com/fwbrasil/kyo
 name: profile
 on:
-  push:
-    branches:
-      - main
-  pull_request:
-    types: [ opened, reopened, synchronize ]
-
-# Prevent multiple builds at the same time from the same branch (except for 'master').
-concurrency:
-  group: ${{ github.workflow }}-${{ github.ref == format('refs/heads/{0}', github.event.repository.default_branch) && github.run_id || github.ref }}
-  cancel-in-progress: true
+  release:
+    types: [ created ]
+  workflow_dispatch:
 
 permissions:
   contents: write
diff --git a/build.sbt b/build.sbt
index f1bbc88ca..425c2c41a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -28,8 +28,8 @@ lazy val binCompatVersionToCompare =
     compatVersion
 }
 
-lazy val kafkaVersion = "3.8.1"
-lazy val embeddedKafkaVersion = "3.8.1" // Should be the same as kafkaVersion, except for the patch part
+lazy val kafkaVersion = "3.9.0"
+lazy val embeddedKafkaVersion = "3.9.0" // Should be the same as kafkaVersion, except for the patch part
 
 lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion
 lazy val logback = "ch.qos.logback" % "logback-classic" % "1.5.12"
diff --git a/docs/consumer-tuning.md b/docs/consumer-tuning.md
index 625e5c336..374d30b27 100644
--- a/docs/consumer-tuning.md
+++ b/docs/consumer-tuning.md
@@ -57,8 +57,7 @@ the partition queues. A very rough estimate for the maximum amount of heap neede
 
 The total can be tuned by changing the `partitionPreFetchBufferLimit`, `max.poll.records` settings. Another option is
 to write a custom `FetchStrategy`. For example the `ManyPartitionsQueueSizeBasedFetchStrategy` in
-[draft PR 970](https://github.com/zio/zio-kafka/pull/970) (merged into zio-kafka since 2.8.1). Note that the fetch strategy API is marked as
-experimental and may change without notice in any future zio-kafka version.
+[draft PR 970](https://github.com/zio/zio-kafka/pull/970) (merged into zio-kafka since 2.8.1).
 
 ## Long processing durations
 
diff --git a/zio-kafka-bench/README.md b/zio-kafka-bench/README.md
new file mode 100644
index 000000000..7cf5e337c
--- /dev/null
+++ b/zio-kafka-bench/README.md
@@ -0,0 +1,87 @@
+# Comparison Benchmarks
+
+## Results
+
+The benchmarks are run from a GitHub action on every commit. The results are published
+on https://zio.github.io/zio-kafka/dev/bench/.
+
+The results are automatically pruned by [a Scala script](https://github.com/zio/zio-kafka/blob/gh-pages/scripts/prune-benchmark-history.sc) on the `gh-pages` branch.
+
+## The consumer benchmarks
+
+When comparing the zio-kafka benchmarks against the regular Kafka clients, keep in mind that these benchmarks represent
+the worst possible case for zio-kafka. This is because these consumers only count the received records; there is no
+processing. This makes the comparison look bad for zio-kafka because zio-kafka programs normally process records in
+parallel, while other Kafka consumers process records serially.
+
+All consumer benchmarks send 50k ~512 byte records per run.
+
+#### zio.kafka.bench.ZioKafkaConsumerBenchmark.throughput
+
+Uses zio-kafka's `plainStream` with a topic subscription. The offsets of the consumed records are _not_ committed.
+
+#### zio.kafka.bench.ZioKafkaConsumerBenchmark.throughputWithCommits
+
+Same as above, but now the offsets of the consumed records are committed.
+
+#### zio.kafka.bench.comparison.KafkaClientBenchmarks.kafkaClients
+
+The simplest possible Kafka client that subscribes to a topic. It directly calls the poll method in a tight loop.
+
+#### zio.kafka.bench.comparison.KafkaClientBenchmarks.manualKafkaClients
+
+Same as above, but now using partition assignment instead of topic subscription.
+
+#### zio.kafka.bench.comparison.ZioKafkaBenchmarks.zioKafka
+
+Does the same as `zio.kafka.bench.ZioKafkaConsumerBenchmark.throughput`.
+
+#### zio.kafka.bench.comparison.ZioKafkaBenchmarks.manualZioKafka
+
+Does the same as `zio.kafka.bench.ZioKafkaConsumerBenchmark.throughput`, but uses a partition assignment instead of a
+topic subscription.
+
+## The producer benchmarks
+
+#### zio.kafka.bench.ZioKafkaProducerBenchmark.produceChunkSeq
+
+Sequentially produces 30 batches, where each batch contains 500 small records.
+
+#### zio.kafka.bench.ZioKafkaProducerBenchmark.produceChunkPar
+
+Produces the same batches as above, but from 4 fibers.
+
+#### zio.kafka.bench.ZioKafkaProducerBenchmark.produceSingleRecordSeq
+
+Sequentially produces 100 small records.
+
+#### zio.kafka.bench.ZioKafkaProducerBenchmark.produceSingleRecordPar
+
+Produces 100 small records from 4 fibers.
+
+## How to run the benchmarks
+
+To run these "comparison" benchmarks, in an sbt console, run:
+
+```scala
+clean
+Test/compile
+zioKafkaBench/Jmh/run -wi 10 -i 10 -r 1 -w 1 -t 1 -f 5 -foe true .*comparison.*
+```
+
+The `.*comparison.*` part is the selector telling JMH which benchmarks to run.
+Here, we're only selecting the ones living in the `comparison` package.
+
+## Tuning JMH runs
+
+To list all possible options and understand these configurations, run `sbt "zioKafkaBench/Jmh/run -h"`
+
+Used options meaning:
+
+ - "-wi 10": 10 warmup iterations
+ - "-i 10": 10 benchmark iterations
+ - "-r 1": Minimum time to spend at each measurement iteration. 1 second
+ - "-w 1": Minimum time to spend at each warmup iteration. 1 second
+ - "-t 1": Number of worker threads to run with. 1 thread
+ - "-f 5": How many times to fork a single benchmark. 5 forks
+ - "-foe true": Should JMH fail immediately if any benchmark had experienced an unrecoverable error?
True \ No newline at end of file diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala index 68880018e..0df95b7d1 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioBenchmark.scala @@ -5,6 +5,23 @@ import zio.{ ZLayer, _ } import java.util.UUID +trait ConsumerZioBenchmark[Environment] extends ZioBenchmark[Environment] { + private val recordDataSize = 512 + private def genString(i: Int): String = i.toString + scala.util.Random.alphanumeric.take(recordDataSize).mkString + + protected val recordCount: Int = 50000 + protected val kvs: Iterable[(String, String)] = Iterable.tabulate(recordCount)(i => (s"key$i", genString(i))) + protected val topic1 = "topic1" + protected val partitionCount = 6 +} + +trait ProducerZioBenchmark[Environment] extends ZioBenchmark[Environment] { + protected val recordCount = 500 + protected val kvs: List[(String, String)] = List.tabulate(recordCount)(i => (s"key$i", s"msg$i")) + protected val topic1 = "topic1" + protected val partitionCount = 6 +} + trait ZioBenchmark[Environment] { var runtime: Runtime.Scoped[Environment] = _ diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala similarity index 85% rename from zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala rename to zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala index 2695b05cb..801801b49 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaConsumerBenchmark.scala @@ -16,18 +16,14 @@ import java.util.concurrent.TimeUnit @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) -class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] { - val topic1 = "topic1" - val nrPartitions = 6 - val nrMessages = 50000 - val kvs: List[(String, String)] = List.tabulate(nrMessages)(i => (s"key$i", s"msg$i")) +class ZioKafkaConsumerBenchmark extends ConsumerZioBenchmark[Kafka with Producer] { override protected def bootstrap: ZLayer[Any, Nothing, Kafka with Producer] = ZLayer.make[Kafka with Producer](Kafka.embedded, producer).orDie override def initialize: ZIO[Kafka with Producer, Throwable, Any] = for { _ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore - _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions)) + _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount)) _ <- produceMany(topic1, kvs) } yield () @@ -47,7 +43,7 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] { _ <- Consumer .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray) .tap { _ => - counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == nrMessages)) + counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == recordCount)) } .runDrain .provideSome[Kafka](env) @@ -67,7 +63,7 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] { .tap(batch => counter.update(_ + batch.size)) .map(OffsetBatch.apply) .mapZIO(_.commit) - .takeUntilZIO(_ => counter.get.map(_ >= nrMessages)) + .takeUntilZIO(_ => counter.get.map(_ >= recordCount)) .runDrain .provideSome[Kafka](env) } diff --git 
a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ProducerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaProducerBenchmark.scala similarity index 85% rename from zio-kafka-bench/src/main/scala/zio/kafka/bench/ProducerBenchmark.scala rename to zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaProducerBenchmark.scala index a6c66e278..db9380170 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ProducerBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ZioKafkaProducerBenchmark.scala @@ -14,11 +14,7 @@ import java.util.concurrent.TimeUnit @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) -class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] { - val topic1 = "topic1" - val nrPartitions = 6 - val nrMessages = 500 - val kvs: List[(String, String)] = List.tabulate(nrMessages)(i => (s"key$i", s"msg$i")) +class ZioKafkaProducerBenchmark extends ProducerZioBenchmark[Kafka with Producer] { val records: Chunk[ProducerRecord[String, String]] = Chunk.fromIterable(kvs.map { case (k, v) => new ProducerRecord(topic1, k, v) }) @@ -28,7 +24,7 @@ class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] { override def initialize: ZIO[Kafka with Producer, Throwable, Any] = for { _ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore - _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions)) + _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount)) } yield () @Benchmark @@ -54,7 +50,7 @@ class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] { @BenchmarkMode(Array(Mode.Throughput)) @OutputTimeUnit(TimeUnit.SECONDS) def produceSingleRecordSeq(): Any = runZIO { - // Produce 50 records sequentially + // Produce 100 records sequentially Producer.produce(topic1, "key", "value", Serde.string, Serde.string).repeatN(99) } diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala index 67a68b193..59e015767 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala @@ -4,7 +4,7 @@ import io.github.embeddedkafka.EmbeddedKafka import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.ByteArrayDeserializer import zio.kafka.admin.AdminClient.TopicPartition -import zio.kafka.bench.ZioBenchmark +import zio.kafka.bench.ConsumerZioBenchmark import zio.kafka.bench.ZioBenchmark.randomThing import zio.kafka.bench.comparison.ComparisonBenchmark._ import zio.kafka.consumer.{ Consumer, ConsumerSettings } @@ -15,14 +15,10 @@ import zio.{ ULayer, ZIO, ZLayer } import scala.jdk.CollectionConverters._ -trait ComparisonBenchmark extends ZioBenchmark[Env] { +trait ComparisonBenchmark extends ConsumerZioBenchmark[Env] { - protected final val topic1: String = "topic1" - protected final val nrPartitions: Int = 6 protected final val topicPartitions: List[TopicPartition] = - (0 until nrPartitions).map(TopicPartition(topic1, _)).toList - protected final val numberOfMessages: Int = 1000000 - protected final val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key$i", s"msg$i")) + (0 until partitionCount).map(TopicPartition(topic1, _)).toList private val javaKafkaConsumer: ZLayer[ConsumerSettings, Throwable, LowLevelKafka] = ZLayer.scoped { @@ -63,7 +59,7 @@ 
trait ComparisonBenchmark extends ZioBenchmark[Env] { override final def initialize: ZIO[Env, Throwable, Any] = for { _ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore - _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions)) + _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = partitionCount)) _ <- produceMany(topic1, kvs) } yield () diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala index 7365d05fc..031f0be8b 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/KafkaClientBenchmarks.scala @@ -23,14 +23,14 @@ class KafkaClientBenchmarks extends ComparisonBenchmark { consumer.subscribe(java.util.Arrays.asList(topic1)) var count = 0L - while (count < numberOfMessages) { + while (count < recordCount) { val records = consumer.poll(settings.pollTimeout) count += records.count() } consumer.unsubscribe() count - }.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages")) + }.flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount")) } } } @@ -46,14 +46,14 @@ class KafkaClientBenchmarks extends ComparisonBenchmark { consumer.assign(topicPartitions.map(_.asJava).asJava) var count = 0L - while (count < numberOfMessages) { + while (count < recordCount) { val records = consumer.poll(settings.pollTimeout) count += records.count() } consumer.unsubscribe() count - }.flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages")) + }.flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount")) } } } diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/README.md b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/README.md deleted file mode 100644 index d7d1baa15..000000000 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/README.md +++ /dev/null @@ -1,26 +0,0 @@ -# Comparison Benchmarks - -## How to run them - -To run these "comparison" benchmarks, in a sbt console, run: -```scala -clean -Test/compile -zioKafkaBench/Jmh/run -wi 10 -i 10 -r 1 -w 1 -t 1 -f 5 -foe true .*comparison.* -``` - -The `.*comparison.*` part is the selector telling to JMH which benchmarks to run. -Here, we're only selecting the ones living in the `comparison` package. - -## Tuning JMH runs - -To list all possible options and understand these configurations, see run `sbt "zioKafkaBench/Jmh/run -h"` - -Used options meaning: - - "-wi 10": 10 warmup iterations - - "-i 10": 10 benchmark iterations - - "-r 1": Minimum time to spend at each measurement iteration. 1 second - - "-w 1": Minimum time to spend at each warmup iteration. 1 second - - "-t 1": Number of worker threads to run with. 1 thread - - "-f 5": How many times to fork a single benchmark. 5 forks - - "-foe true": Should JMH fail immediately if any benchmark had experienced an unrecoverable error?. 
True \ No newline at end of file diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala index cff054455..facd1d07b 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ZioKafkaBenchmarks.scala @@ -17,9 +17,9 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark { runZIO { Consumer .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray) - .take(numberOfMessages.toLong) + .take(recordCount.toLong) .runCount - .flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages")) + .flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount")) } @Benchmark @@ -32,9 +32,9 @@ class ZioKafkaBenchmarks extends ComparisonBenchmark { Serde.byteArray, Serde.byteArray ) - .take(numberOfMessages.toLong) + .take(recordCount.toLong) .runCount - .flatMap(r => zAssert(r == numberOfMessages, s"Consumed $r messages instead of $numberOfMessages")) + .flatMap(r => zAssert(r == recordCount, s"Consumed $r records instead of $recordCount")) } } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 679a0d582..13d51c789 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -28,6 +28,7 @@ import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ +import java.util.concurrent.ExecutionException import scala.reflect.ClassTag //noinspection SimplifyAssertInspection @@ -292,7 +293,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { }.tap { case (_, idx) => ZIO.logDebug(s"Consumed $idx") } } .runDrain - .tap(_ => ZIO.logDebug("Stream completed")) + .zipLeft(ZIO.logDebug("Stream completed")) .provideSomeLayer[Kafka]( consumer(client, Some(group)) ) @@ -323,21 +324,20 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { } yield assert(offset.map(_.offset))(isSome(equalTo(9L))) }, test("process outstanding commits after a graceful shutdown with aggregateAsync using `maxRebalanceDuration`") { - val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i")) for { - topic <- randomTopic - group <- randomGroup - client <- randomClient - _ <- produceMany(topic, kvs) - messagesReceived <- Ref.make[Int](0) + topic <- randomTopic + group <- randomGroup + client <- randomClient + _ <- scheduledProduce(topic, Schedule.fixed(50.millis).jittered).runDrain.forkScoped + lastProcessedOffset <- Ref.make(0L) offset <- ( Consumer .plainStream(Subscription.topics(topic), Serde.string, Serde.string) - .mapConcatZIO { record => + .mapZIO { record => for { - nr <- messagesReceived.updateAndGet(_ + 1) + nr <- lastProcessedOffset.updateAndGet(_ + 1) _ <- Consumer.stopConsumption.when(nr == 10) - } yield if (nr < 10) Seq(record.offset) else Seq.empty + } yield record.offset } .aggregateAsync(Consumer.offsetBatches) .mapZIO(_.commit) @@ -353,7 +353,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { maxRebalanceDuration = 6.seconds ) ) - } yield assertTrue(offset.map(_.offset).contains(9L)) + lastOffset <- lastProcessedOffset.get + } yield assertTrue(offset.map(_.offset).contains(lastOffset)) } @@ TestAspect.nonFlaky(2), test("a consumer timeout interrupts the stream and 
shuts down the consumer") { // Setup of this test: @@ -530,18 +531,30 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { // Consume messages subscription = Subscription.topics(topic) + assignedPartitionsRef <- Ref.make(Set.empty[Int]) // Set of partition numbers + // Create a Promise to signal when consumer1 has processed half the partitions + consumer1Ready <- Promise.make[Nothing, Unit] consumer1 <- Consumer .partitionedStream(subscription, Serde.string, Serde.string) .flatMapPar(nrPartitions) { case (tp, partition) => ZStream - .fromZIO(partition.runDrain) + .fromZIO( + consumer1Ready + .succeed(()) + .whenZIO( + assignedPartitionsRef + .updateAndGet(_ + tp.partition()) + .map(_.size >= (nrPartitions / 2)) + ) *> + partition.runDrain + ) .as(tp) } .take(nrPartitions.toLong / 2) .runDrain .provideSomeLayer[Kafka](consumer(client1, Some(group))) .fork - _ <- Live.live(ZIO.sleep(5.seconds)) + _ <- consumer1Ready.await consumer2 <- Consumer .partitionedStream(subscription, Serde.string, Serde.string) .take(nrPartitions.toLong / 2) @@ -574,11 +587,22 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { // Consume messages subscription = Subscription.topics(topic) + consumer1Ready <- Promise.make[Nothing, Unit] + assignedPartitionsRef <- Ref.make(Set.empty[Int]) // Set of partition numbers consumer1 <- Consumer .partitionedStream(subscription, Serde.string, Serde.string) .flatMapPar(nrPartitions) { case (tp, partition) => ZStream - .fromZIO(partition.runDrain) + .fromZIO( + consumer1Ready + .succeed(()) + .whenZIO( + assignedPartitionsRef + .updateAndGet(_ + tp.partition()) + .map(_.size >= (nrPartitions / 2)) + ) *> + partition.runDrain + ) .as(tp) } .take(nrPartitions.toLong / 2) @@ -592,7 +616,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .collect { case rebalance: DiagnosticEvent.Rebalance => rebalance } .runCollect .fork - _ <- ZIO.sleep(5.seconds) + _ <- consumer1Ready.await consumer2 <- Consumer .partitionedStream(subscription, Serde.string, Serde.string) .take(nrPartitions.toLong / 2) @@ -600,7 +624,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .provideSomeLayer[Kafka](consumer(client2, Some(group))) .fork _ <- consumer1.join - _ <- consumer1.join _ <- consumer2.join } yield diagnosticStream.join } @@ -1377,9 +1400,13 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { }: _*) @@ TestAspect.nonFlaky(2), test("running streams don't stall after a poll timeout") { for { - topic <- randomTopic - clientId <- randomClient - _ <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic)) + topic <- randomTopic + clientId <- randomClient + _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic)).flatMap(ZIO.fromTry(_)).retryN(3).catchSome { + case e: ExecutionException + if e.getCause.isInstanceOf[org.apache.kafka.common.errors.TopicExistsException] => + ZIO.unit + } settings <- consumerSettings(clientId) consumer <- Consumer.make(settings.withPollTimeout(50.millis)) recordsOut <- Queue.unbounded[Unit] @@ -1480,10 +1507,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { test( "it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously" ) { - // NOTE: - // When this test fails with the message `100000 was not less than 100000`, it's because - // your computer is so fast that the first consumer already consumed all 100000 messages. 
val numberOfMessages: Int = 100000 + val messagesToConsumeBeforeStop = 1000 // Adjust this threshold as needed val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i")) def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] = @@ -1493,20 +1518,28 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { settings <- consumerSettings(clientId = clientId) consumer <- Consumer.make(settings, diagnostics = diagnostics) _ <- produceMany(topic, kvs) + // Create a Ref to track messages consumed and a Promise to signal when to stop consumption + messagesConsumedRef <- Ref.make(0) + stopPromise <- Promise.make[Nothing, Unit] // Starting a consumption session to start the Runloop. fiber <- consumer .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string) + .mapZIO { _ => + messagesConsumedRef.updateAndGet(_ + 1).flatMap { count => + if (count >= messagesToConsumeBeforeStop) stopPromise.succeed(()).as(1L) + else ZIO.succeed(1L) + } + } .take(numberOfMessages.toLong) - .runCount + .runSum .forkScoped - _ <- ZIO.sleep(200.millis) + + // Wait for the consumption to reach the desired threshold + _ <- stopPromise.await _ <- consumer.stopConsumption consumed0 <- fiber.join _ <- ZIO.logDebug(s"consumed0: $consumed0") - _ <- ZIO.logDebug("About to sleep 5 seconds") - _ <- ZIO.sleep(5.seconds) - _ <- ZIO.logDebug("Slept 5 seconds") consumed1 <- consumer .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string) .take(numberOfMessages.toLong) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitOffsetsSpec.scala similarity index 75% rename from zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala rename to zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitOffsetsSpec.scala index 77dbc77b0..ffc79101e 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitOffsetsSpec.scala @@ -1,11 +1,13 @@ package zio.kafka.consumer.internal +import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition import zio._ -import org.apache.kafka.clients.consumer.OffsetAndMetadata +import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.LiveCommitter.Commit import zio.test._ -object RunloopCommitOffsetsSpec extends ZIOSpecDefault { +object CommitOffsetsSpec extends ZIOSpecDefault { private val tp10 = new TopicPartition("t1", 0) private val tp11 = new TopicPartition("t1", 1) @@ -14,9 +16,9 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { private val tp22 = new TopicPartition("t2", 2) override def spec: Spec[TestEnvironment with Scope, Any] = - suite("Runloop.CommitOffsets spec")( + suite("CommitOffsets spec")( test("addCommits adds to empty CommitOffsets") { - val s1 = Runloop.CommitOffsets(Map.empty) + val s1 = CommitOffsets(Map.empty) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10)))) assertTrue( inc == 0, @@ -24,7 +26,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits updates offset when it is higher") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 4L)) + val s1 = CommitOffsets(Map(tp10 -> 4L)) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10)))) assertTrue( inc == 10 - 4, @@ -32,7 
+34,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits ignores an offset when it is lower") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) + val s1 = CommitOffsets(Map(tp10 -> 10L)) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 5)))) assertTrue( inc == 0, @@ -40,7 +42,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits keeps unrelated partitions") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) + val s1 = CommitOffsets(Map(tp10 -> 10L)) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11)))) assertTrue( inc == 0, @@ -48,7 +50,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits does it all at once") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L)) val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11, tp20 -> 206L, tp21 -> 209L, tp22 -> 220L)))) assertTrue( inc == /* tp10 */ 0 + /* tp11 */ 0 + /* tp20 */ 1 + /* tp21 */ 0 + /* tp22 */ 0, @@ -56,7 +58,7 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("addCommits adds multiple commits") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L)) val (inc, s2) = s1.addCommits( Chunk( makeCommit(Map(tp11 -> 11, tp20 -> 199L, tp21 -> 211L, tp22 -> 219L)), @@ -69,35 +71,35 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { ) }, test("keepPartitions removes some partitions") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val s2 = s1.keepPartitions(Set(tp10)) assertTrue(s2.offsets == Map(tp10 -> 10L)) }, test("does not 'contain' offset when tp is not present") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) + val s1 = CommitOffsets(Map(tp10 -> 10L)) val result = s1.contains(tp20, 10) assertTrue(!result) }, test("does not 'contain' a higher offset") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val result = s1.contains(tp10, 11) assertTrue(!result) }, test("does 'contain' equal offset") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val result = s1.contains(tp10, 10) assertTrue(result) }, test("does 'contain' lower offset") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val s1 = CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val result = s1.contains(tp20, 19) assertTrue(result) } ) - private def makeCommit(offsets: Map[TopicPartition, Long]): Runloop.Commit = { + private def makeCommit(offsets: Map[TopicPartition, Long]): Commit = { val o = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) } val p = Unsafe.unsafe(implicit unsafe => Promise.unsafe.make[Throwable, Unit](FiberId.None)) - Runloop.Commit(0L, o, p) + Commit(0L, o, p) } } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala new file mode 100644 index 000000000..9e0832201 --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/CommitterSpec.scala @@ -0,0 +1,204 @@ +package zio.kafka.consumer.internal + +import org.apache.kafka.clients.consumer.OffsetAndMetadata +import 
org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.RebalanceInProgressException +import zio.kafka.consumer.diagnostics.Diagnostics +import zio.test._ +import zio.{ durationInt, Promise, Ref, ZIO } + +import java.util.{ Map => JavaMap } +import scala.jdk.CollectionConverters.MapHasAsJava + +object CommitterSpec extends ZIOSpecDefault { + override def spec = suite("Committer")( + test("signals that a new commit is available") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter + .make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + _ <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + } yield assertCompletes + }, + test("handles a successful commit by completing the commit effect") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + _ <- commitFiber.join + } yield assertCompletes + }, + test("handles a failed commit by completing the commit effect with a failure") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => + ZIO.attempt(callback.onComplete(offsets, new RuntimeException("Commit failed"))) + ) + result <- commitFiber.await + } yield assertTrue(result.isFailure) + }, + test("retries when rebalancing") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => + ZIO.attempt(callback.onComplete(offsets, new RebalanceInProgressException("Rebalance in progress"))) + ) + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + result <- commitFiber.await + } yield assertTrue(result.isSuccess) + }, + test("adds 1 to the committed last offset") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + _ <- committer.commit(Map(tp -> new OffsetAndMetadata(1))).forkScoped + _ <- commitAvailable.await + committedOffsets <- 
Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]] + _ <- committer.processQueuedCommits((offsets, callback) => + committedOffsets.succeed(offsets) *> ZIO.attempt(callback.onComplete(offsets, null)) + ) + offsetsCommitted <- committedOffsets.await + } yield assertTrue( + offsetsCommitted == Map(tp -> new OffsetAndMetadata(2)).asJava + ) + }, + test("batches commits from multiple partitions and offsets") { + for { + runtime <- ZIO.runtime[Any] + commitsAvailable <- Promise.make[Nothing, Unit] + nrCommitsDone <- Ref.make(0) + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = + ZIO.whenZIO(nrCommitsDone.updateAndGet(_ + 1).map(_ == 3))(commitsAvailable.succeed(())).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + tp2 = new TopicPartition("topic", 1) + commitFiber1 <- committer.commit(Map(tp -> new OffsetAndMetadata(1))).forkScoped + commitFiber2 <- committer.commit(Map(tp -> new OffsetAndMetadata(2))).forkScoped + commitFiber3 <- committer.commit(Map(tp2 -> new OffsetAndMetadata(3))).forkScoped + _ <- commitsAvailable.await + committedOffsets <- Promise.make[Nothing, JavaMap[TopicPartition, OffsetAndMetadata]] + _ <- committer.processQueuedCommits((offsets, callback) => + committedOffsets.succeed(offsets) *> ZIO.attempt(callback.onComplete(offsets, null)) + ) + _ <- commitFiber1.join zip commitFiber2.join zip commitFiber3.join + offsetsCommitted <- committedOffsets.await + } yield assertTrue( + offsetsCommitted == Map(tp -> new OffsetAndMetadata(3), tp2 -> new OffsetAndMetadata(4)).asJava + ) + }, + test("keeps track of pending commits") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + pendingCommitsDuringCommit <- committer.pendingCommitCount + _ <- committer.cleanupPendingCommits + pendingCommitsAfterCommit <- committer.pendingCommitCount + _ <- commitFiber.join + } yield assertTrue(pendingCommitsDuringCommit == 1 && pendingCommitsAfterCommit == 0) + }, + test("keep track of committed offsets") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + committedOffsets <- committer.getCommittedOffsets + _ <- commitFiber.join + } yield assertTrue(committedOffsets.offsets == Map(tp -> 0L)) + }, + test("clean committed offsets of no-longer assigned partitions") { + for { + runtime <- ZIO.runtime[Any] + commitAvailable <- Promise.make[Nothing, Unit] + committer <- LiveCommitter.make( + 10.seconds, + Diagnostics.NoOp, + new DummyMetrics, + onCommitAvailable = commitAvailable.succeed(()).unit, + sameThreadRuntime = runtime + ) + tp = new TopicPartition("topic", 
0) + commitFiber <- committer.commit(Map(tp -> new OffsetAndMetadata(0))).forkScoped + _ <- commitAvailable.await + _ <- committer.processQueuedCommits((offsets, callback) => ZIO.attempt(callback.onComplete(offsets, null))) + _ <- committer.keepCommitsForPartitions(Set.empty) + committedOffsets <- committer.getCommittedOffsets + _ <- commitFiber.join + } yield assertTrue(committedOffsets.offsets.isEmpty) + } + ) @@ TestAspect.withLiveClock @@ TestAspect.nonFlaky(100) +} diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/DummyMetrics.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/DummyMetrics.scala new file mode 100644 index 000000000..5879a1a1c --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/DummyMetrics.scala @@ -0,0 +1,23 @@ +package zio.kafka.consumer.internal +import zio.{ UIO, ZIO } + +private[internal] class DummyMetrics extends ConsumerMetrics { + override def observePoll(resumedCount: Int, pausedCount: Int, latency: zio.Duration, pollSize: Int): UIO[Unit] = + ZIO.unit + + override def observeCommit(latency: zio.Duration): UIO[Unit] = ZIO.unit + override def observeAggregatedCommit(latency: zio.Duration, commitSize: NanoTime): UIO[Unit] = ZIO.unit + override def observeRebalance( + currentlyAssignedCount: Int, + assignedCount: Int, + revokedCount: Int, + lostCount: Int + ): UIO[Unit] = ZIO.unit + override def observeRunloopMetrics( + state: Runloop.State, + commandQueueSize: Int, + commitQueueSize: Int, + pendingCommits: Int + ): UIO[Unit] = ZIO.unit + override def observePollAuthError(): UIO[Unit] = ZIO.unit +} diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala new file mode 100644 index 000000000..562db3f2c --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala @@ -0,0 +1,218 @@ +package zio.kafka.consumer.internal + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition +import zio.kafka.ZIOSpecDefaultSlf4j +import zio.kafka.consumer.diagnostics.Diagnostics +import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.LiveCommitter.Commit +import zio.kafka.consumer.internal.RebalanceCoordinator.RebalanceEvent +import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord +import zio.kafka.consumer.{ CommittableRecord, ConsumerSettings } +import zio.test._ +import zio.{ durationInt, Chunk, Promise, Ref, Scope, Semaphore, Task, UIO, ZIO } + +import java.util + +object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j { + type BinaryMockConsumer = MockConsumer[Array[Byte], Array[Byte]] + + private val mockMetrics = new ConsumerMetrics { + override def observePoll(resumedCount: Int, pausedCount: Int, latency: zio.Duration, pollSize: Int): UIO[Unit] = + ZIO.unit + + override def observeCommit(latency: zio.Duration): UIO[Unit] = ZIO.unit + override def observeAggregatedCommit(latency: zio.Duration, commitSize: NanoTime): UIO[Unit] = ZIO.unit + override def observeRebalance( + currentlyAssignedCount: Int, + assignedCount: Int, + revokedCount: Int, + lostCount: Int + ): UIO[Unit] = ZIO.unit + override def observeRunloopMetrics( + state: Runloop.State, + commandQueueSize: Int, + commitQueueSize: Int, + pendingCommits: Int + ): UIO[Unit] = ZIO.unit + override def observePollAuthError(): UIO[Unit] = ZIO.unit + } + + def spec = 
suite("RunloopRebalanceListener")( + test("should track assigned, revoked and lost partitions") { + for { + lastEvent <- Ref.Synchronized.make(RebalanceCoordinator.RebalanceEvent.None) + consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {} + tp = new TopicPartition("topic", 0) + tp2 = new TopicPartition("topic", 1) + tp3 = new TopicPartition("topic", 2) + tp4 = new TopicPartition("topic", 3) + listener <- makeCoordinator(lastEvent, consumer) + _ <- listener.toRebalanceListener.onAssigned(Set(tp)) + _ <- listener.toRebalanceListener.onAssigned(Set(tp4)) + _ <- listener.toRebalanceListener.onRevoked(Set(tp2)) + _ <- listener.toRebalanceListener.onLost(Set(tp3)) + event <- lastEvent.get + } yield assertTrue( + event.wasInvoked && event.assignedTps == Set(tp, tp4) && event.revokedTps == Set(tp2) && event.lostTps == Set( + tp3 + ) + ) + }, + test("should end streams for revoked and lost partitions") { + for { + lastEvent <- Ref.Synchronized.make(RebalanceCoordinator.RebalanceEvent.None) + consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {} + tp = new TopicPartition("topic", 0) + tp2 = new TopicPartition("topic", 1) + tp3 = new TopicPartition("topic", 2) + assignedStreams <- ZIO.foreach(Chunk(tp, tp2, tp3))(makeStreamControl) + listener <- makeCoordinator(lastEvent, consumer, assignedStreams = assignedStreams) + _ <- listener.toRebalanceListener.onAssigned(Set(tp)) + _ <- listener.toRebalanceListener.onRevoked(Set(tp2)) + _ <- listener.toRebalanceListener.onLost(Set(tp3)) + event <- lastEvent.get + // Lost and end partition's stream should be ended + _ <- assignedStreams(1).stream.runDrain + _ <- assignedStreams(2).stream.runDrain + } yield assertTrue(event.endedStreams.map(_.tp).toSet == Set(tp2, tp3)) + }, + suite("rebalanceSafeCommits")( + test("should wait for the last pulled offset to commit") { + for { + lastEvent <- Ref.Synchronized.make(RebalanceCoordinator.RebalanceEvent.None) + consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) { + override def commitAsync( + offsets: util.Map[TopicPartition, OffsetAndMetadata], + callback: OffsetCommitCallback + ): Unit = + // Do nothing during rebalancing + if (callback != null) callback.onComplete(offsets, null) + + override def commitSync(): Unit = () + } + tp = new TopicPartition("topic", 0) + streamControl <- makeStreamControl(tp) + records = createTestRecords(3) + recordsPulled <- Promise.make[Nothing, Unit] + _ <- streamControl.offerRecords(records) + runtime <- ZIO.runtime[Any] + committer <- LiveCommitter.make(10.seconds, Diagnostics.NoOp, mockMetrics, ZIO.unit, runtime) + + streamDrain <- + streamControl.stream + .tap(_ => recordsPulled.succeed(())) + .tap(record => + committer + .commit( + Map( + new TopicPartition("topic", record.partition) -> new OffsetAndMetadata(record.offset.offset, null) + ) + ) + ) + .runDrain + .forkScoped + listener <- + makeCoordinator( + lastEvent, + consumer, + assignedStreams = Chunk(streamControl), + rebalanceSafeCommits = true, + committer = committer + ) + _ <- listener.toRebalanceListener.onRevoked(Set(tp)) + _ <- streamDrain.join + } yield assertCompletes + }, + test("should continue if waiting for the stream to continue has timed out") { + for { + lastEvent <- Ref.Synchronized.make(RebalanceCoordinator.RebalanceEvent.None) + consumer = new BinaryMockConsumer(OffsetResetStrategy.LATEST) {} + tp = new TopicPartition("topic", 0) + streamControl <- makeStreamControl(tp) + records = createTestRecords(3) + recordsPulled <- Promise.make[Nothing, Unit] + _ <- 
streamControl.offerRecords(records) + committedOffsets <- Ref.make(CommitOffsets.empty) + done <- Promise.make[Throwable, Unit] + committer = new MockCommitter { + override val commit = + offsets => + committedOffsets + .update(_.addCommits(Chunk(Commit(0L, offsets, done)))._2) + override def getCommittedOffsets = committedOffsets.get + } + streamDrain <- + streamControl.stream + .tap(_ => recordsPulled.succeed(())) + .runDrain + .forkScoped + listener <- + makeCoordinator( + lastEvent, + consumer, + assignedStreams = Chunk(streamControl), + rebalanceSafeCommits = true, + committer = committer + ) + _ <- listener.toRebalanceListener.onRevoked(Set(tp)) + _ <- streamDrain.join + } yield assertCompletes + } + ) + ) @@ TestAspect.withLiveClock + + private def makeStreamControl(tp: TopicPartition): UIO[PartitionStreamControl] = + PartitionStreamControl.newPartitionStream(tp, ZIO.unit, Diagnostics.NoOp, 30.seconds) + + private def makeCoordinator( + lastEvent: Ref.Synchronized[RebalanceEvent], + mockConsumer: BinaryMockConsumer, + assignedStreams: Chunk[PartitionStreamControl] = Chunk.empty, + committer: Committer = new MockCommitter {}, + settings: ConsumerSettings = ConsumerSettings(List("")).withCommitTimeout(1.second), + rebalanceSafeCommits: Boolean = false + ): ZIO[Scope, Throwable, RebalanceCoordinator] = + Semaphore.make(1).map(new ConsumerAccess(mockConsumer, _)).map { consumerAccess => + new RebalanceCoordinator( + lastEvent, + settings.withRebalanceSafeCommits(rebalanceSafeCommits), + consumerAccess, + 5.seconds, + ZIO.succeed(assignedStreams), + committer + ) + } + + private def createTestRecords(count: Int): Chunk[ByteArrayCommittableRecord] = + Chunk.fromIterable( + (1 to count).map(i => + CommittableRecord( + record = new ConsumerRecord[Array[Byte], Array[Byte]]( + "test-topic", + 0, + i.toLong, + Array[Byte](), + Array[Byte]() + ), + commitHandle = _ => ZIO.unit, + consumerGroupMetadata = None + ) + ) + ) +} + +abstract class MockCommitter extends Committer { + override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = _ => ZIO.unit + + override def processQueuedCommits( + commitAsync: (java.util.Map[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => zio.Task[Unit], + executeOnEmpty: Boolean + ): zio.Task[Unit] = ZIO.unit + override def queueSize: UIO[Int] = ZIO.succeed(0) + override def pendingCommitCount: UIO[Int] = ZIO.succeed(0) + override def getPendingCommits: UIO[CommitOffsets] = ZIO.succeed(CommitOffsets.empty) + override def cleanupPendingCommits: UIO[Unit] = ZIO.unit + override def keepCommitsForPartitions(assignedPartitions: Set[TopicPartition]): UIO[Unit] = ZIO.unit + override def getCommittedOffsets: UIO[CommitOffsets] = ZIO.succeed(CommitOffsets.empty) +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index e5e63771f..20325fd02 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -284,9 +284,6 @@ final case class ConsumerSettings( withPartitionPreFetchBufferLimit(partitionPreFetchBufferLimit) /** - * WARNING: [[zio.kafka.consumer.fetch.FetchStrategy]] is an EXPERIMENTAL API and may change in an incompatible way - * without notice in any zio-kafka version. - * * @param fetchStrategy * The fetch strategy which selects which partitions can fetch data in the next poll. 
The default is to use the * [[zio.kafka.consumer.fetch.QueueSizeBasedFetchStrategy]] with a `partitionPreFetchBufferLimit` parameter of 1024, diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala index e8098fac0..2cd5d6e39 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala @@ -8,8 +8,6 @@ import scala.collection.mutable /** * A fetch strategy determined which stream are allowed to fetch data in the next poll. - * - * WARNING: this is an EXPERIMENTAL API and may change in an incompatible way without notice in any zio-kafka version. */ trait FetchStrategy { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala new file mode 100644 index 000000000..ab8b98ac5 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Committer.scala @@ -0,0 +1,90 @@ +package zio.kafka.consumer.internal + +import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetCommitCallback } +import org.apache.kafka.common.TopicPartition +import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.LiveCommitter.Commit +import zio.{ Chunk, Task, UIO } + +import java.lang.Math.max +import java.util.{ Map => JavaMap } +import scala.collection.mutable + +private[internal] trait Committer { + val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] + + /** + * Takes commits from the queue, commits them and adds them to pending commits + * + * If the queue is empty, nothing happens, unless executeOnEmpty is true. + * + * WARNING: this method is used during a rebalance from the same-thread-runtime. This restricts what ZIO operations + * may be used. Please see [[RebalanceCoordinator]] for more information. + * + * @param commitAsync + * Function 'commitAsync' on the KafkaConsumer. This is isolated from the whole KafkaConsumer for testing purposes. + * The caller should ensure exclusive access to the KafkaConsumer. + * @param executeOnEmpty + * Execute commitAsync() even if there are no commits + */ + def processQueuedCommits( + commitAsync: (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => Task[Unit], + executeOnEmpty: Boolean = false + ): Task[Unit] + + def queueSize: UIO[Int] + + def pendingCommitCount: UIO[Int] + + def getPendingCommits: UIO[CommitOffsets] + + /** Removes all completed commits from `pendingCommits`. */ + def cleanupPendingCommits: UIO[Unit] + + def keepCommitsForPartitions(assignedPartitions: Set[TopicPartition]): UIO[Unit] + + def getCommittedOffsets: UIO[CommitOffsets] +} + +private[internal] object Committer { + final case class CommitOffsets(offsets: Map[TopicPartition, Long]) { + + /** Returns an estimate of the total offset increase, and a new `CommitOffsets` with the given offsets added. 
*/ + def addCommits(c: Chunk[Commit]): (Long, CommitOffsets) = { + val updatedOffsets = mutable.Map.empty[TopicPartition, Long] + updatedOffsets.sizeHint(offsets.size) + updatedOffsets ++= offsets + var offsetIncrease = 0L + c.foreach { commit => + commit.offsets.foreach { case (tp, offsetAndMeta) => + val offset = offsetAndMeta.offset() + val maxOffset = updatedOffsets.get(tp) match { + case Some(existingOffset) => + offsetIncrease += max(0L, offset - existingOffset) + max(existingOffset, offset) + case None => + // This partition was not committed to from this consumer yet. Therefore we do not know the offset + // increase. A good estimate would be the poll size for this consumer, another okayish estimate is 0. + // Let's go with the simplest for now: ```offsetIncrease += 0``` + offset + } + updatedOffsets += tp -> maxOffset + } + } + (offsetIncrease, CommitOffsets(offsets = updatedOffsets.toMap)) + } + + def keepPartitions(tps: Set[TopicPartition]): CommitOffsets = + CommitOffsets(offsets.filter { case (tp, _) => tps.contains(tp) }) + + def contains(tp: TopicPartition, offset: Long): Boolean = + offsets.get(tp).exists(_ >= offset) + + def get(tp: TopicPartition): Option[Long] = offsets.get(tp) + } + + private[internal] object CommitOffsets { + val empty: CommitOffsets = CommitOffsets(Map.empty) + } + +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala index f43329a4f..aff76f7cd 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala @@ -16,7 +16,12 @@ private[internal] trait ConsumerMetrics { def observeCommit(latency: Duration): UIO[Unit] def observeAggregatedCommit(latency: Duration, commitSize: Long): UIO[Unit] def observeRebalance(currentlyAssignedCount: Int, assignedCount: Int, revokedCount: Int, lostCount: Int): UIO[Unit] - def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit] + def observeRunloopMetrics( + state: Runloop.State, + commandQueueSize: Int, + commitQueueSize: Int, + pendingCommits: Int + ): UIO[Unit] def observePollAuthError(): UIO[Unit] } @@ -330,14 +335,19 @@ private[internal] class ZioConsumerMetrics(metricLabels: Set[MetricLabel]) exten .contramap[Int](_.toDouble) .tagged(metricLabels) - override def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit] = + override def observeRunloopMetrics( + state: Runloop.State, + commandQueueSize: Int, + commitQueueSize: Int, + pendingCommits: Int + ): UIO[Unit] = for { _ <- ZIO.foreachDiscard(state.assignedStreams)(_.outstandingPolls @@ queuePollsHistogram) queueSizes <- ZIO.foreach(state.assignedStreams)(_.queueSize) _ <- ZIO.foreachDiscard(queueSizes)(qs => queueSizeHistogram.update(qs)) _ <- allQueueSizeHistogram.update(queueSizes.sum) _ <- pendingRequestsHistogram.update(state.pendingRequests.size) - _ <- pendingCommitsHistogram.update(state.pendingCommits.size) + _ <- pendingCommitsHistogram.update(pendingCommits) _ <- subscriptionStateGauge.update(state.subscriptionState) _ <- commandQueueSizeHistogram.update(commandQueueSize) _ <- commitQueueSizeHistogram.update(commitQueueSize) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala new file mode 100644 index 000000000..f976c1f9e --- /dev/null 
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/LiveCommitter.scala @@ -0,0 +1,167 @@ +package zio.kafka.consumer.internal +import org.apache.kafka.clients.consumer.{ OffsetAndMetadata, OffsetCommitCallback } +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.RebalanceInProgressException +import zio.kafka.consumer.Consumer.CommitTimeout +import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } +import zio.kafka.consumer.internal.Committer.CommitOffsets +import zio.kafka.consumer.internal.LiveCommitter.Commit +import zio.{ durationLong, Chunk, Duration, Exit, Promise, Queue, Ref, Runtime, Scope, Task, UIO, Unsafe, ZIO } + +import java.util +import java.util.{ Map => JavaMap } +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +private[consumer] final class LiveCommitter( + commitQueue: Queue[Commit], + commitTimeout: Duration, + diagnostics: Diagnostics, + consumerMetrics: ConsumerMetrics, + onCommitAvailable: UIO[Unit], + committedOffsetsRef: Ref[CommitOffsets], + sameThreadRuntime: Runtime[Any], + pendingCommits: Ref.Synchronized[Chunk[Commit]] +) extends Committer { + + /** This is the implementation behind the user facing api `Offset.commit`. */ + override val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = + offsets => + for { + p <- Promise.make[Throwable, Unit] + startTime = java.lang.System.nanoTime() + _ <- commitQueue.offer(Commit(startTime, offsets, p)) + _ <- onCommitAvailable + _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) + _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) + endTime = java.lang.System.nanoTime() + latency = (endTime - startTime).nanoseconds + _ <- consumerMetrics.observeCommit(latency) + } yield () + + /** + * WARNING: this method is used during a rebalance from the same-thread-runtime. This restricts what ZIO operations + * may be used. Please see [[RebalanceCoordinator]] for more information. + */ + override def processQueuedCommits( + commitAsync: (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback) => Task[Unit], + executeOnEmpty: Boolean = false + ): Task[Unit] = for { + commits <- commitQueue.takeAll + _ <- ZIO.logDebug(s"Processing ${commits.size} commits") + _ <- ZIO.unless(commits.isEmpty && !executeOnEmpty) { + val (offsets, callback, onFailure) = asyncCommitParameters(commits) + pendingCommits.update(_ ++ commits) *> + // We don't wait for the completion of the commit here, because it + // will only complete once we poll again. + commitAsync(offsets, callback) + .catchAll(onFailure) + } + } yield () + + /** + * Merge commits and prepare parameters for calling `consumer.commitAsync`. + * + * WARNING: this method is used during a rebalance from the same-thread-runtime. This restricts what ZIO operations + * may be used. Please see [[RebalanceCoordinator]] for more information. 
+ */ + private def asyncCommitParameters( + commits: Chunk[Commit] + ): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = { + val offsets = commits + .foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) => + commit.offsets.foreach { case (tp, offset) => + acc += (tp -> acc + .get(tp) + .map(current => if (current.offset() > offset.offset()) current else offset) + .getOrElse(offset)) + } + acc + } + .toMap + val offsetsWithMetaData = offsets.map { case (tp, offset) => + tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata) + } + val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e)) + // We assume the commit is started immediately after returning from this method. + val startTime = java.lang.System.nanoTime() + val onSuccess = { + val endTime = java.lang.System.nanoTime() + val latency = (endTime - startTime).nanoseconds + for { + offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits)) + _ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty) + result <- cont(Exit.unit) + _ <- diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData)) + } yield result + } + val onFailure: Throwable => UIO[Unit] = { + case _: RebalanceInProgressException => + for { + _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried") + _ <- commitQueue.offerAll(commits) + _ <- onCommitAvailable + } yield () + case err: Throwable => + cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err)) + } + val callback = + new OffsetCommitCallback { + override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = + Unsafe.unsafe { implicit u => + sameThreadRuntime.unsafe.run { + if (exception eq null) onSuccess else onFailure(exception) + } + .getOrThrowFiberFailure() + } + } + (offsetsWithMetaData.asJava, callback, onFailure) + } + + override def queueSize: UIO[Int] = commitQueue.size + + override def pendingCommitCount: UIO[Int] = pendingCommits.get.map(_.size) + + override def getPendingCommits: UIO[CommitOffsets] = + pendingCommits.get.map(CommitOffsets.empty.addCommits(_)._2) + + override def cleanupPendingCommits: UIO[Unit] = + pendingCommits.updateZIO(_.filterZIO(_.isPending)) + + override def keepCommitsForPartitions(assignedPartitions: Set[TopicPartition]): UIO[Unit] = + committedOffsetsRef.update(_.keepPartitions(assignedPartitions)) + + override def getCommittedOffsets: UIO[CommitOffsets] = committedOffsetsRef.get +} + +private[internal] object LiveCommitter { + def make( + commitTimeout: Duration, + diagnostics: Diagnostics, + consumerMetrics: ConsumerMetrics, + onCommitAvailable: UIO[Unit], + sameThreadRuntime: Runtime[Any] + ): ZIO[Scope, Nothing, LiveCommitter] = for { + pendingCommits <- Ref.Synchronized.make(Chunk.empty[Commit]) + commitQueue <- ZIO.acquireRelease(Queue.unbounded[Commit])(_.shutdown) + committedOffsetsRef <- Ref.make(CommitOffsets.empty) + } yield new LiveCommitter( + commitQueue, + commitTimeout, + diagnostics, + consumerMetrics, + onCommitAvailable, + committedOffsetsRef, + sameThreadRuntime, + pendingCommits + ) + + private[internal] final case class Commit( + createdAt: NanoTime, + offsets: Map[TopicPartition, OffsetAndMetadata], + cont: Promise[Throwable, Unit] + ) { + @inline def isPending: UIO[Boolean] = cont.isDone.negate + } + +} diff --git 
a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala new file mode 100644 index 000000000..43be57980 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RebalanceCoordinator.scala @@ -0,0 +1,288 @@ +package zio.kafka.consumer.internal +import org.apache.kafka.common.TopicPartition +import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer +import zio.kafka.consumer.internal.RebalanceCoordinator.{ + EndOffsetCommitPending, + EndOffsetCommitted, + EndOffsetNotCommitted, + RebalanceEvent, + StreamCompletionStatus +} +import zio.kafka.consumer.{ ConsumerSettings, RebalanceListener } +import zio.stream.ZStream +import zio.{ durationInt, Chunk, Duration, Ref, Task, UIO, ZIO } + +/** + * The Runloop's RebalanceListener gets notified of partitions that are assigned, revoked and lost + * + * Because this happens during the call to `poll()`, we communicate any results to the Runloop via a `Ref` + * + * When rebalanceSafeCommits is enabled, we await completion of all revoked partitions' streams and their commits before + * continuing. + */ +private[internal] class RebalanceCoordinator( + lastRebalanceEvent: Ref.Synchronized[RebalanceEvent], + settings: ConsumerSettings, + consumer: ConsumerAccess, + maxRebalanceDuration: Duration, + getCurrentAssignedStreams: UIO[Chunk[PartitionStreamControl]], + committer: Committer +) { + private val commitTimeoutNanos = settings.commitTimeout.toNanos + + private val restartStreamsOnRebalancing = settings.restartStreamOnRebalancing + private val rebalanceSafeCommits = settings.rebalanceSafeCommits + private val commitTimeout = settings.commitTimeout + + // All code in this block is called from the rebalance listener and therefore runs on the same-thread-runtime. This + // is because the Java kafka client requires us to invoke the consumer from the same thread that invoked the + // rebalance listener. + // Unfortunately the same-thread-runtime does not work for all ZIO operations. For example, `ZIO.timeout`, + // `ZStream.repeat`, `Promise.await` on non-completed promises, and any other ZIO operation that shifts the work to + // another thread cannot be used. + + // Time between polling the commit queue from the rebalance listener when `rebalanceSafeCommits` is enabled. + private val commitQueuePollInterval = 100.millis + + def getAndResetLastEvent: UIO[RebalanceEvent] = + lastRebalanceEvent.getAndSet(RebalanceEvent.None) + + // End streams from the rebalance listener. + // When `rebalanceSafeCommits` is enabled, wait for consumed offsets to be committed. 
+ private def endStreams(streamsToEnd: Chunk[PartitionStreamControl]): Task[Any] = + ZIO.unless(streamsToEnd.isEmpty) { + for { + _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- consumer.rebalanceListenerAccess(doAwaitStreamCommits(_, streamsToEnd)).when(rebalanceSafeCommits) + } yield () + } + + private def doAwaitStreamCommits( + consumer: ByteArrayKafkaConsumer, + streamsToEnd: Chunk[PartitionStreamControl] + ): Task[Unit] = { + val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeoutNanos + + def timeToDeadlineMillis(): Long = (deadline - java.lang.System.nanoTime()) / 1000000L + + def completionStatusesAsString(completionStatuses: Chunk[StreamCompletionStatus]): String = + "Revoked partitions: " + completionStatuses.map(_.toString).mkString("; ") + + def getStreamCompletionStatuses: UIO[Chunk[StreamCompletionStatus]] = + for { + committedOffsets <- committer.getCommittedOffsets + latestPendingCommitOffsets <- committer.getPendingCommits.map(_.offsets) + streamResults <- + ZIO.foreach(streamsToEnd) { stream => + for { + isDone <- stream.completedPromise.isDone + lastPulledOffset <- stream.lastPulledOffset + endOffset <- if (isDone) stream.completedPromise.await else ZIO.none + + endOffsetCommitStatus = + endOffset match { + case Some(endOffset) if committedOffsets.contains(stream.tp, endOffset.offset) => + EndOffsetCommitted + case Some(endOffset) if latestPendingCommitOffsets.get(stream.tp).contains(endOffset.offset) => + EndOffsetCommitPending + case _ => EndOffsetNotCommitted + } + } yield StreamCompletionStatus( + stream.tp, + isDone, + lastPulledOffset.map(_.offset), + committedOffsets.get(stream.tp), + endOffsetCommitStatus + ) + } + } yield streamResults + + @inline + def logStreamCompletionStatuses(completionStatuses: Chunk[StreamCompletionStatus]): UIO[Unit] = { + val statusStrings = completionStatusesAsString(completionStatuses) + ZIO.logDebug( + s"Delaying rebalance until ${streamsToEnd.size} streams (of revoked partitions) have committed " + + s"the offsets of the records they consumed. Deadline in ${timeToDeadlineMillis()}ms. $statusStrings" + ) + } + + def logInitialStreamCompletionStatuses: UIO[Unit] = + for { + completionStatuses <- getStreamCompletionStatuses + _ <- logStreamCompletionStatuses(completionStatuses) + } yield () + + def endingStreamsCompletedAndCommitsExist: UIO[Boolean] = + for { + completionStatuses <- getStreamCompletionStatuses + _ <- logStreamCompletionStatuses(completionStatuses) + } yield completionStatuses.forall { status => + // A stream is complete when it never got any records, or when it committed the offset of the last consumed record + status.lastPulledOffset.isEmpty || (status.streamEnded && status.endOffsetCommitStatus != EndOffsetNotCommitted) + } + + def logFinalStreamCompletionStatuses(completed: Boolean): UIO[Unit] = + if (completed) + ZIO.logInfo("Continuing rebalance, all offsets of consumed records in the revoked partitions were committed.") + else + for { + completionStatuses <- getStreamCompletionStatuses + statusStrings = completionStatusesAsString(completionStatuses) + _ <- + ZIO.logWarning( + s"Exceeded deadline waiting for streams (of revoked partitions) to commit the offsets of " + + s"the records they consumed; the rebalance will continue. " + + s"This might cause another consumer to process some records again. 
$statusStrings" + ) + } yield () + + def commitSync: Task[Unit] = + ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout)) + + // Outline: + // - Every `commitQueuePollInterval` until the deadline has been reached: + // - Get all commits from the commit queue. + // - Start an async commit for these commits. + // - Collect all these new (pending) commits. + // - repeat the above until: + // - All streams that were ended have completed their work, and + // - we have seen a completed or pending commit for all end-offsets. + // An end-offset of a stream is the offset of the last record given to that stream. + // - Do a single sync commit without any offsets, this has the side-effect of blocking until all + // preceding async commits are complete (this requires kafka-client 3.6.0 or later). + // Because all commits created here (including those from non-ending streams) are now complete, we do not + // have to add them to the pending commits of the runloop state. + // + // Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty. + // Instead, we poll the queue in a loop. + for { + _ <- logInitialStreamCompletionStatuses + completed <- + ZStream + .fromZIO(blockingSleep(commitQueuePollInterval)) + .forever + // Even if there is nothing to commit, continue to drive communication with the broker + // so that commits can complete and the streams can make progress, by setting + // executeOnEmpty = true + .tap { _ => + committer.processQueuedCommits( + (offsets, callback) => ZIO.attempt(consumer.commitAsync(offsets, callback)), + executeOnEmpty = true + ) + } + .takeWhile(_ => java.lang.System.nanoTime() <= deadline) + .mapZIO(_ => endingStreamsCompletedAndCommitsExist) + .takeUntil(completed => completed) + .runLast + .map(_.getOrElse(false)) + _ <- logFinalStreamCompletionStatuses(completed) + _ <- commitSync + _ <- ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") + } yield () + } + + // During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times. + // We do not know the order in which the call-back methods are invoked. + // + // Ref `lastRebalanceEvent` is used to track what happens during the poll. Just before the poll the + // `RebalanceEvent.None` is stored. Then during the poll, inside each method of the rebalance listener, + // the ref is updated. 
+ // + // Each method: + // - emits a diagnostic event + // - determines if this is the first method invoked during this poll (`rebalanceEvent.wasInvoked`) to + // make sure that the `restartStreamsOnRebalancing` feature is applied only once per poll + // - ends streams that need to be ended + // - updates `lastRebalanceEvent` + // + def toRebalanceListener: RebalanceListener = RebalanceListener( + onAssigned = assignedTps => + lastRebalanceEvent.updateZIO { rebalanceEvent => + for { + _ <- ZIO.logDebug { + val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else "" + s"${assignedTps.size} partitions are assigned$sameRebalance" + } + assignedStreams <- getCurrentAssignedStreams + streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) assignedStreams + else Chunk.empty + _ <- endStreams(streamsToEnd) + _ <- ZIO.logTrace("onAssigned done") + } yield rebalanceEvent.copy( + wasInvoked = true, + assignedTps = rebalanceEvent.assignedTps ++ assignedTps, + endedStreams = rebalanceEvent.endedStreams ++ streamsToEnd + ) + }, + onRevoked = revokedTps => + lastRebalanceEvent.updateZIO { rebalanceEvent => + for { + _ <- ZIO.logDebug { + val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else "" + s"${revokedTps.size} partitions are revoked$sameRebalance" + } + assignedStreams <- getCurrentAssignedStreams + streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) assignedStreams + else assignedStreams.filter(control => revokedTps.contains(control.tp)) + _ <- endStreams(streamsToEnd) + _ <- ZIO.logTrace("onRevoked done") + } yield rebalanceEvent.copy( + wasInvoked = true, + assignedTps = rebalanceEvent.assignedTps -- revokedTps, + revokedTps = rebalanceEvent.revokedTps ++ revokedTps, + endedStreams = rebalanceEvent.endedStreams ++ streamsToEnd + ) + }, + onLost = lostTps => + lastRebalanceEvent.updateZIO { rebalanceEvent => + for { + _ <- ZIO.logDebug(s"${lostTps.size} partitions are lost") + assignedStreams <- getCurrentAssignedStreams + lostStreams = assignedStreams.filter(control => lostTps.contains(control.tp)) + _ <- ZIO.foreachDiscard(lostStreams)(_.lost) + _ <- ZIO.logTrace(s"onLost done") + } yield rebalanceEvent.copy( + wasInvoked = true, + assignedTps = rebalanceEvent.assignedTps -- lostTps, + lostTps = rebalanceEvent.lostTps ++ lostTps, + endedStreams = rebalanceEvent.endedStreams ++ lostStreams + ) + } + ) +} + +private[internal] object RebalanceCoordinator { + + sealed trait EndOffsetCommitStatus + case object EndOffsetNotCommitted extends EndOffsetCommitStatus { override def toString = "not committed" } + case object EndOffsetCommitPending extends EndOffsetCommitStatus { override def toString = "commit pending" } + case object EndOffsetCommitted extends EndOffsetCommitStatus { override def toString = "committed" } + + final case class StreamCompletionStatus( + tp: TopicPartition, + streamEnded: Boolean, + lastPulledOffset: Option[Long], + lastCommittedOffset: Option[Long], + endOffsetCommitStatus: EndOffsetCommitStatus + ) { + override def toString: String = + s"$tp: " + + s"${if (streamEnded) "stream ended" else "stream is running"}, " + + s"last pulled offset=${lastPulledOffset.getOrElse("none")}, " + + s"last committed offset=${lastCommittedOffset.getOrElse("none")}, " + + endOffsetCommitStatus + } + + final case class RebalanceEvent( + wasInvoked: Boolean, + assignedTps: Set[TopicPartition], + revokedTps: Set[TopicPartition], + lostTps: Set[TopicPartition], + endedStreams: Chunk[PartitionStreamControl] 
+ ) + + object RebalanceEvent { + val None: RebalanceEvent = + RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty) + } +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index b017c7a3d..39570b7c7 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -2,21 +2,18 @@ package zio.kafka.consumer.internal import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException, RebalanceInProgressException } +import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException } import zio._ -import zio.kafka.consumer.Consumer.{ CommitTimeout, OffsetRetrieval } +import zio.kafka.consumer.Consumer.OffsetRetrieval import zio.kafka.consumer._ import zio.kafka.consumer.diagnostics.DiagnosticEvent.{ Finalization, Rebalance } import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer import zio.kafka.consumer.internal.Runloop._ import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment +import zio.kafka.consumer.internal.RebalanceCoordinator.RebalanceEvent import zio.stream._ -import java.lang.Math.max -import java.util -import java.util.{ Map => JavaMap } -import scala.collection.mutable import scala.jdk.CollectionConverters._ //noinspection SimplifyWhenInspection,SimplifyUnlessInspection @@ -25,25 +22,16 @@ private[consumer] final class Runloop private ( topLevelExecutor: Executor, sameThreadRuntime: Runtime[Any], consumer: ConsumerAccess, - commitQueue: Queue[Commit], commandQueue: Queue[RunloopCommand], - lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent], + commitAvailableQueue: Queue[Boolean], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], diagnostics: Diagnostics, maxStreamPullInterval: Duration, - maxRebalanceDuration: Duration, currentStateRef: Ref[State], - committedOffsetsRef: Ref[CommitOffsets], - commitAvailable: Ref[Boolean] + rebalanceCoordinator: RebalanceCoordinator, + consumerMetrics: ConsumerMetrics, + committer: Committer ) { - private val commitTimeout = settings.commitTimeout - private val commitTimeoutNanos = settings.commitTimeout.toNanos - - private val restartStreamsOnRebalancing = settings.restartStreamOnRebalancing - private val rebalanceSafeCommits = settings.rebalanceSafeCommits - - private val consumerMetrics = new ZioConsumerMetrics(settings.metricLabels) - private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] = PartitionStreamControl.newPartitionStream( tp, @@ -82,236 +70,6 @@ private[consumer] final class Runloop private ( commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit private def makeRebalanceListener: ConsumerRebalanceListener = { - // All code in this block is called from the rebalance listener and therefore runs on the same-thread-runtime. This - // is because the Java kafka client requires us to invoke the consumer from the same thread that invoked the - // rebalance listener. - // Unfortunately the same-thread-runtime does not work for all ZIO operations. For example, `ZIO.timeout`, - // `ZStream.repeat`, `Promise.await` on non-completed promises, and any other ZIO operation that shifts the work to - // another thread cannot be used. 
- - // Time between polling the commit queue from the rebalance listener when `rebalanceSafeCommits` is enabled. - val commitQueuePollInterval = 100.millis - - // End streams from the rebalance listener. - // When `rebalanceSafeCommits` is enabled, wait for consumed offsets to be committed. - def endStreams(state: State, streamsToEnd: Chunk[PartitionStreamControl]): Task[Unit] = - if (streamsToEnd.isEmpty) ZIO.unit - else { - for { - _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) - _ <- if (rebalanceSafeCommits) - consumer.rebalanceListenerAccess(doAwaitStreamCommits(_, state, streamsToEnd)) - else ZIO.unit - } yield () - } - - def doAwaitStreamCommits( - consumer: ByteArrayKafkaConsumer, - state: State, - streamsToEnd: Chunk[PartitionStreamControl] - ): Task[Unit] = { - val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeoutNanos - - def timeToDeadlineMillis(): Long = (deadline - java.lang.System.nanoTime()) / 1000000L - - val endingTps = streamsToEnd.map(_.tp).toSet - - def commitsOfEndingStreams(commits: Chunk[Runloop.Commit]): Chunk[Runloop.Commit] = - commits.filter(commit => (commit.offsets.keySet intersect endingTps).nonEmpty) - - lazy val previousPendingCommits: Chunk[Commit] = - commitsOfEndingStreams(state.pendingCommits) - - def commitAsync(commits: Chunk[Commit]): UIO[Unit] = - if (commits.nonEmpty) { - val (offsets, callback, onFailure) = asyncCommitParameters(commits) - ZIO.logDebug(s"Async commit of ${offsets.size} offsets for ${commits.size} commits") *> - ZIO.attempt(consumer.commitAsync(offsets, callback)).catchAll(onFailure) - } else { - // Continue to drive communication with the broker so that commits can complete and the streams can - // make progress. - ZIO.attempt(consumer.commitAsync(java.util.Collections.emptyMap(), null)).orDie - } - - sealed trait EndOffsetCommitStatus - case object EndOffsetNotCommitted extends EndOffsetCommitStatus { override def toString = "not committed" } - case object EndOffsetCommitPending extends EndOffsetCommitStatus { override def toString = "commit pending" } - case object EndOffsetCommitted extends EndOffsetCommitStatus { override def toString = "committed" } - - final case class StreamCompletionStatus( - tp: TopicPartition, - streamEnded: Boolean, - lastPulledOffset: Option[Long], - endOffsetCommitStatus: EndOffsetCommitStatus - ) { - override def toString: String = - s"${tp}: " + - s"${if (streamEnded) "stream ended" else "stream is running"}, " + - s"last pulled offset=${lastPulledOffset.getOrElse("none")}, " + - endOffsetCommitStatus - } - - def completionStatusesAsString(completionStatuses: Chunk[StreamCompletionStatus]): String = - "Revoked partitions: " + completionStatuses.map(_.toString).mkString("; ") - - def getStreamCompletionStatuses(newCommits: Chunk[Commit]): UIO[Chunk[StreamCompletionStatus]] = - for { - committedOffsets <- committedOffsetsRef.get - allPendingCommitOffsets = - (previousPendingCommits ++ commitsOfEndingStreams(newCommits)).flatMap(_.offsets).map { - case (tp, offsetAndMetadata) => (tp, offsetAndMetadata.offset()) - } - streamResults <- - ZIO.foreach(streamsToEnd) { stream => - for { - isDone <- stream.completedPromise.isDone - lastPulledOffset <- stream.lastPulledOffset - endOffset <- if (isDone) stream.completedPromise.await else ZIO.none - - endOffsetCommitStatus = - endOffset match { - case Some(endOffset) if committedOffsets.contains(stream.tp, endOffset.offset) => - EndOffsetCommitted - case Some(endOffset) if allPendingCommitOffsets.contains((stream.tp, 
endOffset.offset)) => - EndOffsetCommitPending - case _ => EndOffsetNotCommitted - } - } yield StreamCompletionStatus(stream.tp, isDone, lastPulledOffset.map(_.offset), endOffsetCommitStatus) - } - } yield streamResults - - @inline - def logStreamCompletionStatuses(completionStatuses: Chunk[StreamCompletionStatus]): UIO[Unit] = { - val statusStrings = completionStatusesAsString(completionStatuses) - ZIO.logInfo( - s"Delaying rebalance until ${streamsToEnd.size} streams (of revoked partitions) have committed " + - s"the offsets of the records they consumed. Deadline in ${timeToDeadlineMillis()}ms. $statusStrings" - ) - } - - def logInitialStreamCompletionStatuses: UIO[Unit] = - for { - completionStatuses <- getStreamCompletionStatuses(newCommits = Chunk.empty) - _ <- logStreamCompletionStatuses(completionStatuses) - } yield () - - def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): UIO[Boolean] = - for { - completionStatuses <- getStreamCompletionStatuses(newCommits) - _ <- logStreamCompletionStatuses(completionStatuses) - } yield completionStatuses.forall { status => - // A stream is complete when it never got any records, or when it committed the offset of the last consumed record - status.lastPulledOffset.isEmpty || (status.streamEnded && status.endOffsetCommitStatus != EndOffsetNotCommitted) - } - - def logFinalStreamCompletionStatuses(completed: Boolean, newCommits: Chunk[Commit]): UIO[Unit] = - if (completed) - ZIO.logInfo("Continuing rebalance, all offsets of consumed records in the revoked partitions were committed.") - else - for { - completionStatuses <- getStreamCompletionStatuses(newCommits) - statusStrings = completionStatusesAsString(completionStatuses) - _ <- - ZIO.logWarning( - s"Exceeded deadline waiting for streams (of revoked partitions) to commit the offsets of " + - s"the records they consumed; the rebalance will continue. " + - s"This might cause another consumer to process some records again. $statusStrings" - ) - } yield () - - def commitSync: Task[Unit] = - ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout)) - - // Outline: - // - Every `commitQueuePollInterval` until the deadline has been reached: - // - Get all commits from the commit queue. - // - Start an async commit for these commits. - // - Collect all these new (pending) commits. - // - repeat the above until: - // - All streams that were ended have completed their work, and - // - we have seen a completed or pending commit for all end-offsets. - // An end-offset of a stream is the offset of the last record given to that stream. - // - Do a single sync commit without any offsets, this has the side-effect of blocking until all - // preceding async commits are complete (this requires kafka-client 3.6.0 or later). - // Because all commits created here (including those from non-ending streams) are now complete, we do not - // have to add them to the pending commits of the runloop state. - // - // Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty. - // Instead, we poll the queue in a loop. 
- for { - _ <- logInitialStreamCompletionStatuses - completedAndCommits <- - ZStream - .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll) - .tap(commitAsync) - .forever - .takeWhile(_ => java.lang.System.nanoTime() <= deadline) - .scan(Chunk.empty[Runloop.Commit])(_ ++ _) - .mapZIO(commits => endingStreamsCompletedAndCommitsExist(commits).map((_, commits))) - .takeUntil { case (completed, _) => completed } - .runLast - .map(_.getOrElse((false, Chunk.empty))) - _ <- logFinalStreamCompletionStatuses(completedAndCommits._1, completedAndCommits._2) - _ <- commitSync - _ <- ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") - } yield () - } - - // During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times. - // We do not know the order in which the call-back methods are invoked. - // - // Ref `lastRebalanceEvent` is used to track what happens during the poll. Just before the poll the - // `RebalanceEvent.None` is stored. Then during the poll, inside each method of the rebalance listener, - // the ref is updated. - // - // Each method: - // - emits a diagnostic event - // - determines if this is the first method invoked during this poll (`rebalanceEvent.wasInvoked`) to - // make sure that the `restartStreamsOnRebalancing` feature is applied only once per poll - // - ends streams that need to be ended - // - updates `lastRebalanceEvent` - // - val recordRebalanceRebalancingListener = RebalanceListener( - onAssigned = assignedTps => - for { - rebalanceEvent <- lastRebalanceEvent.get - _ <- ZIO.logDebug { - val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else "" - s"${assignedTps.size} partitions are assigned$sameRebalance" - } - state <- currentStateRef.get - streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams - else Chunk.empty - _ <- endStreams(state, streamsToEnd) - _ <- lastRebalanceEvent.set(rebalanceEvent.onAssigned(assignedTps, endedStreams = streamsToEnd)) - _ <- ZIO.logTrace("onAssigned done") - } yield (), - onRevoked = revokedTps => - for { - rebalanceEvent <- lastRebalanceEvent.get - _ <- ZIO.logDebug { - val sameRebalance = if (rebalanceEvent.wasInvoked) " in same rebalance" else "" - s"${revokedTps.size} partitions are revoked$sameRebalance" - } - state <- currentStateRef.get - streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams - else state.assignedStreams.filter(control => revokedTps.contains(control.tp)) - _ <- endStreams(state, streamsToEnd) - _ <- lastRebalanceEvent.set(rebalanceEvent.onRevoked(revokedTps, endedStreams = streamsToEnd)) - _ <- ZIO.logTrace("onRevoked done") - } yield (), - onLost = lostTps => - for { - _ <- ZIO.logDebug(s"${lostTps.size} partitions are lost") - rebalanceEvent <- lastRebalanceEvent.get - state <- currentStateRef.get - lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp)) - _ <- ZIO.foreachDiscard(lostStreams)(_.lost) - _ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps, lostStreams)) - _ <- ZIO.logTrace(s"onLost done") - } yield () - ) - // Here we just want to avoid any executor shift if the user provided listener is the noop listener. 
val userRebalanceListener = settings.rebalanceListener match { @@ -319,94 +77,9 @@ private[consumer] final class Runloop private ( case _ => settings.rebalanceListener.runOnExecutor(topLevelExecutor) } - RebalanceListener.toKafka(recordRebalanceRebalancingListener ++ userRebalanceListener, sameThreadRuntime) + RebalanceListener.toKafka(rebalanceCoordinator.toRebalanceListener ++ userRebalanceListener, sameThreadRuntime) } - /** This is the implementation behind the user facing api `Offset.commit`. */ - private val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] = - offsets => - for { - p <- Promise.make[Throwable, Unit] - startTime = java.lang.System.nanoTime() - _ <- commitQueue.offer(Runloop.Commit(java.lang.System.nanoTime(), offsets, p)) - commitAvailable <- commitAvailable.getAndSet(true) - _ <- commandQueue.offer(RunloopCommand.CommitAvailable).unless(commitAvailable) - _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) - _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) - endTime = java.lang.System.nanoTime() - latency = (endTime - startTime).nanoseconds - _ <- consumerMetrics.observeCommit(latency) - } yield () - - /** Merge commits and prepare parameters for calling `consumer.commitAsync`. */ - private def asyncCommitParameters( - commits: Chunk[Runloop.Commit] - ): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = { - val offsets = commits - .foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) => - commit.offsets.foreach { case (tp, offset) => - acc += (tp -> acc - .get(tp) - .map(current => if (current.offset() > offset.offset()) current else offset) - .getOrElse(offset)) - } - acc - } - .toMap - val offsetsWithMetaData = offsets.map { case (tp, offset) => - tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata) - } - val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e)) - // We assume the commit is started immediately after returning from this method. 
- val startTime = java.lang.System.nanoTime() - val onSuccess = { - val endTime = java.lang.System.nanoTime() - val latency = (endTime - startTime).nanoseconds - for { - offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits)) - _ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty) - result <- cont(Exit.unit) - _ <- diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData)) - } yield result - } - val onFailure: Throwable => UIO[Unit] = { - case _: RebalanceInProgressException => - for { - _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried") - _ <- commitQueue.offerAll(commits) - _ <- commandQueue.offer(RunloopCommand.CommitAvailable) - } yield () - case err: Throwable => - cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err)) - } - val callback = - new OffsetCommitCallback { - override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = - Unsafe.unsafe { implicit u => - sameThreadRuntime.unsafe.run { - if (exception eq null) onSuccess else onFailure(exception) - } - .getOrThrowFiberFailure() - } - } - (offsetsWithMetaData.asJava, callback, onFailure) - } - - private def handleCommits(state: State, commits: Chunk[Runloop.Commit]): UIO[State] = - if (commits.isEmpty) { - ZIO.succeed(state) - } else { - val (offsets, callback, onFailure) = asyncCommitParameters(commits) - val newState = state.addPendingCommits(commits) - consumer.runloopAccess { c => - // We don't wait for the completion of the commit here, because it - // will only complete once we poll again. - ZIO.attempt(c.commitAsync(offsets, callback)) - } - .catchAll(onFailure) - .as(newState) - } - /** * Does all needed to end revoked partitions: * 1. Complete the revoked assigned streams 2. Remove from the list of pending requests @@ -470,7 +143,7 @@ private[consumer] final class Runloop private ( builder += CommittableRecord[Array[Byte], Array[Byte]]( record = consumerRecord, - commitHandle = commit, + commitHandle = committer.commit, consumerGroupMetadata = consumerGroupMetadata ) } @@ -537,7 +210,7 @@ private[consumer] final class Runloop private ( partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams) _ <- ZIO.logDebug( s"Starting poll with ${state.pendingRequests.size} pending requests and" + - s" ${state.pendingCommits.size} pending commits," + + s" ${committer.pendingCommitCount} pending commits," + s" resuming $partitionsToFetch partitions" ) _ <- currentStateRef.set(state) @@ -561,7 +234,7 @@ private[consumer] final class Runloop private ( tpWithoutData = requestedPartitions -- providedTps ) } - pollresult <- lastRebalanceEvent.getAndSet(RebalanceEvent.None).flatMap { + pollresult <- rebalanceCoordinator.getAndResetLastEvent.flatMap { case RebalanceEvent(false, _, _, _, _) => // The fast track, rebalance listener was not invoked: // no assignment changes, no new commits, only new records. @@ -615,8 +288,7 @@ private[consumer] final class Runloop private ( // Remove committed offsets for partitions that are no longer assigned: // NOTE: the type annotation is needed to keep the IntelliJ compiler happy. 
_ <- - committedOffsetsRef - .update(_.keepPartitions(updatedAssignedStreams.map(_.tp).toSet)): Task[Unit] + committer.keepCommitsForPartitions(updatedAssignedStreams.map(_.tp).toSet): Task[Unit] _ <- consumerMetrics.observeRebalance( currentAssigned.size, @@ -658,11 +330,10 @@ private[consumer] final class Runloop private ( pollResult.ignoreRecordsForTps, pollResult.records ) - updatedPendingCommits <- ZIO.filter(state.pendingCommits)(_.isPending) - _ <- checkStreamPullInterval(pollResult.assignedStreams) + _ <- committer.cleanupPendingCommits + _ <- checkStreamPullInterval(pollResult.assignedStreams) } yield state.copy( pendingRequests = fulfillResult.pendingRequests, - pendingCommits = updatedPendingCommits, assignedStreams = pollResult.assignedStreams ) } @@ -818,23 +489,25 @@ private[consumer] final class Runloop private ( ZStream .fromQueue(commandQueue) + .merge(ZStream.fromQueue(commitAvailableQueue).as(RunloopCommand.CommitAvailable)) .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { - _ <- commitAvailable.set(false) - commitCommands <- commitQueue.takeAll - _ <- ZIO.logDebug( - s"Processing ${commitCommands.size} commits," + - s" ${commands.size} commands: ${commands.mkString(",")}" - ) - stateAfterCommits <- handleCommits(state, commitCommands) + _ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}") + _ <- consumer.runloopAccess { consumer => + committer.processQueuedCommits((offsets, callback) => + ZIO.attempt(consumer.commitAsync(offsets, callback)) + ) + } streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd } - stateAfterCommands <- ZIO.foldLeft(streamCommands)(stateAfterCommits)(handleCommand) + stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand) - updatedStateAfterPoll <- if (stateAfterCommands.shouldPoll) handlePoll(stateAfterCommands) - else ZIO.succeed(stateAfterCommands) + updatedStateAfterPoll <- shouldPoll(stateAfterCommands).flatMap { + case true => handlePoll(stateAfterCommands) + case false => ZIO.succeed(stateAfterCommands) + } // Immediately poll again, after processing all new queued commands - _ <- if (updatedStateAfterPoll.shouldPoll) commandQueue.offer(RunloopCommand.Poll) else ZIO.unit + _ <- ZIO.whenZIO(shouldPoll(updatedStateAfterPoll))(commandQueue.offer(RunloopCommand.Poll)) // Save the current state for other parts of Runloop (read-only, for metrics only) _ <- currentStateRef.set(updatedStateAfterPoll) } yield updatedStateAfterPoll @@ -843,13 +516,19 @@ private[consumer] final class Runloop private ( .onError(cause => partitionsHub.offer(Take.failCause(cause))) } + def shouldPoll(state: State): UIO[Boolean] = + committer.pendingCommitCount.map { pendingCommitCount => + state.subscriptionState.isSubscribed && (state.pendingRequests.nonEmpty || pendingCommitCount > 0 || state.assignedStreams.isEmpty) + } + private def observeRunloopMetrics(runloopMetricsSchedule: Schedule[Any, Unit, Long]): ZIO[Any, Nothing, Unit] = { val observe = for { - currentState <- currentStateRef.get - commandQueueSize <- commandQueue.size - commitQueueSize <- commitQueue.size + currentState <- currentStateRef.get + commandQueueSize <- commandQueue.size + commitQueueSize <- committer.queueSize + pendingCommitCount <- committer.pendingCommitCount _ <- consumerMetrics - .observeRunloopMetrics(currentState, commandQueueSize, commitQueueSize) + .observeRunloopMetrics(currentState, commandQueueSize, commitQueueSize, pendingCommitCount) } yield 
() observe @@ -895,57 +574,6 @@ object Runloop { pendingRequests: Chunk[RunloopCommand.Request] ) - private final case class RebalanceEvent( - wasInvoked: Boolean, - assignedTps: Set[TopicPartition], - revokedTps: Set[TopicPartition], - lostTps: Set[TopicPartition], - endedStreams: Chunk[PartitionStreamControl] - ) { - def onAssigned( - assigned: Set[TopicPartition], - endedStreams: Chunk[PartitionStreamControl] - ): RebalanceEvent = - copy( - wasInvoked = true, - assignedTps = assignedTps ++ assigned, - endedStreams = this.endedStreams ++ endedStreams - ) - - def onRevoked( - revoked: Set[TopicPartition], - endedStreams: Chunk[PartitionStreamControl] - ): RebalanceEvent = - copy( - wasInvoked = true, - assignedTps = assignedTps -- revoked, - revokedTps = revokedTps ++ revoked, - endedStreams = this.endedStreams ++ endedStreams - ) - - def onLost(lost: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = - copy( - wasInvoked = true, - assignedTps = assignedTps -- lost, - lostTps = lostTps ++ lost, - endedStreams = this.endedStreams ++ endedStreams - ) - } - - private object RebalanceEvent { - val None: RebalanceEvent = - RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty) - } - - private[internal] final case class Commit( - createdAt: NanoTime, - offsets: Map[TopicPartition, OffsetAndMetadata], - cont: Promise[Throwable, Unit] - ) { - @inline def isDone: UIO[Boolean] = cont.isDone - @inline def isPending: UIO[Boolean] = isDone.negate - } - private[consumer] def make( settings: ConsumerSettings, maxStreamPullInterval: Duration, @@ -955,31 +583,45 @@ object Runloop { partitionsHub: Hub[Take[Throwable, PartitionAssignment]] ): URIO[Scope, Runloop] = for { - _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) - commitQueue <- ZIO.acquireRelease(Queue.unbounded[Runloop.Commit])(_.shutdown) - commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) - lastRebalanceEvent <- Ref.Synchronized.make[Runloop.RebalanceEvent](Runloop.RebalanceEvent.None) + _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) + commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) + commitAvailableQueue <- ZIO.acquireRelease(Queue.dropping[Boolean](1))(_.shutdown) + lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) initialState = State.initial - currentStateRef <- Ref.make(initialState) - commitAvailable <- Ref.make(false) - committedOffsetsRef <- Ref.make(CommitOffsets.empty) - sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) - executor <- ZIO.executor + currentStateRef <- Ref.make(initialState) + sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) + executor <- ZIO.executor + metrics = new ZioConsumerMetrics(settings.metricLabels) + committer <- LiveCommitter + .make( + settings.commitTimeout, + diagnostics, + metrics, + commitAvailableQueue.offer(true).unit, + sameThreadRuntime + ) + rebalanceCoordinator = new RebalanceCoordinator( + lastRebalanceEvent, + settings, + consumer, + maxRebalanceDuration, + currentStateRef.get.map(_.assignedStreams), + committer + ) runloop = new Runloop( settings = settings, topLevelExecutor = executor, sameThreadRuntime = sameThreadRuntime, consumer = consumer, - commitQueue = commitQueue, commandQueue = commandQueue, - lastRebalanceEvent = lastRebalanceEvent, + commitAvailableQueue = commitAvailableQueue, partitionsHub = partitionsHub, diagnostics = diagnostics, 
maxStreamPullInterval = maxStreamPullInterval, - maxRebalanceDuration = maxRebalanceDuration, currentStateRef = currentStateRef, - committedOffsetsRef = committedOffsetsRef, - commitAvailable = commitAvailable + consumerMetrics = metrics, + rebalanceCoordinator = rebalanceCoordinator, + committer = committer ) _ <- ZIO.logDebug("Starting Runloop") @@ -1000,62 +642,18 @@ object Runloop { private[internal] final case class State( pendingRequests: Chunk[RunloopCommand.Request], - pendingCommits: Chunk[Runloop.Commit], assignedStreams: Chunk[PartitionStreamControl], subscriptionState: SubscriptionState ) { - def addPendingCommits(c: Chunk[Runloop.Commit]): State = copy(pendingCommits = pendingCommits ++ c) - def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) - - def shouldPoll: Boolean = - subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty) + def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) } private object State { val initial: State = State( pendingRequests = Chunk.empty, - pendingCommits = Chunk.empty, assignedStreams = Chunk.empty, subscriptionState = SubscriptionState.NotSubscribed ) } - // package private for unit testing - private[internal] final case class CommitOffsets(offsets: Map[TopicPartition, Long]) { - - /** Returns an estimate of the total offset increase, and a new `CommitOffsets` with the given offsets added. */ - def addCommits(c: Chunk[Runloop.Commit]): (Long, CommitOffsets) = { - val updatedOffsets = mutable.Map.empty[TopicPartition, Long] - updatedOffsets.sizeHint(offsets.size) - updatedOffsets ++= offsets - var offsetIncrease = 0L - c.foreach { commit => - commit.offsets.foreach { case (tp, offsetAndMeta) => - val offset = offsetAndMeta.offset() - val maxOffset = updatedOffsets.get(tp) match { - case Some(existingOffset) => - offsetIncrease += max(0L, offset - existingOffset) - max(existingOffset, offset) - case None => - // This partition was not committed to from this consumer yet. Therefore we do not know the offset - // increase. A good estimate would be the poll size for this consumer, another okayish estimate is 0. - // Lets go with the simplest for now: ```offsetIncrease += 0``` - offset - } - updatedOffsets += tp -> maxOffset - } - } - (offsetIncrease, CommitOffsets(offsets = updatedOffsets.toMap)) - } - - def keepPartitions(tps: Set[TopicPartition]): CommitOffsets = - CommitOffsets(offsets.filter { case (tp, _) => tps.contains(tp) }) - - def contains(tp: TopicPartition, offset: Long): Boolean = - offsets.get(tp).exists(_ >= offset) - } - - private[internal] object CommitOffsets { - val empty: CommitOffsets = CommitOffsets(Map.empty) - } }
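
A minimal, self-contained sketch of the bookkeeping that `CommitOffsets.addCommits` performs in the Committer changes above: a per-partition maximum offset is tracked, and only forward movement contributes to the reported offset increase. The `TopicPartition` case class and the `Map`-based input below are simplifications for illustration; the real code works on Kafka's `TopicPartition` and a `Chunk[Commit]`.

```scala
import scala.math.max

object CommitOffsetsSketch extends App {
  // Simplified stand-in; the real code uses org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  final case class CommitOffsets(offsets: Map[TopicPartition, Long]) {
    /** Returns the estimated offset increase and the updated bookkeeping. */
    def addCommits(newOffsets: Map[TopicPartition, Long]): (Long, CommitOffsets) = {
      var increase = 0L
      val updated = newOffsets.foldLeft(offsets) { case (acc, (tp, offset)) =>
        acc.get(tp) match {
          case Some(existing) =>
            // Only count forward movement; a lower offset never decreases the estimate.
            increase += max(0L, offset - existing)
            acc.updated(tp, max(existing, offset))
          case None =>
            // First commit for this partition from this consumer: the increase is
            // unknown, so it is counted as 0 (the same choice the code above makes).
            acc.updated(tp, offset)
        }
      }
      (increase, CommitOffsets(updated))
    }
  }

  val tp = TopicPartition("orders", 0)
  val (i1, s1) = CommitOffsets(Map.empty).addCommits(Map(tp -> 100L))
  val (i2, s2) = s1.addCommits(Map(tp -> 142L))
  println(s"increase after first commit: $i1")  // 0 (unknown, estimated as 0)
  println(s"increase after second commit: $i2") // 42
  println(s"tracked offset: ${s2.offsets(tp)}") // 142
}
```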
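The commit path in `LiveCommitter` hands a commit request from the caller's fiber to the run loop through a queue, together with a promise that is completed once the commit finishes. Below is a rough sketch of that hand-off with Kafka replaced by an in-memory stub so it stays runnable; the names `CommitHandOffSketch`, `Commit`, and the 5-second timeout are invented for the example, and in the real code the promise is completed from the `commitAsync` callback rather than inside `processQueuedCommits` itself.

```scala
import zio._

object CommitHandOffSketch extends ZIOAppDefault {

  // A queued commit: the offset to commit plus a promise that is completed
  // once the commit has finished.
  final case class Commit(offset: Long, done: Promise[Throwable, Unit])

  // Caller side (compare LiveCommitter.commit): enqueue the commit and wait,
  // with a timeout, for the run loop to complete it.
  def commit(queue: Queue[Commit], offset: Long): Task[Unit] =
    for {
      p <- Promise.make[Throwable, Unit]
      _ <- queue.offer(Commit(offset, p))
      _ <- p.await.timeoutFail(new RuntimeException("commit timeout"))(5.seconds)
    } yield ()

  // Run-loop side (compare processQueuedCommits): drain whatever is queued and
  // complete the promises. The real code completes them from the commitAsync
  // callback instead of right here.
  def processQueuedCommits(queue: Queue[Commit]): UIO[Unit] =
    for {
      commits <- queue.takeAll
      _       <- ZIO.foreachDiscard(commits)(_.done.succeed(()))
    } yield ()

  def run: ZIO[Any, Throwable, Unit] =
    for {
      queue <- Queue.unbounded[Commit]
      fiber <- commit(queue, 42L).fork       // user fiber waiting for its commit
      _     <- queue.size.repeatUntil(_ > 0) // wait until the commit is queued
      _     <- processQueuedCommits(queue)   // the "poll loop" picks it up
      _     <- fiber.join
      _     <- Console.printLine("commit completed")
    } yield ()
}
```

The design point this mirrors is that user fibers never touch the Kafka consumer directly: they only enqueue work and await a promise, while all consumer calls stay on the run loop (or the rebalance-listener thread).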
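The outline comment in `doAwaitStreamCommits` relies on a synchronous commit of an empty offset map as a barrier: it commits nothing new but blocks until all preceding async commits have completed (kafka-clients 3.6.0 or later, per that comment). A hedged sketch of the pattern against the plain Kafka consumer API; the consumer instance, offsets, and 10-second timeout are placeholders.

```scala
import java.time.{ Duration => JDuration }
import java.util.Collections
import org.apache.kafka.clients.consumer.{ KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback }
import org.apache.kafka.common.TopicPartition

object CommitBarrierSketch {
  /** Start an async commit, then wait for every in-flight async commit to finish. */
  def commitAndFlush(
    consumer: KafkaConsumer[Array[Byte], Array[Byte]],
    offsets: java.util.Map[TopicPartition, OffsetAndMetadata]
  ): Unit = {
    val callback = new OffsetCommitCallback {
      override def onComplete(
        committed: java.util.Map[TopicPartition, OffsetAndMetadata],
        exception: Exception
      ): Unit =
        if (exception != null) exception.printStackTrace()
    }
    // Fire-and-forget; the callback runs on a later call into the consumer.
    consumer.commitAsync(offsets, callback)
    // Committing an empty map commits nothing new, but blocks until all preceding
    // async commits have completed.
    consumer.commitSync(Collections.emptyMap[TopicPartition, OffsetAndMetadata](), JDuration.ofSeconds(10))
  }
}
```

This async-then-barrier shape is what lets the rebalance listener keep driving commit completion in a loop and still guarantee, before the rebalance continues, that the offsets of revoked partitions are actually committed.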