Skip to content

Commit

Permalink
source-kafka: adopt CDK 0.20.4 (airbytehq#35229)
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar authored and jatinyadav-cc committed Feb 21, 2024
1 parent eefb472 commit 91c6799
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 24 deletions.
16 changes: 2 additions & 14 deletions airbyte-integrations/connectors/source-kafka/build.gradle
Original file line number Diff line number Diff line change
@@ -1,23 +1,13 @@
plugins {
id 'application'
id 'airbyte-java-connector'
}

airbyteJavaConnector {
cdkVersionRequired = '0.2.0'
cdkVersionRequired = '0.20.4'
features = ['db-sources']
useLocalCdk = false
}

//remove once upgrading the CDK version to 0.4.x or later
java {
compileJava {
options.compilerArgs.remove("-Werror")
}
}

airbyteJavaConnector.addCdkDependencies()

application {
mainClass = 'io.airbyte.integrations.source.kafka.KafkaSource'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
Expand All @@ -29,7 +19,5 @@ dependencies {
implementation 'org.apache.kafka:connect-json:3.2.1'
implementation 'io.confluent:kafka-avro-serializer:7.2.1'

testImplementation libs.testcontainers.kafka

integrationTestJavaImplementation libs.testcontainers.kafka
testImplementation 'org.testcontainers:kafka:1.19.4'
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: d917a47b-8537-4d0d-8c10-36a9928d4265
dockerImageTag: 0.2.3
dockerImageTag: 0.2.4
dockerRepository: airbyte/source-kafka
githubIssueLabel: source-kafka
icon: kafka.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.cdk.integrations.util.HostPortResolver;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.CatalogHelpers;
Expand All @@ -22,6 +24,7 @@
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
Expand All @@ -32,16 +35,20 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Disabled("need to fix docker container networking")
public class KafkaSourceAcceptanceTest extends SourceAcceptanceTest {

private static final ObjectMapper mapper = MoreMappers.initMapper();
private static final String TOPIC_NAME = "test.topic";

private static KafkaContainer KAFKA;

private String topicName;

@Override
protected String getImageName() {
return "airbyte/source-kafka:dev";
Expand All @@ -53,10 +60,11 @@ protected JsonNode getConfig() {
final ObjectNode subscriptionConfig = mapper.createObjectNode();
protocolConfig.put("security_protocol", KafkaProtocol.PLAINTEXT.toString());
subscriptionConfig.put("subscription_type", "subscribe");
subscriptionConfig.put("topic_pattern", TOPIC_NAME);
subscriptionConfig.put("topic_pattern", topicName);

var bootstrapServers = String.format("PLAINTEXT://%s:%d", HostPortResolver.resolveHost(KAFKA), HostPortResolver.resolvePort(KAFKA));
return Jsons.jsonNode(ImmutableMap.builder()
.put("bootstrap_servers", KAFKA.getBootstrapServers())
.put("bootstrap_servers", bootstrapServers)
.put("subscription", subscriptionConfig)
.put("client_dns_lookup", "use_all_dns_ips")
.put("enable_auto_commit", false)
Expand All @@ -67,11 +75,15 @@ protected JsonNode getConfig() {
.build());
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
@BeforeAll
static public void setupContainer() {
KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.0"));
KAFKA.start();
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
topicName = Strings.addRandomSuffix("topic.test", "_", 10);
createTopic();
sendEvent();
}
Expand All @@ -87,7 +99,7 @@ private void sendEvent() throws ExecutionException, InterruptedException {
final ObjectNode event = mapper.createObjectNode();
event.put("test", "value");

producer.send(new ProducerRecord<>(TOPIC_NAME, event), (recordMetadata, exception) -> {
producer.send(new ProducerRecord<>(topicName, event), (recordMetadata, exception) -> {
if (exception != null) {
throw new RuntimeException("Cannot send message to Kafka. Error: " + exception.getMessage(), exception);
}
Expand All @@ -96,14 +108,18 @@ private void sendEvent() throws ExecutionException, InterruptedException {

private void createTopic() throws Exception {
try (final var admin = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()))) {
final NewTopic topic = new NewTopic(TOPIC_NAME, 1, (short) 1);
final NewTopic topic = new NewTopic(topicName, 1, (short) 1);
admin.createTopics(Collections.singletonList(topic)).all().get();
}
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
KAFKA.close();
try (final var admin = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()))) {
admin.deleteTopics(List.of(topicName)).all().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
Expand All @@ -114,7 +130,7 @@ protected ConnectorSpecification getSpec() throws Exception {
@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
final ConfiguredAirbyteStream streams =
CatalogHelpers.createConfiguredAirbyteStream(TOPIC_NAME, null, Field.of("value", JsonSchemaType.STRING));
CatalogHelpers.createConfiguredAirbyteStream(topicName, null, Field.of("value", JsonSchemaType.STRING));
streams.setSyncMode(SyncMode.FULL_REFRESH);
return new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(streams));
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The Kafka source connector supports the following [sync modes](https://docs.airb

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :------------------------------------------------------| :---------------------------------------- |
| 0.2.4 | 2024-02-13 | [35229](https://github.com/airbytehq/airbyte/pull/35229) | Adopt CDK 0.20.4 |
| 0.2.4 | 2024-01-24 | [34453](https://github.com/airbytehq/airbyte/pull/34453) | bump CDK version |
| 0.2.3 | 2022-12-06 | [19587](https://github.com/airbytehq/airbyte/pull/19587) | Fix missing data before consumer is closed |
| 0.2.2 | 2022-11-04 | [18648](https://github.com/airbytehq/airbyte/pull/18648) | Add missing record_count increment for JSON|
Expand Down

0 comments on commit 91c6799

Please sign in to comment.