From 91a7845c06bc352a3503b3e7ea9ac878dc06bc2e Mon Sep 17 00:00:00 2001 From: Robin Han Date: Wed, 25 Dec 2024 11:46:14 +0800 Subject: [PATCH] feat(table): support partition & upsert config Signed-off-by: Robin Han --- .../kafka/common/config/TopicConfig.java | 31 ++++++++++++------- .../storage/internals/log/LogConfig.java | 13 ++++++++ 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index 440f2cbced..09d8d0d739 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -81,17 +81,17 @@ public class TopicConfig { public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = "remote.storage.enable"; public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration as true. " + - "You can not disable this config once it is enabled. It will be provided in future versions."; + "You can not disable this config once it is enabled. It will be provided in future versions."; public static final String LOCAL_LOG_RETENTION_MS_CONFIG = "local.retention.ms"; public static final String LOCAL_LOG_RETENTION_MS_DOC = "The number of milliseconds to keep the local log segment before it gets deleted. " + - "Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " + - "to `retention.ms` value."; + "Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " + + "to `retention.ms` value."; public static final String LOCAL_LOG_RETENTION_BYTES_CONFIG = "local.retention.bytes"; public static final String LOCAL_LOG_RETENTION_BYTES_DOC = "The maximum size of local log segments that can grow for a partition before it " + - "deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " + - "less than or equal to `retention.bytes` value."; + "deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " + + "less than or equal to `retention.bytes` value."; public static final String REMOTE_LOG_DISABLE_POLICY_RETAIN = "retain"; public static final String REMOTE_LOG_DISABLE_POLICY_DELETE = "delete"; @@ -103,16 +103,16 @@ public class TopicConfig { "selected then all data in remote will be kept post-disablement and will only be deleted when it breaches expiration " + "thresholds. If %s is selected then the data will be made inaccessible immediately by advancing the log start offset and will be " + "deleted asynchronously.", REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE, - REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE); + REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE); public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes"; public static final String MAX_MESSAGE_BYTES_DOC = "The largest record batch size allowed by Kafka (after compression if compression is enabled). " + - "If this is increased and there are consumers older than 0.10.2, the consumers' fetch " + - "size must also be increased so that they can fetch record batches this large. " + - "In the latest message format version, records are always grouped into batches for efficiency. " + - "In previous message format versions, uncompressed records are not grouped into batches and this " + - "limit only applies to a single record in that case."; + "If this is increased and there are consumers older than 0.10.2, the consumers' fetch " + + "size must also be increased so that they can fetch record batches this large. " + + "In the latest message format version, records are always grouped into batches for efficiency. " + + "In previous message format versions, uncompressed records are not grouped into batches and this " + + "limit only applies to a single record in that case."; public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes"; public static final String INDEX_INTERVAL_BYTES_DOC = "This setting controls how frequently " + @@ -266,6 +266,15 @@ public class TopicConfig { public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace"; public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type"; public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema"; + public static final String TABLE_TOPIC_ID_COLUMNS_CONFIG = "automq.table.topic.id.columns"; + public static final String TABLE_TOPIC_ID_COLUMNS_DOC = "The primary key, comma-separated list of columns that identify a row in tables." + + "ex. [region, name]"; + public static final String TABLE_TOPIC_PARTITION_BY_CONFIG = "automq.table.topic.partition.by"; + public static final String TABLE_TOPIC_PARTITION_BY_DOC = "The partition fields of the table. ex. [bucket(name), month(timestamp)]"; + public static final String TABLE_TOPIC_UPSERT_ENABLE_CONFIG = "automq.table.topic.upsert.enable"; + public static final String TABLE_TOPIC_UPSERT_ENABLE_DOC = "The configuration controls whether enable table topic upsert"; + public static final String TABLE_TOPIC_CDC_FIELD_CONFIG = "automq.table.topic.cdc.field"; + public static final String TABLE_TOPIC_CDC_FIELD_DOC = "The name of the field containing the CDC operation, I, U, or D"; // AutoMQ inject end } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index df4a4b39ac..472644f10e 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -338,6 +338,11 @@ public Optional serverConfigName(String configName) { .define(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, LONG, TimeUnit.MINUTES.toMillis(5), between(1, TimeUnit.MINUTES.toMillis(15)), MEDIUM, TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_DOC) .define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC) .define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.SCHEMALESS.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC) + // TODO: add validator + .define(TopicConfig.TABLE_TOPIC_ID_COLUMNS_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_ID_COLUMNS_DOC) + .define(TopicConfig.TABLE_TOPIC_PARTITION_BY_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_PARTITION_BY_DOC) + .define(TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_DOC) + .define(TopicConfig.TABLE_TOPIC_CDC_FIELD_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_CDC_FIELD_DOC) // AutoMQ inject end .define(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, STRING, TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, in(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE), @@ -393,6 +398,10 @@ public Optional serverConfigName(String configName) { public final long tableTopicCommitInterval; public final String tableTopicNamespace; public final TableTopicSchemaType tableTopicSchemaType; + public final String tableTopicIdColumns; + public final String tableTopicPartitionBy; + public final boolean tableTopicUpsertEnable; + public final String tableTopicCdcField; // AutoMQ inject end private final int maxMessageSize; @@ -450,6 +459,10 @@ public LogConfig(Map props, Set overriddenConfigs) { this.tableTopicCommitInterval = getLong(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG); this.tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG); this.tableTopicSchemaType = TableTopicSchemaType.forName(getString(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG)); + this.tableTopicIdColumns = getString(TopicConfig.TABLE_TOPIC_ID_COLUMNS_CONFIG); + this.tableTopicPartitionBy = getString(TopicConfig.TABLE_TOPIC_PARTITION_BY_CONFIG); + this.tableTopicUpsertEnable = getBoolean(TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_CONFIG); + this.tableTopicCdcField = getString(TopicConfig.TABLE_TOPIC_CDC_FIELD_CONFIG); // AutoMQ inject end remoteLogConfig = new RemoteLogConfig(this);