From af4dd1faaf3294571de8afb4c0db4d340f15213b Mon Sep 17 00:00:00 2001
From: Gezi-lzq
Date: Tue, 3 Dec 2024 18:57:56 +0800
Subject: [PATCH] feat(config): add table topic conversion type configuration
 (#2203)

* feat(config): add table topic conversion type configurations

* feat(config): rename table topic type to schema type and update related configurations

* feat(config): add table topic schema registry URL configuration and validation

* test(config): add unit tests for ControllerConfigurationValidator table topic schema configuration

* fix(tests): update exception type in ControllerConfigurationValidatorTableTest for schema validation

* feat(config): polish code
---
 .../kafka/common/config/TopicConfig.java      |  2 +
 .../main/java/kafka/automq/AutoMQConfig.java  |  7 ++-
 .../ControllerConfigurationValidator.scala    | 11 ++--
 .../main/scala/kafka/server/KafkaConfig.scala |  4 ++
 ...ollerConfigurationValidatorTableTest.scala | 51 +++++++++++++++++++
 ...ControllerConfigurationValidatorTest.scala | 26 +++++++++-
 .../config/ServerTopicConfigSynonyms.java     |  1 +
 .../server/record/TableTopicSchemaType.java   | 41 +++++++++++++++
 .../storage/internals/log/LogConfig.java      | 14 +++++
 9 files changed, 152 insertions(+), 5 deletions(-)
 create mode 100644 core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTableTest.scala
 create mode 100644 server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java

diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index 727f1f489f..440f2cbced 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -264,6 +264,8 @@ public class TopicConfig {
     public static final String TABLE_TOPIC_COMMIT_INTERVAL_DOC = "The table topic commit interval(ms)";
     public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
     public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
+    public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
+    public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type; supported values: schemaless, schema";

     // AutoMQ inject end
 }
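The two constants above only name the topic-level key and its doc string; the ConfigDef wiring and the allowed values are added later in LogConfig. As an illustration of how a client could opt a topic into the new mode once this patch is applied (a hypothetical sketch, not taken from this change set; the topic name and bootstrap address are placeholders):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class CreateTableTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            // Request the "schema" conversion mode introduced by this patch; the default stays "schemaless".
            Map<String, String> configs = Map.of(
                TopicConfig.TABLE_TOPIC_ENABLE_CONFIG, "true",
                TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, "schema");
            NewTopic topic = new NewTopic("orders", 1, (short) 1).configs(configs); // placeholder topic
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}

Whether the value "schema" is accepted also depends on the broker-side registry URL introduced in AutoMQConfig below.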
diff --git a/core/src/main/java/kafka/automq/AutoMQConfig.java b/core/src/main/java/kafka/automq/AutoMQConfig.java
index aa9a7e5dad..a9d02881ba 100644
--- a/core/src/main/java/kafka/automq/AutoMQConfig.java
+++ b/core/src/main/java/kafka/automq/AutoMQConfig.java
@@ -230,6 +230,10 @@ public class AutoMQConfig {
     public static final String S3_TELEMETRY_OPS_ENABLED_CONFIG = "s3.telemetry.ops.enabled";
     public static final String S3_TELEMETRY_OPS_ENABLED_DOC = "[DEPRECATED] use s3.telemetry.metrics.uri instead.";
+
+    public static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG = "automq.table.topic.schema.registry.url";
+    private static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_DOC = "The schema registry URL for table topic";
+
     // Deprecated config end

     public static void define(ConfigDef configDef) {
@@ -282,7 +286,8 @@ public static void define(ConfigDef configDef) {
             .define(AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_PROTOCOL_CONFIG, STRING, S3_EXPORTER_OTLPPROTOCOL, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_PROTOCOL_DOC)
             .define(AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_COMPRESSION_ENABLE_CONFIG, BOOLEAN, false, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_COMPRESSION_ENABLE_DOC)
             .define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_HOST_CONFIG, STRING, "localhost", MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_HOST_DOC)
-            .define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_CONFIG, INT, 9090, MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_DOC);
+            .define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_CONFIG, INT, 9090, MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_DOC)
+            .define(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, STRING, null, MEDIUM, AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_DOC);
     }

     private List<BucketURI> dataBuckets;
diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index b99065b573..bff152695f 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -17,16 +17,16 @@
 package kafka.server

-import java.util
-import java.util.Properties
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, TOPIC}
-import org.apache.kafka.controller.ConfigurationValidator
 import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.controller.ConfigurationValidator
 import org.apache.kafka.server.metrics.ClientMetricsConfigs
 import org.apache.kafka.storage.internals.log.LogConfig
+
+import java.util
+import java.util.Properties
 import scala.collection.mutable

 /**
@@ -109,6 +109,11 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
         }
         LogConfig.validate(properties, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+
+        // AutoMQ inject start
+        LogConfig.validateTableTopicSchemaConfigValues(properties, kafkaConfig.tableTopicSchemaRegistryUrl)
+        // AutoMQ inject end
+
       case BROKER => validateBrokerName(resource.name())
       case CLIENT_METRICS =>
         val properties = new Properties()
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c8d60190a6..1860a597a2 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -795,6 +795,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   val tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG)
   val s3BackPressureEnabled = getBoolean(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)
   val s3BackPressureCooldownMs = getLong(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)
+  val tableTopicSchemaRegistryUrl = getString(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG)
   // AutoMQ inject end

   /** Internal Configurations **/
@@ -1239,6 +1240,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
     if (tableTopicNamespace != null) {
       logProps.put(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, tableTopicNamespace)
     }
+    if (tableTopicSchemaRegistryUrl != null) {
+      logProps.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, tableTopicSchemaRegistryUrl)
+    }
     // AutoMQ inject end

     logProps
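KafkaConfig reads the broker-level registry URL and copies it into the default log config map, which is how the controller-side check above can see it. A minimal sketch of the broker-side prerequisite, assuming the usual Properties-based startup path (the endpoint is a placeholder, not something defined by this patch):

import java.util.Properties;

public class TableTopicBrokerConfigExample {
    // Hypothetical broker bootstrap fragment; the surrounding startup code is assumed, not shown here.
    public static Properties brokerOverrides() {
        Properties props = new Properties();
        // Required before any topic may set automq.table.topic.schema.type=schema;
        // without it, the controller-side validation added above rejects the topic config.
        props.put("automq.table.topic.schema.registry.url", "http://schema-registry:8081"); // placeholder endpoint
        return props;
    }
}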
diff --git a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTableTest.scala b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTableTest.scala
new file mode 100644
index 0000000000..3204ceb91e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTableTest.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2024, AutoMQ HK Limited.
+ *
+ * The use of this file is governed by the Business Source License,
+ * as detailed in the file "/LICENSE.S3Stream" included in this repository.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+import kafka.automq.AutoMQConfig
+import kafka.server.{ControllerConfigurationValidator, KafkaConfig}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
+import org.apache.kafka.common.config.TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG
+import org.apache.kafka.common.errors.InvalidConfigurationException
+import org.apache.kafka.server.record.TableTopicSchemaType
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.{Tag, Test}
+
+import java.util
+
+@Tag("S3Unit")
+class ControllerConfigurationValidatorTableTest {
+  val config = new KafkaConfig(TestUtils.createDummyBrokerConfig())
+  val validator = new ControllerConfigurationValidator(config)
+
+  @Test
+  def testInvalidTableTopicSchemaConfig(): Unit = {
+    val config = new util.TreeMap[String, String]()
+    config.put(TABLE_TOPIC_SCHEMA_TYPE_CONFIG, TableTopicSchemaType.SCHEMA.name)
+
+    // Test without schema registry URL configured
+    val exception = assertThrows(classOf[InvalidConfigurationException], () => {
+      validator.validate(new ConfigResource(TOPIC, "foo"), config)
+    })
+    assertEquals("Table topic schema type is set to SCHEMA but schema registry URL is not configured", exception.getMessage)
+
+    // Test with schema registry URL configured
+    val brokerConfigWithSchemaRegistry = TestUtils.createDummyBrokerConfig()
+    brokerConfigWithSchemaRegistry.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
+
+    val kafkaConfigWithSchemaRegistry = new KafkaConfig(brokerConfigWithSchemaRegistry)
+    val validatorWithSchemaRegistry = new ControllerConfigurationValidator(kafkaConfigWithSchemaRegistry)
+
+    // No exception should be thrown when schema registry URL is configured properly
+    validatorWithSchemaRegistry.validate(new ConfigResource(TOPIC, "foo"), config)
+  }
+}
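The test above drives ControllerConfigurationValidator directly; in a running cluster the same path is exercised when a client alters topic configs. A hedged sketch of that client-side call (not part of the patch; the topic name and bootstrap address are placeholders, and the broker is assumed to have automq.table.topic.schema.registry.url set, otherwise the call fails with the InvalidConfigurationException asserted above):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class AlterTableTopicSchemaTypeExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "orders"); // placeholder topic
            AlterConfigOp setSchema = new AlterConfigOp(
                new ConfigEntry(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, "schema"), AlterConfigOp.OpType.SET);
            // The controller runs ControllerConfigurationValidator on this request; if the broker has no
            // schema registry URL configured, the future completes exceptionally.
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Map.of(topic, Collections.singletonList(setSchema));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}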
diff --git a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index 00bb93811b..29ea7ec58a 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -17,12 +17,14 @@
 package kafka.server

+import kafka.automq.AutoMQConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, CLIENT_METRICS, TOPIC}
-import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
+import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG, TABLE_TOPIC_SCHEMA_TYPE_CONFIG}
 import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, InvalidTopicException}
 import org.apache.kafka.server.metrics.ClientMetricsConfigs
+import org.apache.kafka.server.record.TableTopicSchemaType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.Test
@@ -153,4 +155,26 @@ class ControllerConfigurationValidatorTest {
     assertThrows(classOf[InvalidConfigurationException], () => validator.validate(
       new ConfigResource(CLIENT_METRICS, "subscription-1"), config)). getMessage)
   }
+
+  @Test
+  def testInvalidTableTopicSchemaConfig(): Unit = {
+    val config = new util.TreeMap[String, String]()
+    config.put(TABLE_TOPIC_SCHEMA_TYPE_CONFIG, TableTopicSchemaType.SCHEMA.name)
+
+    // Test without schema registry URL configured; the validation added in LogConfig throws
+    // InvalidConfigurationException, matching ControllerConfigurationValidatorTableTest
+    val exception = assertThrows(classOf[InvalidConfigurationException], () => {
+      validator.validate(new ConfigResource(TOPIC, "foo"), config)
+    })
+    assertEquals("Table topic schema type is set to SCHEMA but schema registry URL is not configured", exception.getMessage)
+
+    // Test with schema registry URL configured
+    val brokerConfigWithSchemaRegistry = TestUtils.createDummyBrokerConfig()
+    brokerConfigWithSchemaRegistry.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
+
+    val kafkaConfigWithSchemaRegistry = new KafkaConfig(brokerConfigWithSchemaRegistry)
+    val validatorWithSchemaRegistry = new ControllerConfigurationValidator(kafkaConfigWithSchemaRegistry)
+
+    // No exception should be thrown when schema registry URL is configured properly
+    validatorWithSchemaRegistry.validate(new ConfigResource(TOPIC, "foo"), config)
+  }
 }
diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
index 33a802cf29..fea99d8dd9 100644
--- a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
+++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
@@ -88,6 +88,7 @@ public final class ServerTopicConfigSynonyms {
         sameName(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG),
         sameName(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG),
         sameName(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG),
+        sameName(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG),
         // AutoMQ inject end

         sameNameWithLogPrefix(TopicConfig.PREALLOCATE_CONFIG),
diff --git a/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java b/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java
new file mode 100644
index 0000000000..6683694233
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2024, AutoMQ HK Limited.
+ *
+ * The use of this file is governed by the Business Source License,
+ * as detailed in the file "/LICENSE.S3Stream" included in this repository.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package org.apache.kafka.server.record;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.asList;
+
+public enum TableTopicSchemaType {
+    SCHEMALESS("schemaless"),
+    SCHEMA("schema");
+
+    public final String name;
+    private static final List<TableTopicSchemaType> VALUES = asList(values());
+
+    TableTopicSchemaType(String name) {
+        this.name = name;
+    }
+
+    public static List<String> names() {
+        return VALUES.stream().map(v -> v.name).collect(Collectors.toList());
+    }
+
+    public static TableTopicSchemaType forName(String n) {
+        String name = n.toLowerCase(Locale.ROOT);
+        return VALUES.stream().filter(v -> v.name.equals(name)).findFirst().orElseThrow(() ->
+            new IllegalArgumentException("Unknown table topic type name: " + name)
+        );
+    }
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 8542576876..df4a4b39ac 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -40,6 +40,7 @@
 import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
 import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.record.TableTopicSchemaType;

 import java.util.Collections;
 import java.util.HashMap;
@@ -336,6 +337,7 @@ public Optional<String> serverConfigName(String configName) {
         .define(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_ENABLE_DOC)
         .define(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, LONG, TimeUnit.MINUTES.toMillis(5), between(1, TimeUnit.MINUTES.toMillis(15)), MEDIUM, TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_DOC)
         .define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC)
+        .define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.SCHEMALESS.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC)
         // AutoMQ inject end
         .define(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, STRING, TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN,
             in(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE),
@@ -390,6 +392,7 @@ public Optional<String> serverConfigName(String configName) {
     public final boolean tableTopicEnable;
     public final long tableTopicCommitInterval;
     public final String tableTopicNamespace;
+    public final TableTopicSchemaType tableTopicSchemaType;
     // AutoMQ inject end

     private final int maxMessageSize;
@@ -446,6 +449,7 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
         this.tableTopicEnable = getBoolean(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG);
         this.tableTopicCommitInterval = getLong(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG);
         this.tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG);
+        this.tableTopicSchemaType = TableTopicSchemaType.forName(getString(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG));
         // AutoMQ inject end

         remoteLogConfig = new RemoteLogConfig(this);
@@ -723,6 +727,16 @@ public static void validate(Properties props,
         }
     }

+    // AutoMQ inject start
+    public static void validateTableTopicSchemaConfigValues(Properties props, String tableTopicSchemaRegistryUrl) {
+        String schemaType = props.getProperty(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG);
+        if (TableTopicSchemaType.SCHEMA.name.equals(schemaType) && tableTopicSchemaRegistryUrl == null) {
+            throw new InvalidConfigurationException("Table topic schema type is set to SCHEMA but schema registry URL is not configured");
+        }
+    }
+
+    // AutoMQ inject end
+
     @Override
     public String toString() {
         return "LogConfig{" +
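For completeness, a small standalone sketch (not part of the patch) of how the string value stored in topic configs maps onto the new enum; it uses only the TableTopicSchemaType API added above:

import org.apache.kafka.server.record.TableTopicSchemaType;

public class SchemaTypeLookupExample {
    public static void main(String[] args) {
        // Lookup is case-insensitive, so a value such as "SCHEMA" still resolves.
        TableTopicSchemaType type = TableTopicSchemaType.forName("SCHEMA");
        System.out.println(type);                          // SCHEMA
        System.out.println(TableTopicSchemaType.names());  // [schemaless, schema]
        // An unrecognized value is rejected with IllegalArgumentException; in practice the
        // ConfigDef-level in(...) validator above keeps such values from reaching this point.
        try {
            TableTopicSchemaType.forName("json");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}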