Skip to content

Commit

Permalink
Add --dynamic-batch-size flag
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed Nov 7, 2024
1 parent 7b18622 commit 0192196
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
Expand All @@ -56,6 +57,7 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
private final boolean summaryFile;
private final PrintWriter out;
private final boolean includeByteRates;
private final boolean includeBatchSize;
private final Supplier<String> memoryReportSupplier;
private volatile Closeable closingSequence = () -> {};
private volatile long lastPublishedCount = 0;
Expand All @@ -73,11 +75,13 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
String metricsPrefix,
boolean summaryFile,
boolean includeByteRates,
boolean batchSize,
boolean confirmLatency,
Supplier<String> memoryReportSupplier,
PrintWriter out) {
this.summaryFile = summaryFile;
this.includeByteRates = includeByteRates;
this.includeBatchSize = batchSize;
this.memoryReportSupplier = memoryReportSupplier;
this.out = out;
this.metricsPrefix = metricsPrefix;
Expand Down Expand Up @@ -116,6 +120,7 @@ public void start(String description) throws Exception {
long startTime = System.nanoTime();

String metricPublished = metricsName("published");
String metricPublishBatchSize = metricsName("publish_batch_size");
String metricProducerConfirmed = metricsName("producer_confirmed");
String metricConsumed = metricsName("consumed");
String metricChunkSize = metricsName("chunk_size");
Expand All @@ -133,6 +138,10 @@ public void start(String description) throws Exception {
metricChunkSize,
metricLatency));

if (this.includeBatchSize) {
allMetrics.add(metricPublishBatchSize);
}

if (confirmLatency()) {
allMetrics.add(metricConfirmLatency);
}
Expand Down Expand Up @@ -191,6 +200,17 @@ public void start(String description) throws Exception {
});
});

Consumer<StringBuilder> publishBatchSizeCallback;
if (this.includeBatchSize) {
HistogramSupport publishBatchSize = meterRegistry.get(metricPublishBatchSize).summary();
Function<HistogramSupport, String> formatPublishBatchSize =
histogram -> String.format("publish batch size %.0f", histogram.takeSnapshot().mean());
publishBatchSizeCallback =
sb -> sb.append(formatPublishBatchSize.apply(publishBatchSize)).append(", ");
} else {
publishBatchSizeCallback = ignored -> {};
}

HistogramSupport chunkSize = meterRegistry.get(metricChunkSize).summary();
Function<HistogramSupport, String> formatChunkSize =
histogram -> String.format("chunk size %.0f", histogram.takeSnapshot().mean());
Expand Down Expand Up @@ -244,6 +264,7 @@ public void start(String description) throws Exception {
.append(", ");
}
builder.append(formatLatency.apply("latency", latency)).append(", ");
publishBatchSizeCallback.accept(builder);
builder.append(formatChunkSize.apply(chunkSize));
this.out.println(builder);
String memoryReport = this.memoryReportSupplier.get();
Expand Down Expand Up @@ -299,6 +320,7 @@ public void start(String description) throws Exception {
.append(", ");
}
builder.append(formatLatencySummary.apply("latency", latency)).append(", ");
publishBatchSizeCallback.accept(builder);
builder.append(formatChunkSize.apply(chunkSize));
this.out.println();
this.out.println(builder);
Expand Down
18 changes: 17 additions & 1 deletion src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,18 @@ public class StreamPerfTest implements Callable<Integer> {
defaultValue = "false")
private boolean noDevMode;

@CommandLine.Option(
names = {"--dynamic-batch-size", "-dbs"},
description = "use dynamic batch size for publishing",
defaultValue = "false")
private boolean dynamicBatch;

@CommandLine.Option(
names = {"--batch-size-metric", "-bsm"},
description = "display batch size",
defaultValue = "false")
private boolean includeBatchSizeMetric;

static class InstanceSyncOptions {

@CommandLine.Option(
Expand Down Expand Up @@ -689,7 +701,9 @@ public Integer call() throws Exception {
.tags(tags)
.register(meterRegistry);
}
this.metricsCollector = new PerformanceMicrometerMetricsCollector(meterRegistry, metricsPrefix);
this.metricsCollector =
new PerformanceMicrometerMetricsCollector(
meterRegistry, metricsPrefix, this.includeBatchSizeMetric);

Counter producerConfirm = meterRegistry.counter(metricsPrefix + ".producer_confirmed");

Expand Down Expand Up @@ -751,6 +765,7 @@ public Integer call() throws Exception {
metricsPrefix,
this.summaryFile,
this.includeByteRates,
this.includeBatchSizeMetric,
this.confirmLatency,
memoryReportSupplier,
this.out);
Expand Down Expand Up @@ -972,6 +987,7 @@ public Integer call() throws Exception {
ProducerBuilder producerBuilder =
environment
.producerBuilder()
.dynamicBatch(this.dynamicBatch)
.batchPublishingDelay(ofMillis(this.batchPublishingDelay));

String producerName = this.producerNameStrategy.apply(stream, i + 1);
Expand Down
27 changes: 23 additions & 4 deletions src/main/java/com/rabbitmq/stream/perf/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.net.ssl.KeyManager;
Expand Down Expand Up @@ -827,8 +825,23 @@ public String apply(String stream, Integer index) {

static class PerformanceMicrometerMetricsCollector extends MicrometerMetricsCollector {

public PerformanceMicrometerMetricsCollector(MeterRegistry registry, String prefix) {
private final IntConsumer publisherCallback;

public PerformanceMicrometerMetricsCollector(
MeterRegistry registry, String prefix, boolean batchSize) {
super(registry, prefix);
if (batchSize) {
DistributionSummary publishBatchSize =
DistributionSummary.builder(prefix + ".publish_batch_size")
.description("publish batch size")
.publishPercentiles(0.5, 0.75, 0.95, 0.99)
.distributionStatisticExpiry(ofSeconds(1))
.serviceLevelObjectives()
.register(registry);
this.publisherCallback = publishBatchSize::record;
} else {
this.publisherCallback = ignored -> {};
}
}

@Override
Expand All @@ -849,6 +862,12 @@ protected DistributionSummary createChunkSizeDistributionSummary(
.register(registry);
}

@Override
public void publish(int count) {
super.publish(count);
this.publisherCallback.accept(count);
}

@Override
public void chunk(int entriesCount) {
this.chunkSize.record(entriesCount);
Expand Down

0 comments on commit 0192196

Please sign in to comment.