Skip to content

Commit

Permalink
Fixed for 'Failed to execute goal com.diffplug.spotless:spotless-mave…
Browse files Browse the repository at this point in the history
…n-plugin:2.25.0:check (check) on project driver-rocketmq: The following files had format violations: [ERROR] src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java' (openmessaging#399)

Co-authored-by: nuolin <[email protected]>
  • Loading branch information
StevenLuMT and nuolin authored Dec 14, 2023
1 parent d0c0277 commit f423781
Showing 1 changed file with 34 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,15 @@ public String getTopicNamePrefix() {

int fetchCnt = 0;

private synchronized Set<String> fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt,
final String clusterName) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
private synchronized Set<String> fetchMasterAndSlaveAddrByClusterName(
final MQAdminExt adminExt, final String clusterName)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
MQBrokerException, InterruptedException {
Set<String> brokerList = cachedBrokerAddr.get(clusterName);
if (brokerList == null) {
brokerList =
CommandUtil.fetchMasterAndSlaveAddrByClusterName(
adminExt, this.rmqClientConfig.clusterName);
CommandUtil.fetchMasterAndSlaveAddrByClusterName(
adminExt, this.rmqClientConfig.clusterName);
cachedBrokerAddr.put(clusterName, brokerList);
if (brokerList.isEmpty()) {
throw new RuntimeException("get brokerAddr return null, clusterName: " + clusterName);
Expand All @@ -114,35 +115,37 @@ private synchronized Set<String> fetchMasterAndSlaveAddrByClusterName(final MQAd
@Override
public CompletableFuture<Void> createTopic(final String topic, final int partitions) {
return CompletableFuture.runAsync(
() -> {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setOrder(false);
topicConfig.setPerm(6);
topicConfig.setReadQueueNums(partitions);
topicConfig.setWriteQueueNums(partitions);
topicConfig.setTopicName(topic);
if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) {
topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ");
}
() -> {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setOrder(false);
topicConfig.setPerm(6);
topicConfig.setReadQueueNums(partitions);
topicConfig.setWriteQueueNums(partitions);
topicConfig.setTopicName(topic);
if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) {
topicConfig
.getAttributes()
.put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ");
}

try {
Set<String> brokerList =
fetchMasterAndSlaveAddrByClusterName(
this.rmqAdmin, this.rmqClientConfig.clusterName);
topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size()));
topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size()));
try {
Set<String> brokerList =
fetchMasterAndSlaveAddrByClusterName(
this.rmqAdmin, this.rmqClientConfig.clusterName);
topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size()));
topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size()));

for (String brokerAddr : brokerList) {
this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig);
for (String brokerAddr : brokerList) {
this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig);
}
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to create topic [%s] to cluster [%s]",
topic, this.rmqClientConfig.clusterName),
e);
}
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to create topic [%s] to cluster [%s]",
topic, this.rmqClientConfig.clusterName),
e);
}
});
});
}

@Override
Expand Down

0 comments on commit f423781

Please sign in to comment.