Skip to content

Commit

Permalink
KAFKA-16068: Use TestPlugins mechanism in ConnectorValidationIntegrat…
Browse files Browse the repository at this point in the history
…ionTest to prevent ERROR-level log spam in unrelated test suites (apache#16647)

Reviewers: Greg Harris <[email protected]>
  • Loading branch information
C0urante authored Jul 23, 2024
1 parent c71eb60 commit 956d740
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StringConverter;
Expand Down Expand Up @@ -69,6 +70,16 @@ public static void setup() {
Map<String, String> workerProps = new HashMap<>();
workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID);

TestPlugins.TestPlugin[] testPlugins = new TestPlugins.TestPlugin[] {
TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONVERTER,
TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONVERTER,
TestPlugins.TestPlugin.BAD_PACKAGING_INNOCUOUS_CONNECTOR
};
workerProps.put(
WorkerConfig.PLUGIN_PATH_CONFIG,
TestPlugins.pluginPathJoined(testPlugins)
);

// build a Connect cluster backed by Kafka and Zk
connect = new EmbeddedConnectCluster.Builder()
.name("connector-validation-connect-cluster")
Expand Down Expand Up @@ -331,8 +342,11 @@ public void testConnectorHasAbstractConverter() throws InterruptedException {

@Test
public void testConnectorHasConverterWithNoSuitableConstructor() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName());
Map<String, String> config = innocuousSinkConnectorProps();
config.put(
KEY_CONVERTER_CLASS_CONFIG,
TestPlugins.TestPlugin.BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_CONVERTER.className()
);
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
Expand All @@ -344,8 +358,11 @@ public void testConnectorHasConverterWithNoSuitableConstructor() throws Interrup

@Test
public void testConnectorHasConverterThatThrowsExceptionOnInstantiation() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName());
Map<String, String> config = innocuousSinkConnectorProps();
config.put(
KEY_CONVERTER_CLASS_CONFIG,
TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONVERTER.className()
);
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
Expand Down Expand Up @@ -423,8 +440,11 @@ public void testConnectorHasAbstractHeaderConverter() throws InterruptedExceptio

@Test
public void testConnectorHasHeaderConverterWithNoSuitableConstructor() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName());
Map<String, String> config = innocuousSinkConnectorProps();
config.put(
HEADER_CONVERTER_CLASS_CONFIG,
TestPlugins.TestPlugin.BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_CONVERTER.className()
);
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
Expand All @@ -436,8 +456,11 @@ public void testConnectorHasHeaderConverterWithNoSuitableConstructor() throws In

@Test
public void testConnectorHasHeaderConverterThatThrowsExceptionOnInstantiation() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName());
Map<String, String> config = innocuousSinkConnectorProps();
config.put(
HEADER_CONVERTER_CLASS_CONFIG,
TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONVERTER.className()
);
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
Expand Down Expand Up @@ -520,17 +543,6 @@ public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, O
public abstract static class AbstractTestConverter extends TestConverter {
}

public static class TestConverterWithPrivateConstructor extends TestConverter {
private TestConverterWithPrivateConstructor() {
}
}

public static class TestConverterWithConstructorThatThrowsException extends TestConverter {
public TestConverterWithConstructorThatThrowsException() {
throw new ConnectException("whoops");
}
}

public static class TestConverterWithSinglePropertyConfigDef extends TestConverter {
public static final String BOOLEAN_PROPERTY_NAME = "prop";
@Override
Expand Down Expand Up @@ -570,4 +582,15 @@ private Map<String, String> defaultSinkConnectorProps() {
return props;
}

private Map<String, String> innocuousSinkConnectorProps() {
Map<String, String> props = new HashMap<>();
props.put(NAME_CONFIG, "innocuous-sink-connector");
props.put(CONNECTOR_CLASS_CONFIG, TestPlugins.TestPlugin.BAD_PACKAGING_INNOCUOUS_CONNECTOR.className());
props.put(TASKS_MAX_CONFIG, "1");
props.put(TOPICS_CONFIG, "t1");
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
return props;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

Expand All @@ -37,13 +36,9 @@

public class PluginScannerTest {

private enum ScannerType { Reflection, ServiceLoader }

@TempDir
File pluginDir;

private Map<ScannerType, PluginScanner> scannerMap;

static Stream<PluginScanner> parameters() {
return Stream.of(new ReflectionScanner(), new ServiceLoaderScanner());
}
Expand Down Expand Up @@ -140,7 +135,6 @@ public void testVersionedPluginsHasVersion(PluginScanner scanner) {
TestPlugins.pluginPath(TestPlugins.TestPlugin.READ_VERSION_FROM_RESOURCE_V1));
assertFalse(versionedPluginResult.isEmpty());
versionedPluginResult.forEach(pluginDesc -> assertEquals("1.0.0", pluginDesc.version()));

}

private PluginScanResult scan(PluginScanner scanner, Set<Path> pluginLocations) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,18 @@ public enum TestPlugin {
* A plugin which is incorrectly packaged, which throws an exception from default constructor.
*/
BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR(TestPackage.BAD_PACKAGING, "test.plugins.DefaultConstructorThrowsConnector", false),
/**
* A plugin which is incorrectly packaged, which throws an exception from default constructor.
*/
BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONVERTER(TestPackage.BAD_PACKAGING, "test.plugins.DefaultConstructorThrowsConverter", false),
/**
* A plugin which is incorrectly packaged, which throws an exception from the {@link Versioned#version()} method.
*/
BAD_PACKAGING_INNER_CLASS_CONNECTOR(TestPackage.BAD_PACKAGING, "test.plugins.OuterClass$InnerClass", false),
/**
* A valid plugin, that can be used to test other (possibly-invalid) plugins in the same package.
*/
BAD_PACKAGING_INNOCUOUS_CONNECTOR(TestPackage.BAD_PACKAGING, "test.plugins.InnocuousSinkConnector", true),
/**
* A plugin which is incorrectly packaged, and is missing a superclass definition.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

test.plugins.DefaultConstructorPrivateConnector
test.plugins.DefaultConstructorThrowsConnector
test.plugins.InnocuousSinkConnector
test.plugins.NoDefaultConstructorConnector
test.plugins.StaticInitializerThrowsConnector
test.plugins.OuterClass$InnerClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

test.plugins.CoLocatedPlugin
test.plugins.DefaultConstructorThrowsConverter
test.plugins.MissingSuperclassConverter
test.plugins.NoDefaultConstructorConverter
test.plugins.NoDefaultConstructorConverter

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

test.plugins.DefaultConstructorThrowsConverter
test.plugins.NoDefaultConstructorConverter
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package test.plugins;

import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;

/**
* Fake plugin class for testing classloading isolation.
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Unconditionally throw an exception during the default constructor.
*/
public class DefaultConstructorThrowsConverter implements Converter, HeaderConverter {

public DefaultConstructorThrowsConverter() {
throw new RuntimeException("I always throw an exception");
}

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {

}

@Override
public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
return new byte[0];
}

@Override
public SchemaAndValue toConnectData(final String topic, final byte[] value) {
return null;
}

@Override
public void close() throws IOException {
}

@Override
public void configure(Map<String, ?> configs) {
}

@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
return null;
}

@Override
public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
return new byte[0];
}

@Override
public ConfigDef config() {
return new ConfigDef();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package test.plugins;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

/**
* Fake plugin class for testing classloading isolation.
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>This is a valid connector class, and can be used to test connector configurations
* that use other plugins in this package.
*/
public class InnocuousSinkConnector extends SinkConnector {

@Override
public String version() {
return "0.0.0";
}

@Override
public void start(Map<String, String> props) {
}

@Override
public Class<? extends Task> taskClass() {
return InnocuousSinkTask.class;
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return Collections.emptyList();
}

@Override
public void stop() {
}

@Override
public ConfigDef config() {
return new ConfigDef();
}

public static class InnocuousSinkTask extends SinkTask {
@Override
public String version() {
return "0.0.0";
}

@Override
public void start(Map<String, String> props) {
}

@Override
public void put(Collection<SinkRecord> records) {
}

@Override
public void stop() {
}
}
}
Loading

0 comments on commit 956d740

Please sign in to comment.