Skip to content

Commit

Permalink
feat(tools/perf): support to limit the max poll rate of consumers
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Jan 13, 2025
1 parent 07e3652 commit 9cb9cd8
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 5 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2232,6 +2232,7 @@ project(':tools') {
// AutoMQ inject start
implementation project(':automq-shell')
implementation libs.kafkaAvroSerializer
implementation libs.bucket4j
// AutoMQ inject end

implementation project(':storage')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private void run() {

LOGGER.info("Creating consumers...");
int consumers = consumerService.createConsumers(topics, config.consumersConfig());
consumerService.start(this::messageReceived);
consumerService.start(this::messageReceived, config.maxPollRate);
LOGGER.info("Created {} consumers, took {} ms", consumers, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

LOGGER.info("Creating producers...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.github.bucket4j.BlockingBucket;
import io.github.bucket4j.Bucket;

import static org.apache.kafka.tools.automq.perf.ProducerService.HEADER_KEY_CHARSET;
import static org.apache.kafka.tools.automq.perf.ProducerService.HEADER_KEY_SEND_TIME_NANOS;

Expand Down Expand Up @@ -91,10 +94,15 @@ public int createConsumers(List<Topic> topics, ConsumersConfig config) {
return count;
}

public void start(ConsumerCallback callback) {
public void start(ConsumerCallback callback, int pollRate) {
BlockingBucket bucket = rateLimitBucket(pollRate);
ConsumerCallback callbackWithRateLimit = (tp, p, st) -> {
callback.messageReceived(tp, p, st);
bucket.consume(1);
};
CompletableFuture.allOf(
groups.stream()
.map(group -> group.start(callback))
.map(group -> group.start(callbackWithRateLimit))
.toArray(CompletableFuture[]::new)
).join();
}
Expand Down Expand Up @@ -122,6 +130,15 @@ public int consumerCount() {
.sum();
}

private BlockingBucket rateLimitBucket(int rateLimit) {
return Bucket.builder()
.addLimit(limit -> limit
.capacity(rateLimit / 10)
.refillGreedy(rateLimit, Duration.ofSeconds(1))
).build()
.asBlocking();
}

@Override
public void close() {
admin.close();
Expand All @@ -137,7 +154,7 @@ public interface ConsumerCallback {
* @param payload the received message payload
* @param sendTimeNanos the time in nanoseconds when the message was sent
*/
void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos);
void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos) throws InterruptedException;
}

public static class ConsumersConfig {
Expand Down Expand Up @@ -316,7 +333,7 @@ private void pollRecords(KafkaConsumer<String, byte[]> consumer, ConsumerCallbac
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
callback.messageReceived(topicPartition, record.value(), sendTimeNanos);
}
} catch (InterruptException e) {
} catch (InterruptException | InterruptedException e) {
// ignore, as we are closing
} catch (Exception e) {
LOGGER.warn("exception occur while consuming message", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.ThreadLocalRandom;

import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.between;
import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.nonNegativeInteger;
import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.notLessThan;
import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.positiveInteger;
Expand All @@ -53,6 +54,7 @@ public class PerfConfig {
public final int randomPoolSize;
public final int sendRate;
public final int sendRateDuringCatchup;
public final int maxPollRate;
public final int backlogDurationSeconds;
public final int groupStartDelaySeconds;
public final int warmupDurationMinutes;
Expand Down Expand Up @@ -92,6 +94,7 @@ public PerfConfig(String[] args) {
randomPoolSize = ns.getInt("randomPoolSize");
sendRate = ns.getInt("sendRate");
sendRateDuringCatchup = ns.getInt("sendRateDuringCatchup") == null ? sendRate : ns.getInt("sendRateDuringCatchup");
maxPollRate = ns.getInt("maxPollRate");
backlogDurationSeconds = ns.getInt("backlogDurationSeconds");
groupStartDelaySeconds = ns.getInt("groupStartDelaySeconds");
warmupDurationMinutes = ns.getInt("warmupDurationMinutes");
Expand Down Expand Up @@ -209,6 +212,12 @@ public static ArgumentParser parser() {
.dest("sendRateDuringCatchup")
.metavar("SEND_RATE_DURING_CATCHUP")
.help("The send rate in messages per second during catchup. If not set, the send rate will be used.");
parser.addArgument("-m", "--max-poll-rate")
.setDefault(1_000_000_000)
.type(between(0, 1_000_000_000))
.dest("maxPollRate")
.metavar("MAX_POLL_RATE")
.help("The max poll rate in messages per second.");
parser.addArgument("-b", "--backlog-duration")
.setDefault(0)
.type(notLessThan(300))
Expand Down Expand Up @@ -351,6 +360,10 @@ public static IntegerArgumentType positiveInteger() {
public static IntegerArgumentType notLessThan(int min) {
return new IntegerArgumentType(value -> value < min ? "expected an integer not less than " + min + ", but got " + value : null);
}

public static IntegerArgumentType between(int min, int max) {
return new IntegerArgumentType(value -> value < min || value > max ? "expected an integer between " + min + " and " + max + ", but got " + value : null);
}
}

@FunctionalInterface
Expand Down

0 comments on commit 9cb9cd8

Please sign in to comment.