Skip to content

Commit

Permalink
Fixed spotbugs warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Dec 19, 2023
1 parent 35b6cff commit e8c7185
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.google.common.base.Preconditions;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.HdrHistogram.Histogram;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;

import org.HdrHistogram.Histogram;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -64,10 +63,12 @@ private Histogram randomHisto(int samples) {
byte[] serializeRandomHisto(int samples, int initialBufferSize) throws Exception {
ByteBuffer inbuffer = ByteBuffer.allocate(initialBufferSize);
Histogram inHisto = randomHisto(samples);
byte[] serialBytes = HistogramSerializer.toByteArray(HistogramSerializer.serializeHistogram(inHisto, inbuffer));
byte[] serialBytes =
HistogramSerializer.toByteArray(HistogramSerializer.serializeHistogram(inHisto, inbuffer));

// check roundtrip
Histogram outHisto = Histogram.decodeFromCompressedByteBuffer(ByteBuffer.wrap(serialBytes), Long.MIN_VALUE);
Histogram outHisto =
Histogram.decodeFromCompressedByteBuffer(ByteBuffer.wrap(serialBytes), Long.MIN_VALUE);
assertThat(inHisto).isEqualTo(outHisto);

return serialBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,15 @@ public String getTopicNamePrefix() {

int fetchCnt = 0;

private synchronized Set<String> fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt,
final String clusterName) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
private synchronized Set<String> fetchMasterAndSlaveAddrByClusterName(
final MQAdminExt adminExt, final String clusterName)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
MQBrokerException, InterruptedException {
Set<String> brokerList = cachedBrokerAddr.get(clusterName);
if (brokerList == null) {
brokerList =
CommandUtil.fetchMasterAndSlaveAddrByClusterName(
adminExt, this.rmqClientConfig.clusterName);
CommandUtil.fetchMasterAndSlaveAddrByClusterName(
adminExt, this.rmqClientConfig.clusterName);
cachedBrokerAddr.put(clusterName, brokerList);
if (brokerList.isEmpty()) {
throw new RuntimeException("get brokerAddr return null, clusterName: " + clusterName);
Expand All @@ -114,35 +115,37 @@ private synchronized Set<String> fetchMasterAndSlaveAddrByClusterName(final MQAd
@Override
public CompletableFuture<Void> createTopic(final String topic, final int partitions) {
return CompletableFuture.runAsync(
() -> {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setOrder(false);
topicConfig.setPerm(6);
topicConfig.setReadQueueNums(partitions);
topicConfig.setWriteQueueNums(partitions);
topicConfig.setTopicName(topic);
if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) {
topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ");
}
() -> {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setOrder(false);
topicConfig.setPerm(6);
topicConfig.setReadQueueNums(partitions);
topicConfig.setWriteQueueNums(partitions);
topicConfig.setTopicName(topic);
if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) {
topicConfig
.getAttributes()
.put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ");
}

try {
Set<String> brokerList =
fetchMasterAndSlaveAddrByClusterName(
this.rmqAdmin, this.rmqClientConfig.clusterName);
topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size()));
topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size()));
try {
Set<String> brokerList =
fetchMasterAndSlaveAddrByClusterName(
this.rmqAdmin, this.rmqClientConfig.clusterName);
topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size()));
topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size()));

for (String brokerAddr : brokerList) {
this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig);
for (String brokerAddr : brokerList) {
this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig);
}
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to create topic [%s] to cluster [%s]",
topic, this.rmqClientConfig.clusterName),
e);
}
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to create topic [%s] to cluster [%s]",
topic, this.rmqClientConfig.clusterName),
e);
}
});
});
}

@Override
Expand Down

0 comments on commit e8c7185

Please sign in to comment.