diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java index 1d31855db5..2d21150b21 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java @@ -129,4 +129,13 @@ public void config(MetricConfig config) { this.config = config; } } + + // AutoMQ inject start + /** + * A public method to expose the {@link #measurableValue} method. + */ + public double measurableValueV2(long timeMs) { + return measurableValue(timeMs); + } + // AutoMQ inject end } diff --git a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java index 5b0828a082..d5f8b27ced 100644 --- a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java +++ b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java @@ -23,5 +23,9 @@ public enum ClientQuotaType { PRODUCE, FETCH, REQUEST, + // AutoMQ for Kafka inject start + SLOW_FETCH, + REQUEST_RATE, + // AutoMQ for Kafka inject end CONTROLLER_MUTATION } diff --git a/core/src/main/java/kafka/autobalancer/LoadRetriever.java b/core/src/main/java/kafka/autobalancer/LoadRetriever.java index 3505a33faa..98b92aa2d3 100644 --- a/core/src/main/java/kafka/autobalancer/LoadRetriever.java +++ b/core/src/main/java/kafka/autobalancer/LoadRetriever.java @@ -49,6 +49,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.controller.Controller; import org.apache.kafka.controller.ControllerRequestContext; +import org.apache.kafka.server.config.QuotaConfigs; import com.automq.stream.utils.LogContext; @@ -111,7 +112,7 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller, metricReporterTopicPartition = config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_METRICS_TOPIC_NUM_PARTITIONS_CONFIG); metricReporterTopicRetentionTime = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_METRICS_TOPIC_RETENTION_MS_CONFIG); consumerPollTimeout = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_POLL_TIMEOUT); - consumerClientIdPrefix = config.getString(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX); + consumerClientIdPrefix = QuotaConfigs.INTERNAL_CLIENT_ID_PREFIX + config.getString(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX); consumerRetryBackOffMs = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS); } @@ -154,7 +155,7 @@ protected Properties buildConsumerProps(String bootstrapServer) { Properties consumerProps = new Properties(); long randomToken = RANDOM.nextLong(); consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientIdPrefix + "-consumer-" + randomToken); + consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientIdPrefix + randomToken); consumerProps.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, Long.toString(consumerRetryBackOffMs)); consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); diff --git a/core/src/main/java/kafka/autobalancer/config/AutoBalancerControllerConfig.java b/core/src/main/java/kafka/autobalancer/config/AutoBalancerControllerConfig.java index 96057c141e..1a37ad85b7 100644 --- a/core/src/main/java/kafka/autobalancer/config/AutoBalancerControllerConfig.java +++ b/core/src/main/java/kafka/autobalancer/config/AutoBalancerControllerConfig.java @@ -53,7 +53,7 @@ public class AutoBalancerControllerConfig extends AbstractConfig { public static final Integer DEFAULT_AUTO_BALANCER_CONTROLLER_METRICS_TOPIC_NUM_PARTITIONS_CONFIG = 1; public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_METRICS_TOPIC_RETENTION_MS_CONFIG = TimeUnit.MINUTES.toMillis(30); public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_POLL_TIMEOUT = 1000L; - public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX = "AutoBalancerControllerConsumer"; + public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX = "auto_balancer_controller_consumer_"; public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS = 1000; public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS = Duration.ofMinutes(1).toMillis(); public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_GOALS = new StringJoiner(",") diff --git a/core/src/main/java/kafka/autobalancer/config/AutoBalancerMetricsReporterConfig.java b/core/src/main/java/kafka/autobalancer/config/AutoBalancerMetricsReporterConfig.java index 4819274dd8..6c944ea488 100644 --- a/core/src/main/java/kafka/autobalancer/config/AutoBalancerMetricsReporterConfig.java +++ b/core/src/main/java/kafka/autobalancer/config/AutoBalancerMetricsReporterConfig.java @@ -36,7 +36,7 @@ public class AutoBalancerMetricsReporterConfig extends AbstractConfig { public static final String AUTO_BALANCER_METRICS_REPORTER_LINGER_MS_CONFIG = PREFIX + ProducerConfig.LINGER_MS_CONFIG; public static final String AUTO_BALANCER_METRICS_REPORTER_BATCH_SIZE_CONFIG = PREFIX + ProducerConfig.BATCH_SIZE_CONFIG; /* Default values */ - public static final String DEFAULT_AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID = "AutoBalancerMetricsReporterProducer"; + public static final String DEFAULT_AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID = "auto_balancer_metrics_reporter_producer"; public static final long DEFAULT_AUTO_BALANCER_METRICS_REPORTER_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10); public static final int DEFAULT_AUTO_BALANCER_METRICS_REPORTER_LINGER_MS = (int) TimeUnit.SECONDS.toMillis(1); public static final int DEFAULT_AUTO_BALANCER_METRICS_BATCH_SIZE = 800 * 1000; diff --git a/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java b/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java index d96d68c6cd..c9acd87572 100644 --- a/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java +++ b/core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.network.SocketServerConfigs; import org.apache.kafka.server.config.KRaftConfigs; +import org.apache.kafka.server.config.QuotaConfigs; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.metrics.KafkaYammerMetrics; @@ -229,7 +230,7 @@ public void configure(Map rawConfigs) { setIfAbsent(producerProps, ProducerConfig.CLIENT_ID_CONFIG, - reporterConfig.getString(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID)); + QuotaConfigs.INTERNAL_CLIENT_ID_PREFIX + reporterConfig.getString(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID)); setIfAbsent(producerProps, ProducerConfig.LINGER_MS_CONFIG, reporterConfig.getLong(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_METRICS_REPORTER_LINGER_MS_CONFIG).toString()); setIfAbsent(producerProps, ProducerConfig.BATCH_SIZE_CONFIG, diff --git a/core/src/main/java/kafka/automq/AutoMQConfig.java b/core/src/main/java/kafka/automq/AutoMQConfig.java index 6a6490cff9..aa9a7e5dad 100644 --- a/core/src/main/java/kafka/automq/AutoMQConfig.java +++ b/core/src/main/java/kafka/automq/AutoMQConfig.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; @@ -171,6 +172,14 @@ public class AutoMQConfig { public static final String CLUSTER_ID_CONFIG = "cluster.id"; public static final String CLUSTER_ID_DOC = "If the cluster.id is set, Kafka will auto format the storage."; + public static final String S3_BACK_PRESSURE_ENABLED_CONFIG = "automq.backpressure.enabled"; + public static final String S3_BACK_PRESSURE_ENABLED_DOC = "Whether back pressure is enabled"; + public static final boolean S3_BACK_PRESSURE_ENABLED_DEFAULT = true; + + public static final String S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG = "automq.backpressure.cooldown.ms"; + public static final String S3_BACK_PRESSURE_COOLDOWN_MS_DOC = "The cooldown time in milliseconds to wait between two regulator actions"; + public static final long S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT = TimeUnit.SECONDS.toMillis(15); + // Deprecated config start public static final String S3_ENDPOINT_CONFIG = "s3.endpoint"; public static final String S3_ENDPOINT_DOC = "[DEPRECATED]please use s3.data.buckets. The object storage endpoint, ex. https://s3.us-east-1.amazonaws.com."; @@ -255,6 +264,8 @@ public static void define(ConfigDef configDef) { .define(AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_CONFIG, INT, S3_METRICS_EXPORTER_REPORT_INTERVAL_MS, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_DOC) .define(AutoMQConfig.S3_TELEMETRY_METRICS_EXPORTER_URI_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_TELEMETRY_METRICS_EXPORTER_URI_DOC) .define(AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_CONFIG, STRING, null, MEDIUM, AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_DOC) + .define(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG, BOOLEAN, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DOC) + .define(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG, LONG, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DOC) // Deprecated config start .define(AutoMQConfig.S3_ENDPOINT_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_ENDPOINT_DOC) .define(AutoMQConfig.S3_REGION_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_REGION_DOC) diff --git a/core/src/main/java/kafka/automq/backpressure/BackPressureConfig.java b/core/src/main/java/kafka/automq/backpressure/BackPressureConfig.java new file mode 100644 index 0000000000..270699b1b4 --- /dev/null +++ b/core/src/main/java/kafka/automq/backpressure/BackPressureConfig.java @@ -0,0 +1,87 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package kafka.automq.backpressure; + +import kafka.automq.AutoMQConfig; +import kafka.server.KafkaConfig; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.ConfigUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class BackPressureConfig { + + public static final Set RECONFIGURABLE_CONFIGS = Set.of( + AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG, + AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG + ); + + private volatile boolean enabled; + /** + * The cooldown time in milliseconds to wait between two regulator actions. + */ + private long cooldownMs; + + public static BackPressureConfig from(KafkaConfig config) { + return new BackPressureConfig(config.s3BackPressureEnabled(), config.s3BackPressureCooldownMs()); + } + + public static BackPressureConfig from(Map raw) { + Map configs = new HashMap<>(raw); + return new BackPressureConfig( + ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG), + ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG) + ); + } + + public BackPressureConfig(boolean enabled, long cooldownMs) { + this.enabled = enabled; + this.cooldownMs = cooldownMs; + } + + public static void validate(Map raw) throws ConfigException { + Map configs = new HashMap<>(raw); + if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)) { + ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG); + } + if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)) { + validateCooldownMs(ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)); + } + } + + public static void validateCooldownMs(long cooldownMs) throws ConfigException { + if (cooldownMs < 0) { + throw new ConfigException(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG, cooldownMs, "The cooldown time must be non-negative."); + } + } + + public void update(Map raw) { + Map configs = new HashMap<>(raw); + if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)) { + this.enabled = ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG); + } + if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)) { + this.cooldownMs = ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG); + } + } + + public boolean enabled() { + return enabled; + } + + public long cooldownMs() { + return cooldownMs; + } +} diff --git a/core/src/main/java/kafka/automq/backpressure/BackPressureManager.java b/core/src/main/java/kafka/automq/backpressure/BackPressureManager.java new file mode 100644 index 0000000000..51ea14bdef --- /dev/null +++ b/core/src/main/java/kafka/automq/backpressure/BackPressureManager.java @@ -0,0 +1,37 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package kafka.automq.backpressure; + +import org.apache.kafka.common.Reconfigurable; + +/** + * It checks the {@link LoadLevel} of the system and takes actions based on the load level + * to prevent the system from being overwhelmed. + */ +public interface BackPressureManager extends Reconfigurable { + + /** + * Start the back pressure manager. + */ + void start(); + + /** + * Register a checker to check the load level of the system. + * Note: It should be called between {@link #start()} and {@link #shutdown()}. + */ + void registerChecker(Checker checker); + + /** + * Shutdown the back pressure manager, and release all resources. + */ + void shutdown(); +} diff --git a/core/src/main/java/kafka/automq/backpressure/Checker.java b/core/src/main/java/kafka/automq/backpressure/Checker.java new file mode 100644 index 0000000000..085d4135f9 --- /dev/null +++ b/core/src/main/java/kafka/automq/backpressure/Checker.java @@ -0,0 +1,33 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package kafka.automq.backpressure; + +/** + * A checker to check the load level of the system periodically. + */ +public interface Checker { + + /** + * The source of the checker, which should be unique to identify the checker. + */ + String source(); + + /** + * Check the load level of the system. + */ + LoadLevel check(); + + /** + * The interval in milliseconds to check the load level of the system. + */ + long intervalMs(); +} diff --git a/core/src/main/java/kafka/automq/backpressure/DefaultBackPressureManager.java b/core/src/main/java/kafka/automq/backpressure/DefaultBackPressureManager.java new file mode 100644 index 0000000000..3a95a98be3 --- /dev/null +++ b/core/src/main/java/kafka/automq/backpressure/DefaultBackPressureManager.java @@ -0,0 +1,171 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package kafka.automq.backpressure; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager; + +import com.automq.stream.utils.ThreadUtils; +import com.automq.stream.utils.Threads; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static kafka.automq.backpressure.BackPressureConfig.RECONFIGURABLE_CONFIGS; + +public class DefaultBackPressureManager implements BackPressureManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBackPressureManager.class); + + private final BackPressureConfig config; + private final Regulator regulator; + + /** + * The scheduler to schedule the checker periodically. + * Package-private for testing. + */ + ScheduledExecutorService checkerScheduler; + /** + * The map to store the source and the most recent load level from the checker. + * Note: It should only be accessed in the {@link #checkerScheduler} thread. + */ + private final Map loadLevels = new HashMap<>(); + /** + * The last time to trigger the regulator. + * Note: It should only be accessed in the {@link #checkerScheduler} thread. + */ + private long lastRegulateTime = System.currentTimeMillis(); + /** + * The last load level to trigger the regulator. + * Only used for logging and monitoring. + */ + private LoadLevel lastRegulateLevel = LoadLevel.NORMAL; + /** + * The current state metrics of the system. + * Only used for monitoring. + * + * @see S3StreamKafkaMetricsManager#setBackPressureStateSupplier + */ + private final Map stateMetrics = new HashMap<>(LoadLevel.values().length); + + public DefaultBackPressureManager(BackPressureConfig config, Regulator regulator) { + this.config = config; + this.regulator = regulator; + } + + @Override + public void start() { + this.checkerScheduler = Threads.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("back-pressure-checker-%d", false), LOGGER, true, false); + S3StreamKafkaMetricsManager.setBackPressureStateSupplier(this::stateMetrics); + } + + @Override + public void registerChecker(Checker checker) { + checkerScheduler.scheduleWithFixedDelay(() -> { + loadLevels.put(checker.source(), checker.check()); + maybeRegulate(); + }, 0, checker.intervalMs(), TimeUnit.MILLISECONDS); + } + + @Override + public void shutdown() { + ThreadUtils.shutdownExecutor(checkerScheduler, 1, TimeUnit.SECONDS); + } + + private void maybeRegulate() { + if (!config.enabled()) { + return; + } + maybeRegulate(false); + } + + /** + * Regulate the system if the cooldown time has passed. + * + * @param isInternal True if it is an internal call, which means it should not schedule the next regulate action. + */ + private void maybeRegulate(boolean isInternal) { + LoadLevel loadLevel = currentLoadLevel(); + long now = System.currentTimeMillis(); + long timeElapsed = now - lastRegulateTime; + + if (timeElapsed < config.cooldownMs()) { + // Skip regulating if the cooldown time has not passed. + if (!isInternal) { + // Schedule the next regulate action if it is not an internal call. + checkerScheduler.schedule(() -> maybeRegulate(true), config.cooldownMs() - timeElapsed, TimeUnit.MILLISECONDS); + } + return; + } + regulate(loadLevel, now); + } + + /** + * Get the current load level of the system, which is, the maximum load level from all checkers. + */ + private LoadLevel currentLoadLevel() { + return loadLevels.values().stream() + .max(LoadLevel::compareTo) + .orElse(LoadLevel.NORMAL); + } + + private void regulate(LoadLevel loadLevel, long now) { + if (LoadLevel.NORMAL.equals(loadLevel)) { + if (!LoadLevel.NORMAL.equals(lastRegulateLevel)) { + LOGGER.info("The system is back to a normal state, checkers: {}", loadLevels); + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("The system is in a normal state, checkers: {}", loadLevels); + } + } else { + LOGGER.info("The system is in a {} state, checkers: {}", loadLevel, loadLevels); + } + + loadLevel.regulate(regulator); + lastRegulateTime = now; + lastRegulateLevel = loadLevel; + } + + private Map stateMetrics() { + LoadLevel current = currentLoadLevel(); + for (LoadLevel level : LoadLevel.values()) { + int value = level.equals(current) ? current.ordinal() : -1; + stateMetrics.put(level.name(), value); + } + return stateMetrics; + } + + @Override + public Set reconfigurableConfigs() { + return RECONFIGURABLE_CONFIGS; + } + + @Override + public void validateReconfiguration(Map configs) throws ConfigException { + BackPressureConfig.validate(configs); + } + + @Override + public void reconfigure(Map configs) { + config.update(configs); + } + + @Override + public void configure(Map configs) { + } +} diff --git a/core/src/main/java/kafka/automq/backpressure/LoadLevel.java b/core/src/main/java/kafka/automq/backpressure/LoadLevel.java new file mode 100644 index 0000000000..3ff1e48fcc --- /dev/null +++ b/core/src/main/java/kafka/automq/backpressure/LoadLevel.java @@ -0,0 +1,43 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package kafka.automq.backpressure; + +/** + * Represents the load level of the system. + * {@link BackPressureManager} will take actions based on the load level. + * Note: It MUST be ordered by the severity. + */ +public enum LoadLevel { + /** + * The system is in a normal state. + */ + NORMAL { + @Override + public void regulate(Regulator regulator) { + regulator.increase(); + } + }, + /** + * The system is in a high load state, and some actions should be taken to reduce the load. + */ + HIGH { + @Override + public void regulate(Regulator regulator) { + regulator.decrease(); + } + }; + + /** + * Take actions based on the load level. + */ + public abstract void regulate(Regulator regulator); +} diff --git a/core/src/main/java/kafka/automq/backpressure/Regulator.java b/core/src/main/java/kafka/automq/backpressure/Regulator.java new file mode 100644 index 0000000000..942b022616 --- /dev/null +++ b/core/src/main/java/kafka/automq/backpressure/Regulator.java @@ -0,0 +1,31 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package kafka.automq.backpressure; + +/** + * The Regulator class is responsible for controlling and limiting the rate of external requests. + * It provides methods to increase, decrease, and minimize the flow of incoming requests. + */ +public interface Regulator { + + /** + * Increase the rate of incoming requests. + * If the rate is already at the maximum, this method does nothing. + */ + void increase(); + + /** + * Decrease the rate of incoming requests. + * If the rate is already at the minimum, this method does nothing. + */ + void decrease(); +} diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 9496c1c441..7d954c48f2 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -17,6 +17,7 @@ package kafka.server +import kafka.automq.backpressure.{BackPressureConfig, BackPressureManager, DefaultBackPressureManager, Regulator} import kafka.automq.zonerouter.{NoopProduceRouter, ProduceRouter} import kafka.cluster.EndPoint import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter, GroupCoordinatorAdapter} @@ -158,6 +159,8 @@ class BrokerServer( def metadataLoader: MetadataLoader = sharedServer.loader var produceRouter: ProduceRouter = _ + + var backPressureManager: BackPressureManager = _ // AutoMQ inject end private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = { @@ -504,6 +507,13 @@ class BrokerServer( () => lifecycleManager.resendBrokerRegistrationUnlessZkMode()) metadataPublishers.add(brokerRegistrationTracker) + // AutoMQ inject start + backPressureManager = new DefaultBackPressureManager( + BackPressureConfig.from(config), + newBackPressureRegulator() + ) + backPressureManager.start() + // AutoMQ inject end // Register parts of the broker that can be reconfigured via dynamic configs. This needs to // be done before we publish the dynamic configs, so that we don't miss anything. @@ -660,6 +670,10 @@ class BrokerServer( lifecycleManager.beginShutdown() // AutoMQ for Kafka inject start + if (backPressureManager != null) { + CoreUtils.swallow(backPressureManager.shutdown(), this) + } + // https://github.com/AutoMQ/automq-for-kafka/issues/540 // await partition shutdown: // 1. after lifecycleManager start shutdown to trigger partitions gracefully reassign. @@ -796,6 +810,16 @@ class BrokerServer( dataPlaneRequestProcessor.asInstanceOf[ElasticKafkaApis].setProduceRouter(produceRouter) produceRouter } + + protected def newBackPressureRegulator(): Regulator = { + new Regulator { + override def increase(): Unit = { + } + + override def decrease(): Unit = { + } + } + } // AutoMQ inject end } diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala index 997e6c1859..8974248131 100644 --- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala @@ -44,7 +44,7 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig, private val quotaCallback: Option[ClientQuotaCallback]) extends ClientQuotaManager(config, metrics, QuotaType.Request, time, threadNamePrefix, quotaCallback) { - protected val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds) + private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds) private val exemptMetricName = metrics.metricName("exempt-request-time", QuotaType.Request.toString, "Tracking exempt-request-time utilization percentage") diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index a3c5e64d93..6bb9d6257c 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -241,7 +241,6 @@ class DelayedFetch( error(s"Unexpected error in delayed fetch: $params $fetchInfos ", e) } } - ReadHint.clear() // AutoMQ for Kafka inject end val fetchPartitionData = logReadResults.map { case (tp, result) => @@ -252,6 +251,11 @@ class DelayedFetch( } responseCallback(fetchPartitionData) + // AutoMQ for Kafka inject start + // clear hint after callback as it will be used in the callback + // see {@link ElasticKafkaApis#handleFetchRequest#processResponseCallback} + ReadHint.clear() + // AutoMQ for Kafka inject end } } diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index d3feaad484..d9dac744c4 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -18,6 +18,7 @@ package kafka.server import kafka.autobalancer.config.{AutoBalancerControllerConfig, AutoBalancerMetricsReporterConfig} +import kafka.automq.backpressure.BackPressureConfig import java.util import java.util.{Collections, Properties} @@ -101,7 +102,8 @@ object DynamicBrokerConfig { DynamicProducerStateManagerConfig ++ DynamicRemoteLogConfig.ReconfigurableConfigs ++ AutoBalancerControllerConfig.RECONFIGURABLE_CONFIGS.asScala ++ - AutoBalancerMetricsReporterConfig.RECONFIGURABLE_CONFIGS.asScala + AutoBalancerMetricsReporterConfig.RECONFIGURABLE_CONFIGS.asScala ++ + BackPressureConfig.RECONFIGURABLE_CONFIGS.asScala private val ClusterLevelListenerConfigs = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG) private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff( @@ -269,6 +271,13 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging addReconfigurable(kafkaServer.kafkaYammerMetrics) addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId)) addReconfigurable(new DynamicClientQuotaCallback(kafkaServer.quotaManagers, kafkaServer.config)) + // AutoMQ inject start + kafkaServer match { + case brokerServer: BrokerServer => + addReconfigurable(brokerServer.backPressureManager) + case _ => + } + // AutoMQ inject end addBrokerReconfigurable(new BrokerDynamicThreadPool(kafkaServer)) addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer)) diff --git a/core/src/main/scala/kafka/server/FairLimiter.java b/core/src/main/scala/kafka/server/FairLimiter.java index d6ad1edc6c..86ac171494 100644 --- a/core/src/main/scala/kafka/server/FairLimiter.java +++ b/core/src/main/scala/kafka/server/FairLimiter.java @@ -13,6 +13,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -27,13 +28,32 @@ public class FairLimiter implements Limiter { private final Lock lock = new ReentrantLock(true); private final Semaphore permits; - public FairLimiter(int size) { - maxPermits = size; - permits = new Semaphore(size); + /** + * The name of this limiter, used for metrics. + */ + private final String name; + /** + * The number of threads waiting for permits, used for metrics. + */ + private final AtomicInteger waitingThreads = new AtomicInteger(0); + + public FairLimiter(int size, String name) { + this.maxPermits = size; + this.permits = new Semaphore(size); + this.name = name; } @Override public Handler acquire(int permit) throws InterruptedException { + waitingThreads.incrementAndGet(); + try { + return acquire0(permit); + } finally { + waitingThreads.decrementAndGet(); + } + } + + private Handler acquire0(int permit) throws InterruptedException { lock.lock(); try { permits.acquire(permit); @@ -45,6 +65,15 @@ public Handler acquire(int permit) throws InterruptedException { @Override public Handler acquire(int permit, long timeoutMs) throws InterruptedException { + waitingThreads.incrementAndGet(); + try { + return acquire0(permit, timeoutMs); + } finally { + waitingThreads.decrementAndGet(); + } + } + + private Handler acquire0(int permit, long timeoutMs) throws InterruptedException { long start = System.nanoTime(); if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) { try { @@ -72,6 +101,16 @@ public int availablePermits() { return permits.availablePermits(); } + @Override + public int waitingThreads() { + return waitingThreads.get(); + } + + @Override + public String name() { + return name; + } + private Handler acquireLocked(int permit, long timeoutNs) throws InterruptedException { if (permit > maxPermits) { permit = maxPermits; diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 4fd1e53fe3..50935bdc67 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -793,6 +793,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val s3RefillPeriodMsProp = getInt(AutoMQConfig.S3_NETWORK_REFILL_PERIOD_MS_CONFIG) val s3MetricsLevel = getString(AutoMQConfig.S3_TELEMETRY_METRICS_LEVEL_CONFIG) val s3ExporterReportIntervalMs = getInt(AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_CONFIG) + val s3BackPressureEnabled = getBoolean(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG) + val s3BackPressureCooldownMs = getLong(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG) // AutoMQ inject end /** Internal Configurations **/ diff --git a/core/src/main/scala/kafka/server/Limiter.java b/core/src/main/scala/kafka/server/Limiter.java index 3e3511de77..b6742adc0b 100644 --- a/core/src/main/scala/kafka/server/Limiter.java +++ b/core/src/main/scala/kafka/server/Limiter.java @@ -49,6 +49,16 @@ public interface Limiter { */ int availablePermits(); + /** + * Return the number of threads waiting for permits. + */ + int waitingThreads(); + + /** + * Return the name of this limiter. + */ + String name(); + /** * A handler to release acquired permits. */ diff --git a/core/src/main/scala/kafka/server/NoopLimiter.java b/core/src/main/scala/kafka/server/NoopLimiter.java index 1fab234492..aaaa63ab7d 100644 --- a/core/src/main/scala/kafka/server/NoopLimiter.java +++ b/core/src/main/scala/kafka/server/NoopLimiter.java @@ -38,6 +38,16 @@ public int availablePermits() { return Integer.MAX_VALUE; } + @Override + public int waitingThreads() { + return 0; + } + + @Override + public String name() { + return "noop"; + } + public static class NoopHandler implements Handler { @Override public void close() { diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala index 990565e04c..9a13d864f7 100644 --- a/core/src/main/scala/kafka/server/QuotaFactory.scala +++ b/core/src/main/scala/kafka/server/QuotaFactory.scala @@ -32,6 +32,16 @@ object QuotaType { case object Fetch extends QuotaType case object Produce extends QuotaType case object Request extends QuotaType + // AutoMQ for Kafka inject start + /** + * Quota type for slow fetch throughput limiting. + */ + case object SlowFetch extends QuotaType + /** + * Quota type for request rate limiting. + */ + case object RequestRate extends QuotaType + // AutoMQ for Kafka inject end case object ControllerMutation extends QuotaType case object LeaderReplication extends QuotaType case object FollowerReplication extends QuotaType @@ -44,11 +54,16 @@ object QuotaType { case QuotaType.Fetch => ClientQuotaType.FETCH case QuotaType.Produce => ClientQuotaType.PRODUCE case QuotaType.Request => ClientQuotaType.REQUEST + // AutoMQ for Kafka inject start + case QuotaType.SlowFetch => ClientQuotaType.SLOW_FETCH + case QuotaType.RequestRate => ClientQuotaType.REQUEST_RATE + // AutoMQ for Kafka inject end case QuotaType.ControllerMutation => ClientQuotaType.CONTROLLER_MUTATION case _ => throw new IllegalArgumentException(s"Not a client quota type: $quotaType") } } + // AutoMQ for Kafka inject start // for test def fetch(): QuotaType = { QuotaType.Fetch @@ -58,9 +73,14 @@ object QuotaType { QuotaType.Produce } - def request(): QuotaType = { - QuotaType.Request + def slowFetch(): QuotaType = { + QuotaType.SlowFetch + } + + def requestRate(): QuotaType = { + QuotaType.RequestRate } + // AutoMQ for Kafka inject end } sealed trait QuotaType diff --git a/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala b/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala index accfe469be..b9b5bd3ddd 100644 --- a/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala @@ -11,19 +11,20 @@ package kafka.server.streamaspect +import com.automq.stream.s3.metrics.S3StreamMetricsManager import kafka.network.RequestChannel import kafka.server._ import kafka.utils.QuotaUtils import org.apache.kafka.common.MetricName import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate} import org.apache.kafka.common.metrics.{Metrics, Quota, QuotaViolationException, Sensor} -import org.apache.kafka.common.requests.RequestContext import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Time import org.apache.kafka.network.Session -import org.apache.kafka.server.config.BrokerQuotaManagerConfig +import org.apache.kafka.server.config.{BrokerQuotaManagerConfig, QuotaConfigs} -import java.util.Properties +import java.util.concurrent.TimeUnit +import java.util.{Optional, Properties} import scala.collection.mutable import scala.jdk.CollectionConverters._ @@ -32,6 +33,7 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, private val time: Time, private val threadNamePrefix: String) extends ClientRequestQuotaManager(config, metrics, time, threadNamePrefix, None) { + private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds * this.config.numQuotaSamples) private val metricsTags = Map("domain" -> "broker", "nodeId" -> String.valueOf(config.nodeId())) private val whiteListCache = mutable.HashMap[String, Boolean]() @@ -41,14 +43,31 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, override def delayQueueSensor: Sensor = brokerDelayQueueSensor - def getMaxValueInQuotaWindow(quotaType: QuotaType): Double = { - if (config.quotaEnabled) { + S3StreamMetricsManager.registerBrokerQuotaLimitSupplier(() => java.util.Map.of( + QuotaType.RequestRate.toString, quotaLimit(QuotaType.RequestRate), + QuotaType.Produce.toString, quotaLimit(QuotaType.Produce), + QuotaType.Fetch.toString, quotaLimit(QuotaType.Fetch), + QuotaType.SlowFetch.toString, quotaLimit(QuotaType.SlowFetch) + )) + + def getMaxValueInQuotaWindow(quotaType: QuotaType, request: RequestChannel.Request): Double = { + if (shouldThrottle(request)) { quotaLimit(quotaType) } else { Double.MaxValue } } + /** + * Get the value of the metric for the given quota type at the given time. + * It return empty if the metric is not found, which is possible if the quota is disabled or no request has been + * processed yet. + */ + def getQuotaMetricValue(quotaType: QuotaType, timeMs: Long): Optional[java.lang.Double] = { + Optional.ofNullable(metrics.metric(clientQuotaMetricName(quotaType, metricsTags))) + .map(_.measurableValueV2(timeMs)) + } + def recordNoThrottle(quotaType: QuotaType, value: Double): Unit = { val clientSensors = getOrCreateQuotaSensors(quotaType) clientSensors.quotaSensor.record(value, time.milliseconds(), false) @@ -56,19 +75,26 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, def maybeRecordAndGetThrottleTimeMs(quotaType: QuotaType, request: RequestChannel.Request, value: Double, timeMs: Long): Int = { - if (!config.quotaEnabled) { - return 0 + if (shouldThrottle(request)) { + maybeRecordAndGetThrottleTimeMs(quotaType, value, timeMs) + } else { + 0 } - - maybeRecordAndGetThrottleTimeMs(quotaType, request.session, request.context, value, timeMs) } - protected def throttleTime(quotaType: QuotaType, e: QuotaViolationException, timeMs: Long): Long = { - if (quotaType == QuotaType.Request) { + override protected def throttleTime(e: QuotaViolationException, timeMs: Long): Long = { QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs) - } else { - QuotaUtils.throttleTime(e, timeMs) - } + } + + private def shouldThrottle(request: RequestChannel.Request): Boolean = { + val quotaEnabled = config.quotaEnabled + val isInternal = isInternalClient(request.context.clientId()) + val isWhiteListed = isInWhiteList(request.session.principal, request.context.clientId(), request.context.listenerName()) + quotaEnabled && !isInternal && !isWhiteListed + } + + private def isInternalClient(clientId: String): Boolean = { + clientId.startsWith(QuotaConfigs.INTERNAL_CLIENT_ID_PREFIX) } private def isInWhiteList(principal: KafkaPrincipal, clientId: String, listenerName: String): Boolean = { @@ -84,18 +110,14 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, } } - def maybeRecordAndGetThrottleTimeMs(quotaType: QuotaType, session: Session, context: RequestContext, value: Double, - timeMs: Long): Int = { - if (isInWhiteList(session.principal, context.clientId(), context.listenerName())) { - return 0 - } + def maybeRecordAndGetThrottleTimeMs(quotaType: QuotaType, value: Double, timeMs: Long): Int = { val clientSensors = getOrCreateQuotaSensors(quotaType) try { clientSensors.quotaSensor.record(value, timeMs, true) 0 } catch { case e: QuotaViolationException => - val throttleTimeMs = throttleTime(quotaType, e, timeMs).toInt + val throttleTimeMs = throttleTime(e, timeMs).toInt debug(s"Quota violated for sensor (${clientSensors.quotaSensor.name}). Delay time: ($throttleTimeMs)") throttleTimeMs } @@ -112,34 +134,59 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, whiteListCache.clear() if (!config.quotaEnabled) { - metrics.removeSensor(getQuotaSensorName(QuotaType.Request, metricsTags)) + metrics.removeSensor(getQuotaSensorName(QuotaType.RequestRate, metricsTags)) metrics.removeSensor(getQuotaSensorName(QuotaType.Produce, metricsTags)) metrics.removeSensor(getQuotaSensorName(QuotaType.Fetch, metricsTags)) - metrics.removeSensor(getThrottleTimeSensorName(QuotaType.Request, metricsTags)) + metrics.removeSensor(getQuotaSensorName(QuotaType.SlowFetch, metricsTags)) + metrics.removeSensor(getThrottleTimeSensorName(QuotaType.RequestRate, metricsTags)) metrics.removeSensor(getThrottleTimeSensorName(QuotaType.Produce, metricsTags)) metrics.removeSensor(getThrottleTimeSensorName(QuotaType.Fetch, metricsTags)) + metrics.removeSensor(getThrottleTimeSensorName(QuotaType.SlowFetch, metricsTags)) return } val allMetrics = metrics.metrics() - val requestMetrics = allMetrics.get(clientQuotaMetricName(QuotaType.Request, metricsTags)) - if (requestMetrics != null) { - requestMetrics.config(getQuotaMetricConfig(quotaLimit(QuotaType.Request))) + val requestRateMetric = allMetrics.get(clientQuotaMetricName(QuotaType.RequestRate, metricsTags)) + if (requestRateMetric != null) { + requestRateMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.RequestRate))) } - val produceMetrics = allMetrics.get(clientQuotaMetricName(QuotaType.Produce, metricsTags)) - if (produceMetrics != null) { - produceMetrics.config(getQuotaMetricConfig(quotaLimit(QuotaType.Produce))) + val produceMetric = allMetrics.get(clientQuotaMetricName(QuotaType.Produce, metricsTags)) + if (produceMetric != null) { + produceMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.Produce))) } - val fetchMetrics = allMetrics.get(clientQuotaMetricName(QuotaType.Fetch, metricsTags)) - if (fetchMetrics != null) { - fetchMetrics.config(getQuotaMetricConfig(quotaLimit(QuotaType.Fetch))) + val fetchMetric = allMetrics.get(clientQuotaMetricName(QuotaType.Fetch, metricsTags)) + if (fetchMetric != null) { + fetchMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.Fetch))) + } + + val slowFetchMetric = allMetrics.get(clientQuotaMetricName(QuotaType.SlowFetch, metricsTags)) + if (slowFetchMetric != null) { + slowFetchMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.SlowFetch))) } } } + def updateQuota(quotaType: QuotaType, quota: Double): Unit = { + // update the quota in the config first to make sure the new quota will be used if {@link #updateQuotaMetricConfigs} is called + quotaType match { + case QuotaType.RequestRate => config.requestRateQuota(quota) + case QuotaType.Produce => config.produceQuota(quota) + case QuotaType.Fetch => config.fetchQuota(quota) + case QuotaType.SlowFetch => config.slowFetchQuota(quota) + case _ => throw new IllegalArgumentException(s"Unknown quota type $quotaType") + } + + // update the metric config + val allMetrics = metrics.metrics() + val metric = allMetrics.get(clientQuotaMetricName(quotaType, metricsTags)) + if (metric != null) { + metric.config(getQuotaMetricConfig(quotaLimit(quotaType))) + } + } + def throttle( quotaType: QuotaType, throttleCallback: ThrottleCallback, @@ -161,11 +208,14 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, private def getQuotaSensorName(quotaType: QuotaType, metricTags: Map[String, String]): String = s"$quotaType-${metricTagsToSensorSuffix(metricTags)}" - private def quotaLimit(quotaType: QuotaType): Double = { - if (quotaType == QuotaType.Request) config.requestQuota - else if (quotaType == QuotaType.Produce) config.produceQuota - else if (quotaType == QuotaType.Fetch) config.fetchQuota - else throw new IllegalArgumentException(s"Unknown quota type $quotaType") + def quotaLimit(quotaType: QuotaType): Double = { + quotaType match { + case QuotaType.RequestRate => config.requestRateQuota + case QuotaType.Produce => config.produceQuota + case QuotaType.Fetch => config.fetchQuota + case QuotaType.SlowFetch => config.slowFetchQuota + case _ => throw new IllegalArgumentException(s"Unknown quota type $quotaType") + } } protected def clientQuotaMetricName(quotaType: QuotaType, quotaMetricTags: Map[String, String]): MetricName = { diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala index a1d432def2..047567b489 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala @@ -5,7 +5,7 @@ import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor import com.yammer.metrics.core.Histogram import kafka.automq.zonerouter.{ClientIdMetadata, NoopProduceRouter, ProduceRouter} import kafka.coordinator.transaction.TransactionCoordinator -import kafka.log.streamaspect.ElasticLogManager +import kafka.log.streamaspect.{ElasticLogManager, ReadHint} import kafka.metrics.KafkaMetricsUtil import kafka.network.RequestChannel import kafka.server.QuotaFactory.QuotaManagers @@ -312,9 +312,7 @@ class ElasticKafkaApis( val requestThrottleTimeMs = if (produceRequest.acks == 0) 0 else quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs) - val brokerRequestThrottleTimeMs = - if (produceRequest.acks == 0) 0 - else quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.Request, request, 1, timeMs) + val brokerRequestThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.RequestRate, request, 1, timeMs) val maxThrottleTimeMs = IntStream.of(bandwidthThrottleTimeMs, requestThrottleTimeMs, brokerBandwidthThrottleTimeMs, brokerRequestThrottleTimeMs).max().orElse(0) if (maxThrottleTimeMs > 0) { request.apiThrottleTimeMs = maxThrottleTimeMs @@ -325,7 +323,7 @@ class ElasticKafkaApis( } else if (brokerBandwidthThrottleTimeMs == maxThrottleTimeMs) { requestHelper.throttle(QuotaType.Produce, quotas.broker, request, brokerBandwidthThrottleTimeMs) } else if (brokerRequestThrottleTimeMs == maxThrottleTimeMs) { - requestHelper.throttle(QuotaType.Request, quotas.broker, request, brokerRequestThrottleTimeMs) + requestHelper.throttle(QuotaType.RequestRate, quotas.broker, request, brokerRequestThrottleTimeMs) } } // AutoMQ for Kafka inject end @@ -698,26 +696,34 @@ class ElasticKafkaApis( val timeMs = time.milliseconds() // AutoMQ for Kafka inject start + val isSlowRead = !ReadHint.isFastRead + val requestThrottleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs) val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs) - val brokerRequestThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.Request, request, 1, timeMs) val brokerBandwidthThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.Fetch, request, responseSize, timeMs) + val brokerSlowFetchThrottleTimeMs = if (isSlowRead) quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.SlowFetch, request, responseSize, timeMs) else 0 + val brokerRequestThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.RequestRate, request, 1, timeMs) - val maxThrottleTimeMs = IntStream.of(bandwidthThrottleTimeMs, requestThrottleTimeMs, brokerBandwidthThrottleTimeMs, brokerRequestThrottleTimeMs).max().orElse(0) + val maxThrottleTimeMs = IntStream.of(bandwidthThrottleTimeMs, requestThrottleTimeMs, brokerBandwidthThrottleTimeMs, brokerSlowFetchThrottleTimeMs, brokerRequestThrottleTimeMs).max().orElse(0) if (maxThrottleTimeMs > 0) { request.apiThrottleTimeMs = maxThrottleTimeMs // Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value // from the fetch quota because we are going to return an empty response. quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs) quotas.broker.unrecordQuotaSensor(QuotaType.Fetch, responseSize, timeMs) + if (isSlowRead) { + quotas.broker.unrecordQuotaSensor(QuotaType.SlowFetch, responseSize, timeMs) + } if (bandwidthThrottleTimeMs == maxThrottleTimeMs) { requestHelper.throttle(quotas.fetch, request, bandwidthThrottleTimeMs) } else if (requestThrottleTimeMs == maxThrottleTimeMs) { requestHelper.throttle(quotas.request, request, requestThrottleTimeMs) } else if (brokerBandwidthThrottleTimeMs == maxThrottleTimeMs) { requestHelper.throttle(QuotaType.Fetch, quotas.broker, request, brokerBandwidthThrottleTimeMs) + } else if (brokerSlowFetchThrottleTimeMs == maxThrottleTimeMs) { + requestHelper.throttle(QuotaType.SlowFetch, quotas.broker, request, brokerSlowFetchThrottleTimeMs) } else if (brokerRequestThrottleTimeMs == maxThrottleTimeMs) { - requestHelper.throttle(QuotaType.Request, quotas.broker, request, brokerRequestThrottleTimeMs) + requestHelper.throttle(QuotaType.RequestRate, quotas.broker, request, brokerRequestThrottleTimeMs) } // AutoMQ for Kafka inject end @@ -747,7 +753,7 @@ class ElasticKafkaApis( Int.MaxValue else { val maxValue = quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt - val brokerMaxValue = quotas.broker.getMaxValueInQuotaWindow(QuotaType.Fetch).toInt + val brokerMaxValue = quotas.broker.getMaxValueInQuotaWindow(QuotaType.Fetch, request).toInt math.min(maxValue, brokerMaxValue) } diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala index 78084ef7cb..89f2978c3e 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala @@ -1,6 +1,7 @@ package kafka.server.streamaspect import com.automq.stream.api.exceptions.FastReadFailFastException +import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil} import com.automq.stream.utils.FutureUtil import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor import kafka.cluster.Partition @@ -114,14 +115,28 @@ class ElasticReplicaManager( fetchExecutorQueueSizeGaugeMap }) - private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB - private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB - private val fetchLimiterGaugeMap = new util.HashMap[String, Integer]() + private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_FAST_NAME) // 200MiB + private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_SLOW_NAME) // 200MiB + private val fetchLimiterWaitingTasksGaugeMap = new util.HashMap[String, Integer]() + S3StreamKafkaMetricsManager.setFetchLimiterWaitingTaskNumSupplier(() => { + fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.waitingThreads()) + fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.waitingThreads()) + fetchLimiterWaitingTasksGaugeMap + }) + private val fetchLimiterPermitsGaugeMap = new util.HashMap[String, Integer]() S3StreamKafkaMetricsManager.setFetchLimiterPermitNumSupplier(() => { - fetchLimiterGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.availablePermits()) - fetchLimiterGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.availablePermits()) - fetchLimiterGaugeMap + fetchLimiterPermitsGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.availablePermits()) + fetchLimiterPermitsGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.availablePermits()) + fetchLimiterPermitsGaugeMap }) + private val fetchLimiterTimeoutCounterMap = util.Map.of( + fastFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(fastFetchLimiter.name), + slowFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(slowFetchLimiter.name) + ) + private val fetchLimiterTimeHistogramMap = util.Map.of( + fastFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeMetric(MetricsLevel.INFO, fastFetchLimiter.name), + slowFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeMetric(MetricsLevel.INFO, slowFetchLimiter.name) + ) /** * Used to reduce allocation in [[readFromLocalLogV2]] @@ -558,14 +573,16 @@ class ElasticReplicaManager( math.min(bytesNeedFromParam, limiter.maxPermits()) } + val timer: TimerUtil = new TimerUtil() val handler: Handler = timeoutMs match { case t if t > 0 => limiter.acquire(bytesNeed(), t) case _ => limiter.acquire(bytesNeed()) } + fetchLimiterTimeHistogramMap.get(limiter.name).record(timer.elapsedAs(TimeUnit.NANOSECONDS)) if (handler == null) { - // handler maybe null if it timed out to acquire from limiter - // TODO add metrics for this + // the handler will be null if it timed out to acquire from limiter + fetchLimiterTimeoutCounterMap.get(limiter.name).add(MetricsLevel.INFO, 1) // warn(s"Returning emtpy fetch response for fetch request $readPartitionInfo since the wait time exceeds $timeoutMs ms.") ElasticReplicaManager.emptyReadResults(readPartitionInfo.map(_._1)) } else { diff --git a/core/src/test/java/kafka/automq/backpressure/DefaultBackPressureManagerTest.java b/core/src/test/java/kafka/automq/backpressure/DefaultBackPressureManagerTest.java new file mode 100644 index 0000000000..6cce5e742d --- /dev/null +++ b/core/src/test/java/kafka/automq/backpressure/DefaultBackPressureManagerTest.java @@ -0,0 +1,187 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package kafka.automq.backpressure; + +import kafka.automq.AutoMQConfig; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +public class DefaultBackPressureManagerTest { + static String sourceA = "sourceA"; + static String sourceB = "sourceB"; + static String sourceC = "sourceC"; + + BackPressureConfig config; + DefaultBackPressureManager manager; + + Regulator regulator; + int regulatorIncreaseCalled = 0; + int regulatorDecreaseCalled = 0; + + ScheduledExecutorService scheduler; + int schedulerScheduleCalled = 0; + long schedulerScheduleDelay = 0; + + @BeforeEach + public void setup() { + regulator = mock(Regulator.class); + scheduler = mock(ScheduledExecutorService.class); + + // Mock the regulator to count the number of times each method is called + doAnswer(invocation -> { + regulatorIncreaseCalled++; + return null; + }).when(regulator).increase(); + doAnswer(invocation -> { + regulatorDecreaseCalled++; + return null; + }).when(regulator).decrease(); + + // Mock the scheduler to run the scheduled task immediately and only once + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(scheduler).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + schedulerScheduleCalled++; + schedulerScheduleDelay = invocation.getArgument(1); + return null; + }).when(scheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); + } + + @Test + public void testDynamicConfig() { + initManager(false, 0); + + callChecker(sourceC, LoadLevel.NORMAL); + callChecker(sourceB, LoadLevel.HIGH); + assertRegulatorCalled(0, 0); + + manager.reconfigure(Map.of( + AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG, "true" + )); + callChecker(sourceC, LoadLevel.NORMAL); + callChecker(sourceB, LoadLevel.NORMAL); + assertRegulatorCalled(1, 1); + + manager.reconfigure(Map.of( + AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG, "false" + )); + callChecker(sourceC, LoadLevel.NORMAL); + callChecker(sourceB, LoadLevel.HIGH); + assertRegulatorCalled(1, 1); + } + + @Test + public void testPriority1() { + initManager(0); + + callChecker(sourceB, LoadLevel.HIGH); + callChecker(sourceC, LoadLevel.NORMAL); + + assertRegulatorCalled(0, 2); + } + + @Test + public void testPriority2() { + initManager(0); + + callChecker(sourceC, LoadLevel.NORMAL); + callChecker(sourceB, LoadLevel.HIGH); + + assertRegulatorCalled(1, 1); + } + + @Test + public void testOverride() { + initManager(0); + + callChecker(sourceA, LoadLevel.NORMAL); + callChecker(sourceA, LoadLevel.HIGH); + callChecker(sourceA, LoadLevel.NORMAL); + + assertRegulatorCalled(2, 1); + } + + @Test + public void testCooldown() { + final long cooldownMs = Long.MAX_VALUE; + final long tolerance = 1000; + + initManager(cooldownMs); + + callChecker(sourceA, LoadLevel.HIGH); + assertRegulatorCalled(0, 0); + assertSchedulerCalled(1); + assertEquals(cooldownMs, schedulerScheduleDelay, tolerance); + + callChecker(sourceA, LoadLevel.NORMAL); + assertRegulatorCalled(0, 0); + assertSchedulerCalled(2); + assertEquals(cooldownMs, schedulerScheduleDelay, tolerance); + } + + private void initManager(long cooldownMs) { + initManager(true, cooldownMs); + } + + /** + * Should be called at the beginning of each test to initialize the manager. + */ + private void initManager(boolean enabled, long cooldownMs) { + config = new BackPressureConfig(enabled, cooldownMs); + manager = new DefaultBackPressureManager(config, regulator); + manager.checkerScheduler = scheduler; + } + + private void callChecker(String source, LoadLevel level) { + manager.registerChecker(new Checker() { + @Override + public String source() { + return source; + } + + @Override + public LoadLevel check() { + return level; + } + + @Override + public long intervalMs() { + return 1; + } + }); + } + + private void assertRegulatorCalled(int increase, int decrease) { + assertEquals(increase, regulatorIncreaseCalled); + assertEquals(decrease, regulatorDecreaseCalled); + } + + private void assertSchedulerCalled(int times) { + assertEquals(times, schedulerScheduleCalled); + } +} diff --git a/core/src/test/scala/unit/kafka/server/BrokerQuotaManagerTest.java b/core/src/test/scala/unit/kafka/server/BrokerQuotaManagerTest.java index aa2bf11586..0b7ec67b87 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerQuotaManagerTest.java +++ b/core/src/test/scala/unit/kafka/server/BrokerQuotaManagerTest.java @@ -105,27 +105,115 @@ public void testQuota() { result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.fetch(), request, 500, time + second2millis); assertEquals(0, result); - // Test request quota + // Test slow fetch quota properties.put(QuotaConfigs.BROKER_QUOTA_FETCH_BYTES_CONFIG, 0); + properties.put(QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG, 100); + brokerQuotaManager.updateQuotaConfigs(Option.apply(properties)); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 100, time); + assertEquals(0, result); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 100, time + 10); + assertEquals(0, result); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 100, time + second2millis); + assertTrue(result > 0); + + properties.put(QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG, 1000); + brokerQuotaManager.updateQuotaConfigs(Option.apply(properties)); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 500, time + second2millis); + assertEquals(0, result); + + // Test request quota + properties.put(QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG, 0); properties.put(QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, 1); brokerQuotaManager.updateQuotaConfigs(Option.apply(properties)); - result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.request(), request, 1, time); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time); assertEquals(0, result); - result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.request(), request, 1, time + 10); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 10); assertEquals(0, result); - result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.request(), request, 1, time + second2millis); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + second2millis); assertTrue(result > 0); properties.put(QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, 10); brokerQuotaManager.updateQuotaConfigs(Option.apply(properties)); - result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.request(), request, 0, time + second2millis); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 0, time + second2millis); + assertEquals(0, result); + } + + @Test + public void testZeroQuota() { + long result; + long time = this.time.milliseconds(); + + // enable quota + Properties properties = new Properties(); + properties.put(QuotaConfigs.BROKER_QUOTA_ENABLED_CONFIG, true); + brokerQuotaManager.updateQuotaConfigs(Option.apply(properties)); + + brokerQuotaManager.updateQuota(QuotaType.requestRate(), 0); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time); + assertEquals(1000, result); + + brokerQuotaManager.updateQuota(QuotaType.slowFetch(), 0); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 1, time); + assertEquals(1000, result); + } + + @Test + public void testUpdateQuota() { + int result; + long time = this.time.milliseconds(); + + // enable quota + Properties properties = new Properties(); + properties.put(QuotaConfigs.BROKER_QUOTA_ENABLED_CONFIG, true); + brokerQuotaManager.updateQuotaConfigs(Option.apply(properties)); + + brokerQuotaManager.updateQuota(QuotaType.requestRate(), 1); + // rate = 1 / 2000ms + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time); + assertQuotaMetricValue(QuotaType.requestRate(), (double) 1 / 2, time); + assertEquals(0, result); + // rate = 2 / 2010ms + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 10); + assertQuotaMetricValue(QuotaType.requestRate(), (double) 2 / 2.01, time + 10); + assertEquals(0, result); + // rate = 3 / 2999ms > 1 + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999); + assertQuotaMetricValue(QuotaType.requestRate(), (double) 3 / 2.999, time + 2999); + assertEquals(1, result); + + brokerQuotaManager.updateQuota(QuotaType.requestRate(), 2); + // rate = 4 / 2999ms + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999); + assertQuotaMetricValue(QuotaType.requestRate(), (double) 4 / 2.999, time + 2999); assertEquals(0, result); + // rate = 5 / 2999ms + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999); + assertQuotaMetricValue(QuotaType.requestRate(), (double) 5 / 2.999, time + 2999); + assertEquals(0, result); + // rate = 6 / 2999ms > 2 + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999); + assertQuotaMetricValue(QuotaType.requestRate(), (double) 6 / 2.999, time + 2999); + assertEquals(1, result); + + brokerQuotaManager.updateQuota(QuotaType.requestRate(), 1); + // rate = 5 / 2999ms > 1 + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999); + assertQuotaMetricValue(QuotaType.requestRate(), (double) 5 / 2.999, time + 2999 + 2999); + assertEquals(1000, result); + // rate = 2 / 2001ms + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999 + 1); + assertQuotaMetricValue(QuotaType.requestRate(), (double) 2 / 2.001, time + 2999 + 2999 + 1); + assertEquals(0, result); + // rate = 3 / 2999ms > 1 + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999 + 2999); + assertQuotaMetricValue(QuotaType.requestRate(), (double) 3 / 2.999, time + 2999 + 2999 + 2999); + assertEquals(1, result); } @Test public void testThrottle() { AtomicInteger throttleCounter = new AtomicInteger(0); - brokerQuotaManager.throttle(QuotaType.request(), new ThrottleCallback() { + brokerQuotaManager.throttle(QuotaType.requestRate(), new ThrottleCallback() { @Override public void startThrottling() { throttleCounter.incrementAndGet(); @@ -184,4 +272,9 @@ public void testWhiteList() { result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.produce(), request, 1000, time.milliseconds()); assertEquals(0, result); } + + private void assertQuotaMetricValue(QuotaType quotaType, double expected, long timeMs) { + double value = brokerQuotaManager.getQuotaMetricValue(quotaType, timeMs).get(); + assertEquals(expected, value); + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableDoubleGauge.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableDoubleGauge.java new file mode 100644 index 0000000000..6486d55a9d --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableDoubleGauge.java @@ -0,0 +1,17 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.stream.s3.metrics; + +import io.opentelemetry.api.metrics.ObservableDoubleGauge; + +public class NoopObservableDoubleGauge implements ObservableDoubleGauge { +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java index 38f07a4117..e6830301c1 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java @@ -151,4 +151,8 @@ public class S3StreamMetricsConstant { public static final String LABEL_STAGE_GET_OBJECTS = "get_objects"; public static final String LABEL_STAGE_FIND_INDEX = "find_index"; public static final String LABEL_STAGE_COMPUTE = "compute"; + + // Broker Quota + public static final String BROKER_QUOTA_LIMIT_METRIC_NAME = "broker_quota_limit"; + public static final AttributeKey LABEL_BROKER_QUOTA_TYPE = AttributeKey.stringKey("type"); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 32228a8f28..8ebe755856 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -34,6 +34,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableDoubleGauge; import io.opentelemetry.api.metrics.ObservableLongGauge; import static com.automq.stream.s3.metrics.S3StreamMetricsConstant.LABEL_CACHE_NAME; @@ -139,10 +140,16 @@ public class S3StreamMetricsManager { private static final MultiAttributes OPERATOR_INDEX_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), S3StreamMetricsConstant.LABEL_INDEX); + // Broker Quota + private static final MultiAttributes BROKER_QUOTA_TYPE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), + S3StreamMetricsConstant.LABEL_BROKER_QUOTA_TYPE); + private static ObservableDoubleGauge brokerQuotaLimit = new NoopObservableDoubleGauge(); + private static Supplier> brokerQuotaLimitSupplier = () -> new ConcurrentHashMap<>(); static { BASE_ATTRIBUTES_LISTENERS.add(ALLOC_TYPE_ATTRIBUTES); BASE_ATTRIBUTES_LISTENERS.add(OPERATOR_INDEX_ATTRIBUTES); + BASE_ATTRIBUTES_LISTENERS.add(BROKER_QUOTA_TYPE_ATTRIBUTES); } public static void configure(MetricsConfig metricsConfig) { @@ -400,6 +407,7 @@ public static void initMetrics(Meter meter, String prefix) { }); initAsyncCacheMetrics(meter, prefix); + initBrokerQuotaMetrics(meter, prefix); } private static void initAsyncCacheMetrics(Meter meter, String prefix) { @@ -475,6 +483,25 @@ private static void initAsyncCacheMetrics(Meter meter, String prefix) { }); } + private static void initBrokerQuotaMetrics(Meter meter, String prefix) { + brokerQuotaLimit = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BROKER_QUOTA_LIMIT_METRIC_NAME) + .setDescription("Broker quota limit") + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { + Map brokerQuotaLimitMap = brokerQuotaLimitSupplier.get(); + for (Map.Entry entry : brokerQuotaLimitMap.entrySet()) { + String quotaType = entry.getKey(); + Double quotaLimit = entry.getValue(); + // drop too large values + if (quotaLimit > 1e15) { + continue; + } + result.record(quotaLimit, BROKER_QUOTA_TYPE_ATTRIBUTES.get(quotaType)); + } + } + }); + } + public static void registerNetworkLimiterQueueSizeSupplier(AsyncNetworkBandwidthLimiter.Type type, Supplier networkLimiterQueueSizeSupplier) { switch (type) { @@ -907,4 +934,8 @@ public static void registerLocalStreamRangeIndexCacheSizeSupplier(Supplier localStreamRangeIndexCacheStreamNum) { S3StreamMetricsManager.localStreamRangeIndexCacheStreamNum = localStreamRangeIndexCacheStreamNum; } + + public static void registerBrokerQuotaLimitSupplier(Supplier> brokerQuotaLimitSupplier) { + S3StreamMetricsManager.brokerQuotaLimitSupplier = brokerQuotaLimitSupplier; + } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/ThreadUtils.java b/s3stream/src/main/java/com/automq/stream/utils/ThreadUtils.java index 449fc1d2e8..805253b5e0 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/ThreadUtils.java +++ b/s3stream/src/main/java/com/automq/stream/utils/ThreadUtils.java @@ -12,8 +12,11 @@ package com.automq.stream.utils; import org.slf4j.Logger; +import org.slf4j.helpers.NOPLogger; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import io.netty.util.concurrent.FastThreadLocalThread; @@ -79,4 +82,33 @@ public static Runnable wrapRunnable(Runnable runnable, Logger logger) { } }; } + + /** + * A wrapper of {@link #shutdownExecutor} without logging. + */ + public static void shutdownExecutor(ExecutorService executorService, long timeout, TimeUnit timeUnit) { + shutdownExecutor(executorService, timeout, timeUnit, NOPLogger.NOP_LOGGER); + } + + /** + * Shuts down an executor service in two phases, first by calling shutdown to reject incoming tasks, + * and then calling shutdownNow, if necessary, to cancel any lingering tasks. + * After the timeout/on interrupt, the service is forcefully closed. + */ + public static void shutdownExecutor(ExecutorService executorService, long timeout, TimeUnit timeUnit, + Logger logger) { + if (null == executorService) { + return; + } + executorService.shutdown(); + try { + if (!executorService.awaitTermination(timeout, timeUnit)) { + executorService.shutdownNow(); + logger.error("Executor {} did not terminate in time, forcefully shutting down", executorService); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } } diff --git a/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java index db5ecdd872..b1e5e2a020 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java @@ -104,9 +104,15 @@ public class QuotaConfigs { "for the specified IP."; // AutoMQ inject start + /** + * All clients created by AutoMQ will have this prefix in their client id, and they will be excluded from quota. + */ + public static final String INTERNAL_CLIENT_ID_PREFIX = "__automq_client_"; + public static final String BROKER_QUOTA_ENABLED_CONFIG = "broker.quota.enabled"; public static final String BROKER_QUOTA_PRODUCE_BYTES_CONFIG = "broker.quota.produce.bytes"; public static final String BROKER_QUOTA_FETCH_BYTES_CONFIG = "broker.quota.fetch.bytes"; + public static final String BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG = "broker.quota.slow.fetch.bytes"; public static final String BROKER_QUOTA_REQUEST_RATE_CONFIG = "broker.quota.request.rate"; public static final String BROKER_QUOTA_WHITE_LIST_USER_CONFIG = "broker.quota.white.list.user"; public static final String BROKER_QUOTA_WHITE_LIST_CLIENT_ID_CONFIG = "broker.quota.white.list.client.id"; @@ -117,6 +123,7 @@ public class QuotaConfigs { public static final String BROKER_QUOTA_ENABLED_DOC = "Enable broker quota."; public static final String BROKER_QUOTA_PRODUCE_BYTES_DOC = "The maximum bytes send by producer in single window."; public static final String BROKER_QUOTA_FETCH_BYTES_DOC = "The maximum bytes receive by consumer in single window."; + public static final String BROKER_QUOTA_SLOW_FETCH_BYTES_DOC = "The maximum bytes receive by slow fetch consumer in single window."; public static final String BROKER_QUOTA_REQUEST_RATE_DOC = "The maximum request count send by client in single window."; public static final String BROKER_QUOTA_WHITE_LIST_USER_DOC = "Broker quota white list for user."; public static final String BROKER_QUOTA_WHITE_LIST_CLIENT_ID_DOC = "Broker quota white list for client id."; @@ -176,6 +183,7 @@ public static ConfigDef brokerQuotaConfigs() { .define(QuotaConfigs.BROKER_QUOTA_ENABLED_CONFIG, ConfigDef.Type.BOOLEAN, false, MEDIUM, QuotaConfigs.BROKER_QUOTA_ENABLED_DOC) .define(QuotaConfigs.BROKER_QUOTA_PRODUCE_BYTES_CONFIG, ConfigDef.Type.DOUBLE, Double.MAX_VALUE, MEDIUM, QuotaConfigs.BROKER_QUOTA_PRODUCE_BYTES_DOC) .define(QuotaConfigs.BROKER_QUOTA_FETCH_BYTES_CONFIG, ConfigDef.Type.DOUBLE, Double.MAX_VALUE, MEDIUM, QuotaConfigs.BROKER_QUOTA_FETCH_BYTES_DOC) + .define(QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG, ConfigDef.Type.DOUBLE, Double.MAX_VALUE, MEDIUM, QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_DOC) .define(QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, ConfigDef.Type.DOUBLE, Double.MAX_VALUE, MEDIUM, QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_DOC) .define(QuotaConfigs.BROKER_QUOTA_WHITE_LIST_USER_CONFIG, ConfigDef.Type.STRING, "", MEDIUM, QuotaConfigs.BROKER_QUOTA_WHITE_LIST_USER_DOC) .define(QuotaConfigs.BROKER_QUOTA_WHITE_LIST_CLIENT_ID_CONFIG, ConfigDef.Type.STRING, "", MEDIUM, QuotaConfigs.BROKER_QUOTA_WHITE_LIST_CLIENT_ID_DOC) diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java index 98ea1b5bf6..5fd7f3d624 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java @@ -20,7 +20,10 @@ public class S3StreamKafkaMetricsConstants { public static final String STREAM_SET_OBJECT_NUM = "stream_set_object_num"; public static final String STREAM_OBJECT_NUM = "stream_object_num"; public static final String FETCH_LIMITER_PERMIT_NUM = "fetch_limiter_permit_num"; + public static final String FETCH_LIMITER_WAITING_TASK_NUM = "fetch_limiter_waiting_task_num"; public static final String FETCH_PENDING_TASK_NUM = "fetch_pending_task_num"; + public static final String FETCH_LIMITER_TIMEOUT_COUNT = "fetch_limiter_timeout_count"; + public static final String FETCH_LIMITER_TIME = "fetch_limiter_time"; public static final String LOG_APPEND_PERMIT_NUM = "log_append_permit_num"; public static final String SLOW_BROKER_METRIC_NAME = "slow_broker_count"; public static final String TOPIC_PARTITION_COUNT_METRIC_NAME = "topic_partition_count"; @@ -45,4 +48,8 @@ public class S3StreamKafkaMetricsConstants { public static final String PARTITION_STATUS_STATISTICS_METRIC_NAME = "partition_status_statistics"; public static final AttributeKey LABEL_STATUS = AttributeKey.stringKey("status"); + + // Back Pressure + public static final String BACK_PRESSURE_STATE_METRIC_NAME = "back_pressure_state"; + public static final AttributeKey LABEL_BACK_PRESSURE_STATE = AttributeKey.stringKey("state"); } diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java index 27d2426eae..e769c58361 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java @@ -13,23 +13,32 @@ import com.automq.stream.s3.metrics.MetricsConfig; import com.automq.stream.s3.metrics.MetricsLevel; +import com.automq.stream.s3.metrics.NoopLongCounter; import com.automq.stream.s3.metrics.NoopObservableLongGauge; import com.automq.stream.s3.metrics.wrapper.ConfigListener; +import com.automq.stream.s3.metrics.wrapper.CounterMetric; +import com.automq.stream.s3.metrics.wrapper.HistogramInstrument; +import com.automq.stream.s3.metrics.wrapper.HistogramMetric; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; import java.util.function.Supplier; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableLongGauge; public class S3StreamKafkaMetricsManager { private static final List BASE_ATTRIBUTES_LISTENERS = new ArrayList<>(); + + public static final List FETCH_LIMITER_TIME_METRICS = new CopyOnWriteArrayList<>(); + private static final MultiAttributes BROKER_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), S3StreamKafkaMetricsConstants.LABEL_NODE_ID); private static final MultiAttributes S3_OBJECT_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), @@ -40,12 +49,15 @@ public class S3StreamKafkaMetricsManager { S3StreamKafkaMetricsConstants.LABEL_FETCH_EXECUTOR_NAME); private static final MultiAttributes PARTITION_STATUS_STATISTICS_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), S3StreamKafkaMetricsConstants.LABEL_STATUS); + private static final MultiAttributes BACK_PRESSURE_STATE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), + S3StreamKafkaMetricsConstants.LABEL_BACK_PRESSURE_STATE); static { BASE_ATTRIBUTES_LISTENERS.add(BROKER_ATTRIBUTES); BASE_ATTRIBUTES_LISTENERS.add(S3_OBJECT_ATTRIBUTES); BASE_ATTRIBUTES_LISTENERS.add(FETCH_LIMITER_ATTRIBUTES); BASE_ATTRIBUTES_LISTENERS.add(FETCH_EXECUTOR_ATTRIBUTES); + BASE_ATTRIBUTES_LISTENERS.add(BACK_PRESSURE_STATE_ATTRIBUTES); } private static Supplier isActiveSupplier = () -> false; @@ -59,10 +71,16 @@ public class S3StreamKafkaMetricsManager { private static Supplier> streamSetObjectNumSupplier = Collections::emptyMap; private static ObservableLongGauge streamObjectNumMetrics = new NoopObservableLongGauge(); private static Supplier streamObjectNumSupplier = () -> 0; + private static ObservableLongGauge fetchLimiterPermitNumMetrics = new NoopObservableLongGauge(); private static Supplier> fetchLimiterPermitNumSupplier = Collections::emptyMap; + private static ObservableLongGauge fetchLimiterWaitingTaskNumMetrics = new NoopObservableLongGauge(); + private static Supplier> fetchLimiterWaitingTaskNumSupplier = Collections::emptyMap; private static ObservableLongGauge fetchPendingTaskNumMetrics = new NoopObservableLongGauge(); private static Supplier> fetchPendingTaskNumSupplier = Collections::emptyMap; + private static LongCounter fetchLimiterTimeoutCount = new NoopLongCounter(); + private static HistogramInstrument fetchLimiterTime; + private static ObservableLongGauge logAppendPermitNumMetrics = new NoopObservableLongGauge(); private static Supplier logAppendPermitNumSupplier = () -> 0; private static MetricsConfig metricsConfig = new MetricsConfig(MetricsLevel.INFO, Attributes.empty()); @@ -75,6 +93,13 @@ public class S3StreamKafkaMetricsManager { private static List partitionStatusList = Collections.emptyList(); private static Function partitionStatusStatisticsSupplier = s -> 0; + private static ObservableLongGauge backPressureState = new NoopObservableLongGauge(); + /** + * Supplier for back pressure state. + * Key is the state name, value is 1 for current state, -1 for other states. + */ + private static Supplier> backPressureStateSupplier = Collections::emptyMap; + public static void configure(MetricsConfig metricsConfig) { synchronized (BASE_ATTRIBUTES_LISTENERS) { S3StreamKafkaMetricsManager.metricsConfig = metricsConfig; @@ -90,6 +115,7 @@ public static void initMetrics(Meter meter, String prefix) { initFetchMetrics(meter, prefix); initLogAppendMetrics(meter, prefix); initPartitionStatusStatisticsMetrics(meter, prefix); + initBackPressureMetrics(meter, prefix); } private static void initAutoBalancerMetrics(Meter meter, String prefix) { @@ -193,6 +219,18 @@ private static void initFetchMetrics(Meter meter, String prefix) { } } }); + fetchLimiterWaitingTaskNumMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.FETCH_LIMITER_WAITING_TASK_NUM) + .setDescription("The number of tasks waiting for permits in fetch limiters") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { + Map fetchLimiterWaitingTaskNumMap = fetchLimiterWaitingTaskNumSupplier.get(); + for (Map.Entry entry : fetchLimiterWaitingTaskNumMap.entrySet()) { + result.record(entry.getValue(), FETCH_LIMITER_ATTRIBUTES.get(entry.getKey())); + } + } + }); + fetchPendingTaskNumMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.FETCH_PENDING_TASK_NUM) .setDescription("The number of pending tasks in fetch executors") .ofLongs() @@ -204,6 +242,12 @@ private static void initFetchMetrics(Meter meter, String prefix) { } } }); + + fetchLimiterTimeoutCount = meter.counterBuilder(prefix + S3StreamKafkaMetricsConstants.FETCH_LIMITER_TIMEOUT_COUNT) + .setDescription("The number of acquire permits timeout in fetch limiters") + .build(); + fetchLimiterTime = new HistogramInstrument(meter, prefix + S3StreamKafkaMetricsConstants.FETCH_LIMITER_TIME, + "The time cost of acquire permits in fetch limiters", "nanoseconds", () -> FETCH_LIMITER_TIME_METRICS); } private static void initLogAppendMetrics(Meter meter, String prefix) { @@ -230,6 +274,20 @@ private static void initPartitionStatusStatisticsMetrics(Meter meter, String pre }); } + private static void initBackPressureMetrics(Meter meter, String prefix) { + backPressureState = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.BACK_PRESSURE_STATE_METRIC_NAME) + .setDescription("Back pressure state") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { + Map states = backPressureStateSupplier.get(); + states.forEach((state, value) -> { + result.record(value, BACK_PRESSURE_STATE_ATTRIBUTES.get(state)); + }); + } + }); + } + public static void setIsActiveSupplier(Supplier isActiveSupplier) { S3StreamKafkaMetricsManager.isActiveSupplier = isActiveSupplier; } @@ -258,10 +316,31 @@ public static void setFetchLimiterPermitNumSupplier(Supplier> fetchLimiterWaitingTaskNumSupplier) { + S3StreamKafkaMetricsManager.fetchLimiterWaitingTaskNumSupplier = fetchLimiterWaitingTaskNumSupplier; + } + public static void setFetchPendingTaskNumSupplier(Supplier> fetchPendingTaskNumSupplier) { S3StreamKafkaMetricsManager.fetchPendingTaskNumSupplier = fetchPendingTaskNumSupplier; } + public static CounterMetric buildFetchLimiterTimeoutMetric(String limiterName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + CounterMetric metric = new CounterMetric(metricsConfig, FETCH_LIMITER_ATTRIBUTES.get(limiterName), () -> fetchLimiterTimeoutCount); + BASE_ATTRIBUTES_LISTENERS.add(metric); + return metric; + } + } + + public static HistogramMetric buildFetchLimiterTimeMetric(MetricsLevel metricsLevel, String limiterName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + HistogramMetric metric = new HistogramMetric(metricsLevel, metricsConfig, FETCH_LIMITER_ATTRIBUTES.get(limiterName)); + BASE_ATTRIBUTES_LISTENERS.add(metric); + FETCH_LIMITER_TIME_METRICS.add(metric); + return metric; + } + } + public static void setLogAppendPermitNumSupplier(Supplier logAppendPermitNumSupplier) { S3StreamKafkaMetricsManager.logAppendPermitNumSupplier = logAppendPermitNumSupplier; } @@ -278,4 +357,8 @@ public static void setPartitionStatusStatisticsSupplier(List partitionSt public static void setTopicPartitionCountMetricsSupplier(Supplier topicPartitionCountSupplier) { S3StreamKafkaMetricsManager.topicPartitionCountSupplier = topicPartitionCountSupplier; } + + public static void setBackPressureStateSupplier(Supplier> backPressureStateSupplier) { + S3StreamKafkaMetricsManager.backPressureStateSupplier = backPressureStateSupplier; + } } diff --git a/server/src/main/java/org/apache/kafka/server/config/BrokerQuotaManagerConfig.java b/server/src/main/java/org/apache/kafka/server/config/BrokerQuotaManagerConfig.java index 53538e55c7..a09b0e0f44 100644 --- a/server/src/main/java/org/apache/kafka/server/config/BrokerQuotaManagerConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/BrokerQuotaManagerConfig.java @@ -26,7 +26,8 @@ public class BrokerQuotaManagerConfig extends ClientQuotaManagerConfig { private boolean quotaEnabled = false; private double produceQuota = Double.MAX_VALUE; private double fetchQuota = Double.MAX_VALUE; - private double requestQuota = Double.MAX_VALUE; + private double slowFetchQuota = Double.MAX_VALUE; + private double requestRateQuota = Double.MAX_VALUE; private List userWhiteList = List.of(); private List clientIdWhiteList = List.of(); @@ -42,7 +43,8 @@ public void update(Properties props) { quotaEnabled = getBoolean(map, QuotaConfigs.BROKER_QUOTA_ENABLED_CONFIG, quotaEnabled); produceQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_PRODUCE_BYTES_CONFIG, produceQuota); fetchQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_FETCH_BYTES_CONFIG, fetchQuota); - requestQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, requestQuota); + slowFetchQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG, slowFetchQuota); + requestRateQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, requestRateQuota); String userWhiteListProp = props.getProperty(QuotaConfigs.BROKER_QUOTA_WHITE_LIST_USER_CONFIG); if (null != userWhiteListProp && !userWhiteListProp.isBlank()) { @@ -72,12 +74,32 @@ public double produceQuota() { return produceQuota; } + public void produceQuota(double produceQuota) { + this.produceQuota = produceQuota; + } + public double fetchQuota() { return fetchQuota; } - public double requestQuota() { - return requestQuota; + public void fetchQuota(double fetchQuota) { + this.fetchQuota = fetchQuota; + } + + public double slowFetchQuota() { + return slowFetchQuota; + } + + public void slowFetchQuota(double slowFetchQuota) { + this.slowFetchQuota = slowFetchQuota; + } + + public double requestRateQuota() { + return requestRateQuota; + } + + public void requestRateQuota(double requestRateQuota) { + this.requestRateQuota = requestRateQuota; } public List userWhiteList() {