From d2b1059d09c77af2c1ed5734fdc0da58d24f16ef Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Thu, 23 Mar 2023 11:46:29 -0300 Subject: [PATCH 1/3] Grow the histogram buffer if it is too small When we serialize a histogram to a byte array, the intermediate ByteBuffer that we pass may be too small which may result in silent truncation of the encoded histogram. This will manifest on the driver side as a decoding failure. This change detects this case and grows the buffer by a factor of 2 until it fits. Fixes openmessaging/benchmark#369. --- .../worker/jackson/HistogramSerializer.java | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java index a1b61afa3..02287393c 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java @@ -17,6 +17,8 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import com.google.common.base.Preconditions; + import java.io.IOException; import java.nio.ByteBuffer; import org.HdrHistogram.Histogram; @@ -30,16 +32,40 @@ public HistogramSerializer() { super(Histogram.class); } + static byte[] toByteArray(ByteBuffer buffer) { + byte[] encodedBuffer = new byte[buffer.remaining()]; + buffer.get(encodedBuffer); + return encodedBuffer; + } + + static ByteBuffer serializeHistogram(Histogram histo, ByteBuffer buffer) { + buffer.clear(); + while (true) { + final int outBytes = histo.encodeIntoCompressedByteBuffer(buffer); + Preconditions.checkState(outBytes == buffer.position()); + final int capacity = buffer.capacity(); + if (outBytes < capacity) { + // encoding succesful + break; + } + // We filled the entire buffer, an indication that the buffer was not + // large enough, so we double the buffer and try again. + // See: https://github.com/HdrHistogram/HdrHistogram/issues/201 + buffer = ByteBuffer.allocate(capacity * 2); + } + buffer.flip(); + return buffer; + } + @Override public void serialize( Histogram histogram, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { ByteBuffer buffer = threadBuffer.get(); - buffer.clear(); - histogram.encodeIntoCompressedByteBuffer(buffer); - byte[] arr = new byte[buffer.position()]; - buffer.flip(); - buffer.get(arr); - jsonGenerator.writeBinary(arr); + ByteBuffer newBuffer = serializeHistogram(histogram, buffer); + if (newBuffer != buffer) { + threadBuffer.set(newBuffer); + } + jsonGenerator.writeBinary(toByteArray(newBuffer)); } } From 35b6cff25836d65cc701af7f1aed3dec3dc3f023 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Thu, 23 Mar 2023 11:46:29 -0300 Subject: [PATCH 2/3] Add Histogram deserialization test Add an addition test to the HistogramSerDeTest which tests that histogram deserialization roundtrips even when the serialized size exceeds the initial buffer. --- .../worker/jackson/HistogramSerDeTest.java | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java b/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java index b06ef7566..d1234c20d 100644 --- a/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java +++ b/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java @@ -18,6 +18,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; + import org.HdrHistogram.Histogram; import org.junit.jupiter.api.Test; @@ -41,4 +44,62 @@ void deserialize() throws IOException { assertThat(deserialized).isEqualTo(value); } + + /** + * Create a random histogram and insert the given number of samples. + * + * @param samples the number of samples to record into the histogram + * @return a Histogram with the given number of samples + */ + private Histogram randomHisto(int samples) { + Random r = new Random(0xBADBEEF); + Histogram h = new org.HdrHistogram.Histogram(5); + for (int i = 0; i < samples; i++) { + h.recordValue(r.nextInt(10000000)); + } + + return h; + } + + byte[] serializeRandomHisto(int samples, int initialBufferSize) throws Exception { + ByteBuffer inbuffer = ByteBuffer.allocate(initialBufferSize); + Histogram inHisto = randomHisto(samples); + byte[] serialBytes = HistogramSerializer.toByteArray(HistogramSerializer.serializeHistogram(inHisto, inbuffer)); + + // check roundtrip + Histogram outHisto = Histogram.decodeFromCompressedByteBuffer(ByteBuffer.wrap(serialBytes), Long.MIN_VALUE); + assertThat(inHisto).isEqualTo(outHisto); + + return serialBytes; + } + + @Test + public void testHistogram() throws Exception { + + // in the worker it's 8 MB but it takes a while to make a histogram that big + final int bufSize = 1002; + + int samples = 300; + + // we do an exponential search to fit the crossover point where we need to grow the buffer + while (true) { + byte[] serialBytes = serializeRandomHisto(samples, bufSize); + // System.out.println("Samples: " + samples + ", histogram size: " + serialBytes.length); + if (serialBytes.length >= bufSize) { + break; + } + samples *= 1.05; + } + + // then walk backwards across the point linearly with increment of 1 to check the boundary + // carefully + while (true) { + samples--; + byte[] serialBytes = serializeRandomHisto(samples, bufSize); + // System.out.println("Samples: " + samples + ", histogram size: " + serialBytes.length); + if (serialBytes.length < bufSize - 10) { + break; + } + } + } } From e8c7185cdb6c8b81bf6cd013c2eb6ee1e6f5facb Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 19 Dec 2023 15:53:36 -0800 Subject: [PATCH 3/3] Fixed spotbugs warnings --- .../worker/jackson/HistogramSerializer.java | 1 - .../worker/jackson/HistogramSerDeTest.java | 7 +- .../rocketmq/RocketMQBenchmarkDriver.java | 65 ++++++++++--------- 3 files changed, 38 insertions(+), 35 deletions(-) diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java index 02287393c..c1c253f16 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/jackson/HistogramSerializer.java @@ -18,7 +18,6 @@ import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; import com.google.common.base.Preconditions; - import java.io.IOException; import java.nio.ByteBuffer; import org.HdrHistogram.Histogram; diff --git a/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java b/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java index d1234c20d..9769eac6f 100644 --- a/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java +++ b/benchmark-framework/src/test/java/io/openmessaging/benchmark/worker/jackson/HistogramSerDeTest.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Random; - import org.HdrHistogram.Histogram; import org.junit.jupiter.api.Test; @@ -64,10 +63,12 @@ private Histogram randomHisto(int samples) { byte[] serializeRandomHisto(int samples, int initialBufferSize) throws Exception { ByteBuffer inbuffer = ByteBuffer.allocate(initialBufferSize); Histogram inHisto = randomHisto(samples); - byte[] serialBytes = HistogramSerializer.toByteArray(HistogramSerializer.serializeHistogram(inHisto, inbuffer)); + byte[] serialBytes = + HistogramSerializer.toByteArray(HistogramSerializer.serializeHistogram(inHisto, inbuffer)); // check roundtrip - Histogram outHisto = Histogram.decodeFromCompressedByteBuffer(ByteBuffer.wrap(serialBytes), Long.MIN_VALUE); + Histogram outHisto = + Histogram.decodeFromCompressedByteBuffer(ByteBuffer.wrap(serialBytes), Long.MIN_VALUE); assertThat(inHisto).isEqualTo(outHisto); return serialBytes; diff --git a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java index cc54c7577..ccad035fd 100644 --- a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java +++ b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java @@ -92,14 +92,15 @@ public String getTopicNamePrefix() { int fetchCnt = 0; - private synchronized Set fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt, - final String clusterName) throws RemotingConnectException, RemotingSendRequestException, - RemotingTimeoutException, MQBrokerException, InterruptedException { + private synchronized Set fetchMasterAndSlaveAddrByClusterName( + final MQAdminExt adminExt, final String clusterName) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + MQBrokerException, InterruptedException { Set brokerList = cachedBrokerAddr.get(clusterName); if (brokerList == null) { brokerList = - CommandUtil.fetchMasterAndSlaveAddrByClusterName( - adminExt, this.rmqClientConfig.clusterName); + CommandUtil.fetchMasterAndSlaveAddrByClusterName( + adminExt, this.rmqClientConfig.clusterName); cachedBrokerAddr.put(clusterName, brokerList); if (brokerList.isEmpty()) { throw new RuntimeException("get brokerAddr return null, clusterName: " + clusterName); @@ -114,35 +115,37 @@ private synchronized Set fetchMasterAndSlaveAddrByClusterName(final MQAd @Override public CompletableFuture createTopic(final String topic, final int partitions) { return CompletableFuture.runAsync( - () -> { - TopicConfig topicConfig = new TopicConfig(); - topicConfig.setOrder(false); - topicConfig.setPerm(6); - topicConfig.setReadQueueNums(partitions); - topicConfig.setWriteQueueNums(partitions); - topicConfig.setTopicName(topic); - if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) { - topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ"); - } + () -> { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setOrder(false); + topicConfig.setPerm(6); + topicConfig.setReadQueueNums(partitions); + topicConfig.setWriteQueueNums(partitions); + topicConfig.setTopicName(topic); + if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) { + topicConfig + .getAttributes() + .put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ"); + } - try { - Set brokerList = - fetchMasterAndSlaveAddrByClusterName( - this.rmqAdmin, this.rmqClientConfig.clusterName); - topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size())); - topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size())); + try { + Set brokerList = + fetchMasterAndSlaveAddrByClusterName( + this.rmqAdmin, this.rmqClientConfig.clusterName); + topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size())); + topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size())); - for (String brokerAddr : brokerList) { - this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig); + for (String brokerAddr : brokerList) { + this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig); + } + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Failed to create topic [%s] to cluster [%s]", + topic, this.rmqClientConfig.clusterName), + e); } - } catch (Exception e) { - throw new RuntimeException( - String.format( - "Failed to create topic [%s] to cluster [%s]", - topic, this.rmqClientConfig.clusterName), - e); - } - }); + }); } @Override