Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support backpressure #2236

Merged
merged 16 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,13 @@ public void config(MetricConfig config) {
this.config = config;
}
}

// AutoMQ inject start
/**
 * Publicly accessible wrapper that exposes {@link #measurableValue}.
 *
 * @param timeMs the time argument, forwarded unchanged to {@link #measurableValue}
 * @return the value produced by {@link #measurableValue} for {@code timeMs}
 */
public double measurableValueV2(long timeMs) {
    final double value = measurableValue(timeMs);
    return value;
}
// AutoMQ inject end
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,9 @@ public enum ClientQuotaType {
PRODUCE,
FETCH,
REQUEST,
// AutoMQ for Kafka inject start
SLOW_FETCH,
REQUEST_RATE,
// AutoMQ for Kafka inject end
CONTROLLER_MUTATION
}
5 changes: 3 additions & 2 deletions core/src/main/java/kafka/autobalancer/LoadRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.server.config.QuotaConfigs;

import com.automq.stream.utils.LogContext;

Expand Down Expand Up @@ -111,7 +112,7 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller,
metricReporterTopicPartition = config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_METRICS_TOPIC_NUM_PARTITIONS_CONFIG);
metricReporterTopicRetentionTime = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_METRICS_TOPIC_RETENTION_MS_CONFIG);
consumerPollTimeout = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_POLL_TIMEOUT);
consumerClientIdPrefix = config.getString(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX);
consumerClientIdPrefix = QuotaConfigs.INTERNAL_CLIENT_ID_PREFIX + config.getString(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX);
consumerRetryBackOffMs = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS);
}

Expand Down Expand Up @@ -154,7 +155,7 @@ protected Properties buildConsumerProps(String bootstrapServer) {
Properties consumerProps = new Properties();
long randomToken = RANDOM.nextLong();
consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientIdPrefix + "-consumer-" + randomToken);
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientIdPrefix + randomToken);
consumerProps.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, Long.toString(consumerRetryBackOffMs));
consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class AutoBalancerControllerConfig extends AbstractConfig {
public static final Integer DEFAULT_AUTO_BALANCER_CONTROLLER_METRICS_TOPIC_NUM_PARTITIONS_CONFIG = 1;
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_METRICS_TOPIC_RETENTION_MS_CONFIG = TimeUnit.MINUTES.toMillis(30);
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_POLL_TIMEOUT = 1000L;
public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX = "AutoBalancerControllerConsumer";
public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX = "auto_balancer_controller_consumer_";
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS = 1000;
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS = Duration.ofMinutes(1).toMillis();
public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_GOALS = new StringJoiner(",")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class AutoBalancerMetricsReporterConfig extends AbstractConfig {
public static final String AUTO_BALANCER_METRICS_REPORTER_LINGER_MS_CONFIG = PREFIX + ProducerConfig.LINGER_MS_CONFIG;
public static final String AUTO_BALANCER_METRICS_REPORTER_BATCH_SIZE_CONFIG = PREFIX + ProducerConfig.BATCH_SIZE_CONFIG;
/* Default values */
public static final String DEFAULT_AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID = "AutoBalancerMetricsReporterProducer";
public static final String DEFAULT_AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID = "auto_balancer_metrics_reporter_producer";
public static final long DEFAULT_AUTO_BALANCER_METRICS_REPORTER_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10);
public static final int DEFAULT_AUTO_BALANCER_METRICS_REPORTER_LINGER_MS = (int) TimeUnit.SECONDS.toMillis(1);
public static final int DEFAULT_AUTO_BALANCER_METRICS_BATCH_SIZE = 800 * 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.QuotaConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

Expand Down Expand Up @@ -229,7 +230,7 @@ public void configure(Map<String, ?> rawConfigs) {

setIfAbsent(producerProps,
ProducerConfig.CLIENT_ID_CONFIG,
reporterConfig.getString(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID));
QuotaConfigs.INTERNAL_CLIENT_ID_PREFIX + reporterConfig.getString(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID));
setIfAbsent(producerProps, ProducerConfig.LINGER_MS_CONFIG,
reporterConfig.getLong(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_METRICS_REPORTER_LINGER_MS_CONFIG).toString());
setIfAbsent(producerProps, ProducerConfig.BATCH_SIZE_CONFIG,
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/kafka/automq/AutoMQConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
Expand Down Expand Up @@ -171,6 +172,14 @@ public class AutoMQConfig {
public static final String CLUSTER_ID_CONFIG = "cluster.id";
public static final String CLUSTER_ID_DOC = "If the cluster.id is set, Kafka will auto format the storage.";

// Back pressure switch: config key, documentation string and default (enabled).
public static final String S3_BACK_PRESSURE_ENABLED_CONFIG = "automq.backpressure.enabled";
public static final String S3_BACK_PRESSURE_ENABLED_DOC = "Whether back pressure is enabled";
public static final boolean S3_BACK_PRESSURE_ENABLED_DEFAULT = true;

// Back pressure cooldown: config key, documentation string and default (15 seconds, expressed in milliseconds).
public static final String S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG = "automq.backpressure.cooldown.ms";
public static final String S3_BACK_PRESSURE_COOLDOWN_MS_DOC = "The cooldown time in milliseconds to wait between two regulator actions";
public static final long S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT = TimeUnit.SECONDS.toMillis(15);

// Deprecated config start
public static final String S3_ENDPOINT_CONFIG = "s3.endpoint";
public static final String S3_ENDPOINT_DOC = "[DEPRECATED]please use s3.data.buckets. The object storage endpoint, ex. <code>https://s3.us-east-1.amazonaws.com</code>.";
Expand Down Expand Up @@ -255,6 +264,8 @@ public static void define(ConfigDef configDef) {
.define(AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_CONFIG, INT, S3_METRICS_EXPORTER_REPORT_INTERVAL_MS, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_DOC)
.define(AutoMQConfig.S3_TELEMETRY_METRICS_EXPORTER_URI_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_TELEMETRY_METRICS_EXPORTER_URI_DOC)
.define(AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_CONFIG, STRING, null, MEDIUM, AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_DOC)
.define(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG, BOOLEAN, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DOC)
.define(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG, LONG, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DOC)
// Deprecated config start
.define(AutoMQConfig.S3_ENDPOINT_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_ENDPOINT_DOC)
.define(AutoMQConfig.S3_REGION_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_REGION_DOC)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.automq.backpressure;

import kafka.automq.AutoMQConfig;
import kafka.server.KafkaConfig;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.ConfigUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Mutable holder for the back pressure settings: whether back pressure is enabled and
 * the cooldown between two regulator actions.
 *
 * <p>Both values can be replaced after construction through {@link #update(Map)}, so both
 * fields are {@code volatile}: a thread calling {@link #enabled()} or {@link #cooldownMs()}
 * always observes the most recently written value, and the 64-bit {@code cooldownMs} is
 * read and written atomically.
 */
public class BackPressureConfig {

    /**
     * The config keys handled by {@link #validate(Map)} and {@link #update(Map)}.
     */
    public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
        AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG,
        AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG
    );

    private volatile boolean enabled;
    /**
     * The cooldown time in milliseconds to wait between two regulator actions.
     * Volatile for the same reason as {@link #enabled}: it is rewritten by
     * {@link #update(Map)} and read through {@link #cooldownMs()}.
     */
    private volatile long cooldownMs;

    /**
     * Builds a config from the back pressure values exposed by the broker config.
     *
     * @param config the broker config to read from
     * @return a new instance holding the broker's current back pressure settings
     */
    public static BackPressureConfig from(KafkaConfig config) {
        return new BackPressureConfig(config.s3BackPressureEnabled(), config.s3BackPressureCooldownMs());
    }

    /**
     * Builds a config from a raw key/value map. Both back pressure keys are looked up
     * unconditionally; no range validation is applied here.
     *
     * @param raw the raw configs; copied, never modified
     * @return a new instance holding the parsed values
     */
    public static BackPressureConfig from(Map<String, ?> raw) {
        // Copy into the Map<String, Object> shape that ConfigUtils accepts.
        Map<String, Object> configs = new HashMap<>(raw);
        return new BackPressureConfig(
            ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG),
            ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)
        );
    }

    /**
     * @param enabled    whether back pressure is enabled
     * @param cooldownMs the cooldown time in milliseconds between two regulator actions
     */
    public BackPressureConfig(boolean enabled, long cooldownMs) {
        this.enabled = enabled;
        this.cooldownMs = cooldownMs;
    }

    /**
     * Checks the back pressure keys that are present in {@code raw}; absent keys are ignored.
     *
     * @param raw the raw configs to check; copied, never modified
     * @throws ConfigException if the cooldown is present and negative
     */
    public static void validate(Map<String, ?> raw) throws ConfigException {
        Map<String, Object> configs = new HashMap<>(raw);
        if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)) {
            // Parsed only for its side effect; presumably ConfigUtils rejects an unparsable value - confirm.
            ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG);
        }
        if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)) {
            validateCooldownMs(ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG));
        }
    }

    /**
     * @param cooldownMs the candidate cooldown in milliseconds
     * @throws ConfigException if {@code cooldownMs} is negative
     */
    public static void validateCooldownMs(long cooldownMs) throws ConfigException {
        if (cooldownMs < 0) {
            throw new ConfigException(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG, cooldownMs, "The cooldown time must be non-negative.");
        }
    }

    /**
     * Applies the back pressure keys that are present in {@code raw}; absent keys leave the
     * corresponding value untouched. No validation is performed here, so callers are expected
     * to run {@link #validate(Map)} first.
     *
     * @param raw the raw configs to apply; copied, never modified
     */
    public void update(Map<String, ?> raw) {
        Map<String, Object> configs = new HashMap<>(raw);
        if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)) {
            this.enabled = ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG);
        }
        if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)) {
            this.cooldownMs = ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG);
        }
    }

    /**
     * @return whether back pressure is currently enabled
     */
    public boolean enabled() {
        return enabled;
    }

    /**
     * @return the current cooldown time in milliseconds between two regulator actions
     */
    public long cooldownMs() {
        return cooldownMs;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.automq.backpressure;

import org.apache.kafka.common.Reconfigurable;

/**
 * Watches the {@link LoadLevel} of the system and reacts to it so that the system
 * does not become overwhelmed.
 */
public interface BackPressureManager extends Reconfigurable {

    /**
     * Starts the back pressure manager.
     */
    void start();

    /**
     * Registers a checker that reports the load level of the system.
     * Note: call this only after {@link #start()} and before {@link #shutdown()}.
     *
     * @param checker the checker to register
     */
    void registerChecker(Checker checker);

    /**
     * Shuts the back pressure manager down and releases all of its resources.
     */
    void shutdown();
}
33 changes: 33 additions & 0 deletions core/src/main/java/kafka/automq/backpressure/Checker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.automq.backpressure;

/**
 * Periodically checks the load level of the system.
 */
public interface Checker {

    /**
     * Identifies this checker; the value should be unique among checkers.
     *
     * @return the source name of this checker
     */
    String source();

    /**
     * Checks the load level of the system.
     *
     * @return the load level found by this check
     */
    LoadLevel check();

    /**
     * @return the interval, in milliseconds, at which the load level should be checked
     */
    long intervalMs();
}
Loading
Loading