Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the allowlist reconfigurable #64

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ The metrics reporter has the following configurations:

- `prometheus.metrics.reporter.listener`: The HTTP listener to expose the metrics. It must be in the `http://[host]:[port]` format. This defaults to `http://:8080`.
- `prometheus.metrics.reporter.listener.enable`: Enable the listener to expose the metrics. This defaults to `true`.
- `prometheus.metrics.reporter.allowlist`: A comma separated list of regex patterns to specify the metrics to collect. This defaults to `.*`.
- `prometheus.metrics.reporter.allowlist`: A comma separated list of regex patterns to specify the metrics to collect.
This defaults to `.*`. The patterns are only applied to metric names and cannot be used to filter labels.

## Running

Expand All @@ -43,6 +44,14 @@ kafka.metrics.reporters=io.strimzi.kafka.metrics.YammerPrometheusMetricsReporter
auto.include.jmx.reporter=false
```

On brokers the allowlist is reconfigurable at runtime using the Kafka Admin API.
For example to update the allowlist to only collect `kafka_controller` and `kafka_log` metrics, you can use:
```shell
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --entity-type brokers --entity-default \
--add-config "prometheus.metrics.reporter.allowlist=[kafka_controller.*,kafka_log.*]"
```

### Kafka Clients

To use the reporter with Kafka producers, consumers or admin clients, add the following to your client configuration:
Expand Down Expand Up @@ -82,6 +91,16 @@ consumer.prometheus.metrics.reporter.listener=http://:8081
consumer.auto.include.jmx.reporter=false
```

The allowlist must be configured without prefixes and should include patterns for the desired metrics for all the clients.
For example to only collect `kafka_consumer` and `kafka_producer` metrics, use:
```properties
metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
prometheus.metrics.reporter.allowlist=kafka_consumer.*,kafka_producer.*
admin.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
producer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
consumer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
```

## Accessing Metrics

Metrics are exposed on the configured listener on the `GET /metrics` endpoint. For example, by default this is `http://localhost:8080/metrics`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import io.strimzi.kafka.metrics.collector.MetricWrapper;
import io.strimzi.kafka.metrics.collector.PrometheusCollector;
import io.strimzi.kafka.metrics.collector.kafka.KafkaCollector;
import io.strimzi.kafka.metrics.collector.kafka.KafkaMetricWrapper;
import io.strimzi.kafka.metrics.http.HttpServers;
import io.strimzi.kafka.metrics.kafka.KafkaCollector;
import io.strimzi.kafka.metrics.kafka.KafkaMetricWrapper;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -33,8 +34,7 @@ public class KafkaPrometheusMetricsReporter implements MetricsReporter {

private final PrometheusRegistry registry;
private final KafkaCollector kafkaCollector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private PrometheusMetricsReporterConfig config;
private final PrometheusCollector prometheusCollector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private Optional<HttpServers.ServerCounter> httpServer;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the contextChange method
Expand All @@ -45,18 +45,21 @@ public class KafkaPrometheusMetricsReporter implements MetricsReporter {
*/
public KafkaPrometheusMetricsReporter() {
registry = PrometheusRegistry.defaultRegistry;
kafkaCollector = KafkaCollector.getCollector(PrometheusCollector.register(registry));
prometheusCollector = PrometheusCollector.register(registry);
kafkaCollector = KafkaCollector.getCollector(prometheusCollector);
}

// for testing
KafkaPrometheusMetricsReporter(PrometheusRegistry registry, KafkaCollector kafkaCollector) {
KafkaPrometheusMetricsReporter(PrometheusRegistry registry, KafkaCollector kafkaCollector, PrometheusCollector prometheusCollector) {
this.registry = registry;
this.kafkaCollector = kafkaCollector;
this.prometheusCollector = prometheusCollector;
}

@Override
public void configure(Map<String, ?> map) {
config = new PrometheusMetricsReporterConfig(map, registry);
PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(map, registry);
prometheusCollector.updateAllowlist(config.allowlist());
httpServer = config.startHttpServer();
LOG.debug("KafkaPrometheusMetricsReporter configured with {}", config);
}
Expand All @@ -70,12 +73,8 @@ public void init(List<KafkaMetric> metrics) {

public void metricChange(KafkaMetric metric) {
String prometheusName = KafkaMetricWrapper.prometheusName(prefix, metric.metricName());
if (!config.isAllowed(prometheusName)) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName);
} else {
MetricWrapper metricWrapper = new KafkaMetricWrapper(prometheusName, metric, metric.metricName().name());
kafkaCollector.addMetric(metric.metricName(), metricWrapper);
}
MetricWrapper metricWrapper = new KafkaMetricWrapper(prometheusName, metric, metric.metricName().name());
kafkaCollector.addMetric(metric.metricName(), metricWrapper);
}

@Override
Expand All @@ -90,15 +89,19 @@ public void close() {

@Override
public void reconfigure(Map<String, ?> configs) {
PrometheusMetricsReporterConfig newConfig = new PrometheusMetricsReporterConfig(configs, null);
prometheusCollector.updateAllowlist(newConfig.allowlist());
LOG.debug("KafkaPrometheusMetricsReporter reconfigured with {}", newConfig);
}

@Override
public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
new PrometheusMetricsReporterConfig(configs, null);
}

@Override
public Set<String> reconfigurableConfigs() {
return Collections.emptySet();
return PrometheusMetricsReporterConfig.RECONFIGURABLES;
}

@Override
Expand Down
22 changes: 0 additions & 22 deletions src/main/java/io/strimzi/kafka/metrics/MetricsCollector.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -67,6 +69,11 @@ public class PrometheusMetricsReporterConfig extends AbstractConfig {
.define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST, ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, ALLOWLIST_CONFIG_DOC)
.define(LISTENER_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, LISTENER_ENABLE_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, LISTENER_ENABLE_CONFIG_DOC);

/**
* The list of configurations that are reconfigurable at runtime.
*/
public static final Set<String> RECONFIGURABLES = Collections.singleton(ALLOWLIST_CONFIG);

private final Listener listener;
private final boolean listenerEnabled;
private final Pattern allowlist;
Expand All @@ -87,16 +94,11 @@ public PrometheusMetricsReporterConfig(Map<?, ?> props, PrometheusRegistry regis
}

/**
* Check if a metric is allowed.
*
* @param name the name of the metric.
* @return true if the metric is allowed, false otherwise.
* Compile the allowlist into a {@link Pattern}.
* @param allowlist the list of regex patterns
* @return the Pattern for the allowlist.
*/
public boolean isAllowed(String name) {
return allowlist.matcher(name).matches();
}

private Pattern compileAllowlist(List<String> allowlist) {
public static Pattern compileAllowlist(List<String> allowlist) {
for (String entry : allowlist) {
try {
Pattern.compile(entry);
Expand All @@ -109,8 +111,15 @@ private Pattern compileAllowlist(List<String> allowlist) {
}

/**
* Gets the listener.
*
* Get the allowlist.
* @return the allowlist.
*/
public Pattern allowlist() {
return allowlist;
}

/**
* Get the listener.
* @return the listener.
*/
public String listener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import com.yammer.metrics.core.MetricsRegistryListener;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.strimzi.kafka.metrics.yammer.YammerCollector;
import io.strimzi.kafka.metrics.yammer.YammerMetricWrapper;
import io.strimzi.kafka.metrics.collector.MetricWrapper;
import io.strimzi.kafka.metrics.collector.PrometheusCollector;
import io.strimzi.kafka.metrics.collector.yammer.YammerCollector;
import io.strimzi.kafka.metrics.collector.yammer.YammerMetricWrapper;
import kafka.metrics.KafkaMetricsReporter;
import kafka.utils.VerifiableProperties;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
Expand All @@ -30,6 +32,7 @@ public class YammerPrometheusMetricsReporter implements KafkaMetricsReporter, Me

private final PrometheusRegistry registry;
private final YammerCollector yammerCollector;
private final PrometheusCollector prometheusCollector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the init method
/* test */ PrometheusMetricsReporterConfig config;

Expand All @@ -38,13 +41,15 @@ public class YammerPrometheusMetricsReporter implements KafkaMetricsReporter, Me
*/
public YammerPrometheusMetricsReporter() {
registry = PrometheusRegistry.defaultRegistry;
yammerCollector = YammerCollector.getCollector(PrometheusCollector.register(registry));
prometheusCollector = PrometheusCollector.register(registry);
yammerCollector = YammerCollector.getCollector(prometheusCollector);
}

// for testing
YammerPrometheusMetricsReporter(PrometheusRegistry registry, PrometheusCollector prometheusCollector) {
this.registry = registry;
yammerCollector = YammerCollector.getCollector(prometheusCollector);
this.prometheusCollector = prometheusCollector;
}

@Override
Expand All @@ -53,18 +58,15 @@ public void init(VerifiableProperties props) {
for (MetricsRegistry yammerRegistry : Arrays.asList(KafkaYammerMetrics.defaultRegistry(), Metrics.defaultRegistry())) {
yammerRegistry.addListener(this);
}
yammerCollector.updateAllowlist(config.allowlist());
LOG.debug("YammerPrometheusMetricsReporter configured with {}", config);
}

@Override
public void onMetricAdded(MetricName name, Metric metric) {
String prometheusName = YammerMetricWrapper.prometheusName(name);
if (!config.isAllowed(prometheusName)) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName);
} else {
MetricWrapper metricWrapper = new YammerMetricWrapper(prometheusName, name.getScope(), metric, name.getName());
yammerCollector.addMetric(name, metricWrapper);
}
MetricWrapper metricWrapper = new YammerMetricWrapper(prometheusName, name.getScope(), metric, name.getName());
yammerCollector.addMetric(name, metricWrapper);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;
package io.strimzi.kafka.metrics.collector;

import io.prometheus.metrics.model.snapshots.CounterSnapshot;
import io.prometheus.metrics.model.snapshots.GaugeSnapshot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;
package io.strimzi.kafka.metrics.collector;

import io.prometheus.metrics.model.snapshots.Labels;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics.collector;

import io.prometheus.metrics.model.snapshots.MetricSnapshot;
import io.strimzi.kafka.metrics.PrometheusMetricsReporterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

/**
* Abstract class for both Kafka and Yammer collectors
*/
public abstract class MetricsCollector {

private static final Logger LOG = LoggerFactory.getLogger(MetricsCollector.class);

private final Map<Object, MetricWrapper> allowedMetrics = new ConcurrentHashMap<>();
private final Map<Object, MetricWrapper> otherMetrics = new ConcurrentHashMap<>();
/* test */ Pattern allowlist = Pattern.compile(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG_DEFAULT);

/**
* Collect all the metrics added to this Collector
*
* @return the list of metrics of this collector
*/
public abstract List<MetricSnapshot<?>> collect();

/**
* Update the allowlist used by the Collector to filter metrics
* @param allowlist the new allowlist Pattern
*/
public void updateAllowlist(Pattern allowlist) {
this.allowlist = allowlist;
update();
}

/**
* Add a metric to be collected.
* @param name The name of the metric to add.
* @param metric The metric to add.
*/
public void addMetric(Object name, MetricWrapper metric) {
if (allowlist.matcher(metric.prometheusName()).matches()) {
allowedMetrics.put(name, metric);
} else {
LOG.trace("Ignoring metric {} as it does not match the allowlist", metric.prometheusName());
otherMetrics.put(name, metric);
}
}

/**
* Remove a metric from collection.
* @param name The name of metric to remove.
*/
public void removeMetric(Object name) {
allowedMetrics.remove(name);
otherMetrics.remove(name);
}

/**
* Retrieve the allowed metrics
* @return the collection of allowed MetricWrapper
*/
public Collection<MetricWrapper> allowedMetrics() {
return allowedMetrics.values();
}

private void update() {
Map<Object, MetricWrapper> newAllowedMetrics = new HashMap<>();
for (Map.Entry<Object, MetricWrapper> entry : otherMetrics.entrySet()) {
String name = entry.getValue().prometheusName();
if (allowlist.matcher(name).matches()) {
newAllowedMetrics.put(entry.getKey(), entry.getValue());
otherMetrics.remove(entry.getKey());
}
}
for (Map.Entry<Object, MetricWrapper> entry : allowedMetrics.entrySet()) {
String name = entry.getValue().prometheusName();
if (!allowlist.matcher(name).matches()) {
otherMetrics.put(entry.getKey(), entry.getValue());
allowedMetrics.remove(entry.getKey());
}
}
allowedMetrics.putAll(newAllowedMetrics);
}
}
Loading