Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KafkaMetricsRegistry #324

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,53 @@ val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer =>
}
```

## Collecting metrics
Both `Producer` and `Consumer` expose metrics, internally collected by the Java client via `clientMetrics` method.
To simplify collection of metrics from multiple clients inside the same VM you can use `KafkaMetricsRegistry`.
It allows 'registering' functions that obtain metrics from different clients, aggregating them into a single list
of metrics when collected. This allows defining clients in different code units with the only requirement of registering
them in `KafkaMetricsRegistry`. The registered functions will be saved in a `Ref` and invoked every time metrics
are collected.
Examples:
1. Manual registration of each client
```scala
import com.evolutiongaming.skafka.consumer.ConsumerOf
import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry
import com.evolutiongaming.skafka.producer.ProducerOf

val consumerOf: ConsumerOf[F] = ???
val producerOf: ProducerOf[F] = ???
val kafkaRegistry: KafkaMetricsRegistry[F] = ???

for {
consumer <- consumerOf.apply(consumerConfig)
<- kafkaRegistry.register(consumer.clientMetrics)
producer <- producerOf.apply(producerConfig)
_ <- kafkaRegistry.register(producer.clientMetrics)

metrics <- kafkaRegistry.collectAll
} yield ()
```
2. Wrapping `ConsumerOf` or `ProducerOf` with a syntax extension
```scala
import com.evolutiongaming.skafka.metrics.syntax._

val kafkaRegistry: KafkaMetricsRegistry[F] = ...
val consumerOf: ConsumerOf[F] = ConsumerOf.apply1[F](...).withNativeMetrics(kafkaRegistry)
val producerOf: ProducerOf[F] = ProducerOf.apply1[F](...).withNativeMetrics(kafkaRegistry)

for {
consumer <- consumerOf.apply(consumerConfig)
producer <- producerOf.apply(producerConfig)
metrics <- kafkaRegistry.collectAll
} yield ()
```
#### Metrics duplication
`KafkaMetricsRegistry` deduplicates metrics by default. It can be turned off by using a different factory method
accepting `allowDuplicates` parameter.
When using it in the default mode it's important to use different `client.id` values for different clients inside a
single VM, otherwise only one of them will be picked (order is not guaranteed).

## Setup

```scala
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.evolutiongaming.skafka.metrics

import cats.effect.kernel.Sync
import cats.effect.{Ref, Resource}
import cats.syntax.all._
import com.evolutiongaming.skafka.ClientMetric

import java.util.UUID

/** Allows reporting metrics of multiple Kafka clients inside a single VM.
*
* Example:
* {{{
* val consumerOf: ConsumerOf[F] = ???
* val producerOf: ProducerOf[F] = ???
* val kafkaRegistry: KafkaMetricsRegistry[F] = ???
*
* for {
* consumer <- consumerOf.apply(consumerConfig)
* _ <- kafkaRegistry.register(consumer.clientMetrics)
*
* producer <- producerOf.apply(producerConfig)
* _ <- kafkaRegistry.register(producer.clientMetrics)
*
* metrics <- kafkaRegistry.collectAll
* } yield ()
* }}}
*
* To avoid manually registering each client there are syntax extension, wrapping `ProducerOf` and `ConsumerOf`,
* see `com.evolutiongaming.skafka.metrics.syntax`.
*
* Example:
* {{{
* val kafkaRegistry: KafkaMetricsRegistry[F] = ...
* val consumerOf: ConsumerOf[F] = ConsumerOf.apply1[F](...).withNativeMetrics(kafkaRegistry)
* val producerOf: ProducerOf[F] = ProducerOf.apply1[F](...).withNativeMetrics(kafkaRegistry)
*
* for {
* consumer <- consumerOf.apply(consumerConfig).
* producer <- producerOf.apply(producerConfig)
* metrics <- kafkaRegistry.collectAll
* } yield ()
* }}}
*/
trait KafkaMetricsRegistry[F[_]] {

/**
* Register a function to obtain a list of client metrics.
* Normally, you would pass [[com.evolutiongaming.skafka.consumer.Consumer.clientMetrics]] or
* [[com.evolutiongaming.skafka.producer.Producer.clientMetrics]]
*
* @return synthetic ID of registered function
*/
def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, UUID]

/** Collect metrics from all registered functions */
def collectAll: F[Seq[ClientMetric[F]]]
}

object KafkaMetricsRegistry {
private final class FromRef[F[_]: Sync](ref: Ref[F, Map[UUID, F[Seq[ClientMetric[F]]]]], allowDuplicates: Boolean)
extends KafkaMetricsRegistry[F] {
override def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, UUID] = {
val acquire: F[UUID] = for {
id <- Sync[F].delay(UUID.randomUUID())
_ <- ref.update(m => m + (id -> metrics))
} yield id

def release(id: UUID): F[Unit] =
ref.update(m => m - id)

Resource.make(acquire)(id => release(id))
}

override def collectAll: F[Seq[ClientMetric[F]]] =
ref.get.flatMap { map: Map[UUID, F[Seq[ClientMetric[F]]]] =>
map.values.toList.sequence.map { metrics =>
val results: List[ClientMetric[F]] = metrics.flatten

if (allowDuplicates) {
results
} else {
results
.groupBy(metric => (metric.name, metric.group, metric.tags))
.map { case (_, values) => values.head }
.toSeq
}
}
}
}

def ref[F[_]: Sync](allowDuplicates: Boolean): F[KafkaMetricsRegistry[F]] = {
Ref.of[F, Map[UUID, F[Seq[ClientMetric[F]]]]](Map.empty).map(ref => new FromRef[F](ref, allowDuplicates))
}

def ref[F[_]: Sync]: F[KafkaMetricsRegistry[F]] = ref[F](allowDuplicates = false)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.evolutiongaming.skafka.metrics

import cats.effect.Resource
import com.evolutiongaming.skafka.FromBytes
import com.evolutiongaming.skafka.consumer.{Consumer, ConsumerConfig, ConsumerOf}

class MeteredConsumerOf[F[_]](consumerOf: ConsumerOf[F], kafkaMetricsRegistry: KafkaMetricsRegistry[F])
extends ConsumerOf[F] {

override def apply[K, V](
config: ConsumerConfig
)(implicit fromBytesK: FromBytes[F, K], fromBytesV: FromBytes[F, V]): Resource[F, Consumer[F, K, V]] = {
for {
consumer <- consumerOf.apply[K, V](config)
_ <- kafkaMetricsRegistry.register(consumer.clientMetrics)
} yield consumer
}
}

object MeteredConsumerOf {
def wrap[F[_]](consumerOf: ConsumerOf[F], kafkaMetricsRegistry: KafkaMetricsRegistry[F]): MeteredConsumerOf[F] =
new MeteredConsumerOf[F](consumerOf, kafkaMetricsRegistry)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.evolutiongaming.skafka.metrics

import cats.effect.Resource
import com.evolutiongaming.skafka.producer.{Producer, ProducerConfig, ProducerOf}

class MeteredProducerOf[F[_]](producerOf: ProducerOf[F], kafkaMetricsRegistry: KafkaMetricsRegistry[F])
extends ProducerOf[F] {
override def apply(config: ProducerConfig): Resource[F, Producer[F]] = {
for {
producer <- producerOf.apply(config)
_ <- kafkaMetricsRegistry.register(producer.clientMetrics)
} yield producer
}
}

object MeteredProducerOf {
def wrap[F[_]](producerOf: ProducerOf[F], kafkaMetricsRegistry: KafkaMetricsRegistry[F]): MeteredProducerOf[F] =
new MeteredProducerOf[F](producerOf, kafkaMetricsRegistry)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.evolutiongaming.skafka.metrics

import com.evolutiongaming.skafka.consumer.ConsumerOf
import com.evolutiongaming.skafka.producer.ProducerOf

object syntax {
implicit final class MeteredProducerOfOps[F[_]](val producerOf: ProducerOf[F]) extends AnyVal {
def withNativeMetrics(registry: KafkaMetricsRegistry[F]): ProducerOf[F] = MeteredProducerOf.wrap(producerOf, registry)
}

implicit final class MeteredConsumerOfOps[F[_]](val consumerOf: ConsumerOf[F]) extends AnyVal {
def withNativeMetrics(registry: KafkaMetricsRegistry[F]): ConsumerOf[F] = MeteredConsumerOf.wrap(consumerOf, registry)
}
}