Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect consumer metrics #1143

Merged
merged 28 commits into from
Jan 21, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
cb595dc
Collect consumer metrics
erikvanoosten Dec 24, 2023
8ea6721
Improvements:
erikvanoosten Dec 25, 2023
0b220dc
Update metrics names to zio naming conventions
erikvanoosten Dec 29, 2023
b1803d2
Use group instance id if possible
erikvanoosten Dec 30, 2023
fa3d780
Give user full control of metric labels
erikvanoosten Dec 31, 2023
3ad1fda
Add hint about using group id as label
erikvanoosten Dec 31, 2023
15a0326
Add metric for the number of polls records are idling in the queue
erikvanoosten Dec 31, 2023
12ac387
Change metric In type to Int iso Double
erikvanoosten Dec 31, 2023
f9d9abb
Collect rebalance metrics
erikvanoosten Jan 1, 2024
9fbaab1
Collect poll metrics
erikvanoosten Jan 1, 2024
a3e0bad
Collect commit metrics (WIP)
erikvanoosten Jan 1, 2024
d02480b
Collect partition stream metrics on a fixed schedule
erikvanoosten Jan 1, 2024
3bdfe0e
Complete commit metrics
erikvanoosten Jan 6, 2024
392033a
Add resume/pause metrics
erikvanoosten Jan 6, 2024
bd7a439
Add command/commit queue metrics
erikvanoosten Jan 6, 2024
aceb325
Add command/commit queue metrics
erikvanoosten Jan 6, 2024
749b642
Fix tests, simplify runloop metrics
erikvanoosten Jan 6, 2024
af097d3
Minimize code diff
erikvanoosten Jan 6, 2024
ce6d0df
Merge branch 'master' into consumer-metrics
erikvanoosten Jan 14, 2024
f137a70
Address review comments
erikvanoosten Jan 14, 2024
90ff904
Use ZIO's timed, and make `runloopMetricsSchedule` configurable
erikvanoosten Jan 14, 2024
0737fe3
Fix for scala 3
erikvanoosten Jan 14, 2024
3f3e2c7
Prepare for user customization
erikvanoosten Jan 14, 2024
624a8c4
Merge branch 'master' into consumer-metrics
erikvanoosten Jan 18, 2024
4812592
Merge branch 'master' into consumer-metrics
erikvanoosten Jan 20, 2024
8cb0552
Measure commit requests _and_ aggregated commits
erikvanoosten Jan 20, 2024
05810df
Fix typo in comment
erikvanoosten Jan 20, 2024
06e2c3d
Better metric descriptions
erikvanoosten Jan 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ final case class ConsumerSettings(
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
maxRebalanceDuration: Option[Duration] = None,
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy()
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy(),
metricsConsumerId: Option[String] = None
) {

/**
Expand Down Expand Up @@ -278,6 +279,21 @@ final case class ConsumerSettings(
*/
def withFetchStrategy(fetchStrategy: FetchStrategy): ConsumerSettings = {
  // Return a copy of these settings in which only the fetch strategy is replaced.
  this.copy(fetchStrategy = fetchStrategy)
}

/**
 * Sets the value of the `consumer_id` label that is attached to all consumer metrics.
 *
 * @param consumerId
 *   The value given to the metrics label `consumer_id` for all metrics. When this value is not set, the consumer
 *   group id is used instead. When no group id is set either, a random value is used that is generated when the
 *   consumer is created.
 * @return
 *   a copy of these settings with `metricsConsumerId` set to `Some(consumerId)`
 */
def withMetricsConsumerId(consumerId: String): ConsumerSettings = {
  val explicitId: Option[String] = Some(consumerId)
  copy(metricsConsumerId = explicitId)
}

/**
 * Resolves the value to use for the `consumer_id` metrics label.
 *
 * The first available value wins, in this order:
 *   1. the explicitly configured `metricsConsumerId`,
 *   1. the string form of the `ConsumerConfig.GROUP_ID_CONFIG` entry in `properties`,
 *   1. a string of 6 random hexadecimal digits.
 *
 * Note that the random fallback is generated on every invocation, so repeated calls return different values when
 * neither of the first two values is available.
 */
def derivedMetricsConsumerId: String = {
  // Both fallbacks are `def`s so they are only evaluated when actually needed.
  def groupId: Option[String] =
    properties.get(ConsumerConfig.GROUP_ID_CONFIG).map(_.toString)

  def randomHexId: String =
    Seq.fill(6)(scala.util.Random.nextInt(16).toHexString).mkString

  metricsConsumerId.orElse(groupId).getOrElse(randomHexId)
}

}

object ConsumerSettings {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package zio.kafka.consumer.internal

import zio.metrics.MetricKeyType.Histogram
import zio.metrics._
import zio.{ Chunk, UIO, ZIO }

/**
 * Histogram metrics that describe the state of a consumer's runloop.
 *
 * Every metric is tagged with the label `consumer_id`, whose value is `metricsConsumerId`.
 *
 * @param metricsConsumerId
 *   the value of the `consumer_id` label attached to all metrics of this instance
 */
final case class ConsumerMetrics(metricsConsumerId: String) {

  /**
   * Builds histogram boundaries consisting of `0` followed by 9 values that start at `first` and grow by a factor
   * `e` each step, each rounded up to a whole number.
   */
  private def exponentialBoundaries(first: Double): Histogram.Boundaries =
    Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(first, 9)(_ * Math.E).map(Math.ceil))

  /** Creates a histogram with the given name, description and boundaries, tagged with this consumer's id. */
  private def consumerHistogram(name: String, description: String, boundaries: Histogram.Boundaries) =
    Metric.histogram(name, description, boundaries).tagged("consumer_id", metricsConsumerId)

  // Chunk(0,1,3,8,21,55,149,404,1097,2981)
  private val streamCountBoundaries: Histogram.Boundaries = exponentialBoundaries(1.0)

  // Chunk(0,100,272,739,2009,5460,14842,40343,109664,298096)
  private val streamSizeBoundaries: Histogram.Boundaries = exponentialBoundaries(100.0)

  private val pendingRequestsHistogram =
    consumerHistogram(
      "ziokafka_consumer_pending_requests",
      "The number of partition streams that are awaiting new records.",
      streamCountBoundaries
    )

  private val pendingCommitsHistogram =
    consumerHistogram(
      "ziokafka_consumer_pending_commits",
      "The number of commits that are awaiting completion.",
      streamCountBoundaries
    )

  private val queueSizeHistogram =
    consumerHistogram(
      "ziokafka_consumer_queue_size",
      "The number of records queued per partition.",
      streamSizeBoundaries
    )

  private val allQueueSizeHistogram =
    consumerHistogram(
      "ziokafka_consumer_all_queue_size",
      "The number of records queued in the consumer (all partitions).",
      streamSizeBoundaries
    )

  /**
   * Records one observation of the given runloop state in all histograms.
   *
   * Nothing is recorded when `state.subscriptionState.isSubscribed` is false. Otherwise this records the queue size
   * of every assigned stream, the sum of those queue sizes, the number of pending requests and the number of pending
   * commits.
   *
   * @param state
   *   the runloop state to observe
   */
  def observeMetrics(state: Runloop.State): UIO[Unit] = {
    val recordAll: UIO[Unit] =
      ZIO.foreach(state.assignedStreams)(_.queueSize.map(_.toDouble)).flatMap { sizes =>
        // All queue sizes are collected first; only then are the histograms updated.
        ZIO.foreachDiscard(sizes)(size => queueSizeHistogram.update(size)) *>
          allQueueSizeHistogram.update(sizes.sum) *>
          pendingRequestsHistogram.update(state.pendingRequests.size.toDouble) *>
          pendingCommitsHistogram.update(state.pendingCommits.size.toDouble)
      }

    ZIO.when(state.subscriptionState.isSubscribed)(recordAll).unit
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.jdk.CollectionConverters._

//noinspection SimplifyWhenInspection,SimplifyUnlessInspection
private[consumer] final class Runloop private (
metricsConsumerId: String,
sameThreadRuntime: Runtime[Any],
hasGroupId: Boolean,
consumer: ConsumerAccess,
Expand All @@ -43,6 +44,8 @@ private[consumer] final class Runloop private (
fetchStrategy: FetchStrategy
) {

// Metrics of this runloop; all of them are labelled with `metricsConsumerId`.
private val consumerMetrics: ConsumerMetrics = ConsumerMetrics(metricsConsumerId)

/**
 * Creates the stream control for the given topic-partition by delegating to
 * `PartitionStreamControl.newPartitionStream`, passing this runloop's command queue, diagnostics and max poll
 * interval.
 */
private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] = {
  val streamControl = PartitionStreamControl.newPartitionStream(tp, commandQueue, diagnostics, maxPollInterval)
  streamControl
}

Expand Down Expand Up @@ -683,6 +686,7 @@ private[consumer] final class Runloop private (
.takeWhile(_ != RunloopCommand.StopRunloop)
.runFoldChunksDiscardZIO(initialState) { (state, commands) =>
for {
_ <- consumerMetrics.observeMetrics(state).fork
commitCommands <- commitQueue.takeAll
_ <- ZIO.logDebug(
s"Processing ${commitCommands.size} commits," +
Expand Down Expand Up @@ -785,6 +789,7 @@ object Runloop {
}

private[consumer] def make(
metricsConsumerId: String,
hasGroupId: Boolean,
consumer: ConsumerAccess,
pollTimeout: Duration,
Expand All @@ -809,6 +814,7 @@ object Runloop {
committedOffsetsRef <- Ref.make(CommitOffsets.empty)
sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer)
runloop = new Runloop(
metricsConsumerId = metricsConsumerId,
sameThreadRuntime = sameThreadRuntime,
hasGroupId = hasGroupId,
consumer = consumer,
Expand Down Expand Up @@ -844,7 +850,7 @@ object Runloop {
)
} yield runloop

private final case class State(
private[internal] final case class State(
pendingRequests: Chunk[RunloopCommand.Request],
pendingCommits: Chunk[Runloop.Commit],
assignedStreams: Chunk[PartitionStreamControl],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ private[consumer] object RunloopAccess {
runloopStateRef <- Ref.Synchronized.make[RunloopState](RunloopState.NotStarted)
makeRunloop = Runloop
.make(
metricsConsumerId = settings.derivedMetricsConsumerId,
hasGroupId = settings.hasGroupId,
consumer = consumerAccess,
pollTimeout = settings.pollTimeout,
Expand Down