Skip to content

Commit

Permalink
Refactor consumeEncryptedMessages -> consumeMessageFromKafkaCluster
Browse files Browse the repository at this point in the history
rh-pre-commit.version: 2.0.1
rh-pre-commit.check-secrets: ENABLED
  • Loading branch information
SamBarker committed May 1, 2024
1 parent bdc8685 commit 05d92ed
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ public static String consumeMessages(String namespace, String topicName, String
* @param kafkaClusterName the name of the kafka cluster to read from
* @param kafkaNamespace the namespace in which the broker is operating
* @param numberOfMessages the number of messages
* @param timeout the timeout
* @param timeout maximum time to wait for the expectedMessage to appear
* @param expectedMessage the message to check for.
* @return the string
*/
public static String consumeEncryptedMessages(String clientNamespace, String topicName, String kafkaClusterName, String kafkaNamespace, int numberOfMessages,
Duration timeout) {
public static String consumeMessageFromKafkaCluster(String clientNamespace, String topicName, String kafkaClusterName, String kafkaNamespace, int numberOfMessages,
Duration timeout, String expectedMessage) {
String kafkaBootstrap = kafkaClusterName + "-kafka-bootstrap." + kafkaNamespace + ".svc.cluster.local:9092";
return KafkaUtils.consumeEncryptedMessageWithTestClients(clientNamespace, topicName, kafkaBootstrap, numberOfMessages, timeout);
return KafkaUtils.consumeMessageWithTestClients(clientNamespace, topicName, kafkaBootstrap, expectedMessage, numberOfMessages, timeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,6 @@ public static String consumeMessageWithTestClients(String deployNamespace, Strin
return consumeMessages(topicName, name, deployNamespace, testClientJob, message, timeout);
}

/**
* Consume encrypted message with test clients.
*
* @param deployNamespace the deploy namespace
* @param topicName the topic name
* @param bootstrap the bootstrap
* @param numOfMessages the num of messages
* @param timeout the timeout
* @return the string
*/
public static String consumeEncryptedMessageWithTestClients(String deployNamespace, String topicName, String bootstrap, int numOfMessages, Duration timeout) {
String name = Constants.KAFKA_CONSUMER_CLIENT_LABEL;
Job testClientJob = TestClientsJobTemplates.defaultTestClientConsumerJob(name, bootstrap, topicName, numOfMessages).build();
return consumeMessages(topicName, name, deployNamespace, testClientJob, "key: kroxylicious.io/encryption", timeout);
}

/**
* Gets pod name by label.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ void ensureClusterHasEncryptedMessage(String namespace) {
KroxyliciousSteps.produceMessages(namespace, topicName, bootstrap, MESSAGE, numberOfMessages);

LOGGER.atInfo().setMessage("Then the messages are consumed").log();
String resultEncrypted = KroxyliciousSteps.consumeEncryptedMessages(namespace, topicName, clusterName, Constants.KAFKA_DEFAULT_NAMESPACE, numberOfMessages,
Duration.ofMinutes(2));
String resultEncrypted = KroxyliciousSteps.consumeMessageFromKafkaCluster(namespace, topicName, clusterName, Constants.KAFKA_DEFAULT_NAMESPACE, numberOfMessages,
Duration.ofMinutes(2), "key: kroxylicious.io/encryption");
LOGGER.atInfo().setMessage("Received: {}").addArgument(resultEncrypted).log();
assertThat(resultEncrypted)
.withFailMessage("expected message have not been received!")
Expand Down

0 comments on commit 05d92ed

Please sign in to comment.