diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java index a1b61afa..c1c253f1 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import com.google.common.base.Preconditions; import java.io.IOException; import java.nio.ByteBuffer; import org.HdrHistogram.Histogram; @@ -30,16 +31,40 @@ public HistogramSerializer() { super(Histogram.class); } + static byte[] toByteArray(ByteBuffer buffer) { + byte[] encodedBuffer = new byte[buffer.remaining()]; + buffer.get(encodedBuffer); + return encodedBuffer; + } + + static ByteBuffer serializeHistogram(Histogram histo, ByteBuffer buffer) { + buffer.clear(); + while (true) { + final int outBytes = histo.encodeIntoCompressedByteBuffer(buffer); + Preconditions.checkState(outBytes == buffer.position()); + final int capacity = buffer.capacity(); + if (outBytes < capacity) { + // encoding succesful + break; + } + // We filled the entire buffer, an indication that the buffer was not + // large enough, so we double the buffer and try again. + // See: https://github.com/HdrHistogram/HdrHistogram/issues/201 + buffer = ByteBuffer.allocate(capacity * 2); + } + buffer.flip(); + return buffer; + } + @Override public void serialize( Histogram histogram, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { ByteBuffer buffer = threadBuffer.get(); - buffer.clear(); - histogram.encodeIntoCompressedByteBuffer(buffer); - byte[] arr = new byte[buffer.position()]; - buffer.flip(); - buffer.get(arr); - jsonGenerator.writeBinary(arr); + ByteBuffer newBuffer = serializeHistogram(histogram, buffer); + if (newBuffer != buffer) { + threadBuffer.set(newBuffer); + } + jsonGenerator.writeBinary(toByteArray(newBuffer)); } } diff --git a/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java b/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java index b06ef756..9769eac6 100644 --- a/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java +++ b/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java @@ -18,6 +18,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; import org.HdrHistogram.Histogram; import org.junit.jupiter.api.Test; @@ -41,4 +43,64 @@ void deserialize() throws IOException { assertThat(deserialized).isEqualTo(value); } + + /** + * Create a random histogram and insert the given number of samples. + * + * @param samples the number of samples to record into the histogram + * @return a Histogram with the given number of samples + */ + private Histogram randomHisto(int samples) { + Random r = new Random(0xBADBEEF); + Histogram h = new org.HdrHistogram.Histogram(5); + for (int i = 0; i < samples; i++) { + h.recordValue(r.nextInt(10000000)); + } + + return h; + } + + byte[] serializeRandomHisto(int samples, int initialBufferSize) throws Exception { + ByteBuffer inbuffer = ByteBuffer.allocate(initialBufferSize); + Histogram inHisto = randomHisto(samples); + byte[] serialBytes = + HistogramSerializer.toByteArray(HistogramSerializer.serializeHistogram(inHisto, inbuffer)); + + // check roundtrip + Histogram outHisto = + Histogram.decodeFromCompressedByteBuffer(ByteBuffer.wrap(serialBytes), Long.MIN_VALUE); + assertThat(inHisto).isEqualTo(outHisto); + + return serialBytes; + } + + @Test + public void testHistogram() throws Exception { + + // in the worker it's 8 MB but it takes a while to make a histogram that big + final int bufSize = 1002; + + int samples = 300; + + // we do an exponential search to fit the crossover point where we need to grow the buffer + while (true) { + byte[] serialBytes = serializeRandomHisto(samples, bufSize); + // System.out.println("Samples: " + samples + ", histogram size: " + serialBytes.length); + if (serialBytes.length >= bufSize) { + break; + } + samples *= 1.05; + } + + // then walk backwards across the point linearly with increment of 1 to check the boundary + // carefully + while (true) { + samples--; + byte[] serialBytes = serializeRandomHisto(samples, bufSize); + // System.out.println("Samples: " + samples + ", histogram size: " + serialBytes.length); + if (serialBytes.length < bufSize - 10) { + break; + } + } + } } diff --git a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java index cc54c757..ccad035f 100644 --- a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java +++ b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java @@ -92,14 +92,15 @@ public String getTopicNamePrefix() { int fetchCnt = 0; - private synchronized Set fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt, - final String clusterName) throws RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, MQBrokerException, InterruptedException { + private synchronized Set fetchMasterAndSlaveAddrByClusterName( + final MQAdminExt adminExt, final String clusterName) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + MQBrokerException, InterruptedException { Set brokerList = cachedBrokerAddr.get(clusterName); if (brokerList == null) { brokerList = - CommandUtil.fetchMasterAndSlaveAddrByClusterName( - adminExt, this.rmqClientConfig.clusterName); + CommandUtil.fetchMasterAndSlaveAddrByClusterName( + adminExt, this.rmqClientConfig.clusterName); cachedBrokerAddr.put(clusterName, brokerList); if (brokerList.isEmpty()) { throw new RuntimeException("get brokerAddr return null, clusterName: " + clusterName); @@ -114,35 +115,37 @@ private synchronized Set fetchMasterAndSlaveAddrByClusterName(final MQAd @Override public CompletableFuture createTopic(final String topic, final int partitions) { return CompletableFuture.runAsync( - () -> { - TopicConfig topicConfig = new TopicConfig(); - topicConfig.setOrder(false); - topicConfig.setPerm(6); - topicConfig.setReadQueueNums(partitions); - topicConfig.setWriteQueueNums(partitions); - topicConfig.setTopicName(topic); - if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) { - topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ"); - } + () -> { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setOrder(false); + topicConfig.setPerm(6); + topicConfig.setReadQueueNums(partitions); + topicConfig.setWriteQueueNums(partitions); + topicConfig.setTopicName(topic); + if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) { + topicConfig + .getAttributes() + .put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ"); + } - try { - Set brokerList = - fetchMasterAndSlaveAddrByClusterName( - this.rmqAdmin, this.rmqClientConfig.clusterName); - topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size())); - topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size())); + try { + Set brokerList = + fetchMasterAndSlaveAddrByClusterName( + this.rmqAdmin, this.rmqClientConfig.clusterName); + topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size())); + topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size())); - for (String brokerAddr : brokerList) { - this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig); + for (String brokerAddr : brokerList) { + this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig); + } + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Failed to create topic [%s] to cluster [%s]", + topic, this.rmqClientConfig.clusterName), + e); } - } catch (Exception e) { - throw new RuntimeException( - String.format( - "Failed to create topic [%s] to cluster [%s]", - topic, this.rmqClientConfig.clusterName), - e); - } - }); + }); } @Override