Skip to content

Commit

Permalink
Use global Collector to handle multiple reporters in the JVM (#46)
Browse files Browse the repository at this point in the history
Signed-off-by: Mickael Maison <[email protected]>
  • Loading branch information
mimaison authored Sep 30, 2024
1 parent 89c966e commit 9fa996a
Show file tree
Hide file tree
Showing 12 changed files with 475 additions and 545 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,23 @@ consumer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporte
consumer.auto.include.jmx.reporter=false
```

When setting configurations for the Prometheus metrics reporter, they also need to be set with the `admin.`, `producer.` and `consumer.`.
For example, to set the `listener` to `http://:8081`:
```properties
metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
prometheus.metrics.reporter.listener=http://:8081
auto.include.jmx.reporter=false
admin.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
admin.prometheus.metrics.reporter.listener=http://:8081
admin.auto.include.jmx.reporter=false
producer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
producer.prometheus.metrics.reporter.listener=http://:8081
producer.auto.include.jmx.reporter=false
consumer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
consumer.prometheus.metrics.reporter.listener=http://:8081
consumer.auto.include.jmx.reporter=false
```

## Accessing Metrics

Metrics are exposed on the configured listener on the `GET /metrics` endpoint. For example, by default this is `http://localhost:8080/metrics`.
Expand Down
94 changes: 0 additions & 94 deletions src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.strimzi.kafka.metrics;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.instrumentation.jvm.JvmMetrics;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import org.apache.kafka.common.config.ConfigException;
Expand All @@ -23,15 +22,14 @@

/**
* MetricsReporter implementation that expose Kafka metrics in the Prometheus format.
*
* This can be used by Kafka brokers and clients.
*/
public class KafkaPrometheusMetricsReporter implements MetricsReporter {

private static final Logger LOG = LoggerFactory.getLogger(KafkaPrometheusMetricsReporter.class);

private final PrometheusRegistry registry;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private KafkaMetricsCollector collector;
private final PrometheusCollector collector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private PrometheusMetricsReporterConfig config;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
Expand All @@ -44,26 +42,24 @@ public class KafkaPrometheusMetricsReporter implements MetricsReporter {
*/
public KafkaPrometheusMetricsReporter() {
registry = PrometheusRegistry.defaultRegistry;
collector = PrometheusCollector.register(registry);
}

// for testing
KafkaPrometheusMetricsReporter(PrometheusRegistry registry) {
KafkaPrometheusMetricsReporter(PrometheusRegistry registry, PrometheusCollector collector) {
this.registry = registry;
this.collector = collector;
}

@Override
public void configure(Map<String, ?> map) {
config = new PrometheusMetricsReporterConfig(map, registry);
collector = new KafkaMetricsCollector();
// Add JVM metrics
JvmMetrics.builder().register();
httpServer = config.startHttpServer();
LOG.debug("KafkaPrometheusMetricsReporter configured with {}", config);
}

@Override
public void init(List<KafkaMetric> metrics) {
registry.register(collector);
for (KafkaMetric metric : metrics) {
metricChange(metric);
}
Expand All @@ -75,18 +71,17 @@ public void metricChange(KafkaMetric metric) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName);
} else {
MetricWrapper metricWrapper = new MetricWrapper(prometheusName, metric, metric.metricName().name());
collector.addMetric(metric.metricName(), metricWrapper);
collector.addKafkaMetric(metric.metricName(), metricWrapper);
}
}

@Override
public void metricRemoval(KafkaMetric metric) {
collector.removeMetric(metric.metricName());
collector.removeKafkaMetric(metric.metricName());
}

@Override
public void close() {
registry.unregister(collector);
httpServer.ifPresent(HttpServers::release);
}

Expand Down
Loading

0 comments on commit 9fa996a

Please sign in to comment.