From 24146943e6becf62069494620a97bbd4b10abfe2 Mon Sep 17 00:00:00 2001
From: "Xu Han@AutoMQ"
Date: Wed, 20 Nov 2024 17:04:28 +0800
Subject: [PATCH 1/6] feat(table): table topic aspect

---
 .../kafka/common/config/TopicConfig.java      | 10 +++++++
 .../main/scala/kafka/cluster/Partition.scala  | 28 +++++++++++++++++--
 .../cluster/PartitionAppendListener.java      | 21 ++++++++++++++
 .../kafka/log/stream/s3/DefaultS3Client.java  |  3 ++
 .../log/streamaspect/ElasticUnifiedLog.scala  | 21 +++++++++++++-
 .../streamaspect/LogConfigChangeListener.java | 20 +++++++++++++
 .../kafka/log/streamaspect/MetaStream.java    |  5 ++++
 .../server/AutoTopicCreationManager.scala     | 14 ++++++++++
 .../scala/kafka/server/BrokerServer.scala     | 12 ++++++--
 .../main/scala/kafka/server/KafkaConfig.scala |  8 ++++++
 .../streamaspect/ElasticReplicaManager.scala  | 17 +++++++++++
 .../PartitionLifecycleListener.java           | 22 +++++++++++++++
 .../config/ServerTopicConfigSynonyms.java     |  7 +++++
 .../storage/internals/log/LogConfig.java      | 20 +++++++++++++
 14 files changed, 203 insertions(+), 5 deletions(-)
 create mode 100644 core/src/main/scala/kafka/cluster/PartitionAppendListener.java
 create mode 100644 core/src/main/scala/kafka/log/streamaspect/LogConfigChangeListener.java
 create mode 100644 core/src/main/scala/kafka/server/streamaspect/PartitionLifecycleListener.java

diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 1437577ed0..9fd5a4d4f0 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -257,4 +257,14 @@ public class TopicConfig {
         "broker will not perform down-conversion for consumers expecting an older message format. The broker responds " +
         "with UNSUPPORTED_VERSION error for consume requests from such older clients. This configuration" +
         "does not apply to any message format conversion that might be required for replication to followers.";
+
+    // AutoMQ inject start
+    public static final String TABLE_TOPIC_ENABLE_CONFIG = "automq.table.topic.enable";
+    public static final String TABLE_TOPIC_ENABLE_DOC = "This configuration controls whether the table topic feature is enabled";
+    public static final String TABLE_TOPIC_COMMIT_INTERVAL_CONFIG = "automq.table.topic.commit.interval.ms";
+    public static final String TABLE_TOPIC_COMMIT_INTERVAL_DOC = "The table topic commit interval in milliseconds";
+    public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
+    public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic namespace";
+    // AutoMQ inject end
+
 }
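The three keys above are ordinary topic-level configs, so they can be supplied at topic creation time like any other. A minimal sketch of doing so with the standard AdminClient; the topic name, partition count, and config values are illustrative placeholders, not part of this patch:

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CreateTableTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Enable the table topic aspect, commit every 5 minutes, and place
            // the resulting table under the "kafka" namespace.
            NewTopic topic = new NewTopic("orders", 16, (short) 1).configs(Map.of(
                TopicConfig.TABLE_TOPIC_ENABLE_CONFIG, "true",
                TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, "300000",
                TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, "kafka"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}

Because the keys are registered as topic-config synonyms later in this patch, they can also be changed after creation with incrementalAlterConfigs, which is what makes the config-change listener added below useful.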
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2b285681c9..2ab05eaf89 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -364,7 +364,7 @@ class Partition(val topicPartition: TopicPartition,
   metricsGroup.newGauge("ReplicasCount", () => if (isLeader) assignmentState.replicationFactor else 0, tags)
   metricsGroup.newGauge("LastStableOffsetLag", () => log.map(_.lastStableOffsetLag).getOrElse(0), tags)
 
-  // AutoMQ for Kafka inject start
+  // AutoMQ inject start
   private val enableTraceLog = isTraceEnabled
   private var closed: Boolean = false
   /**
@@ -373,7 +373,8 @@ class Partition(val topicPartition: TopicPartition,
    * Used to return fast when fetching messages with `fetchOffset` equals to `confirmOffset` in [[checkFetchOffsetAndMaybeGetInfo]]
    */
   private var confirmOffset: Long = -1L
-  // AutoMQ for Kafka inject end
+  private val appendListeners = new CopyOnWriteArrayList[PartitionAppendListener]()
+  // AutoMQ inject end
 
   def hasLateTransaction(currentTimeMs: Long): Boolean = leaderLogIfLocal.exists(_.hasLateTransaction(currentTimeMs))
 
@@ -432,6 +433,25 @@ class Partition(val topicPartition: TopicPartition,
     listeners.remove(listener)
   }
 
+  // AutoMQ inject start
+  def addAppendListener(listener: PartitionAppendListener): Unit = {
+    appendListeners.add(listener)
+  }
+
+  def removeAppendListener(listener: PartitionAppendListener): Unit = {
+    appendListeners.remove(listener)
+  }
+
+  def notifyAppendListener(records: MemoryRecords): Unit = {
+    try {
+      appendListeners.forEach(_.onAppend(topicPartition, records))
+    } catch {
+      case e: Exception =>
+        error(s"Error while notifying append listeners for partition $topicPartition", e)
+    }
+  }
+  // AutoMQ inject end
+
   /**
    * Create the future replica if 1) the current replica is not in the given log directory and 2) the future replica
    * does not exist. This method assumes that the current replica has already been created.
@@ -1499,6 +1519,10 @@ class Partition(val topicPartition: TopicPartition,
         val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
           interBrokerProtocolVersion, requestLocal, verificationGuard)
 
+        // AutoMQ inject start
+        notifyAppendListener(records)
+        // AutoMQ inject end
+
         // we may need to increment high watermark since ISR could be down to 1
         (info, maybeIncrementLeaderHW(leaderLog))
diff --git a/core/src/main/scala/kafka/cluster/PartitionAppendListener.java b/core/src/main/scala/kafka/cluster/PartitionAppendListener.java
new file mode 100644
index 0000000000..96e2dbc95e
--- /dev/null
+++ b/core/src/main/scala/kafka/cluster/PartitionAppendListener.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2024, AutoMQ HK Limited.
+ *
+ * The use of this file is governed by the Business Source License,
+ * as detailed in the file "/LICENSE.S3Stream" included in this repository.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.cluster;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.MemoryRecords;
+
+public interface PartitionAppendListener {
+
+    void onAppend(TopicPartition topicPartition, MemoryRecords records);
+
+}
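The interface above is the whole append-side hook: Partition.appendRecordsToLeader now calls notifyAppendListener after a successful leader append, so a listener sees every MemoryRecords batch written to a leader partition. A sketch of what an implementation might look like; the class and its bookkeeping are hypothetical, not part of the patch:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

import kafka.cluster.PartitionAppendListener;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;

// Hypothetical listener that tracks appended bytes per partition; a table-topic
// writer would similarly buffer records here until the next commit interval.
public class AppendBytesRecorder implements PartitionAppendListener {
    private final ConcurrentHashMap<TopicPartition, LongAdder> bytes = new ConcurrentHashMap<>();

    @Override
    public void onAppend(TopicPartition topicPartition, MemoryRecords records) {
        // onAppend runs on the leader append path, so work done here must stay
        // cheap and non-blocking; hand anything heavy off to another thread.
        bytes.computeIfAbsent(topicPartition, tp -> new LongAdder()).add(records.sizeInBytes());
    }

    public long bytesAppended(TopicPartition tp) {
        LongAdder adder = bytes.get(tp);
        return adder == null ? 0L : adder.sum();
    }
}

A listener is attached per partition via the new partition.addAppendListener(...) and detached with removeAppendListener(...).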
diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
index 2b17455c4f..39e3dfb234 100644
--- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
+++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
@@ -27,6 +27,9 @@
 import com.automq.stream.s3.Config;
 import com.automq.stream.s3.S3Storage;
 import com.automq.stream.s3.S3StreamClient;
+import com.automq.stream.s3.backpressure.BackPressureManager;
+import com.automq.stream.s3.backpressure.DefaultBackPressureManager;
+import com.automq.stream.s3.backpressure.Regulator;
 import com.automq.stream.s3.cache.S3BlockCache;
 import com.automq.stream.s3.cache.blockcache.DefaultObjectReaderFactory;
 import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory;
diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala
index 6e406d1e56..f78cb4f458 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala
@@ -32,7 +32,7 @@ import java.nio.ByteBuffer
 import java.nio.file.Path
 import java.util
 import java.util.concurrent.atomic.LongAdder
-import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Executors}
+import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, CopyOnWriteArrayList, Executors}
 import scala.jdk.CollectionConverters.CollectionHasAsScala
 import scala.util.{Failure, Success, Try}
 
@@ -57,6 +57,7 @@ class ElasticUnifiedLog(_logStartOffset: Long,
   // fuzzy interval bytes for checkpoint, it's ok not thread safe
   var checkpointIntervalBytes = 0
   var lastCheckpointTimestamp = time.milliseconds()
+  var configChangeListeners = new CopyOnWriteArrayList[LogConfigChangeListener]()
 
   def getLocalLog(): ElasticLog = elasticLog
 
@@ -229,10 +230,28 @@ class ElasticUnifiedLog(_logStartOffset: Long,
     // noop implementation, producer snapshot and recover point will be appended to MetaStream, so they have order relation.
   }
 
+  override def updateConfig(
+    newConfig: LogConfig): LogConfig = {
+    val config = super.updateConfig(newConfig)
+    for (listener <- configChangeListeners.asScala) {
+      try {
+        listener.onLogConfigChange(this, newConfig)
+      } catch {
+        case e: Throwable =>
+          error(s"Error while invoking config change listener $listener", e)
+      }
+    }
+    config
+  }
+
   // only used for test
   def listProducerSnapshots(): util.NavigableMap[java.lang.Long, ByteBuffer] = {
     producerStateManager.asInstanceOf[ElasticProducerStateManager].snapshotsMap
   }
+
+  def addConfigChangeListener(listener: LogConfigChangeListener): Unit = {
+    configChangeListeners.add(listener)
+  }
 }
 
 object ElasticUnifiedLog extends Logging {
diff --git a/core/src/main/scala/kafka/log/streamaspect/LogConfigChangeListener.java b/core/src/main/scala/kafka/log/streamaspect/LogConfigChangeListener.java
new file mode 100644
index 0000000000..f318accf27
--- /dev/null
+++ b/core/src/main/scala/kafka/log/streamaspect/LogConfigChangeListener.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2024, AutoMQ HK Limited.
+ *
+ * The use of this file is governed by the Business Source License,
+ * as detailed in the file "/LICENSE.S3Stream" included in this repository.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.log.streamaspect;
+
+import org.apache.kafka.storage.internals.log.LogConfig;
+
+public interface LogConfigChangeListener {
+
+    void onLogConfigChange(ElasticUnifiedLog log, LogConfig logConfig);
+
+}
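ElasticUnifiedLog.updateConfig now fans out to registered LogConfigChangeListener instances after applying the new config, which is how a table-topic worker can notice automq.table.topic.enable being flipped at runtime. A hypothetical listener sketch, using the tableTopicEnable field this patch adds to LogConfig further below (the class and its logging are illustrative only):

package kafka.log.streamaspect;

import org.apache.kafka.storage.internals.log.LogConfig;

// Hypothetical listener that reacts when automq.table.topic.enable is toggled
// by a dynamic topic config update.
public class TableTopicToggleListener implements LogConfigChangeListener {

    @Override
    public void onLogConfigChange(ElasticUnifiedLog log, LogConfig logConfig) {
        if (logConfig.tableTopicEnable) {
            // Start (or keep running) the table materialization pipeline for this log.
            System.out.printf("table topic enabled for %s%n", log.topicPartition());
        } else {
            System.out.printf("table topic disabled for %s%n", log.topicPartition());
        }
    }
}

It would be registered with log.addConfigChangeListener(...); note that updateConfig swallows listener exceptions, so a misbehaving listener cannot fail the config update itself.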
diff --git a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java
index 1d1acc21e7..4a836822e1 100644
--- a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java
+++ b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java
@@ -31,6 +31,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -244,6 +245,10 @@ public Map<String, Object> replay() throws IOException {
         return getValidMetaMap();
     }
 
+    public Optional<ByteBuffer> get(String key) {
+        return Optional.ofNullable(metaCache.get(key)).map(o -> o.value.slice());
+    }
+
     private Map<String, Object> getValidMetaMap() {
         Map<String, Object> metaMap = new HashMap<>();
         metaCache.forEach((key, value) -> {
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index a5c0146f84..f4093818dd 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -244,6 +244,20 @@ class DefaultAutoTopicCreationManager(
           .setReplicationFactor(config.transactionTopicReplicationFactor)
           .setConfigs(convertToTopicConfigCollections(
             txnCoordinator.transactionTopicConfigs))
+
+      // AutoMQ inject start
+      case "__automq_table_control" =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(1)
+          .setReplicationFactor(1)
+      case "__automq_table_data" =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(50)
+          .setReplicationFactor(1)
+      // AutoMQ inject end
+
       case topicName =>
         new CreatableTopic()
           .setName(topicName)
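MetaStream.get adds a point read against the in-memory meta cache, where replay() still rebuilds the whole map. Because it returns a slice of the cached ByteBuffer, callers can consume the result without corrupting the cached buffer. A small hypothetical helper showing the read side (the helper class and key handling are illustrative, not part of the patch):

package kafka.log.streamaspect;

// Hypothetical helper demonstrating MetaStream.get; the key is whatever the
// caller stored earlier via an append to the meta stream.
final class MetaStreamReads {
    static byte[] readMeta(MetaStream metaStream, String key) {
        return metaStream.get(key)
            .map(buf -> {
                // get() returns a slice, so reading here does not disturb the
                // cached buffer's position or limit.
                byte[] bytes = new byte[buf.remaining()];
                buf.get(bytes);
                return bytes;
            })
            .orElse(null);
    }
}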
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 7d954c48f2..6d93155df2 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -28,7 +28,7 @@ import kafka.log.streamaspect.ElasticLogManager
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
-import kafka.server.streamaspect.{ElasticKafkaApis, ElasticReplicaManager}
+import kafka.server.streamaspect.{ElasticKafkaApis, ElasticReplicaManager, PartitionLifecycleListener}
 import kafka.utils.CoreUtils
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -39,7 +39,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
-import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorService, CoordinatorRecordSerde}
+import org.apache.kafka.coordinator.group.{CoordinatorRecord, CoordinatorRecordSerde, GroupCoordinator, GroupCoordinatorService}
 import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
 import org.apache.kafka.image.loader.MetadataLoader
 import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
@@ -550,6 +550,10 @@ class BrokerServer(
       // AutoMQ inject start
       ElasticLogManager.init(config, clusterId, this)
       produceRouter = newProduceRouter()
+
+      newPartitionLifecycleListeners().forEach(l => {
+        _replicaManager.addPartitionLifecycleListener(l)
+      })
       // AutoMQ inject end
 
       // We're now ready to unfence the broker. This also allows this broker to transition
@@ -820,6 +824,10 @@ class BrokerServer(
       }
     }
   }
+
+  protected def newPartitionLifecycleListeners(): util.List[PartitionLifecycleListener] = {
+    new util.ArrayList[PartitionLifecycleListener]()
+  }
   // AutoMQ inject end
 }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 50935bdc67..715e9e7688 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -795,6 +795,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   val s3ExporterReportIntervalMs = getInt(AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_CONFIG)
   val s3BackPressureEnabled = getBoolean(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)
   val s3BackPressureCooldownMs = getLong(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)
+  val tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG)
   // AutoMQ inject end
 
   /** Internal Configurations **/
@@ -1247,6 +1248,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
     logProps.put(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, logMessageDownConversionEnable: java.lang.Boolean)
     logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, remoteLogManagerConfig.logLocalRetentionMs: java.lang.Long)
     logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long)
+
+    // AutoMQ inject start
+    if (tableTopicNamespace != null) {
+      logProps.put(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, tableTopicNamespace)
+    }
+    // AutoMQ inject end
+
     logProps
   }
diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala
index 89f2978c3e..358c27d3f1 100644
--- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala
@@ -180,6 +180,8 @@ class ElasticReplicaManager(
 
   private var fenced: Boolean = false
 
+  private val partitionLifecycleListeners = new util.ArrayList[PartitionLifecycleListener]()
+
   override def startup(): Unit = {
     super.startup()
     val haltBrokerOnFailure = metadataCache.metadataVersion().isLessThan(MetadataVersion.IBP_1_0_IV0)
@@ -211,6 +213,7 @@ class ElasticReplicaManager(
     getPartition(topicPartition) match {
       case hostedPartition: HostedPartition.Online =>
         if (allPartitions.remove(topicPartition, hostedPartition)) {
+          notifyPartitionClose(hostedPartition.partition)
           brokerTopicStats.removeMetrics(topicPartition)
           maybeRemoveTopicMetrics(topicPartition.topic)
           // AutoMQ for Kafka inject start
@@ -1201,6 +1204,7 @@ class ElasticReplicaManager(
           val state = info.partition.toLeaderAndIsrPartitionState(tp, true)
           val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
           partition.makeLeader(state, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)
+          notifyPartitionOpen(partition)
         }).foreach { case (partition, _) =>
           try {
             changedPartitions.add(partition)
@@ -1400,4 +1404,17 @@ class ElasticReplicaManager(
       }
     }
   }
+
+  def addPartitionLifecycleListener(listener: PartitionLifecycleListener): Unit = {
+    partitionLifecycleListeners.add(listener)
+  }
+
+  private def notifyPartitionOpen(partition: Partition): Unit = {
+    partitionLifecycleListeners.forEach(listener => CoreUtils.swallow(listener.onOpen(partition), this))
+  }
+
+  private def notifyPartitionClose(partition: Partition): Unit = {
+    partitionLifecycleListeners.forEach(listener => CoreUtils.swallow(listener.onClose(partition), this))
+  }
+
 }
diff --git a/core/src/main/scala/kafka/server/streamaspect/PartitionLifecycleListener.java b/core/src/main/scala/kafka/server/streamaspect/PartitionLifecycleListener.java
new file mode 100644
index 0000000000..7d47f9e824
--- /dev/null
+++ b/core/src/main/scala/kafka/server/streamaspect/PartitionLifecycleListener.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2024, AutoMQ HK Limited.
+ *
+ * The use of this file is governed by the Business Source License,
+ * as detailed in the file "/LICENSE.S3Stream" included in this repository.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.server.streamaspect;
+
+import kafka.cluster.Partition;
+
+public interface PartitionLifecycleListener {
+
+    void onOpen(Partition partition);
+
+    void onClose(Partition partition);
+
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
index 66747e7436..33a802cf29 100644
--- a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
+++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
@@ -83,6 +83,13 @@ public final class ServerTopicConfigSynonyms {
         sameName(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG),
         sameName(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG),
         sameName(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG),
+
+        // AutoMQ inject start
+        sameName(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG),
+        sameName(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG),
+        sameName(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG),
+        // AutoMQ inject end
+
         sameNameWithLogPrefix(TopicConfig.PREALLOCATE_CONFIG),
         sameNameWithLogPrefix(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG),
         sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG),
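PartitionLifecycleListener is the counterpart of the append listener: ElasticReplicaManager fires onOpen when a partition becomes leader on this broker (notifyPartitionOpen after makeLeader) and onClose when the partition is removed. Listeners are registered through ElasticReplicaManager.addPartitionLifecycleListener, or in bulk by overriding BrokerServer.newPartitionLifecycleListeners. A hypothetical implementation that tracks the currently hosted partitions (the class is a sketch, not part of the patch):

package kafka.server.streamaspect;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import kafka.cluster.Partition;

import org.apache.kafka.common.TopicPartition;

// Hypothetical listener keeping the set of partitions currently led by this
// broker; a table-topic worker could consult it to decide which partitions
// it must commit for.
public class HostedPartitionTracker implements PartitionLifecycleListener {
    private final Set<TopicPartition> hosted = ConcurrentHashMap.newKeySet();

    @Override
    public void onOpen(Partition partition) {
        hosted.add(partition.topicPartition());
    }

    @Override
    public void onClose(Partition partition) {
        hosted.remove(partition.topicPartition());
    }

    public Set<TopicPartition> hostedPartitions() {
        return Set.copyOf(hosted);
    }
}

Note that both notify paths wrap the callback in CoreUtils.swallow, so a throwing listener is logged rather than failing partition open/close.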
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index e9e417c5cc..fcb21a3488 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -257,6 +257,11 @@ public Optional<String> serverConfigName(String configName) {
         .define(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_DOC)
         .define(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, BOOLEAN, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT, LOW, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DOC)
         .define(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG, LONG, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DEFAULT, atLeast(1), LOW, ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_DOC)
+
+        // AutoMQ inject start
+        .define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC)
+        // AutoMQ inject end
+
         .defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC);
 
     private static final LogConfigDef CONFIG = new LogConfigDef();
@@ -332,6 +337,9 @@ public Optional<String> serverConfigName(String configName) {
         // AutoMQ inject start
         // dynamic config #validateNames check will check the old configs name validity
         .define("elasticstream.replication.factor", INT, 1, atLeast(1), LOW, "deprecated, should not remove for compatibility")
+        .define(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_ENABLE_DOC)
+        .define(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, LONG, TimeUnit.MINUTES.toMillis(5), between(1, TimeUnit.MINUTES.toMillis(15)), MEDIUM, TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_DOC)
+        .define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC)
         // AutoMQ inject end
         .define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
         .define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC);
@@ -381,6 +389,12 @@ public Optional<String> serverConfigName(String configName) {
     public final boolean messageDownConversionEnable;
     private final RemoteLogConfig remoteLogConfig;
 
+    // AutoMQ inject start
+    public final boolean tableTopicEnable;
+    public final long tableTopicCommitInterval;
+    public final String tableTopicNamespace;
+    // AutoMQ inject end
+
     private final int maxMessageSize;
     private final Map<?, ?> props;
 
@@ -431,6 +445,12 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
         this.followerReplicationThrottledReplicas = Collections.unmodifiableList(getList(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
         this.messageDownConversionEnable = getBoolean(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG);
 
+        // AutoMQ inject start
+        this.tableTopicEnable = getBoolean(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG);
+        this.tableTopicCommitInterval = getLong(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG);
+        this.tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG);
+        // AutoMQ inject end
+
         remoteLogConfig = new RemoteLogConfig(this);
     }

From 22d81c69e4019855a6333be7d5a9f11f05d70722 Mon Sep 17 00:00:00 2001
From: "Xu Han@AutoMQ"
Date: Fri, 22 Nov 2024 17:15:44 +0800
Subject: [PATCH 2/6] chore(stream): move asyncsemaphore to util (#2173)

Signed-off-by: Robin Han
---
 .../com/automq/stream/s3/cache/blockcache/DataBlockCache.java | 1 +
 .../stream/{s3/cache/blockcache => utils}/AsyncSemaphore.java | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)
 rename s3stream/src/main/java/com/automq/stream/{s3/cache/blockcache => utils}/AsyncSemaphore.java (97%)

diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlockCache.java
index 84244603a5..382fe59e90 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlockCache.java
@@ -18,6 +18,7 @@
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.stats.StorageOperationStats;
 import com.automq.stream.s3.network.ThrottleStrategy;
+import com.automq.stream.utils.AsyncSemaphore;
 import com.automq.stream.utils.FutureUtil;
 import com.automq.stream.utils.Time;
 import com.automq.stream.utils.threads.EventLoop;
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/AsyncSemaphore.java b/s3stream/src/main/java/com/automq/stream/utils/AsyncSemaphore.java
similarity index 97%
rename from s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/AsyncSemaphore.java
rename to s3stream/src/main/java/com/automq/stream/utils/AsyncSemaphore.java
index 9b7936546b..ec50c523ae 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/AsyncSemaphore.java
+++ b/s3stream/src/main/java/com/automq/stream/utils/AsyncSemaphore.java
@@ -9,7 +9,7 @@
  * by the Apache License, Version 2.0
  */
 
-package com.automq.stream.s3.cache.blockcache;
+package com.automq.stream.utils;
 
 import com.automq.stream.utils.threads.EventLoop;
 
@@ -21,7 +21,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
-class AsyncSemaphore {
+public class AsyncSemaphore {
     private static final Logger LOGGER = LoggerFactory.getLogger(AsyncSemaphore.class);
     private final Queue tasks = new LinkedList<>();
     private long permits;
From 538d6148d69c918dd90edf6bd760454157be8fd5 Mon Sep 17 00:00:00 2001
From: "Xu Han@AutoMQ"
Date: Tue, 26 Nov 2024 10:48:55 +0800
Subject: [PATCH 3/6] chore(table): set table max.message.bytes to 20MiB (#2182)

Signed-off-by: Robin Han
---
 .../scala/kafka/log/streamaspect/MetaStream.java  |  2 +-
 .../kafka/server/AutoTopicCreationManager.scala   | 13 +++++++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java
index 4a836822e1..d18c40028c 100644
--- a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java
+++ b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java
@@ -266,7 +266,7 @@ private Map<String, Object> getValidMetaMap() {
                     metaMap.put(key, ElasticLeaderEpochCheckpointMeta.decode(value.value()));
                     break;
                 default:
-                    LOGGER.error("{} streamId {}: unknown meta key: {}", logIdent, streamId(), key);
+                    metaMap.put(key, value.value().duplicate());
             }
         });
         return metaMap;
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index f4093818dd..0e307b16a8 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -24,6 +24,7 @@ import kafka.controller.KafkaController
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.utils.Logging
 import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
@@ -246,16 +247,24 @@ class DefaultAutoTopicCreationManager(
             txnCoordinator.transactionTopicConfigs))
 
       // AutoMQ inject start
-      case "__automq_table_control" =>
+      case "__automq_table_control" => {
+        val configs = new Properties()
+        configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024)
         new CreatableTopic()
           .setName(topic)
          .setNumPartitions(1)
           .setReplicationFactor(1)
-      case "__automq_table_data" =>
+          .setConfigs(convertToTopicConfigCollections(configs))
+      }
+      case "__automq_table_data" => {
+        val configs = new Properties()
+        configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024)
         new CreatableTopic()
           .setName(topic)
           .setNumPartitions(50)
           .setReplicationFactor(1)
+          .setConfigs(convertToTopicConfigCollections(configs))
+      }
       // AutoMQ inject end
 
       case topicName =>
From f3cda604ee76a118189902ddaa3354407dde896d Mon Sep 17 00:00:00 2001
From: "Xu Han@AutoMQ"
Date: Thu, 28 Nov 2024 10:24:12 +0800
Subject: [PATCH 4/6] feat(table): auto create table topic control topic (#2186)

Signed-off-by: Robin Han
---
 .../java/org/apache/kafka/common/internals/Topic.java |  6 ++++++
 .../scala/kafka/server/AutoTopicCreationManager.scala |  6 +++---
 core/src/main/scala/kafka/server/KafkaApis.scala      | 10 +++++++++-
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index d94c91da50..3709d8005b 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -30,7 +30,13 @@ public class Topic {
     public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";
     public static final String SHARE_GROUP_STATE_TOPIC_NAME = "__share_group_state";
     public static final String CLUSTER_METADATA_TOPIC_NAME = "__cluster_metadata";
+
+    // AutoMQ inject start
     public static final String AUTO_BALANCER_METRICS_TOPIC_NAME = "__auto_balancer_metrics";
+    public static final String TABLE_TOPIC_CONTROL_TOPIC_NAME = "__automq_table_control";
+    public static final String TABLE_TOPIC_DATA_TOPIC_NAME = "__automq_table_data";
+    // AutoMQ inject end
+
     public static final TopicPartition CLUSTER_METADATA_TOPIC_PARTITION = new TopicPartition(
         CLUSTER_METADATA_TOPIC_NAME,
         0
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 0e307b16a8..9060c464bf 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
+import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TABLE_TOPIC_CONTROL_TOPIC_NAME, TABLE_TOPIC_DATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
 import org.apache.kafka.common.message.CreateTopicsRequestData
 import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicConfig, CreatableTopicConfigCollection}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
@@ -247,7 +247,7 @@ class DefaultAutoTopicCreationManager(
             txnCoordinator.transactionTopicConfigs))
 
       // AutoMQ inject start
-      case "__automq_table_control" => {
+      case TABLE_TOPIC_CONTROL_TOPIC_NAME => {
         val configs = new Properties()
         configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024)
         new CreatableTopic()
@@ -256,7 +256,7 @@ class DefaultAutoTopicCreationManager(
           .setReplicationFactor(1)
           .setConfigs(convertToTopicConfigCollections(configs))
       }
-      case "__automq_table_data" => {
+      case TABLE_TOPIC_DATA_TOPIC_NAME => {
         val configs = new Properties()
         configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024)
         new CreatableTopic()
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index eedc9afe06..1f61a32570 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -70,7 +70,7 @@ import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.{Group, GroupCoordinator}
 import org.apache.kafka.server.ClientMetricsManager
 import org.apache.kafka.server.authorizer._
-import org.apache.kafka.server.common.{MetadataVersion}
+import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData}
@@ -1306,6 +1306,14 @@ class KafkaApis(val requestChannel: RequestChannel,
       val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request)
       autoTopicCreationManager.createTopics(nonExistingTopics, controllerMutationQuota, Some(request.context))
     } else {
+      // AutoMQ inject start
+      for (tableTopic <- Set(Topic.TABLE_TOPIC_CONTROL_TOPIC_NAME, Topic.TABLE_TOPIC_DATA_TOPIC_NAME)) {
+        if (nonExistingTopics.contains(tableTopic)) {
+          val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request)
+          autoTopicCreationManager.createTopics(Set(tableTopic), controllerMutationQuota, Some(request.context))
+        }
+      }
+      // AutoMQ inject end
       nonExistingTopics.map { topic =>
         val error = try {
           Topic.validate(topic)
From 6d3f3ec3a96051b032f6e8435aad0491be81923b Mon Sep 17 00:00:00 2001
From: "Xu Han@AutoMQ"
Date: Thu, 5 Dec 2024 10:09:28 +0800
Subject: [PATCH 5/6] chore(gradle): update aws version to 2.29.26 (#2210)

Signed-off-by: Robin Han
---
 gradle/dependencies.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index b0b5d77d74..5b3e8782ce 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -172,7 +172,7 @@ versions += [
   opentelemetrySDKAlpha: "1.40.0-alpha",
   opentelemetryInstrument: "2.6.0-alpha",
   oshi: "6.4.7",
-  awsSdk:"2.26.10",
+  awsSdk:"2.29.26",
   bucket4j:"8.5.0",
   jna:"5.2.0",
   guava:"32.0.1-jre",

From 359d6bbf257501d1f4b6420b607f547545aa1b59 Mon Sep 17 00:00:00 2001
From: "Xu Han@AutoMQ"
Date: Fri, 13 Dec 2024 14:33:50 +0800
Subject: [PATCH 6/6] feat(tools/perf): support schema message perf (#2226)

Signed-off-by: Robin Han
---
 core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
index 39e3dfb234..2b17455c4f 100644
--- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
+++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
@@ -27,9 +27,6 @@
 import com.automq.stream.s3.Config;
 import com.automq.stream.s3.S3Storage;
 import com.automq.stream.s3.S3StreamClient;
-import com.automq.stream.s3.backpressure.BackPressureManager;
-import com.automq.stream.s3.backpressure.DefaultBackPressureManager;
-import com.automq.stream.s3.backpressure.Regulator;
 import com.automq.stream.s3.cache.S3BlockCache;
 import com.automq.stream.s3.cache.blockcache.DefaultObjectReaderFactory;
 import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory;