diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java index 1145489522..b7a50be5d0 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java @@ -11,6 +11,7 @@ package org.apache.kafka.tools.automq; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.tools.automq.perf.ConsumerService; import org.apache.kafka.tools.automq.perf.PerfConfig; import org.apache.kafka.tools.automq.perf.ProducerService; @@ -31,9 +32,12 @@ import java.text.SimpleDateFormat; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -51,7 +55,13 @@ public class PerfCommand implements AutoCloseable { private final ProducerService producerService; private final ConsumerService consumerService; private final Stats stats = new Stats(); + /** + * Partitions that are ready to be consumed. + * Only used during the initial topic readiness check, which is, {@link #preparing} is true. + */ + private final Set readyPartitions = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private volatile boolean preparing = true; private volatile boolean running = true; public static void main(String[] args) throws Exception { @@ -98,6 +108,8 @@ private void run() { List payloads = randomPayloads(config.recordSize, config.randomRatio, config.randomPoolSize); producerService.start(payloads, config.sendRate); + preparing = false; + if (config.warmupDurationMinutes > 0) { LOGGER.info("Warming up for {} minutes...", config.warmupDurationMinutes); long warmupStart = System.nanoTime(); @@ -148,23 +160,26 @@ private void messageSent(int size, long sendTimeNanos, Exception exception) { } } - private void messageReceived(byte[] payload, long sendTimeNanos) { + private void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos) { + if (preparing) { + readyPartitions.add(topicPartition); + } stats.messageReceived(payload.length, sendTimeNanos); } private void waitTopicsReady() { - int sent = producerService.probe(); long start = System.nanoTime(); boolean ready = false; while (System.nanoTime() < start + TOPIC_READY_TIMEOUT_NANOS) { - long received = stats.toCumulativeStats().totalMessagesReceived; + int sent = producerService.probe(); + int received = readyPartitions.size(); LOGGER.info("Waiting for topics to be ready... sent: {}, received: {}", sent, received); if (received >= sent) { ready = true; break; } try { - Thread.sleep(2000); + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java index 7dda842a3c..a6ea3739ef 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java @@ -127,10 +127,11 @@ public interface ConsumerCallback { /** * Called when a message is received. * - * @param payload the received message payload - * @param sendTimeNanos the time in nanoseconds when the message was sent + * @param topicPartition the topic partition of the received message + * @param payload the received message payload + * @param sendTimeNanos the time in nanoseconds when the message was sent */ - void messageReceived(byte[] payload, long sendTimeNanos); + void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos); } public static class ConsumersConfig { @@ -306,7 +307,8 @@ private void pollRecords(KafkaConsumer consumer, ConsumerCallbac ConsumerRecords records = consumer.poll(POLL_TIMEOUT); for (ConsumerRecord record : records) { long sendTimeNanos = Long.parseLong(new String(record.headers().lastHeader(HEADER_KEY_SEND_TIME_NANOS).value(), HEADER_KEY_CHARSET)); - callback.messageReceived(record.value(), sendTimeNanos); + TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); + callback.messageReceived(topicPartition, record.value(), sendTimeNanos); } } catch (InterruptException e) { // ignore, as we are closing