diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java index abfa9a5dc10cf..a82238600cbf1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java @@ -20,7 +20,8 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.isolation.TestPlugins; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.StringConverter; @@ -69,6 +70,16 @@ public static void setup() { Map workerProps = new HashMap<>(); workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID); + TestPlugins.TestPlugin[] testPlugins = new TestPlugins.TestPlugin[] { + TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONVERTER, + TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONVERTER, + TestPlugins.TestPlugin.BAD_PACKAGING_INNOCUOUS_CONNECTOR + }; + workerProps.put( + WorkerConfig.PLUGIN_PATH_CONFIG, + TestPlugins.pluginPathJoined(testPlugins) + ); + // build a Connect cluster backed by Kafka and Zk connect = new EmbeddedConnectCluster.Builder() .name("connector-validation-connect-cluster") @@ -331,8 +342,11 @@ public void testConnectorHasAbstractConverter() throws InterruptedException { @Test public void testConnectorHasConverterWithNoSuitableConstructor() throws InterruptedException { - Map config = defaultSinkConnectorProps(); - config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName()); + Map config = innocuousSinkConnectorProps(); + config.put( + KEY_CONVERTER_CLASS_CONFIG, + TestPlugins.TestPlugin.BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_CONVERTER.className() + ); connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( config.get(CONNECTOR_CLASS_CONFIG), config, @@ -344,8 +358,11 @@ public void testConnectorHasConverterWithNoSuitableConstructor() throws Interrup @Test public void testConnectorHasConverterThatThrowsExceptionOnInstantiation() throws InterruptedException { - Map config = defaultSinkConnectorProps(); - config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName()); + Map config = innocuousSinkConnectorProps(); + config.put( + KEY_CONVERTER_CLASS_CONFIG, + TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONVERTER.className() + ); connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( config.get(CONNECTOR_CLASS_CONFIG), config, @@ -423,8 +440,11 @@ public void testConnectorHasAbstractHeaderConverter() throws InterruptedExceptio @Test public void testConnectorHasHeaderConverterWithNoSuitableConstructor() throws InterruptedException { - Map config = defaultSinkConnectorProps(); - config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName()); + Map config = innocuousSinkConnectorProps(); + config.put( + HEADER_CONVERTER_CLASS_CONFIG, + TestPlugins.TestPlugin.BAD_PACKAGING_NO_DEFAULT_CONSTRUCTOR_CONVERTER.className() + ); connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( config.get(CONNECTOR_CLASS_CONFIG), config, @@ -436,8 +456,11 @@ public void testConnectorHasHeaderConverterWithNoSuitableConstructor() throws In @Test public void testConnectorHasHeaderConverterThatThrowsExceptionOnInstantiation() throws InterruptedException { - Map config = defaultSinkConnectorProps(); - config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName()); + Map config = innocuousSinkConnectorProps(); + config.put( + HEADER_CONVERTER_CLASS_CONFIG, + TestPlugins.TestPlugin.BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONVERTER.className() + ); connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation( config.get(CONNECTOR_CLASS_CONFIG), config, @@ -520,17 +543,6 @@ public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, O public abstract static class AbstractTestConverter extends TestConverter { } - public static class TestConverterWithPrivateConstructor extends TestConverter { - private TestConverterWithPrivateConstructor() { - } - } - - public static class TestConverterWithConstructorThatThrowsException extends TestConverter { - public TestConverterWithConstructorThatThrowsException() { - throw new ConnectException("whoops"); - } - } - public static class TestConverterWithSinglePropertyConfigDef extends TestConverter { public static final String BOOLEAN_PROPERTY_NAME = "prop"; @Override @@ -570,4 +582,15 @@ private Map defaultSinkConnectorProps() { return props; } + private Map innocuousSinkConnectorProps() { + Map props = new HashMap<>(); + props.put(NAME_CONFIG, "innocuous-sink-connector"); + props.put(CONNECTOR_CLASS_CONFIG, TestPlugins.TestPlugin.BAD_PACKAGING_INNOCUOUS_CONNECTOR.className()); + props.put(TASKS_MAX_CONFIG, "1"); + props.put(TOPICS_CONFIG, "t1"); + props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + return props; + } + } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java index 268efdc8c026f..d083f980349a1 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java @@ -26,7 +26,6 @@ import java.nio.file.Path; import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.stream.Stream; @@ -37,13 +36,9 @@ public class PluginScannerTest { - private enum ScannerType { Reflection, ServiceLoader } - @TempDir File pluginDir; - private Map scannerMap; - static Stream parameters() { return Stream.of(new ReflectionScanner(), new ServiceLoaderScanner()); } @@ -140,7 +135,6 @@ public void testVersionedPluginsHasVersion(PluginScanner scanner) { TestPlugins.pluginPath(TestPlugins.TestPlugin.READ_VERSION_FROM_RESOURCE_V1)); assertFalse(versionedPluginResult.isEmpty()); versionedPluginResult.forEach(pluginDesc -> assertEquals("1.0.0", pluginDesc.version())); - } private PluginScanResult scan(PluginScanner scanner, Set pluginLocations) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java index 47c6f8b4ee66e..c9d8892da91ce 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java @@ -129,10 +129,18 @@ public enum TestPlugin { * A plugin which is incorrectly packaged, which throws an exception from default constructor. */ BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONNECTOR(TestPackage.BAD_PACKAGING, "test.plugins.DefaultConstructorThrowsConnector", false), + /** + * A plugin which is incorrectly packaged, which throws an exception from default constructor. + */ + BAD_PACKAGING_DEFAULT_CONSTRUCTOR_THROWS_CONVERTER(TestPackage.BAD_PACKAGING, "test.plugins.DefaultConstructorThrowsConverter", false), /** * A plugin which is incorrectly packaged, which throws an exception from the {@link Versioned#version()} method. */ BAD_PACKAGING_INNER_CLASS_CONNECTOR(TestPackage.BAD_PACKAGING, "test.plugins.OuterClass$InnerClass", false), + /** + * A valid plugin, that can be used to test other (possibly-invalid) plugins in the same package. + */ + BAD_PACKAGING_INNOCUOUS_CONNECTOR(TestPackage.BAD_PACKAGING, "test.plugins.InnocuousSinkConnector", true), /** * A plugin which is incorrectly packaged, and is missing a superclass definition. */ diff --git a/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.sink.SinkConnector b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.sink.SinkConnector index afd40b0d63f97..8e843327a68a0 100644 --- a/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.sink.SinkConnector +++ b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.sink.SinkConnector @@ -15,6 +15,7 @@ test.plugins.DefaultConstructorPrivateConnector test.plugins.DefaultConstructorThrowsConnector +test.plugins.InnocuousSinkConnector test.plugins.NoDefaultConstructorConnector test.plugins.StaticInitializerThrowsConnector test.plugins.OuterClass$InnerClass diff --git a/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.storage.Converter index 38da8f3376742..b4696e6999842 100644 --- a/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.storage.Converter +++ b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -14,6 +14,7 @@ # limitations under the License. test.plugins.CoLocatedPlugin +test.plugins.DefaultConstructorThrowsConverter test.plugins.MissingSuperclassConverter test.plugins.NoDefaultConstructorConverter -test.plugins.NoDefaultConstructorConverter + diff --git a/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter new file mode 100644 index 0000000000000..dcf9c16d9a66f --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/bad-packaging/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter @@ -0,0 +1,17 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.DefaultConstructorThrowsConverter +test.plugins.NoDefaultConstructorConverter diff --git a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConverter.java b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConverter.java new file mode 100644 index 0000000000000..9888b4548981d --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/DefaultConstructorThrowsConverter.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.io.IOException; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; + +/** + * Fake plugin class for testing classloading isolation. + * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. + *

Unconditionally throw an exception during the default constructor. + */ +public class DefaultConstructorThrowsConverter implements Converter, HeaderConverter { + + public DefaultConstructorThrowsConverter() { + throw new RuntimeException("I always throw an exception"); + } + + @Override + public void configure(final Map configs, final boolean isKey) { + + } + + @Override + public byte[] fromConnectData(final String topic, final Schema schema, final Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(final String topic, final byte[] value) { + return null; + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(Map configs) { + } + + @Override + public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) { + return null; + } + + @Override + public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) { + return new byte[0]; + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/InnocuousSinkConnector.java b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/InnocuousSinkConnector.java new file mode 100644 index 0000000000000..1ba580ca9a39c --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/InnocuousSinkConnector.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; + +/** + * Fake plugin class for testing classloading isolation. + * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. + *

This is a valid connector class, and can be used to test connector configurations + * that use other plugins in this package. + */ +public class InnocuousSinkConnector extends SinkConnector { + + @Override + public String version() { + return "0.0.0"; + } + + @Override + public void start(Map props) { + } + + @Override + public Class taskClass() { + return InnocuousSinkTask.class; + } + + @Override + public List> taskConfigs(int maxTasks) { + return Collections.emptyList(); + } + + @Override + public void stop() { + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } + + public static class InnocuousSinkTask extends SinkTask { + @Override + public String version() { + return "0.0.0"; + } + + @Override + public void start(Map props) { + } + + @Override + public void put(Collection records) { + } + + @Override + public void stop() { + } + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConverter.java b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConverter.java index 35b1812e5627e..bfdfcaa390b48 100644 --- a/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConverter.java +++ b/connect/runtime/src/test/resources/test-plugins/bad-packaging/test/plugins/NoDefaultConstructorConverter.java @@ -17,17 +17,21 @@ package test.plugins; +import java.io.IOException; import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; /** * Fake plugin class for testing classloading isolation. * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. *

This class has no default constructor */ -public class NoDefaultConstructorConverter implements Converter { +public class NoDefaultConstructorConverter implements Converter, HeaderConverter { public NoDefaultConstructorConverter(int ignored) { } @@ -46,4 +50,27 @@ public byte[] fromConnectData(final String topic, final Schema schema, final Obj public SchemaAndValue toConnectData(final String topic, final byte[] value) { return null; } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(Map configs) { + } + + @Override + public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) { + return null; + } + + @Override + public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) { + return new byte[0]; + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } }