Skip to content

Commit

Permalink
Allow users to use templates for data generation with String and Byte…
Browse files Browse the repository at this point in the history
…Array serializer/deserializer (#118)

Signed-off-by: Jakub Stejskal <[email protected]>
  • Loading branch information
Frawless authored Jan 13, 2025
1 parent 07e0bba commit 411cbbf
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 1 deletion.
13 changes: 12 additions & 1 deletion clients/src/main/java/io/strimzi/kafka/KafkaProducerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
import io.strimzi.test.tracing.TracingUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -119,7 +122,15 @@ public ProducerRecord generateMessage(int numOfMessage) {
Object message;

if (this.configuration.getMessageTemplate() != null) {
message = dataGenerator.generateData();
if (this.configuration.getValueSerializer().contains(StringSerializer.class.getName())) {
// Convert generated data to String
message = dataGenerator.generateData().toString();
} else if (this.configuration.getValueSerializer().equals(ByteArraySerializer.class.getName())) {
// Convert generated data to bytes
message = dataGenerator.generateData().toString().getBytes(StandardCharsets.UTF_8);
} else {
message = dataGenerator.generateData();
}
} else {
message = configuration.getMessage() + " - " + numOfMessage;
}
Expand Down
72 changes: 72 additions & 0 deletions clients/src/test/java/io/strimzi/integration/KafkaClientIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,32 @@
*/
package io.strimzi.integration;

import io.skodjob.datagenerator.enums.ETemplateType;
import io.strimzi.configuration.ConfigurationConstants;
import io.strimzi.kafka.KafkaConsumerClient;
import io.strimzi.kafka.KafkaProducerClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -60,4 +72,64 @@ void testSimpleExchange() throws ExecutionException, InterruptedException, NoSuc
consumedMessages.setAccessible(true);
assertThat(consumedMessages.get(kafkaConsumerClient), is(100));
}

@ParameterizedTest
@MethodSource("getImplementedTemplates")
void testUseTemplateWithoutRegistry(String templateName, String serializer, String deserializer) throws ExecutionException, InterruptedException, NoSuchFieldException, IllegalAccessException, TimeoutException {
String topicName = (templateName + serializer).toLowerCase(Locale.ROOT);
Map<String, String> configuration;
configuration = new HashMap<>();
configuration.put(ConfigurationConstants.BOOTSTRAP_SERVERS_ENV, kafkaCluster.getBootstrapServers());
configuration.put(ConfigurationConstants.TOPIC_ENV, topicName);
configuration.put(ConfigurationConstants.MESSAGE_COUNT_ENV, "100");
configuration.put(ConfigurationConstants.MESSAGE_TEMPLATE_ENV, templateName);
configuration.put("ADDITIONAL_CONFIG", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG + "=" + serializer);

createKafkaTopic(topicName, Map.of(TopicConfig.RETENTION_MS_CONFIG, "604800000"));

KafkaProducerClient kafkaProducerClient = new KafkaProducerClient(configuration);

CompletableFuture<Void> future = CompletableFuture.runAsync(kafkaProducerClient::run);
// Wait for the process to complete within a reasonable time
future.get(10, TimeUnit.SECONDS);

Field producedMessages = KafkaProducerClient.class.getDeclaredField("messageSuccessfullySent");
producedMessages.setAccessible(true);
assertThat(producedMessages.get(kafkaProducerClient), is(100));

configuration = new HashMap<>();
configuration.put(ConfigurationConstants.BOOTSTRAP_SERVERS_ENV, kafkaCluster.getBootstrapServers());
configuration.put(ConfigurationConstants.TOPIC_ENV, topicName);
configuration.put(ConfigurationConstants.MESSAGE_COUNT_ENV, "100");
configuration.put(ConfigurationConstants.GROUP_ID_ENV, topicName);
configuration.put(ConfigurationConstants.CLIENT_ID_ENV, topicName);
configuration.put("ADDITIONAL_CONFIG", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + "=" + deserializer);

KafkaConsumerClient kafkaConsumerClient = new KafkaConsumerClient(configuration);

future = CompletableFuture.runAsync(kafkaConsumerClient::run);
// Wait for the process to complete within a reasonable time
future.get(10, TimeUnit.SECONDS);

Field consumedMessages = KafkaConsumerClient.class.getDeclaredField("consumedMessages");
consumedMessages.setAccessible(true);
assertThat(consumedMessages.get(kafkaConsumerClient), is(100));
}

private Stream<Arguments> getImplementedTemplates() {
return Stream.of(
Arguments.of(ETemplateType.PAYROLL.name(), StringSerializer.class.getName(), StringDeserializer.class.getName()),
Arguments.of(ETemplateType.IOT_DEVICE.name(), StringSerializer.class.getName(), StringDeserializer.class.getName()),
Arguments.of(ETemplateType.STARGATE.name(), StringSerializer.class.getName(), StringDeserializer.class.getName()),
Arguments.of(ETemplateType.STARWARS.name(), StringSerializer.class.getName(), StringDeserializer.class.getName()),
Arguments.of(ETemplateType.PAYMENT_FIAT.name(), StringSerializer.class.getName(), StringDeserializer.class.getName()),
Arguments.of(ETemplateType.FLIGHTS.name(), StringSerializer.class.getName(), StringDeserializer.class.getName()),
Arguments.of(ETemplateType.PAYROLL.name(), ByteArraySerializer.class.getName(), ByteArrayDeserializer.class.getName()),
Arguments.of(ETemplateType.IOT_DEVICE.name(), ByteArraySerializer.class.getName(), ByteArrayDeserializer.class.getName()),
Arguments.of(ETemplateType.STARGATE.name(), ByteArraySerializer.class.getName(), ByteArrayDeserializer.class.getName()),
Arguments.of(ETemplateType.STARWARS.name(), ByteArraySerializer.class.getName(), ByteArrayDeserializer.class.getName()),
Arguments.of(ETemplateType.PAYMENT_FIAT.name(), ByteArraySerializer.class.getName(), ByteArrayDeserializer.class.getName()),
Arguments.of(ETemplateType.FLIGHTS.name(), ByteArraySerializer.class.getName(), ByteArrayDeserializer.class.getName())
);
}
}

0 comments on commit 411cbbf

Please sign in to comment.