Merge pull request #68 from SolaceDev/upgrade/kafka-3.x
3.0.0 Release
Nephery authored Aug 25, 2023
2 parents: e6bc6d3 + b17512a · commit 7c18846
Showing 21 changed files with 112 additions and 94 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/build-test.yml
@@ -6,7 +6,7 @@ on:
pull_request:

push:

workflow_dispatch:

jobs:
@@ -44,18 +44,18 @@ jobs:
restore-keys: |
${{ runner.os }}-gradle-build-test-
-      - name: Setup JDK 8
+      - name: Setup JDK 11
uses: actions/setup-java@v2
with:
distribution: zulu
-          java-version: 8
+          java-version: 11

- name: Validate Gradle wrapper
uses: gradle/wrapper-validation-action@v1

- name: Install Test Support
working-directory: solace-integration-test-support
-        run: ./mvnw clean install -DskipTests
+        run: ./mvnw clean install -DskipTests -Dchangelist=

- name: Build and test with Gradle
run: ./gradlew clean test integrationTest jacocoFullReport --info
6 changes: 3 additions & 3 deletions .github/workflows/codeql-analysis.yml
@@ -58,7 +58,7 @@ jobs:
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
-        uses: github/codeql-action/init@v1
+        uses: github/codeql-action/init@v2
with:
languages: java
# If you wish to specify custom queries, you can do so here or in a config file.
@@ -72,7 +72,7 @@
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
-        uses: github/codeql-action/autobuild@v1
+        uses: github/codeql-action/autobuild@v2

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
@@ -86,7 +86,7 @@
# make release

- name: Perform CodeQL Analysis
-        uses: github/codeql-action/analyze@v1
+        uses: github/codeql-action/analyze@v2

- name: Cleanup Gradle Cache
# Remove some files from the Gradle cache, so they aren't cached by GitHub Actions.
4 changes: 2 additions & 2 deletions .github/workflows/pmd-analysis.yml
@@ -37,11 +37,11 @@ jobs:
restore-keys: |
${{ runner.os }}-gradle-pmd-
-      - name: Setup JDK 8
+      - name: Setup JDK 11
uses: actions/setup-java@v2
with:
distribution: zulu
-          java-version: 8
+          java-version: 11

- name: Validate Gradle wrapper
uses: gradle/wrapper-validation-action@v1
4 changes: 2 additions & 2 deletions .github/workflows/spotbugs-analysis.yml
@@ -37,11 +37,11 @@ jobs:
restore-keys: |
${{ runner.os }}-gradle-spotbugs-
-      - name: Setup JDK 8
+      - name: Setup JDK 11
uses: actions/setup-java@v2
with:
distribution: zulu
-          java-version: 8
+          java-version: 11

- name: Validate Gradle wrapper
uses: gradle/wrapper-validation-action@v1
12 changes: 6 additions & 6 deletions README.md
@@ -130,7 +130,7 @@ Refer to the in-line documentation of the [sample PubSub+ Kafka Sink Connector p

### Deployment

-The PubSub+ Sink Connector deployment has been tested on Apache Kafka 2.4 and Confluent Kafka 5.4 platforms. The Kafka software is typically placed under the root directory: `/opt/<provider>/<kafka or confluent-version>`.
+The PubSub+ Sink Connector deployment has been tested on Apache Kafka 3.5 and Confluent Kafka 7.4 platforms. The Kafka software is typically placed under the root directory: `/opt/<provider>/<kafka or confluent-version>`.

Kafka distributions may be available as install bundles, Docker images, Kubernetes deployments, etc. They all support Kafka Connect which includes the scripts, tools and sample properties for Kafka connectors.

@@ -157,7 +157,7 @@ In this case the IP address is one of the nodes running the distributed mode wor
{
"class": "com.solace.connector.kafka.connect.sink.SolaceSinkConnector",
"type": "sink",
"version": "2.1.0"
"version": "3.0.0"
},
```
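As a side note, this version check can also be scripted against the Connect REST API. The sketch below uses OkHttp, which this project's integration tests already depend on; the worker address `localhost:8083` is an assumption, so substitute the host of any distributed-mode worker and its configured REST port:

```java
import java.io.IOException;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class ConnectorPluginCheck {
    public static void main(String[] args) throws IOException {
        OkHttpClient client = new OkHttpClient();
        // "localhost:8083" is a hypothetical worker address; replace it with
        // the host and REST port of a distributed-mode worker in your cluster.
        Request request = new Request.Builder()
                .url("http://localhost:8083/connector-plugins")
                .build();
        try (Response response = client.newCall(request).execute()) {
            // The body is a JSON array of plugin descriptors; the entry for
            // com.solace.connector.kafka.connect.sink.SolaceSinkConnector
            // should now report version 3.0.0.
            System.out.println(response.body().string());
        }
    }
}
```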

@@ -335,7 +335,7 @@ Kerberos has some very specific requirements to operate correctly. Some addition

### Build the Project

-JDK 8 or higher is required for this project.
+JDK 11 or higher is required for this project.

1. First, clone this GitHub repo:
```shell
@@ -346,7 +346,7 @@ JDK 8 or higher is required for this project.
```shell
git submodule update --init --recursive
cd solace-integration-test-support
-   ./mvnw clean install -DskipTests
+   ./mvnw clean install -DskipTests -Dchangelist=
cd ..
```
3. Then run the build script:
@@ -376,13 +376,13 @@ To get started, import the following dependency into your project:
<dependency>
<groupId>com.solace.connector.kafka.connect</groupId>
<artifactId>pubsubplus-connector-kafka-sink</artifactId>
-  <version>2.2.0</version>
+  <version>3.0.0</version>
</dependency>
```

**Gradle**
```groovy
compile "com.solace.connector.kafka.connect:pubsubplus-connector-kafka-sink:2.2.0"
compile "com.solace.connector.kafka.connect:pubsubplus-connector-kafka-sink:3.0.0"
```

Now you can implement your custom `SolRecordProcessorIF`.
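A minimal sketch of such a processor follows. It assumes the interface exposes a single `processRecord(String skey, SinkRecord record)` method returning a JCSMP `BytesXMLMessage`, as in the bundled sample processors; the class name and the upper-casing transformation are hypothetical:

```java
import java.nio.charset.StandardCharsets;

import com.solace.connector.kafka.connect.sink.SolRecordProcessorIF;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.JCSMPFactory;
import org.apache.kafka.connect.sink.SinkRecord;

public class UpperCaseRecordProcessor implements SolRecordProcessorIF {
    @Override
    public BytesXMLMessage processRecord(String skey, SinkRecord record) {
        // Build an outbound Solace message from the Kafka record.
        BytesXMLMessage msg = JCSMPFactory.onlyInstance().createMessage(BytesXMLMessage.class);
        // Hypothetical transformation: upper-case the record value's string form.
        Object value = record.value();
        String payload = value == null ? "" : value.toString().toUpperCase();
        msg.writeAttachment(payload.getBytes(StandardCharsets.UTF_8));
        return msg;
    }
}
```

The connector would then be pointed at this class through its record-processor class configuration property.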
19 changes: 16 additions & 3 deletions build.gradle
@@ -15,8 +15,8 @@ plugins {
}

ext {
-    kafkaVersion = '2.8.1'
-    solaceJavaAPIVersion = '10.15.0'
+    kafkaVersion = '3.5.1'
+    solaceJavaAPIVersion = '10.21.0'
isSnapshot = project.version.endsWith('-SNAPSHOT')
}

@@ -45,7 +45,7 @@ dependencies {
integrationTestImplementation 'org.testcontainers:junit-jupiter:1.17.3'
integrationTestImplementation 'org.testcontainers:kafka:1.17.3'
integrationTestImplementation 'org.testcontainers:toxiproxy:1.16.0'
-    integrationTestImplementation 'com.solace.test.integration:pubsubplus-junit-jupiter:0.7.1'
+    integrationTestImplementation 'com.solace.test.integration:pubsubplus-junit-jupiter:1.1.0'
integrationTestImplementation 'org.slf4j:slf4j-api:1.7.36'
integrationTestImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.18.0'
integrationTestImplementation 'org.apache.commons:commons-configuration2:2.8.0'
@@ -61,6 +61,7 @@ dependencies {
testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.18.0'
implementation "org.apache.kafka:connect-api:$kafkaVersion"
implementation "com.solacesystems:sol-jcsmp:$solaceJavaAPIVersion"
implementation "org.slf4j:slf4j-api:1.7.36"
}

pmd {
@@ -202,6 +203,18 @@ task('generateJava', type: Copy) {

project.compileJava {
dependsOn generateJava
+    sourceCompatibility '11'
+    targetCompatibility '11'
}

+project.compileTestJava {
+    sourceCompatibility '11'
+    targetCompatibility '11'
+}

+project.compileIntegrationTestJava {
+    sourceCompatibility '11'
+    targetCompatibility '11'
+}

java {
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,2 +1,2 @@
group=com.solace.connector.kafka.connect
-version=2.3.0
+version=3.0.0
2 changes: 1 addition & 1 deletion solace-integration-test-support (submodule pointer update; subproject commit hashes not shown)
SolaceProducerHandlerIT.java
@@ -1,6 +1,6 @@
package com.solace.connector.kafka.connect.sink;

-import com.solace.connector.kafka.connect.sink.it.util.extensions.NetworkPubSubPlusExtension;
+import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.ConsumerFlowProperties;
import com.solacesystems.jcsmp.DeliveryMode;
@@ -39,7 +39,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

-@ExtendWith(NetworkPubSubPlusExtension.class)
+@ExtendWith(PubSubPlusExtension.class)
public class SolaceProducerHandlerIT {
private Map<String, String> properties;
private SolSessionHandler sessionHandler;
SinkConnectorIT.java
@@ -8,10 +8,11 @@
import com.solace.connector.kafka.connect.sink.it.util.extensions.KafkaArgumentsProvider;
import com.solace.connector.kafka.connect.sink.it.util.extensions.KafkaArgumentsProvider.KafkaContext;
import com.solace.connector.kafka.connect.sink.it.util.extensions.KafkaArgumentsProvider.KafkaSource;
-import com.solace.connector.kafka.connect.sink.it.util.extensions.NetworkPubSubPlusExtension;
+import com.solace.connector.kafka.connect.sink.it.util.extensions.pubsubplus.NetworkPubSubPlusContainerProvider;
import com.solace.connector.kafka.connect.sink.recordprocessor.SolDynamicDestinationRecordProcessor;
import com.solace.test.integration.junit.jupiter.extension.ExecutorServiceExtension;
import com.solace.test.integration.junit.jupiter.extension.ExecutorServiceExtension.ExecSvc;
+import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension;
import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension.JCSMPProxy;
import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension.ToxiproxyContext;
import com.solace.test.integration.semp.v2.SempV2Api;
@@ -44,7 +44,6 @@
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junitpioneer.jupiter.cartesian.CartesianTest;
@@ -79,6 +79,7 @@
import static org.junit.jupiter.api.Assertions.fail;

@ExtendWith(ExecutorServiceExtension.class)
+@ExtendWith(PubSubPlusExtension.class)
@ExtendWith(KafkaArgumentsProvider.AutoDeleteSolaceConnectorDeploymentAfterEach.class)
public class SinkConnectorIT implements TestConstants {

@@ -90,9 +91,6 @@ enum AdditionalCheck { ATTACHMENTBYTEBUFFER, CORRELATIONID }

private Properties connectorProps;

-  @RegisterExtension
-  public static final NetworkPubSubPlusExtension PUB_SUB_PLUS_EXTENSION = new NetworkPubSubPlusExtension();
-
////////////////////////////////////////////////////
// Main setup/teardown

@@ -110,7 +108,7 @@ static void setUp(JCSMPSession jcsmpSession) throws JCSMPException {
@BeforeEach
public void beforeEach(JCSMPProperties jcsmpProperties) {
connectorProps = new Properties();
-    connectorProps.setProperty(SolaceSinkConstants.SOL_HOST, String.format("tcp://%s:55555", PUB_SUB_PLUS_EXTENSION.getNetworkAlias()));
+    connectorProps.setProperty(SolaceSinkConstants.SOL_HOST, String.format("tcp://%s:55555", NetworkPubSubPlusContainerProvider.DOCKER_NET_PUBSUB_ALIAS));
connectorProps.setProperty(SolaceSinkConstants.SOL_USERNAME, jcsmpProperties.getStringProperty(JCSMPProperties.USERNAME));
connectorProps.setProperty(SolaceSinkConstants.SOL_PASSWORD, jcsmpProperties.getStringProperty(JCSMPProperties.PASSWORD));
connectorProps.setProperty(SolaceSinkConstants.SOL_VPN_NAME, jcsmpProperties.getStringProperty(JCSMPProperties.VPN_NAME));
@@ -603,9 +601,9 @@ void testCommitRollback(@Values(booleans = { false, true }) boolean dynamicDesti
connectorProps.setProperty("errors.retry.timeout", Long.toString(-1));

sempV2Api.config().updateMsgVpnQueue(SOL_VPN, queue.getName(), new ConfigMsgVpnQueue().maxMsgSize(1),
-        null);
+        null, null);
sempV2Api.config().createMsgVpnQueueSubscription(SOL_VPN, queue.getName(),
-        new ConfigMsgVpnQueueSubscription().subscriptionTopic(topicName), null);
+        new ConfigMsgVpnQueueSubscription().subscriptionTopic(topicName), null, null);
assertTimeoutPreemptively(Duration.ofSeconds(30), () -> {
while (sempV2Api.monitor().getMsgVpnQueue(SOL_VPN, queue.getName(), null).getData()
.getMaxMsgSize() != 1) {
@@ -648,7 +646,7 @@ void testCommitRollback(@Values(booleans = { false, true }) boolean dynamicDesti
executorService.schedule(() -> {
logger.info("Restoring max message size for queue {}", queue.getName());
return sempV2Api.config().updateMsgVpnQueue(SOL_VPN, queue.getName(),
-          new ConfigMsgVpnQueue().maxMsgSize(10000000), null);
+          new ConfigMsgVpnQueue().maxMsgSize(10000000), null, null);
}, 5, TimeUnit.SECONDS);

logger.info("Waiting for Solace transaction to be committed");
SolaceConnectorDeployment.java
@@ -60,17 +60,20 @@ public SolaceConnectorDeployment(KafkaConnection kafkaConnection, AdminClient ad

public void waitForConnectorRestIFUp() {
Request request = new Request.Builder().url(kafkaConnection.getConnectUrl() + "/connector-plugins").build();
-    boolean success = false;
-    do {
-      try {
-        Thread.sleep(1000L);
-        try (Response response = client.newCall(request).execute()) {
-          success = response.isSuccessful();
-        }
-      } catch (IOException | InterruptedException e) {
-        // Continue looping
-      }
-    } while (!success);
+    assertTimeoutPreemptively(Duration.ofMinutes(15), () -> {
+      boolean success = false;
+      do {
+        try {
+          Thread.sleep(1000L);
+          logger.info("Waiting for {} to be accessible", request.url());
+          try (Response response = client.newCall(request).execute()) {
+            success = response.isSuccessful();
+          }
+        } catch (IOException | InterruptedException e) {
+          logger.error("Failed to get connector-plugins", e);
+        }
+      } while (!success);
+    });
}

public void provisionKafkaTestTopic() {
(Diffs for the remaining changed files are not rendered here.)
