Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(table): support partition & upsert config #2247

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,17 @@ public class TopicConfig {

public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = "remote.storage.enable";
public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration as true. " +
"You can not disable this config once it is enabled. It will be provided in future versions.";
"You can not disable this config once it is enabled. It will be provided in future versions.";

public static final String LOCAL_LOG_RETENTION_MS_CONFIG = "local.retention.ms";
public static final String LOCAL_LOG_RETENTION_MS_DOC = "The number of milliseconds to keep the local log segment before it gets deleted. " +
"Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " +
"to `retention.ms` value.";
"Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " +
"to `retention.ms` value.";

public static final String LOCAL_LOG_RETENTION_BYTES_CONFIG = "local.retention.bytes";
public static final String LOCAL_LOG_RETENTION_BYTES_DOC = "The maximum size of local log segments that can grow for a partition before it " +
"deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " +
"less than or equal to `retention.bytes` value.";
"deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " +
"less than or equal to `retention.bytes` value.";

public static final String REMOTE_LOG_DISABLE_POLICY_RETAIN = "retain";
public static final String REMOTE_LOG_DISABLE_POLICY_DELETE = "delete";
Expand All @@ -103,16 +103,16 @@ public class TopicConfig {
"selected then all data in remote will be kept post-disablement and will only be deleted when it breaches expiration " +
"thresholds. If %s is selected then the data will be made inaccessible immediately by advancing the log start offset and will be " +
"deleted asynchronously.", REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE,
REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE);
REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE);

public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
public static final String MAX_MESSAGE_BYTES_DOC =
"The largest record batch size allowed by Kafka (after compression if compression is enabled). " +
"If this is increased and there are consumers older than 0.10.2, the consumers' fetch " +
"size must also be increased so that they can fetch record batches this large. " +
"In the latest message format version, records are always grouped into batches for efficiency. " +
"In previous message format versions, uncompressed records are not grouped into batches and this " +
"limit only applies to a single record in that case.";
"If this is increased and there are consumers older than 0.10.2, the consumers' fetch " +
"size must also be increased so that they can fetch record batches this large. " +
"In the latest message format version, records are always grouped into batches for efficiency. " +
"In previous message format versions, uncompressed records are not grouped into batches and this " +
"limit only applies to a single record in that case.";

public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes";
public static final String INDEX_INTERVAL_BYTES_DOC = "This setting controls how frequently " +
Expand Down Expand Up @@ -266,6 +266,15 @@ public class TopicConfig {
public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
public static final String TABLE_TOPIC_ID_COLUMNS_CONFIG = "automq.table.topic.id.columns";
public static final String TABLE_TOPIC_ID_COLUMNS_DOC = "The primary key, comma-separated list of columns that identify a row in tables."
+ "ex. [region, name]";
public static final String TABLE_TOPIC_PARTITION_BY_CONFIG = "automq.table.topic.partition.by";
public static final String TABLE_TOPIC_PARTITION_BY_DOC = "The partition fields of the table. ex. [bucket(name), month(timestamp)]";
public static final String TABLE_TOPIC_UPSERT_ENABLE_CONFIG = "automq.table.topic.upsert.enable";
public static final String TABLE_TOPIC_UPSERT_ENABLE_DOC = "The configuration controls whether enable table topic upsert";
public static final String TABLE_TOPIC_CDC_FIELD_CONFIG = "automq.table.topic.cdc.field";
public static final String TABLE_TOPIC_CDC_FIELD_DOC = "The name of the field containing the CDC operation, I, U, or D";
// AutoMQ inject end

}
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,11 @@ public Optional<String> serverConfigName(String configName) {
.define(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, LONG, TimeUnit.MINUTES.toMillis(5), between(1, TimeUnit.MINUTES.toMillis(15)), MEDIUM, TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_DOC)
.define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC)
.define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.SCHEMALESS.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC)
// TODO: add validator
.define(TopicConfig.TABLE_TOPIC_ID_COLUMNS_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_ID_COLUMNS_DOC)
.define(TopicConfig.TABLE_TOPIC_PARTITION_BY_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_PARTITION_BY_DOC)
.define(TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_DOC)
.define(TopicConfig.TABLE_TOPIC_CDC_FIELD_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_CDC_FIELD_DOC)
// AutoMQ inject end
.define(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, STRING, TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN,
in(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE),
Expand Down Expand Up @@ -393,6 +398,10 @@ public Optional<String> serverConfigName(String configName) {
public final long tableTopicCommitInterval;
public final String tableTopicNamespace;
public final TableTopicSchemaType tableTopicSchemaType;
public final String tableTopicIdColumns;
public final String tableTopicPartitionBy;
public final boolean tableTopicUpsertEnable;
public final String tableTopicCdcField;
// AutoMQ inject end

private final int maxMessageSize;
Expand Down Expand Up @@ -450,6 +459,10 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
this.tableTopicCommitInterval = getLong(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG);
this.tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG);
this.tableTopicSchemaType = TableTopicSchemaType.forName(getString(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG));
this.tableTopicIdColumns = getString(TopicConfig.TABLE_TOPIC_ID_COLUMNS_CONFIG);
this.tableTopicPartitionBy = getString(TopicConfig.TABLE_TOPIC_PARTITION_BY_CONFIG);
this.tableTopicUpsertEnable = getBoolean(TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_CONFIG);
this.tableTopicCdcField = getString(TopicConfig.TABLE_TOPIC_CDC_FIELD_CONFIG);
// AutoMQ inject end

remoteLogConfig = new RemoteLogConfig(this);
Expand Down
Loading