Skip to content

Commit

Permalink
fix integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar committed Feb 13, 2024
1 parent fe2dff3 commit 764dac9
Showing 1 changed file with 23 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.cdk.integrations.util.HostPortResolver;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.CatalogHelpers;
Expand All @@ -22,6 +24,7 @@
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
Expand All @@ -32,16 +35,18 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaSourceAcceptanceTest extends SourceAcceptanceTest {

private static final ObjectMapper mapper = MoreMappers.initMapper();
private static final String TOPIC_NAME = "test.topic";

private static KafkaContainer KAFKA;

private String topicName;

@Override
protected String getImageName() {
return "airbyte/source-kafka:dev";
Expand All @@ -53,10 +58,11 @@ protected JsonNode getConfig() {
final ObjectNode subscriptionConfig = mapper.createObjectNode();
protocolConfig.put("security_protocol", KafkaProtocol.PLAINTEXT.toString());
subscriptionConfig.put("subscription_type", "subscribe");
subscriptionConfig.put("topic_pattern", TOPIC_NAME);
subscriptionConfig.put("topic_pattern", topicName);

var bootstrapServers = String.format("PLAINTEXT://%s:%s", HostPortResolver.resolveHost(KAFKA), HostPortResolver.resolvePort(KAFKA));
return Jsons.jsonNode(ImmutableMap.builder()
.put("bootstrap_servers", KAFKA.getBootstrapServers())
.put("bootstrap_servers", bootstrapServers)
.put("subscription", subscriptionConfig)
.put("client_dns_lookup", "use_all_dns_ips")
.put("enable_auto_commit", false)
Expand All @@ -67,11 +73,15 @@ protected JsonNode getConfig() {
.build());
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
@BeforeAll
static public void setupContainer() {
KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.0"));
KAFKA.start();
}

  /**
   * Per-test setup: creates a uniquely named topic in the shared container and seeds it
   * with one event.
   *
   * <p>The random suffix keeps repeated test runs from colliding on leftover topics and
   * consumer offsets in the long-lived static {@code KAFKA} container.
   */
  @Override
  protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
    topicName = Strings.addRandomSuffix("topic.test", "_", 10);
    // Order matters: the topic must exist before the producer sends to it.
    createTopic();
    sendEvent();
  }
Expand All @@ -87,7 +97,7 @@ private void sendEvent() throws ExecutionException, InterruptedException {
final ObjectNode event = mapper.createObjectNode();
event.put("test", "value");

producer.send(new ProducerRecord<>(TOPIC_NAME, event), (recordMetadata, exception) -> {
producer.send(new ProducerRecord<>(topicName, event), (recordMetadata, exception) -> {
if (exception != null) {
throw new RuntimeException("Cannot send message to Kafka. Error: " + exception.getMessage(), exception);
}
Expand All @@ -96,14 +106,18 @@ private void sendEvent() throws ExecutionException, InterruptedException {

private void createTopic() throws Exception {
try (final var admin = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()))) {
final NewTopic topic = new NewTopic(TOPIC_NAME, 1, (short) 1);
final NewTopic topic = new NewTopic(topicName, 1, (short) 1);
admin.createTopics(Collections.singletonList(topic)).all().get();
}
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
KAFKA.close();
try (final var admin = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()))) {
admin.deleteTopics(List.of(topicName)).all().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
Expand All @@ -114,7 +128,7 @@ protected ConnectorSpecification getSpec() throws Exception {
@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
final ConfiguredAirbyteStream streams =
CatalogHelpers.createConfiguredAirbyteStream(TOPIC_NAME, null, Field.of("value", JsonSchemaType.STRING));
CatalogHelpers.createConfiguredAirbyteStream(topicName, null, Field.of("value", JsonSchemaType.STRING));
streams.setSyncMode(SyncMode.FULL_REFRESH);
return new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(streams));
}
Expand Down

0 comments on commit 764dac9

Please sign in to comment.