Skip to content

Commit

Permalink
feat(backpressure): support dynamic configs (#2204)
Browse files Browse the repository at this point in the history
* feat(backpressure): make back pressure manager configurable

Signed-off-by: Ning Yu <[email protected]>

* test: test diabled

Signed-off-by: Ning Yu <[email protected]>

* refactor: move backpressure from s3stream to kafka.core

Signed-off-by: Ning Yu <[email protected]>

* refactor: init `BackPressureManager` in `BrokerServer`

Signed-off-by: Ning Yu <[email protected]>

* refactor: introduce `BackPressureConfig`

Signed-off-by: Ning Yu <[email protected]>

* feat: make `BackPressureManager` reconfigurable

Signed-off-by: Ning Yu <[email protected]>

* test: test reconfigurable

Signed-off-by: Ning Yu <[email protected]>

* refactor: rename config key

Signed-off-by: Ning Yu <[email protected]>

* refactor: move metric "back_pressure_state" from s3stream to core

Signed-off-by: Ning Yu <[email protected]>

---------

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 committed Dec 20, 2024
1 parent 37ae176 commit 5e11479
Show file tree
Hide file tree
Showing 16 changed files with 262 additions and 80 deletions.
11 changes: 11 additions & 0 deletions core/src/main/java/kafka/automq/AutoMQConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
Expand Down Expand Up @@ -171,6 +172,14 @@ public class AutoMQConfig {
public static final String CLUSTER_ID_CONFIG = "cluster.id";
public static final String CLUSTER_ID_DOC = "If the cluster.id is set, Kafka will auto format the storage.";

public static final String S3_BACK_PRESSURE_ENABLED_CONFIG = "automq.backpressure.enabled";
public static final String S3_BACK_PRESSURE_ENABLED_DOC = "Whether back pressure is enabled";
public static final boolean S3_BACK_PRESSURE_ENABLED_DEFAULT = true;

public static final String S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG = "automq.backpressure.cooldown.ms";
public static final String S3_BACK_PRESSURE_COOLDOWN_MS_DOC = "The cooldown time in milliseconds to wait between two regulator actions";
public static final long S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT = TimeUnit.SECONDS.toMillis(15);

// Deprecated config start
public static final String S3_ENDPOINT_CONFIG = "s3.endpoint";
public static final String S3_ENDPOINT_DOC = "[DEPRECATED]please use s3.data.buckets. The object storage endpoint, ex. <code>https://s3.us-east-1.amazonaws.com</code>.";
Expand Down Expand Up @@ -255,6 +264,8 @@ public static void define(ConfigDef configDef) {
.define(AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_CONFIG, INT, S3_METRICS_EXPORTER_REPORT_INTERVAL_MS, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_DOC)
.define(AutoMQConfig.S3_TELEMETRY_METRICS_EXPORTER_URI_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_TELEMETRY_METRICS_EXPORTER_URI_DOC)
.define(AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_CONFIG, STRING, null, MEDIUM, AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_DOC)
.define(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG, BOOLEAN, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DOC)
.define(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG, LONG, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DOC)
// Deprecated config start
.define(AutoMQConfig.S3_ENDPOINT_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_ENDPOINT_DOC)
.define(AutoMQConfig.S3_REGION_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_REGION_DOC)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.automq.backpressure;

import kafka.automq.AutoMQConfig;
import kafka.server.KafkaConfig;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.ConfigUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class BackPressureConfig {

public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG,
AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG
);

private volatile boolean enabled;
/**
* The cooldown time in milliseconds to wait between two regulator actions.
*/
private long cooldownMs;

public static BackPressureConfig from(KafkaConfig config) {
return new BackPressureConfig(config.s3BackPressureEnabled(), config.s3BackPressureCooldownMs());
}

public static BackPressureConfig from(Map<String, ?> raw) {
Map<String, Object> configs = new HashMap<>(raw);
return new BackPressureConfig(
ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG),
ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)
);
}

public BackPressureConfig(boolean enabled, long cooldownMs) {
this.enabled = enabled;
this.cooldownMs = cooldownMs;
}

public static void validate(Map<String, ?> raw) throws ConfigException {
Map<String, Object> configs = new HashMap<>(raw);
if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)) {
ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG);
}
if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)) {
validateCooldownMs(ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG));
}
}

public static void validateCooldownMs(long cooldownMs) throws ConfigException {
if (cooldownMs < 0) {
throw new ConfigException(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG, cooldownMs, "The cooldown time must be non-negative.");
}
}

public void update(Map<String, ?> raw) {
Map<String, Object> configs = new HashMap<>(raw);
if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)) {
this.enabled = ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG);
}
if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)) {
this.cooldownMs = ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG);
}
}

public boolean enabled() {
return enabled;
}

public long cooldownMs() {
return cooldownMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

import org.apache.kafka.common.Reconfigurable;

/**
* It checks the {@link LoadLevel} of the system and takes actions based on the load level
* to prevent the system from being overwhelmed.
*/
public interface BackPressureManager {
public interface BackPressureManager extends Reconfigurable {

/**
* Start the back pressure manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

/**
* A checker to check the load level of the system periodically.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,29 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;

import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultBackPressureManager implements BackPressureManager {
import static kafka.automq.backpressure.BackPressureConfig.RECONFIGURABLE_CONFIGS;

public static final long DEFAULT_COOLDOWN_MS = TimeUnit.SECONDS.toMillis(15);
public class DefaultBackPressureManager implements BackPressureManager {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBackPressureManager.class);

private final BackPressureConfig config;
private final Regulator regulator;
/**
* The cooldown time in milliseconds to wait between two regulator actions.
*/
private final long cooldownMs;

/**
* The scheduler to schedule the checker periodically.
Expand All @@ -53,20 +53,23 @@ public class DefaultBackPressureManager implements BackPressureManager {
* Only used for logging and monitoring.
*/
private LoadLevel lastRegulateLevel = LoadLevel.NORMAL;
/**
* The current state metrics of the system.
* Only used for monitoring.
*
* @see S3StreamKafkaMetricsManager#setBackPressureStateSupplier
*/
private final Map<String, Integer> stateMetrics = new HashMap<>(LoadLevel.values().length);

public DefaultBackPressureManager(Regulator regulator) {
this(regulator, DEFAULT_COOLDOWN_MS);
}

public DefaultBackPressureManager(Regulator regulator, long cooldownMs) {
public DefaultBackPressureManager(BackPressureConfig config, Regulator regulator) {
this.config = config;
this.regulator = regulator;
this.cooldownMs = cooldownMs;
}

@Override
public void start() {
this.checkerScheduler = Threads.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("back-pressure-checker-%d", false), LOGGER);
S3StreamMetricsManager.registerBackPressureStateSupplier(this::currentLoadLevel);
S3StreamKafkaMetricsManager.setBackPressureStateSupplier(this::stateMetrics);
}

@Override
Expand All @@ -83,6 +86,9 @@ public void shutdown() {
}

private void maybeRegulate() {
if (!config.enabled()) {
return;
}
maybeRegulate(false);
}

Expand All @@ -96,11 +102,11 @@ private void maybeRegulate(boolean isInternal) {
long now = System.currentTimeMillis();
long timeElapsed = now - lastRegulateTime;

if (timeElapsed < cooldownMs) {
if (timeElapsed < config.cooldownMs()) {
// Skip regulating if the cooldown time has not passed.
if (!isInternal) {
// Schedule the next regulate action if it is not an internal call.
checkerScheduler.schedule(() -> maybeRegulate(true), cooldownMs - timeElapsed, TimeUnit.MILLISECONDS);
checkerScheduler.schedule(() -> maybeRegulate(true), config.cooldownMs() - timeElapsed, TimeUnit.MILLISECONDS);
}
return;
}
Expand Down Expand Up @@ -132,4 +138,32 @@ private void regulate(LoadLevel loadLevel, long now) {
lastRegulateTime = now;
lastRegulateLevel = loadLevel;
}

private Map<String, Integer> stateMetrics() {
LoadLevel current = currentLoadLevel();
for (LoadLevel level : LoadLevel.values()) {
int value = level.equals(current) ? 1 : -1;
stateMetrics.put(level.name(), value);
}
return stateMetrics;
}

@Override
public Set<String> reconfigurableConfigs() {
return RECONFIGURABLE_CONFIGS;
}

@Override
public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
BackPressureConfig.validate(configs);
}

@Override
public void reconfigure(Map<String, ?> configs) {
config.update(configs);
}

@Override
public void configure(Map<String, ?> configs) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

/**
* Represents the load level of the system.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

/**
* The Regulator class is responsible for controlling and limiting the rate of external requests.
Expand Down
17 changes: 0 additions & 17 deletions core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ public class DefaultS3Client implements Client {

protected CompactionManager compactionManager;

protected BackPressureManager backPressureManager;

protected S3StreamClient streamClient;

protected KVClient kvClient;
Expand Down Expand Up @@ -151,7 +149,6 @@ public void start() {
this.objectManager.setCommitStreamSetObjectHook(localIndexCache::updateIndexFromRequest);
this.blockCache = new StreamReaders(this.config.blockCacheSize(), objectManager, objectStorage, objectReaderFactory);
this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionobjectStorage);
this.backPressureManager = new DefaultBackPressureManager(backPressureRegulator());
this.writeAheadLog = buildWAL();
StorageFailureHandlerChain storageFailureHandler = new StorageFailureHandlerChain();
this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, objectStorage, storageFailureHandler);
Expand All @@ -168,13 +165,11 @@ public void start() {

this.storage.startup();
this.compactionManager.start();
this.backPressureManager.start();
LOGGER.info("S3Client started");
}

@Override
public void shutdown() {
this.backPressureManager.shutdown();
this.compactionManager.shutdown();
this.streamClient.shutdown();
this.storage.shutdown();
Expand Down Expand Up @@ -235,18 +230,6 @@ protected ObjectManager newObjectManager(int nodeId, long nodeEpoch, boolean fai
this::getAutoMQVersion, failoverMode);
}

protected Regulator backPressureRegulator() {
return new Regulator() {
@Override
public void increase() {
}

@Override
public void decrease() {
}
};
}

protected Failover failover() {
return new Failover(new FailoverFactory() {
@Override
Expand Down
23 changes: 23 additions & 0 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package kafka.server

import kafka.automq.backpressure.{BackPressureConfig, BackPressureManager, DefaultBackPressureManager, Regulator}
import kafka.automq.zonerouter.{NoopProduceRouter, ProduceRouter}
import kafka.cluster.EndPoint
import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter, GroupCoordinatorAdapter}
Expand Down Expand Up @@ -158,6 +159,8 @@ class BrokerServer(
def metadataLoader: MetadataLoader = sharedServer.loader

var produceRouter: ProduceRouter = _

var backPressureManager: BackPressureManager = _
// AutoMQ inject end

private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
Expand Down Expand Up @@ -540,6 +543,12 @@ class BrokerServer(
// AutoMQ inject start
ElasticLogManager.init(config, clusterId, this)
produceRouter = newProduceRouter()

backPressureManager = new DefaultBackPressureManager(
BackPressureConfig.from(config),
newBackPressureRegulator()
)
backPressureManager.start()
// AutoMQ inject end

// We're now ready to unfence the broker. This also allows this broker to transition
Expand Down Expand Up @@ -660,6 +669,10 @@ class BrokerServer(
lifecycleManager.beginShutdown()

// AutoMQ for Kafka inject start
if (backPressureManager != null) {
CoreUtils.swallow(backPressureManager.shutdown(), this)
}

// https://github.com/AutoMQ/automq-for-kafka/issues/540
// await partition shutdown:
// 1. after lifecycleManager start shutdown to trigger partitions gracefully reassign.
Expand Down Expand Up @@ -796,6 +809,16 @@ class BrokerServer(
dataPlaneRequestProcessor.asInstanceOf[ElasticKafkaApis].setProduceRouter(produceRouter)
produceRouter
}

protected def newBackPressureRegulator(): Regulator = {
new Regulator {
override def increase(): Unit = {
}

override def decrease(): Unit = {
}
}
}
// AutoMQ inject end

}
Loading

0 comments on commit 5e11479

Please sign in to comment.