Skip to content

Commit

Permalink
Config metadata (strimzi#2007)
Browse files Browse the repository at this point in the history
* Validate user-supplied Kafka configs

* Generate a model of Kafka config parameters at build time for each
  Kafka version in kafka-versions file
* Consume this in the KafkaConfiguration to provide validation according
  to the constraints in the model.
* Remove doc about spec.kafka.config validation

Signed-off-by: Tom Bentley <[email protected]>
  • Loading branch information
tombentley authored Sep 26, 2019
1 parent 3250f21 commit 2bd5919
Show file tree
Hide file tree
Showing 22 changed files with 4,347 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ ifneq ($(RELEASE_VERSION),latest)
GITHUB_VERSION = $(RELEASE_VERSION)
endif

SUBDIRS=kafka-agent mirror-maker-agent tracing-agent crd-annotations test crd-generator api mockkube certificate-manager operator-common cluster-operator topic-operator user-operator kafka-init docker-images helm-charts install examples metrics
SUBDIRS=kafka-agent mirror-maker-agent tracing-agent crd-annotations test crd-generator api mockkube certificate-manager operator-common config-model config-model-generator cluster-operator topic-operator user-operator kafka-init docker-images helm-charts install examples metrics
DOCKER_TARGETS=docker_build docker_push docker_tag

all: $(SUBDIRS)
Expand Down
4 changes: 4 additions & 0 deletions cluster-operator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
<groupId>io.strimzi</groupId>
<artifactId>api</artifactId>
</dependency>
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>config-model</artifactId>
</dependency>
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>operator-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public static KafkaCluster fromCrd(Kafka kafkaAssembly, KafkaVersion.Lookup vers
return fromCrd(kafkaAssembly, versions, null);
}

@SuppressWarnings("checkstyle:MethodLength")
@SuppressWarnings({"checkstyle:MethodLength", "checkstyle:JavaNCSS"})
public static KafkaCluster fromCrd(Kafka kafkaAssembly, KafkaVersion.Lookup versions, Storage oldStorage) {
KafkaCluster result = new KafkaCluster(kafkaAssembly.getMetadata().getNamespace(),
kafkaAssembly.getMetadata().getName(),
Expand Down Expand Up @@ -389,7 +389,21 @@ public static KafkaCluster fromCrd(Kafka kafkaAssembly, KafkaVersion.Lookup vers

result.setJvmOptions(kafkaClusterSpec.getJvmOptions());

result.setConfiguration(new KafkaConfiguration(kafkaClusterSpec.getConfig().entrySet()));
KafkaConfiguration configuration = new KafkaConfiguration(kafkaClusterSpec.getConfig().entrySet());
List<String> errorsInConfig = configuration.validate(versions.version(kafkaClusterSpec.getVersion()));
if (!errorsInConfig.isEmpty()) {
for (String error : errorsInConfig) {
log.warn("Kafka {}/{} has invalid spec.kafka.config: {}",
kafkaAssembly.getMetadata().getNamespace(),
kafkaAssembly.getMetadata().getName(),
error);
}
throw new InvalidResourceException("Kafka " +
kafkaAssembly.getMetadata().getNamespace() + "/" + kafkaAssembly.getMetadata().getName() +
" has invalid spec.kafka.config: " +
String.join(", ", errorsInConfig));
}
result.setConfiguration(configuration);

Map<String, Object> metrics = kafkaClusterSpec.getMetrics();
if (metrics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,21 @@

package io.strimzi.operator.cluster.model;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.strimzi.api.kafka.model.KafkaClusterSpec;
import io.strimzi.kafka.config.model.ConfigModel;
import io.strimzi.kafka.config.model.ConfigModels;
import io.strimzi.kafka.config.model.Scope;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -42,6 +53,7 @@ private KafkaConfiguration(String configuration, List<String> forbiddenOptions)
super(configuration, forbiddenOptions);
}


/**
* Returns a KafkaConfiguration created without forbidden option filtering.
* @param string A string representation of the Properties
Expand All @@ -50,4 +62,103 @@ private KafkaConfiguration(String configuration, List<String> forbiddenOptions)
public static KafkaConfiguration unvalidated(String string) {
    // An empty forbidden-options list disables filtering entirely, so the
    // supplied configuration string is accepted verbatim.
    List<String> noForbiddenOptions = emptyList();
    return new KafkaConfiguration(string, noForbiddenOptions);
}

/**
 * Validate each option in this KafkaConfiguration against the config model
 * for the given broker version, collecting every validation failure.
 * Options with no entry in the model are skipped: they are not treated as
 * errors, because they might be intended for broker plugins.
 *
 * @param kafkaVersion The broker version.
 * @return A list of error messages; empty when all known options are valid.
 */
public List<String> validate(KafkaVersion kafkaVersion) {
    Map<String, ConfigModel> models = readConfigModel(kafkaVersion);
    List<String> errors = new ArrayList<>();
    asOrderedProperties().asMap().forEach((optionName, optionValue) -> {
        ConfigModel model = models.get(optionName);
        // A missing model is not an error: extra configs may be consumed by plugins.
        if (model != null) {
            errors.addAll(model.validate(optionName, optionValue));
        }
    });
    return errors;
}

/**
 * Loads the Kafka config model for the given broker version from the
 * classpath resource {@code /kafka-<version>-config-model.json}.
 *
 * @param kafkaVersion The broker version whose config model should be loaded.
 * @return A map from config parameter name to its model.
 * @throws RuntimeException if the resource is missing or unreadable, or if it
 *         declares a different version than the one requested.
 */
private Map<String, ConfigModel> readConfigModel(KafkaVersion kafkaVersion) {
    String name = "/kafka-" + kafkaVersion.version() + "-config-model.json";
    try {
        try (InputStream in = KafkaConfiguration.class.getResourceAsStream(name)) {
            // getResourceAsStream() returns null (rather than throwing) when the
            // resource does not exist; fail with a clear message instead of the
            // confusing exception readValue(null, ...) would otherwise produce.
            if (in == null) {
                throw new RuntimeException("Classpath resource " + name + " does not exist");
            }
            ConfigModels configModels = new ObjectMapper().readValue(in, ConfigModels.class);
            if (!kafkaVersion.version().equals(configModels.getVersion())) {
                // Include both versions so a mismatch is diagnosable from the message.
                throw new RuntimeException("Incorrect version: expected " + kafkaVersion.version()
                        + " but " + name + " declares " + configModels.getVersion());
            }
            return configModels.getConfigs();
        }
    } catch (IOException e) {
        throw new RuntimeException("Error reading from classpath resource " + name, e);
    }
}

/**
 * Return true if the configs in this KafkaConfiguration include any which are read-only.
 * @param kafkaVersion The broker version.
 * @return true if the configs in this KafkaConfiguration include any which are read-only.
 */
public boolean anyReadOnly(KafkaVersion kafkaVersion) {
    // Any non-empty result from readOnlyConfigs() means at least one
    // read-only option is present in this configuration.
    return !readOnlyConfigs(kafkaVersion).isEmpty();
}

/**
 * Return the subset of this KafkaConfiguration's options whose config model
 * marks them with the {@code READ_ONLY} scope.
 * @param kafkaVersion The broker version.
 * @return The read-only configs.
 */
public Set<String> readOnlyConfigs(KafkaVersion kafkaVersion) {
    // Selection by scope is shared with the other scope accessors.
    return withScope(kafkaVersion, Scope.READ_ONLY);
}

/**
 * Return the subset of this KafkaConfiguration's options whose config model
 * marks them with the {@code CLUSTER_WIDE} scope.
 * @param kafkaVersion The broker version.
 * @return The cluster-wide configs.
 */
public Set<String> clusterWideConfigs(KafkaVersion kafkaVersion) {
    // Selection by scope is shared with the other scope accessors.
    return withScope(kafkaVersion, Scope.CLUSTER_WIDE);
}

/**
 * Return the subset of this KafkaConfiguration's options whose config model
 * marks them with the {@code PER_BROKER} scope.
 * @param kafkaVersion The broker version.
 * @return The per-broker configs.
 */
public Set<String> perBrokerConfigs(KafkaVersion kafkaVersion) {
    // Selection by scope is shared with the other scope accessors.
    return withScope(kafkaVersion, Scope.PER_BROKER);
}

/**
 * Return the names of the configs in this KafkaConfiguration whose config
 * model has the given scope. Options with no entry in the model are excluded.
 *
 * @param kafkaVersion The broker version whose config model to consult.
 * @param scope The scope to select.
 * @return An unmodifiable set of the matching config names.
 */
private Set<String> withScope(KafkaVersion kafkaVersion, Scope scope) {
    Map<String, ConfigModel> models = readConfigModel(kafkaVersion);
    // Filter this configuration's own keys with a direct map lookup rather than
    // materialising every model key of the scope and intersecting via retainAll:
    // this iterates only the configured options, not the whole config model.
    Set<String> result = asOrderedProperties().asMap().keySet().stream()
            .filter(key -> {
                ConfigModel model = models.get(key);
                return model != null && scope.equals(model.getScope());
            })
            .collect(Collectors.toSet());
    return Collections.unmodifiableSet(result);
}

/**
 * Return the configs in this KafkaConfiguration which are not known broker configs.
 * These might be consumed by broker plugins.
 * @param kafkaVersion The broker version.
 * @return The unknown configs.
 */
public Set<String> unknownConfigs(KafkaVersion kafkaVersion) {
    Map<String, ConfigModel> knownConfigs = readConfigModel(kafkaVersion);
    // Keep only the option names which have no entry in the config model.
    return asOrderedProperties().asMap().keySet().stream()
            .filter(key -> !knownConfigs.containsKey(key))
            .collect(Collectors.toSet());
}


}
Loading

0 comments on commit 2bd5919

Please sign in to comment.