Skip to content

Commit

Permalink
KAFKA-8407: Fix validation of class and list configs in connector cli…
Browse files Browse the repository at this point in the history
…ent overrides (#6789)

Because of how config values are converted into strings in the `AbstractHerder.validateClientOverrides()` method after being validated by the client override policy, an exception is thrown if the value returned by the policy isn't already parsed as the type expected by the client `ConfigDef`. The fix here involves parsing client override properties before passing them to the override policy.

A unit test is added to ensure that several different types of configs are validated properly by the herder.

Author: Chris Egerton <[email protected]>
Reviewers: Magesh Nandakumar <[email protected]>, Randall Hauch <[email protected]>
  • Loading branch information
C0urante authored and rhauch committed May 23, 2019
1 parent d1d5ab8 commit baddf8b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,16 @@ private static ConfigInfos validateClientOverrides(String connName,
List<ConfigInfo> configInfoList = new LinkedList<>();
Map<String, ConfigKey> configKeys = configDef.configKeys();
Set<String> groups = new LinkedHashSet<>();
Map<String, Object> clientConfigs = connectorConfig.originalsWithPrefix(prefix);
Map<String, Object> clientConfigs = new HashMap<>();
for (Map.Entry<String, Object> rawClientConfig : connectorConfig.originalsWithPrefix(prefix).entrySet()) {
String configName = rawClientConfig.getKey();
Object rawConfigValue = rawClientConfig.getValue();
ConfigKey configKey = configDef.configKeys().get(configName);
Object parsedConfigValue = configKey != null
? ConfigDef.parseType(configName, rawConfigValue, configKey.type)
: rawConfigValue;
clientConfigs.put(configName, parsedConfigValue);
}
ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
connName, connectorType, connectorClass, clientConfigs, clientType);
List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@
*/
package org.apache.kafka.connect.runtime;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
Expand Down Expand Up @@ -56,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.powermock.api.easymock.PowerMock.verifyAll;
import static org.powermock.api.easymock.PowerMock.replayAll;
Expand Down Expand Up @@ -364,14 +369,12 @@ public void testConfigValidationPrincipalOnlyOverride() {
AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, new PrincipalConnectorClientConfigOverridePolicy());
replayAll();

// Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing
// class info that should generate an error.
Map<String, String> config = new HashMap<>();
config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
config.put("required", "value"); // connector required config
String ackConfigKey = ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + ProducerConfig.ACKS_CONFIG;
String saslConfigKey = ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + SaslConfigs.SASL_JAAS_CONFIG;
String ackConfigKey = producerOverrideKey(ProducerConfig.ACKS_CONFIG);
String saslConfigKey = producerOverrideKey(SaslConfigs.SASL_JAAS_CONFIG);
config.put(ackConfigKey, "none");
config.put(saslConfigKey, "jaas_config");

Expand Down Expand Up @@ -399,6 +402,55 @@ public void testConfigValidationPrincipalOnlyOverride() {
verifyAll();
}

@Test
public void testConfigValidationAllOverride() {
AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, new AllConnectorClientConfigOverridePolicy());
replayAll();

Map<String, String> config = new HashMap<>();
config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
config.put("required", "value"); // connector required config
// Try to test a variety of configuration types: string, int, long, boolean, list, class
String protocolConfigKey = producerOverrideKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
config.put(protocolConfigKey, "SASL_PLAINTEXT");
String maxRequestSizeConfigKey = producerOverrideKey(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
config.put(maxRequestSizeConfigKey, "420");
String maxBlockConfigKey = producerOverrideKey(ProducerConfig.MAX_BLOCK_MS_CONFIG);
config.put(maxBlockConfigKey, "28980");
String idempotenceConfigKey = producerOverrideKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
config.put(idempotenceConfigKey, "true");
String bootstrapServersConfigKey = producerOverrideKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
config.put(bootstrapServersConfigKey, "SASL_PLAINTEXT://localhost:12345,SASL_PLAINTEXT://localhost:23456");
String loginCallbackHandlerConfigKey = producerOverrideKey(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS);
config.put(loginCallbackHandlerConfigKey, OAuthBearerUnsecuredLoginCallbackHandler.class.getName());

final Set<String> overriddenClientConfigs = new HashSet<>();
overriddenClientConfigs.add(protocolConfigKey);
overriddenClientConfigs.add(maxRequestSizeConfigKey);
overriddenClientConfigs.add(maxBlockConfigKey);
overriddenClientConfigs.add(idempotenceConfigKey);
overriddenClientConfigs.add(bootstrapServersConfigKey);
overriddenClientConfigs.add(loginCallbackHandlerConfigKey);

ConfigInfos result = herder.validateConnectorConfig(config);
assertEquals(herder.connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)), ConnectorType.SOURCE);

Map<String, String> validatedOverriddenClientConfigs = new HashMap<>();
for (ConfigInfo configInfo : result.values()) {
String configName = configInfo.configKey().name();
if (overriddenClientConfigs.contains(configName)) {
validatedOverriddenClientConfigs.put(configName, configInfo.configValue().value());
}
}
Map<String, String> rawOverriddenClientConfigs = config.entrySet().stream()
.filter(e -> overriddenClientConfigs.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

assertEquals(rawOverriddenClientConfigs, validatedOverriddenClientConfigs);
verifyAll();
}

@Test
public void testReverseTransformConfigs() {
// Construct a task config with constant values for TEST_KEY and TEST_KEY2
Expand Down Expand Up @@ -482,4 +534,8 @@ private abstract class BogusSourceConnector extends SourceConnector {

private abstract class BogusSourceTask extends SourceTask {
}

private static String producerOverrideKey(String config) {
return ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + config;
}
}

0 comments on commit baddf8b

Please sign in to comment.