Add implementation (#1)
MSDehghan authored Apr 29, 2021
1 parent 22a9dd5 commit 7ba0903
Showing 6 changed files with 393 additions and 7 deletions.
26 changes: 19 additions & 7 deletions .github/workflows/maven.yml
@@ -4,14 +4,26 @@ on: [push, pull_request]

 jobs:
   build:

     runs-on: ubuntu-latest

     steps:
     - uses: actions/checkout@v1
-    - name: Set up JDK 1.8
-      uses: actions/setup-java@v1
-      with:
-        java-version: 1.8
-    - name: Build with Maven
-      run: mvn test --file pom.xml
+
+    - name: Setup Java 8
+      uses: actions/setup-java@v2
+      with:
+        distribution: 'adopt'
+        java-version: 8
+
+    - name: Run Tests
+      run: mvn test
+
+    - name: Setup Java 11 # SonarCloud does not support Java 8 anymore!
+      uses: actions/setup-java@v2
+      with:
+        distribution: 'adopt'
+        java-version: 11
+
+    - name: Scan with SonarCloud
+      run: mvn org.sonarsource.scanner.maven:sonar-maven-plugin:sonar
+      env:
+        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
4 changes: 4 additions & 0 deletions README.md
@@ -1,4 +1,8 @@
# Kafka Micrometer Binder
[![Tests](https://github.com/sahabpardaz/kafka-micrometer-binder/actions/workflows/maven.yml/badge.svg)](https://github.com/sahabpardaz/kafka-micrometer-binder/actions/workflows/maven.yml)
[![Coverage](https://sonarcloud.io/api/project_badges/measure?project=sahabpardaz_kafka-micrometer-binder&metric=coverage)](https://sonarcloud.io/dashboard?id=sahabpardaz_kafka-micrometer-binder)
[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=sahabpardaz_kafka-micrometer-binder&metric=alert_status)](https://sonarcloud.io/dashboard?id=sahabpardaz_kafka-micrometer-binder)

An implementation of Kafka `MetricsReporter` which binds Kafka client metrics to Micrometer.

Kafka has a built-in mechanism which allows us to gather its metrics and report them to different destinations.
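The following sketch (not part of this commit) illustrates how such a reporter is typically wired into a Kafka client through Kafka's standard metric-reporter mechanism; the bootstrap address, client id, and class name `ReporterUsageExample` are placeholders, and an explicit, unique client id is required by this reporter's `configure()`:

```java
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReporterUsageExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Register the reporter; the client id must be set explicitly and be unique in the JVM,
        // otherwise the reporter's configure() throws IllegalArgumentException.
        props.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
                "ir.sahab.micrometer.kafka.MicrometerKafkaMetricsReporter");
        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, "my-producer-1");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Client metrics are now published as gauges to Micrometer's global registry.
        }
    }
}
```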
121 changes: 121 additions & 0 deletions pom.xml
@@ -0,0 +1,121 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ir.sahab</groupId>
    <artifactId>kafka-micrometer-binder</artifactId>
    <version>1.0.0</version>

    <properties>
        <kafka.version>2.7.0</kafka.version>

        <sonar.host.url>https://sonarcloud.io</sonar.host.url>
        <sonar.organization>sahabpardaz</sonar.organization>
        <sonar.projectKey>sahabpardaz_kafka-micrometer-binder</sonar.projectKey>

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-core</artifactId>
            <version>1.6.6</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.14.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>2.7.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.jacoco</groupId>
                <artifactId>jacoco-maven-plugin</artifactId>
                <version>0.8.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>prepare-agent</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>report</id>
                        <phase>test</phase>
                        <goals>
                            <goal>report</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.sonarsource.scanner.maven</groupId>
                <artifactId>sonar-maven-plugin</artifactId>
                <version>3.8.0.2131</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <id>attach-javadocs</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
158 changes: 158 additions & 0 deletions MicrometerKafkaMetricsReporter.java (new file)
@@ -0,0 +1,158 @@
package ir.sahab.micrometer.kafka;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reports Kafka consumer and producer metrics to Micrometer.
 * To use this class, set {@link CommonClientConfigs#METRIC_REPORTER_CLASSES_CONFIG} to the fully qualified
 * name of this class. You must also set {@link CommonClientConfigs#CLIENT_ID_CONFIG} to a value that is unique
 * within the JVM.
 *
 * @see org.apache.kafka.common.metrics.JmxReporter
 * @see io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics
 */
public class MicrometerKafkaMetricsReporter implements MetricsReporter {
    private static final Logger logger = LoggerFactory.getLogger(MicrometerKafkaMetricsReporter.class);

    private static final String METRICS_PREFIX = "kafka.";
    private static final CompositeMeterRegistry registry = Metrics.globalRegistry;

    // Methods of this class are called from different Kafka threads, so we use a lock for thread safety.
    private static final Object LOCK = new Object();

    /**
     * Kafka reports the same metric name with different tag sets. For example, it provides both
     * 'fetch.ms{id="client1"}' and 'fetch.ms{id="client1", partition="p1"}', but Prometheus requires that a metric
     * name always carries the same set of tags. Because of this, we keep only the metrics with the higher number
     * of tags (which are more precise) and ignore occurrences of the same metric name with fewer tags.
     * This map stores, for each metric name, the highest number of tags seen so far.
     */
    private static final Map<String, Integer> currentMetrics = new HashMap<>();

    /**
     * By default, Kafka adds a clientId tag to all metrics, so metrics can later be filtered by client.
     * If the user does not specify a clientId, Kafka assigns one automatically, which would make our metric tags
     * non-deterministic; that is definitely not intended, so an explicit, non-empty clientId is required here.
     */
    @Override
    public void configure(Map<String, ?> configs) {
        if (!configs.containsKey(CommonClientConfigs.CLIENT_ID_CONFIG)) {
            throw new IllegalArgumentException(CommonClientConfigs.CLIENT_ID_CONFIG + " must be specified");
        }
        String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
        if (clientId.trim().isEmpty()) {
            throw new IllegalArgumentException(CommonClientConfigs.CLIENT_ID_CONFIG + " can't be empty");
        }
        logger.info("Kafka metric exporter for client {} started", clientId);
    }

    @Override
    public void init(List<KafkaMetric> metrics) {
        for (KafkaMetric metric : metrics) {
            metricChange(metric);
        }
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        // Ignore deprecated old metrics and non-numeric metrics.
        if (metric.metricName().description().contains("DEPRECATED") || !(metric.metricValue() instanceof Number)) {
            return;
        }

        synchronized (LOCK) {
            addMetric(metric);
        }
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        synchronized (LOCK) {
            final String metricName = meterName(metric);
            if (metric.metricName().tags().size() == currentMetrics.getOrDefault(metricName, -1)) {
                registry.find(metricName).tags(meterTags(metric)).meters().forEach(registry::remove);
                logger.trace("Kafka metric {} with tags {} removed from registry",
                        metricName, metric.metricName().tags());
            }
        }
    }

    /**
     * Registers the given Kafka metric with Micrometer.
     * If a metric with the same name but a higher number of tags has been seen before, the given metric is ignored.
     * If the given metric has more tags than previously seen metrics with the same name, all metrics with that name
     * are removed from the registry and the given one is registered.
     * After some time, {@link #currentMetrics} converges to stable values and does not change afterwards.
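     * For example (illustrative note added for clarity, mirroring the example on {@link #currentMetrics}):
     * if 'fetch.ms' is first seen with the single tag {id} and later with {id, partition}, the one-tag gauge
     * is removed and only the two-tag meters stay registered.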
     *
     * @param metric KafkaMetric to add
     */
    private static void addMetric(KafkaMetric metric) {
        final String metricName = meterName(metric);
        final Map<String, String> metricTags = metric.metricName().tags();
        if (currentMetrics.containsKey(metricName)) {
            Integer currentNumberOfTags = currentMetrics.get(metricName);
            if (metricTags.size() < currentNumberOfTags) {
                logger.trace("Kafka metric {} with tags {} ignored.", metricName, metricTags);
                return;
            }

            // Remove previously added meters that have fewer tags.
            if (metricTags.size() > currentNumberOfTags) {
                registry.find(metricName).meters().forEach(registry::remove);
            }
        }
        bindMeter(metric, metricName, meterTags(metric));
        currentMetrics.put(metricName, metricTags.size());
    }

    /**
     * Binds the given metric to the Micrometer registry.
     */
    private static void bindMeter(KafkaMetric metric, String name, Iterable<Tag> tags) {
        logger.trace("Kafka metric {} with tags {} bound to registry", name, tags);
        registerGauge(metric, name, tags);
    }

    private static void registerGauge(Metric metric, String name, Iterable<Tag> tags) {
        Gauge.builder(name, metric, MicrometerKafkaMetricsReporter::getMetricValue)
                .tags(tags)
                .description(metric.metricName().description())
                .register(registry);
    }

    private static double getMetricValue(Metric metric) {
        return ((Number) metric.metricValue()).doubleValue();
    }

    private static List<Tag> meterTags(Metric metric) {
        return metric.metricName().tags().entrySet().stream()
                .map(e -> Tag.of(e.getKey(), e.getValue()))
                .collect(Collectors.toList());
    }

    /**
     * Creates a meter name based on the given Kafka metric.
     * Kafka metric names are hard-coded and scattered across the Kafka client code base. This function was tested
     * with Kafka 1.1.1; changing the client version may or may not change metric names and their convention.
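     * For example (illustrative note added for clarity): a metric named "records-lag-max" in group
     * "consumer-fetch-manager-metrics" is registered as "kafka.consumer.fetch.manager.records.lag.max".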
     */
    private static String meterName(Metric metric) {
        String name = METRICS_PREFIX + metric.metricName().group() + "." + metric.metricName().name();
        return name.replace("-metrics", "").replace("-", ".");
    }

    @Override
    public void close() {
        // There is nothing to close.
    }
}
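For context, here is a minimal sketch (not part of the commit) of how the gauges bound by this reporter could be read back, assuming a `SimpleMeterRegistry` from micrometer-core is attached to the same global registry the reporter writes to; the metric name, tag key, and client id below are illustrative:

```java
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class RegistryInspectionExample {
    public static void main(String[] args) {
        // The reporter publishes to Metrics.globalRegistry, a composite registry; attach a concrete
        // registry to it so the gauge measurements are actually recorded somewhere.
        SimpleMeterRegistry registry = new SimpleMeterRegistry();
        Metrics.addRegistry(registry);

        // ... create a Kafka producer/consumer configured with MicrometerKafkaMetricsReporter ...

        // Later, look up one of the bound gauges by its converted name (see meterName()).
        Gauge lag = Metrics.globalRegistry.find("kafka.consumer.fetch.manager.records.lag.max")
                .tag("client-id", "my-consumer-1")
                .gauge();
        if (lag != null) {
            System.out.println("records-lag-max = " + lag.value());
        }
    }
}
```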