From 019219649df1d468729ab0a1ed22b863a9cfc124 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Thu, 7 Nov 2024 11:34:16 +0100 Subject: [PATCH] Add --dynamic-batch-size flag References rabbitmq/rabbitmq-stream-java-client#649 --- .../perf/DefaultPerformanceMetrics.java | 22 +++++++++++++++ .../rabbitmq/stream/perf/StreamPerfTest.java | 18 ++++++++++++- .../java/com/rabbitmq/stream/perf/Utils.java | 27 ++++++++++++++++--- 3 files changed, 62 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java b/src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java index a4b0b11..7213bbf 100644 --- a/src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java +++ b/src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import org.slf4j.Logger; @@ -56,6 +57,7 @@ class DefaultPerformanceMetrics implements PerformanceMetrics { private final boolean summaryFile; private final PrintWriter out; private final boolean includeByteRates; + private final boolean includeBatchSize; private final Supplier memoryReportSupplier; private volatile Closeable closingSequence = () -> {}; private volatile long lastPublishedCount = 0; @@ -73,11 +75,13 @@ class DefaultPerformanceMetrics implements PerformanceMetrics { String metricsPrefix, boolean summaryFile, boolean includeByteRates, + boolean batchSize, boolean confirmLatency, Supplier memoryReportSupplier, PrintWriter out) { this.summaryFile = summaryFile; this.includeByteRates = includeByteRates; + this.includeBatchSize = batchSize; this.memoryReportSupplier = memoryReportSupplier; this.out = out; this.metricsPrefix = metricsPrefix; @@ -116,6 +120,7 @@ public void start(String description) throws Exception { long startTime = System.nanoTime(); String metricPublished = metricsName("published"); + String metricPublishBatchSize = metricsName("publish_batch_size"); String metricProducerConfirmed = metricsName("producer_confirmed"); String metricConsumed = metricsName("consumed"); String metricChunkSize = metricsName("chunk_size"); @@ -133,6 +138,10 @@ public void start(String description) throws Exception { metricChunkSize, metricLatency)); + if (this.includeBatchSize) { + allMetrics.add(metricPublishBatchSize); + } + if (confirmLatency()) { allMetrics.add(metricConfirmLatency); } @@ -191,6 +200,17 @@ public void start(String description) throws Exception { }); }); + Consumer publishBatchSizeCallback; + if (this.includeBatchSize) { + HistogramSupport publishBatchSize = meterRegistry.get(metricPublishBatchSize).summary(); + Function formatPublishBatchSize = + histogram -> String.format("publish batch size %.0f", histogram.takeSnapshot().mean()); + publishBatchSizeCallback = + sb -> sb.append(formatPublishBatchSize.apply(publishBatchSize)).append(", "); + } else { + publishBatchSizeCallback = ignored -> {}; + } + HistogramSupport chunkSize = meterRegistry.get(metricChunkSize).summary(); Function formatChunkSize = histogram -> String.format("chunk size %.0f", histogram.takeSnapshot().mean()); @@ -244,6 +264,7 @@ public void start(String description) throws Exception { .append(", "); } builder.append(formatLatency.apply("latency", latency)).append(", "); + publishBatchSizeCallback.accept(builder); builder.append(formatChunkSize.apply(chunkSize)); this.out.println(builder); String memoryReport = this.memoryReportSupplier.get(); @@ -299,6 +320,7 @@ public void start(String description) throws Exception { .append(", "); } builder.append(formatLatencySummary.apply("latency", latency)).append(", "); + publishBatchSizeCallback.accept(builder); builder.append(formatChunkSize.apply(chunkSize)); this.out.println(); this.out.println(builder); diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index 00f8208..15d2c62 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -458,6 +458,18 @@ public class StreamPerfTest implements Callable { defaultValue = "false") private boolean noDevMode; + @CommandLine.Option( + names = {"--dynamic-batch-size", "-dbs"}, + description = "use dynamic batch size for publishing", + defaultValue = "false") + private boolean dynamicBatch; + + @CommandLine.Option( + names = {"--batch-size-metric", "-bsm"}, + description = "display batch size", + defaultValue = "false") + private boolean includeBatchSizeMetric; + static class InstanceSyncOptions { @CommandLine.Option( @@ -689,7 +701,9 @@ public Integer call() throws Exception { .tags(tags) .register(meterRegistry); } - this.metricsCollector = new PerformanceMicrometerMetricsCollector(meterRegistry, metricsPrefix); + this.metricsCollector = + new PerformanceMicrometerMetricsCollector( + meterRegistry, metricsPrefix, this.includeBatchSizeMetric); Counter producerConfirm = meterRegistry.counter(metricsPrefix + ".producer_confirmed"); @@ -751,6 +765,7 @@ public Integer call() throws Exception { metricsPrefix, this.summaryFile, this.includeByteRates, + this.includeBatchSizeMetric, this.confirmLatency, memoryReportSupplier, this.out); @@ -972,6 +987,7 @@ public Integer call() throws Exception { ProducerBuilder producerBuilder = environment .producerBuilder() + .dynamicBatch(this.dynamicBatch) .batchPublishingDelay(ofMillis(this.batchPublishingDelay)); String producerName = this.producerNameStrategy.apply(stream, i + 1); diff --git a/src/main/java/com/rabbitmq/stream/perf/Utils.java b/src/main/java/com/rabbitmq/stream/perf/Utils.java index 17c71ea..8f8e4ca 100644 --- a/src/main/java/com/rabbitmq/stream/perf/Utils.java +++ b/src/main/java/com/rabbitmq/stream/perf/Utils.java @@ -66,9 +66,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.function.LongSupplier; +import java.util.function.*; import java.util.stream.Collectors; import java.util.stream.IntStream; import javax.net.ssl.KeyManager; @@ -827,8 +825,23 @@ public String apply(String stream, Integer index) { static class PerformanceMicrometerMetricsCollector extends MicrometerMetricsCollector { - public PerformanceMicrometerMetricsCollector(MeterRegistry registry, String prefix) { + private final IntConsumer publisherCallback; + + public PerformanceMicrometerMetricsCollector( + MeterRegistry registry, String prefix, boolean batchSize) { super(registry, prefix); + if (batchSize) { + DistributionSummary publishBatchSize = + DistributionSummary.builder(prefix + ".publish_batch_size") + .description("publish batch size") + .publishPercentiles(0.5, 0.75, 0.95, 0.99) + .distributionStatisticExpiry(ofSeconds(1)) + .serviceLevelObjectives() + .register(registry); + this.publisherCallback = publishBatchSize::record; + } else { + this.publisherCallback = ignored -> {}; + } } @Override @@ -849,6 +862,12 @@ protected DistributionSummary createChunkSizeDistributionSummary( .register(registry); } + @Override + public void publish(int count) { + super.publish(count); + this.publisherCallback.accept(count); + } + @Override public void chunk(int entriesCount) { this.chunkSize.record(entriesCount);