Skip to content

Commit

Permalink
Cleanup message extraction.
Browse files Browse the repository at this point in the history
This means the tests fail with an assertion error as we'd expect rather than throwing exceptions like
```java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1

   	at io.kroxylicious.systemtests.clients.StrimziTestClient.extractRecordLinesFromLog(StrimziTestClient.java:117)
   	at io.kroxylicious.systemtests.clients.StrimziTestClient.consumeMessages(StrimziTestClient.java:97)
   	at io.kroxylicious.systemtests.steps.KroxyliciousSteps.consumeMessages(KroxyliciousSteps.java:47)
   	at io.kroxylicious.systemtests.steps.KroxyliciousSteps.consumeMessageFromKafkaCluster(KroxyliciousSteps.java:65)
   	at io.kroxylicious.systemtests.RecordEncryptionST.ensureClusterHasEncryptedMessage(RecordEncryptionST.java:97)
   	```

rh-pre-commit.version: 2.0.1
rh-pre-commit.check-secrets: ENABLED

Signed-off-by: Sam Barker <[email protected]>
  • Loading branch information
SamBarker committed Jul 16, 2024
1 parent 56db469 commit f6960f7
Showing 1 changed file with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
package io.kroxylicious.systemtests.clients;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.awaitility.core.ConditionTimeoutException;
Expand Down Expand Up @@ -94,7 +94,7 @@ public List<ConsumerRecord> consumeMessages(String topicName, String bootstrap,
String podName = KafkaUtils.createJob(deployNamespace, name, testClientJob);
String log = waitForConsumer(deployNamespace, podName, timeout);
LOGGER.atInfo().setMessage("Log: {}").addArgument(log).log();
List<String> logRecords = extractRecordLinesFromLog(log);
Stream<String> logRecords = extractRecordLinesFromLog(log);
return getConsumerRecords(logRecords);
}

Expand All @@ -103,20 +103,27 @@ private String waitForConsumer(String namespace, String podName, Duration timeou
return kubeClient().logsInSpecificNamespace(namespace, podName);
}

private List<ConsumerRecord> getConsumerRecords(List<String> logRecords) {
return logRecords.stream().map(x -> ConsumerRecord.parseFromJsonString(VALUE_TYPE_REF, x))
.filter(Objects::nonNull).map(ConsumerRecord.class::cast).toList();
private List<ConsumerRecord> getConsumerRecords(Stream<String> logRecords) {
return logRecords.filter(Predicate.not(String::isBlank))
.map(x -> ConsumerRecord.parseFromJsonString(VALUE_TYPE_REF, x))
.filter(Objects::nonNull)
.map(ConsumerRecord.class::cast)
.toList();
}

private List<String> extractRecordLinesFromLog(String log) {
List<String> records = new ArrayList<>();
private Stream<String> extractRecordLinesFromLog(String log) {
String stringToSeek = "Received message:";

List<String> receivedMessages = Stream.of(log.split("\n")).filter(l -> l.contains(stringToSeek)).toList();
for (String receivedMessage : receivedMessages) {
records.add(receivedMessage.split(stringToSeek)[1].trim());
}

return records;
return Stream.of(log.split("\n"))
.filter(l -> l.contains(stringToSeek))
.map(line -> {
final String[] split = line.split(stringToSeek, 1);
if (split.length > 1) {
return split[1];
}
else {
return "";
}
});
}
}

0 comments on commit f6960f7

Please sign in to comment.