Skip to content

Commit

Permalink
feat(quota): exclude internal client IDs from broker quota (#2179)
Browse files Browse the repository at this point in the history
* feat(quota): exclude internal client IDs from broker quota

Signed-off-by: Ning Yu <[email protected]>

* feat(autobalancer): mark producers and consumers internal clients

Signed-off-by: Ning Yu <[email protected]>

---------

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Nov 25, 2024
1 parent 89f2c6b commit 018833e
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 6 deletions.
5 changes: 3 additions & 2 deletions core/src/main/java/kafka/autobalancer/LoadRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.server.config.QuotaConfigs;

import com.automq.stream.utils.LogContext;

Expand Down Expand Up @@ -111,7 +112,7 @@ public LoadRetriever(AutoBalancerControllerConfig config, Controller controller,
metricReporterTopicPartition = config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_METRICS_TOPIC_NUM_PARTITIONS_CONFIG);
metricReporterTopicRetentionTime = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_METRICS_TOPIC_RETENTION_MS_CONFIG);
consumerPollTimeout = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_POLL_TIMEOUT);
consumerClientIdPrefix = config.getString(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX);
consumerClientIdPrefix = QuotaConfigs.INTERNAL_CLIENT_ID_PREFIX + config.getString(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX);
consumerRetryBackOffMs = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS);
}

Expand Down Expand Up @@ -154,7 +155,7 @@ protected Properties buildConsumerProps(String bootstrapServer) {
Properties consumerProps = new Properties();
long randomToken = RANDOM.nextLong();
consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientIdPrefix + "-consumer-" + randomToken);
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientIdPrefix + randomToken);
consumerProps.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, Long.toString(consumerRetryBackOffMs));
consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class AutoBalancerControllerConfig extends AbstractConfig {
public static final Integer DEFAULT_AUTO_BALANCER_CONTROLLER_METRICS_TOPIC_NUM_PARTITIONS_CONFIG = 1;
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_METRICS_TOPIC_RETENTION_MS_CONFIG = TimeUnit.MINUTES.toMillis(30);
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_POLL_TIMEOUT = 1000L;
public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX = "AutoBalancerControllerConsumer";
public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_CLIENT_ID_PREFIX = "auto_balancer_controller_consumer_";
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_CONSUMER_RETRY_BACKOFF_MS = 1000;
public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS = Duration.ofMinutes(1).toMillis();
public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_GOALS = new StringJoiner(",")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class AutoBalancerMetricsReporterConfig extends AbstractConfig {
public static final String AUTO_BALANCER_METRICS_REPORTER_LINGER_MS_CONFIG = PREFIX + ProducerConfig.LINGER_MS_CONFIG;
public static final String AUTO_BALANCER_METRICS_REPORTER_BATCH_SIZE_CONFIG = PREFIX + ProducerConfig.BATCH_SIZE_CONFIG;
/* Default values */
public static final String DEFAULT_AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID = "AutoBalancerMetricsReporterProducer";
public static final String DEFAULT_AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID = "auto_balancer_metrics_reporter_producer";
public static final long DEFAULT_AUTO_BALANCER_METRICS_REPORTER_INTERVAL_MS = TimeUnit.SECONDS.toMillis(10);
public static final int DEFAULT_AUTO_BALANCER_METRICS_REPORTER_LINGER_MS = (int) TimeUnit.SECONDS.toMillis(1);
public static final int DEFAULT_AUTO_BALANCER_METRICS_BATCH_SIZE = 800 * 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.QuotaConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

Expand Down Expand Up @@ -229,7 +230,7 @@ public void configure(Map<String, ?> rawConfigs) {

setIfAbsent(producerProps,
ProducerConfig.CLIENT_ID_CONFIG,
reporterConfig.getString(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID));
QuotaConfigs.INTERNAL_CLIENT_ID_PREFIX + reporterConfig.getString(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_METRICS_REPORTER_PRODUCER_CLIENT_ID));
setIfAbsent(producerProps, ProducerConfig.LINGER_MS_CONFIG,
reporterConfig.getLong(AutoBalancerMetricsReporterConfig.AUTO_BALANCER_METRICS_REPORTER_LINGER_MS_CONFIG).toString());
setIfAbsent(producerProps, ProducerConfig.BATCH_SIZE_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.kafka.common.metrics.{Metrics, Quota, QuotaViolationException,
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.Session
import org.apache.kafka.server.config.BrokerQuotaManagerConfig
import org.apache.kafka.server.config.{BrokerQuotaManagerConfig, QuotaConfigs}

import java.util.concurrent.TimeUnit
import java.util.{Optional, Properties}
Expand Down Expand Up @@ -72,6 +72,11 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
return 0
}

if (isInternalClient(request.context.clientId())) {
// Internal clients are exempt from quota
return 0
}

if (isInWhiteList(request.session.principal, request.context.clientId(), request.context.listenerName())) {
// Client is in the white list, no need to throttle
return 0
Expand All @@ -90,6 +95,10 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,

}

private def isInternalClient(clientId: String): Boolean = {
clientId.startsWith(QuotaConfigs.INTERNAL_CLIENT_ID_PREFIX)
}

private def isInWhiteList(principal: KafkaPrincipal, clientId: String, listenerName: String): Boolean = {
val key = s"$principal:$clientId:$listenerName"
whiteListCache.get(key) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ public class QuotaConfigs {
"for the specified IP.";

// AutoMQ inject start
/**
* All clients created by AutoMQ will have this prefix in their client id, and they will be excluded from quota.
*/
public static final String INTERNAL_CLIENT_ID_PREFIX = "__automq_client_";

public static final String BROKER_QUOTA_ENABLED_CONFIG = "broker.quota.enabled";
public static final String BROKER_QUOTA_PRODUCE_BYTES_CONFIG = "broker.quota.produce.bytes";
public static final String BROKER_QUOTA_FETCH_BYTES_CONFIG = "broker.quota.fetch.bytes";
Expand Down

0 comments on commit 018833e

Please sign in to comment.