Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source-kafka: adopt CDK 0.20.4 #35229

Merged
merged 5 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions airbyte-integrations/connectors/source-kafka/build.gradle
Original file line number Diff line number Diff line change
@@ -1,23 +1,13 @@
plugins {
id 'application'
id 'airbyte-java-connector'
}

airbyteJavaConnector {
cdkVersionRequired = '0.2.0'
cdkVersionRequired = '0.20.4'
features = ['db-sources']
useLocalCdk = false
}

//remove once upgrading the CDK version to 0.4.x or later
java {
compileJava {
options.compilerArgs.remove("-Werror")
}
}

airbyteJavaConnector.addCdkDependencies()

application {
mainClass = 'io.airbyte.integrations.source.kafka.KafkaSource'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
Expand All @@ -29,7 +19,5 @@ dependencies {
implementation 'org.apache.kafka:connect-json:3.2.1'
implementation 'io.confluent:kafka-avro-serializer:7.2.1'

testImplementation libs.testcontainers.kafka

integrationTestJavaImplementation libs.testcontainers.kafka
testImplementation 'org.testcontainers:kafka:1.19.4'
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: d917a47b-8537-4d0d-8c10-36a9928d4265
dockerImageTag: 0.2.3
dockerImageTag: 0.2.4
dockerRepository: airbyte/source-kafka
githubIssueLabel: source-kafka
icon: kafka.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.cdk.integrations.util.HostPortResolver;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.CatalogHelpers;
Expand All @@ -22,6 +24,7 @@
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
Expand All @@ -32,16 +35,20 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Disabled("need to fix docker container networking")
public class KafkaSourceAcceptanceTest extends SourceAcceptanceTest {

private static final ObjectMapper mapper = MoreMappers.initMapper();
private static final String TOPIC_NAME = "test.topic";

private static KafkaContainer KAFKA;

private String topicName;

@Override
protected String getImageName() {
return "airbyte/source-kafka:dev";
Expand All @@ -53,10 +60,11 @@ protected JsonNode getConfig() {
final ObjectNode subscriptionConfig = mapper.createObjectNode();
protocolConfig.put("security_protocol", KafkaProtocol.PLAINTEXT.toString());
subscriptionConfig.put("subscription_type", "subscribe");
subscriptionConfig.put("topic_pattern", TOPIC_NAME);
subscriptionConfig.put("topic_pattern", topicName);

var bootstrapServers = String.format("PLAINTEXT://%s:%d", HostPortResolver.resolveHost(KAFKA), HostPortResolver.resolvePort(KAFKA));
return Jsons.jsonNode(ImmutableMap.builder()
.put("bootstrap_servers", KAFKA.getBootstrapServers())
.put("bootstrap_servers", bootstrapServers)
.put("subscription", subscriptionConfig)
.put("client_dns_lookup", "use_all_dns_ips")
.put("enable_auto_commit", false)
Expand All @@ -67,11 +75,15 @@ protected JsonNode getConfig() {
.build());
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
@BeforeAll
static public void setupContainer() {
KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.0"));
KAFKA.start();
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
topicName = Strings.addRandomSuffix("topic.test", "_", 10);
createTopic();
sendEvent();
}
Expand All @@ -87,7 +99,7 @@ private void sendEvent() throws ExecutionException, InterruptedException {
final ObjectNode event = mapper.createObjectNode();
event.put("test", "value");

producer.send(new ProducerRecord<>(TOPIC_NAME, event), (recordMetadata, exception) -> {
producer.send(new ProducerRecord<>(topicName, event), (recordMetadata, exception) -> {
if (exception != null) {
throw new RuntimeException("Cannot send message to Kafka. Error: " + exception.getMessage(), exception);
}
Expand All @@ -96,14 +108,18 @@ private void sendEvent() throws ExecutionException, InterruptedException {

private void createTopic() throws Exception {
try (final var admin = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()))) {
final NewTopic topic = new NewTopic(TOPIC_NAME, 1, (short) 1);
final NewTopic topic = new NewTopic(topicName, 1, (short) 1);
admin.createTopics(Collections.singletonList(topic)).all().get();
}
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
KAFKA.close();
try (final var admin = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()))) {
admin.deleteTopics(List.of(topicName)).all().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
Comment on lines +120 to +122
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this do, just re-type the exception?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think tearDown is not supposed to throw any exceptions. But sometimes, the tearDown failing means the whole suit will be a problem, so we're throwing a RuntimeException. Not the cleanest thing, but we don't have a clean story around exception handling, either in tests or in production code

}

@Override
Expand All @@ -114,7 +130,7 @@ protected ConnectorSpecification getSpec() throws Exception {
@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
final ConfiguredAirbyteStream streams =
CatalogHelpers.createConfiguredAirbyteStream(TOPIC_NAME, null, Field.of("value", JsonSchemaType.STRING));
CatalogHelpers.createConfiguredAirbyteStream(topicName, null, Field.of("value", JsonSchemaType.STRING));
streams.setSyncMode(SyncMode.FULL_REFRESH);
return new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(streams));
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The Kafka source connector supports the following [sync modes](https://docs.airb

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :------------------------------------------------------| :---------------------------------------- |
| 0.2.4 | 2024-02-13 | [35229](https://github.com/airbytehq/airbyte/pull/35229) | Adopt CDK 0.20.4 |
| 0.2.4 | 2024-01-24 | [34453](https://github.com/airbytehq/airbyte/pull/34453) | bump CDK version |
| 0.2.3 | 2022-12-06 | [19587](https://github.com/airbytehq/airbyte/pull/19587) | Fix missing data before consumer is closed |
| 0.2.2 | 2022-11-04 | [18648](https://github.com/airbytehq/airbyte/pull/18648) | Add missing record_count increment for JSON|
Expand Down
Loading