Skip to content

Commit

Permalink
Kafka 2.3.0 (#56)
Browse files Browse the repository at this point in the history
* Documentation fixes (#23)
* Sol 75392 update sink connector to propagate failed status (#24)
* Enabled skipping of duplicate run check
* Upgraded to Gradle 7
* Set task to failed state in case of connection error to destination on PS+ broker
* Using PS+ broker latest for testing
* Adjusted test runs
* Minor doc fix
* Upgraded to 10.15.0 (#25)
  • Loading branch information
bczoma authored Sep 12, 2022
1 parent a02fda4 commit 574c767
Show file tree
Hide file tree
Showing 17 changed files with 100 additions and 81 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ on:
pull_request:

push:

workflow_dispatch:

jobs:
dupe_check:
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ It builds on the open source [Apache Kafka Quickstart tutorial](//kafka.apache.o
```
After startup, the logs will eventually contain following line:
```
================Session is Connected
================ JCSMPSession Connected
```

6. To watch messages arriving into PubSub+, we use the "Try Me!" test service of the browser-based administration console to subscribe to messages to the `sinktest` topic. Behind the scenes, "Try Me!" uses the JavaScript WebSocket API.
Expand Down Expand Up @@ -173,7 +173,7 @@ The connector's JSON configuration file, in this case, is called `solace_sink_pr

Determine whether the Sink Connector is running with the following command:
```ini
curl 18.218.82.209:8083/connectors/solaceSourceConnector/status | jq
curl 18.218.82.209:8083/connectors/solaceSinkConnector/status | jq
```
If there was an error in starting, the details are returned with this command.

Expand Down
58 changes: 35 additions & 23 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ import com.github.spotbugs.snom.SpotBugsTask
import io.github.gradlenexus.publishplugin.InitializeNexusStagingRepository

plugins {
id 'java'
id 'java-library'
id 'distribution'
id 'jacoco'
id 'maven-publish'
id 'pmd'
id 'signing'
id 'com.github.spotbugs' version '4.7.6'
id 'com.github.spotbugs' version '5.0.12'
id 'io.github.gradle-nexus.publish-plugin' version '1.1.0'
id 'org.gradle.test-retry' version '1.3.1'
id 'org.unbroken-dome.test-sets' version '2.2.1'
id 'org.unbroken-dome.test-sets' version '4.0.0'
}

ext {
kafkaVersion = '2.8.1'
solaceJavaAPIVersion = '10.12.1'
solaceJavaAPIVersion = '10.15.0'
isSnapshot = project.version.endsWith('-SNAPSHOT')
}

Expand All @@ -38,41 +38,47 @@ testSets {
}

dependencies {
integrationTestImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
integrationTestImplementation 'org.junit.jupiter:junit-jupiter:5.9.0'
integrationTestImplementation 'org.junit-pioneer:junit-pioneer:1.5.0'
integrationTestImplementation 'org.mockito:mockito-junit-jupiter:3.12.4'
integrationTestImplementation 'org.testcontainers:testcontainers:1.16.0'
integrationTestImplementation 'org.testcontainers:junit-jupiter:1.16.0'
integrationTestImplementation 'org.testcontainers:kafka:1.16.0'
integrationTestImplementation 'org.mockito:mockito-junit-jupiter:4.7.0'
integrationTestImplementation 'org.testcontainers:testcontainers:1.17.3'
integrationTestImplementation 'org.testcontainers:junit-jupiter:1.17.3'
integrationTestImplementation 'org.testcontainers:kafka:1.17.3'
integrationTestImplementation 'org.testcontainers:toxiproxy:1.16.0'
integrationTestImplementation 'com.solace.test.integration:pubsubplus-junit-jupiter:0.7.1'
integrationTestImplementation 'org.slf4j:slf4j-api:1.7.32'
integrationTestImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.16.0'
integrationTestImplementation 'org.apache.commons:commons-configuration2:2.6'
integrationTestImplementation 'org.slf4j:slf4j-api:1.7.36'
integrationTestImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.18.0'
integrationTestImplementation 'org.apache.commons:commons-configuration2:2.8.0'
integrationTestImplementation 'commons-beanutils:commons-beanutils:1.9.4'
integrationTestImplementation 'com.google.code.gson:gson:2.3.1'
integrationTestImplementation 'commons-io:commons-io:2.4'
integrationTestImplementation 'com.squareup.okhttp3:okhttp:4.9.1'
integrationTestImplementation 'com.google.code.gson:gson:2.9.1'
integrationTestImplementation 'commons-io:commons-io:2.11.0'
integrationTestImplementation 'com.squareup.okhttp3:okhttp:4.10.0'
integrationTestImplementation "org.apache.kafka:kafka-clients:$kafkaVersion"
testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
testImplementation 'org.mockito:mockito-junit-jupiter:3.12.4'
testImplementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0'
testImplementation 'org.mockito:mockito-junit-jupiter:4.7.0'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.16.0'
compile "org.apache.kafka:connect-api:$kafkaVersion"
compile "com.solacesystems:sol-jcsmp:$solaceJavaAPIVersion"
testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.18.0'
implementation "org.apache.kafka:connect-api:$kafkaVersion"
implementation "com.solacesystems:sol-jcsmp:$solaceJavaAPIVersion"
}

pmd {
consoleOutput = true
toolVersion = '6.49.0'
rulesMinimumPriority = 2
toolVersion = '6.38.0'
sourceSets = [sourceSets.main]
}

spotbugs {
effort 'max'
reportLevel 'high' // Decrease to medium once medium errors are fixed
}

spotbugsIntegrationTest {
enabled = false
}

task('jacocoFullReport', type: JacocoReport) {
description 'Generates code coverage report for all tests.'
executionData tasks.withType(Test)
Expand All @@ -99,6 +105,7 @@ task('prepDistForIntegrationTesting') {
}
}
}

project.integrationTest {
useJUnitPlatform()
outputs.upToDateWhen { false }
Expand Down Expand Up @@ -180,6 +187,11 @@ task('pmdMainSarif') {
}
}

task sourcesJar(type: Jar, dependsOn: classes) {
classifier = 'sources'
from sourceSets.main.allSource
}

task('generateJava', type: Copy) {
def templateContext = [version: project.version]
inputs.properties templateContext // Register context as input so that task doesn't skip when props are updated
Expand Down Expand Up @@ -207,7 +219,7 @@ distributions {
from('THIRD-PARTY-LICENSES') { into 'doc' }
into('lib') {
from jar
from(project.configurations.runtime)
from(project.configurations.runtimeClasspath)
}
// from jar
}
Expand All @@ -220,7 +232,7 @@ publishing {
from components.java
pom {
name = "Solace PubSub+ Connector for Kafka: Sink"
description = "The Solace/Kafka adapter consumes Kafka topic records and streams them to the PubSub+ Event Mesh as topic and/or queue data events."
description = "The PubSub+ Kafka Sink Connector consumes Kafka topic records and streams them to the PubSub+ Event Mesh as topic and/or queue data events."
url = "https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink"
packaging = "jar"
licenses {
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
group=com.solace.connector.kafka.connect
version=2.2.0
version=2.3.0
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.9.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
2 changes: 0 additions & 2 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ esac

CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar


# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
Expand Down Expand Up @@ -130,7 +129,6 @@ fi
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`

JAVACMD=`cygpath --unix "$JAVACMD"`

# We build the pattern for arguments to be converted via cygpath
Expand Down
25 changes: 18 additions & 7 deletions gradlew.bat
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%

@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi

@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"

Expand All @@ -40,7 +37,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome

set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto execute
if "%ERRORLEVEL%" == "0" goto init

echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Expand All @@ -54,7 +51,7 @@ goto fail
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe

if exist "%JAVA_EXE%" goto execute
if exist "%JAVA_EXE%" goto init

echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
Expand All @@ -64,14 +61,28 @@ echo location of your Java installation.

goto fail

:init
@rem Get command-line arguments, handling Windows variants

if not "%OS%" == "Windows_NT" goto win9xME_args

:win9xME_args
@rem Slurp the command line arguments.
set CMD_LINE_ARGS=
set _SKIP=2

:win9xME_args_slurp
if "x%~1" == "x" goto execute

set CMD_LINE_ARGS=%*

:execute
@rem Setup the command line

set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar


@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%

:end
@rem End local scope for the variables with windows NT shell
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.TestInstance;
Expand Down Expand Up @@ -574,6 +575,7 @@ void testRebalancedKafkaConsumers(@Values(booleans = { false, true }) boolean dy
ImmutableMap.of(AdditionalCheck.ATTACHMENTBYTEBUFFER, newRecordValue));
}

@Disabled()
@CartesianTest(name = "[{index}] dynamicDestination={0}, autoFlush={1}, kafka={2}")
void testCommitRollback(@Values(booleans = { false, true }) boolean dynamicDestination,
@Values(booleans = { false, true }) boolean autoFlush,
Expand Down Expand Up @@ -633,7 +635,7 @@ void testCommitRollback(@Values(booleans = { false, true }) boolean dynamicDesti
.contains("Document Is Too Large"), 30, TimeUnit.SECONDS);
if (autoFlush) {
logConsumer.waitUntil(frame -> frame.getUtf8String()
.contains("RetriableException from SinkTask"), 30, TimeUnit.SECONDS);
.contains("ConnectException from SinkTask"), 30, TimeUnit.SECONDS);
} else {
logConsumer.waitUntil(frame -> frame.getUtf8String()
.contains("Offset commit failed, rewinding to last committed offsets"), 1, TimeUnit.MINUTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -198,7 +198,7 @@ public void testSendThrowsJCSMPException(Class<Destination> destinationType, Que
solaceSinkTask.stop();
ConnectException thrown = assertThrows(ConnectException.class, () -> solaceSinkTask.put(
Collections.singleton(sinkRecord)));
assertThat(thrown, instanceOf(RetriableException.class));
assertThat(thrown, instanceOf(ConnectException.class));
assertThat(thrown.getMessage(), containsString("Received exception while sending message to " +
(destinationType.isAssignableFrom(Queue.class) ? "queue" : "topic")));
assertThat(thrown.getCause(), instanceOf(ClosedFacilityException.class));
Expand All @@ -225,7 +225,7 @@ public void testDynamicSendThrowsJCSMPException(Class<Destination> destinationTy
solaceSinkTask.stop();
ConnectException thrown = assertThrows(ConnectException.class, () -> solaceSinkTask.put(
Collections.singleton(sinkRecord)));
assertThat(thrown, instanceOf(RetriableException.class));
assertThat(thrown, instanceOf(ConnectException.class));
assertThat(thrown.getMessage(), containsString("Received exception while sending message to topic"));
assertThat(thrown.getCause(), instanceOf(ClosedFacilityException.class));
}
Expand Down Expand Up @@ -331,7 +331,7 @@ public void testCommitRollback(boolean autoFlush, SempV2Api sempV2Api, Queue que

ConnectException thrown;
if (autoFlush) {
thrown = assertThrows(RetriableException.class, () -> solaceSinkTask.put(Collections.singleton(sinkRecord)));
thrown = assertThrows(ConnectException.class, () -> solaceSinkTask.put(Collections.singleton(sinkRecord)));
} else {
Map<TopicPartition, OffsetAndMetadata> currentOffsets = Collections.singletonMap(
new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition()),
Expand Down Expand Up @@ -393,7 +393,7 @@ public void testDynamicDestinationCommitRollback(

ConnectException thrown;
if (autoFlush) {
thrown = assertThrows(RetriableException.class, () -> solaceSinkTask.put(Collections.singleton(sinkRecord)));
thrown = assertThrows(ConnectException.class, () -> solaceSinkTask.put(Collections.singleton(sinkRecord)));
} else {
Map<TopicPartition, OffsetAndMetadata> currentOffsets = Collections.singletonMap(
new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition()),
Expand All @@ -409,6 +409,7 @@ public void testDynamicDestinationCommitRollback(
.getData().getMaxMsgSizeExceededDiscardedMsgCount());
}

@Disabled()
@ParameterizedTest(name = "[{index}] autoFlush={0}")
@ValueSource(booleans = {false, true})
public void testLongCommit(boolean autoFlush,
Expand Down Expand Up @@ -498,6 +499,7 @@ public void testLongCommit(boolean autoFlush,
JCSMPFactory.onlyInstance().createTopic(connectorProperties.get(SolaceSinkConstants.SOL_TOPICS))));
}

@Disabled()
@CartesianTest(name = "[{index}] destinationType={0}, autoFlush={1}")
public void testDynamicDestinationLongCommit(
@Values(classes = {Queue.class, Topic.class}) Class<Destination> destinationType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class NetworkPubSubPlusExtension extends PubSubPlusExtension {
private static final String DOCKER_NET_PUBSUB_ALIAS = "solace-pubsubplus";

public NetworkPubSubPlusExtension() {
super(() -> new PubSubPlusContainer()
super(() -> new PubSubPlusContainer("solace/solace-pubsub-standard:latest")
.withNetwork(DOCKER_NET)
.withNetworkAliases(DOCKER_NET_PUBSUB_ALIAS));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {

@Override
public void close() {
super.close();
if (zookeeperContainer != null) {
zookeeperContainer.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void configureSession() {

/**
* Create and connect JCSMPSession
* @throws JCSMPException
* @throws JCSMPException In case of JCSMP error
*/
public void connectSession() throws JCSMPException {
System.setProperty("java.security.auth.login.config",
Expand All @@ -182,7 +182,7 @@ public void connectSession() throws JCSMPException {

/**
* Create transacted session
* @throws JCSMPException
* @throws JCSMPException In case of JCSMP error
*/
public void createTxSession() throws JCSMPException {
if (txSession == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ public static ConfigDef solaceConfigDef() {
.define(SolaceSinkConstants.SOL_VPN_NAME, Type.STRING, "default", Importance.HIGH,
"Solace VPN to connect with ")
.define(SolaceSinkConstants.SOL_TOPICS, Type.STRING, null, Importance.MEDIUM,
"Solace topic or list of topics to subscribe from")
"Solace topic or list of topics to publish to")
.define(SolaceSinkConstants.SOl_QUEUE,
Type.STRING, null, Importance.MEDIUM, "Solace queue to consume from")
Type.STRING, null, Importance.MEDIUM, "Solace queue to publish to")
.define(SolaceSinkConstants.SOL_RECORD_PROCESSOR,
Type.CLASS, SolRecordProcessorIF.class, Importance.HIGH,
"default Solace message processor to use against Kafka Sink Records")
"default Solace record processor to use against Kafka Sink Records")
.define(SolaceSinkConstants.SOL_RECORD_PROCESSOR_IGNORE_ERROR,
Type.BOOLEAN, false, Importance.MEDIUM,
"If enabled, records that throw record processor errors will be discarded")
Expand Down
Loading

0 comments on commit 574c767

Please sign in to comment.