From 8e1d3924771ec48566ccd472bee7f48cd84f2808 Mon Sep 17 00:00:00 2001
From: Anush
Date: Tue, 25 Jun 2024 11:45:51 +0530
Subject: [PATCH] feat: int test and build (#1)

---
 .github/workflows/test.yml                    |  35 ++++
 build.gradle                                  | 173 +++++++++++-------
 .../io/qdrant/kafka/BaseKafkaConnectTest.java | 137 ++++++++++++++
 .../java/io/qdrant/kafka/BaseQdrantTest.java  |  83 +++++++++
 .../qdrant/kafka/QdrantSinkConnectorTest.java |  35 ++++
 .../io/qdrant/kafka/QdrantSinkConfigTest.java |   8 +-
 6 files changed, 405 insertions(+), 66 deletions(-)
 create mode 100644 .github/workflows/test.yml
 create mode 100644 src/intTest/java/io/qdrant/kafka/BaseKafkaConnectTest.java
 create mode 100644 src/intTest/java/io/qdrant/kafka/BaseQdrantTest.java
 create mode 100644 src/intTest/java/io/qdrant/kafka/QdrantSinkConnectorTest.java

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 0000000..753613d
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,35 @@
+name: Test
+
+on:
+  pull_request:
+    types:
+      - opened
+      - edited
+      - synchronize
+      - reopened
+
+permissions:
+  # Least privilege: this job only checks out and builds, it never pushes to the repository.
+  contents: read
+  checks: write
+
+jobs:
+  build:
+    name: Build
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+
+      - name: Set up JDK 17
+        uses: actions/setup-java@v3
+        with:
+          java-version: '17'
+          distribution: 'temurin'
+
+      - name: Build And Test
+        uses: gradle/gradle-build-action@v2
+        with:
+          gradle-version: 8.5
+          arguments: build
diff --git a/build.gradle b/build.gradle
index d831688..01fec7b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2,98 +2,147 @@ import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 
 plugins {
-	id 'java'
-	id 'java-library'
-	id 'maven-publish'
+    id 'java'
+    id 'java-library'
+    id 'maven-publish'
 
-	id 'com.github.johnrengelman.shadow' version '8.1.1'
+    id "com.diffplug.spotless" version "6.22.0"
+    id 'com.github.johnrengelman.shadow' version '8.1.1'
 }
 
 group = 'io.qdrant'
 version = '1.0.0'
-description = 'qdrant-kafka'
+description = 'Kafka Sink Connector for Qdrant.'
 
 java.sourceCompatibility = JavaVersion.VERSION_1_8
+java.targetCompatibility = JavaVersion.VERSION_1_8
 
 repositories {
-	mavenLocal()
-	mavenCentral()
-	maven {
-		url = uri('https://packages.confluent.io/maven/')
-	}
+    mavenLocal()
+    mavenCentral()
+    maven {
+        url = uri('https://packages.confluent.io/maven/')
+    }
 }
 
-dependencies {
-	implementation 'org.apache.kafka:connect-api:3.7.0'
-
-	implementation 'io.qdrant:client:1.9.1'
-	implementation 'io.grpc:grpc-protobuf:1.59.0'
-	implementation 'com.google.guava:guava:33.2.1-jre'
+sourceSets {
+    intTest {
+        compileClasspath += sourceSets.main.output
+        runtimeClasspath += sourceSets.main.output
+    }
+}
 
-	implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.1'
-	implementation 'com.google.protobuf:protobuf-java-util:3.25.3'
-	implementation 'org.slf4j:slf4j-api:2.0.13'
+configurations {
+    intTestImplementation.extendsFrom implementation
+    intTestImplementation.extendsFrom testImplementation
+    intTestRuntimeOnly.extendsFrom runtimeOnly
+    intTestRuntimeOnly.extendsFrom testRuntimeOnly
+}
 
-	testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2'
-	testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.2'
+def kafkaVersion = '3.5.0'
+dependencies {
+    implementation "org.apache.kafka:connect-api:$kafkaVersion"
+    implementation 'io.qdrant:client:1.9.1'
+    implementation 'io.grpc:grpc-protobuf:1.59.0'
+    implementation 'com.google.guava:guava:33.2.1-jre'
+    implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.1'
+    implementation 'com.google.protobuf:protobuf-java-util:3.25.3'
+    implementation 'org.slf4j:slf4j-api:2.0.13'
+
+    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2'
+    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.2'
+
+    intTestImplementation 'org.testcontainers:junit-jupiter:1.19.6'
+    intTestImplementation 'org.testcontainers:qdrant:1.19.6'
+    intTestImplementation "org.apache.kafka:kafka_2.13:$kafkaVersion"
+    intTestImplementation "org.apache.kafka:kafka_2.13:$kafkaVersion:test"
+    intTestImplementation "org.apache.kafka:kafka-clients:$kafkaVersion"
+    intTestImplementation "org.apache.kafka:connect-runtime:$kafkaVersion"
+    intTestImplementation "org.apache.kafka:kafka-clients:$kafkaVersion:test"
+    intTestImplementation "org.apache.kafka:connect-runtime:$kafkaVersion:test"
 }
-
 
 java {
-	withSourcesJar()
-	withJavadocJar()
+    withSourcesJar()
+    withJavadocJar()
 }
 
 test {
     useJUnitPlatform()
 }
 
+spotless {
+    java {
+        importOrder()
+        removeUnusedImports()
+        cleanthat()
+        googleJavaFormat()
+        formatAnnotations()
+    }
+}
+
 shadowJar {
-	minimize()
-	mergeServiceFiles()
-	archiveClassifier.set('confluent')
+    mergeServiceFiles()
+    archiveClassifier.set('')
 }
 
 ext.releaseDate = DateTimeFormatter.ISO_LOCAL_DATE.format(LocalDateTime.now())
 
 def archiveFilename = 'qdrant-kafka'
 
 task prepareConfluentArchive(type: Copy) {
-	group = 'Confluent'
-	dependsOn 'shadowJar'
-
-	def baseDir = "$archiveFilename-${project.version}"
-	from('archive/manifest.json') {
-		expand project.properties
-		destinationDir = file "$buildDir/confluentArchive/$baseDir"
-	}
-
-	from('archive/assets') {
-		into 'assets'
-	}
-
-	from('archive/etc') {
-		include 'qdrant-kafka.properties'
-		into 'etc'
-	}
-
-	from("$buildDir/libs") {
-		include "${project.name}-${project.version}-confluent.jar"
-		into 'lib'
-	}
-
-	from('.') {
-		include 'README.md', 'LICENSE'
-		into 'doc'
-	}
+    group = 'Confluent'
+    dependsOn 'shadowJar'
+
+    def baseDir = "$archiveFilename-${project.version}"
+    from('archive/manifest.json') {
+        expand project.properties
+        destinationDir = file "$buildDir/confluentArchive/$baseDir"
+    }
+
+    from('archive/assets') {
+        into 'assets'
+    }
+
+    from('archive/etc') {
+        include 'qdrant-kafka.properties'
+        into 'etc'
+    }
+
+    from("$buildDir/libs") {
+        include "${project.name}-${project.version}.jar"
+        into 'lib'
+    }
+
+    from('.') {
+        include 'README.md', 'LICENSE'
+        into 'doc'
+    }
 }
 
 task createConfluentArchive(type: Zip) {
-	group = 'Confluent'
-	dependsOn prepareConfluentArchive
-	from "$buildDir/confluentArchive"
-	archiveBaseName.set('')
-	archiveAppendix.set(archiveFilename)
-	archiveVersion.set(project.version.toString())
-	destinationDirectory.set(file("$buildDir/confluent"))
-}
\ No newline at end of file
+    group = 'Confluent'
+    dependsOn prepareConfluentArchive
+    from "$buildDir/confluentArchive"
+    archiveBaseName.set('')
+    archiveAppendix.set(archiveFilename)
+    archiveVersion.set(project.version.toString())
+    destinationDirectory.set(file("$buildDir/confluent"))
+}
+
+tasks.register('integrationTest', Test) {
+    description = 'Runs integration tests.'
+    group = 'verification'
+
+    testClassesDirs = sourceSets.intTest.output.classesDirs
+    classpath = sourceSets.intTest.runtimeClasspath
+    shouldRunAfter test
+
+    useJUnitPlatform()
+
+    testLogging {
+        events "passed"
+    }
+}
+
+check.dependsOn integrationTest
diff --git a/src/intTest/java/io/qdrant/kafka/BaseKafkaConnectTest.java b/src/intTest/java/io/qdrant/kafka/BaseKafkaConnectTest.java
new file mode 100644
index 0000000..52def4b
--- /dev/null
+++ b/src/intTest/java/io/qdrant/kafka/BaseKafkaConnectTest.java
@@ -0,0 +1,137 @@
+/* (C)2024 */
+package io.qdrant.kafka;
+
+import static org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.runtime.AbstractStatus;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Base for integration tests that run the connector in an embedded Kafka Connect cluster. */
+public class BaseKafkaConnectTest extends BaseQdrantTest {
+
+  static final Logger LOGGER = LoggerFactory.getLogger(BaseKafkaConnectTest.class);
+
+  EmbeddedConnectCluster connect;
+
+  final String topicName;
+
+  final String connectorName;
+
+  protected BaseKafkaConnectTest(final String topicName, final String connectorName) {
+    this.topicName = topicName;
+    this.connectorName = connectorName;
+  }
+
+  /** Starts a fresh embedded Connect cluster and creates the test topic. */
+  @BeforeEach
+  void startConnect() {
+    connect = new EmbeddedConnectCluster.Builder().name("qdrant-it-connect-cluster").build();
+    connect.start();
+    connect.kafka().createTopic(topicName);
+  }
+
+  /** Deletes the test topic, then stops the cluster even when the deletion fails. */
+  @AfterEach
+  void stopConnect() {
+    try (final Admin admin = connect.kafka().createAdminClient()) {
+      final DeleteTopicsResult result = admin.deleteTopics(Arrays.asList(topicName));
+      result.all().get();
+    } catch (final InterruptedException e) {
+      // Restore the interrupt flag before surfacing the failure.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (final ExecutionException e) {
+      throw new RuntimeException(e);
+    } finally {
+      connect.stop();
+    }
+  }
+
+  /**
+   * Blocks until the connector and all of its tasks (at least {@code numTasks}) are RUNNING.
+   *
+   * @return the current time in epoch milliseconds, read after the wait has finished
+   */
+  long waitForConnectorToStart(final String name, final int numTasks) throws InterruptedException {
+    TestUtils.waitForCondition(
+        () -> assertConnectorAndTasksRunning(name, numTasks).orElse(false),
+        TimeUnit.MINUTES.toMillis(60),
+        "Connector tasks did not start in time.");
+    return System.currentTimeMillis();
+  }
+
+  /**
+   * Checks whether the connector and all of its tasks are RUNNING.
+   *
+   * @return the check result, or empty when the connector status could not be fetched
+   */
+  Optional<Boolean> assertConnectorAndTasksRunning(final String connectorName, final int numTasks) {
+    try {
+      final ConnectorStateInfo info = connect.connectorStatus(connectorName);
+      final boolean result =
+          info != null
+              && info.tasks().size() >= numTasks
+              && info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+              && info.tasks().stream()
+                  .allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+      return Optional.of(result);
+    } catch (final Exception e) {
+      // Log the cause instead of dropping it; the waiting caller simply polls again.
+      LOGGER.error("Could not check connector state info.", e);
+      return Optional.empty();
+    }
+  }
+
+  /** Builds the sink connector config: Qdrant defaults, one task, schemaless JSON converters. */
+  Map<String, String> connectorProperties() {
+    final Map<String, String> props = new HashMap<>(getDefaultProperties());
+    props.put(CONNECTOR_CLASS_CONFIG, QdrantSinkConnector.class.getName());
+    props.put(TOPICS_CONFIG, topicName);
+    props.put(TASKS_MAX_CONFIG, Integer.toString(1));
+    props.put(KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
+    props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
+    props.put("value.converter." + SCHEMAS_ENABLE_CONFIG, "false");
+    props.put("key.converter." + SCHEMAS_ENABLE_CONFIG, "false");
+    return props;
+  }
+
+  /** Produces one JSON point (8-element vector plus a fixed payload) to the test topic. */
+  void writeUnnamedPoint(String collectionName, int id) {
+    connect
+        .kafka()
+        .produce(
+            topicName,
+            String.format(
+                "{\n"
+                    + " \"collection_name\": \"%s\",\n"
+                    + " \"id\": %d,\n"
+                    + " \"vector\": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8],\n"
+                    + " \"payload\": {\n"
+                    + " \"name\": \"kafka\",\n"
+                    + " \"description\": \"Kafka is a distributed streaming platform for all\",\n"
+                    + " \"url\": \"https://kafka.apache.com/\"\n"
+                    + " }\n"
+                    + "}",
+                collectionName, id));
+  }
+}
diff --git a/src/intTest/java/io/qdrant/kafka/BaseQdrantTest.java b/src/intTest/java/io/qdrant/kafka/BaseQdrantTest.java
new file mode 100644
index 0000000..9268355
--- /dev/null
+++ b/src/intTest/java/io/qdrant/kafka/BaseQdrantTest.java
@@ -0,0 +1,83 @@
+/* (C)2024 */
+package io.qdrant.kafka;
+
+import static io.qdrant.kafka.QdrantSinkConfig.GRPC_URL;
+
+import io.qdrant.client.QdrantClient;
+import io.qdrant.client.QdrantGrpcClient;
+import io.qdrant.client.grpc.Collections.Distance;
+import io.qdrant.client.grpc.Collections.VectorParams;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.qdrant.QdrantContainer;
+
+/** Base for integration tests that need Qdrant: starts a container and prepares a collection. */
+@Testcontainers
+public class BaseQdrantTest {
+
+  @Container static QdrantContainer qdrantContainer = new QdrantContainer(getQdrantImage());
+
+  QdrantClient qdrantClient;
+
+  String unnamedVecCollection = "unnamed-vec-collection";
+  String namedVecCollection = "named-vec-collection";
+  String sparseVecCollection = "sparse-vec-collection";
+
+  /** Connects a client to the container and creates the collection for unnamed vectors. */
+  @BeforeEach
+  void setup() throws Exception {
+    qdrantClient =
+        new QdrantClient(
+            QdrantGrpcClient.newBuilder(
+                    qdrantContainer.getHost(), qdrantContainer.getMappedPort(6334), false)
+                .build());
+
+    qdrantClient
+        .createCollectionAsync(
+            unnamedVecCollection,
+            VectorParams.newBuilder().setSize(8).setDistance(Distance.Dot).build())
+        .get();
+  }
+
+  /** Drops the test collection and closes the client, even when the deletion fails. */
+  @AfterEach
+  void tearDown() throws Exception {
+    if (Objects.nonNull(qdrantClient)) {
+      try {
+        // Wait for the deletion so it has finished (or failed visibly) before the next test.
+        qdrantClient.deleteCollectionAsync(unnamedVecCollection).get();
+      } finally {
+        qdrantClient.close();
+      }
+    }
+  }
+
+  /** Waits up to one minute until the collection holds exactly {@code expectedPoints} points. */
+  protected void waitForPoints(final String collectionName, final int expectedPoints)
+      throws InterruptedException {
+    TestUtils.waitForCondition(
+        () -> expectedPoints == qdrantClient.countAsync(collectionName).get(),
+        TimeUnit.MINUTES.toMillis(1),
+        String.format("Could not find %d points in time.", expectedPoints));
+  }
+
+  private static String getQdrantImage() {
+    return "qdrant/qdrant:latest";
+  }
+
+  /** Returns connector properties that point {@code GRPC_URL} at the container's gRPC address. */
+  protected Map<String, String> getDefaultProperties() {
+    Map<String, String> properties = new HashMap<>();
+
+    properties.put(GRPC_URL, "http://" + qdrantContainer.getGrpcHostAddress());
+
+    return properties;
+  }
+}
diff --git a/src/intTest/java/io/qdrant/kafka/QdrantSinkConnectorTest.java b/src/intTest/java/io/qdrant/kafka/QdrantSinkConnectorTest.java
new file mode 100644
index 0000000..0f76f87
--- /dev/null
+++ b/src/intTest/java/io/qdrant/kafka/QdrantSinkConnectorTest.java
@@ -0,0 +1,35 @@
+/* (C)2024 */
+package io.qdrant.kafka;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Verifies that records produced to Kafka end up as points in Qdrant. */
+public class QdrantSinkConnectorTest extends BaseKafkaConnectTest {
+
+  static final Logger LOGGER = LoggerFactory.getLogger(QdrantSinkConnectorTest.class);
+
+  static final String CONNECTOR_NAME = "qdrant-sink-connector";
+
+  static final String TOPIC_NAME = "qdrant-topic";
+
+  public QdrantSinkConnectorTest() {
+    super(TOPIC_NAME, CONNECTOR_NAME);
+  }
+
+  /** Writes two unnamed-vector points to the topic and waits for both to appear in Qdrant. */
+  @Test
+  public void testUnnamedVector() throws Exception {
+    connect.configureConnector(CONNECTOR_NAME, connectorProperties());
+    waitForConnectorToStart(CONNECTOR_NAME, 1);
+
+    int pointsCount = 2;
+
+    for (int i = 0; i < pointsCount; i++) {
+      writeUnnamedPoint(unnamedVecCollection, i);
+    }
+
+    waitForPoints(unnamedVecCollection, pointsCount);
+  }
+}
diff --git a/src/test/java/io/qdrant/kafka/QdrantSinkConfigTest.java b/src/test/java/io/qdrant/kafka/QdrantSinkConfigTest.java
index b140a03..f49832c 100644
--- a/src/test/java/io/qdrant/kafka/QdrantSinkConfigTest.java
+++ b/src/test/java/io/qdrant/kafka/QdrantSinkConfigTest.java
@@ -1,3 +1,4 @@
+/* (C)2024 */
 package io.qdrant.kafka;
 
 import static org.junit.jupiter.api.Assertions.*;
@@ -13,7 +14,7 @@ class QdrantSinkConfigTest {
   @Test
   void testDefaultConfigValues() {
     QdrantSinkConfig config = new QdrantSinkConfig(new HashMap<>());
-    assertEquals("http://localhost:6334/", config.getGrpcUrl());
+    assertEquals("http://localhost:6334", config.getGrpcUrl());
     assertEquals(new Password(""), config.getApiKey());
   }
 
@@ -34,7 +35,7 @@ void testMissingConfigValues() {
     customConfig.put(QdrantSinkConfig.API_KEY, "custom-api-key");
 
     QdrantSinkConfig config = new QdrantSinkConfig(customConfig);
-    assertEquals("http://localhost:6334/", config.getGrpcUrl());
+    assertEquals("http://localhost:6334", config.getGrpcUrl());
     assertEquals(new Password("custom-api-key"), config.getApiKey());
   }
 
@@ -43,8 +44,7 @@ void testConfigDef() {
     ConfigDef configDef = QdrantSinkConfig.conf();
 
     assertNotNull(configDef);
-    assertEquals(
-        "http://localhost:6334/", configDef.defaultValues().get(QdrantSinkConfig.GRPC_URL));
+    assertEquals("http://localhost:6334", configDef.defaultValues().get(QdrantSinkConfig.GRPC_URL));
     assertEquals("", ((Password) configDef.defaultValues().get(QdrantSinkConfig.API_KEY)).value());
   }
 