Skip to content

Commit

Permalink
Separate logic for Kafka and Yammer metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Mickael Maison <[email protected]>
  • Loading branch information
mimaison committed Oct 4, 2024
1 parent 3beff0f commit cba272f
Show file tree
Hide file tree
Showing 24 changed files with 918 additions and 519 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import io.strimzi.kafka.metrics.http.HttpServers;
import io.strimzi.kafka.metrics.kafka.KafkaCollector;
import io.strimzi.kafka.metrics.kafka.KafkaMetricWrapper;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsContext;
Expand All @@ -29,7 +32,7 @@ public class KafkaPrometheusMetricsReporter implements MetricsReporter {
private static final Logger LOG = LoggerFactory.getLogger(KafkaPrometheusMetricsReporter.class);

private final PrometheusRegistry registry;
private final PrometheusCollector collector;
private final KafkaCollector kafkaCollector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private PrometheusMetricsReporterConfig config;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
Expand All @@ -42,13 +45,13 @@ public class KafkaPrometheusMetricsReporter implements MetricsReporter {
*/
public KafkaPrometheusMetricsReporter() {
registry = PrometheusRegistry.defaultRegistry;
collector = PrometheusCollector.register(registry);
kafkaCollector = KafkaCollector.getCollector(PrometheusCollector.register(registry));
}

// for testing
KafkaPrometheusMetricsReporter(PrometheusRegistry registry, PrometheusCollector collector) {
KafkaPrometheusMetricsReporter(PrometheusRegistry registry, KafkaCollector kafkaCollector) {
this.registry = registry;
this.collector = collector;
this.kafkaCollector = kafkaCollector;
}

@Override
Expand All @@ -66,18 +69,18 @@ public void init(List<KafkaMetric> metrics) {
}

public void metricChange(KafkaMetric metric) {
String prometheusName = MetricWrapper.prometheusName(prefix, metric.metricName());
String prometheusName = KafkaMetricWrapper.prometheusName(prefix, metric.metricName());
if (!config.isAllowed(prometheusName)) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName);
} else {
MetricWrapper metricWrapper = new MetricWrapper(prometheusName, metric, metric.metricName().name());
collector.addKafkaMetric(metric.metricName(), metricWrapper);
MetricWrapper metricWrapper = new KafkaMetricWrapper(prometheusName, metric, metric.metricName().name());
kafkaCollector.addMetric(metric.metricName(), metricWrapper);
}
}

@Override
public void metricRemoval(KafkaMetric metric) {
collector.removeKafkaMetric(metric.metricName());
kafkaCollector.removeMetric(metric.metricName());
}

@Override
Expand Down
97 changes: 3 additions & 94 deletions src/main/java/io/strimzi/kafka/metrics/MetricWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,21 @@
*/
package io.strimzi.kafka.metrics;

import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import io.prometheus.metrics.model.snapshots.Labels;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/**
* Wrapper for both Kafka and Yammer metrics to unify logic in the Collectors
*/
public class MetricWrapper {

private static final Logger LOG = LoggerFactory.getLogger(MetricWrapper.class);
public abstract class MetricWrapper {

private final String prometheusName;
private final Labels labels;
private final Object metric;
private final String attribute;

/**
* Constructor from Kafka Metrics
* @param prometheusName The name of the metric in the prometheus format
* @param metric The Kafka metric
* @param attribute The attribute of the Kafka metric
*/
public MetricWrapper(String prometheusName, KafkaMetric metric, String attribute) {
this.prometheusName = prometheusName;
this.labels = labelsFromTags(metric.metricName().tags(), prometheusName);
this.metric = metric;
this.attribute = attribute;
}

/**
* Constructor from Yammer Metrics
* @param prometheusName The name of the metric in the prometheus format
* @param scope The scope of the Yammer metric
* @param metric The Yammer metric
* @param attribute The attribute of the Yammer metric
*/
public MetricWrapper(String prometheusName, String scope, Metric metric, String attribute) {
protected MetricWrapper(String prometheusName, Labels labels, Object metric, String attribute) {
this.prometheusName = prometheusName;
this.labels = labelsFromScope(scope, prometheusName);
this.labels = labels;
this.metric = metric;
this.attribute = attribute;
}
Expand Down Expand Up @@ -88,62 +55,4 @@ public String attribute() {
return attribute;
}

static Labels labelsFromScope(String scope, String metricName) {
Labels.Builder builder = Labels.builder();
Set<String> labelNames = new HashSet<>();
if (scope != null) {
String[] parts = scope.split("\\.");
if (parts.length % 2 == 0) {
for (int i = 0; i < parts.length; i += 2) {
String newLabelName = PrometheusNaming.sanitizeLabelName(parts[i]);
if (labelNames.add(newLabelName)) {
builder.label(newLabelName, parts[i + 1]);
} else {
LOG.warn("Ignoring duplicate label key: {} with value: {} from metric: {} ", newLabelName, parts[i + 1], metricName);
}
}
}
}
return builder.build();
}

static Labels labelsFromTags(Map<String, String> tags, String metricName) {
Labels.Builder builder = Labels.builder();
Set<String> labelNames = new HashSet<>();
for (Map.Entry<String, String> label : tags.entrySet()) {
String newLabelName = PrometheusNaming.sanitizeLabelName(label.getKey());
if (labelNames.add(newLabelName)) {
builder.label(newLabelName, label.getValue());
} else {
LOG.warn("Ignoring duplicate label key: {} with value: {} from metric: {} ", newLabelName, label.getValue(), metricName);
}
}
return builder.build();
}

/**
* Compute the Prometheus name from a Yammer MetricName
* @param metricName The Yammer metric name
* @return The prometheus metric name
*/
public static String prometheusName(MetricName metricName) {
return PrometheusNaming.prometheusName(
PrometheusNaming.sanitizeMetricName(
"kafka_server_" +
metricName.getGroup() + '_' +
metricName.getType() + '_' +
metricName.getName()).toLowerCase(Locale.ROOT));
}

/**
* Compute the Prometheus name from a Kafka MetricName
* @param prefix The prefix to add to the metric name
* @param metricName The Kafka metric name
* @return The prometheus metric name
*/
public static String prometheusName(String prefix, org.apache.kafka.common.MetricName metricName) {
return PrometheusNaming.prometheusName(
PrometheusNaming.sanitizeMetricName(
prefix + '_' + metricName.group() + '_' + metricName.name()).toLowerCase(Locale.ROOT));
}
}
21 changes: 21 additions & 0 deletions src/main/java/io/strimzi/kafka/metrics/MetricsCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;

import io.prometheus.metrics.model.snapshots.MetricSnapshot;

import java.util.List;

/**
* Interface for both Kafka and Yammer collectors
*/
public interface MetricsCollector {

/**
* Collect all the metrics added to this Collector
* @return the list of metrics of this collector
*/
List<MetricSnapshot> collect();
}
Loading

0 comments on commit cba272f

Please sign in to comment.