diff --git a/README.md b/README.md index f9e5562ab..43b8c3b8d 100644 --- a/README.md +++ b/README.md @@ -19,8 +19,8 @@ Kafka has a mature Java client for producing and consuming events, but it has a In order to use this library, we need to add the following line in our `build.sbt` file: ```scala -libraryDependencies += "dev.zio" %% "zio-kafka" % "2.7.1" -libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "2.7.1" % Test +libraryDependencies += "dev.zio" %% "zio-kafka" % "2.7.2" +libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "2.7.2" % Test ``` ## Example diff --git a/build.sbt b/build.sbt index 1433a0308..ebecf9742 100644 --- a/build.sbt +++ b/build.sbt @@ -140,7 +140,7 @@ lazy val zioKafkaTest = libraryDependencies ++= Seq( kafkaClients, logback % Test, - "dev.zio" %% "zio-logging-slf4j" % "2.1.16" % Test, + "dev.zio" %% "zio-logging-slf4j" % "2.2.0" % Test, scalaCollectionCompat ) ++ `embedded-kafka`.value ) @@ -165,7 +165,7 @@ lazy val zioKafkaExample = libraryDependencies ++= Seq( "dev.zio" %% "zio" % "2.0.20", "dev.zio" %% "zio-kafka" % "2.7.2", - "dev.zio" %% "zio-logging-slf4j2" % "2.1.16", + "dev.zio" %% "zio-logging-slf4j2" % "2.2.0", "io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion, logback, "dev.zio" %% "zio-kafka-testkit" % "2.7.2" % Test, diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala index 1ae1ad54e..77dbc77b0 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala @@ -16,39 +16,57 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { override def spec: Spec[TestEnvironment with Scope, Any] = suite("Runloop.CommitOffsets spec")( test("addCommits adds to empty CommitOffsets") { - val s1 = Runloop.CommitOffsets(Map.empty) - val s2 = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10)))) - assertTrue(s2.offsets == Map(tp10 -> 10L)) + val s1 = Runloop.CommitOffsets(Map.empty) + val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10)))) + assertTrue( + inc == 0, + s2.offsets == Map(tp10 -> 10L) + ) }, test("addCommits updates offset when it is higher") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 5L)) - val s2 = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10)))) - assertTrue(s2.offsets == Map(tp10 -> 10L)) + val s1 = Runloop.CommitOffsets(Map(tp10 -> 4L)) + val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 10)))) + assertTrue( + inc == 10 - 4, + s2.offsets == Map(tp10 -> 10L) + ) }, test("addCommits ignores an offset when it is lower") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) - val s2 = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 5)))) - assertTrue(s2.offsets == Map(tp10 -> 10L)) + val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) + val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp10 -> 5)))) + assertTrue( + inc == 0, + s2.offsets == Map(tp10 -> 10L) + ) }, test("addCommits keeps unrelated partitions") { - val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) - val s2 = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11)))) - assertTrue(s2.offsets == Map(tp10 -> 10L, tp11 -> 11L)) + val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) + val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11)))) + assertTrue( + inc == 0, + s2.offsets == Map(tp10 -> 10L, tp11 -> 11L) + ) }, test("addCommits does it all at once") { - val s1 = 
Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L)) - val s2 = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11, tp20 -> 206L, tp21 -> 209L, tp22 -> 220L)))) - assertTrue(s2.offsets == Map(tp10 -> 10L, tp11 -> 11L, tp20 -> 206L, tp21 -> 210L, tp22 -> 220L)) + val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 205L, tp21 -> 210L, tp22 -> 220L)) + val (inc, s2) = s1.addCommits(Chunk(makeCommit(Map(tp11 -> 11, tp20 -> 206L, tp21 -> 209L, tp22 -> 220L)))) + assertTrue( + inc == /* tp10 */ 0 + /* tp11 */ 0 + /* tp20 */ 1 + /* tp21 */ 0 + /* tp22 */ 0, + s2.offsets == Map(tp10 -> 10L, tp11 -> 11L, tp20 -> 206L, tp21 -> 210L, tp22 -> 220L) + ) }, test("addCommits adds multiple commits") { val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 200L, tp21 -> 210L, tp22 -> 220L)) - val s2 = s1.addCommits( + val (inc, s2) = s1.addCommits( Chunk( makeCommit(Map(tp11 -> 11, tp20 -> 199L, tp21 -> 211L, tp22 -> 219L)), makeCommit(Map(tp20 -> 198L, tp21 -> 209L, tp22 -> 221L)) ) ) - assertTrue(s2.offsets == Map(tp10 -> 10L, tp11 -> 11L, tp20 -> 200L, tp21 -> 211L, tp22 -> 221L)) + assertTrue( + inc == /* tp10 */ 0 + /* tp11 */ 0 + /* tp20 */ 0 + /* tp21 */ 1 + /* tp22 */ 1, + s2.offsets == Map(tp10 -> 10L, tp11 -> 11L, tp20 -> 200L, tp21 -> 211L, tp22 -> 221L) + ) }, test("keepPartitions removes some partitions") { val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) @@ -80,6 +98,6 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { private def makeCommit(offsets: Map[TopicPartition, Long]): Runloop.Commit = { val o = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) } val p = Unsafe.unsafe(implicit unsafe => Promise.unsafe.make[Throwable, Unit](FiberId.None)) - Runloop.Commit(o, p) + Runloop.Commit(0L, o, p) } } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index a7c10ed06..23cd628c1 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -5,6 +5,7 @@ import zio._ import zio.kafka.consumer.Consumer.OffsetRetrieval import zio.kafka.consumer.fetch.{ FetchStrategy, QueueSizeBasedFetchStrategy } import zio.kafka.security.KafkaCredentialStore +import zio.metrics.MetricLabel /** * Settings for the consumer. @@ -30,7 +31,9 @@ final case class ConsumerSettings( restartStreamOnRebalancing: Boolean = false, rebalanceSafeCommits: Boolean = false, maxRebalanceDuration: Option[Duration] = None, - fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy() + fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy(), + metricLabels: Set[MetricLabel] = Set.empty, + runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis) ) { /** @@ -278,6 +281,29 @@ final case class ConsumerSettings( */ def withFetchStrategy(fetchStrategy: FetchStrategy): ConsumerSettings = copy(fetchStrategy = fetchStrategy) + + /** + * @param metricLabels + * The labels given to all metrics collected by zio-kafka. By default no labels are set. + * + * For applications with multiple consumers it is recommended to set some metric labels. 
For example, if one is used, + * the consumer group id could be used as a label: + * + * {{{ + * consumerSettings.withMetricLabels(Set(MetricLabel("group-id", groupId))) + * }}} + */ + def withMetricsLabels(metricLabels: Set[MetricLabel]): ConsumerSettings = + copy(metricLabels = metricLabels) + + /** + * @param runloopMetricsSchedule + * The schedule at which the runloop metrics are measured. Example runloop metrics are queue sizes and number of + * outstanding commits. The default is to measure every 500ms. + */ + def withRunloopMetricsSchedule(runloopMetricsSchedule: Schedule[Any, Unit, Long]): ConsumerSettings = + copy(runloopMetricsSchedule = runloopMetricsSchedule) + } object ConsumerSettings { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala new file mode 100644 index 000000000..44aede826 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala @@ -0,0 +1,345 @@ +package zio.kafka.consumer.internal + +import zio.metrics.MetricKeyType.Histogram +import zio.metrics._ +import zio._ + +/** + * Implementations of this trait are responsible for measuring all consumer metrics. The different methods are invoked + * from different places in the consumer. + * + * WARNING: This is an INTERNAL API and may change in an incompatible way, or disappear, without notice, in any + * zio-kafka version. + */ +private[internal] trait ConsumerMetrics { + def observePoll(resumedCount: Int, pausedCount: Int, latency: Duration, pollSize: Int): UIO[Unit] + def observeCommit(latency: Duration): UIO[Unit] + def observeAggregatedCommit(latency: Duration, commitSize: Long): UIO[Unit] + def observeRebalance(currentlyAssignedCount: Int, assignedCount: Int, revokedCount: Int, lostCount: Int): UIO[Unit] + def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit] +} + +/** + * A `ConsumerMetrics` that uses zio-metrics for measuring. + * + * Sub-classes are allowed to override the Histogram boundaries. + * + * WARNING: This is an INTERNAL API and may change in an incompatible way, or disappear, without notice, in any + * zio-kafka version. + * + * @param metricLabels + * the metric labels that are added to each metric + */ +//noinspection ScalaWeakerAccess +private[internal] class ZioConsumerMetrics(metricLabels: Set[MetricLabel]) extends ConsumerMetrics { + + // ----------------------------------------------------- + // + // Poll metrics + // + + // 0.01,0.03,0.08,0.21,0.55,1.49,4.04,10.97,29.81,81.04 in seconds + // 10,30,80,210,550,1490,4040,10970,29810,81040 in milliseconds + protected val pollLatencyBoundaries: Histogram.Boundaries = + MetricKeyType.Histogram.Boundaries.fromChunk( + Chunk.iterate(0.01, 10)(_ * Math.E).map(d => Math.ceil(d * 100.0) / 100.0) + ) + + // 1,3,8,21,55,149,404,1097,2981,8104 + protected val pollSizeBoundaries: Histogram.Boundaries = + MetricKeyType.Histogram.Boundaries.fromChunk(Chunk.iterate(1.0, 10)(_ * Math.E).map(Math.ceil)) + + private val pollCounter: Metric.Counter[Int] = + Metric + .counterInt("ziokafka_consumer_polls", "The number of polls.") + .tagged(metricLabels) + + private val partitionsResumedInLatestPollGauge: Metric.Gauge[Int] = + Metric + .gauge( + "ziokafka_consumer_partitions_resumed_in_latest_poll", + "The number of partitions resumed in the latest poll call." 
+ ) + .contramap[Int](_.toDouble) + .tagged(metricLabels) + + private val partitionsPausedInLatestPollGauge: Metric.Gauge[Int] = + Metric + .gauge( + "ziokafka_consumer_partitions_paused_in_latest_poll", + "The number of partitions paused in the latest poll call (because of backpressure)." + ) + .contramap[Int](_.toDouble) + .tagged(metricLabels) + + private val pollLatencyHistogram: Metric.Histogram[Duration] = + Metric + .histogram( + "ziokafka_consumer_poll_latency_seconds", + "The duration of a single poll in seconds.", + pollLatencyBoundaries + ) + .contramap[Duration](_.toNanos.toDouble / 1e9) + .tagged(metricLabels) + + private val pollSizeHistogram: Metric.Histogram[Int] = + Metric + .histogram( + "ziokafka_consumer_poll_size", + "The number of records fetched by a single poll.", + pollSizeBoundaries + ) + .contramap[Int](_.toDouble) + .tagged(metricLabels) + + override def observePoll(resumedCount: Int, pausedCount: Int, latency: Duration, pollSize: Int): UIO[Unit] = + for { + _ <- pollCounter.increment + _ <- partitionsResumedInLatestPollGauge.update(resumedCount) + _ <- partitionsPausedInLatestPollGauge.update(pausedCount) + _ <- pollLatencyHistogram.update(latency) + _ <- pollSizeHistogram.update(pollSize) + } yield () + + // ----------------------------------------------------- + // + // Commit metrics + // + + // 0.01,0.03,0.08,0.21,0.55,1.49,4.04,10.97,29.81,81.04 in seconds + // 10,30,80,210,550,1490,4040,10970,29810,81040 in milliseconds + protected val commitLatencyBoundaries: Histogram.Boundaries = + MetricKeyType.Histogram.Boundaries.fromChunk( + Chunk.iterate(0.01, 10)(_ * Math.E).map(d => Math.ceil(d * 100.0) / 100.0) + ) + + private val commitCounter: Metric.Counter[Int] = + Metric + .counterInt("ziokafka_consumer_commits", "The number of commits.") + .tagged(metricLabels) + + private val commitLatencyHistogram: Metric.Histogram[Duration] = + Metric + .histogram( + "ziokafka_consumer_commit_latency_seconds", + "The duration of a commit in seconds.", + commitLatencyBoundaries + ) + .contramap[Duration](_.toNanos.toDouble / 1e9) + .tagged(metricLabels) + + override def observeCommit(latency: zio.Duration): UIO[Unit] = + for { + _ <- commitCounter.increment + _ <- commitLatencyHistogram.update(latency) + } yield () + + // ----------------------------------------------------- + // + // Aggregated commit metrics + // + // Each runloop cycle zio-kafka aggregates all commit requests into a single aggregated commit. 
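For reference, the boundary series quoted in the comments throughout this file (0.01, 0.03, …, 81.04 and 1, 3, …, 8104) are exponential ladders with base e. The following standalone snippet simply reproduces the `Chunk.iterate` expressions used above and below, so the commented values can be checked:

```scala
import zio.Chunk

// Latency boundaries: multiply by e each step, round up to two decimals (seconds).
val latencySeconds: Chunk[Double] =
  Chunk.iterate(0.01, 10)(_ * Math.E).map(d => Math.ceil(d * 100.0) / 100.0)
// => 0.01, 0.03, 0.08, 0.21, 0.55, 1.49, 4.04, 10.97, 29.81, 81.04

// Size boundaries: multiply by e each step, round up to a whole number.
val sizes: Chunk[Double] =
  Chunk.iterate(1.0, 10)(_ * Math.E).map(Math.ceil)
// => 1, 3, 8, 21, 55, 149, 404, 1097, 2981, 8104
```

Each ladder yields roughly one histogram bucket per factor of ~2.7, which keeps the bucket count small while still covering several orders of magnitude.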
+ // + + // 0.01,0.03,0.08,0.21,0.55,1.49,4.04,10.97,29.81,81.04 in seconds + // 10,30,80,210,550,1490,4040,10970,29810,81040 in milliseconds + protected val aggregatedCommitLatencyBoundaries: Histogram.Boundaries = + MetricKeyType.Histogram.Boundaries.fromChunk( + Chunk.iterate(0.01, 10)(_ * Math.E).map(d => Math.ceil(d * 100.0) / 100.0) + ) + + // 1,3,8,21,55,149,404,1097,2981,8104 + protected val aggregatedCommitSizeBoundaries: Histogram.Boundaries = + MetricKeyType.Histogram.Boundaries.fromChunk(Chunk.iterate(1.0, 10)(_ * Math.E).map(Math.ceil)) + + private val aggregatedCommitCounter: Metric.Counter[Int] = + Metric + .counterInt("ziokafka_consumer_aggregated_commits", "The number of aggregated commits.") + .tagged(metricLabels) + + private val aggregatedCommitLatencyHistogram: Metric.Histogram[Duration] = + Metric + .histogram( + "ziokafka_consumer_aggregated_commit_latency_seconds", + "The duration of an aggregated commit in seconds.", + aggregatedCommitLatencyBoundaries + ) + .contramap[Duration](_.toNanos.toDouble / 1e9) + .tagged(metricLabels) + + // Note: the metric is an approximation because the first commit to a partition is not included. + private val aggregatedCommitSizeHistogram: Metric.Histogram[Long] = + Metric + .histogram( + "ziokafka_consumer_aggregated_commit_size", + "An approximation of the number of records (offsets) per aggregated commit.", + aggregatedCommitSizeBoundaries + ) + .contramap[Long](_.toDouble) + .tagged(metricLabels) + + override def observeAggregatedCommit(latency: Duration, commitSize: Long): UIO[Unit] = + for { + _ <- aggregatedCommitCounter.increment + _ <- aggregatedCommitLatencyHistogram.update(latency) + _ <- aggregatedCommitSizeHistogram.update(commitSize) + } yield () + + // ----------------------------------------------------- + // + // Rebalance metrics + // + + private val rebalanceCounter: Metric.Counter[Int] = + Metric + .counterInt("ziokafka_consumer_rebalances", "The number of rebalances") + .tagged(metricLabels) + + private val partitionsCurrentlyAssignedGauge: Metric.Gauge[Int] = + Metric + .gauge( + "ziokafka_consumer_partitions_currently_assigned", + "The number of partitions currently assigned to the consumer" + ) + .contramap[Int](_.toDouble) + .tagged(metricLabels) + + private def partitionsToStateCounter(state: String): Metric.Counter[Int] = + Metric + .counterInt( + s"ziokafka_consumer_partitions_$state", + s"The number of partitions $state to the consumer" + ) + .tagged(metricLabels) + + private val partitionsAssignedCounter = partitionsToStateCounter("assigned") + private val partitionsRevokedCounter = partitionsToStateCounter("revoked") + private val partitionsLostCounter = partitionsToStateCounter("lost") + + override def observeRebalance( + currentlyAssignedCount: Int, + assignedCount: Int, + revokedCount: Int, + lostCount: Int + ): UIO[Unit] = + for { + _ <- rebalanceCounter.increment + _ <- partitionsCurrentlyAssignedGauge.update(currentlyAssignedCount) + _ <- partitionsAssignedCounter.incrementBy(assignedCount) + _ <- partitionsRevokedCounter.incrementBy(revokedCount) + _ <- partitionsLostCounter.incrementBy(lostCount) + } yield () + + // ----------------------------------------------------- + // + // Runloop metrics + // + + // 0,1,3,8,21,55,149,404,1097,2981 + protected val streamCountBoundaries: Histogram.Boundaries = + MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(1.0, 9)(_ * Math.E).map(Math.ceil)) + + // 0,100,272,739,2009,5460,14842,40343,109664,298096 + protected val 
streamSizeBoundaries: Histogram.Boundaries = + MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(100.0, 9)(_ * Math.E).map(Math.ceil)) + + protected val queuePollSizeBoundaries: Histogram.Boundaries = + MetricKeyType.Histogram.Boundaries.fromChunk(Chunk[Double](0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) + + private val pendingRequestsHistogram: Metric.Histogram[Int] = + Metric + .histogram( + "ziokafka_consumer_pending_requests", + "The number of partition queues that that ran out of records.", + streamCountBoundaries + ) + .contramap[Int](_.toDouble) + .tagged(metricLabels) + + private val pendingCommitsHistogram: Metric.Histogram[Int] = + Metric + .histogram( + "ziokafka_consumer_pending_commits", + "The number of commits that are awaiting completion.", + streamCountBoundaries + ) + .contramap[Int](_.toDouble) + .tagged(metricLabels) + + private val queueSizeHistogram: Metric.Histogram[Int] = + Metric + .histogram( + "ziokafka_consumer_queue_size", + "The number of records in a partition queue.", + streamSizeBoundaries + ) + .contramap[Int](_.toDouble) + .tagged(metricLabels) + + private val queuePollsHistogram: Metric.Histogram[Int] = + Metric + .histogram( + "ziokafka_consumer_queue_polls", + "The number of polls during which records are idling in a partition queue.", + queuePollSizeBoundaries + ) + .contramap[Int](_.toDouble) + .tagged(metricLabels) + + private val allQueueSizeHistogram: Metric.Histogram[Int] = + Metric + .histogram( + "ziokafka_consumer_all_queue_size", + "The total number of records in all partition queues.", + streamSizeBoundaries + ) + .contramap[Int](_.toDouble) + .tagged(metricLabels) + + private val subscriptionStateGauge: Metric.Gauge[SubscriptionState] = + Metric + .gauge( + "ziokafka_consumer_subscription_state", + "Whether the consumer is subscribed (1) or not (0)." 
+ ) + .contramap[SubscriptionState](s => if (s.isSubscribed) 1 else 0) + .tagged(metricLabels) + + // 0,1,3,8,21,55,149,404,1097,2981 + protected val commandAndCommitQueueSizeBoundaries: Histogram.Boundaries = + MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(1.0, 9)(_ * Math.E).map(Math.ceil)) + + private val commandQueueSizeHistogram: Metric.Histogram[Int] = + Metric + .histogram( + "ziokafka_consumer_command_queue_size", + "The number of commands queued in the consumer.", + commandAndCommitQueueSizeBoundaries + ) + .contramap[Int](_.toDouble) + .tagged(metricLabels) + + private val commitQueueSizeHistogram: Metric.Histogram[Int] = + Metric + .histogram( + "ziokafka_consumer_commit_queue_size", + "The number of commits queued in the consumer.", + commandAndCommitQueueSizeBoundaries + ) + .contramap[Int](_.toDouble) + .tagged(metricLabels) + + override def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit] = + for { + _ <- ZIO.foreachDiscard(state.assignedStreams)(_.outstandingPolls @@ queuePollsHistogram) + queueSizes <- ZIO.foreach(state.assignedStreams)(_.queueSize) + _ <- ZIO.foreachDiscard(queueSizes)(qs => queueSizeHistogram.update(qs)) + _ <- allQueueSizeHistogram.update(queueSizes.sum) + _ <- pendingRequestsHistogram.update(state.pendingRequests.size) + _ <- pendingCommitsHistogram.update(state.pendingCommits.size) + _ <- subscriptionStateGauge.update(state.subscriptionState) + _ <- commandQueueSizeHistogram.update(commandQueueSize) + _ <- commitQueueSizeHistogram.update(commitQueueSize) + } yield () + +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 0e0b3882e..ea2735824 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -3,7 +3,7 @@ package zio.kafka.consumer.internal import org.apache.kafka.common.TopicPartition import zio.kafka.consumer.Offset import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } -import zio.kafka.consumer.internal.PartitionStreamControl.{ NanoTime, QueueInfo } +import zio.kafka.consumer.internal.PartitionStreamControl.QueueInfo import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord import zio.stream.{ Take, ZStream } import zio.{ Chunk, Clock, Duration, LogAnnotation, Promise, Queue, Ref, UIO, ZIO } @@ -50,17 +50,28 @@ final class PartitionStreamControl private ( LogAnnotation("partition", tp.partition().toString) ) - /** Offer new data for the stream to process. */ + /** Offer new data for the stream to process. Should be called on every poll, also when `data.isEmpty` */ private[internal] def offerRecords(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] = - for { - now <- Clock.nanoTime - newPullDeadline = now + maxPollIntervalNanos - _ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size)) - _ <- dataQueue.offer(Take.chunk(data)) - } yield () + if (data.isEmpty) { + queueInfoRef.update(_.withEmptyPoll) + } else { + for { + now <- Clock.nanoTime + newPullDeadline = now + maxPollIntervalNanos + _ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size)) + _ <- dataQueue.offer(Take.chunk(data)) + } yield () + } def queueSize: UIO[Int] = queueInfoRef.get.map(_.size) + /** + * @return + * the number of polls there are records idling in the queue. 
It is increased on every poll (when the queue is + * nonEmpty) and reset to 0 when the stream pulls the records + */ + def outstandingPolls: UIO[Int] = queueInfoRef.get.map(_.outstandingPolls) + /** * @param now * the time as given by `Clock.nanoTime` @@ -107,8 +118,6 @@ final class PartitionStreamControl private ( object PartitionStreamControl { - type NanoTime = Long - private[internal] def newPartitionStream( tp: TopicPartition, commandQueue: Queue[RunloopCommand], @@ -130,7 +139,7 @@ object PartitionStreamControl { completedPromise <- Promise.make[Nothing, Option[Offset]] dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]] now <- Clock.nanoTime - queueInfo <- Ref.make(QueueInfo(now, 0, None)) + queueInfo <- Ref.make(QueueInfo(now, 0, None, 0)) requestAndAwaitData = for { _ <- commandQueue.offer(RunloopCommand.Request(tp)) @@ -171,12 +180,32 @@ object PartitionStreamControl { // The `pullDeadline` is only relevant when `size > 0`. We initialize `pullDeadline` as soon as size goes above 0. // (Note that theoretically `size` can go below 0 when the update operations are reordered.) - private final case class QueueInfo(pullDeadline: NanoTime, size: Int, lastPulledOffset: Option[Offset]) { + private final case class QueueInfo( + pullDeadline: NanoTime, + size: Int, + lastPulledOffset: Option[Offset], + outstandingPolls: Int + ) { + // To be called when a poll resulted in 0 records. + def withEmptyPoll: QueueInfo = + copy(outstandingPolls = outstandingPolls + 1) + + // To be called when a poll resulted in >0 records. def withOffer(newPullDeadline: NanoTime, recordCount: Int): QueueInfo = - QueueInfo(if (size <= 0) newPullDeadline else pullDeadline, size + recordCount, lastPulledOffset) + QueueInfo( + pullDeadline = if (size <= 0) newPullDeadline else pullDeadline, + size = size + recordCount, + lastPulledOffset = lastPulledOffset, + outstandingPolls = outstandingPolls + 1 + ) def withPull(newPullDeadline: NanoTime, records: Chunk[ByteArrayCommittableRecord]): QueueInfo = - QueueInfo(newPullDeadline, size - records.size, records.lastOption.map(_.offset).orElse(lastPulledOffset)) + QueueInfo( + pullDeadline = newPullDeadline, + size = size - records.size, + lastPulledOffset = records.lastOption.map(_.offset).orElse(lastPulledOffset), + outstandingPolls = 0 + ) def deadlineExceeded(now: NanoTime): Boolean = size > 0 && pullDeadline <= now diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 13e729391..c79e671b2 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -8,7 +8,6 @@ import zio.kafka.consumer.Consumer.{ CommitTimeout, OffsetRetrieval } import zio.kafka.consumer._ import zio.kafka.consumer.diagnostics.DiagnosticEvent.{ Finalization, Rebalance } import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } -import zio.kafka.consumer.fetch.FetchStrategy import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer import zio.kafka.consumer.internal.Runloop._ import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment @@ -22,26 +21,26 @@ import scala.jdk.CollectionConverters._ //noinspection SimplifyWhenInspection,SimplifyUnlessInspection private[consumer] final class Runloop private ( + settings: ConsumerSettings, sameThreadRuntime: Runtime[Any], - hasGroupId: Boolean, consumer: ConsumerAccess, - pollTimeout: Duration, maxPollInterval: 
Duration, - commitTimeout: Duration, commitQueue: Queue[Commit], commandQueue: Queue[RunloopCommand], lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], diagnostics: Diagnostics, - offsetRetrieval: OffsetRetrieval, - userRebalanceListener: RebalanceListener, - restartStreamsOnRebalancing: Boolean, maxRebalanceDuration: Duration, - rebalanceSafeCommits: Boolean, currentStateRef: Ref[State], - committedOffsetsRef: Ref[CommitOffsets], - fetchStrategy: FetchStrategy + committedOffsetsRef: Ref[CommitOffsets] ) { + private val commitTimeout = settings.commitTimeout + private val commitTimeoutNanos = settings.commitTimeout.toNanos + + private val restartStreamsOnRebalancing = settings.restartStreamOnRebalancing + private val rebalanceSafeCommits = settings.rebalanceSafeCommits + + private val consumerMetrics = new ZioConsumerMetrics(settings.metricLabels) private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] = PartitionStreamControl.newPartitionStream(tp, commandQueue, diagnostics, maxPollInterval) @@ -103,7 +102,7 @@ private[consumer] final class Runloop private ( state: State, streamsToEnd: Chunk[PartitionStreamControl] ): Task[Unit] = { - val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeout.toNanos + val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeoutNanos val endingTps = streamsToEnd.map(_.tp).toSet @@ -240,7 +239,7 @@ private[consumer] final class Runloop private ( } yield () ) - recordRebalanceRebalancingListener ++ userRebalanceListener + recordRebalanceRebalancingListener ++ settings.rebalanceListener } /** This is the implementation behind the user facing api `Offset.commit`. */ @@ -248,10 +247,14 @@ private[consumer] final class Runloop private ( offsets => for { p <- Promise.make[Throwable, Unit] - _ <- commitQueue.offer(Runloop.Commit(offsets, p)) + startTime = java.lang.System.nanoTime() + _ <- commitQueue.offer(Runloop.Commit(java.lang.System.nanoTime(), offsets, p)) _ <- commandQueue.offer(RunloopCommand.CommitAvailable) _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) + endTime = java.lang.System.nanoTime() + latency = (endTime - startTime).nanoseconds + _ <- consumerMetrics.observeCommit(latency) } yield () /** Merge commits and prepare parameters for calling `consumer.commitAsync`. */ @@ -273,10 +276,18 @@ private[consumer] final class Runloop private ( tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata) } val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e)) - val onSuccess = - committedOffsetsRef.update(_.addCommits(commits)) *> - cont(Exit.unit) <* - diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData)) + // We assume the commit is started immediately after returning from this method. 
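Both new commit latency metrics follow the same measurement pattern: capture `System.nanoTime()` before the asynchronous step starts and read it again when the step completes — around the promise await for `observeCommit` above, and inside the `onSuccess` callback for `observeAggregatedCommit` below. A minimal, generic sketch of that pattern (the helper name is illustrative and not part of this patch):

```scala
import zio._

// Illustrative helper (not part of zio-kafka): measure the duration of an
// effect with System.nanoTime, mirroring the startTime/endTime bookkeeping
// used for the commit metrics in this file. Only the success path is measured.
def withNanoLatency[R, E, A](effect: ZIO[R, E, A])(observe: Duration => UIO[Unit]): ZIO[R, E, A] =
  for {
    startTime <- ZIO.succeed(java.lang.System.nanoTime())
    a         <- effect
    latency   <- ZIO.succeed((java.lang.System.nanoTime() - startTime).nanoseconds)
    _         <- observe(latency)
  } yield a
```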
+ val startTime = java.lang.System.nanoTime() + val onSuccess = { + val endTime = java.lang.System.nanoTime() + val latency = (endTime - startTime).nanoseconds + for { + offsetIncrease <- committedOffsetsRef.modify(_.addCommits(commits)) + _ <- consumerMetrics.observeAggregatedCommit(latency, offsetIncrease).when(commits.nonEmpty) + result <- cont(Exit.unit) + _ <- diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData)) + } yield result + } val onFailure: Throwable => UIO[Unit] = { case _: RebalanceInProgressException => for { @@ -368,8 +379,9 @@ private[consumer] final class Runloop private ( _ <- ZIO.foreachParDiscard(streams) { streamControl => val tp = streamControl.tp val records = polledRecords.records(tp) - if (records.isEmpty) ZIO.unit - else { + if (records.isEmpty) { + streamControl.offerRecords(Chunk.empty) + } else { val builder = ChunkBuilder.make[Record](records.size()) val iterator = records.iterator() while (iterator.hasNext) { @@ -389,12 +401,12 @@ private[consumer] final class Runloop private ( } private val getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] = - if (hasGroupId) consumer.runloopAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_)) + if (settings.hasGroupId) consumer.runloopAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_)) else ZIO.none /** @return the topic-partitions for which received records should be ignored */ private def doSeekForNewPartitions(c: ByteArrayKafkaConsumer, tps: Set[TopicPartition]): Task[Set[TopicPartition]] = - offsetRetrieval match { + settings.offsetRetrieval match { case OffsetRetrieval.Auto(_) => ZIO.succeed(Set.empty) case OffsetRetrieval.Manual(getOffsets, _) => if (tps.isEmpty) ZIO.succeed(Set.empty) @@ -411,19 +423,28 @@ private[consumer] final class Runloop private ( */ private def resumeAndPausePartitions( c: ByteArrayKafkaConsumer, - assignment: Set[TopicPartition], requestedPartitions: Set[TopicPartition] - ): Unit = { - val toResume = assignment intersect requestedPartitions - val toPause = assignment -- requestedPartitions + ): Task[(Int, Int)] = ZIO.attempt { + val assignment = c.assignment().asScala.toSet + val toResume = assignment intersect requestedPartitions + val toPause = assignment -- requestedPartitions if (toResume.nonEmpty) c.resume(toResume.asJava) if (toPause.nonEmpty) c.pause(toPause.asJava) + + (toResume.size, toPause.size) } - private def handlePoll(state: State): Task[State] = + private def doPoll(c: ByteArrayKafkaConsumer): Task[ConsumerRecords[Array[Byte], Array[Byte]]] = + ZIO.attempt { + val recordsOrNull = c.poll(settings.pollTimeout) + if (recordsOrNull eq null) ConsumerRecords.empty[Array[Byte], Array[Byte]]() + else recordsOrNull + } + + private def handlePoll(state: State): Task[State] = { for { - partitionsToFetch <- fetchStrategy.selectPartitionsToFetch(state.assignedStreams) + partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams) _ <- ZIO.logDebug( s"Starting poll with ${state.pendingRequests.size} pending requests and" + s" ${state.pendingCommits.size} pending commits," + @@ -432,97 +453,103 @@ private[consumer] final class Runloop private ( _ <- currentStateRef.set(state) pollResult <- consumer.runloopAccess { c => - ZIO.suspend { - val prevAssigned = c.assignment().asScala.toSet - - resumeAndPausePartitions(c, prevAssigned, partitionsToFetch) + for { + resumeAndPauseCounts <- resumeAndPausePartitions(c, partitionsToFetch) + (toResumeCount, toPauseCount) = resumeAndPauseCounts - val 
polledRecords = { - val records = c.poll(pollTimeout) - if (records eq null) ConsumerRecords.empty[Array[Byte], Array[Byte]]() else records - } + pullDurationAndRecords <- doPoll(c).timed + (pollDuration, polledRecords) = pullDurationAndRecords - diagnostics.emit { - val providedTps = polledRecords.partitions().asScala.toSet - val requestedPartitions = state.pendingRequests.map(_.tp).toSet - - DiagnosticEvent.Poll( - tpRequested = requestedPartitions, - tpWithData = providedTps, - tpWithoutData = requestedPartitions -- providedTps - ) - } *> - lastRebalanceEvent.getAndSet(RebalanceEvent.None).flatMap { - case RebalanceEvent(false, _, _, _, _) => - // The fast track, rebalance listener was not invoked: - // no assignment changes, no new commits, only new records. - ZIO.succeed( - PollResult( - records = polledRecords, - ignoreRecordsForTps = Set.empty, - pendingRequests = state.pendingRequests, - assignedStreams = state.assignedStreams - ) - ) + _ <- consumerMetrics.observePoll(toResumeCount, toPauseCount, pollDuration, polledRecords.count()) *> + diagnostics.emit { + val providedTps = polledRecords.partitions().asScala.toSet + val requestedPartitions = state.pendingRequests.map(_.tp).toSet - case RebalanceEvent(true, assignedTps, revokedTps, lostTps, endedStreams) => - // The slow track, the rebalance listener was invoked: - // some partitions were assigned, revoked or lost, - // some streams have ended. - - val currentAssigned = c.assignment().asScala.toSet - val endedTps = endedStreams.map(_.tp).toSet - for { - ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) - - // The topic partitions that need a new stream are: - // 1. Those that are freshly assigned - // 2. Those that are still assigned but were ended in the rebalance listener because - // of `restartStreamsOnRebalancing` being true - startingTps = assignedTps ++ (currentAssigned intersect endedTps) - - startingStreams <- - ZIO.foreach(Chunk.fromIterable(startingTps))(newPartitionStream).tap { newStreams => - ZIO.logDebug(s"Offering partition assignment $startingTps") *> - partitionsHub.publish( - Take.chunk(newStreams.map(_.tpStream)) - ) - } - - updatedAssignedStreams = - state.assignedStreams.filter(s => !endedTps.contains(s.tp)) ++ startingStreams - - // Remove pending requests for all streams that ended: - // 1. streams that were ended because the partition was lost - // 2. streams that were ended because the partition was revoked - // 3. streams that were ended because of `restartStreamsOnRebalancing` being true - updatedPendingRequests = - state.pendingRequests.filter { pendingRequest => - val tp = pendingRequest.tp - !(lostTps.contains(tp) || revokedTps.contains(tp) || endedStreams.exists(_.tp == tp)) - } - - // Remove committed offsets for partitions that are no longer assigned: - // NOTE: the type annotation is needed to keep the IntelliJ compiler happy. 
- _ <- - committedOffsetsRef.update(_.keepPartitions(updatedAssignedStreams.map(_.tp).toSet)): Task[Unit] - - _ <- diagnostics.emit( - Rebalance( - revoked = revokedTps, - assigned = assignedTps, - lost = lostTps, - ended = endedStreams.map(_.tp).toSet - ) - ) - } yield Runloop.PollResult( - records = polledRecords, - ignoreRecordsForTps = ignoreRecordsForTps, - pendingRequests = updatedPendingRequests, - assignedStreams = updatedAssignedStreams - ) - } - } + DiagnosticEvent.Poll( + tpRequested = requestedPartitions, + tpWithData = providedTps, + tpWithoutData = requestedPartitions -- providedTps + ) + } + pollresult <- lastRebalanceEvent.getAndSet(RebalanceEvent.None).flatMap { + case RebalanceEvent(false, _, _, _, _) => + // The fast track, rebalance listener was not invoked: + // no assignment changes, no new commits, only new records. + ZIO.succeed( + PollResult( + records = polledRecords, + ignoreRecordsForTps = Set.empty, + pendingRequests = state.pendingRequests, + assignedStreams = state.assignedStreams + ) + ) + + case RebalanceEvent(true, assignedTps, revokedTps, lostTps, endedStreams) => + // The slow track, the rebalance listener was invoked: + // some partitions were assigned, revoked or lost, + // some streams have ended. + + val currentAssigned = c.assignment().asScala.toSet + val endedTps = endedStreams.map(_.tp).toSet + for { + ignoreRecordsForTps <- doSeekForNewPartitions(c, assignedTps) + + // The topic partitions that need a new stream are: + // 1. Those that are freshly assigned + // 2. Those that are still assigned but were ended in the rebalance listener because + // of `restartStreamsOnRebalancing` being true + startingTps = assignedTps ++ (currentAssigned intersect endedTps) + + startingStreams <- + ZIO.foreach(Chunk.fromIterable(startingTps))(newPartitionStream).tap { newStreams => + ZIO.logDebug(s"Offering partition assignment $startingTps") *> + partitionsHub.publish( + Take.chunk(newStreams.map(_.tpStream)) + ) + } + + updatedAssignedStreams = + state.assignedStreams.filter(s => !endedTps.contains(s.tp)) ++ startingStreams + + // Remove pending requests for all streams that ended: + // 1. streams that were ended because the partition was lost + // 2. streams that were ended because the partition was revoked + // 3. streams that were ended because of `restartStreamsOnRebalancing` being true + updatedPendingRequests = + state.pendingRequests.filter { pendingRequest => + val tp = pendingRequest.tp + !(lostTps.contains(tp) || revokedTps.contains(tp) || endedStreams + .exists(_.tp == tp)) + } + + // Remove committed offsets for partitions that are no longer assigned: + // NOTE: the type annotation is needed to keep the IntelliJ compiler happy. 
+ _ <- + committedOffsetsRef + .update(_.keepPartitions(updatedAssignedStreams.map(_.tp).toSet)): Task[Unit] + + _ <- consumerMetrics.observeRebalance( + currentAssigned.size, + assignedTps.size, + revokedTps.size, + lostTps.size + ) + _ <- diagnostics.emit( + Rebalance( + revoked = revokedTps, + assigned = assignedTps, + lost = lostTps, + ended = endedStreams.map(_.tp).toSet + ) + ) + } yield Runloop.PollResult( + records = polledRecords, + ignoreRecordsForTps = ignoreRecordsForTps, + pendingRequests = updatedPendingRequests, + assignedStreams = updatedAssignedStreams + ) + } + } yield pollresult } fulfillResult <- offerRecordsToStreams( pollResult.assignedStreams, @@ -537,6 +564,7 @@ private[consumer] final class Runloop private ( pendingCommits = updatedPendingCommits, assignedStreams = pollResult.assignedStreams ) + } /** * Check each stream to see if it exceeded its poll interval. If so, halt it. In addition, if any stream has exceeded @@ -696,11 +724,28 @@ private[consumer] final class Runloop private ( else ZIO.succeed(stateAfterCommands) // Immediately poll again, after processing all new queued commands _ <- if (updatedStateAfterPoll.shouldPoll) commandQueue.offer(RunloopCommand.Poll) else ZIO.unit + // Save the current state for other parts of Runloop (read-only, for metrics only) + _ <- currentStateRef.set(updatedStateAfterPoll) } yield updatedStateAfterPoll } .tapErrorCause(cause => ZIO.logErrorCause("Error in Runloop", cause)) .onError(cause => partitionsHub.offer(Take.failCause(cause))) } + + private def observeRunloopMetrics(runloopMetricsSchedule: Schedule[Any, Unit, Long]): ZIO[Any, Nothing, Unit] = { + val observe = for { + currentState <- currentStateRef.get + commandQueueSize <- commandQueue.size + commitQueueSize <- commitQueue.size + _ <- consumerMetrics + .observeRunloopMetrics(currentState, commandQueueSize, commitQueueSize) + } yield () + + observe + .repeat(runloopMetricsSchedule) + .unit + .interruptible + } } object Runloop { @@ -777,6 +822,7 @@ object Runloop { } private[internal] final case class Commit( + createdAt: NanoTime, offsets: Map[TopicPartition, OffsetAndMetadata], cont: Promise[Throwable, Unit] ) { @@ -785,19 +831,12 @@ object Runloop { } private[consumer] def make( - hasGroupId: Boolean, - consumer: ConsumerAccess, - pollTimeout: Duration, + settings: ConsumerSettings, maxPollInterval: Duration, - commitTimeout: Duration, - diagnostics: Diagnostics, - offsetRetrieval: OffsetRetrieval, - userRebalanceListener: RebalanceListener, - restartStreamsOnRebalancing: Boolean, - rebalanceSafeCommits: Boolean, maxRebalanceDuration: Duration, - partitionsHub: Hub[Take[Throwable, PartitionAssignment]], - fetchStrategy: FetchStrategy + diagnostics: Diagnostics, + consumer: ConsumerAccess, + partitionsHub: Hub[Take[Throwable, PartitionAssignment]] ): URIO[Scope, Runloop] = for { _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) @@ -809,29 +848,24 @@ object Runloop { committedOffsetsRef <- Ref.make(CommitOffsets.empty) sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) runloop = new Runloop( + settings = settings, sameThreadRuntime = sameThreadRuntime, - hasGroupId = hasGroupId, consumer = consumer, - pollTimeout = pollTimeout, maxPollInterval = maxPollInterval, - commitTimeout = commitTimeout, commitQueue = commitQueue, commandQueue = commandQueue, lastRebalanceEvent = lastRebalanceEvent, partitionsHub = partitionsHub, diagnostics = diagnostics, - offsetRetrieval = offsetRetrieval, - userRebalanceListener = 
userRebalanceListener, - restartStreamsOnRebalancing = restartStreamsOnRebalancing, - rebalanceSafeCommits = rebalanceSafeCommits, maxRebalanceDuration = maxRebalanceDuration, currentStateRef = currentStateRef, - committedOffsetsRef = committedOffsetsRef, - fetchStrategy = fetchStrategy + committedOffsetsRef = committedOffsetsRef ) _ <- ZIO.logDebug("Starting Runloop") - // Run the entire loop on the a dedicated thread to avoid executor shifts + _ <- runloop.observeRunloopMetrics(settings.runloopMetricsSchedule).forkScoped + + // Run the entire loop on a dedicated thread to avoid executor shifts executor <- RunloopExecutor.newInstance fiber <- ZIO.onExecutor(executor)(runloop.run(initialState)).forkScoped waitForRunloopStop = fiber.join.orDie @@ -844,7 +878,7 @@ object Runloop { ) } yield runloop - private final case class State( + private[internal] final case class State( pendingRequests: Chunk[RunloopCommand.Request], pendingCommits: Chunk[Runloop.Commit], assignedStreams: Chunk[PartitionStreamControl], @@ -868,17 +902,30 @@ object Runloop { // package private for unit testing private[internal] final case class CommitOffsets(offsets: Map[TopicPartition, Long]) { - def addCommits(c: Chunk[Runloop.Commit]): CommitOffsets = { + + /** Returns an estimate of the total offset increase, and a new `CommitOffsets` with the given offsets added. */ + def addCommits(c: Chunk[Runloop.Commit]): (Long, CommitOffsets) = { val updatedOffsets = mutable.Map.empty[TopicPartition, Long] updatedOffsets.sizeHint(offsets.size) updatedOffsets ++= offsets + var offsetIncrease = 0L c.foreach { commit => commit.offsets.foreach { case (tp, offsetAndMeta) => val offset = offsetAndMeta.offset() - updatedOffsets += tp -> updatedOffsets.get(tp).fold(offset)(max(_, offset)) + val maxOffset = updatedOffsets.get(tp) match { + case Some(existingOffset) => + offsetIncrease += max(0L, (offset - existingOffset)) + max(existingOffset, offset) + case None => + // This partition was not committed to from this consumer yet. Therefore we do not know the offset + // increase. A good estimate would be the poll size for this consumer, another okayish estimate is 0. 
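A small worked example of this estimate, using the `tp10`/`tp11` and `makeCommit` helpers from `RunloopCommitOffsetsSpec`: if `tp10` was previously committed at offset 10 and a commit for `tp10 -> 15` and `tp11 -> 5` arrives, the reported increase is 5 — `tp10` contributes `15 - 10`, while the previously unseen `tp11` contributes 0:

```scala
val (inc, updated) =
  Runloop
    .CommitOffsets(Map(tp10 -> 10L))
    .addCommits(Chunk(makeCommit(Map(tp10 -> 15L, tp11 -> 5L))))
// inc == 5L
// updated.offsets == Map(tp10 -> 15L, tp11 -> 5L)
```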
+ // Lets go with the simplest for now: ```offsetIncrease += 0``` + offset + } + updatedOffsets += tp -> maxOffset } } - CommitOffsets(offsets = updatedOffsets.toMap) + (offsetIncrease, CommitOffsets(offsets = updatedOffsets.toMap)) } def keepPartitions(tps: Set[TopicPartition]): CommitOffsets = diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 3e1d0eb8d..e4c385771 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -89,19 +89,12 @@ private[consumer] object RunloopAccess { runloopStateRef <- Ref.Synchronized.make[RunloopState](RunloopState.NotStarted) makeRunloop = Runloop .make( - hasGroupId = settings.hasGroupId, - consumer = consumerAccess, - pollTimeout = settings.pollTimeout, + settings = settings, maxPollInterval = maxPollInterval, - commitTimeout = settings.commitTimeout, - diagnostics = diagnostics, - offsetRetrieval = settings.offsetRetrieval, - userRebalanceListener = settings.rebalanceListener, - restartStreamsOnRebalancing = settings.restartStreamOnRebalancing, - rebalanceSafeCommits = settings.rebalanceSafeCommits, maxRebalanceDuration = maxRebalanceDuration, - partitionsHub = partitionsHub, - fetchStrategy = settings.fetchStrategy + diagnostics = diagnostics, + consumer = consumerAccess, + partitionsHub = partitionsHub ) .withFinalizer(_ => runloopStateRef.set(RunloopState.Finalized)) .provide(ZLayer.succeed(consumerScope)) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala index b18fe95e1..e26b477a4 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala @@ -5,6 +5,8 @@ import zio.internal.ExecutionMetrics package object internal { + private[internal] type NanoTime = Long + /** * A runtime layer that can be used to run everything on the thread of the caller. *
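Putting the new settings together: below is a minimal sketch of a consumer configuration that enables the metrics with a group-id label and a slower runloop-metrics schedule, plus one way to read a metric back through ZIO's metric registry. The broker address, group id, and schedule are illustrative; reading a metric back assumes the reconstructed key (name, description, tags) matches the one registered by zio-kafka.

```scala
import zio._
import zio.kafka.consumer.ConsumerSettings
import zio.metrics.{ Metric, MetricLabel }

val groupId = "my-group"

// Enable the new metrics: label every metric with the consumer group id and
// sample the runloop metrics once per second instead of the default 500 ms.
val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withGroupId(groupId)
    .withMetricsLabels(Set(MetricLabel("group-id", groupId)))
    .withRunloopMetricsSchedule(Schedule.fixed(1.second))

// Reading a metric back (e.g. in a test or health check) by reconstructing
// the same key used by the consumer above.
val pollCount: UIO[Double] =
  Metric
    .counterInt("ziokafka_consumer_polls", "The number of polls.")
    .tagged(Set(MetricLabel("group-id", groupId)))
    .value
    .map(_.count)
```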