Skip to content

Commit

Permalink
refactor(tools/perf): retry sending messages in when waiting topics r…
Browse files Browse the repository at this point in the history
…eady

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Nov 7, 2024
1 parent 42debe7 commit aad2094
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
23 changes: 19 additions & 4 deletions tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package org.apache.kafka.tools.automq;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.tools.automq.perf.ConsumerService;
import org.apache.kafka.tools.automq.perf.PerfConfig;
import org.apache.kafka.tools.automq.perf.ProducerService;
Expand All @@ -31,9 +32,12 @@
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

Expand All @@ -51,7 +55,13 @@ public class PerfCommand implements AutoCloseable {
private final ProducerService producerService;
private final ConsumerService consumerService;
private final Stats stats = new Stats();
/**
* Partitions that are ready to be consumed.
* Only used during the initial topic readiness check, which is, {@link #preparing} is true.
*/
private final Set<TopicPartition> readyPartitions = Collections.newSetFromMap(new ConcurrentHashMap<>());

private volatile boolean preparing = true;
private volatile boolean running = true;

public static void main(String[] args) throws Exception {
Expand Down Expand Up @@ -98,6 +108,8 @@ private void run() {
List<byte[]> payloads = randomPayloads(config.recordSize, config.randomRatio, config.randomPoolSize);
producerService.start(payloads, config.sendRate);

preparing = false;

if (config.warmupDurationMinutes > 0) {
LOGGER.info("Warming up for {} minutes...", config.warmupDurationMinutes);
long warmupStart = System.nanoTime();
Expand Down Expand Up @@ -148,23 +160,26 @@ private void messageSent(int size, long sendTimeNanos, Exception exception) {
}
}

private void messageReceived(byte[] payload, long sendTimeNanos) {
private void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos) {
if (preparing) {
readyPartitions.add(topicPartition);
}
stats.messageReceived(payload.length, sendTimeNanos);
}

private void waitTopicsReady() {
int sent = producerService.probe();
long start = System.nanoTime();
boolean ready = false;
while (System.nanoTime() < start + TOPIC_READY_TIMEOUT_NANOS) {
long received = stats.toCumulativeStats().totalMessagesReceived;
int sent = producerService.probe();
int received = readyPartitions.size();
LOGGER.info("Waiting for topics to be ready... sent: {}, received: {}", sent, received);
if (received >= sent) {
ready = true;
break;
}
try {
Thread.sleep(2000);
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,11 @@ public interface ConsumerCallback {
/**
* Called when a message is received.
*
* @param payload the received message payload
* @param sendTimeNanos the time in nanoseconds when the message was sent
* @param topicPartition the topic partition of the received message
* @param payload the received message payload
* @param sendTimeNanos the time in nanoseconds when the message was sent
*/
void messageReceived(byte[] payload, long sendTimeNanos);
void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos);
}

public static class ConsumersConfig {
Expand Down Expand Up @@ -306,7 +307,8 @@ private void pollRecords(KafkaConsumer<String, byte[]> consumer, ConsumerCallbac
ConsumerRecords<String, byte[]> records = consumer.poll(POLL_TIMEOUT);
for (ConsumerRecord<String, byte[]> record : records) {
long sendTimeNanos = Long.parseLong(new String(record.headers().lastHeader(HEADER_KEY_SEND_TIME_NANOS).value(), HEADER_KEY_CHARSET));
callback.messageReceived(record.value(), sendTimeNanos);
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
callback.messageReceived(topicPartition, record.value(), sendTimeNanos);
}
} catch (InterruptException e) {
// ignore, as we are closing
Expand Down

0 comments on commit aad2094

Please sign in to comment.