Skip to content

Commit

Permalink
MINOR: Extract SockerServer inner classes to server module (apache#16632
Browse files Browse the repository at this point in the history
)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
mimaison authored Jul 23, 2024
1 parent a012af5 commit c71eb60
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 63 deletions.
75 changes: 14 additions & 61 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import java.util.Optional
import java.util.concurrent._
import java.util.concurrent.atomic._
import kafka.cluster.{BrokerEndPoint, EndPoint}
import kafka.network.ConnectionQuotas._
import kafka.network.Processor._
import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
import kafka.network.SocketServer._
Expand All @@ -45,7 +44,7 @@ import org.apache.kafka.common.requests.{ApiVersionsRequest, RequestContext, Req
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.{ConnectionQuotaEntity, ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.config.QuotaConfigs
import org.apache.kafka.server.metrics.KafkaMetricsGroup
Expand Down Expand Up @@ -1387,47 +1386,6 @@ private[kafka] class Processor(
}
}

/**
* Interface for connection quota configuration. Connection quotas can be configured at the
* broker, listener or IP level.
*/
sealed trait ConnectionQuotaEntity {
def sensorName: String
def metricName: String
def sensorExpiration: Long
def metricTags: Map[String, String]
}

object ConnectionQuotas {
private val InactiveSensorExpirationTimeSeconds = TimeUnit.HOURS.toSeconds(1)
private val ConnectionRateSensorName = "Connection-Accept-Rate"
private val ConnectionRateMetricName = "connection-accept-rate"
private val IpMetricTag = "ip"
private val ListenerThrottlePrefix = ""
private val IpThrottlePrefix = "ip-"

private case class ListenerQuotaEntity(listenerName: String) extends ConnectionQuotaEntity {
override def sensorName: String = s"$ConnectionRateSensorName-$listenerName"
override def sensorExpiration: Long = Long.MaxValue
override def metricName: String = ConnectionRateMetricName
override def metricTags: Map[String, String] = Map(ListenerMetricTag -> listenerName)
}

private case object BrokerQuotaEntity extends ConnectionQuotaEntity {
override def sensorName: String = ConnectionRateSensorName
override def sensorExpiration: Long = Long.MaxValue
override def metricName: String = s"broker-$ConnectionRateMetricName"
override def metricTags: Map[String, String] = Map.empty
}

private case class IpQuotaEntity(ip: InetAddress) extends ConnectionQuotaEntity {
override def sensorName: String = s"$ConnectionRateSensorName-${ip.getHostAddress}"
override def sensorExpiration: Long = InactiveSensorExpirationTimeSeconds
override def metricName: String = ConnectionRateMetricName
override def metricTags: Map[String, String] = Map(IpMetricTag -> ip.getHostAddress)
}
}

class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extends Logging with AutoCloseable {

@volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
Expand All @@ -1444,7 +1402,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
@volatile private var defaultConnectionRatePerIp = QuotaConfigs.IP_CONNECTION_RATE_DEFAULT.intValue()
private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
// sensor that tracks broker-wide connection creation rate and limit (quota)
private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity)
private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, ConnectionQuotaEntity.brokerQuotaEntity())
private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)

def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
Expand Down Expand Up @@ -1482,7 +1440,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
// if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
// the rate limit increases, because it is just one connection per listener and the code is simpler that way
updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
updateConnectionRateQuota(maxConnectionRate, ConnectionQuotaEntity.brokerQuotaEntity())
}

/**
Expand All @@ -1495,9 +1453,9 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
*/
def updateIpConnectionRateQuota(ip: Option[InetAddress], maxConnectionRate: Option[Int]): Unit = synchronized {
def isIpConnectionRateMetric(metricName: MetricName) = {
metricName.name == ConnectionRateMetricName &&
metricName.name == ConnectionQuotaEntity.CONNECTION_RATE_METRIC_NAME &&
metricName.group == MetricsGroup &&
metricName.tags.containsKey(IpMetricTag)
metricName.tags.containsKey(ConnectionQuotaEntity.IP_METRIC_TAG)
}

def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
Expand All @@ -1517,7 +1475,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
connectionRatePerIp.remove(address)
}
}
updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address))
updateConnectionRateQuota(connectionRateForIp(address), ConnectionQuotaEntity.ipQuotaEntity(address))
case None =>
// synchronize on counts to ensure reading an IP connection rate quota and creating a quota config is atomic
counts.synchronized {
Expand All @@ -1526,7 +1484,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
info(s"Updated default max IP connection rate to $defaultConnectionRatePerIp")
metrics.metrics.forEach { (metricName, metric) =>
if (isIpConnectionRateMetric(metricName)) {
val quota = connectionRateForIp(InetAddress.getByName(metricName.tags.get(IpMetricTag)))
val quota = connectionRateForIp(InetAddress.getByName(metricName.tags.get(ConnectionQuotaEntity.IP_METRIC_TAG)))
if (shouldUpdateQuota(metric, quota)) {
debug(s"Updating existing connection rate quota config for ${metricName.tags} to $quota")
metric.config(rateQuotaMetricConfig(quota))
Expand Down Expand Up @@ -1701,7 +1659,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
val connectionRateQuota = connectionRateForIp(address)
val quotaEnabled = connectionRateQuota != QuotaConfigs.IP_CONNECTION_RATE_DEFAULT
if (quotaEnabled) {
val sensor = getOrCreateConnectionRateQuotaSensor(connectionRateQuota, IpQuotaEntity(address))
val sensor = getOrCreateConnectionRateQuotaSensor(connectionRateQuota, ConnectionQuotaEntity.ipQuotaEntity(address))
val timeMs = time.milliseconds
val throttleMs = recordAndGetThrottleTimeMs(sensor, timeMs)
if (throttleMs > 0) {
Expand Down Expand Up @@ -1766,7 +1724,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
connectionQuotaEntity.metricName,
MetricsGroup,
s"Tracking rate of accepting new connections (per second)",
connectionQuotaEntity.metricTags.asJava)
connectionQuotaEntity.metricTags)
}

private def rateQuotaMetricConfig(quotaLimit: Int): MetricConfig = {
Expand All @@ -1783,17 +1741,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend

class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable with AutoCloseable {
@volatile private var _maxConnections = Int.MaxValue
private[network] val connectionRateSensor = getOrCreateConnectionRateQuotaSensor(Int.MaxValue, ListenerQuotaEntity(listener.value))
private[network] val listenerConnectionRateThrottleSensor = createConnectionRateThrottleSensor(ListenerThrottlePrefix)
private[network] val ipConnectionRateThrottleSensor = createConnectionRateThrottleSensor(IpThrottlePrefix)
private[network] val connectionRateSensor = getOrCreateConnectionRateQuotaSensor(Int.MaxValue, ConnectionQuotaEntity.listenerQuotaEntity(listener.value))
private[network] val listenerConnectionRateThrottleSensor = createConnectionRateThrottleSensor(ConnectionQuotaEntity.LISTENER_THROTTLE_PREFIX)
private[network] val ipConnectionRateThrottleSensor = createConnectionRateThrottleSensor(ConnectionQuotaEntity.IP_THROTTLE_PREFIX)

def maxConnections: Int = _maxConnections

override def listenerName(): ListenerName = listener

override def configure(configs: util.Map[String, _]): Unit = {
_maxConnections = maxConnections(configs)
updateConnectionRateQuota(maxConnectionCreationRate(configs), ListenerQuotaEntity(listener.value))
updateConnectionRateQuota(maxConnectionCreationRate(configs), ConnectionQuotaEntity.listenerQuotaEntity(listener.value))
}

override def reconfigurableConfigs(): util.Set[String] = {
Expand All @@ -1813,7 +1771,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
override def reconfigure(configs: util.Map[String, _]): Unit = {
lock.synchronized {
_maxConnections = maxConnections(configs)
updateConnectionRateQuota(maxConnectionCreationRate(configs), ListenerQuotaEntity(listener.value))
updateConnectionRateQuota(maxConnectionCreationRate(configs), ConnectionQuotaEntity.listenerQuotaEntity(listener.value))
lock.notifyAll()
}
}
Expand Down Expand Up @@ -1860,8 +1818,3 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
}

}

class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException(s"Too many connections from $ip (maximum = $count)")

class ConnectionThrottledException(val ip: InetAddress, val startThrottleTimeMs: Long, val throttleTimeMs: Long)
extends KafkaException(s"$ip throttled for $throttleTimeMs")
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.common.metrics.{KafkaMetric, MetricConfig, Metrics}
import org.apache.kafka.common.network._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.{ReplicationConfigs, QuotaConfigs}
import org.apache.kafka.network.{ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.server.config.{QuotaConfigs, ReplicationConfigs}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.MockTime
import org.junit.jupiter.api.Assertions._
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.network;

import java.net.InetAddress;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Class for connection quota configuration. Connection quotas can be configured at the
* broker, listener or IP level.
*/
public class ConnectionQuotaEntity {

public static final String CONNECTION_RATE_SENSOR_NAME = "Connection-Accept-Rate";
public static final String CONNECTION_RATE_METRIC_NAME = "connection-accept-rate";
public static final String LISTENER_THROTTLE_PREFIX = "";
public static final String IP_METRIC_TAG = "ip";
public static final String IP_THROTTLE_PREFIX = "ip-";

public static ConnectionQuotaEntity listenerQuotaEntity(String listenerName) {
return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME + "-" + listenerName,
CONNECTION_RATE_METRIC_NAME,
Long.MAX_VALUE,
Collections.singletonMap("listener", listenerName));
}

public static ConnectionQuotaEntity brokerQuotaEntity() {
return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME,
"broker-" + ConnectionQuotaEntity.CONNECTION_RATE_METRIC_NAME,
Long.MAX_VALUE,
Collections.emptyMap());
}

public static ConnectionQuotaEntity ipQuotaEntity(InetAddress ip) {
return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME + "-" + ip.getHostAddress(),
CONNECTION_RATE_METRIC_NAME,
TimeUnit.HOURS.toSeconds(1),
Collections.singletonMap(IP_METRIC_TAG, ip.getHostAddress()));
}

private final String sensorName;
private final String metricName;
private final long sensorExpiration;
private final Map<String, String> metricTags;

private ConnectionQuotaEntity(String sensorName, String metricName, long sensorExpiration, Map<String, String> metricTags) {
this.sensorName = sensorName;
this.metricName = metricName;
this.sensorExpiration = sensorExpiration;
this.metricTags = metricTags;
}

/**
* The name of the sensor for this quota entity
*/
public String sensorName() {
return sensorName;
}

/**
* The name of the metric for this quota entity
*/
public String metricName() {
return metricName;
}

/**
* The duration in second to keep the sensor even if no new values are recorded
*/
public long sensorExpiration() {
return sensorExpiration;
}

/**
* Tags associated with this quota entity
*/
public Map<String, String> metricTags() {
return metricTags;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.network;

import org.apache.kafka.common.KafkaException;

import java.net.InetAddress;

public class ConnectionThrottledException extends KafkaException {

public final long startThrottleTimeMs;
public final long throttleTimeMs;

public ConnectionThrottledException(InetAddress ip, long startThrottleTimeMs, long throttleTimeMs) {
super(ip + " throttled for " + throttleTimeMs);
this.startThrottleTimeMs = startThrottleTimeMs;
this.throttleTimeMs = throttleTimeMs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.network;

import org.apache.kafka.common.KafkaException;

import java.net.InetAddress;

public class TooManyConnectionsException extends KafkaException {

public final InetAddress ip;
public final int count;

public TooManyConnectionsException(InetAddress ip, int count) {
super("Too many connections from " + ip + " (maximum = " + count + ")");
this.ip = ip;
this.count = count;
}
}

0 comments on commit c71eb60

Please sign in to comment.