Skip to content

Commit

Permalink
Merge pull request #191 from quarkiverse/release-1.0.2
Browse files Browse the repository at this point in the history
Release 1.0.2
  • Loading branch information
ChMThiel authored May 17, 2024
2 parents c65d91d + c24b483 commit 3c9df6e
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release:
current-version: "1.0.1"
current-version: "1.0.2"
next-version: "1.0.0-SNAPSHOT"

2 changes: 1 addition & 1 deletion docs/modules/ROOT/pages/includes/attributes.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
:project-version: 1.0.1
:project-version: 1.0.2

:examples-dir: ./../examples/
Original file line number Diff line number Diff line change
Expand Up @@ -70,29 +70,38 @@ public KafkaChannelBinding getKafkaChannelBindings(String aTopic) {
}

KafkaChannelTopicConfiguration getTopicConfiguration(AdminClient aClient, String aTopic) {
Map<String, ConfigEntry> configMap = aClient
.describeConfigs(List.of(new ConfigResource(ConfigResource.Type.TOPIC, aTopic)))
.values().values().stream()
.map(f -> {
try {
return f.get();
} catch (InterruptedException | ExecutionException interruptedException) {
return null;
}
})
.filter(Objects::nonNull)
.map(Config::entries)
.flatMap(Collection::stream)
.collect(Collectors.toMap(ConfigEntry::name, Function.identity()));
KafkaChannelTopicCleanupPolicy cleanUpPolicy = KafkaChannelTopicCleanupPolicy
.valueOf(configMap.get(CLEANUP_POLICY).value());
return KafkaChannelTopicConfiguration.builder()
.cleanupPolicy(List.of(cleanUpPolicy))
.retentionMs(Integer.valueOf(configMap.get(RETENTION_MS).value()))
.retentionBytes(Integer.valueOf(configMap.get(RETENTION_BYTES).value()))
.deleteRetentionMs(Integer.valueOf(configMap.get(DELETE_RETENTION_MS).value()))
.maxMessageBytes(Integer.valueOf(configMap.get(MAX_MESSAGE_BYTES).value()))
.build();
KafkaChannelTopicConfiguration.KafkaChannelTopicConfigurationBuilder builder = KafkaChannelTopicConfiguration.builder();
try {
Map<String, ConfigEntry> configMap = aClient
.describeConfigs(List.of(new ConfigResource(ConfigResource.Type.TOPIC, aTopic)))
.values().values().stream()
.map(f -> {
try {
return f.get();
} catch (InterruptedException | ExecutionException interruptedException) {
return null;
}
})
.filter(Objects::nonNull)
.map(Config::entries)
.flatMap(Collection::stream)
.collect(Collectors.toMap(ConfigEntry::name, Function.identity()));
String cleanUpPolicyString = configMap.get(CLEANUP_POLICY).value();
List<KafkaChannelTopicCleanupPolicy> cleanUpPolicies = cleanUpPolicyString == null
? null
: List.of(KafkaChannelTopicCleanupPolicy.valueOf(cleanUpPolicyString.toUpperCase()));
return builder
.cleanupPolicy(cleanUpPolicies)
.retentionMs(Integer.valueOf(configMap.get(RETENTION_MS).value()))
.retentionBytes(Integer.valueOf(configMap.get(RETENTION_BYTES).value()))
.deleteRetentionMs(Integer.valueOf(configMap.get(DELETE_RETENTION_MS).value()))
.maxMessageBytes(Integer.valueOf(configMap.get(MAX_MESSAGE_BYTES).value()))
.build();
} catch (Exception e) {
LOGGER.warning("Unable to read kafka-config for topic " + aTopic);
LOGGER.throwing("KafkaResolver", "getTopicConfiguration", e);
return builder.build();
}
}

private boolean isTopicExists(AdminClient admin, String topicName) throws InterruptedException, ExecutionException {
Expand Down

0 comments on commit 3c9df6e

Please sign in to comment.