Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backpressure): support dynamic configs #2204

Merged
merged 9 commits into from
Dec 3, 2024
11 changes: 11 additions & 0 deletions core/src/main/java/kafka/automq/AutoMQConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
Expand Down Expand Up @@ -171,6 +172,14 @@ public class AutoMQConfig {
public static final String CLUSTER_ID_CONFIG = "cluster.id";
public static final String CLUSTER_ID_DOC = "If the cluster.id is set, Kafka will auto format the storage.";

public static final String S3_BACK_PRESSURE_ENABLED_CONFIG = "automq.backpressure.enabled";
public static final String S3_BACK_PRESSURE_ENABLED_DOC = "Whether back pressure is enabled";
public static final boolean S3_BACK_PRESSURE_ENABLED_DEFAULT = true;

public static final String S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG = "automq.backpressure.cooldown.ms";
public static final String S3_BACK_PRESSURE_COOLDOWN_MS_DOC = "The cooldown time in milliseconds to wait between two regulator actions";
public static final long S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT = TimeUnit.SECONDS.toMillis(15);

// Deprecated config start
public static final String S3_ENDPOINT_CONFIG = "s3.endpoint";
public static final String S3_ENDPOINT_DOC = "[DEPRECATED]please use s3.data.buckets. The object storage endpoint, ex. <code>https://s3.us-east-1.amazonaws.com</code>.";
Expand Down Expand Up @@ -255,6 +264,8 @@ public static void define(ConfigDef configDef) {
.define(AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_CONFIG, INT, S3_METRICS_EXPORTER_REPORT_INTERVAL_MS, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_DOC)
.define(AutoMQConfig.S3_TELEMETRY_METRICS_EXPORTER_URI_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_TELEMETRY_METRICS_EXPORTER_URI_DOC)
.define(AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_CONFIG, STRING, null, MEDIUM, AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_DOC)
.define(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG, BOOLEAN, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DOC)
.define(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG, LONG, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DOC)
// Deprecated config start
.define(AutoMQConfig.S3_ENDPOINT_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_ENDPOINT_DOC)
.define(AutoMQConfig.S3_REGION_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_REGION_DOC)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.automq.backpressure;

import kafka.automq.AutoMQConfig;
import kafka.server.KafkaConfig;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.ConfigUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Holds the dynamically reconfigurable settings of the back pressure mechanism.
 *
 * <p>The values are mutable: {@link #update(Map)} overwrites them in place, while
 * {@link #enabled()} and {@link #cooldownMs()} may be read from a different thread
 * (for example a periodic checker). Both fields are therefore {@code volatile} so that
 * an update is visible to readers and a {@code long} write can never be observed half-written.
 */
public class BackPressureConfig {

    /**
     * The config keys that may be changed at runtime without restarting the broker.
     */
    public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
        AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG,
        AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG
    );

    /**
     * Whether back pressure is enabled.
     */
    private volatile boolean enabled;
    /**
     * The cooldown time in milliseconds to wait between two regulator actions.
     * Declared volatile for the same reason as {@link #enabled}: it is written by
     * {@link #update(Map)} and read through {@link #cooldownMs()}, possibly on another thread.
     */
    private volatile long cooldownMs;

    /**
     * Creates a config from the broker's {@link KafkaConfig}.
     *
     * @param config the broker config to read the back pressure settings from
     * @return a new config holding the current values
     */
    public static BackPressureConfig from(KafkaConfig config) {
        return new BackPressureConfig(config.s3BackPressureEnabled(), config.s3BackPressureCooldownMs());
    }

    /**
     * Creates a config from a raw key-value map.
     * Both back pressure keys are read from the map via {@link ConfigUtils}.
     *
     * @param raw the raw configs
     * @return a new config holding the parsed values
     */
    public static BackPressureConfig from(Map<String, ?> raw) {
        Map<String, Object> configs = new HashMap<>(raw);
        return new BackPressureConfig(
            ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG),
            ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)
        );
    }

    /**
     * @param enabled    whether back pressure is enabled
     * @param cooldownMs the cooldown time in milliseconds between two regulator actions;
     *                   not validated here, see {@link #validateCooldownMs(long)}
     */
    public BackPressureConfig(boolean enabled, long cooldownMs) {
        this.enabled = enabled;
        this.cooldownMs = cooldownMs;
    }

    /**
     * Validates the back pressure entries present in {@code raw}; absent keys are ignored.
     *
     * @param raw the raw configs to validate
     * @throws ConfigException if the cooldown time is present and negative
     */
    public static void validate(Map<String, ?> raw) throws ConfigException {
        Map<String, Object> configs = new HashMap<>(raw);
        if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)) {
            // Only parsed to check that the value can be read as a boolean; the result is discarded.
            ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG);
        }
        if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)) {
            validateCooldownMs(ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG));
        }
    }

    /**
     * Checks that the cooldown time is non-negative.
     *
     * @param cooldownMs the cooldown time in milliseconds
     * @throws ConfigException if {@code cooldownMs} is negative
     */
    public static void validateCooldownMs(long cooldownMs) throws ConfigException {
        if (cooldownMs < 0) {
            throw new ConfigException(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG, cooldownMs, "The cooldown time must be non-negative.");
        }
    }

    /**
     * Applies the back pressure entries present in {@code raw} to this config.
     * Keys that are absent leave the corresponding value untouched.
     * No range validation is performed here; callers are expected to have run
     * {@link #validate(Map)} first.
     *
     * @param raw the raw configs carrying the new values
     */
    public void update(Map<String, ?> raw) {
        Map<String, Object> configs = new HashMap<>(raw);
        if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)) {
            this.enabled = ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG);
        }
        if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)) {
            this.cooldownMs = ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG);
        }
    }

    /**
     * @return whether back pressure is currently enabled
     */
    public boolean enabled() {
        return enabled;
    }

    /**
     * @return the current cooldown time in milliseconds between two regulator actions
     */
    public long cooldownMs() {
        return cooldownMs;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

import org.apache.kafka.common.Reconfigurable;

/**
* It checks the {@link LoadLevel} of the system and takes actions based on the load level
* to prevent the system from being overwhelmed.
*/
public interface BackPressureManager {
public interface BackPressureManager extends Reconfigurable {

/**
* Start the back pressure manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

/**
* A checker to check the load level of the system periodically.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;

import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;

Expand All @@ -20,20 +22,18 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DefaultBackPressureManager implements BackPressureManager {
import static kafka.automq.backpressure.BackPressureConfig.RECONFIGURABLE_CONFIGS;

public static final long DEFAULT_COOLDOWN_MS = TimeUnit.SECONDS.toMillis(15);
public class DefaultBackPressureManager implements BackPressureManager {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBackPressureManager.class);

private final BackPressureConfig config;
private final Regulator regulator;
/**
* The cooldown time in milliseconds to wait between two regulator actions.
*/
private final long cooldownMs;

/**
* The scheduler to schedule the checker periodically.
Expand All @@ -55,20 +55,23 @@ public class DefaultBackPressureManager implements BackPressureManager {
* Only used for logging and monitoring.
*/
private LoadLevel lastRegulateLevel = LoadLevel.NORMAL;
/**
* The current state metrics of the system.
* Only used for monitoring.
*
* @see S3StreamKafkaMetricsManager#setBackPressureStateSupplier
*/
private final Map<String, Integer> stateMetrics = new HashMap<>(LoadLevel.values().length);

public DefaultBackPressureManager(Regulator regulator) {
this(regulator, DEFAULT_COOLDOWN_MS);
}

public DefaultBackPressureManager(Regulator regulator, long cooldownMs) {
public DefaultBackPressureManager(BackPressureConfig config, Regulator regulator) {
this.config = config;
this.regulator = regulator;
this.cooldownMs = cooldownMs;
}

@Override
public void start() {
this.checkerScheduler = Threads.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("back-pressure-checker-%d", false), LOGGER);
S3StreamMetricsManager.registerBackPressureStateSupplier(this::currentLoadLevel);
S3StreamKafkaMetricsManager.setBackPressureStateSupplier(this::stateMetrics);
}

@Override
Expand All @@ -85,6 +88,9 @@ public void shutdown() {
}

/**
 * Runs a non-internal regulate attempt, but only while back pressure is enabled
 * in the current config; does nothing otherwise.
 */
private void maybeRegulate() {
    if (config.enabled()) {
        maybeRegulate(false);
    }
}

Expand All @@ -98,11 +104,11 @@ private void maybeRegulate(boolean isInternal) {
long now = System.currentTimeMillis();
long timeElapsed = now - lastRegulateTime;

if (timeElapsed < cooldownMs) {
if (timeElapsed < config.cooldownMs()) {
// Skip regulating if the cooldown time has not passed.
if (!isInternal) {
// Schedule the next regulate action if it is not an internal call.
checkerScheduler.schedule(() -> maybeRegulate(true), cooldownMs - timeElapsed, TimeUnit.MILLISECONDS);
checkerScheduler.schedule(() -> maybeRegulate(true), config.cooldownMs() - timeElapsed, TimeUnit.MILLISECONDS);
}
return;
}
Expand Down Expand Up @@ -134,4 +140,32 @@ private void regulate(LoadLevel loadLevel, long now) {
lastRegulateTime = now;
lastRegulateLevel = loadLevel;
}

/**
 * Refreshes and returns the state metrics map: one entry per {@link LoadLevel},
 * keyed by the level name, with value 1 for the current load level and -1 for all others.
 * The same backing map instance is updated in place and returned on every call.
 */
private Map<String, Integer> stateMetrics() {
    final LoadLevel active = currentLoadLevel();
    for (LoadLevel candidate : LoadLevel.values()) {
        if (candidate.equals(active)) {
            stateMetrics.put(candidate.name(), 1);
        } else {
            stateMetrics.put(candidate.name(), -1);
        }
    }
    return stateMetrics;
}

/**
 * Returns the config keys this manager accepts for dynamic reconfiguration,
 * i.e. {@link BackPressureConfig#RECONFIGURABLE_CONFIGS}.
 */
@Override
public Set<String> reconfigurableConfigs() {
return RECONFIGURABLE_CONFIGS;
}

/**
 * Validates the proposed configs by delegating to {@link BackPressureConfig#validate(Map)}.
 *
 * @param configs the proposed configs
 * @throws ConfigException if the delegate rejects a value
 */
@Override
public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
BackPressureConfig.validate(configs);
}

/**
 * Applies the new configs to the config object held by this manager,
 * by delegating to {@link BackPressureConfig#update(Map)}.
 *
 * @param configs the new configs
 */
@Override
public void reconfigure(Map<String, ?> configs) {
config.update(configs);
}

/**
 * Intentionally a no-op: this manager receives its config through the constructor.
 */
@Override
public void configure(Map<String, ?> configs) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

/**
* Represents the load level of the system.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

/**
* The Regulator class is responsible for controlling and limiting the rate of external requests.
Expand Down
20 changes: 0 additions & 20 deletions core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
import com.automq.stream.s3.Config;
import com.automq.stream.s3.S3Storage;
import com.automq.stream.s3.S3StreamClient;
import com.automq.stream.s3.backpressure.BackPressureManager;
import com.automq.stream.s3.backpressure.DefaultBackPressureManager;
import com.automq.stream.s3.backpressure.Regulator;
import com.automq.stream.s3.cache.S3BlockCache;
import com.automq.stream.s3.cache.blockcache.DefaultObjectReaderFactory;
import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory;
Expand Down Expand Up @@ -89,8 +86,6 @@ public class DefaultS3Client implements Client {

protected CompactionManager compactionManager;

protected BackPressureManager backPressureManager;

protected S3StreamClient streamClient;

protected KVClient kvClient;
Expand Down Expand Up @@ -151,7 +146,6 @@ public void start() {
this.objectManager.setCommitStreamSetObjectHook(localIndexCache::updateIndexFromRequest);
this.blockCache = new StreamReaders(this.config.blockCacheSize(), objectManager, objectStorage, objectReaderFactory);
this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionobjectStorage);
this.backPressureManager = new DefaultBackPressureManager(backPressureRegulator());
this.writeAheadLog = buildWAL();
StorageFailureHandlerChain storageFailureHandler = new StorageFailureHandlerChain();
this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, objectStorage, storageFailureHandler);
Expand All @@ -168,13 +162,11 @@ public void start() {

this.storage.startup();
this.compactionManager.start();
this.backPressureManager.start();
LOGGER.info("S3Client started");
}

@Override
public void shutdown() {
this.backPressureManager.shutdown();
this.compactionManager.shutdown();
this.streamClient.shutdown();
this.storage.shutdown();
Expand Down Expand Up @@ -235,18 +227,6 @@ protected ObjectManager newObjectManager(int nodeId, long nodeEpoch, boolean fai
this::getAutoMQVersion, failoverMode);
}

protected Regulator backPressureRegulator() {
return new Regulator() {
@Override
public void increase() {
}

@Override
public void decrease() {
}
};
}

protected Failover failover() {
return new Failover(new FailoverFactory() {
@Override
Expand Down
Loading