
feat(table): cherry table topic aspect #2237

Merged · 6 commits · Dec 20, 2024
@@ -257,4 +257,14 @@ public class TopicConfig {
"broker will not perform down-conversion for consumers expecting an older message format. The broker responds " +
"with <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configuration" +
"does not apply to any message format conversion that might be required for replication to followers.";

// AutoMQ inject start
public static final String TABLE_TOPIC_ENABLE_CONFIG = "automq.table.topic.enable";
public static final String TABLE_TOPIC_ENABLE_DOC = "The configuration controls whether the table topic feature is enabled";
public static final String TABLE_TOPIC_COMMIT_INTERVAL_CONFIG = "automq.table.topic.commit.interval.ms";
public static final String TABLE_TOPIC_COMMIT_INTERVAL_DOC = "The table topic commit interval in milliseconds";
public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table namespace of the table topic";
// AutoMQ inject end

}
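For reference, a minimal sketch of how these topic-level keys could be set when creating a topic through the standard Admin API; the bootstrap address, topic name, and config values are placeholder assumptions, not part of this PR:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTableTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Keys match the TopicConfig constants introduced above.
            NewTopic topic = new NewTopic("orders", 16, (short) 1)
                .configs(Map.of(
                    "automq.table.topic.enable", "true",
                    "automq.table.topic.commit.interval.ms", "60000",
                    "automq.table.topic.namespace", "analytics"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}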
@@ -30,7 +30,13 @@ public class Topic {
public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";
public static final String SHARE_GROUP_STATE_TOPIC_NAME = "__share_group_state";
public static final String CLUSTER_METADATA_TOPIC_NAME = "__cluster_metadata";

// AutoMQ inject start
public static final String AUTO_BALANCER_METRICS_TOPIC_NAME = "__auto_balancer_metrics";
public static final String TABLE_TOPIC_CONTROL_TOPIC_NAME = "__automq_table_control";
public static final String TABLE_TOPIC_DATA_TOPIC_NAME = "__automq_table_data";
// AutoMQ inject end

public static final TopicPartition CLUSTER_METADATA_TOPIC_PARTITION = new TopicPartition(
CLUSTER_METADATA_TOPIC_NAME,
0
28 changes: 26 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -364,7 +364,7 @@ class Partition(val topicPartition: TopicPartition,
metricsGroup.newGauge("ReplicasCount", () => if (isLeader) assignmentState.replicationFactor else 0, tags)
metricsGroup.newGauge("LastStableOffsetLag", () => log.map(_.lastStableOffsetLag).getOrElse(0), tags)

- // AutoMQ for Kafka inject start
+ // AutoMQ inject start
private val enableTraceLog = isTraceEnabled
private var closed: Boolean = false
/**
@@ -373,7 +373,8 @@ class Partition(val topicPartition: TopicPartition,
* Used to return early when fetching messages with `fetchOffset` equal to `confirmOffset` in [[checkFetchOffsetAndMaybeGetInfo]]
*/
private var confirmOffset: Long = -1L
- // AutoMQ for Kafka inject end
+ private val appendListeners = new CopyOnWriteArrayList[PartitionAppendListener]()
+ // AutoMQ inject end

def hasLateTransaction(currentTimeMs: Long): Boolean = leaderLogIfLocal.exists(_.hasLateTransaction(currentTimeMs))

@@ -432,6 +433,25 @@
listeners.remove(listener)
}

// AutoMQ inject start
def addAppendListener(listener: PartitionAppendListener): Unit = {
appendListeners.add(listener)
}

def removeAppendListener(listener: PartitionAppendListener): Unit = {
appendListeners.remove(listener)
}

def notifyAppendListener(records: MemoryRecords): Unit = {
  appendListeners.forEach { listener =>
    try {
      listener.onAppend(topicPartition, records)
    } catch {
      case e: Exception =>
        // Isolate failures so one bad listener cannot block notification of the others.
        error(s"Error while notifying append listener $listener for partition $topicPartition", e)
    }
  }
}
// AutoMQ inject end

/**
* Create the future replica if 1) the current replica is not in the given log directory and 2) the future replica
* does not exist. This method assumes that the current replica has already been created.
@@ -1499,6 +1519,10 @@ class Partition(val topicPartition: TopicPartition,
val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
interBrokerProtocolVersion, requestLocal, verificationGuard)

// AutoMQ inject start
notifyAppendListener(records)
// AutoMQ inject end

// we may need to increment high watermark since ISR could be down to 1
(info, maybeIncrementLeaderHW(leaderLog))

21 changes: 21 additions & 0 deletions core/src/main/scala/kafka/cluster/PartitionAppendListener.java
@@ -0,0 +1,21 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.cluster;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;

public interface PartitionAppendListener {

void onAppend(TopicPartition topicPartition, MemoryRecords records);

}
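As a usage sketch, a hypothetical listener that counts appended bytes per partition; the class name and metric shape are assumptions, but the callback matches the interface above, and an instance would be registered through Partition#addAppendListener from the change in Partition.scala:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;

public class AppendBytesCounter implements PartitionAppendListener {

    private final Map<TopicPartition, LongAdder> bytesAppended = new ConcurrentHashMap<>();

    @Override
    public void onAppend(TopicPartition topicPartition, MemoryRecords records) {
        // Runs on the leader append path, so keep the work cheap and non-blocking.
        bytesAppended.computeIfAbsent(topicPartition, tp -> new LongAdder())
            .add(records.sizeInBytes());
    }

    public long bytesFor(TopicPartition tp) {
        LongAdder adder = bytesAppended.get(tp);
        return adder == null ? 0L : adder.sum();
    }
}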
@@ -32,7 +32,7 @@ import java.nio.ByteBuffer
import java.nio.file.Path
import java.util
import java.util.concurrent.atomic.LongAdder
- import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Executors}
+ import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, CopyOnWriteArrayList, Executors}
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.util.{Failure, Success, Try}

@@ -57,6 +57,7 @@ class ElasticUnifiedLog(_logStartOffset: Long,
// fuzzy interval bytes for checkpoint, it's ok not thread safe
var checkpointIntervalBytes = 0
var lastCheckpointTimestamp = time.milliseconds()
val configChangeListeners = new CopyOnWriteArrayList[LogConfigChangeListener]()

def getLocalLog(): ElasticLog = elasticLog

@@ -229,10 +230,28 @@ class ElasticUnifiedLog(_logStartOffset: Long,
// noop implementation, producer snapshot and recover point will be appended to MetaStream, so they have order relation.
}

override def updateConfig(newConfig: LogConfig): LogConfig = {
val config = super.updateConfig(newConfig)
for (listener <- configChangeListeners.asScala) {
try {
listener.onLogConfigChange(this, newConfig)
} catch {
case e: Throwable =>
error(s"Error while invoking config change listener $listener", e)
}
}
config
}

// only used for test
def listProducerSnapshots(): util.NavigableMap[java.lang.Long, ByteBuffer] = {
producerStateManager.asInstanceOf[ElasticProducerStateManager].snapshotsMap
}

def addConfigChangeListener(listener: LogConfigChangeListener): Unit = {
configChangeListeners.add(listener)
}
}

object ElasticUnifiedLog extends Logging {
@@ -0,0 +1,20 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.log.streamaspect;

import org.apache.kafka.storage.internals.log.LogConfig;

public interface LogConfigChangeListener {

void onLogConfigChange(ElasticUnifiedLog log, LogConfig logConfig);

}
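A sketch of a listener reacting to per-log config changes, e.g. to start or stop table-topic syncing when automq.table.topic.enable flips; reading the key through AbstractConfig#originals() is an assumption about how LogConfig exposes it, and the class name is hypothetical. It would be registered via ElasticUnifiedLog#addConfigChangeListener:

import org.apache.kafka.storage.internals.log.LogConfig;

public class TableTopicConfigWatcher implements LogConfigChangeListener {

    @Override
    public void onLogConfigChange(ElasticUnifiedLog log, LogConfig logConfig) {
        // LogConfig extends AbstractConfig, so the raw key should appear in originals().
        Object enabled = logConfig.originals().get("automq.table.topic.enable");
        if (Boolean.parseBoolean(String.valueOf(enabled))) {
            // Assumed reaction: (re)start the table sync for this log.
        } else {
            // Assumed reaction: stop the table sync for this log.
        }
    }
}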
@@ -31,6 +31,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -244,6 +245,10 @@ public Map<String, Object> replay() throws IOException {
return getValidMetaMap();
}

public Optional<ByteBuffer> get(String key) {
    return Optional.ofNullable(metaCache.get(key)).map(o -> o.value().slice());
}

private Map<String, Object> getValidMetaMap() {
Map<String, Object> metaMap = new HashMap<>();
metaCache.forEach((key, value) -> {
@@ -261,7 +266,7 @@ private Map<String, Object> getValidMetaMap() {
metaMap.put(key, ElasticLeaderEpochCheckpointMeta.decode(value.value()));
break;
default:
- LOGGER.error("{} streamId {}: unknown meta key: {}", logIdent, streamId(), key);
+ metaMap.put(key, value.value().duplicate());
}
});
return metaMap;
@@ -24,9 +24,10 @@ import kafka.controller.KafkaController
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.Logging
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.InvalidTopicException
import org.apache.kafka.common.internals.Topic
- import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
+ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TABLE_TOPIC_CONTROL_TOPIC_NAME, TABLE_TOPIC_DATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicConfig, CreatableTopicConfigCollection}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
@@ -244,6 +245,28 @@ class DefaultAutoTopicCreationManager(
.setReplicationFactor(config.transactionTopicReplicationFactor)
.setConfigs(convertToTopicConfigCollections(
txnCoordinator.transactionTopicConfigs))

// AutoMQ inject start
case TABLE_TOPIC_CONTROL_TOPIC_NAME => {
val configs = new Properties()
configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024)
new CreatableTopic()
.setName(topic)
.setNumPartitions(1)
.setReplicationFactor(1)
.setConfigs(convertToTopicConfigCollections(configs))
}
case TABLE_TOPIC_DATA_TOPIC_NAME => {
val configs = new Properties()
configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024)
new CreatableTopic()
.setName(topic)
.setNumPartitions(50)
.setReplicationFactor(1)
.setConfigs(convertToTopicConfigCollections(configs))
}
// AutoMQ inject end

case topicName =>
new CreatableTopic()
.setName(topicName)
12 changes: 10 additions & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -28,7 +28,7 @@ import kafka.log.streamaspect.ElasticLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
- import kafka.server.streamaspect.{ElasticKafkaApis, ElasticReplicaManager}
+ import kafka.server.streamaspect.{ElasticKafkaApis, ElasticReplicaManager, PartitionLifecycleListener}
import kafka.utils.CoreUtils
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -39,7 +39,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
- import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorService, CoordinatorRecordSerde}
+ import org.apache.kafka.coordinator.group.{CoordinatorRecord, CoordinatorRecordSerde, GroupCoordinator, GroupCoordinatorService}
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.image.loader.MetadataLoader
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
@@ -550,6 +550,10 @@ class BrokerServer(
// AutoMQ inject start
ElasticLogManager.init(config, clusterId, this)
produceRouter = newProduceRouter()

newPartitionLifecycleListeners().forEach(l => {
_replicaManager.addPartitionLifecycleListener(l)
})
// AutoMQ inject end

// We're now ready to unfence the broker. This also allows this broker to transition
@@ -820,6 +824,10 @@
}
}
}

protected def newPartitionLifecycleListeners(): util.List[PartitionLifecycleListener] = {
new util.ArrayList[PartitionLifecycleListener]()
}
// AutoMQ inject end

}
10 changes: 9 additions & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
@@ -70,7 +70,7 @@ import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.{Group, GroupCoordinator}
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer._
- import org.apache.kafka.server.common.{MetadataVersion}
+ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData}
@@ -1306,6 +1306,14 @@ class KafkaApis(val requestChannel: RequestChannel,
val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request)
autoTopicCreationManager.createTopics(nonExistingTopics, controllerMutationQuota, Some(request.context))
} else {
// AutoMQ inject start
for (tableTopic <- Set(Topic.TABLE_TOPIC_CONTROL_TOPIC_NAME, Topic.TABLE_TOPIC_DATA_TOPIC_NAME)) {
if (nonExistingTopics.contains(tableTopic)) {
val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request)
autoTopicCreationManager.createTopics(Set(tableTopic), controllerMutationQuota, Some(request.context))
}
}
// AutoMQ inject end
nonExistingTopics.map { topic =>
val error = try {
Topic.validate(topic)
8 changes: 8 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -795,6 +795,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val s3ExporterReportIntervalMs = getInt(AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_CONFIG)
val s3BackPressureEnabled = getBoolean(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)
val s3BackPressureCooldownMs = getLong(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)
val tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG)
// AutoMQ inject end

/** Internal Configurations **/
@@ -1247,6 +1248,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
logProps.put(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, logMessageDownConversionEnable: java.lang.Boolean)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, remoteLogManagerConfig.logLocalRetentionMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long)

// AutoMQ inject start
if (tableTopicNamespace != null) {
logProps.put(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, tableTopicNamespace)
}
// AutoMQ inject end

logProps
}
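With this wiring, a cluster-wide namespace default can be set once in the broker properties and is copied into every topic's default log config; the value below is only an example:

# server.properties (example value)
automq.table.topic.namespace=default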

@@ -180,6 +180,8 @@ class ElasticReplicaManager(

private var fenced: Boolean = false

private val partitionLifecycleListeners = new util.ArrayList[PartitionLifecycleListener]()

override def startup(): Unit = {
super.startup()
val haltBrokerOnFailure = metadataCache.metadataVersion().isLessThan(MetadataVersion.IBP_1_0_IV0)
@@ -211,6 +213,7 @@
getPartition(topicPartition) match {
case hostedPartition: HostedPartition.Online =>
if (allPartitions.remove(topicPartition, hostedPartition)) {
notifyPartitionClose(hostedPartition.partition)
brokerTopicStats.removeMetrics(topicPartition)
maybeRemoveTopicMetrics(topicPartition.topic)
// AutoMQ for Kafka inject start
@@ -1201,6 +1204,7 @@ class ElasticReplicaManager(
val state = info.partition.toLeaderAndIsrPartitionState(tp, true)
val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
partition.makeLeader(state, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)
notifyPartitionOpen(partition)
}).foreach { case (partition, _) =>
try {
changedPartitions.add(partition)
@@ -1400,4 +1404,17 @@
}
}
}

def addPartitionLifecycleListener(listener: PartitionLifecycleListener): Unit = {
partitionLifecycleListeners.add(listener)
}

private def notifyPartitionOpen(partition: Partition): Unit = {
partitionLifecycleListeners.forEach(listener => CoreUtils.swallow(listener.onOpen(partition), this))
}

private def notifyPartitionClose(partition: Partition): Unit = {
partitionLifecycleListeners.forEach(listener => CoreUtils.swallow(listener.onClose(partition), this))
}

}
@@ -0,0 +1,22 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.server.streamaspect;

import kafka.cluster.Partition;

public interface PartitionLifecycleListener {

void onOpen(Partition partition);

void onClose(Partition partition);

}
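For illustration, a hypothetical listener that logs partition open and close events; a real implementation could start and stop per-partition table-topic workers here. A BrokerServer subclass could return it from newPartitionLifecycleListeners() so it is registered during startup (see the BrokerServer change above):

import kafka.cluster.Partition;

public class LoggingPartitionLifecycleListener implements PartitionLifecycleListener {

    @Override
    public void onOpen(Partition partition) {
        // Invoked after the partition is made leader (see ElasticReplicaManager).
        System.out.println("partition opened: " + partition.topicPartition());
    }

    @Override
    public void onClose(Partition partition) {
        // Invoked when the partition is removed from the replica manager.
        System.out.println("partition closed: " + partition.topicPartition());
    }
}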