Skip to content

Commit

Permalink
feat(config): add table topic conversion type configuration (#2203)
Browse files Browse the repository at this point in the history
* feat(config): add table topic conversion type configurations

* feat(config): rename table topic type to schema type and update related configurations

* feat(config): add table topic schema registry URL configuration and validation

* test(config): add unit tests for ControllerConfigurationValidator table topic schema configuration

* fix(tests): update exception type in ControllerConfigurationValidatorTableTest for schema validation

* feat(config): polish code
  • Loading branch information
Gezi-lzq authored Dec 3, 2024
1 parent 89ccf10 commit af4dd1f
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ public class TopicConfig {
public static final String TABLE_TOPIC_COMMIT_INTERVAL_DOC = "The table topic commit interval(ms)";
public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
// AutoMQ inject end

}
7 changes: 6 additions & 1 deletion core/src/main/java/kafka/automq/AutoMQConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ public class AutoMQConfig {

public static final String S3_TELEMETRY_OPS_ENABLED_CONFIG = "s3.telemetry.ops.enabled";
public static final String S3_TELEMETRY_OPS_ENABLED_DOC = "[DEPRECATED] use s3.telemetry.metrics.uri instead.";

public static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG = "automq.table.topic.schema.registry.url";
private static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_DOC = "The schema registry url for table topic";

// Deprecated config end

public static void define(ConfigDef configDef) {
Expand Down Expand Up @@ -282,7 +286,8 @@ public static void define(ConfigDef configDef) {
.define(AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_PROTOCOL_CONFIG, STRING, S3_EXPORTER_OTLPPROTOCOL, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_PROTOCOL_DOC)
.define(AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_COMPRESSION_ENABLE_CONFIG, BOOLEAN, false, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_COMPRESSION_ENABLE_DOC)
.define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_HOST_CONFIG, STRING, "localhost", MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_HOST_DOC)
.define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_CONFIG, INT, 9090, MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_DOC);
.define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_CONFIG, INT, 9090, MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_DOC)
.define(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, STRING, null, MEDIUM, AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_DOC);
}

private List<BucketURI> dataBuckets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@

package kafka.server

import java.util
import java.util.Properties
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, TOPIC}
import org.apache.kafka.controller.ConfigurationValidator
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.controller.ConfigurationValidator
import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.apache.kafka.storage.internals.log.LogConfig

import java.util
import java.util.Properties
import scala.collection.mutable

/**
Expand Down Expand Up @@ -109,6 +109,11 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
}
LogConfig.validate(properties, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())

// AutoMQ inject start
LogConfig.validateTableTopicSchemaConfigValues(properties, kafkaConfig.tableTopicSchemaRegistryUrl)
// AutoMQ inject end

case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val properties = new Properties()
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG)
val s3BackPressureEnabled = getBoolean(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)
val s3BackPressureCooldownMs = getLong(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)
val tableTopicSchemaRegistryUrl = getString(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG);
// AutoMQ inject end

/** Internal Configurations **/
Expand Down Expand Up @@ -1239,6 +1240,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
if (tableTopicNamespace != null) {
logProps.put(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, tableTopicNamespace)
}
if (tableTopicSchemaRegistryUrl != null) {
logProps.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, tableTopicSchemaRegistryUrl)
}
// AutoMQ inject end

logProps
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

import kafka.automq.AutoMQConfig
import kafka.server.{ControllerConfigurationValidator, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
import org.apache.kafka.common.config.TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.server.record.TableTopicSchemaType
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.{Tag, Test}

import java.util

@Tag("S3Unit")
class ControllerConfigurationValidatorTableTest {
val config = new KafkaConfig(TestUtils.createDummyBrokerConfig())
val validator = new ControllerConfigurationValidator(config)

@Test
def testInvalidTableTopicSchemaConfig(): Unit = {
val config = new util.TreeMap[String, String]()
config.put(TABLE_TOPIC_SCHEMA_TYPE_CONFIG, TableTopicSchemaType.SCHEMA.name)

// Test without schema registry URL configured
val exception = assertThrows(classOf[InvalidConfigurationException], () => {
validator.validate(new ConfigResource(TOPIC, "foo"), config)
})
assertEquals("Table topic schema type is set to SCHEMA but schema registry URL is not configured", exception.getMessage)

// Test with schema registry URL configured
val brokerConfigWithSchemaRegistry = TestUtils.createDummyBrokerConfig()
brokerConfigWithSchemaRegistry.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")

val kafkaConfigWithSchemaRegistry = new KafkaConfig(brokerConfigWithSchemaRegistry)
val validatorWithSchemaRegistry = new ControllerConfigurationValidator(kafkaConfigWithSchemaRegistry)

// No exception should be thrown when schema registry URL is configured properly
validatorWithSchemaRegistry.validate(new ConfigResource(TOPIC, "foo"), config)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package kafka.server

import kafka.automq.AutoMQConfig
import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, CLIENT_METRICS, TOPIC}
import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG, TABLE_TOPIC_SCHEMA_TYPE_CONFIG}
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, InvalidTopicException}
import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.apache.kafka.server.record.TableTopicSchemaType
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test

Expand Down Expand Up @@ -153,4 +155,26 @@ class ControllerConfigurationValidatorTest {
assertThrows(classOf[InvalidConfigurationException], () => validator.validate(
new ConfigResource(CLIENT_METRICS, "subscription-1"), config)). getMessage)
}

@Test
def testInvalidTableTopicSchemaConfig(): Unit = {
val config = new util.TreeMap[String, String]()
config.put(TABLE_TOPIC_SCHEMA_TYPE_CONFIG, TableTopicSchemaType.SCHEMA.name)

// Test without schema registry URL configured
val exception = assertThrows(classOf[InvalidRequestException], () => {
validator.validate(new ConfigResource(TOPIC, "foo"), config)
})
assertEquals("Table topic schema type is set to SCHEMA but schema registry URL is not configured", exception.getMessage)

// Test with schema registry URL configured
val brokerConfigWithSchemaRegistry = TestUtils.createDummyBrokerConfig()
brokerConfigWithSchemaRegistry.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")

val kafkaConfigWithSchemaRegistry = new KafkaConfig(brokerConfigWithSchemaRegistry)
val validatorWithSchemaRegistry = new ControllerConfigurationValidator(kafkaConfigWithSchemaRegistry)

// No exception should be thrown when schema registry URL is configured properly
validatorWithSchemaRegistry.validate(new ConfigResource(TOPIC, "foo"), config)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public final class ServerTopicConfigSynonyms {
sameName(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG),
sameName(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG),
sameName(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG),
sameName(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG),
// AutoMQ inject end

sameNameWithLogPrefix(TopicConfig.PREALLOCATE_CONFIG),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package org.apache.kafka.server.record;

import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;

public enum TableTopicSchemaType {
SCHEMALESS("schemaless"),
SCHEMA("schema");

public final String name;
private static final List<TableTopicSchemaType> VALUES = asList(values());

TableTopicSchemaType(String name) {
this.name = name;
}

public static List<String> names() {
return VALUES.stream().map(v -> v.name).collect(Collectors.toList());
}

public static TableTopicSchemaType forName(String n) {
String name = n.toLowerCase(Locale.ROOT);
return VALUES.stream().filter(v -> v.name.equals(name)).findFirst().orElseThrow(() ->
new IllegalArgumentException("Unknown table topic type name: " + name)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.record.TableTopicSchemaType;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -336,6 +337,7 @@ public Optional<String> serverConfigName(String configName) {
.define(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_ENABLE_DOC)
.define(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, LONG, TimeUnit.MINUTES.toMillis(5), between(1, TimeUnit.MINUTES.toMillis(15)), MEDIUM, TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_DOC)
.define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC)
.define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.SCHEMALESS.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC)
// AutoMQ inject end
.define(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, STRING, TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN,
in(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE),
Expand Down Expand Up @@ -390,6 +392,7 @@ public Optional<String> serverConfigName(String configName) {
public final boolean tableTopicEnable;
public final long tableTopicCommitInterval;
public final String tableTopicNamespace;
public final TableTopicSchemaType tableTopicSchemaType;
// AutoMQ inject end

private final int maxMessageSize;
Expand Down Expand Up @@ -446,6 +449,7 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
this.tableTopicEnable = getBoolean(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG);
this.tableTopicCommitInterval = getLong(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG);
this.tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG);
this.tableTopicSchemaType = TableTopicSchemaType.forName(getString(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG));
// AutoMQ inject end

remoteLogConfig = new RemoteLogConfig(this);
Expand Down Expand Up @@ -723,6 +727,16 @@ public static void validate(Properties props,
}
}

// AutoMQ inject start
public static void validateTableTopicSchemaConfigValues(Properties props, String tableTopicSchemaRegistryUrl) {
String schemaType = props.getProperty(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG);
if (TableTopicSchemaType.SCHEMA.name.equals(schemaType) && tableTopicSchemaRegistryUrl == null) {
throw new InvalidConfigurationException("Table topic schema type is set to SCHEMA but schema registry URL is not configured");
}
}

// AutoMQ inject end

@Override
public String toString() {
return "LogConfig{" +
Expand Down

0 comments on commit af4dd1f

Please sign in to comment.