Skip to content

Commit

Permalink
chore: refactor ClusterLagCollector to be independent of Conf (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
sverrehu authored Oct 2, 2023
1 parent 8fa94c8 commit 065953b
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 28 deletions.
18 changes: 13 additions & 5 deletions src/main/java/no/statnett/k3alagexporter/ClusterLagCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -28,13 +29,20 @@ public final class ClusterLagCollector {
private final String clusterName;
private final RegexStringListFilter topicFilter;
private final RegexStringListFilter consumerGroupFilter;
private final Map<String, Object> consumerConfig;
private final Map<String, Object> adminConfig;
private Admin admin;
private Consumer<?, ?> consumer;

public ClusterLagCollector(final String clusterName) {
public ClusterLagCollector(final String clusterName,
final Collection<String> topicAllowList, final Collection<String> topicDenyList,
final Collection<String> consumerGroupAllowList, final Collection<String> consumerGroupDenyList,
final Map<String, Object> consumerConfig, final Map<String, Object> adminConfig) {
this.clusterName = clusterName;
this.topicFilter = new RegexStringListFilter(Conf.getTopicAllowList(), Conf.getTopicDenyList());
this.consumerGroupFilter = new RegexStringListFilter(Conf.getConsumerGroupAllowList(), Conf.getConsumerGroupDenyList());
this.topicFilter = new RegexStringListFilter(topicAllowList, topicDenyList);
this.consumerGroupFilter = new RegexStringListFilter(consumerGroupAllowList, consumerGroupDenyList);
this.consumerConfig = consumerConfig;
this.adminConfig = adminConfig;
}

public ClusterData collect() {
Expand Down Expand Up @@ -127,14 +135,14 @@ private void findEndOffsetsAndUpdateLag(final Consumer<?, ?> consumer, final Set

private Admin getAdmin() {
if (admin == null) {
admin = AdminClient.create(Conf.getAdminConfigs());
admin = AdminClient.create(adminConfig);
}
return admin;
}

private Consumer<?, ?> getConsumer() {
if (consumer == null) {
consumer = new KafkaConsumer<>(Conf.getConsumerConfigs());
consumer = new KafkaConsumer<>(consumerConfig);
}
return consumer;
}
Expand Down
8 changes: 2 additions & 6 deletions src/main/java/no/statnett/k3alagexporter/Conf.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public static String getClusterName() {
return getCluster().getString("name");
}

public static Map<String, Object> getConsumerConfigs() {
public static Map<String, Object> getConsumerConfig() {
final Map<String, Object> map = configToMap(getCluster().getConfig("consumer-properties"));
map.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
map.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
map.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return map;
}

public static Map<String, Object> getAdminConfigs() {
public static Map<String, Object> getAdminConfig() {
final Map<String, Object> map = configToMap(getCluster().getConfig("admin-properties"));
map.putIfAbsent(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
return map;
Expand Down Expand Up @@ -94,8 +94,4 @@ public static void setFromFile(final String configFile) {
conf = ConfigFactory.parseFile(new File(configFile)).withFallback(DEFAULT_CONFIG).resolve();
}

public static void setFromString(final String config) {
conf = ConfigFactory.parseString(config).withFallback(DEFAULT_CONFIG).resolve();
}

}
5 changes: 4 additions & 1 deletion src/main/java/no/statnett/k3alagexporter/K3aLagExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ private void doit() {
final long msBetweenCollections = Conf.getPollIntervalMs();
final PrometheusReporter prometheusReporter = new PrometheusReporter();
prometheusReporter.start();
final ClusterLagCollector collector = new ClusterLagCollector(Conf.getClusterName());
final ClusterLagCollector collector = new ClusterLagCollector(Conf.getClusterName(),
Conf.getTopicAllowList(), Conf.getTopicDenyList(),
Conf.getConsumerGroupAllowList(), Conf.getConsumerGroupDenyList(),
Conf.getConsumerConfig(), Conf.getAdminConfig());
for (;;) {
long t = System.currentTimeMillis();
final ClusterData clusterData = collector.collect();
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/no/statnett/k3alagexporter/ConfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ public void shouldFindClusterName() {

@Test
public void shouldFindConsumerProperties() {
final Map<String, Object> map = Conf.getConsumerConfigs();
final Map<String, Object> map = Conf.getConsumerConfig();
Assert.assertNotNull(map);
Assert.assertEquals("SSL", map.get("security.protocol"));
}

@Test
public void shouldFindAdminProperties() {
final Map<String, Object> map = Conf.getAdminConfigs();
final Map<String, Object> map = Conf.getAdminConfig();
Assert.assertNotNull(map);
Assert.assertEquals("password", map.get("ssl.keystore.password"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package no.statnett.k3alagexporter.itest;

import no.statnett.k3alagexporter.ClusterLagCollector;
import no.statnett.k3alagexporter.Conf;
import no.statnett.k3alagexporter.itest.services.KafkaCluster;
import no.statnett.k3alagexporter.model.ClusterData;
import no.statnett.k3alagexporter.model.ConsumerGroupData;
Expand All @@ -24,6 +23,7 @@
public final class K3aLagExporterIT {

private static KafkaCluster kafkaCluster;
private static final String CLUSTER_NAME = "the-cluster";
private static final String TOPIC = "the-topic";
private static final String CONSUMER_GROUP_ID = "consumer-group";
private static ClusterLagCollector lagCollector;
Expand All @@ -33,26 +33,16 @@ public final class K3aLagExporterIT {
public static void before() {
kafkaCluster = new KafkaCluster();
kafkaCluster.start();
Conf.setFromString(createConfig(kafkaCluster));
lagCollector = new ClusterLagCollector(Conf.getClusterName());
lagCollector = new ClusterLagCollector(CLUSTER_NAME,
null, null, null, null,
kafkaCluster.getMinimalConsumerConfig(), kafkaCluster.getMinimalAdminConfig());
}

@AfterClass
public static void after() {
kafkaCluster.stop();
}

private static String createConfig(final KafkaCluster kafkaCluster) {
return "k3a-lag-exporter {\n"
+ " clusters = [ {\n"
+ " name = \"the-cluster\"\n"
+ " bootstrap-servers = \"" + kafkaCluster.getBootstrapServers() + "\"\n"
+ " consumer-properties = {}\n"
+ " admin-properties = {}\n"
+ " } ]\n"
+ "}\n";
}

@Test
public void shouldDetectLag() {
try (final Producer<Integer, Integer> producer = kafkaCluster.getProducer()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.testcontainers.containers.KafkaContainer;
Expand Down Expand Up @@ -55,6 +56,17 @@ public Consumer<Integer, Integer> getConsumer(final String consumerGroupId) {
return new KafkaConsumer<>(map);
}

public Map<String, Object> getMinimalAdminConfig() {
return getCommonConfig();
}

public Map<String, Object> getMinimalConsumerConfig() {
final Map<String, Object> map = getCommonConfig();
map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return map;
}

private Map<String, Object> getCommonConfig() {
final Map<String, Object> map = new HashMap<>();
map.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
Expand Down

0 comments on commit 065953b

Please sign in to comment.