diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java
index faab997a98..7dda842a3c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java
+++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java
@@ -11,7 +11,9 @@
 
 package org.apache.kafka.tools.automq.perf;
 
+import java.util.concurrent.ExecutionException;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.OffsetSpec;
@@ -21,6 +23,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
@@ -41,7 +44,6 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -63,9 +65,9 @@ public class ConsumerService implements AutoCloseable {
 
     public ConsumerService(String bootstrapServer) {
         Properties properties = new Properties();
-        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
-        properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, (int) TimeUnit.SECONDS.toMillis(300));
-        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (int) TimeUnit.MINUTES.toMillis(2));
+        properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, (int) TimeUnit.MINUTES.toMillis(10));
         this.admin = Admin.create(properties);
         this.groupSuffix = new SimpleDateFormat("HHmmss").format(System.currentTimeMillis());
     }
@@ -84,7 +86,7 @@ public int createConsumers(List<Topic> topics, ConsumersConfig config) {
         for (int g = 0; g < config.groupsPerTopic; g++) {
             Group group = new Group(g, config.consumersPerGroup, topics, config);
             groups.add(group);
-            count += group.size();
+            count += group.consumerCount();
         }
         return count;
     }
@@ -106,12 +108,12 @@ public void resume() {
     }
 
     public void resetOffset(long startMillis, long intervalMillis) {
-        AtomicLong start = new AtomicLong(startMillis);
-        CompletableFuture.allOf(
-            groups.stream()
-                .map(group -> group.seek(start.getAndAdd(intervalMillis)))
-                .toArray(CompletableFuture[]::new)
-        ).join();
+        AtomicLong timestamp = new AtomicLong(startMillis);
+        for (int i = 0, size = groups.size(); i < size; i++) {
+            Group group = groups.get(i);
+            group.seek(timestamp.getAndAdd(intervalMillis));
+            LOGGER.info("Reset consumer group offsets: {}/{}", i + 1, size);
+        }
     }
 
     @Override
@@ -181,20 +183,27 @@ public void resume() {
             consumers().forEach(Consumer::resume);
         }
 
-        public CompletableFuture<Void> seek(long timestamp) {
-            return admin.listOffsets(listOffsetsRequest(timestamp))
-                .all()
-                .toCompletionStage()
-                .toCompletableFuture()
-                .thenCompose(offsetMap -> CompletableFuture.allOf(consumers.keySet().stream()
-                    .map(topic -> admin.alterConsumerGroupOffsets(groupId(topic), resetOffsetsRequest(topic, offsetMap)))
+        public void seek(long timestamp) {
+            // assuming all partitions approximately have the same offset at the given timestamp
+            TopicPartition firstPartition = consumers.keySet().iterator().next().firstPartition();
+            try {
+                ListOffsetsResult.ListOffsetsResultInfo offsetInfo = admin.listOffsets(Map.of(firstPartition, OffsetSpec.forTimestamp(timestamp)))
+                    .partitionResult(firstPartition)
+                    .get();
+                KafkaFuture.allOf(consumers.keySet().stream()
+                    .map(topic -> admin.alterConsumerGroupOffsets(groupId(topic), resetOffsetsRequest(topic, offsetInfo.offset())))
                     .map(AlterConsumerGroupOffsetsResult::all)
-                    .map(KafkaFuture::toCompletionStage)
-                    .map(CompletionStage::toCompletableFuture)
-                    .toArray(CompletableFuture[]::new)));
+                    .toArray(KafkaFuture[]::new)).get();
+            } catch (ExecutionException | InterruptedException e) {
+                throw new RuntimeException("Failed to list or reset consumer offsets", e);
+            }
         }
 
-        public int size() {
+        public int consumerGroupCount() {
+            return consumers.size();
+        }
+
+        public int consumerCount() {
             return consumers.values().stream()
                 .mapToInt(List::size)
                 .sum();
@@ -212,6 +221,7 @@ private Properties toProperties(ConsumersConfig config) {
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer);
             properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
             properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.LATEST.toString());
             return properties;
         }
 
@@ -230,23 +240,11 @@ private String groupId(Topic topic) {
             return String.format("sub-%s-%s-%03d", topic.name, groupSuffix, index);
         }
 
-        private Map<TopicPartition, OffsetSpec> listOffsetsRequest(long timestamp) {
-            return consumers.keySet().stream()
-                .map(Topic::partitions)
-                .flatMap(List::stream)
+        private Map<TopicPartition, OffsetAndMetadata> resetOffsetsRequest(Topic topic, long offset) {
+            return topic.partitions().stream()
                 .collect(Collectors.toMap(
                     partition -> partition,
-                    partition -> OffsetSpec.forTimestamp(timestamp)
-                ));
-        }
-
-        private Map<TopicPartition, OffsetAndMetadata> resetOffsetsRequest(Topic topic,
-            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsetMap) {
-            return offsetMap.entrySet().stream()
-                .filter(entry -> topic.containsPartition(entry.getKey()))
-                .collect(Collectors.toMap(
-                    Map.Entry::getKey,
-                    entry -> new OffsetAndMetadata(entry.getValue().offset())
+                    ignore -> new OffsetAndMetadata(offset)
                 ));
         }
     }
diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java
index f85160105a..8e46482508 100644
--- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java
+++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java
@@ -30,6 +30,7 @@
 
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
 import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.nonNegativeInteger;
+import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.notLessThan;
 import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.positiveInteger;
 
 public class PerfConfig {
@@ -204,7 +205,7 @@ public static ArgumentParser parser() {
             .help("The send rate in messages per second during catchup. If not set, the send rate will be used.");
         parser.addArgument("-b", "--backlog-duration")
             .setDefault(0)
-            .type(nonNegativeInteger())
+            .type(notLessThan(300))
            .dest("backlogDurationSeconds")
             .metavar("BACKLOG_DURATION_SECONDS")
             .help("The backlog duration in seconds, and zero means no backlog. Should not be less than GROUPS_PER_TOPIC * GROUP_START_DELAY_SECONDS.");
@@ -316,6 +317,10 @@ public static IntegerArgumentType nonNegativeInteger() {
         public static IntegerArgumentType positiveInteger() {
             return new IntegerArgumentType(value -> value <= 0 ? "expected a positive integer, but got " + value : null);
         }
+
+        public static IntegerArgumentType notLessThan(int min) {
+            return new IntegerArgumentType(value -> value < min ? "expected an integer not less than " + min + ", but got " + value : null);
+        }
     }
 
     @FunctionalInterface
diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java
index aad521d01d..a0ed549c0e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java
+++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java
@@ -128,6 +128,10 @@ public List<TopicPartition> partitions() {
                 .collect(Collectors.toList());
         }
 
+        public TopicPartition firstPartition() {
+            return new TopicPartition(name, 0);
+        }
+
         public boolean containsPartition(TopicPartition partition) {
             return name.equals(partition.topic()) && partition.partition() < partitions;
         }