diff --git a/.gitignore b/.gitignore
index 5014111..22a1a35 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,7 +6,7 @@
# Jave related
diff --git a/README.md b/README.md
index 52ca58a..01fb6c6 100644
--- a/README.md
+++ b/README.md
@@ -319,16 +319,22 @@ Kerberos has some very specific requirements to operate correctly. Some addition
JDK 8 or higher is required for this project.
-First, clone this GitHub repo:
-git clone https://github.com/SolaceProducts/pubsubplus-connector-kafka-source.git
-cd pubsubplus-connector-kafka-source
-Then run the build script:
-gradlew clean build
+1. First, clone this GitHub repo:
+ ```shell
+ git clone https://github.com/SolaceProducts/pubsubplus-connector-kafka-source.git
+ cd pubsubplus-connector-kafka-source
+ ```
+2. Install the test support module:
+ ```shell
+ git submodule update --init --recursive
+ cd solace-integration-test-support
+ ./mvnw clean install -DskipTests
+ cd ..
+ ```
+3. Then run the build script:
+ ```shell
+ gradlew clean build
+ ```
This script creates artifacts in the `build` directory, including the deployable packaged PubSub+ Source Connector archives under `build\distributions`.
@@ -336,26 +342,39 @@ This script creates artifacts in the `build` directory, including the deployable
An integration test suite is also included, which spins up a Docker-based deployment environment that includes a PubSub+ event broker, Zookeeper, Kafka broker, Kafka Connect. It deploys the connector to Kafka Connect and runs end-to-end tests.
-1. Install the test support module:
- ```shell
- git submodule update --init --recursive
- cd solace-integration-test-support
- ./mvnw clean install -DskipTests
- cd ..
- ```
-2. Run the tests:
+1. Run the tests:
./gradlew clean test integrationTest
### Build a New Message Processor
-The processing of the Solace message to create a Kafka source record is handled by an interface defined in [`SolaceMessageProcessorIF.java`](/src/main/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIF.java). This is a simple interface that creates the Kafka source records from the PubSub+ messages. This project includes two examples of classes that implement this interface:
+The processing of the Solace message to create a Kafka source record is handled by [`SolaceMessageProcessorIF`](/src/main/java/com/solace/connector/kafka/connect/source/SolMessageProcessorIF.java). This is a simple interface that creates the Kafka source records from the PubSub+ messages.
+To get started, import the following dependency into your project:
+ com.solace.connector.kafka.connect
+ pubsubplus-connector-kafka-source
+ 2.1.0
+compile "com.solace.connector.kafka.connect:pubsubplus-connector-kafka-source:2.1.0"
+Now you can implement your custom `SolaceMessageProcessorIF`.
+For reference, this project includes two examples which you can use as starting points for implementing your own custom message processors:
* [SolSampleSimpleMessageProcessor](/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolSampleSimpleMessageProcessor.java)
* [SolaceSampleKeyedMessageProcessor](/src/main/java/com/solace/connector/kafka/connect/source/msgprocessors/SolaceSampleKeyedMessageProcessor.java)
-You can use these examples as starting points for implementing your own custom message processors.
+Once you've built the jar file for your custom message processor project, place it into the same directory as this connector, and update the connector's `sol.message_processor_class` config to point to the class of your new message processor.
More information on Kafka source connector development can be found here:
- [Apache Kafka Connect](https://kafka.apache.org/documentation/)
diff --git a/build.gradle b/build.gradle
index e827971..947d459 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,17 +1,22 @@
import com.github.spotbugs.snom.SpotBugsTask
+import io.github.gradlenexus.publishplugin.InitializeNexusStagingRepository
plugins {
id 'java'
id 'distribution'
id 'jacoco'
+ id 'maven-publish'
id 'pmd'
+ id 'signing'
id 'com.github.spotbugs' version '4.7.6'
+ id 'io.github.gradle-nexus.publish-plugin' version '1.1.0'
id 'org.unbroken-dome.test-sets' version '2.2.1'
ext {
kafkaVersion = '2.8.1'
solaceJavaAPIVersion = '10.12.0'
+ isSnapshot = project.version.endsWith('-SNAPSHOT')
repositories {
@@ -38,9 +43,9 @@ dependencies {
integrationTestImplementation 'org.testcontainers:testcontainers:1.16.0'
integrationTestImplementation 'org.testcontainers:junit-jupiter:1.16.0'
integrationTestImplementation 'org.testcontainers:kafka:1.16.0'
- integrationTestImplementation 'com.solace.test.integration:pubsubplus-junit-jupiter:0.5.0'
+ integrationTestImplementation 'com.solace.test.integration:pubsubplus-junit-jupiter:0.7.2'
integrationTestImplementation 'org.slf4j:slf4j-api:1.7.32'
- integrationTestImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.14.1'
+ integrationTestImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.16.0'
integrationTestImplementation 'org.apache.commons:commons-configuration2:2.6'
integrationTestImplementation 'commons-beanutils:commons-beanutils:1.9.4'
integrationTestImplementation 'com.google.code.gson:gson:2.3.1'
@@ -49,7 +54,7 @@ dependencies {
integrationTestImplementation "org.apache.kafka:kafka-clients:$kafkaVersion"
testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
- testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.14.1'
+ testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.16.0'
compile "org.apache.kafka:connect-api:$kafkaVersion"
compile "com.solacesystems:sol-jcsmp:$solaceJavaAPIVersion"
@@ -96,6 +101,7 @@ project.integrationTest {
outputs.upToDateWhen { false }
dependsOn prepDistForIntegrationTesting
+ shouldRunAfter test
afterSuite { desc, result ->
if (!desc.parent)
println("${result.resultType} " +
@@ -181,6 +187,11 @@ project.compileJava {
dependsOn generateJava
+java {
+ withJavadocJar()
+ withSourcesJar()
distributions {
main {
contents {
@@ -197,3 +208,89 @@ distributions {
+publishing {
+ publications {
+ maven(MavenPublication) {
+ from components.java
+ pom {
+ name = "Solace PubSub+ Connector for Kafka: Source"
+ description = "The PubSub+ Kafka Source Connector consumes PubSub+ event broker real-time queue or topic data events and streams them to a Kafka topic as Source Records."
+ url = "https://github.com/SolaceProducts/pubsubplus-connector-kafka-source"
+ packaging = "jar"
+ licenses {
+ license {
+ name = "Apache License, Version 2.0"
+ url = "https://github.com/SolaceProducts/pubsubplus-connector-kafka-source/blob/master/LICENSE"
+ distribution = "repo"
+ }
+ }
+ organization {
+ name = "Solace"
+ url = "https://www.solace.com"
+ }
+ developers {
+ developer {
+ name = "Support for Solace"
+ email = "support@solace.com"
+ organization = "Solace"
+ organizationUrl = "http://solace.community"
+ }
+ }
+ scm {
+ connection = "scm:git:git://github.com/SolaceProducts/pubsubplus-connector-kafka-source.git"
+ developerConnection = "scm:git:git@github.com:SolaceProducts/pubsubplus-connector-kafka-source.git"
+ url = "https://github.com/SolaceProducts/pubsubplus-connector-kafka-source.git"
+ }
+ }
+ }
+ }
+ repositories {
+ maven {
+ def releasesUrl = uri('http://apps-jenkins:9090/nexus/content/repositories/releases')
+ def snapshotRepositoryUrl = uri('http://apps-jenkins:9090/nexus/content/repositories/snapshots')
+ url = isSnapshot ? snapshotRepositoryUrl : releasesUrl
+ name = 'internal'
+ credentials {
+ username = project.properties[name + "Username"]
+ password = project.properties[name + "Password"]
+ }
+ }
+ }
+nexusPublishing {
+ repositories {
+ sonatype {
+ nexusUrl = uri('https://oss.sonatype.org/service/local/')
+ snapshotRepositoryUrl = uri('https://oss.sonatype.org/content/repositories/snapshots')
+ // gets credentials from project.properties["sonatypeUsername"] project.properties["sonatypePassword"]
+ }
+ }
+signing {
+ required {
+ !isSnapshot
+ }
+ useGpgCmd()
+ sign publishing.publications.maven
+tasks.withType(Sign) {
+ onlyIf {
+ gradle.taskGraph.allTasks.any {task ->
+ task.name.startsWith("publish") && task.name.contains('Sonatype')
+ }
+ }
+ shouldRunAfter test, integrationTest
+tasks.withType(InitializeNexusStagingRepository).configureEach {
+ dependsOn test, integrationTest
+ shouldRunAfter tasks.withType(Sign)
+tasks.withType(PublishToMavenRepository).configureEach {
+ dependsOn test, integrationTest
diff --git a/deployer-pom.xml b/deployer-pom.xml
new file mode 100644
index 0000000..2d1b8e4
--- /dev/null
+++ b/deployer-pom.xml
@@ -0,0 +1,396 @@
+ 4.0.0
+ com.solace.deploy
+ solace-deployer
+ Solace Deployer
+ 1.0.0
+ A pom for publishing release version to maven central
+ pom
+ ${project.build.directory}/localRepository
+ solace-internal
+ true
+ http://apps-jenkins:9090/nexus/content/groups/stable/
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ 3.0.0
+ enforce-property
+ enforce
+ release.groupId
+ ^com.solace*(?:systems)?.*?
+ release.groupId must be a valid group ID that starts with "com.solace".
+ release.artifactId
+ release.version
+ ^(?:\d+\.){2}\d+$
+ release.version must be a non-SNAPSHOT semantic version.
+ local.staging.dir
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 3.2.0
+ regex-property
+ regex-property
+ release.install.groupPath
+ ${release.groupId}
+ \.
+ /
+ true
+ org.apache.maven.plugins
+ maven-install-plugin
+ 2.5.2
+ true
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ 3.2.0
+ copy
+ deploy
+ copy
+ ${project.build.directory}/tmpRepository
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.md5
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha1
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha256
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha512
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ pom
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ pom.md5
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ pom.sha1
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ pom.sha256
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ pom.sha512
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar
+ javadoc
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.md5
+ javadoc
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha1
+ javadoc
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha256
+ javadoc
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha512
+ javadoc
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar
+ sources
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.md5
+ sources
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha1
+ sources
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha256
+ sources
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha512
+ sources
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ module
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ module.md5
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ module.sha1
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ module.sha256
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ module.sha512
+ true
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ 2.7
+ true
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.2.1
+ false
diff --git a/gradle.properties b/gradle.properties
index 16cc23c..aae37c5 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1 +1,2 @@
\ No newline at end of file
diff --git a/solace-integration-test-support b/solace-integration-test-support
index c411ac2..7cce14d 160000
--- a/solace-integration-test-support
+++ b/solace-integration-test-support
@@ -1 +1 @@
-Subproject commit c411ac2e0f82af25ece2994691352cb0d6235142
+Subproject commit 7cce14d0112a49d79edbebf12e91febd44c65462
diff --git a/src/main/java/com/solace/connector/kafka/connect/source/SolSessionHandler.java b/src/main/java/com/solace/connector/kafka/connect/source/SolSessionHandler.java
index cd29211..c9b5550 100644
--- a/src/main/java/com/solace/connector/kafka/connect/source/SolSessionHandler.java
+++ b/src/main/java/com/solace/connector/kafka/connect/source/SolSessionHandler.java
@@ -166,7 +166,6 @@ public void configureSession() {
* Connect JCSMPSession.
- * @return boolean result
* @throws JCSMPException
public void connectSession() throws JCSMPException {
diff --git a/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConnectorConfig.java b/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConnectorConfig.java
index d74a030..0cbceb0 100644
--- a/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConnectorConfig.java
+++ b/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceConnectorConfig.java
@@ -34,6 +34,7 @@ public class SolaceSourceConnectorConfig extends AbstractConfig {
* Constructor to create Solace Configuration details for Source Connector.
+ * @param properties the configuration properties
public SolaceSourceConnectorConfig(Map properties) {
super(config, properties);
@@ -43,6 +44,7 @@ public SolaceSourceConnectorConfig(Map properties) {
* Returns a ConfigDef to be used for Source Task.
+ * @return ConfigDef
public static ConfigDef solaceConfigDef() {
diff --git a/src/template/java/com/solace/connector/kafka/connect/source/VersionUtil.java b/src/template/java/com/solace/connector/kafka/connect/source/VersionUtil.java
index 839a546..bac7c0f 100644
--- a/src/template/java/com/solace/connector/kafka/connect/source/VersionUtil.java
+++ b/src/template/java/com/solace/connector/kafka/connect/source/VersionUtil.java
@@ -4,6 +4,7 @@ public class VersionUtil {
* Returns the projects version number for the connector.
+ * @return Version Number
public static String getVersion() {
return "${version}";