feat(table): table topic aspect #2167

Merged: 3 commits, Nov 20, 2024
@@ -256,4 +256,14 @@ public class TopicConfig {
"broker will not perform down-conversion for consumers expecting an older message format. The broker responds " +
"with <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configuration " +
"does not apply to any message format conversion that might be required for replication to followers.";

// AutoMQ inject start
public static final String TABLE_TOPIC_ENABLE_CONFIG = "automq.table.topic.enable";
public static final String TABLE_TOPIC_ENABLE_DOC = "The configuration controls whether to enable the table topic";
public static final String TABLE_TOPIC_COMMIT_INTERVAL_CONFIG = "automq.table.topic.commit.interval.ms";
public static final String TABLE_TOPIC_COMMIT_INTERVAL_DOC = "The table topic commit interval (ms)";
public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic namespace";
// AutoMQ inject end

}
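For context, here is a sketch of how the new per-topic keys could be supplied at creation time through the standard Admin client. Only the config key names come from the constants above; the topic name, partition count, and values are illustrative, not taken from this PR.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTableTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Keys match the TopicConfig constants added in this PR; values are illustrative.
            Map<String, String> configs = new HashMap<>();
            configs.put("automq.table.topic.enable", "true");
            configs.put("automq.table.topic.commit.interval.ms", "60000");
            configs.put("automq.table.topic.namespace", "analytics");
            NewTopic topic = new NewTopic("orders", 16, (short) 1).configs(configs);
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}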
28 changes: 26 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -364,7 +364,7 @@ class Partition(val topicPartition: TopicPartition,
metricsGroup.newGauge("ReplicasCount", () => if (isLeader) assignmentState.replicationFactor else 0, tags)
metricsGroup.newGauge("LastStableOffsetLag", () => log.map(_.lastStableOffsetLag).getOrElse(0), tags)

// AutoMQ for Kafka inject start
// AutoMQ inject start
private val enableTraceLog = isTraceEnabled
private var closed: Boolean = false
/**
@@ -373,7 +373,8 @@ class Partition(val topicPartition: TopicPartition,
* Used to return fast when fetching messages with `fetchOffset` equals to `confirmOffset` in [[checkFetchOffsetAndMaybeGetInfo]]
*/
private var confirmOffset: Long = -1L
// AutoMQ for Kafka inject end
private val appendListeners = new CopyOnWriteArrayList[PartitionAppendListener]()
// AutoMQ inject end

def hasLateTransaction(currentTimeMs: Long): Boolean = leaderLogIfLocal.exists(_.hasLateTransaction(currentTimeMs))

@@ -432,6 +433,25 @@ class Partition(val topicPartition: TopicPartition,
listeners.remove(listener)
}

// AutoMQ inject start
def addAppendListener(listener: PartitionAppendListener): Unit = {
appendListeners.add(listener)
}

def removeAppendListener(listener: PartitionAppendListener): Unit = {
appendListeners.remove(listener)
}

def notifyAppendListener(records: MemoryRecords): Unit = {
try {
appendListeners.forEach(_.onAppend(topicPartition, records))
} catch {
case e: Exception =>
error(s"Error while notifying append listeners for partition $topicPartition", e)
}
}
// AutoMQ inject end

/**
* Create the future replica if 1) the current replica is not in the given log directory and 2) the future replica
* does not exist. This method assumes that the current replica has already been created.
@@ -1499,6 +1519,10 @@ class Partition(val topicPartition: TopicPartition,
val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
interBrokerProtocolVersion, requestLocal, verificationGuard)

// AutoMQ inject start
notifyAppendListener(records)
// AutoMQ inject end

// we may need to increment high watermark since ISR could be down to 1
(info, maybeIncrementLeaderHW(leaderLog))

21 changes: 21 additions & 0 deletions core/src/main/scala/kafka/cluster/PartitionAppendListener.java
@@ -0,0 +1,21 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.cluster;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;

public interface PartitionAppendListener {

void onAppend(TopicPartition topicPartition, MemoryRecords records);

}
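To illustrate the contract, a minimal sketch of an implementation; the class name and package are hypothetical. onAppend is invoked synchronously on the leader append path (see notifyAppendListener above), so implementations should stay cheap and non-blocking.

package kafka.example; // hypothetical package

import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;

import kafka.cluster.PartitionAppendListener;

// Counts the bytes appended to leader partitions on this broker.
// Registered via Partition#addAppendListener from the hunk above.
public class AppendByteCounter implements PartitionAppendListener {

    private final AtomicLong totalBytes = new AtomicLong();

    @Override
    public void onAppend(TopicPartition topicPartition, MemoryRecords records) {
        totalBytes.addAndGet(records.sizeInBytes());
    }

    public long totalBytes() {
        return totalBytes.get();
    }
}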
@@ -11,9 +11,6 @@

package kafka.log.stream.s3;

import com.automq.stream.s3.backpressure.BackPressureManager;
import com.automq.stream.s3.backpressure.DefaultBackPressureManager;
import com.automq.stream.s3.backpressure.Regulator;
import kafka.autobalancer.metricsreporter.metric.Derivator;
import kafka.log.stream.s3.metadata.StreamMetadataManager;
import kafka.log.stream.s3.network.ControllerRequestSender;
@@ -30,6 +27,9 @@
import com.automq.stream.s3.Config;
import com.automq.stream.s3.S3Storage;
import com.automq.stream.s3.S3StreamClient;
import com.automq.stream.s3.backpressure.BackPressureManager;
import com.automq.stream.s3.backpressure.DefaultBackPressureManager;
import com.automq.stream.s3.backpressure.Regulator;
import com.automq.stream.s3.cache.S3BlockCache;
import com.automq.stream.s3.cache.blockcache.DefaultObjectReaderFactory;
import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory;
@@ -32,7 +32,7 @@ import java.nio.ByteBuffer
import java.nio.file.Path
import java.util
import java.util.concurrent.atomic.LongAdder
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, Executors}
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, CopyOnWriteArrayList, Executors}
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.util.{Failure, Success, Try}

@@ -57,6 +57,7 @@ class ElasticUnifiedLog(_logStartOffset: Long,
// fuzzy interval bytes for checkpoint, it's ok not thread safe
var checkpointIntervalBytes = 0
var lastCheckpointTimestamp = time.milliseconds()
var configChangeListeners = new CopyOnWriteArrayList[LogConfigChangeListener]()

def getLocalLog(): ElasticLog = elasticLog

@@ -229,10 +230,28 @@ class ElasticUnifiedLog(_logStartOffset: Long,
// No-op implementation: producer snapshots and the recovery point are appended to the MetaStream, so their ordering is preserved.
}

override def updateConfig(newConfig: LogConfig): LogConfig = {
val config = super.updateConfig(newConfig)
for (listener <- configChangeListeners.asScala) {
try {
listener.onLogConfigChange(this, newConfig)
} catch {
case e: Throwable =>
error(s"Error while invoking config change listener $listener", e)
}
}
config
}

// only used for tests
def listProducerSnapshots(): util.NavigableMap[java.lang.Long, ByteBuffer] = {
producerStateManager.asInstanceOf[ElasticProducerStateManager].snapshotsMap
}

def addConfigChangeListener(listener: LogConfigChangeListener): Unit = {
configChangeListeners.add(listener)
}
}

object ElasticUnifiedLog extends Logging {
@@ -0,0 +1,20 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.log.streamaspect;

import org.apache.kafka.storage.internals.log.LogConfig;

public interface LogConfigChangeListener {

void onLogConfigChange(ElasticUnifiedLog log, LogConfig logConfig);

}
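A minimal sketch of an implementation, assuming registration through ElasticUnifiedLog#addConfigChangeListener from the previous file; the class name and the reaction are hypothetical.

package kafka.example; // hypothetical package

import org.apache.kafka.storage.internals.log.LogConfig;

import kafka.log.streamaspect.ElasticUnifiedLog;
import kafka.log.streamaspect.LogConfigChangeListener;

// Reacts when the table-topic flag flips on a log's config.
public class TableTopicToggleListener implements LogConfigChangeListener {

    @Override
    public void onLogConfigChange(ElasticUnifiedLog log, LogConfig logConfig) {
        // LogConfig extends AbstractConfig, so originals() exposes the raw key/value map.
        Object enabled = logConfig.originals().get("automq.table.topic.enable");
        if (Boolean.parseBoolean(String.valueOf(enabled))) {
            // Hypothetical reaction: start (or keep) syncing this log to a table.
        }
    }
}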
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/log/streamaspect/MetaStream.java
@@ -31,6 +31,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -244,6 +245,10 @@ public Map<String, Object> replay() throws IOException {
return getValidMetaMap();
}

public Optional<ByteBuffer> get(String key) {
return Optional.ofNullable(metaCache.get(key)).map(o -> o.value.slice());
}

private Map<String, Object> getValidMetaMap() {
Map<String, Object> metaMap = new HashMap<>();
metaCache.forEach((key, value) -> {
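A hypothetical call site for the new accessor; the helper and key name are illustrative. Note that get returns a slice of the cached value, so reading from it does not disturb the cache.

import java.util.Optional;

import kafka.log.streamaspect.MetaStream;

public class MetaStreamGetExample {

    // Hypothetical helper: reads a raw metadata value without replaying the whole stream.
    static Optional<byte[]> readRaw(MetaStream metaStream, String key) {
        return metaStream.get(key).map(buf -> {
            byte[] bytes = new byte[buf.remaining()];
            buf.get(bytes); // reads a slice, so the cached buffer's position is untouched
            return bytes;
        });
    }
}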
14 changes: 14 additions & 0 deletions core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -244,6 +244,20 @@ class DefaultAutoTopicCreationManager(
.setReplicationFactor(config.transactionTopicReplicationFactor)
.setConfigs(convertToTopicConfigCollections(
txnCoordinator.transactionTopicConfigs))

// AutoMQ inject start
case "__automq_table_control" =>
new CreatableTopic()
.setName(topic)
.setNumPartitions(1)
.setReplicationFactor(1)
case "__automq_table_data" =>
new CreatableTopic()
.setName(topic)
.setNumPartitions(50)
.setReplicationFactor(1)
// AutoMQ inject end

case topicName =>
new CreatableTopic()
.setName(topicName)
12 changes: 10 additions & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -27,7 +27,7 @@ import kafka.log.streamaspect.ElasticLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
import kafka.server.streamaspect.{ElasticKafkaApis, ElasticReplicaManager}
import kafka.server.streamaspect.{ElasticKafkaApis, ElasticReplicaManager, PartitionLifecycleListener}
import kafka.utils.CoreUtils
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.feature.SupportedVersionRange
@@ -39,7 +39,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorService, CoordinatorRecordSerde}
import org.apache.kafka.coordinator.group.{CoordinatorRecord, CoordinatorRecordSerde, GroupCoordinator, GroupCoordinatorService}
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.image.loader.MetadataLoader
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
@@ -544,6 +544,10 @@ class BrokerServer(
// AutoMQ inject start
ElasticLogManager.init(config, clusterId, this)
produceRouter = newProduceRouter()

newPartitionLifecycleListeners().forEach(l => {
_replicaManager.addPartitionLifecycleListener(l)
})
// AutoMQ inject end

// We're now ready to unfence the broker. This also allows this broker to transition
@@ -794,6 +798,10 @@
dataPlaneRequestProcessor.asInstanceOf[ElasticKafkaApis].setProduceRouter(produceRouter)
produceRouter
}

protected def newPartitionLifecycleListeners(): util.List[PartitionLifecycleListener] = {
new util.ArrayList[PartitionLifecycleListener]()
}
// AutoMQ inject end

}
8 changes: 8 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -792,6 +792,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val s3RefillPeriodMsProp = getInt(AutoMQConfig.S3_NETWORK_REFILL_PERIOD_MS_CONFIG)
val s3MetricsLevel = getString(AutoMQConfig.S3_TELEMETRY_METRICS_LEVEL_CONFIG)
val s3ExporterReportIntervalMs = getInt(AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_CONFIG)
val tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG)
// AutoMQ inject end

/** Internal Configurations **/
@@ -1231,6 +1232,13 @@
logProps.put(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, logMessageDownConversionEnable: java.lang.Boolean)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, remoteLogManagerConfig.logLocalRetentionMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long)

// AutoMQ inject start
if (tableTopicNamespace != null) {
logProps.put(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, tableTopicNamespace)
}
// AutoMQ inject end

logProps
}
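When the broker-level default is set, the namespace flows into every topic's default log config via logProps above. A hypothetical server.properties fragment (the value is illustrative):

# Cluster-wide default namespace for table topics; propagated into per-topic log configs.
automq.table.topic.namespace=analytics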

@@ -165,6 +165,8 @@ class ElasticReplicaManager(

private var fenced: Boolean = false

private val partitionLifecycleListeners = new util.ArrayList[PartitionLifecycleListener]()

override def startup(): Unit = {
super.startup()
val haltBrokerOnFailure = metadataCache.metadataVersion().isLessThan(MetadataVersion.IBP_1_0_IV0)
@@ -196,6 +198,7 @@
getPartition(topicPartition) match {
case hostedPartition: HostedPartition.Online =>
if (allPartitions.remove(topicPartition, hostedPartition)) {
notifyPartitionClose(hostedPartition.partition)
brokerTopicStats.removeMetrics(topicPartition)
maybeRemoveTopicMetrics(topicPartition.topic)
// AutoMQ for Kafka inject start
@@ -1184,6 +1187,7 @@
val state = info.partition.toLeaderAndIsrPartitionState(tp, true)
val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
partition.makeLeader(state, offsetCheckpoints, Some(info.topicId), partitionAssignedDirectoryId)
notifyPartitionOpen(partition)
}).foreach { case (partition, _) =>
try {
changedPartitions.add(partition)
@@ -1383,4 +1387,17 @@
}
}
}

def addPartitionLifecycleListener(listener: PartitionLifecycleListener): Unit = {
partitionLifecycleListeners.add(listener)
}

private def notifyPartitionOpen(partition: Partition): Unit = {
partitionLifecycleListeners.forEach(listener => CoreUtils.swallow(listener.onOpen(partition), this))
}

private def notifyPartitionClose(partition: Partition): Unit = {
partitionLifecycleListeners.forEach(listener => CoreUtils.swallow(listener.onClose(partition), this))
}

}
@@ -0,0 +1,22 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.server.streamaspect;

import kafka.cluster.Partition;

public interface PartitionLifecycleListener {

void onOpen(Partition partition);

void onClose(Partition partition);

}
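For illustration, a minimal sketch of an implementation; the class is hypothetical. Listeners are registered through ElasticReplicaManager#addPartitionLifecycleListener, typically from an override of BrokerServer#newPartitionLifecycleListeners shown earlier.

package kafka.example; // hypothetical package

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;

import kafka.cluster.Partition;
import kafka.server.streamaspect.PartitionLifecycleListener;

// Tracks which partitions are currently open (online) on this broker.
public class OpenPartitionTracker implements PartitionLifecycleListener {

    private final Set<TopicPartition> open = ConcurrentHashMap.newKeySet();

    @Override
    public void onOpen(Partition partition) {
        open.add(partition.topicPartition());
    }

    @Override
    public void onClose(Partition partition) {
        open.remove(partition.topicPartition());
    }

    public boolean isOpen(TopicPartition tp) {
        return open.contains(tp);
    }
}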
@@ -13,12 +13,14 @@

import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultBackPressureManager implements BackPressureManager {

@@ -11,13 +11,15 @@

package com.automq.stream.utils;

import io.netty.util.concurrent.FastThreadLocalThread;
import org.slf4j.Logger;
import org.slf4j.helpers.NOPLogger;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.helpers.NOPLogger;

import io.netty.util.concurrent.FastThreadLocalThread;

/**
* Utilities for working with threads.
@@ -11,11 +11,12 @@

package com.automq.stream.s3.backpressure;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;