Skip to content

Commit

Permalink
perf(tools/perf): assuming all partitions have the same offset at the…
Browse files Browse the repository at this point in the history
… same time (#2127)

* feat(tools/perf): log progress on resetting offsets

Signed-off-by: Ning Yu <[email protected]>

* fix: reset timeouts

Signed-off-by: Ning Yu <[email protected]>

* feat: increase the log interval

Signed-off-by: Ning Yu <[email protected]>

* perf(tools/perf): assuming all partitions have the same offset at the same time

Signed-off-by: Ning Yu <[email protected]>

* feat: limit the min of --backlog-duration

Signed-off-by: Ning Yu <[email protected]>

---------

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Nov 7, 2024
1 parent 7f9a631 commit c243eaf
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

package org.apache.kafka.tools.automq.perf;

import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
Expand All @@ -21,6 +23,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
Expand All @@ -41,7 +44,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -63,9 +65,9 @@ public class ConsumerService implements AutoCloseable {

public ConsumerService(String bootstrapServer) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, (int) TimeUnit.SECONDS.toMillis(300));
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (int) TimeUnit.MINUTES.toMillis(2));
properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, (int) TimeUnit.MINUTES.toMillis(10));
this.admin = Admin.create(properties);
this.groupSuffix = new SimpleDateFormat("HHmmss").format(System.currentTimeMillis());
}
Expand All @@ -84,7 +86,7 @@ public int createConsumers(List<Topic> topics, ConsumersConfig config) {
for (int g = 0; g < config.groupsPerTopic; g++) {
Group group = new Group(g, config.consumersPerGroup, topics, config);
groups.add(group);
count += group.size();
count += group.consumerCount();
}
return count;
}
Expand All @@ -106,12 +108,12 @@ public void resume() {
}

public void resetOffset(long startMillis, long intervalMillis) {
AtomicLong start = new AtomicLong(startMillis);
CompletableFuture.allOf(
groups.stream()
.map(group -> group.seek(start.getAndAdd(intervalMillis)))
.toArray(CompletableFuture[]::new)
).join();
AtomicLong timestamp = new AtomicLong(startMillis);
for (int i = 0, size = groups.size(); i < size; i++) {
Group group = groups.get(i);
group.seek(timestamp.getAndAdd(intervalMillis));
LOGGER.info("Reset consumer group offsets: {}/{}", i + 1, size);
}
}

@Override
Expand Down Expand Up @@ -181,20 +183,27 @@ public void resume() {
consumers().forEach(Consumer::resume);
}

public CompletableFuture<Void> seek(long timestamp) {
return admin.listOffsets(listOffsetsRequest(timestamp))
.all()
.toCompletionStage()
.toCompletableFuture()
.thenCompose(offsetMap -> CompletableFuture.allOf(consumers.keySet().stream()
.map(topic -> admin.alterConsumerGroupOffsets(groupId(topic), resetOffsetsRequest(topic, offsetMap)))
public void seek(long timestamp) {
// assuming all partitions approximately have the same offset at the given timestamp
TopicPartition firstPartition = consumers.keySet().iterator().next().firstPartition();
try {
ListOffsetsResult.ListOffsetsResultInfo offsetInfo = admin.listOffsets(Map.of(firstPartition, OffsetSpec.forTimestamp(timestamp)))
.partitionResult(firstPartition)
.get();
KafkaFuture.allOf(consumers.keySet().stream()
.map(topic -> admin.alterConsumerGroupOffsets(groupId(topic), resetOffsetsRequest(topic, offsetInfo.offset())))
.map(AlterConsumerGroupOffsetsResult::all)
.map(KafkaFuture::toCompletionStage)
.map(CompletionStage::toCompletableFuture)
.toArray(CompletableFuture[]::new)));
.toArray(KafkaFuture[]::new)).get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to list or reset consumer offsets", e);
}
}

public int size() {
public int consumerGroupCount() {
return consumers.size();
}

public int consumerCount() {
return consumers.values().stream()
.mapToInt(List::size)
.sum();
Expand All @@ -212,6 +221,7 @@ private Properties toProperties(ConsumersConfig config) {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.LATEST.toString());
return properties;
}

Expand All @@ -230,23 +240,11 @@ private String groupId(Topic topic) {
return String.format("sub-%s-%s-%03d", topic.name, groupSuffix, index);
}

private Map<TopicPartition, OffsetSpec> listOffsetsRequest(long timestamp) {
return consumers.keySet().stream()
.map(Topic::partitions)
.flatMap(List::stream)
private Map<TopicPartition, OffsetAndMetadata> resetOffsetsRequest(Topic topic, long offset) {
return topic.partitions().stream()
.collect(Collectors.toMap(
partition -> partition,
partition -> OffsetSpec.forTimestamp(timestamp)
));
}

private Map<TopicPartition, OffsetAndMetadata> resetOffsetsRequest(Topic topic,
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsetMap) {
return offsetMap.entrySet().stream()
.filter(entry -> topic.containsPartition(entry.getKey()))
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> new OffsetAndMetadata(entry.getValue().offset())
ignore -> new OffsetAndMetadata(offset)
));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.nonNegativeInteger;
import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.notLessThan;
import static org.apache.kafka.tools.automq.perf.PerfConfig.IntegerArgumentType.positiveInteger;

public class PerfConfig {
Expand Down Expand Up @@ -204,7 +205,7 @@ public static ArgumentParser parser() {
.help("The send rate in messages per second during catchup. If not set, the send rate will be used.");
parser.addArgument("-b", "--backlog-duration")
.setDefault(0)
.type(nonNegativeInteger())
.type(notLessThan(300))
.dest("backlogDurationSeconds")
.metavar("BACKLOG_DURATION_SECONDS")
.help("The backlog duration in seconds, and zero means no backlog. Should not be less than GROUPS_PER_TOPIC * GROUP_START_DELAY_SECONDS.");
Expand Down Expand Up @@ -316,6 +317,10 @@ public static IntegerArgumentType nonNegativeInteger() {
public static IntegerArgumentType positiveInteger() {
return new IntegerArgumentType(value -> value <= 0 ? "expected a positive integer, but got " + value : null);
}

public static IntegerArgumentType notLessThan(int min) {
return new IntegerArgumentType(value -> value < min ? "expected an integer not less than " + min + ", but got " + value : null);
}
}

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public List<TopicPartition> partitions() {
.collect(Collectors.toList());
}

public TopicPartition firstPartition() {
return new TopicPartition(name, 0);
}

public boolean containsPartition(TopicPartition partition) {
return name.equals(partition.topic()) && partition.partition() < partitions;
}
Expand Down

0 comments on commit c243eaf

Please sign in to comment.