Skip to content

Commit

Permalink
chore: add simple integration test (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
sverrehu authored Oct 2, 2023
1 parent 9db3d93 commit 8fa94c8
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 0 deletions.
21 changes: 21 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@
<version>4.13.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -68,6 +75,20 @@
<version>3.1.2</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.1.2</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/no/statnett/k3alagexporter/Conf.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,8 @@ public static void setFromFile(final String configFile) {
conf = ConfigFactory.parseFile(new File(configFile)).withFallback(DEFAULT_CONFIG).resolve();
}

public static void setFromString(final String config) {
conf = ConfigFactory.parseString(config).withFallback(DEFAULT_CONFIG).resolve();
}

}
114 changes: 114 additions & 0 deletions src/test/java/no/statnett/k3alagexporter/itest/K3aLagExporterIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package no.statnett.k3alagexporter.itest;

import no.statnett.k3alagexporter.ClusterLagCollector;
import no.statnett.k3alagexporter.Conf;
import no.statnett.k3alagexporter.itest.services.KafkaCluster;
import no.statnett.k3alagexporter.model.ClusterData;
import no.statnett.k3alagexporter.model.ConsumerGroupData;
import no.statnett.k3alagexporter.model.TopicPartitionData;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

public final class K3aLagExporterIT {

private static KafkaCluster kafkaCluster;
private static final String TOPIC = "the-topic";
private static final String CONSUMER_GROUP_ID = "consumer-group";
private static ClusterLagCollector lagCollector;
private int nextProducedValue = 0;

@BeforeClass
public static void before() {
kafkaCluster = new KafkaCluster();
kafkaCluster.start();
Conf.setFromString(createConfig(kafkaCluster));
lagCollector = new ClusterLagCollector(Conf.getClusterName());
}

@AfterClass
public static void after() {
kafkaCluster.stop();
}

private static String createConfig(final KafkaCluster kafkaCluster) {
return "k3a-lag-exporter {\n"
+ " clusters = [ {\n"
+ " name = \"the-cluster\"\n"
+ " bootstrap-servers = \"" + kafkaCluster.getBootstrapServers() + "\"\n"
+ " consumer-properties = {}\n"
+ " admin-properties = {}\n"
+ " } ]\n"
+ "}\n";
}

@Test
public void shouldDetectLag() {
try (final Producer<Integer, Integer> producer = kafkaCluster.getProducer()) {
try (final Consumer<Integer, Integer> consumer = kafkaCluster.getConsumer(CONSUMER_GROUP_ID)) {
consumer.subscribe(Collections.singleton(TOPIC));
produce(producer);
final int consumedValue = consume(consumer);
Assert.assertEquals(nextProducedValue - 1, consumedValue);
assertLag(0);
produce(producer);
assertLag(1);
produce(producer);
assertLag(2);
produce(producer);
assertLag(3);
consume(consumer);
consume(consumer);
consume(consumer);
assertLag(0);
}
}
}

private void assertLag(final int expected) {
final ClusterData clusterData = lagCollector.collect();
final TopicPartitionData topicPartitionData = clusterData.findTopicPartitionData(new TopicPartition(TOPIC, 0));
Assert.assertNotNull(topicPartitionData);
final ConsumerGroupData consumerGroupData = topicPartitionData.findConsumerGroupData(CONSUMER_GROUP_ID);
Assert.assertNotNull(consumerGroupData);
Assert.assertEquals(expected, consumerGroupData.getLag(), 0.00001);
}

private void produce(final Producer<Integer, Integer> producer) {
final ProducerRecord<Integer, Integer> record = new ProducerRecord<>(TOPIC, 0, nextProducedValue);
try {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
throw (exception instanceof RuntimeException) ? (RuntimeException) exception : new RuntimeException(exception);
}
}).get(); // Make call synchronous, to be able to get exceptions in time.
++nextProducedValue;
} catch (final InterruptedException | ExecutionException e) {
final Throwable cause = e.getCause();
throw (cause instanceof RuntimeException) ? (RuntimeException) cause : new RuntimeException(e);
}
producer.flush();
}

private int consume(final Consumer<Integer, Integer> consumer) {
int lastValue = -1;
final ConsumerRecords<Integer, Integer> records = consumer.poll(Duration.ofMillis(1000));
for (final ConsumerRecord<Integer, Integer> record : records) {
lastValue = record.value();
consumer.commitAsync();
}
return lastValue;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package no.statnett.k3alagexporter.itest.services;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.HashMap;
import java.util.Map;

public final class KafkaCluster {

private KafkaContainer container;

public void start() {
final DockerImageName imageName = DockerImageName.parse("confluentinc/cp-kafka:" + Versions.CONFLUENT_VERSION);
container = new KafkaContainer(imageName).withKraft();
container.start();
}

public void stop() {
container.stop();
}

public String getBootstrapServers() {
return String.format("%s:%s", container.getHost(), container.getMappedPort(KafkaContainer.KAFKA_PORT));
}

public Producer<Integer, Integer> getProducer() {
final Map<String, Object> map = getCommonConfig();
map.put(ProducerConfig.ACKS_CONFIG, "all");
map.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "3000");
map.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000");
map.put(ProducerConfig.LINGER_MS_CONFIG, "0");
map.put(ProducerConfig.RETRIES_CONFIG, "0");
map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
return new KafkaProducer<>(map);
}

public Consumer<Integer, Integer> getConsumer(final String consumerGroupId) {
final Map<String, Object> map = getCommonConfig();
map.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
return new KafkaConsumer<>(map);
}

private Map<String, Object> getCommonConfig() {
final Map<String, Object> map = new HashMap<>();
map.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
return map;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package no.statnett.k3alagexporter.itest.services;

/**
* Contains versions of container images used for testing. All in one place to
* make it easier to keep them updated.
*/
final class Versions {

static final String CONFLUENT_VERSION = "7.5.0";

private Versions() {
}

}

0 comments on commit 8fa94c8

Please sign in to comment.