diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index 818f9200fd..836009849a 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -81,37 +81,37 @@ public class TopicConfig { public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = "remote.storage.enable"; public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration as true. " + - "You can not disable this config once it is enabled. It will be provided in future versions."; + "You can not disable this config once it is enabled. It will be provided in future versions."; public static final String LOCAL_LOG_RETENTION_MS_CONFIG = "local.retention.ms"; public static final String LOCAL_LOG_RETENTION_MS_DOC = "The number of milliseconds to keep the local log segment before it gets deleted. " + - "Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " + - "to `retention.ms` value."; + "Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " + + "to `retention.ms` value."; public static final String LOCAL_LOG_RETENTION_BYTES_CONFIG = "local.retention.bytes"; public static final String LOCAL_LOG_RETENTION_BYTES_DOC = "The maximum size of local log segments that can grow for a partition before it " + - "deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " + - "less than or equal to `retention.bytes` value."; + "deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " + + "less than or equal to `retention.bytes` value."; public static final String REMOTE_LOG_COPY_DISABLE_CONFIG = "remote.log.copy.disable"; public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines whether tiered data for a topic should become read only," + - " and no more data uploading on a topic. Once this config is set to true, the local retention configuration " + - "(i.e. local.retention.ms/bytes) becomes irrelevant, and all data expiration follows the topic-wide retention configuration" + - "(i.e. retention.ms/bytes)."; + " and no more data uploading on a topic. Once this config is set to true, the local retention configuration " + + "(i.e. local.retention.ms/bytes) becomes irrelevant, and all data expiration follows the topic-wide retention configuration" + + "(i.e. retention.ms/bytes)."; public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable"; public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " + - "deleted after tiered storage is disabled on a topic. This configuration should be enabled when trying to " + - "set `remote.storage.enable` from true to false"; + "deleted after tiered storage is disabled on a topic. This configuration should be enabled when trying to " + + "set `remote.storage.enable` from true to false"; public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes"; public static final String MAX_MESSAGE_BYTES_DOC = "The largest record batch size allowed by Kafka (after compression if compression is enabled). " + - "If this is increased and there are consumers older than 0.10.2, the consumers' fetch " + - "size must also be increased so that they can fetch record batches this large. " + - "In the latest message format version, records are always grouped into batches for efficiency. " + - "In previous message format versions, uncompressed records are not grouped into batches and this " + - "limit only applies to a single record in that case."; + "If this is increased and there are consumers older than 0.10.2, the consumers' fetch " + + "size must also be increased so that they can fetch record batches this large. " + + "In the latest message format version, records are always grouped into batches for efficiency. " + + "In previous message format versions, uncompressed records are not grouped into batches and this " + + "limit only applies to a single record in that case."; public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes"; public static final String INDEX_INTERVAL_BYTES_DOC = "This setting controls how frequently " + @@ -167,7 +167,7 @@ public class TopicConfig { "not in the ISR set to be elected as leader as a last resort, even though doing so may result in data " + "loss.
Note: In KRaft mode, when enabling this config dynamically, it needs to wait for the unclean leader election" + "thread to trigger election periodically (default is 5 minutes). Please run `kafka-leader-election.sh` with `unclean` option " + - "to trigger the unclean leader election immediately if needed.
"; + "to trigger the unclean leader election immediately if needed."; public static final String MIN_IN_SYNC_REPLICAS_CONFIG = "min.insync.replicas"; public static final String MIN_IN_SYNC_REPLICAS_DOC = "When a producer sets acks to \"all\" (or \"-1\"), " + @@ -267,6 +267,15 @@ public class TopicConfig { public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace"; public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type"; public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema"; + public static final String TABLE_TOPIC_ID_COLUMNS_CONFIG = "automq.table.topic.id.columns"; + public static final String TABLE_TOPIC_ID_COLUMNS_DOC = "The primary key, comma-separated list of columns that identify a row in tables." + + "ex. [region, name]"; + public static final String TABLE_TOPIC_PARTITION_BY_CONFIG = "automq.table.topic.partition.by"; + public static final String TABLE_TOPIC_PARTITION_BY_DOC = "The partition fields of the table. ex. [bucket(name), month(timestamp)]"; + public static final String TABLE_TOPIC_UPSERT_ENABLE_CONFIG = "automq.table.topic.upsert.enable"; + public static final String TABLE_TOPIC_UPSERT_ENABLE_DOC = "The configuration controls whether enable table topic upsert"; + public static final String TABLE_TOPIC_CDC_FIELD_CONFIG = "automq.table.topic.cdc.field"; + public static final String TABLE_TOPIC_CDC_FIELD_DOC = "The name of the field containing the CDC operation, I, U, or D"; // AutoMQ inject end } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 981925f38f..d8aa2aeb44 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -342,6 +342,11 @@ public Optional