Skip to content

Commit

Permalink
Topics system tests (kroxylicious#834)
Browse files Browse the repository at this point in the history
* First try using topics with admin test client

* added kafka topic creation with admin test client using fabric exec

* remove unused code

* fix KafkaSteps

* added changes suggested by Keith

* reduce code smells

* replaced long timeout milliseconds by duration timeout

* using two different byteArrays for output and error

* Replaced kubernetes exec by run job

* increase timeout topic creation

* remove useless dependency from system tests pom file

* increase strimzi version, read kafka version from parent pom file

* added Keith comments
  • Loading branch information
franvila authored Dec 22, 2023
1 parent d8aa2ab commit dec3575
Show file tree
Hide file tree
Showing 13 changed files with 170 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

import java.time.Duration;

import static io.kroxylicious.systemtests.Environment.KAFKA_VERSION_DEFAULT;

/**
* The interface Constants.
*/
Expand Down Expand Up @@ -125,7 +123,11 @@ public interface Constants {
/**
* Strimzi kafka image url in quay
*/
String STRIMZI_KAFKA_IMAGE = "quay.io/strimzi/kafka:latest-kafka-" + KAFKA_VERSION_DEFAULT;
String STRIMZI_KAFKA_IMAGE = "quay.io/strimzi/kafka:latest-kafka-" + Environment.KAFKA_VERSION;
/**
* Test clients image url
*/
String TEST_CLIENTS_IMAGE = "quay.io/strimzi-test-clients/test-clients:latest-kafka-" + Environment.KAFKA_VERSION;

/**
* The cert manager url to install it on kubernetes
Expand All @@ -139,4 +141,8 @@ public interface Constants {
* kafka producer client label to identify the producer test client
*/
String KAFKA_PRODUCER_CLIENT_LABEL = "kafka-producer-client";
/**
* kafka admin client label to identify the admin test client
*/
String KAFKA_ADMIN_CLIENT_LABEL = "admin-client-cli";
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ private Environment() {
/**
* The kafka version default value
*/
public static final String KAFKA_VERSION_DEFAULT = "3.6.0";
public static final String KAFKA_VERSION_DEFAULT;

static {
KAFKA_VERSION_DEFAULT = determineKafkaVersion();
}

/**
* The kroxy version default value
Expand All @@ -43,27 +47,6 @@ private Environment() {
KROXY_VERSION_DEFAULT = determineKroxyliciousVersion();
}

private static String determineKroxyliciousVersion() {
var p = new Properties();
var metadataProps = "/metadata.properties";
try (var stream = Environment.class.getResourceAsStream(metadataProps)) {
Objects.requireNonNull(stream, metadataProps + " is not present on the classpath");
p.load(stream);
var version = p.getProperty("kroxylicious.version");
if (version == null) {
throw new IllegalStateException("kroxylicious version key absent in " + metadataProps);
}
else if (version.startsWith("$")) {
throw new IllegalStateException(
"likely unexpanded property reference found in '" + version + "', check Maven filtering configuration of resource " + metadataProps);
}
return version;
}
catch (IOException e) {
throw new UncheckedIOException("error while streaming " + metadataProps, e);
}
}

/**
* The url where kroxylicious image lives to be downloaded.
*/
Expand Down Expand Up @@ -107,4 +90,33 @@ private static String getOrDefault(String varName, String defaultValue) {
private static <T> T getOrDefault(String varName, Function<String, T> converter, T defaultValue) {
return System.getenv(varName) != null ? converter.apply(System.getenv(varName)) : defaultValue;
}

private static String readMetadataProperty(String property) {
var p = new Properties();
var metadataProps = "/metadata.properties";
try (var stream = Environment.class.getResourceAsStream(metadataProps)) {
Objects.requireNonNull(stream, metadataProps + " is not present on the classpath");
p.load(stream);
var version = p.getProperty(property);
if (version == null) {
throw new IllegalStateException(property + " key absent in " + metadataProps);
}
else if (version.startsWith("$")) {
throw new IllegalStateException(
"likely unexpanded property reference found in '" + version + "', check Maven filtering configuration of resource " + metadataProps);
}
return version;
}
catch (IOException e) {
throw new UncheckedIOException("error while streaming " + metadataProps, e);
}
}

private static String determineKroxyliciousVersion() {
return readMetadataProperty("kroxylicious.version");
}

private static String determineKafkaVersion() {
return readMetadataProperty("kafka.version");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
package io.kroxylicious.systemtests.k8s;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.Namespace;
import io.fabric8.kubernetes.api.model.NamespaceBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodStatus;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
Expand Down Expand Up @@ -110,6 +112,7 @@ public void deleteNamespace(String name) {
// =========================
// ---------> POD <---------
// =========================

/**
* List pods list.
*
Expand Down Expand Up @@ -189,15 +192,27 @@ public Deployment getDeployment(String namespaceName, String deploymentName) {
}

/**
* Gets deployment status
* Gets if the deployment is ready
* @param namespaceName the namespace name
* @param deploymentName the deployment name
* @return the deployment status
* @return true if the deployment is ready, false otherwise
*/
public boolean isDeploymentReady(String namespaceName, String deploymentName) {
return client.apps().deployments().inNamespace(namespaceName).withName(deploymentName).isReady();
}

/**
* Is the pod run succeeded.
*
* @param namespaceName the namespace name
* @param podName the pod name
* @return true if the job is succeeded. false otherwise
*/
public Boolean isPodRunSucceeded(String namespaceName, String podName) {
return Optional.ofNullable(client.pods().inNamespace(namespaceName).withName(podName).get().getStatus()).map(PodStatus::getPhase)
.map(s -> s.equalsIgnoreCase("succeeded")).orElse(false);
}

/**
* Gets service.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,47 @@

package io.kroxylicious.systemtests.steps;

import java.time.Duration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.kroxylicious.systemtests.Constants;
import io.kroxylicious.systemtests.resources.manager.ResourceManager;
import io.kroxylicious.systemtests.templates.strimzi.KafkaTopicTemplates;
import io.kroxylicious.systemtests.utils.DeploymentUtils;
import io.kroxylicious.systemtests.utils.KafkaUtils;

import static io.kroxylicious.systemtests.k8s.KubeClusterResource.kubeClient;
import static org.hamcrest.MatcherAssert.assertThat;

/**
* The type Kafka steps.
*/
public class KafkaSteps {
private static final ResourceManager resourceManager = ResourceManager.getInstance();
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSteps.class);

private KafkaSteps() {
}

/**
* Create topic.
*
* @param clusterName the cluster name
* @param topicName the topic name
* @param namespace the namespace
* @param bootstrap the bootstrap
* @param partitions the partitions
* @param replicas the replicas
* @param minIsr the min isr
*/
public static void createTopic(String clusterName, String topicName, String namespace,
int partitions, int replicas, int minIsr) {
resourceManager.createResourceWithWait(
KafkaTopicTemplates.defaultTopic(namespace, clusterName, topicName, partitions, replicas, minIsr).build());
public static void createTopic(String deployNamespace, String topicName, String bootstrap, int partitions, int replicas) {
kubeClient().getClient().run().inNamespace(deployNamespace).withNewRunConfig()
.withImage(Constants.TEST_CLIENTS_IMAGE)
.withName(Constants.KAFKA_ADMIN_CLIENT_LABEL)
.withRestartPolicy("Never")
.withCommand("admin-client")
.withArgs("topic", "create", "--bootstrap-server=" + bootstrap, "--topic=" + topicName, "--topic-partitions=" + partitions,
"--topic-rep-factor=" + replicas)
.done();

DeploymentUtils.waitForPodRunSucceeded(deployNamespace, Constants.KAFKA_ADMIN_CLIENT_LABEL, Duration.ofSeconds(30));
LOGGER.debug("Admin client pod log: {}", kubeClient().logsInSpecificNamespace(deployNamespace, Constants.KAFKA_ADMIN_CLIENT_LABEL));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

package io.kroxylicious.systemtests.steps;

import java.time.Duration;

import io.kroxylicious.systemtests.utils.KafkaUtils;

/**
Expand Down Expand Up @@ -34,10 +36,10 @@ public static void produceMessages(String namespace, String topicName, String bo
* @param topicName the topic name
* @param bootstrap the bootstrap
* @param numberOfMessages the number of messages
* @param timeoutMillis the timeout millis
* @param timeout the timeout
* @return the string
*/
public static String consumeMessages(String namespace, String topicName, String bootstrap, int numberOfMessages, long timeoutMillis) {
return KafkaUtils.ConsumeMessageWithTestClients(namespace, topicName, bootstrap, numberOfMessages, timeoutMillis);
public static String consumeMessages(String namespace, String topicName, String bootstrap, int numberOfMessages, Duration timeout) {
return KafkaUtils.consumeMessageWithTestClients(namespace, topicName, bootstrap, numberOfMessages, timeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ private static KafkaBuilder defaultKafka(String namespaceName, String clusterNam
.addToConfig("transaction.state.log.replication.factor", Math.min(kafkaReplicas, 3))
.addToConfig("default.replication.factor", Math.min(kafkaReplicas, 3))
.addToConfig("min.insync.replicas", Math.min(Math.max(kafkaReplicas - 1, 1), 2))
.addToConfig("auto.create.topics.enable", false)
.withListeners(new GenericKafkaListenerBuilder()
.withName(Constants.PLAIN_LISTENER_NAME)
.withPort(9092)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import java.util.Collections;

import org.apache.commons.io.FileUtils;
import org.awaitility.core.ConditionEvaluationListener;
import org.awaitility.core.EvaluatedCondition;
import org.awaitility.core.TimeoutEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -26,6 +29,7 @@
import static io.kroxylicious.systemtests.k8s.KubeClusterResource.cmdKubeClient;
import static io.kroxylicious.systemtests.k8s.KubeClusterResource.kubeClient;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.with;

/**
* The type Deployment utils.
Expand Down Expand Up @@ -133,4 +137,28 @@ public static boolean checkLoadBalancerIsWorking(String namespace) {
kubeClient().getClient().apps().deployments().inNamespace(namespace).withName(TEST_LOAD_BALANCER_NAME).delete();
return isWorking;
}

/**
* Wait for run succeeded boolean.
*
* @param namespaceName the namespace name
* @param podName the pod name
* @param timeout the timeout
*/
public static void waitForPodRunSucceeded(String namespaceName, String podName, Duration timeout) {
LOGGER.info("Waiting for pod run: {}/{} to be succeeded", namespaceName, podName);
with().conditionEvaluationListener(new ConditionEvaluationListener<>() {
@Override
public void onTimeout(TimeoutEvent timeoutEvent) {
LOGGER.error("Run failed! Error: {}", kubeClient().logsInSpecificNamespace(namespaceName, podName));
}

@Override
public void conditionEvaluated(EvaluatedCondition condition) {
// unused
}
}).await().atMost(timeout).pollInterval(Duration.ofMillis(200))
.until(() -> kubeClient().getPod(namespaceName, podName) != null
&& kubeClient().isPodRunSucceeded(namespaceName, podName));
}
}
Loading

0 comments on commit dec3575

Please sign in to comment.