Skip to content

Commit

Permalink
[System Tests] Added Consumer Record support for consumers (kroxylici…
Browse files Browse the repository at this point in the history
…ous#1290)

* Added Consumer Record support for consumers

Signed-off-by: Francisco Vila <[email protected]>

* formatted

Signed-off-by: Francisco Vila <[email protected]>

* added kafka clients dependency

Signed-off-by: Francisco Vila <[email protected]>

* removed methods from KafkaClient API

Signed-off-by: Francisco Vila <[email protected]>

* refactor to deserialize only once per message

Signed-off-by: Francisco Vila <[email protected]>

* created consumer records for each kafka client

Signed-off-by: Francisco Vila <[email protected]>

* created consumer records for each kafka client

Signed-off-by: Francisco Vila <[email protected]>

* fix format

Signed-off-by: Francisco Vila <[email protected]>

* fixed Entry for ClientConsumerRecord

Signed-off-by: Francisco Vila <[email protected]>

* improve logging in error

Signed-off-by: Francisco Vila <[email protected]>

* Update kroxylicious-systemtests/src/main/java/io/kroxylicious/systemtests/clients/records/KafConsumerRecord.java

Co-authored-by: Keith Wall <[email protected]>
Signed-off-by: Francisco Vila <[email protected]>

* refactored kafka clients

Signed-off-by: Francisco Vila <[email protected]>

* format

Signed-off-by: Francisco Vila <[email protected]>

* Update kroxylicious-systemtests/src/main/java/io/kroxylicious/systemtests/clients/KafClient.java

Co-authored-by: Keith Wall <[email protected]>
Signed-off-by: Francisco Vila <[email protected]>

* refactored getNumberOfJsonMessages

Signed-off-by: Francisco Vila <[email protected]>

* Refactor BaseConsumerRecord and kafka clients consumerRecords

Signed-off-by: Francisco Vila <[email protected]>

* format

Signed-off-by: Francisco Vila <[email protected]>

* added dependency in pom file

Signed-off-by: Francisco Vila <[email protected]>

* fix null pointer exception

Signed-off-by: Francisco Vila <[email protected]>

* align parameters each in one line

Signed-off-by: Francisco Vila <[email protected]>

* Setting String type for key

Signed-off-by: Francisco Vila <[email protected]>

* inline objectMapper

Signed-off-by: Francisco Vila <[email protected]>

* added JsonIgnoreProperties and refactored BaseConsumerRecord to ConsumerRecord

Signed-off-by: Francisco Vila <[email protected]>

* fixes

Signed-off-by: Francisco Vila <[email protected]>

* remove getTimestamType method

Signed-off-by: Francisco Vila <[email protected]>

* fix format

Signed-off-by: Francisco Vila <[email protected]>

* removed kafka clients from pom file

Signed-off-by: Francisco Vila <[email protected]>

* rename payload to value

Signed-off-by: Francisco Vila <[email protected]>

* add more Keith's suggestions

Signed-off-by: Francisco Vila <[email protected]>

* format

Signed-off-by: Francisco Vila <[email protected]>

* fix issue

Signed-off-by: Francisco Vila <[email protected]>

---------

Signed-off-by: Francisco Vila <[email protected]>
Signed-off-by: Francisco Vila <[email protected]>
Co-authored-by: Keith Wall <[email protected]>
  • Loading branch information
franvila and k-wall authored Jun 17, 2024
1 parent 8cb8894 commit 49a20fa
Show file tree
Hide file tree
Showing 21 changed files with 562 additions and 145 deletions.
4 changes: 2 additions & 2 deletions kroxylicious-systemtests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@
<artifactId>testcontainers</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,38 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

import org.awaitility.core.ConditionTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.type.TypeReference;

import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClientException;

import io.kroxylicious.systemtests.Constants;
import io.kroxylicious.systemtests.clients.records.ConsumerRecord;
import io.kroxylicious.systemtests.clients.records.KafConsumerRecord;
import io.kroxylicious.systemtests.enums.KafkaClientType;
import io.kroxylicious.systemtests.templates.testclients.TestClientsJobTemplates;
import io.kroxylicious.systemtests.utils.KafkaUtils;
import io.kroxylicious.systemtests.utils.TestUtils;

import edu.umd.cs.findbugs.annotations.Nullable;

import static io.kroxylicious.systemtests.k8s.KubeClusterResource.cmdKubeClient;
import static io.kroxylicious.systemtests.k8s.KubeClusterResource.kubeClient;
import static org.awaitility.Awaitility.await;

/**
* The type Kaf client (sarama client based CLI).
*/
public class KafClient implements KafkaClient {
private static final Logger LOGGER = LoggerFactory.getLogger(KafClient.class);
private static final TypeReference<KafConsumerRecord> VALUE_TYPE_REF = new TypeReference<>() {
};
private String deployNamespace;

/**
Expand All @@ -55,7 +66,7 @@ public void produceMessages(String topicName, String bootstrap, String message,
List<String> executableCommand = new ArrayList<>(List.of(cmdKubeClient(deployNamespace).toString(), "run", "-i",
"-n", deployNamespace, name,
"--image=" + Constants.KAF_CLIENT_IMAGE,
"--", "kaf", "-n", String.valueOf(numOfMessages), "-b", bootstrap, "produce", topicName));
"--", "kaf", "-n", String.valueOf(numOfMessages), "-b", bootstrap));
recordKey.ifPresent(key -> {
executableCommand.add("--key");
executableCommand.add(key);
Expand All @@ -66,11 +77,57 @@ public void produceMessages(String topicName, String bootstrap, String message,
}

@Override
public String consumeMessages(String topicName, String bootstrap, String messageToCheck, int numOfMessages, Duration timeout) {
public List<ConsumerRecord> consumeMessages(String topicName, String bootstrap, int numOfMessages, Duration timeout) {
LOGGER.atInfo().log("Consuming messages using kaf");
String name = Constants.KAFKA_CONSUMER_CLIENT_LABEL + "-kafka-go";
List<String> args = List.of("kaf", "-b", bootstrap, "consume", topicName);
List<String> args = List.of("kaf", "-b", bootstrap, "consume", topicName, "--output", "json");
Job goClientJob = TestClientsJobTemplates.defaultKafkaGoConsumerJob(name, args).build();
return KafkaUtils.consumeMessages(topicName, name, deployNamespace, goClientJob, messageToCheck, numOfMessages, timeout);
String podName = KafkaUtils.createJob(deployNamespace, name, goClientJob);
String log = waitForConsumer(podName, numOfMessages, timeout);
LOGGER.atInfo().setMessage("Log: {}").addArgument(log).log();
List<String> logRecords = extractRecordLinesFromLog(log);
return getConsumerRecords(topicName, logRecords);
}

private String waitForConsumer(String podName, int numOfMessages, Duration timeout) {
String log;
try {
log = await().alias("Consumer waiting to receive messages")
.ignoreException(KubernetesClientException.class)
.atMost(timeout)
.until(() -> {
if (kubeClient().getClient().pods().inNamespace(deployNamespace).withName(podName).get() != null) {
return kubeClient().logsInSpecificNamespace(deployNamespace, podName);
}
return null;
}, m -> getNumberOfJsonMessages(m) == numOfMessages);
}
catch (ConditionTimeoutException e) {
log = kubeClient().logsInSpecificNamespace(deployNamespace, podName);
LOGGER.atInfo().setMessage("Timeout! Received: {}").addArgument(log).log();
if (!kubeClient().isPodRunSucceeded(deployNamespace, podName)) {
throw new IllegalStateException("Error in consumer: ", e);
}
}
return log;
}

private int getNumberOfJsonMessages(String log) {
return log == null ? 0 : extractRecordLinesFromLog(log).size();
}

private List<String> extractRecordLinesFromLog(String log) {
return Stream.of(log.split("\n")).filter(TestUtils::isValidJson).toList();
}

private List<ConsumerRecord> getConsumerRecords(String topicName, List<String> logRecords) {
List<ConsumerRecord> records = new ArrayList<>();
for (String logRecord : logRecords) {
KafConsumerRecord kafConsumerRecord = ConsumerRecord.parseFromJsonString(VALUE_TYPE_REF, logRecord);
kafConsumerRecord.setTopic(topicName);
records.add(kafConsumerRecord);
}

return records;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
package io.kroxylicious.systemtests.clients;

import java.time.Duration;
import java.util.List;

import io.kroxylicious.systemtests.clients.records.ConsumerRecord;
import io.kroxylicious.systemtests.k8s.exception.KubeClusterException;

import edu.umd.cs.findbugs.annotations.Nullable;
Expand Down Expand Up @@ -52,10 +54,9 @@ default void produceMessages(String topicName, String bootstrap, String message,
*
* @param topicName the topic name
* @param bootstrap the bootstrap
* @param message the message
* @param numOfMessages the num of messages
* @param timeout the timeout
* @return the string
* @return the list of ConsumerRecords
*/
String consumeMessages(String topicName, String bootstrap, String message, int numOfMessages, Duration timeout);
List<ConsumerRecord> consumeMessages(String topicName, String bootstrap, int numOfMessages, Duration timeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,25 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.type.TypeReference;

import io.fabric8.kubernetes.api.model.batch.v1.Job;

import io.kroxylicious.systemtests.Constants;
import io.kroxylicious.systemtests.clients.records.ConsumerRecord;
import io.kroxylicious.systemtests.clients.records.KcatConsumerRecord;
import io.kroxylicious.systemtests.enums.KafkaClientType;
import io.kroxylicious.systemtests.templates.testclients.TestClientsJobTemplates;
import io.kroxylicious.systemtests.utils.DeploymentUtils;
import io.kroxylicious.systemtests.utils.KafkaUtils;
import io.kroxylicious.systemtests.utils.TestUtils;

import edu.umd.cs.findbugs.annotations.Nullable;

Expand All @@ -31,6 +39,8 @@
*/
public class KcatClient implements KafkaClient {
private static final Logger LOGGER = LoggerFactory.getLogger(KcatClient.class);
private static final TypeReference<KcatConsumerRecord> VALUE_TYPE_REF = new TypeReference<>() {
};
private String deployNamespace;

/**
Expand Down Expand Up @@ -71,11 +81,29 @@ public void produceMessages(String topicName, String bootstrap, String message,
}

@Override
public String consumeMessages(String topicName, String bootstrap, String messageToCheck, int numOfMessages, Duration timeout) {
public List<ConsumerRecord> consumeMessages(String topicName, String bootstrap, int numOfMessages, Duration timeout) {
LOGGER.atInfo().log("Consuming messages using kcat");
String name = Constants.KAFKA_CONSUMER_CLIENT_LABEL + "-kcat";
List<String> args = List.of("-b", bootstrap, "-t", topicName, "-C", "-c" + numOfMessages);
List<String> args = List.of("-b", bootstrap, "-K ,", "-t", topicName, "-C", "-c", String.valueOf(numOfMessages), "-e", "-J");
Job kCatClientJob = TestClientsJobTemplates.defaultKcatJob(name, args).build();
return KafkaUtils.consumeMessages(topicName, name, deployNamespace, kCatClientJob, messageToCheck, numOfMessages, timeout);
String podName = KafkaUtils.createJob(deployNamespace, name, kCatClientJob);
String log = waitForConsumer(deployNamespace, podName, timeout);
LOGGER.atInfo().setMessage("Log: {}").addArgument(log).log();
List<String> logRecords = extractRecordLinesFromLog(log);
return getConsumerRecords(logRecords);
}

private String waitForConsumer(String namespace, String podName, Duration timeout) {
DeploymentUtils.waitForPodRunSucceeded(namespace, podName, timeout);
return kubeClient().logsInSpecificNamespace(namespace, podName);
}

private List<String> extractRecordLinesFromLog(String log) {
return Stream.of(log.split("\n")).filter(TestUtils::isValidJson).toList();
}

private List<ConsumerRecord> getConsumerRecords(List<String> logRecords) {
return logRecords.stream().map(x -> ConsumerRecord.parseFromJsonString(VALUE_TYPE_REF, x))
.filter(Objects::nonNull).map(ConsumerRecord.class::cast).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,39 @@
package io.kroxylicious.systemtests.clients;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

import org.awaitility.core.ConditionTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.type.TypeReference;

import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClientException;

import io.kroxylicious.systemtests.Constants;
import io.kroxylicious.systemtests.clients.records.ConsumerRecord;
import io.kroxylicious.systemtests.clients.records.StrimziTestClientConsumerRecord;
import io.kroxylicious.systemtests.templates.testclients.TestClientsJobTemplates;
import io.kroxylicious.systemtests.utils.DeploymentUtils;
import io.kroxylicious.systemtests.utils.KafkaUtils;

import edu.umd.cs.findbugs.annotations.Nullable;

import static io.kroxylicious.systemtests.k8s.KubeClusterResource.kubeClient;
import static org.awaitility.Awaitility.await;

/**
* The type Strimzi Test client (java client based CLI).
*/
public class StrimziTestClient implements KafkaClient {
private static final Logger LOGGER = LoggerFactory.getLogger(StrimziTestClient.class);
private static final TypeReference<StrimziTestClientConsumerRecord> VALUE_TYPE_REF = new TypeReference<>() {
};
private String deployNamespace;

/**
Expand All @@ -47,13 +61,62 @@ public void produceMessages(String topicName, String bootstrap, String message,
String name = Constants.KAFKA_PRODUCER_CLIENT_LABEL;
Job testClientJob = TestClientsJobTemplates.defaultTestClientProducerJob(name, bootstrap, topicName, numOfMessages, message, messageKey).build();
KafkaUtils.produceMessages(deployNamespace, topicName, name, testClientJob);
String podName = KafkaUtils.getPodNameByLabel(deployNamespace, "app", name, Duration.ofSeconds(30));
String log = waitForProducer(deployNamespace, podName, Duration.ofSeconds(60));
LOGGER.atInfo().setMessage("client producer log: {}").addArgument(log).log();
}

private static String waitForProducer(String namespace, String podName, Duration timeout) {
String log;
try {
log = await().alias("Consumer waiting to receive messages")
.ignoreException(KubernetesClientException.class)
.atMost(timeout)
.until(() -> {
if (kubeClient().getClient().pods().inNamespace(namespace).withName(podName).get() != null) {
return kubeClient().logsInSpecificNamespace(namespace, podName);
}
return null;
}, m -> m != null && m.contains("Sending message:"));
}
catch (ConditionTimeoutException e) {
log = kubeClient().logsInSpecificNamespace(namespace, podName);
LOGGER.atInfo().setMessage("Timeout! Unable to produce the messages: {}").addArgument(log).log();
}
return log;
}

@Override
public String consumeMessages(String topicName, String bootstrap, String messageToCheck, int numOfMessages, Duration timeout) {
public List<ConsumerRecord> consumeMessages(String topicName, String bootstrap, int numOfMessages, Duration timeout) {
LOGGER.atInfo().log("Consuming messages using Strimzi Test Client");
String name = Constants.KAFKA_CONSUMER_CLIENT_LABEL;
Job testClientJob = TestClientsJobTemplates.defaultTestClientConsumerJob(name, bootstrap, topicName, numOfMessages).build();
return KafkaUtils.consumeMessages(topicName, name, deployNamespace, testClientJob, messageToCheck, numOfMessages, timeout);
String podName = KafkaUtils.createJob(deployNamespace, name, testClientJob);
String log = waitForConsumer(deployNamespace, podName, timeout);
LOGGER.atInfo().setMessage("Log: {}").addArgument(log).log();
List<String> logRecords = extractRecordLinesFromLog(log);
return getConsumerRecords(logRecords);
}

private String waitForConsumer(String namespace, String podName, Duration timeout) {
DeploymentUtils.waitForPodRunSucceeded(namespace, podName, timeout);
return kubeClient().logsInSpecificNamespace(namespace, podName);
}

private List<ConsumerRecord> getConsumerRecords(List<String> logRecords) {
return logRecords.stream().map(x -> ConsumerRecord.parseFromJsonString(VALUE_TYPE_REF, x))
.filter(Objects::nonNull).map(ConsumerRecord.class::cast).toList();
}

private List<String> extractRecordLinesFromLog(String log) {
List<String> records = new ArrayList<>();
String stringToSeek = "Received message:";

List<String> receivedMessages = Stream.of(log.split("\n")).filter(l -> l.contains(stringToSeek)).toList();
for (String receivedMessage : receivedMessages) {
records.add(receivedMessage.split(stringToSeek)[1].trim());
}

return records;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Kroxylicious Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.kroxylicious.systemtests.clients.records;

import java.io.UncheckedIOException;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConsumerRecord {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerRecord.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

protected String topic;
protected String key;
protected String value;
protected int partition;
protected long offset;
protected Map<String, String> recordHeaders;

public String getTopic() {
return topic;
}

public String getKey() {
return key;
}

public String getValue() {
return value;
}

public int getPartition() {
return partition;
}

public long getOffset() {
return offset;
}

public Map<String, String> getRecordHeaders() {
return recordHeaders;
}

/**
* Parse from json string t.
*
* @param <T> the type parameter
* @param valueTypeRef the value type ref
* @param response the response
* @return the t
*/
public static <T> T parseFromJsonString(TypeReference<T> valueTypeRef, String response) {
try {
return OBJECT_MAPPER.readValue(response, valueTypeRef);
}
catch (JsonProcessingException e) {
LOGGER.atError().setMessage("Something bad happened").setCause(e).log();
throw new UncheckedIOException(e);
}
}

@Override
public String toString() {
return "ConsumerRecord(topic: " + this.topic +
", key: " + this.key +
", value: " + this.value +
", partition: " + this.partition +
", offset: " + this.offset +
", headers: " + this.recordHeaders +
")";
}
}
Loading

0 comments on commit 49a20fa

Please sign in to comment.