diff --git a/airbyte-integrations/connectors/source-kafka/src/test-integration/java/io/airbyte/integrations/source/kafka/KafkaSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-kafka/src/test-integration/java/io/airbyte/integrations/source/kafka/KafkaSourceAcceptanceTest.java index 2ae90e827d746..cef83ca661473 100644 --- a/airbyte-integrations/connectors/source-kafka/src/test-integration/java/io/airbyte/integrations/source/kafka/KafkaSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-kafka/src/test-integration/java/io/airbyte/integrations/source/kafka/KafkaSourceAcceptanceTest.java @@ -10,9 +10,11 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest; import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv; +import io.airbyte.cdk.integrations.util.HostPortResolver; import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.string.Strings; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.v0.CatalogHelpers; @@ -22,6 +24,7 @@ import io.airbyte.protocol.models.v0.SyncMode; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.admin.AdminClient; @@ -32,16 +35,18 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.connect.json.JsonSerializer; +import org.junit.jupiter.api.BeforeAll; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName; public class KafkaSourceAcceptanceTest extends SourceAcceptanceTest { private static final ObjectMapper mapper = MoreMappers.initMapper(); - private static final String 
TOPIC_NAME = "test.topic"; private static KafkaContainer KAFKA; + private String topicName; + @Override protected String getImageName() { return "airbyte/source-kafka:dev"; @@ -53,10 +58,11 @@ protected JsonNode getConfig() { final ObjectNode subscriptionConfig = mapper.createObjectNode(); protocolConfig.put("security_protocol", KafkaProtocol.PLAINTEXT.toString()); subscriptionConfig.put("subscription_type", "subscribe"); - subscriptionConfig.put("topic_pattern", TOPIC_NAME); + subscriptionConfig.put("topic_pattern", topicName); + var bootstrapServers = String.format("PLAINTEXT://%s:%s", HostPortResolver.resolveHost(KAFKA), HostPortResolver.resolvePort(KAFKA)); return Jsons.jsonNode(ImmutableMap.builder() - .put("bootstrap_servers", KAFKA.getBootstrapServers()) + .put("bootstrap_servers", bootstrapServers) .put("subscription", subscriptionConfig) .put("client_dns_lookup", "use_all_dns_ips") .put("enable_auto_commit", false) @@ -67,11 +73,15 @@ protected JsonNode getConfig() { .build()); } - @Override - protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { + @BeforeAll + public static void setupContainer() { KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.0")); KAFKA.start(); + } + @Override + protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { + topicName = Strings.addRandomSuffix("topic.test", "_", 10); createTopic(); sendEvent(); } @@ -87,7 +97,7 @@ private void sendEvent() throws ExecutionException, InterruptedException { final ObjectNode event = mapper.createObjectNode(); event.put("test", "value"); - producer.send(new ProducerRecord<>(TOPIC_NAME, event), (recordMetadata, exception) -> { if (exception != null) { throw new RuntimeException("Cannot send message to Kafka. 
Error: " + exception.getMessage(), exception); } @@ -96,14 +106,18 @@ private void sendEvent() throws ExecutionException, InterruptedException { private void createTopic() throws Exception { try (final var admin = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()))) { - final NewTopic topic = new NewTopic(TOPIC_NAME, 1, (short) 1); + final NewTopic topic = new NewTopic(topicName, 1, (short) 1); admin.createTopics(Collections.singletonList(topic)).all().get(); } } @Override protected void tearDown(final TestDestinationEnv testEnv) { - KAFKA.close(); + try (final var admin = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()))) { + admin.deleteTopics(List.of(topicName)).all().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override @@ -114,7 +128,7 @@ protected ConnectorSpecification getSpec() throws Exception { @Override protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception { final ConfiguredAirbyteStream streams = - CatalogHelpers.createConfiguredAirbyteStream(TOPIC_NAME, null, Field.of("value", JsonSchemaType.STRING)); + CatalogHelpers.createConfiguredAirbyteStream(topicName, null, Field.of("value", JsonSchemaType.STRING)); streams.setSyncMode(SyncMode.FULL_REFRESH); return new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(streams)); }