feat(table): support partition & upsert config
Signed-off-by: Robin Han <[email protected]>
superhx committed Jan 2, 2025
1 parent 9502ac3 commit 95d606f
Showing 2 changed files with 38 additions and 16 deletions.
TopicConfig.java
@@ -81,37 +81,37 @@ public class TopicConfig {

public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = "remote.storage.enable";
public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration as true. " +
"You can not disable this config once it is enabled. It will be provided in future versions.";
"You can not disable this config once it is enabled. It will be provided in future versions.";

public static final String LOCAL_LOG_RETENTION_MS_CONFIG = "local.retention.ms";
public static final String LOCAL_LOG_RETENTION_MS_DOC = "The number of milliseconds to keep the local log segment before it gets deleted. " +
"Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " +
"to `retention.ms` value.";
"Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " +
"to `retention.ms` value.";

public static final String LOCAL_LOG_RETENTION_BYTES_CONFIG = "local.retention.bytes";
public static final String LOCAL_LOG_RETENTION_BYTES_DOC = "The maximum size of local log segments that can grow for a partition before it " +
"deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " +
"less than or equal to `retention.bytes` value.";
"deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " +
"less than or equal to `retention.bytes` value.";

public static final String REMOTE_LOG_COPY_DISABLE_CONFIG = "remote.log.copy.disable";
public static final String REMOTE_LOG_COPY_DISABLE_DOC = "Determines whether tiered data for a topic should become read only," +
" and no more data uploading on a topic. Once this config is set to true, the local retention configuration " +
"(i.e. local.retention.ms/bytes) becomes irrelevant, and all data expiration follows the topic-wide retention configuration" +
"(i.e. retention.ms/bytes).";
" and no more data uploading on a topic. Once this config is set to true, the local retention configuration " +
"(i.e. local.retention.ms/bytes) becomes irrelevant, and all data expiration follows the topic-wide retention configuration" +
"(i.e. retention.ms/bytes).";

public static final String REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = "remote.log.delete.on.disable";
public static final String REMOTE_LOG_DELETE_ON_DISABLE_DOC = "Determines whether tiered data for a topic should be " +
"deleted after tiered storage is disabled on a topic. This configuration should be enabled when trying to " +
"set `remote.storage.enable` from true to false";
"deleted after tiered storage is disabled on a topic. This configuration should be enabled when trying to " +
"set `remote.storage.enable` from true to false";

public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
public static final String MAX_MESSAGE_BYTES_DOC =
"The largest record batch size allowed by Kafka (after compression if compression is enabled). " +
"If this is increased and there are consumers older than 0.10.2, the consumers' fetch " +
"size must also be increased so that they can fetch record batches this large. " +
"In the latest message format version, records are always grouped into batches for efficiency. " +
"In previous message format versions, uncompressed records are not grouped into batches and this " +
"limit only applies to a single record in that case.";
"If this is increased and there are consumers older than 0.10.2, the consumers' fetch " +
"size must also be increased so that they can fetch record batches this large. " +
"In the latest message format version, records are always grouped into batches for efficiency. " +
"In previous message format versions, uncompressed records are not grouped into batches and this " +
"limit only applies to a single record in that case.";

public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes";
public static final String INDEX_INTERVAL_BYTES_DOC = "This setting controls how frequently " +
@@ -167,7 +167,7 @@ public class TopicConfig {
"not in the ISR set to be elected as leader as a last resort, even though doing so may result in data " +
"loss.<p>Note: In KRaft mode, when enabling this config dynamically, it needs to wait for the unclean leader election" +
"thread to trigger election periodically (default is 5 minutes). Please run `kafka-leader-election.sh` with `unclean` option " +
"to trigger the unclean leader election immediately if needed.</p>";
"to trigger the unclean leader election immediately if needed.</p>";

public static final String MIN_IN_SYNC_REPLICAS_CONFIG = "min.insync.replicas";
public static final String MIN_IN_SYNC_REPLICAS_DOC = "When a producer sets acks to \"all\" (or \"-1\"), " +
@@ -267,6 +267,15 @@ public class TopicConfig {
public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
public static final String TABLE_TOPIC_ID_COLUMNS_CONFIG = "automq.table.topic.id.columns";
public static final String TABLE_TOPIC_ID_COLUMNS_DOC = "The primary key: a comma-separated list of columns that identifies a row in the table, "
+ "e.g. [region, name]";
public static final String TABLE_TOPIC_PARTITION_BY_CONFIG = "automq.table.topic.partition.by";
public static final String TABLE_TOPIC_PARTITION_BY_DOC = "The partition fields of the table, e.g. [bucket(name), month(timestamp)]";
public static final String TABLE_TOPIC_UPSERT_ENABLE_CONFIG = "automq.table.topic.upsert.enable";
public static final String TABLE_TOPIC_UPSERT_ENABLE_DOC = "This configuration controls whether upsert is enabled for the table topic";
public static final String TABLE_TOPIC_CDC_FIELD_CONFIG = "automq.table.topic.cdc.field";
public static final String TABLE_TOPIC_CDC_FIELD_DOC = "The name of the field containing the CDC operation (I, U, or D)";
// AutoMQ inject end

}
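For orientation, here is a minimal sketch of how a topic could be created with the new table-topic settings through the standard Kafka AdminClient. The bootstrap address, topic name, partition count, and concrete config values are illustrative assumptions only: the values mirror the examples given in the config docs above, and "op" is a hypothetical CDC field name, not something defined by this commit.

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTableTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Hypothetical bootstrap address; adjust for your cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Topic-level configs introduced by this commit; values follow the doc examples.
            Map<String, String> configs = Map.of(
                "automq.table.topic.id.columns", "[region, name]",
                "automq.table.topic.partition.by", "[bucket(name), month(timestamp)]",
                "automq.table.topic.upsert.enable", "true",
                "automq.table.topic.cdc.field", "op"); // "op" is an assumed field name

            NewTopic topic = new NewTopic("orders", 3, (short) 1).configs(configs);
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}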
LogConfig.java
@@ -342,6 +342,11 @@ public Optional<String> serverConfigName(String configName) {
.define(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, LONG, TimeUnit.MINUTES.toMillis(5), between(1, TimeUnit.MINUTES.toMillis(15)), MEDIUM, TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_DOC)
.define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC)
.define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.SCHEMALESS.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC)
// TODO: add validator
.define(TopicConfig.TABLE_TOPIC_ID_COLUMNS_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_ID_COLUMNS_DOC)
.define(TopicConfig.TABLE_TOPIC_PARTITION_BY_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_PARTITION_BY_DOC)
.define(TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_DOC)
.define(TopicConfig.TABLE_TOPIC_CDC_FIELD_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_CDC_FIELD_DOC)
// AutoMQ inject end
.define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
.define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC);
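Regarding the "// TODO: add validator" note above, one possible shape for such a validator is sketched below. This is only a hedged illustration, assuming the bracketed comma-separated syntax shown in the new config docs (e.g. [region, name]); it is not part of this commit.

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

// Sketch of a validator for the list-style table topic configs
// (automq.table.topic.id.columns, automq.table.topic.partition.by).
// Accepts null (config unset) or a non-empty bracketed list such as "[a, b]".
public class BracketedListValidator implements ConfigDef.Validator {
    @Override
    public void ensureValid(String name, Object value) {
        if (value == null) {
            return; // unset means the feature is simply not used
        }
        String s = value.toString().trim();
        if (!s.startsWith("[") || !s.endsWith("]") || s.length() <= 2) {
            throw new ConfigException(name, value,
                "Expected a bracketed, comma-separated list, e.g. [region, name]");
        }
    }
}

If adopted, the null validator arguments in the .define(...) calls above could be replaced with an instance of a class like this.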
@@ -396,6 +401,10 @@ public Optional<String> serverConfigName(String configName) {
public final long tableTopicCommitInterval;
public final String tableTopicNamespace;
public final TableTopicSchemaType tableTopicSchemaType;
public final String tableTopicIdColumns;
public final String tableTopicPartitionBy;
public final boolean tableTopicUpsertEnable;
public final String tableTopicCdcField;
// AutoMQ inject end

private final int maxMessageSize;
@@ -453,6 +462,10 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
this.tableTopicCommitInterval = getLong(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG);
this.tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG);
this.tableTopicSchemaType = TableTopicSchemaType.forName(getString(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG));
this.tableTopicIdColumns = getString(TopicConfig.TABLE_TOPIC_ID_COLUMNS_CONFIG);
this.tableTopicPartitionBy = getString(TopicConfig.TABLE_TOPIC_PARTITION_BY_CONFIG);
this.tableTopicUpsertEnable = getBoolean(TopicConfig.TABLE_TOPIC_UPSERT_ENABLE_CONFIG);
this.tableTopicCdcField = getString(TopicConfig.TABLE_TOPIC_CDC_FIELD_CONFIG);
// AutoMQ inject end

remoteLogConfig = new RemoteLogConfig(this);
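Downstream code that consumes these new LogConfig fields will need to parse the stored strings. Below is a small, assumption-laden sketch of splitting tableTopicIdColumns (e.g. "[region, name]") into individual column names; the bracket and comma handling is inferred from the config docs, not taken from this commit.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: splits a value like "[region, name]" into ["region", "name"].
public final class IdColumnsParser {
    private IdColumnsParser() {
    }

    public static List<String> parse(String idColumns) {
        if (idColumns == null || idColumns.isBlank()) {
            return List.of(); // config unset: no primary key columns
        }
        String body = idColumns.trim();
        if (body.startsWith("[") && body.endsWith("]")) {
            body = body.substring(1, body.length() - 1);
        }
        return Arrays.stream(body.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toList());
    }
}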
