Skip to content

Commit

Permalink
Add integration tests for Consumer, Producer and Streams (#67)
Browse files Browse the repository at this point in the history
Signed-off-by: Mickael Maison <[email protected]>
  • Loading branch information
mimaison authored Dec 6, 2024
1 parent b772450 commit 8263db2
Show file tree
Hide file tree
Showing 12 changed files with 445 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import java.util.Map;
import java.util.Optional;

import static io.strimzi.kafka.metrics.MetricsUtils.getMetrics;
import static io.strimzi.kafka.metrics.MetricsUtils.newKafkaMetric;
import static io.strimzi.kafka.metrics.TestUtils.getMetrics;
import static io.strimzi.kafka.metrics.TestUtils.newKafkaMetric;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import java.util.ArrayList;
import java.util.List;

import static io.strimzi.kafka.metrics.MetricsUtils.assertCounterSnapshot;
import static io.strimzi.kafka.metrics.MetricsUtils.assertGaugeSnapshot;
import static io.strimzi.kafka.metrics.MetricsUtils.assertInfoSnapshot;
import static io.strimzi.kafka.metrics.MetricsUtils.assertSummarySnapshot;
import static io.strimzi.kafka.metrics.TestUtils.assertCounterSnapshot;
import static io.strimzi.kafka.metrics.TestUtils.assertGaugeSnapshot;
import static io.strimzi.kafka.metrics.TestUtils.assertInfoSnapshot;
import static io.strimzi.kafka.metrics.TestUtils.assertSummarySnapshot;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,48 @@
import io.prometheus.metrics.model.snapshots.MetricSnapshot;
import io.prometheus.metrics.model.snapshots.Quantiles;
import io.prometheus.metrics.model.snapshots.SummarySnapshot;
import io.strimzi.kafka.metrics.http.Listener;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.function.ThrowingConsumer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.MountableFile;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;

/**
* Utility class to create and retrieve metrics
*/
@SuppressWarnings("ClassFanOutComplexity")
public class MetricsUtils {
public class TestUtils {

// Version of the metrics reporter under test; appears in the REPORTER_JARS path below,
// so it must match the artifact layout produced by the build under target/.
private static final String VERSION = "1.0.0-SNAPSHOT";
// Kafka version baked into the strimzi test-clients image tag below.
private static final String KAFKA_VERSION = "3.9.0";
// Image used by clientContainer() to run Kafka clients (consumer/producer/streams).
private static final String CLIENTS_IMAGE = "quay.io/strimzi-test-clients/test-clients:latest-kafka-" + KAFKA_VERSION;
// Overall time budget for the retrying verify() helper.
private static final Duration TIMEOUT = Duration.ofSeconds(10L);

// Host path of the reporter jars produced by the build, copied into containers.
public static final String REPORTER_JARS = "target/metrics-reporter-" + VERSION + "/metrics-reporter-" + VERSION + "/libs/";
// Mount point of the reporter jars inside containers; tests append "*" to it for CLASSPATH.
public static final String MOUNT_PATH = "/opt/strimzi/metrics-reporter/";
// Port of the reporter's HTTP listener, parsed from the default listener configuration.
public static final int PORT = Listener.parseListener(PrometheusMetricsReporterConfig.LISTENER_CONFIG_DEFAULT).port;
// Network alias under which the Kafka broker is reachable from client containers.
public static final String KAFKA_NETWORK_ALIAS = "kafka";

/**
* Query the HTTP endpoint and returns the output
Expand Down Expand Up @@ -164,4 +183,44 @@ public static void assertSummarySnapshot(MetricSnapshot<?> snapshot, int expecte
assertEquals(expectedLabels, datapoint.getLabels());
}

/**
 * Returns the metric names carrying a given prefix.
 * @param allMetrics all the metric names
 * @param prefix the prefix to select by
 * @return the metric names that start with the prefix, in their original order
 */
public static List<String> filterMetrics(List<String> allMetrics, String prefix) {
    // Copy the input, then drop every name that does not carry the prefix;
    // the relative order of the surviving names is preserved.
    List<String> matching = new ArrayList<>(allMetrics);
    matching.removeIf(metric -> !metric.startsWith(prefix));
    return matching;
}

/**
 * Repeatedly scrapes the container's metrics endpoint until the metric names
 * starting with {@code prefix} satisfy {@code condition}, or TIMEOUT expires.
 * @param container container exposing the reporter's HTTP endpoint on PORT
 * @param prefix prefix used to select the metric names passed to the condition
 * @param condition assertion applied to the filtered metric names on each attempt
 */
public static void verify(GenericContainer<?> container, String prefix, ThrowingConsumer<List<String>> condition) {
    assertTimeoutPreemptively(TIMEOUT, () -> {
        while (true) {
            try {
                // Scrape, filter by prefix, and re-check the condition on every iteration.
                List<String> filteredMetrics = filterMetrics(getMetrics(container.getHost(), container.getMappedPort(PORT)), prefix);
                condition.accept(filteredMetrics);
                return;
            } catch (Throwable t) {
                // Only assertion failures trigger a retry: assertInstanceOf converts any
                // other Throwable (e.g. a scrape error, or presumably the interruption
                // raised when the preemptive timeout fires during sleep) into a test failure.
                // NOTE(review): on timeout the last real assertion failure is masked by the
                // timeout/interruption failure — consider rethrowing the last AssertionError.
                assertInstanceOf(AssertionError.class, t);
                TimeUnit.MILLISECONDS.sleep(100L);
            }
        }
    });
}

/**
 * Creates a test-clients container prepared to run a Kafka client with the
 * metrics reporter jars available on its filesystem.
 * @param env environment variables configuring which client the image runs
 * @return the configured, not yet started, container
 */
public static GenericContainer<?> clientContainer(Map<String, String> env) {
    GenericContainer<?> container = new GenericContainer<>(CLIENTS_IMAGE);
    container.withNetwork(Network.SHARED);
    container.withExposedPorts(PORT);
    // Copy the reporter jars into the image so callers can put them on the CLASSPATH.
    container.withCopyFileToContainer(MountableFile.forHostPath(REPORTER_JARS), MOUNT_PATH);
    container.withEnv(env);
    // The container is considered ready once the reporter answers on /metrics.
    container.waitingFor(Wait.forHttp("/metrics").forStatusCode(200));
    return container;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.Map;
import java.util.Properties;

import static io.strimzi.kafka.metrics.MetricsUtils.getMetrics;
import static io.strimzi.kafka.metrics.TestUtils.getMetrics;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class YammerPrometheusMetricsReporterTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,14 @@
package io.strimzi.kafka.metrics.integration;

import io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter;
import io.strimzi.kafka.metrics.MetricsUtils;
import io.strimzi.kafka.metrics.PrometheusMetricsReporterConfig;
import io.strimzi.kafka.metrics.TestUtils;
import io.strimzi.kafka.metrics.YammerPrometheusMetricsReporter;
import io.strimzi.kafka.metrics.http.Listener;
import io.strimzi.test.container.StrimziKafkaContainer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.utility.MountableFile;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -27,11 +23,6 @@

public class TestBrokerMetricsIT {

private static final String VERSION = "1.0.0-SNAPSHOT";
private static final String REPORTER_JARS = "target/metrics-reporter-" + VERSION + "/metrics-reporter-" + VERSION + "/libs/";
private static final String MOUNT_PATH = "/opt/strimzi/metrics-reporter/";
private static final int PORT = Listener.parseListener(PrometheusMetricsReporterConfig.LISTENER_CONFIG_DEFAULT).port;

private Map<String, String> configs;
private StrimziKafkaContainer broker;

Expand All @@ -44,10 +35,10 @@ public void setUp() {
broker = new StrimziKafkaContainer()
.withNodeId(0)
.withKraft()
.withCopyFileToContainer(MountableFile.forHostPath(REPORTER_JARS), MOUNT_PATH)
.withExposedPorts(9092, PORT)
.withCopyFileToContainer(MountableFile.forHostPath(TestUtils.REPORTER_JARS), TestUtils.MOUNT_PATH)
.withExposedPorts(9092, TestUtils.PORT)
.withKafkaConfigurationMap(configs)
.withEnv(Collections.singletonMap("CLASSPATH", MOUNT_PATH + "*"));
.withEnv(Collections.singletonMap("CLASSPATH", TestUtils.MOUNT_PATH + "*"));
}

@AfterEach
Expand All @@ -56,50 +47,44 @@ public void tearDown() {
}

@Test
public void testMetricsReporter() throws Exception {
public void testBrokerMetrics() throws Exception {
broker.start();
List<String> metrics = MetricsUtils.getMetrics(broker.getHost(), broker.getMappedPort(PORT));
List<String> prefixes = Arrays.asList(
"jvm_",
"kafka_controller_",
"kafka_coordinator_",
"kafka_log_",
"kafka_network_",
"kafka_server_");

List<String> prefixes = List.of(
"jvm_",
"process_",
"kafka_controller_",
"kafka_coordinator_",
"kafka_log_",
"kafka_network_",
"kafka_server_");
List<String> metrics = TestUtils.getMetrics(broker.getHost(), broker.getMappedPort(TestUtils.PORT));
for (String prefix : prefixes) {
assertFalse(filterMetrics(metrics, prefix).isEmpty());
assertFalse(TestUtils.filterMetrics(metrics, prefix).isEmpty());
}
}

@Test
public void testMetricsReporterWithAllowlist() throws Exception {
public void testBrokerMetricsWithAllowlist() throws Exception {
configs.put("prometheus.metrics.reporter.allowlist", "kafka_controller.*,kafka_server.*");
broker.withKafkaConfigurationMap(configs);
broker.start();
List<String> metrics = MetricsUtils.getMetrics(broker.getHost(), broker.getMappedPort(PORT));
List<String> allowedPrefixes = Arrays.asList(
"jvm_",
"kafka_controller_",
"kafka_server_");

List<String> metrics = TestUtils.getMetrics(broker.getHost(), broker.getMappedPort(TestUtils.PORT));
List<String> allowedPrefixes = List.of(
"jvm_",
"process_",
"kafka_controller_",
"kafka_server_");
for (String prefix : allowedPrefixes) {
assertFalse(filterMetrics(metrics, prefix).isEmpty());
assertFalse(TestUtils.filterMetrics(metrics, prefix).isEmpty());
}
List<String> disallowPrefixes = Arrays.asList(
"kafka_coordinator_",
"kafka_log_",
"kafka_network_");
List<String> disallowPrefixes = List.of(
"kafka_coordinator_",
"kafka_log_",
"kafka_network_");
for (String prefix : disallowPrefixes) {
assertTrue(filterMetrics(metrics, prefix).isEmpty());
}
}

private List<String> filterMetrics(List<String> allMetrics, String prefix) {
List<String> metrics = new ArrayList<>();
for (String metric : allMetrics) {
if (metric.startsWith(prefix)) {
metrics.add(metric);
}
assertTrue(TestUtils.filterMetrics(metrics, prefix).isEmpty());
}
return metrics;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics.integration;

import io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter;
import io.strimzi.kafka.metrics.TestUtils;
import io.strimzi.test.container.StrimziKafkaContainer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Integration tests checking that a KafkaConsumer running with the Prometheus
 * metrics reporter exposes the expected metric groups over HTTP.
 */
public class TestConsumerMetricsIT {

    private StrimziKafkaContainer broker;
    private Map<String, String> env;

    @BeforeEach
    public void setUp() {
        // Single-node KRaft broker, reachable from the client container under the shared alias.
        broker = new StrimziKafkaContainer()
                .withKraft()
                .withNetworkAliases(TestUtils.KAFKA_NETWORK_ALIAS);
        broker.start();

        // Environment for the test-clients consumer container; kept mutable so tests
        // can override entries (e.g. ADDITIONAL_CONFIG) before starting the container.
        env = new HashMap<>(Map.of(
                "CLIENT_TYPE", "KafkaConsumer",
                "BOOTSTRAP_SERVERS", TestUtils.KAFKA_NETWORK_ALIAS + ":9091",
                "TOPIC", "my-topic",
                "GROUP_ID", "my-group",
                "ADDITIONAL_CONFIG", "metric.reporters=" + KafkaPrometheusMetricsReporter.class.getName(),
                "CLASSPATH", TestUtils.MOUNT_PATH + "*",
                "MESSAGE_COUNT", "1000"));
    }

    @AfterEach
    public void tearDown() {
        broker.stop();
    }

    @Test
    public void testConsumerMetrics() {
        try (GenericContainer<?> consumer = TestUtils.clientContainer(env)) {
            consumer.start();

            // Without an allowlist, every consumer metric group should be reported.
            List<String> expectedPrefixes = List.of(
                    "jvm_",
                    "process_",
                    "kafka_consumer_app_info_",
                    "kafka_consumer_kafka_metrics_",
                    "kafka_consumer_consumer_metrics_",
                    "kafka_consumer_consumer_node_metrics_",
                    "kafka_consumer_consumer_coordinator_metrics_",
                    "kafka_consumer_consumer_fetch_manager_metrics_");
            expectedPrefixes.forEach(prefix ->
                    TestUtils.verify(consumer, prefix, metrics -> assertFalse(metrics.isEmpty())));
        }
    }

    @Test
    public void testConsumerMetricsWithAllowlist() {
        // Restrict the reporter to two metric groups and check everything else is filtered out.
        env.put("ADDITIONAL_CONFIG",
                "metric.reporters=" + KafkaPrometheusMetricsReporter.class.getName() + "\n" +
                "prometheus.metrics.reporter.allowlist=kafka_consumer_kafka_metrics_.*,kafka_consumer_consumer_coordinator_metrics_.*");
        try (GenericContainer<?> consumer = TestUtils.clientContainer(env)) {
            consumer.start();

            List<String> expectedPresent = List.of(
                    "jvm_",
                    "process_",
                    "kafka_consumer_kafka_metrics_",
                    "kafka_consumer_consumer_coordinator_metrics_");
            expectedPresent.forEach(prefix ->
                    TestUtils.verify(consumer, prefix, metrics -> assertFalse(metrics.isEmpty())));

            List<String> expectedAbsent = List.of(
                    "kafka_consumer_app_info_",
                    "kafka_consumer_consumer_metrics_",
                    "kafka_consumer_consumer_node_metrics_",
                    "kafka_consumer_consumer_fetch_manager_metrics_");
            expectedAbsent.forEach(prefix ->
                    TestUtils.verify(consumer, prefix, metrics -> assertTrue(metrics.isEmpty())));
        }
    }
}
Loading

0 comments on commit 8263db2

Please sign in to comment.