Merge pull request #66 from SolaceDev/upgrade/kafka-3.x
3.0.0 Release
Nephery authored Aug 25, 2023
2 parents 482b471 + ca29115 commit da2da48
Showing 19 changed files with 89 additions and 68 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/build-test.yml
@@ -42,18 +42,18 @@ jobs:
           restore-keys: |
             ${{ runner.os }}-gradle-build-test-
-      - name: Setup JDK 8
+      - name: Setup JDK 11
         uses: actions/setup-java@v2
         with:
           distribution: zulu
-          java-version: 8
+          java-version: 11
 
       - name: Validate Gradle wrapper
         uses: gradle/wrapper-validation-action@v1
 
       - name: Install Test Support
         working-directory: solace-integration-test-support
-        run: ./mvnw clean install -DskipTests
+        run: ./mvnw clean install -DskipTests -Dchangelist=
 
       - name: Build and test with Gradle
         run: ./gradlew clean test integrationTest jacocoFullReport --info
6 changes: 3 additions & 3 deletions .github/workflows/codeql-analysis.yml
@@ -58,7 +58,7 @@ jobs:
     # Initializes the CodeQL tools for scanning.
     - name: Initialize CodeQL
-      uses: github/codeql-action/init@v1
+      uses: github/codeql-action/init@v2
       with:
         languages: java
         # If you wish to specify custom queries, you can do so here or in a config file.
@@ -72,7 +72,7 @@ jobs:
     # Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
     # If this step fails, then you should remove it and run the build manually (see below)
     - name: Autobuild
-      uses: github/codeql-action/autobuild@v1
+      uses: github/codeql-action/autobuild@v2
 
     # ℹ️ Command-line programs to run using the OS shell.
     # 📚 https://git.io/JvXDl
@@ -86,7 +86,7 @@ jobs:
     #   make release
 
     - name: Perform CodeQL Analysis
-      uses: github/codeql-action/analyze@v1
+      uses: github/codeql-action/analyze@v2
 
     - name: Cleanup Gradle Cache
       # Remove some files from the Gradle cache, so they aren't cached by GitHub Actions.
4 changes: 2 additions & 2 deletions .github/workflows/pmd-analysis.yml
@@ -37,11 +37,11 @@ jobs:
           restore-keys: |
             ${{ runner.os }}-gradle-pmd-
-      - name: Setup JDK 8
+      - name: Setup JDK 11
         uses: actions/setup-java@v2
         with:
           distribution: zulu
-          java-version: 8
+          java-version: 11
 
       - name: Validate Gradle wrapper
         uses: gradle/wrapper-validation-action@v1
4 changes: 2 additions & 2 deletions .github/workflows/spotbugs-analysis.yml
@@ -37,11 +37,11 @@ jobs:
           restore-keys: |
             ${{ runner.os }}-gradle-spotbugs-
-      - name: Setup JDK 8
+      - name: Setup JDK 11
         uses: actions/setup-java@v2
         with:
          distribution: zulu
-          java-version: 8
+          java-version: 11
 
       - name: Validate Gradle wrapper
         uses: gradle/wrapper-validation-action@v1
12 changes: 6 additions & 6 deletions README.md
@@ -129,7 +129,7 @@ Refer to the in-line documentation of the [sample PubSub+ Kafka Source Connector
 
 ### Deployment
 
-The PubSub+ Source Connector deployment has been tested on Apache Kafka 2.4 and Confluent Kafka 5.4 platforms. The Kafka software is typically placed under the root directory: `/opt/<provider>/<kafka or confluent-version>`.
+The PubSub+ Source Connector deployment has been tested on Apache Kafka 3.5 and Confluent Kafka 7.4 platforms. The Kafka software is typically placed under the root directory: `/opt/<provider>/<kafka or confluent-version>`.
 
 Kafka distributions may be available as install bundles, Docker images, Kubernetes deployments, etc. They all support Kafka Connect which includes the scripts, tools and sample properties for Kafka connectors.
 
@@ -156,7 +156,7 @@ In this case the IP address is one of the nodes running the distributed mode wor
   {
     "class": "com.solace.connector.kafka.connect.source.SolaceSourceConnector",
     "type": "source",
-    "version": "2.1.0"
+    "version": "3.0.0"
   },
 ```
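For reference, the JSON above is what a worker returns from the Kafka Connect REST API. A minimal sketch of the query (assuming the default Connect REST port 8083; `<worker-host>` is a placeholder for one of the distributed-mode nodes):

```shell
# List the connector plugins installed on a Kafka Connect worker
curl -s http://<worker-host>:8083/connector-plugins
```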

Expand Down Expand Up @@ -317,7 +317,7 @@ Kerberos has some very specific requirements to operate correctly. Some addition

### Build the Project

JDK 8 or higher is required for this project.
JDK 11 or higher is required for this project.

1. First, clone this GitHub repo:
```shell
Expand All @@ -328,7 +328,7 @@ JDK 8 or higher is required for this project.
```shell
git submodule update --init --recursive
cd solace-integration-test-support
./mvnw clean install -DskipTests
./mvnw clean install -DskipTests -Dchangelist=
cd ..
```
3. Then run the build script:
@@ -358,13 +358,13 @@ To get started, import the following dependency into your project:
 <dependency>
     <groupId>com.solace.connector.kafka.connect</groupId>
     <artifactId>pubsubplus-connector-kafka-source</artifactId>
-    <version>2.1.0</version>
+    <version>3.0.0</version>
 </dependency>
 ```
 
 **Gradle**
 ```groovy
-compile "com.solace.connector.kafka.connect:pubsubplus-connector-kafka-source:2.1.0"
+compile "com.solace.connector.kafka.connect:pubsubplus-connector-kafka-source:3.0.0"
 ```
 
 Now you can implement your custom `SolMessageProcessorIF`.
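As a starting point, here is a minimal sketch of a custom processor. `MySimpleMessageProcessor` is a hypothetical name, and the method shapes (`process(String, BytesXMLMessage)` and `getRecords(String)`) are assumed to follow the bundled `SolSampleSimpleMessageProcessor` sample; check the shipped message processors for the authoritative interface:

```java
import com.solace.connector.kafka.connect.source.SolMessageProcessorIF;
import com.solacesystems.jcsmp.BytesXMLMessage;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Hypothetical processor: copies each Solace message's binary attachment into
 * a Kafka record value, keyed by the Solace destination (topic/queue) name.
 */
public class MySimpleMessageProcessor implements SolMessageProcessorIF {
  private BytesXMLMessage message;
  private byte[] payload;

  @Override
  public SolMessageProcessorIF process(String skey, BytesXMLMessage message) {
    this.message = message;
    // Copy out the binary attachment (the message body for most senders)
    this.payload = new byte[message.getAttachmentContentLength()];
    message.readAttachmentBytes(this.payload);
    return this;
  }

  @Override
  public SourceRecord[] getRecords(String kafkaTopic) {
    String key = message.getDestination() == null
        ? null : message.getDestination().getName();
    // One Solace message maps to one Kafka record here; a processor may emit several
    return new SourceRecord[]{
        new SourceRecord(null, null, kafkaTopic, null,
            Schema.OPTIONAL_STRING_SCHEMA, key,
            Schema.OPTIONAL_BYTES_SCHEMA, payload)
    };
  }
}
```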
19 changes: 16 additions & 3 deletions build.gradle
@@ -14,8 +14,8 @@ plugins {
 }
 
 ext {
-    kafkaVersion = '2.8.1'
-    solaceJavaAPIVersion = '10.15.0'
+    kafkaVersion = '3.5.1'
+    solaceJavaAPIVersion = '10.21.0'
     isSnapshot = project.version.endsWith('-SNAPSHOT')
 }
 
@@ -44,7 +44,7 @@ dependencies {
     integrationTestImplementation 'org.testcontainers:junit-jupiter:1.16.0'
     integrationTestImplementation 'org.testcontainers:kafka:1.16.0'
     integrationTestImplementation 'org.testcontainers:toxiproxy:1.16.0'
-    integrationTestImplementation 'com.solace.test.integration:pubsubplus-junit-jupiter:0.7.1'
+    integrationTestImplementation 'com.solace.test.integration:pubsubplus-junit-jupiter:1.1.0'
     integrationTestImplementation 'org.slf4j:slf4j-api:1.7.32'
     integrationTestImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.16.0'
     integrationTestImplementation 'org.apache.commons:commons-configuration2:2.6'
@@ -60,6 +60,7 @@ dependencies {
     testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.16.0'
     implementation "org.apache.kafka:connect-api:$kafkaVersion"
     implementation "com.solacesystems:sol-jcsmp:$solaceJavaAPIVersion"
+    implementation "org.slf4j:slf4j-api:1.7.36"
 }
 
 pmd {
@@ -198,6 +199,18 @@ task('generateJava', type: Copy) {
 
 project.compileJava {
     dependsOn generateJava
+    sourceCompatibility '11'
+    targetCompatibility '11'
+}
+
+project.compileTestJava {
+    sourceCompatibility '11'
+    targetCompatibility '11'
+}
+
+project.compileIntegrationTestJava {
+    sourceCompatibility '11'
+    targetCompatibility '11'
 }
 
 java {
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,2 +1,2 @@
 group=com.solace.connector.kafka.connect
-version=2.3.0
+version=3.0.0
2 changes: 1 addition & 1 deletion solace-integration-test-support
SolaceConnectorDeployment.java
@@ -47,15 +47,17 @@ public SolaceConnectorDeployment(KafkaConnection kafkaConnection, String kafkaTo

   public void waitForConnectorRestIFUp() {
     Request request = new Request.Builder().url(kafkaConnection.getConnectUrl() + "/connector-plugins").build();
-    Response response = null;
-    do {
-      try {
-        Thread.sleep(1000L);
-        response = client.newCall(request).execute();
-      } catch (IOException | InterruptedException e) {
-        // Continue looping
-      }
-    } while (response == null || !response.isSuccessful());
+    assertTimeoutPreemptively(Duration.ofMinutes(15), () -> {
+      Response response = null;
+      do {
+        try {
+          Thread.sleep(1000L);
+          response = client.newCall(request).execute();
+        } catch (IOException | InterruptedException e) {
+          logger.error("Failed to get connector-plugins", e);
+        }
+      } while (response == null || !response.isSuccessful());
+    });
   }
 
   void startConnector(Properties props) {
SolaceSourceTaskIT.java
@@ -3,12 +3,12 @@
 import com.solace.connector.kafka.connect.source.SolMessageProcessorIF;
 import com.solace.connector.kafka.connect.source.SolaceSourceConstants;
 import com.solace.connector.kafka.connect.source.SolaceSourceTask;
-import com.solace.connector.kafka.connect.source.it.util.extensions.NetworkPubSubPlusExtension;
 import com.solace.connector.kafka.connect.source.msgprocessors.SolSampleSimpleMessageProcessor;
 import com.solace.test.integration.junit.jupiter.extension.ExecutorServiceExtension;
 import com.solace.test.integration.junit.jupiter.extension.ExecutorServiceExtension.ExecSvc;
 import com.solace.test.integration.junit.jupiter.extension.LogCaptorExtension;
 import com.solace.test.integration.junit.jupiter.extension.LogCaptorExtension.LogCaptor;
+import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension;
 import com.solace.test.integration.semp.v2.SempV2Api;
 import com.solacesystems.jcsmp.BytesXMLMessage;
 import com.solacesystems.jcsmp.JCSMPErrorResponseException;
@@ -52,7 +52,7 @@

 @ExtendWith(ExecutorServiceExtension.class)
 @ExtendWith(LogCaptorExtension.class)
-@ExtendWith(NetworkPubSubPlusExtension.class)
+@ExtendWith(PubSubPlusExtension.class)
 public class SolaceSourceTaskIT {
   private SolaceSourceTask solaceSourceTask;
   private Map<String, String> connectorProperties;
SourceConnectorIT.java
@@ -7,7 +7,8 @@
 import com.solace.connector.kafka.connect.source.it.util.extensions.KafkaArgumentsProvider;
 import com.solace.connector.kafka.connect.source.it.util.extensions.KafkaArgumentsProvider.KafkaArgumentSource;
 import com.solace.connector.kafka.connect.source.it.util.extensions.KafkaArgumentsProvider.KafkaContext;
-import com.solace.connector.kafka.connect.source.it.util.extensions.NetworkPubSubPlusExtension;
+import com.solace.connector.kafka.connect.source.it.util.extensions.pubsubplus.NetworkPubSubPlusContainerProvider;
+import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension;
 import com.solacesystems.jcsmp.BytesMessage;
 import com.solacesystems.jcsmp.JCSMPException;
 import com.solacesystems.jcsmp.JCSMPProperties;
@@ -44,7 +45,7 @@
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
 
-@ExtendWith(NetworkPubSubPlusExtension.class)
+@ExtendWith(PubSubPlusExtension.class)
 @ExtendWith(KafkaArgumentsProvider.AutoDeleteSolaceConnectorDeploymentAfterEach.class)
 public class SourceConnectorIT implements TestConstants {
 
@@ -64,7 +65,7 @@ static void setUp(JCSMPSession jcsmpSession) throws Exception {
   @BeforeEach
   public void beforeEach(JCSMPProperties jcsmpProperties) {
     connectorProps = new Properties();
-    connectorProps.setProperty(SolaceSourceConstants.SOL_HOST, String.format("tcp://%s:55555", NetworkPubSubPlusExtension.DOCKER_NET_PUBSUB_ALIAS));
+    connectorProps.setProperty(SolaceSourceConstants.SOL_HOST, String.format("tcp://%s:55555", NetworkPubSubPlusContainerProvider.DOCKER_NET_PUBSUB_ALIAS));
     connectorProps.setProperty(SolaceSourceConstants.SOL_USERNAME, jcsmpProperties.getStringProperty(JCSMPProperties.USERNAME));
     connectorProps.setProperty(SolaceSourceConstants.SOL_PASSWORD, jcsmpProperties.getStringProperty(JCSMPProperties.PASSWORD));
     connectorProps.setProperty(SolaceSourceConstants.SOL_VPN_NAME, jcsmpProperties.getStringProperty(JCSMPProperties.VPN_NAME));
KafkaArgumentsProvider.java
@@ -2,6 +2,7 @@

 import com.solace.connector.kafka.connect.source.it.SolaceConnectorDeployment;
 import com.solace.connector.kafka.connect.source.it.util.KafkaConnection;
+import com.solace.connector.kafka.connect.source.it.util.extensions.pubsubplus.NetworkPubSubPlusContainerProvider;
 import com.solace.connector.kafka.connect.source.it.util.testcontainers.BitnamiKafkaConnectContainer;
 import com.solace.connector.kafka.connect.source.it.util.testcontainers.ConfluentKafkaConnectContainer;
 import com.solace.connector.kafka.connect.source.it.util.testcontainers.ConfluentKafkaControlCenterContainer;
@@ -55,7 +56,7 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
         .getOrComputeIfAbsent(BitnamiResource.class, c -> {
           LOG.info("Creating Bitnami Kafka");
           BitnamiKafkaConnectContainer container = new BitnamiKafkaConnectContainer()
-              .withNetwork(NetworkPubSubPlusExtension.DOCKER_NET);
+              .withNetwork(NetworkPubSubPlusContainerProvider.DOCKER_NET);
           if (!container.isCreated()) {
             container.start();
           }
@@ -69,7 +70,7 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
           LOG.info("Creating Confluent Kafka");
           KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka")
               .withTag("6.2.1"))
-              .withNetwork(NetworkPubSubPlusExtension.DOCKER_NET)
+              .withNetwork(NetworkPubSubPlusContainerProvider.DOCKER_NET)
               .withNetworkAliases("kafka");
           if (!kafkaContainer.isCreated()) {
             kafkaContainer.start();

NetworkPubSubPlusExtension.java
This file was deleted.

NetworkPubSubPlusContainerProvider.java (new file)
@@ -0,0 +1,20 @@
+package com.solace.connector.kafka.connect.source.it.util.extensions.pubsubplus;
+
+import com.solace.test.integration.junit.jupiter.extension.pubsubplus.provider.container.SimpleContainerProvider;
+import com.solace.test.integration.testcontainer.PubSubPlusContainer;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.testcontainers.containers.Network;
+
+import java.util.function.Supplier;
+
+public class NetworkPubSubPlusContainerProvider extends SimpleContainerProvider {
+  public static final Network DOCKER_NET = Network.newNetwork();
+  public static final String DOCKER_NET_PUBSUB_ALIAS = "solace-pubsubplus";
+
+  @Override
+  public Supplier<PubSubPlusContainer> containerSupplier(ExtensionContext extensionContext) {
+    return () -> new PubSubPlusContainer()
+        .withNetwork(DOCKER_NET)
+        .withNetworkAliases(DOCKER_NET_PUBSUB_ALIAS);
+  }
+}
BitnamiKafkaConnectContainer.java
@@ -11,19 +11,20 @@
 import org.testcontainers.utility.DockerImageName;
 
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.Comparator;
 
 public class BitnamiKafkaConnectContainer extends GenericContainer<BitnamiKafkaConnectContainer> {
   private static final String BROKER_LISTENER_NAME = "PLAINTEXT";
   private static final int BROKER_LISTENER_PORT = 9092;
   private static final String BOOTSTRAP_LISTENER_NAME = "PLAINTEXT_HOST";
   public static final int BOOTSTRAP_LISTENER_PORT = 29092;
-  public static final int CONNECT_PORT = 28083;
+  public static final int CONNECT_PORT = 8083;
   private static final int ZOOKEEPER_PORT = 2181;
   private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("bitnami/kafka");
-  private static final String DEFAULT_IMAGE_TAG = "2";
+  private static final String DEFAULT_IMAGE_TAG = "3.5";
   private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
-  private DockerImageName zookeeperDockerImageName = DockerImageName.parse("bitnami/zookeeper:3");
+  private DockerImageName zookeeperDockerImageName = DockerImageName.parse("bitnami/zookeeper:3.8");
   private GenericContainer<?> zookeeperContainer;
 
   public BitnamiKafkaConnectContainer() {
@@ -47,7 +48,8 @@ public BitnamiKafkaConnectContainer(DockerImageName dockerImageName) {
         BROKER_LISTENER_NAME + "://:" + BROKER_LISTENER_PORT, BOOTSTRAP_LISTENER_NAME + "://:" + BOOTSTRAP_LISTENER_PORT));
     withClasspathResourceMapping(Tools.getUnzippedConnectorDirName() + "/lib",
         "/opt/bitnami/kafka/jars/pubsubplus-connector-kafka", BindMode.READ_ONLY);
-    waitingFor(Wait.forLogMessage(".*Finished starting connectors and tasks.*", 1));
+    waitingFor(Wait.forLogMessage(".*Finished starting connectors and tasks.*", 1)
+        .withStartupTimeout(Duration.ofMinutes(10)));
   }
 
   @Override