Skip to content

Commit

Permalink
fix(config): adjust configs in KafkaConfig rather than Kafka (#971)
Browse files Browse the repository at this point in the history
* fix(core): fix adjusting s3 cache related configuration

Signed-off-by: SSpirits <[email protected]>

* refactor(config): adjust configs in `KafkaConfig` rather than `Kafka`

Signed-off-by: Ning Yu <[email protected]>

---------

Signed-off-by: SSpirits <[email protected]>
Signed-off-by: Ning Yu <[email protected]>
Co-authored-by: SSpirits <[email protected]>
  • Loading branch information
Chillax-0v0 and ShadowySpirits authored Mar 19, 2024
1 parent b282e99 commit 25476bc
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 60 deletions.
56 changes: 0 additions & 56 deletions core/src/main/scala/kafka/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ package kafka
import com.automq.s3shell.sdk.auth.{CredentialsProviderHolder, EnvVariableCredentialsProvider}
import com.automq.s3shell.sdk.model.S3Url
import com.automq.stream.s3.ByteBufAlloc
import io.netty.util.internal.PlatformDependent
import joptsimple.OptionParser
import kafka.s3shell.util.S3ShellPropUtil
import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server}
import kafka.utils.Implicits._
import kafka.utils.{CommandLineUtils, Exit, Logging}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.utils._
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}

Expand Down Expand Up @@ -113,7 +111,6 @@ object Kafka extends Logging {
// AutoMQ for Kafka inject start
// set allocator's policy as early as possible
ByteBufAlloc.setPolicy(config.s3StreamAllocatorPolicy)
adjustKafkaConfig(config)
// AutoMQ for Kafka inject end
if (config.requiresZookeeper) {
new KafkaServer(
Expand Down Expand Up @@ -182,57 +179,4 @@ object Kafka extends Logging {
}
Exit.exit(0)
}

// AutoMQ for Kafka inject start
private def adjustKafkaConfig(config: KafkaConfig): Unit = {
val s3WALCacheSizeSet = config.s3WALCacheSize > 0
val s3BlockCacheSizeSet = config.s3BlockCacheSize > 0
if (s3WALCacheSizeSet != s3BlockCacheSizeSet) {
throw new ConfigException(s"${KafkaConfig.S3WALCacheSizeProp} and ${KafkaConfig.S3BlockCacheSizeProp} must be set together")
}

val s3AvailableMemory = if (config.s3StreamAllocatorPolicy.isDirect) {
PlatformDependent.maxDirectMemory()
} else {
Runtime.getRuntime.maxMemory() / 2
}

config.s3WALCacheSize = {
if (s3WALCacheSizeSet) {
config.s3WALCacheSize
} else {
// for example:
// availableMemory = 3G, adjusted = max(3G / 3, (3G - 3G) / 3 * 2) = max(1G, 0) = 1G
// availableMemory = 6G, adjusted = max(6G / 3, (6G - 3G) / 3 * 2) = max(2G, 2G) = 2G
// availableMemory = 9G, adjusted = max(9G / 3, (9G - 3G) / 3 * 2) = max(3G, 4G) = 4G
// availableMemory = 12G, adjusted = max(12G / 3, (12G - 3G) / 3 * 2) = max(4G, 6G) = 6G
val adjusted = Math.max(s3AvailableMemory / 3, (s3AvailableMemory - 3L * 1024 * 1024 * 1024) / 3 * 2)
info(s"${KafkaConfig.S3WALCacheSizeProp} is not set, using $adjusted as the default value")
adjusted
}
}

config.s3BlockCacheSize = {
if (s3BlockCacheSizeSet) {
config.s3BlockCacheSize
} else {
// it's just 1/2 of {@link KafkaConfig#s3WALCacheSize}
val adjusted = Math.max(s3AvailableMemory / 6, (s3AvailableMemory - 3L * 1024 * 1024 * 1024) / 3)
info(s"${KafkaConfig.S3BlockCacheSizeProp} is not set, using $adjusted as the default value")
adjusted
}
}

config.s3WALUploadThreshold = {
if (config.s3WALUploadThreshold > 0) {
config.s3WALUploadThreshold
} else {
// it should not be greater than 1/3 of {@link KafkaConfig#s3WALCapacity} and {@link KafkaConfig#s3WALCacheSize}
val adjusted = (config.s3WALCapacity / 3) min (config.s3WALCacheSize / 3) min (500L * 1024 * 1024)
info(s"${KafkaConfig.S3WALUploadThresholdProp} is not set, using $adjusted as the default value")
adjusted
}
}
}
// AutoMQ for Kafka inject end
}
76 changes: 72 additions & 4 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kafka.server

import com.automq.stream.s3.ByteBufAllocPolicy
import io.netty.util.internal.PlatformDependent
import kafka.autobalancer.config.AutoBalancerControllerConfig

import java.util
Expand All @@ -30,7 +31,7 @@ import kafka.log.LogConfig
import kafka.log.LogConfig.MessageFormatVersion
import kafka.message.{BrokerCompressionCodec, CompressionCodec, ProducerCompressionCodec, ZStdCompressionCodec}
import kafka.security.authorizer.AuthorizerUtils
import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp}
import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp, S3BlockCacheSizeProp, S3WALCacheSizeProp, S3WALUploadThresholdProp}
import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
import kafka.utils.CoreUtils.parseCsvList
import kafka.utils.{CoreUtils, Logging}
Expand Down Expand Up @@ -2166,22 +2167,89 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami

// AutoMQ for Kafka inject start
/** ********* Kafka on S3 Configuration *********/
/**
* Adjust S3 configurations based on memory resource.
*
* @return (s3WALCacheSize, s3BlockCacheSize, s3WALUploadThreshold)
*/
private def adjustS3Configs(s3StreamAllocatorPolicy: ByteBufAllocPolicy, s3WALCapacity: Long): (Long, Long, Long) = {
val rawS3WALCacheSize = getLong(KafkaConfig.S3WALCacheSizeProp)
val rawS3BlockCacheSize = getLong(KafkaConfig.S3BlockCacheSizeProp)
val rawS3WALUploadThreshold = getLong(KafkaConfig.S3WALUploadThresholdProp)

val s3WALCacheSizeSet = rawS3WALCacheSize > 0
val s3BlockCacheSizeSet = rawS3BlockCacheSize > 0
val s3WALUploadThresholdSet = rawS3WALUploadThreshold > 0
if (s3WALCacheSizeSet != s3BlockCacheSizeSet) {
throw new ConfigException(s"${S3WALCacheSizeProp} and ${S3BlockCacheSizeProp} must be set together")
}

val s3AvailableMemory = if (s3StreamAllocatorPolicy.isDirect) {
PlatformDependent.maxDirectMemory()
} else {
Runtime.getRuntime.maxMemory() / 2
}

val s3WALCacheSize: Long = {
if (s3WALCacheSizeSet) {
rawS3WALCacheSize
} else {
// for example:
// availableMemory = 3G, adjusted = max(3G / 3, (3G - 3G) / 3 * 2) = max(1G, 0) = 1G
// availableMemory = 6G, adjusted = max(6G / 3, (6G - 3G) / 3 * 2) = max(2G, 2G) = 2G
// availableMemory = 9G, adjusted = max(9G / 3, (9G - 3G) / 3 * 2) = max(3G, 4G) = 4G
// availableMemory = 12G, adjusted = max(12G / 3, (12G - 3G) / 3 * 2) = max(4G, 6G) = 6G
val adjusted = Math.max(s3AvailableMemory / 3, (s3AvailableMemory - 3L * 1024 * 1024 * 1024) / 3 * 2)
if (doLog) {
info(s"$S3WALCacheSizeProp is not set, using $adjusted as the default value")
}
adjusted
}
}

val s3BlockCacheSize: Long = {
if (s3BlockCacheSizeSet) {
rawS3BlockCacheSize
} else {
// it's just 1/2 of s3WALCacheSize
val adjusted = Math.max(s3AvailableMemory / 6, (s3AvailableMemory - 3L * 1024 * 1024 * 1024) / 3)
if (doLog) {
info(s"$S3BlockCacheSizeProp is not set, using $adjusted as the default value")
}
adjusted
}
}

val s3WALUploadThreshold: Long = {
if (s3WALUploadThresholdSet) {
rawS3WALUploadThreshold
} else {
// it should not be greater than 1/3 of s3WALCapacity, 1/3 of s3WALCacheSize and 500M
val adjusted = (s3WALCapacity / 3) min (s3WALCacheSize / 3) min (500L * 1024 * 1024)
if (doLog) {
info(s"$S3WALUploadThresholdProp is not set, using $adjusted as the default value")
}
adjusted
}
}

(s3WALCacheSize, s3BlockCacheSize, s3WALUploadThreshold)
}

val s3Endpoint = getString(KafkaConfig.S3EndpointProp)
val s3Region = getString(KafkaConfig.S3RegionProp)
val s3PathStyle = getBoolean(KafkaConfig.S3PathStyleProp)
val s3Bucket = getString(KafkaConfig.S3BucketProp)
val s3WALPath = getString(KafkaConfig.S3WALPathProp)
var s3WALCacheSize = getLong(KafkaConfig.S3WALCacheSizeProp)
val s3WALCapacity = getLong(KafkaConfig.S3WALCapacityProp)
val s3WALThread = getInt(KafkaConfig.S3WALThreadProp)
val s3WALIOPS = getInt(KafkaConfig.S3WALIOPSProp)
var s3WALUploadThreshold = getLong(KafkaConfig.S3WALUploadThresholdProp)
val s3StreamSplitSize = getInt(KafkaConfig.S3StreamSplitSizeProp)
val s3ObjectBlockSize = getInt(KafkaConfig.S3ObjectBlockSizeProp)
val s3ObjectPartSize = getInt(KafkaConfig.S3ObjectPartSizeProp)
val s3ObjectTagging = getBoolean(KafkaConfig.S3ObjectTaggingProp)
var s3BlockCacheSize = getLong(KafkaConfig.S3BlockCacheSizeProp)
val s3StreamAllocatorPolicy = Enum.valueOf(classOf[ByteBufAllocPolicy], getString(KafkaConfig.S3StreamAllocatorPolicyProp))
val (s3WALCacheSize, s3BlockCacheSize, s3WALUploadThreshold) = adjustS3Configs(s3StreamAllocatorPolicy, s3WALCapacity)
val s3StreamObjectCompactionTaskIntervalMinutes = getInt(KafkaConfig.S3StreamObjectCompactionIntervalMinutesProp)
val s3StreamObjectCompactionMaxSizeBytes = getLong(KafkaConfig.S3StreamObjectCompactionMaxSizeBytesProp)
val s3ControllerRequestRetryMaxCount = getInt(KafkaConfig.S3ControllerRequestRetryMaxCountProp)
Expand Down

0 comments on commit 25476bc

Please sign in to comment.