diff --git a/core/src/main/java/kafka/automq/AutoMQConfig.java b/core/src/main/java/kafka/automq/AutoMQConfig.java
index 6a6490cff9..aa9a7e5dad 100644
--- a/core/src/main/java/kafka/automq/AutoMQConfig.java
+++ b/core/src/main/java/kafka/automq/AutoMQConfig.java
@@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
@@ -171,6 +172,14 @@ public class AutoMQConfig {
public static final String CLUSTER_ID_CONFIG = "cluster.id";
public static final String CLUSTER_ID_DOC = "If the cluster.id is set, Kafka will auto format the storage.";
+ public static final String S3_BACK_PRESSURE_ENABLED_CONFIG = "automq.backpressure.enabled";
+ public static final String S3_BACK_PRESSURE_ENABLED_DOC = "Whether back pressure is enabled";
+ public static final boolean S3_BACK_PRESSURE_ENABLED_DEFAULT = true;
+
+ public static final String S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG = "automq.backpressure.cooldown.ms";
+ public static final String S3_BACK_PRESSURE_COOLDOWN_MS_DOC = "The cooldown time in milliseconds to wait between two regulator actions";
+ public static final long S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT = TimeUnit.SECONDS.toMillis(15);
+
// Deprecated config start
public static final String S3_ENDPOINT_CONFIG = "s3.endpoint";
public static final String S3_ENDPOINT_DOC = "[DEPRECATED]please use s3.data.buckets. The object storage endpoint, ex. https://s3.us-east-1.amazonaws.com
.";
@@ -255,6 +264,8 @@ public static void define(ConfigDef configDef) {
.define(AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_CONFIG, INT, S3_METRICS_EXPORTER_REPORT_INTERVAL_MS, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_DOC)
.define(AutoMQConfig.S3_TELEMETRY_METRICS_EXPORTER_URI_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_TELEMETRY_METRICS_EXPORTER_URI_DOC)
.define(AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_CONFIG, STRING, null, MEDIUM, AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_DOC)
+ .define(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG, BOOLEAN, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DOC)
+ .define(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG, LONG, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DOC)
// Deprecated config start
.define(AutoMQConfig.S3_ENDPOINT_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_ENDPOINT_DOC)
.define(AutoMQConfig.S3_REGION_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_REGION_DOC)
diff --git a/core/src/main/java/kafka/automq/backpressure/BackPressureConfig.java b/core/src/main/java/kafka/automq/backpressure/BackPressureConfig.java
new file mode 100644
index 0000000000..270699b1b4
--- /dev/null
+++ b/core/src/main/java/kafka/automq/backpressure/BackPressureConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2024, AutoMQ HK Limited.
+ *
+ * The use of this file is governed by the Business Source License,
+ * as detailed in the file "/LICENSE.S3Stream" included in this repository.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.automq.backpressure;
+
+import kafka.automq.AutoMQConfig;
+import kafka.server.KafkaConfig;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.ConfigUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class BackPressureConfig {
+
+ public static final Set RECONFIGURABLE_CONFIGS = Set.of(
+ AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG,
+ AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG
+ );
+
+ private volatile boolean enabled;
+ /**
+ * The cooldown time in milliseconds to wait between two regulator actions.
+ */
+ private long cooldownMs;
+
+ public static BackPressureConfig from(KafkaConfig config) {
+ return new BackPressureConfig(config.s3BackPressureEnabled(), config.s3BackPressureCooldownMs());
+ }
+
+ public static BackPressureConfig from(Map raw) {
+ Map configs = new HashMap<>(raw);
+ return new BackPressureConfig(
+ ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG),
+ ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)
+ );
+ }
+
+ public BackPressureConfig(boolean enabled, long cooldownMs) {
+ this.enabled = enabled;
+ this.cooldownMs = cooldownMs;
+ }
+
+ public static void validate(Map raw) throws ConfigException {
+ Map configs = new HashMap<>(raw);
+ if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)) {
+ ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG);
+ }
+ if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)) {
+ validateCooldownMs(ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG));
+ }
+ }
+
+ public static void validateCooldownMs(long cooldownMs) throws ConfigException {
+ if (cooldownMs < 0) {
+ throw new ConfigException(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG, cooldownMs, "The cooldown time must be non-negative.");
+ }
+ }
+
+ public void update(Map raw) {
+ Map configs = new HashMap<>(raw);
+ if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)) {
+ this.enabled = ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG);
+ }
+ if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)) {
+ this.cooldownMs = ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG);
+ }
+ }
+
+ public boolean enabled() {
+ return enabled;
+ }
+
+ public long cooldownMs() {
+ return cooldownMs;
+ }
+}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/backpressure/BackPressureManager.java b/core/src/main/java/kafka/automq/backpressure/BackPressureManager.java
similarity index 86%
rename from s3stream/src/main/java/com/automq/stream/s3/backpressure/BackPressureManager.java
rename to core/src/main/java/kafka/automq/backpressure/BackPressureManager.java
index 0aa9fa3151..51ea14bdef 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/backpressure/BackPressureManager.java
+++ b/core/src/main/java/kafka/automq/backpressure/BackPressureManager.java
@@ -9,13 +9,15 @@
* by the Apache License, Version 2.0
*/
-package com.automq.stream.s3.backpressure;
+package kafka.automq.backpressure;
+
+import org.apache.kafka.common.Reconfigurable;
/**
* It checks the {@link LoadLevel} of the system and takes actions based on the load level
* to prevent the system from being overwhelmed.
*/
-public interface BackPressureManager {
+public interface BackPressureManager extends Reconfigurable {
/**
* Start the back pressure manager.
diff --git a/s3stream/src/main/java/com/automq/stream/s3/backpressure/Checker.java b/core/src/main/java/kafka/automq/backpressure/Checker.java
similarity index 94%
rename from s3stream/src/main/java/com/automq/stream/s3/backpressure/Checker.java
rename to core/src/main/java/kafka/automq/backpressure/Checker.java
index 2e1cdaeefe..085d4135f9 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/backpressure/Checker.java
+++ b/core/src/main/java/kafka/automq/backpressure/Checker.java
@@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/
-package com.automq.stream.s3.backpressure;
+package kafka.automq.backpressure;
/**
* A checker to check the load level of the system periodically.
diff --git a/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java b/core/src/main/java/kafka/automq/backpressure/DefaultBackPressureManager.java
similarity index 70%
rename from s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java
rename to core/src/main/java/kafka/automq/backpressure/DefaultBackPressureManager.java
index 10cfea19e6..d764fbfb77 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java
+++ b/core/src/main/java/kafka/automq/backpressure/DefaultBackPressureManager.java
@@ -9,29 +9,29 @@
* by the Apache License, Version 2.0
*/
-package com.automq.stream.s3.backpressure;
+package kafka.automq.backpressure;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;
-import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DefaultBackPressureManager implements BackPressureManager {
+import static kafka.automq.backpressure.BackPressureConfig.RECONFIGURABLE_CONFIGS;
- public static final long DEFAULT_COOLDOWN_MS = TimeUnit.SECONDS.toMillis(15);
+public class DefaultBackPressureManager implements BackPressureManager {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBackPressureManager.class);
+ private final BackPressureConfig config;
private final Regulator regulator;
- /**
- * The cooldown time in milliseconds to wait between two regulator actions.
- */
- private final long cooldownMs;
/**
* The scheduler to schedule the checker periodically.
@@ -53,20 +53,23 @@ public class DefaultBackPressureManager implements BackPressureManager {
* Only used for logging and monitoring.
*/
private LoadLevel lastRegulateLevel = LoadLevel.NORMAL;
+ /**
+ * The current state metrics of the system.
+ * Only used for monitoring.
+ *
+ * @see S3StreamKafkaMetricsManager#setBackPressureStateSupplier
+ */
+ private final Map stateMetrics = new HashMap<>(LoadLevel.values().length);
- public DefaultBackPressureManager(Regulator regulator) {
- this(regulator, DEFAULT_COOLDOWN_MS);
- }
-
- public DefaultBackPressureManager(Regulator regulator, long cooldownMs) {
+ public DefaultBackPressureManager(BackPressureConfig config, Regulator regulator) {
+ this.config = config;
this.regulator = regulator;
- this.cooldownMs = cooldownMs;
}
@Override
public void start() {
this.checkerScheduler = Threads.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("back-pressure-checker-%d", false), LOGGER);
- S3StreamMetricsManager.registerBackPressureStateSupplier(this::currentLoadLevel);
+ S3StreamKafkaMetricsManager.setBackPressureStateSupplier(this::stateMetrics);
}
@Override
@@ -83,6 +86,9 @@ public void shutdown() {
}
private void maybeRegulate() {
+ if (!config.enabled()) {
+ return;
+ }
maybeRegulate(false);
}
@@ -96,11 +102,11 @@ private void maybeRegulate(boolean isInternal) {
long now = System.currentTimeMillis();
long timeElapsed = now - lastRegulateTime;
- if (timeElapsed < cooldownMs) {
+ if (timeElapsed < config.cooldownMs()) {
// Skip regulating if the cooldown time has not passed.
if (!isInternal) {
// Schedule the next regulate action if it is not an internal call.
- checkerScheduler.schedule(() -> maybeRegulate(true), cooldownMs - timeElapsed, TimeUnit.MILLISECONDS);
+ checkerScheduler.schedule(() -> maybeRegulate(true), config.cooldownMs() - timeElapsed, TimeUnit.MILLISECONDS);
}
return;
}
@@ -132,4 +138,32 @@ private void regulate(LoadLevel loadLevel, long now) {
lastRegulateTime = now;
lastRegulateLevel = loadLevel;
}
+
+ private Map stateMetrics() {
+ LoadLevel current = currentLoadLevel();
+ for (LoadLevel level : LoadLevel.values()) {
+ int value = level.equals(current) ? 1 : -1;
+ stateMetrics.put(level.name(), value);
+ }
+ return stateMetrics;
+ }
+
+ @Override
+ public Set reconfigurableConfigs() {
+ return RECONFIGURABLE_CONFIGS;
+ }
+
+ @Override
+ public void validateReconfiguration(Map configs) throws ConfigException {
+ BackPressureConfig.validate(configs);
+ }
+
+ @Override
+ public void reconfigure(Map configs) {
+ config.update(configs);
+ }
+
+ @Override
+ public void configure(Map configs) {
+ }
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/backpressure/LoadLevel.java b/core/src/main/java/kafka/automq/backpressure/LoadLevel.java
similarity index 96%
rename from s3stream/src/main/java/com/automq/stream/s3/backpressure/LoadLevel.java
rename to core/src/main/java/kafka/automq/backpressure/LoadLevel.java
index 9fffe35c90..3ff1e48fcc 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/backpressure/LoadLevel.java
+++ b/core/src/main/java/kafka/automq/backpressure/LoadLevel.java
@@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/
-package com.automq.stream.s3.backpressure;
+package kafka.automq.backpressure;
/**
* Represents the load level of the system.
diff --git a/s3stream/src/main/java/com/automq/stream/s3/backpressure/Regulator.java b/core/src/main/java/kafka/automq/backpressure/Regulator.java
similarity index 95%
rename from s3stream/src/main/java/com/automq/stream/s3/backpressure/Regulator.java
rename to core/src/main/java/kafka/automq/backpressure/Regulator.java
index 4b7eb057e2..942b022616 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/backpressure/Regulator.java
+++ b/core/src/main/java/kafka/automq/backpressure/Regulator.java
@@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/
-package com.automq.stream.s3.backpressure;
+package kafka.automq.backpressure;
/**
* The Regulator class is responsible for controlling and limiting the rate of external requests.
diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
index 4ce280c1dd..716a3adfbb 100644
--- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
+++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
@@ -89,8 +89,6 @@ public class DefaultS3Client implements Client {
protected CompactionManager compactionManager;
- protected BackPressureManager backPressureManager;
-
protected S3StreamClient streamClient;
protected KVClient kvClient;
@@ -151,7 +149,6 @@ public void start() {
this.objectManager.setCommitStreamSetObjectHook(localIndexCache::updateIndexFromRequest);
this.blockCache = new StreamReaders(this.config.blockCacheSize(), objectManager, objectStorage, objectReaderFactory);
this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionobjectStorage);
- this.backPressureManager = new DefaultBackPressureManager(backPressureRegulator());
this.writeAheadLog = buildWAL();
StorageFailureHandlerChain storageFailureHandler = new StorageFailureHandlerChain();
this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, objectStorage, storageFailureHandler);
@@ -168,13 +165,11 @@ public void start() {
this.storage.startup();
this.compactionManager.start();
- this.backPressureManager.start();
LOGGER.info("S3Client started");
}
@Override
public void shutdown() {
- this.backPressureManager.shutdown();
this.compactionManager.shutdown();
this.streamClient.shutdown();
this.storage.shutdown();
@@ -235,18 +230,6 @@ protected ObjectManager newObjectManager(int nodeId, long nodeEpoch, boolean fai
this::getAutoMQVersion, failoverMode);
}
- protected Regulator backPressureRegulator() {
- return new Regulator() {
- @Override
- public void increase() {
- }
-
- @Override
- public void decrease() {
- }
- };
- }
-
protected Failover failover() {
return new Failover(new FailoverFactory() {
@Override
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 9496c1c441..041f2a5252 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -17,6 +17,7 @@
package kafka.server
+import kafka.automq.backpressure.{BackPressureConfig, BackPressureManager, DefaultBackPressureManager, Regulator}
import kafka.automq.zonerouter.{NoopProduceRouter, ProduceRouter}
import kafka.cluster.EndPoint
import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter, GroupCoordinatorAdapter}
@@ -158,6 +159,8 @@ class BrokerServer(
def metadataLoader: MetadataLoader = sharedServer.loader
var produceRouter: ProduceRouter = _
+
+ var backPressureManager: BackPressureManager = _
// AutoMQ inject end
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
@@ -540,6 +543,12 @@ class BrokerServer(
// AutoMQ inject start
ElasticLogManager.init(config, clusterId, this)
produceRouter = newProduceRouter()
+
+ backPressureManager = new DefaultBackPressureManager(
+ BackPressureConfig.from(config),
+ newBackPressureRegulator()
+ )
+ backPressureManager.start()
// AutoMQ inject end
// We're now ready to unfence the broker. This also allows this broker to transition
@@ -660,6 +669,10 @@ class BrokerServer(
lifecycleManager.beginShutdown()
// AutoMQ for Kafka inject start
+ if (backPressureManager != null) {
+ CoreUtils.swallow(backPressureManager.shutdown(), this)
+ }
+
// https://github.com/AutoMQ/automq-for-kafka/issues/540
// await partition shutdown:
// 1. after lifecycleManager start shutdown to trigger partitions gracefully reassign.
@@ -796,6 +809,16 @@ class BrokerServer(
dataPlaneRequestProcessor.asInstanceOf[ElasticKafkaApis].setProduceRouter(produceRouter)
produceRouter
}
+
+ protected def newBackPressureRegulator(): Regulator = {
+ new Regulator {
+ override def increase(): Unit = {
+ }
+
+ override def decrease(): Unit = {
+ }
+ }
+ }
// AutoMQ inject end
}
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index d3feaad484..d9dac744c4 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -18,6 +18,7 @@
package kafka.server
import kafka.autobalancer.config.{AutoBalancerControllerConfig, AutoBalancerMetricsReporterConfig}
+import kafka.automq.backpressure.BackPressureConfig
import java.util
import java.util.{Collections, Properties}
@@ -101,7 +102,8 @@ object DynamicBrokerConfig {
DynamicProducerStateManagerConfig ++
DynamicRemoteLogConfig.ReconfigurableConfigs ++
AutoBalancerControllerConfig.RECONFIGURABLE_CONFIGS.asScala ++
- AutoBalancerMetricsReporterConfig.RECONFIGURABLE_CONFIGS.asScala
+ AutoBalancerMetricsReporterConfig.RECONFIGURABLE_CONFIGS.asScala ++
+ BackPressureConfig.RECONFIGURABLE_CONFIGS.asScala
private val ClusterLevelListenerConfigs = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff(
@@ -269,6 +271,13 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
addReconfigurable(kafkaServer.kafkaYammerMetrics)
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId))
addReconfigurable(new DynamicClientQuotaCallback(kafkaServer.quotaManagers, kafkaServer.config))
+ // AutoMQ inject start
+ kafkaServer match {
+ case brokerServer: BrokerServer =>
+ addReconfigurable(brokerServer.backPressureManager)
+ case _ =>
+ }
+ // AutoMQ inject end
addBrokerReconfigurable(new BrokerDynamicThreadPool(kafkaServer))
addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4fd1e53fe3..50935bdc67 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -793,6 +793,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val s3RefillPeriodMsProp = getInt(AutoMQConfig.S3_NETWORK_REFILL_PERIOD_MS_CONFIG)
val s3MetricsLevel = getString(AutoMQConfig.S3_TELEMETRY_METRICS_LEVEL_CONFIG)
val s3ExporterReportIntervalMs = getInt(AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_CONFIG)
+ val s3BackPressureEnabled = getBoolean(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)
+ val s3BackPressureCooldownMs = getLong(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)
// AutoMQ inject end
/** Internal Configurations **/
diff --git a/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java b/core/src/test/java/kafka/automq/backpressure/DefaultBackPressureManagerTest.java
similarity index 80%
rename from s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java
rename to core/src/test/java/kafka/automq/backpressure/DefaultBackPressureManagerTest.java
index c1557c7b6f..6cce5e742d 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java
+++ b/core/src/test/java/kafka/automq/backpressure/DefaultBackPressureManagerTest.java
@@ -9,13 +9,17 @@
* by the Apache License, Version 2.0
*/
-package com.automq.stream.s3.backpressure;
+package kafka.automq.backpressure;
+
+import kafka.automq.AutoMQConfig;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -27,6 +31,7 @@ public class DefaultBackPressureManagerTest {
static String sourceB = "sourceB";
static String sourceC = "sourceC";
+ BackPressureConfig config;
DefaultBackPressureManager manager;
Regulator regulator;
@@ -67,6 +72,29 @@ public void setup() {
}).when(scheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
}
+ @Test
+ public void testDynamicConfig() {
+ initManager(false, 0);
+
+ callChecker(sourceC, LoadLevel.NORMAL);
+ callChecker(sourceB, LoadLevel.HIGH);
+ assertRegulatorCalled(0, 0);
+
+ manager.reconfigure(Map.of(
+ AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG, "true"
+ ));
+ callChecker(sourceC, LoadLevel.NORMAL);
+ callChecker(sourceB, LoadLevel.NORMAL);
+ assertRegulatorCalled(1, 1);
+
+ manager.reconfigure(Map.of(
+ AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG, "false"
+ ));
+ callChecker(sourceC, LoadLevel.NORMAL);
+ callChecker(sourceB, LoadLevel.HIGH);
+ assertRegulatorCalled(1, 1);
+ }
+
@Test
public void testPriority1() {
initManager(0);
@@ -116,11 +144,16 @@ public void testCooldown() {
assertEquals(cooldownMs, schedulerScheduleDelay, tolerance);
}
+ private void initManager(long cooldownMs) {
+ initManager(true, cooldownMs);
+ }
+
/**
* Should be called at the beginning of each test to initialize the manager.
*/
- private void initManager(long cooldownMs) {
- manager = new DefaultBackPressureManager(regulator, cooldownMs);
+ private void initManager(boolean enabled, long cooldownMs) {
+ config = new BackPressureConfig(enabled, cooldownMs);
+ manager = new DefaultBackPressureManager(config, regulator);
manager.checkerScheduler = scheduler;
}
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
index 592b579719..e6830301c1 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
@@ -152,10 +152,6 @@ public class S3StreamMetricsConstant {
public static final String LABEL_STAGE_FIND_INDEX = "find_index";
public static final String LABEL_STAGE_COMPUTE = "compute";
- // Back Pressure
- public static final String BACK_PRESSURE_STATE_METRIC_NAME = "back_pressure_state";
- public static final AttributeKey LABEL_BACK_PRESSURE_STATE = AttributeKey.stringKey("state");
-
// Broker Quota
public static final String BROKER_QUOTA_LIMIT_METRIC_NAME = "broker_quota_limit";
public static final AttributeKey LABEL_BROKER_QUOTA_TYPE = AttributeKey.stringKey("type");
diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
index 056340b5df..8ebe755856 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
@@ -12,7 +12,6 @@
package com.automq.stream.s3.metrics;
import com.automq.stream.s3.ByteBufAlloc;
-import com.automq.stream.s3.backpressure.LoadLevel;
import com.automq.stream.s3.metrics.operations.S3ObjectStage;
import com.automq.stream.s3.metrics.operations.S3Stage;
import com.automq.stream.s3.metrics.wrapper.ConfigListener;
@@ -141,12 +140,6 @@ public class S3StreamMetricsManager {
private static final MultiAttributes OPERATOR_INDEX_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
S3StreamMetricsConstant.LABEL_INDEX);
- // Back Pressure
- private static final MultiAttributes BACK_PRESSURE_STATE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
- S3StreamMetricsConstant.LABEL_BACK_PRESSURE_STATE);
- private static ObservableLongGauge backPressureState = new NoopObservableLongGauge();
- private static Supplier backPressureStateSupplier = () -> LoadLevel.NORMAL;
-
// Broker Quota
private static final MultiAttributes BROKER_QUOTA_TYPE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
S3StreamMetricsConstant.LABEL_BROKER_QUOTA_TYPE);
@@ -156,7 +149,6 @@ public class S3StreamMetricsManager {
static {
BASE_ATTRIBUTES_LISTENERS.add(ALLOC_TYPE_ATTRIBUTES);
BASE_ATTRIBUTES_LISTENERS.add(OPERATOR_INDEX_ATTRIBUTES);
- BASE_ATTRIBUTES_LISTENERS.add(BACK_PRESSURE_STATE_ATTRIBUTES);
BASE_ATTRIBUTES_LISTENERS.add(BROKER_QUOTA_TYPE_ATTRIBUTES);
}
@@ -415,7 +407,6 @@ public static void initMetrics(Meter meter, String prefix) {
});
initAsyncCacheMetrics(meter, prefix);
- initBackPressureMetrics(meter, prefix);
initBrokerQuotaMetrics(meter, prefix);
}
@@ -492,24 +483,6 @@ private static void initAsyncCacheMetrics(Meter meter, String prefix) {
});
}
- private static void initBackPressureMetrics(Meter meter, String prefix) {
- backPressureState = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BACK_PRESSURE_STATE_METRIC_NAME)
- .setDescription("Back pressure state")
- .ofLongs()
- .buildWithCallback(result -> {
- if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
- LoadLevel state = backPressureStateSupplier.get();
- result.record(state.ordinal(), BACK_PRESSURE_STATE_ATTRIBUTES.get(state.name()));
- // To beautify Grafana dashboard, we record -1 for other states
- for (LoadLevel l : LoadLevel.values()) {
- if (l != state) {
- result.record(-1, BACK_PRESSURE_STATE_ATTRIBUTES.get(l.name()));
- }
- }
- }
- });
- }
-
private static void initBrokerQuotaMetrics(Meter meter, String prefix) {
brokerQuotaLimit = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BROKER_QUOTA_LIMIT_METRIC_NAME)
.setDescription("Broker quota limit")
@@ -962,10 +935,6 @@ public static void registerLocalStreamRangeIndexCacheStreamNumSupplier(Supplier<
S3StreamMetricsManager.localStreamRangeIndexCacheStreamNum = localStreamRangeIndexCacheStreamNum;
}
- public static void registerBackPressureStateSupplier(Supplier backPressureStateSupplier) {
- S3StreamMetricsManager.backPressureStateSupplier = backPressureStateSupplier;
- }
-
public static void registerBrokerQuotaLimitSupplier(Supplier