diff --git a/.gitignore b/.gitignore
index d2e5889..0e26c64 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,7 +7,7 @@
/bin/
# Jave related
-target/**
+/target/
*.jar
*.war
*.ear
@@ -45,4 +45,7 @@ local.properties
*.ipr
# Unzipped test connector
-src/integrationTest/resources/pubsubplus-connector-kafka*/
\ No newline at end of file
+src/integrationTest/resources/pubsubplus-connector-kafka*/
+
+# Local testing
+solace.properties
diff --git a/README.md b/README.md
index 3732bce..ca279b0 100644
--- a/README.md
+++ b/README.md
@@ -258,17 +258,25 @@ A value of 0 results in the replay of the entire Kafka Topic. A positive value r
We recommend using PubSub+ Topics if high throughput is required and the Kafka Topic is configured for high performance. Message duplication and loss mimics the underlying reliability and QoS configured for the Kafka topic.
+By default, messages are published to topics using direct messaging. To publish persistent messages to topics using a local transaction, set `sol.use_transactions_for_topics` to `true`. See [Sending with Local Transactions](#sending-with-local-transactions) for more info.
+
#### Sending to PubSub+ Queue
When Kafka records reliability is critical, we recommend configuring the Sink Connector to send records to the Event Mesh using PubSub+ queues at the cost of reduced throughput.
A PubSub+ queue guarantees order of delivery, provides High Availability and Disaster Recovery (depending on the setup of the PubSub+ brokers) and provides an acknowledgment to the connector when the event is stored in all HA and DR members and flushed to disk. This is a higher guarantee than is provided by Kafka, even for Kafka idempotent delivery.
-The connector uses local transactions to deliver to the queue by default. The transaction is committed if messages are flushed by Kafka Connect (see below how to tune flush interval) or the outstanding messages size reaches the `sol.autoflush.size` (default 200) configuration.
+The connector uses local transactions to deliver to the queue by default. See [Sending with Local Transactions](#sending-with-local-transactions) for more info.
Note that generally one connector can send to only one queue.
-##### Recovery from Kafka Connect API or Kafka Broker Failure
+#### Sending with Local Transactions
+
+By default, only sending to a queue uses local transactions. To use the transacted session to send persistent messages to topics, set `sol.use_transactions_for_topics` to `true`.
+
+The transaction is committed if messages are flushed by Kafka Connect (see [below how to tune flush interval](#recovery-from-kafka-connect-api-or-kafka-broker-failure)) or the outstanding messages size reaches the `sol.autoflush.size` (default 200) configuration.
+
+#### Recovery from Kafka Connect API or Kafka Broker Failure
Operators are expected to monitor their connector for failures since errors will cause it to stop. If any are found and the connector was stopped, the operator must explicitly restart it again once the error condition has been resolved.
@@ -329,16 +337,22 @@ Kerberos has some very specific requirements to operate correctly. Some addition
JDK 8 or higher is required for this project.
-First, clone this GitHub repo:
-```shell
-git clone https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink.git
-cd pubsubplus-connector-kafka-sink
-```
-
-Then run the build script:
-```shell
-./gradlew clean build
-```
+1. First, clone this GitHub repo:
+ ```shell
+ git clone https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink.git
+ cd pubsubplus-connector-kafka-sink
+ ```
+2. Install the test support module:
+ ```shell
+ git submodule update --init --recursive
+ cd solace-integration-test-support
+ ./mvnw clean install -DskipTests
+ cd ..
+ ```
+3. Then run the build script:
+ ```shell
+ ./gradlew clean build
+ ```
This script creates artifacts in the `build` directory, including the deployable packaged PubSub+ Sink Connector archives under `build\distributions`.
@@ -346,27 +360,40 @@ This script creates artifacts in the `build` directory, including the deployable
An integration test suite is also included, which spins up a Docker-based deployment environment that includes a PubSub+ event broker, Zookeeper, Kafka broker, Kafka Connect. It deploys the connector to Kafka Connect and runs end-to-end tests.
-1. Install the test support module:
- ```shell
- git submodule update --init --recursive
- cd solace-integration-test-support
- ./mvnw clean install -DskipTests
- cd ..
- ```
-2. Run the tests:
+1. Run the tests:
```shell
./gradlew clean test integrationTest
```
### Build a New Record Processor
-The processing of a Kafka record to create a PubSub+ message is handled by an interface defined in [`SolRecordProcessorIF.java`](/src/main/java/com/solace/connector/kafka/connect/sink/SolRecordProcessorIF.java). This is a simple interface that creates the Kafka source records from the PubSub+ messages. This project includes three examples of classes that implement this interface:
+The processing of a Kafka record to create a PubSub+ message is handled by [`SolRecordProcessorIF`](/src/main/java/com/solace/connector/kafka/connect/sink/SolRecordProcessorIF.java). This is a simple interface that creates the Kafka source records from the PubSub+ messages.
+
+To get started, import the following dependency into your project:
+
+**Maven**
+```xml
+
+ com.solace.connector.kafka.connect
+ pubsubplus-connector-kafka-sink
+ 2.2.0
+
+```
+
+**Gradle**
+```groovy
+compile "com.solace.connector.kafka.connect:pubsubplus-connector-kafka-sink:2.2.0"
+```
+
+Now you can implement your custom `SolRecordProcessorIF`.
+
+For reference, this project includes three examples which you can use as starting points for implementing your own custom record processors:
* [SolSimpleRecordProcessor](/src/main/java/com/solace/connector/kafka/connect/sink/recordprocessor/SolSimpleRecordProcessor.java)
* [SolSimpleKeyedRecordProcessor](/src/main/java/com/solace/connector/kafka/connect/sink/recordprocessor/SolSimpleKeyedRecordProcessor.java)
* [SolDynamicDestinationRecordProcessor](/src/main/java/com/solace/connector/kafka/connect/sink/recordprocessor/SolDynamicDestinationRecordProcessor.java)
-You can use these examples as starting points for implementing your own custom record processors.
+Once you've built the jar file for your custom record processor project, place it into the same directory as this connector, and update the connector's `sol.record_processor_class` config to point to the class of your new record processor.
More information on Kafka sink connector development can be found here:
- [Apache Kafka Connect](https://kafka.apache.org/documentation/)
diff --git a/build.gradle b/build.gradle
index db66b72..0564ee8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,17 +1,23 @@
import com.github.spotbugs.snom.SpotBugsTask
+import io.github.gradlenexus.publishplugin.InitializeNexusStagingRepository
plugins {
id 'java'
id 'distribution'
id 'jacoco'
+ id 'maven-publish'
id 'pmd'
+ id 'signing'
id 'com.github.spotbugs' version '4.7.6'
+ id 'io.github.gradle-nexus.publish-plugin' version '1.1.0'
+ id 'org.gradle.test-retry' version '1.3.1'
id 'org.unbroken-dome.test-sets' version '2.2.1'
}
ext {
kafkaVersion = '2.8.1'
- solaceJavaAPIVersion = '10.12.0'
+ solaceJavaAPIVersion = '10.12.1'
+ isSnapshot = project.version.endsWith('-SNAPSHOT')
}
repositories {
@@ -33,14 +39,15 @@ testSets {
dependencies {
integrationTestImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
- integrationTestImplementation 'org.junit-pioneer:junit-pioneer:1.4.2'
+ integrationTestImplementation 'org.junit-pioneer:junit-pioneer:1.5.0'
integrationTestImplementation 'org.mockito:mockito-junit-jupiter:3.12.4'
integrationTestImplementation 'org.testcontainers:testcontainers:1.16.0'
integrationTestImplementation 'org.testcontainers:junit-jupiter:1.16.0'
integrationTestImplementation 'org.testcontainers:kafka:1.16.0'
- integrationTestImplementation 'com.solace.test.integration:pubsubplus-junit-jupiter:0.5.0'
+ integrationTestImplementation 'org.testcontainers:toxiproxy:1.16.0'
+ integrationTestImplementation 'com.solace.test.integration:pubsubplus-junit-jupiter:0.7.1'
integrationTestImplementation 'org.slf4j:slf4j-api:1.7.32'
- integrationTestImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.14.1'
+ integrationTestImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.16.0'
integrationTestImplementation 'org.apache.commons:commons-configuration2:2.6'
integrationTestImplementation 'commons-beanutils:commons-beanutils:1.9.4'
integrationTestImplementation 'com.google.code.gson:gson:2.3.1'
@@ -50,7 +57,7 @@ dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
testImplementation 'org.mockito:mockito-junit-jupiter:3.12.4'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
- testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.14.1'
+ testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.16.0'
compile "org.apache.kafka:connect-api:$kafkaVersion"
compile "com.solacesystems:sol-jcsmp:$solaceJavaAPIVersion"
}
@@ -96,6 +103,10 @@ project.integrationTest {
useJUnitPlatform()
outputs.upToDateWhen { false }
dependsOn prepDistForIntegrationTesting
+ shouldRunAfter test
+ retry {
+ maxRetries = 3
+ }
afterSuite { desc, result ->
if (!desc.parent)
println("${result.resultType} " +
@@ -181,6 +192,11 @@ project.compileJava {
dependsOn generateJava
}
+java {
+ withJavadocJar()
+ withSourcesJar()
+}
+
distributions {
main {
contents {
@@ -197,3 +213,89 @@ distributions {
}
}
}
+
+publishing {
+ publications {
+ maven(MavenPublication) {
+ from components.java
+ pom {
+ name = "Solace PubSub+ Connector for Kafka: Sink"
+ description = "The Solace/Kafka adapter consumes Kafka topic records and streams them to the PubSub+ Event Mesh as topic and/or queue data events."
+ url = "https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink"
+ packaging = "jar"
+ licenses {
+ license {
+ name = "Apache License, Version 2.0"
+ url = "https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink/blob/master/LICENSE"
+ distribution = "repo"
+ }
+ }
+ organization {
+ name = "Solace"
+ url = "https://www.solace.com"
+ }
+ developers {
+ developer {
+ name = "Support for Solace"
+ email = "support@solace.com"
+ organization = "Solace"
+ organizationUrl = "http://solace.community"
+ }
+ }
+ scm {
+ connection = "scm:git:git://github.com/SolaceProducts/pubsubplus-connector-kafka-sink.git"
+ developerConnection = "scm:git:git@github.com:SolaceProducts/pubsubplus-connector-kafka-sink.git"
+ url = "https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink.git"
+ }
+ }
+ }
+ }
+ repositories {
+ maven {
+ def releasesUrl = uri('http://apps-jenkins:9090/nexus/content/repositories/releases')
+ def snapshotRepositoryUrl = uri('http://apps-jenkins:9090/nexus/content/repositories/snapshots')
+ url = isSnapshot ? snapshotRepositoryUrl : releasesUrl
+ name = 'internal'
+ credentials {
+ username = project.properties[name + "Username"]
+ password = project.properties[name + "Password"]
+ }
+ }
+ }
+}
+
+nexusPublishing {
+ repositories {
+ sonatype {
+ nexusUrl = uri('https://oss.sonatype.org/service/local/')
+ snapshotRepositoryUrl = uri('https://oss.sonatype.org/content/repositories/snapshots')
+ // gets credentials from project.properties["sonatypeUsername"] project.properties["sonatypePassword"]
+ }
+ }
+}
+
+signing {
+ required {
+ !isSnapshot
+ }
+ useGpgCmd()
+ sign publishing.publications.maven
+}
+
+tasks.withType(Sign) {
+ onlyIf {
+ gradle.taskGraph.allTasks.any {task ->
+ task.name.startsWith("publish") && task.name.contains('Sonatype')
+ }
+ }
+ shouldRunAfter test, integrationTest
+}
+
+tasks.withType(InitializeNexusStagingRepository).configureEach {
+ dependsOn test, integrationTest
+ shouldRunAfter tasks.withType(Sign)
+}
+
+tasks.withType(PublishToMavenRepository).configureEach {
+ dependsOn test, integrationTest
+}
diff --git a/deployer-pom.xml b/deployer-pom.xml
new file mode 100644
index 0000000..2d1b8e4
--- /dev/null
+++ b/deployer-pom.xml
@@ -0,0 +1,396 @@
+
+ 4.0.0
+ com.solace.deploy
+ solace-deployer
+ Solace Deployer
+ 1.0.0
+ A pom for publishing release version to maven central
+ pom
+
+
+ ${project.build.directory}/localRepository
+
+
+
+
+ solace-internal
+
+ true
+
+ http://apps-jenkins:9090/nexus/content/groups/stable/
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ 3.0.0
+
+
+ enforce-property
+
+ enforce
+
+
+
+
+ release.groupId
+ ^com.solace*(?:systems)?.*?
+ release.groupId must be a valid group ID that starts with "com.solace".
+
+
+ release.artifactId
+
+
+ release.version
+ ^(?:\d+\.){2}\d+$
+ release.version must be a non-SNAPSHOT semantic version.
+
+
+ local.staging.dir
+
+
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 3.2.0
+
+
+ regex-property
+
+ regex-property
+
+
+ release.install.groupPath
+ ${release.groupId}
+ \.
+ /
+ true
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-install-plugin
+ 2.5.2
+
+ true
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ 3.2.0
+
+
+ copy
+ deploy
+
+ copy
+
+
+ ${project.build.directory}/tmpRepository
+
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.md5
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha1
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha256
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha512
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ pom
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ pom.md5
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ pom.sha1
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ pom.sha256
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ pom.sha512
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar
+ javadoc
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.md5
+ javadoc
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha1
+ javadoc
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha256
+ javadoc
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha512
+ javadoc
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar
+ sources
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.md5
+ sources
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha1
+ sources
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha256
+ sources
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ jar.sha512
+ sources
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ module
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ module.md5
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ module.sha1
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ module.sha256
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+ ${release.groupId}
+ ${release.artifactId}
+ ${release.version}
+ module.sha512
+ true
+
+ ${local.staging.dir}/${release.install.groupPath}/${release.artifactId}/${release.version}/
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ 2.7
+
+ true
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.2.1
+
+ false
+
+
+
+
+
diff --git a/etc/solace_sink.properties b/etc/solace_sink.properties
index ae26863..b757417 100644
--- a/etc/solace_sink.properties
+++ b/etc/solace_sink.properties
@@ -52,7 +52,10 @@ sol.record_processor_class=com.solace.connector.kafka.connect.sink.recordprocess
# Whether to use transacted session and transactions to publish messages to PubSub+ queue
#sol.use_transactions_for_queue=true
-# Max outstanding number of transacted messages if using transactions to reliably publish records to a queue. Must be <255
+# When true, messages published to topics will use persistent delivery type using transactions.
+#sol.use_transactions_for_topics=false
+
+# Max outstanding number of transacted messages if using transactions to reliably publish records to a queue or topic. Must be <255
# If outstanding messages limit is reached will auto-commit - will not wait for Kafka Connect "flush" initiated.
#sol.autoflush.size=200
diff --git a/gradle.properties b/gradle.properties
index 16cc23c..daf8797 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1 +1,2 @@
-version=2.1.0
\ No newline at end of file
+group=com.solace.connector.kafka.connect
+version=2.2.0
\ No newline at end of file
diff --git a/solace-integration-test-support b/solace-integration-test-support
index c411ac2..2d9cc15 160000
--- a/solace-integration-test-support
+++ b/solace-integration-test-support
@@ -1 +1 @@
-Subproject commit c411ac2e0f82af25ece2994691352cb0d6235142
+Subproject commit 2d9cc15cada6624218ff37e682027daba1627098
diff --git a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/SolaceProducerHandlerIT.java b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/SolaceProducerHandlerIT.java
new file mode 100644
index 0000000..9bf7c25
--- /dev/null
+++ b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/SolaceProducerHandlerIT.java
@@ -0,0 +1,255 @@
+package com.solace.connector.kafka.connect.sink;
+
+import com.solace.connector.kafka.connect.sink.it.util.extensions.NetworkPubSubPlusExtension;
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.ConsumerFlowProperties;
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.Endpoint;
+import com.solacesystems.jcsmp.FlowReceiver;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.JCSMPFactory;
+import com.solacesystems.jcsmp.JCSMPProperties;
+import com.solacesystems.jcsmp.JCSMPSession;
+import com.solacesystems.jcsmp.Queue;
+import com.solacesystems.jcsmp.TextMessage;
+import com.solacesystems.jcsmp.Topic;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@ExtendWith(NetworkPubSubPlusExtension.class)
+public class SolaceProducerHandlerIT {
+ private Map properties;
+ private SolSessionHandler sessionHandler;
+ private SolProducerHandler producerHandler;
+
+ private static final Logger logger = LoggerFactory.getLogger(SolaceProducerHandlerIT.class);
+
+ @BeforeEach
+ void setUp(JCSMPProperties jcsmpProperties) {
+ properties = new HashMap<>();
+ properties.put(SolaceSinkConstants.SOL_HOST, jcsmpProperties.getStringProperty(JCSMPProperties.HOST));
+ properties.put(SolaceSinkConstants.SOL_VPN_NAME, jcsmpProperties.getStringProperty(JCSMPProperties.VPN_NAME));
+ properties.put(SolaceSinkConstants.SOL_USERNAME, jcsmpProperties.getStringProperty(JCSMPProperties.USERNAME));
+ properties.put(SolaceSinkConstants.SOL_PASSWORD, jcsmpProperties.getStringProperty(JCSMPProperties.PASSWORD));
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (producerHandler != null) {
+ producerHandler.close();
+ }
+
+ if (sessionHandler != null) {
+ sessionHandler.shutdown();
+ }
+ }
+
+ @CartesianTest(name = "[{index}] txnQueue={0}, txnTopic={1}, sendToQueue={2}, sendToTopic={3}")
+ public void testStaticDestinations(@Values(booleans = {true, false}) boolean txnQueue,
+ @Values(booleans = {true, false}) boolean txnTopic,
+ @Values(booleans = {true, false}) boolean sendToQueue,
+ @Values(booleans = {true, false}) boolean sendToTopic,
+ JCSMPSession session,
+ Queue queue) throws Exception {
+ properties.put(SolaceSinkConstants.SOl_QUEUE, queue.getName());
+ properties.put(SolaceSinkConstants.SOL_TOPICS, RandomStringUtils.randomAlphanumeric(100));
+ properties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE, Boolean.toString(txnQueue));
+ properties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(txnTopic));
+
+ SolaceSinkConnectorConfig config = new SolaceSinkConnectorConfig(properties);
+ sessionHandler = new SolSessionHandler(config);
+ sessionHandler.configureSession();
+ sessionHandler.connectSession();
+ producerHandler = new SolProducerHandler(config, sessionHandler, null);
+
+ assertNotNull(producerHandler.producer);
+ assertNotNull(producerHandler.queueProducer);
+ assertNotNull(producerHandler.topicProducer);
+ assertEquals(txnTopic ? producerHandler.transactedProducer : producerHandler.producer, producerHandler.topicProducer);
+
+ if (txnQueue || txnTopic) {
+ assertNotNull(sessionHandler.getTxSession());
+ assertNotNull(producerHandler.transactedProducer);
+ assertNotEquals(producerHandler.producer, producerHandler.transactedProducer);
+ } else {
+ assertNull(sessionHandler.getTxSession());
+ assertNull(producerHandler.transactedProducer);
+ }
+
+ if (txnQueue) {
+ assertEquals(producerHandler.transactedProducer, producerHandler.queueProducer);
+ } else {
+ assertNotEquals(producerHandler.transactedProducer, producerHandler.queueProducer);
+ }
+
+ if (sendToQueue) {
+ sendAndAssert(session, properties.get(SolaceSinkConstants.SOl_QUEUE), Queue.class, queue, txnQueue);
+ }
+
+ if (sendToTopic) {
+ sendAndAssert(session, properties.get(SolaceSinkConstants.SOL_TOPICS), Topic.class, queue, txnTopic);
+ }
+ }
+
+ @CartesianTest(name = "[{index}] txnQueue={0}, txnTopic={1}, sendToQueue={2}, sendToTopic={3}")
+ public void testLazyInit(@Values(booleans = {true, false}) boolean txnQueue,
+ @Values(booleans = {true, false}) boolean txnTopic,
+ @Values(booleans = {true, false}) boolean sendToQueue,
+ @Values(booleans = {true, false}) boolean sendToTopic,
+ JCSMPSession session,
+ Queue queue) throws Exception {
+ properties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE, Boolean.toString(txnQueue));
+ properties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(txnTopic));
+
+ SolaceSinkConnectorConfig config = new SolaceSinkConnectorConfig(properties);
+ sessionHandler = new SolSessionHandler(config);
+ sessionHandler.configureSession();
+ sessionHandler.connectSession();
+ producerHandler = new SolProducerHandler(config, sessionHandler, null);
+
+ assertNotNull(producerHandler.producer);
+ assertNull(sessionHandler.getTxSession());
+ assertNull(producerHandler.transactedProducer);
+ assertNull(producerHandler.topicProducer);
+ assertNull(producerHandler.queueProducer);
+
+ if (sendToQueue) {
+ sendAndAssert(session, queue.getName(), Queue.class, queue, txnQueue);
+ }
+
+ if (sendToTopic) {
+ sendAndAssert(session, RandomStringUtils.randomAlphanumeric(100), Topic.class, queue, txnTopic);
+ }
+
+ if ((sendToQueue && txnQueue) || (sendToTopic && txnTopic)) {
+ assertNotNull(sessionHandler.getTxSession());
+ assertNotNull(producerHandler.transactedProducer);
+ assertNotEquals(producerHandler.producer, producerHandler.transactedProducer);
+ } else {
+ assertNull(sessionHandler.getTxSession());
+ assertNull(producerHandler.transactedProducer);
+ }
+
+ if (sendToQueue) {
+ assertNotNull(producerHandler.queueProducer);
+ if (txnQueue) {
+ assertEquals(producerHandler.transactedProducer, producerHandler.queueProducer);
+ } else {
+ assertNotEquals(producerHandler.transactedProducer, producerHandler.queueProducer);
+ }
+ } else {
+ assertNull(producerHandler.queueProducer);
+ }
+
+ if (sendToTopic) {
+ assertNotNull(producerHandler.topicProducer);
+ assertEquals(txnTopic ? producerHandler.transactedProducer : producerHandler.producer, producerHandler.topicProducer);
+ } else {
+ assertNull(producerHandler.topicProducer);
+ }
+ }
+
+ @Test
+ public void testAutoFlush(JCSMPSession session, Queue queue) throws Exception {
+ List topics = IntStream.range(0, 9)
+ .mapToObj(i -> RandomStringUtils.randomAlphanumeric(100))
+ .collect(Collectors.toList());
+ properties.put(SolaceSinkConstants.SOl_QUEUE, queue.getName());
+ properties.put(SolaceSinkConstants.SOL_TOPICS, String.join(",", topics));
+ properties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE, Boolean.toString(true));
+ properties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(true));
+ properties.put(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE, Integer.toString(2));
+ final int expectedNumCommits = (topics.size() + 1) /
+ Integer.parseInt(properties.get(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE));
+
+ SolaceSinkConnectorConfig config = new SolaceSinkConnectorConfig(properties);
+ sessionHandler = new SolSessionHandler(config);
+ sessionHandler.configureSession();
+ sessionHandler.connectSession();
+ AtomicInteger numCommits = new AtomicInteger();
+ producerHandler = new SolProducerHandler(config, sessionHandler, () -> {
+ try {
+ sessionHandler.getTxSession().commit();
+ producerHandler.getTxMsgCount().set(0);
+ } catch (JCSMPException e) {
+ throw new RuntimeException(e);
+ }
+ logger.info("Commit #{}", numCommits.incrementAndGet());
+ });
+
+ sendAndAssert(session, queue.getName(), Queue.class, queue, true, false, false);
+ for (String topic : topics) {
+ sendAndAssert(session, topic, Topic.class, queue, true, false, false);
+ }
+
+ assertEquals(expectedNumCommits, numCommits.get());
+ }
+
+ private void sendAndAssert(JCSMPSession session, String destination, Class extends Destination> destinationType,
+ Endpoint receiveEndpoint, boolean isTransacted) throws JCSMPException {
+ sendAndAssert(session, destination, destinationType, receiveEndpoint, isTransacted, isTransacted, true);
+ }
+
+ private void sendAndAssert(JCSMPSession session, String destination, Class extends Destination> destinationType,
+ Endpoint receiveEndpoint, boolean isTransacted, boolean doCommit, boolean doConsume)
+ throws JCSMPException {
+ Destination sendDestination;
+ if (destinationType.isAssignableFrom(Queue.class)) {
+ sendDestination = JCSMPFactory.onlyInstance().createQueue(destination);
+ } else {
+ Topic topic = JCSMPFactory.onlyInstance().createTopic(destination);
+ session.addSubscription(receiveEndpoint, topic, JCSMPSession.WAIT_FOR_CONFIRM);
+ sendDestination = topic;
+ }
+
+ TextMessage message = Mockito.spy(JCSMPFactory.onlyInstance().createMessage(TextMessage.class));
+ message.setText(RandomStringUtils.randomAlphanumeric(100));
+ producerHandler.send(message, sendDestination);
+ if (doCommit) {
+ sessionHandler.getTxSession().commit();
+ }
+
+ if (doConsume) {
+ ConsumerFlowProperties consumerFlowProperties = new ConsumerFlowProperties();
+ consumerFlowProperties.setEndpoint(receiveEndpoint);
+ consumerFlowProperties.setStartState(true);
+ FlowReceiver flowReceiver = session.createFlow(null, consumerFlowProperties);
+ BytesXMLMessage receivedMessage;
+ try {
+ receivedMessage = flowReceiver.receive(Math.toIntExact(TimeUnit.SECONDS.toMillis(30)));
+ } finally {
+ flowReceiver.close();
+ }
+
+ assertInstanceOf(TextMessage.class, receivedMessage);
+ assertEquals(message.getText(), ((TextMessage) receivedMessage).getText());
+ assertEquals(destination, receivedMessage.getDestination().getName());
+ }
+
+ Mockito.verify(message).setDeliveryMode(destinationType.isAssignableFrom(Queue.class) || isTransacted ?
+ DeliveryMode.PERSISTENT : DeliveryMode.DIRECT);
+ }
+}
diff --git a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/SinkConnectorIT.java b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/SinkConnectorIT.java
index b439d30..c67f619 100644
--- a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/SinkConnectorIT.java
+++ b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/SinkConnectorIT.java
@@ -4,16 +4,21 @@
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import com.solace.connector.kafka.connect.sink.SolaceSinkConstants;
+import com.solace.connector.kafka.connect.sink.it.util.ThrowingFunction;
import com.solace.connector.kafka.connect.sink.it.util.extensions.KafkaArgumentsProvider;
-import com.solace.connector.kafka.connect.sink.it.util.extensions.KafkaArgumentsProvider.KafkaArgumentSource;
import com.solace.connector.kafka.connect.sink.it.util.extensions.KafkaArgumentsProvider.KafkaContext;
+import com.solace.connector.kafka.connect.sink.it.util.extensions.KafkaArgumentsProvider.KafkaSource;
import com.solace.connector.kafka.connect.sink.it.util.extensions.NetworkPubSubPlusExtension;
+import com.solace.connector.kafka.connect.sink.recordprocessor.SolDynamicDestinationRecordProcessor;
import com.solace.test.integration.junit.jupiter.extension.ExecutorServiceExtension;
import com.solace.test.integration.junit.jupiter.extension.ExecutorServiceExtension.ExecSvc;
+import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension.JCSMPProxy;
+import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension.ToxiproxyContext;
import com.solace.test.integration.semp.v2.SempV2Api;
import com.solace.test.integration.semp.v2.config.model.ConfigMsgVpnQueue;
+import com.solace.test.integration.semp.v2.config.model.ConfigMsgVpnQueueSubscription;
import com.solacesystems.jcsmp.BytesXMLMessage;
-import com.solacesystems.jcsmp.EndpointProperties;
+import com.solacesystems.jcsmp.Destination;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPProperties;
@@ -21,9 +26,16 @@
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.SDTException;
import com.solacesystems.jcsmp.SDTMap;
+import eu.rekawek.toxiproxy.model.ToxicDirection;
+import eu.rekawek.toxiproxy.model.toxic.Latency;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
@@ -34,8 +46,8 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
-import org.junitpioneer.jupiter.CartesianProductTest;
-import org.junitpioneer.jupiter.CartesianValueSource;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.WaitingConsumer;
@@ -43,6 +55,7 @@
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -50,10 +63,14 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -82,9 +99,9 @@ enum AdditionalCheck { ATTACHMENTBYTEBUFFER, CORRELATIONID }
static void setUp(JCSMPSession jcsmpSession) throws JCSMPException {
// Start consumer
// Ensure test queue exists on PubSub+
- solaceTopicConsumer = new TestSolaceTopicConsumer(jcsmpSession);
+ solaceTopicConsumer = new TestSolaceTopicConsumer(jcsmpSession); //TODO make this dynamic for concurrency
solaceTopicConsumer.start();
- solaceQueueConsumer = new TestSolaceQueueConsumer(jcsmpSession);
+ solaceQueueConsumer = new TestSolaceQueueConsumer(jcsmpSession); //TODO make this dynamic for concurrency
solaceQueueConsumer.provisionQueue(SOL_QUEUE);
solaceQueueConsumer.start();
}
@@ -109,16 +126,18 @@ static void cleanUp() {
////////////////////////////////////////////////////
// Test types
- void messageToKafkaTest(TestKafkaProducer producer, String expectedSolaceQueue, String[] expectedSolaceTopics, String kafkaKey, String kafkaValue,
- Map additionalChecks) {
+ void messageToKafkaTest(KafkaContext kafkaContext,
+ String expectedSolaceQueue,
+ String[] expectedSolaceTopics,
+ String kafkaKey,
+ String kafkaValue,
+ Map additionalChecks) {
try {
clearReceivedMessages();
- RecordMetadata metadata = sendMessagetoKafka(producer, kafkaKey, kafkaValue);
- assertMessageReceived(expectedSolaceQueue, expectedSolaceTopics, metadata, additionalChecks);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (SDTException e) {
+ RecordMetadata metadata = sendMessagetoKafka(kafkaContext.getProducer(), kafkaKey, kafkaValue);
+ assertMessageReceived(kafkaContext, expectedSolaceQueue, expectedSolaceTopics, metadata, additionalChecks);
+ } catch (InterruptedException | SDTException e) {
e.printStackTrace();
}
}
@@ -137,8 +156,44 @@ RecordMetadata sendMessagetoKafka(TestKafkaProducer producer, String kafkaKey, S
return metadata;
}
- void assertMessageReceived(String expectedSolaceQueue, String[] expectedSolaceTopics, RecordMetadata metadata,
- Map additionalChecks) throws SDTException, InterruptedException {
+ List assertMessageReceived(KafkaContext kafkaContext,
+ String expectedSolaceQueue,
+ String[] expectedSolaceTopics,
+ RecordMetadata metadata,
+ Map additionalChecks)
+ throws SDTException, InterruptedException {
+ assertTimeoutPreemptively(Duration.ofMinutes(5), () -> {
+ boolean isCommitted;
+ do {
+ Stream groupIds = kafkaContext.getAdminClient().listConsumerGroups().all().get().stream()
+ .map(ConsumerGroupListing::groupId);
+
+ Stream> partitionsToOffsets = groupIds
+ .map(groupId -> kafkaContext.getAdminClient().listConsumerGroupOffsets(groupId))
+ .map(ListConsumerGroupOffsetsResult::partitionsToOffsetAndMetadata)
+ .map((ThrowingFunction>,
+ Map>) KafkaFuture::get)
+ .map(Map::entrySet)
+ .flatMap(Collection::stream);
+
+ Long partitionOffset = partitionsToOffsets
+ .filter(e -> e.getKey().topic().equals(metadata.topic()))
+ .filter(e -> e.getKey().partition() == metadata.partition())
+ .map(Map.Entry::getValue)
+ .map(OffsetAndMetadata::offset)
+ .findAny()
+ .orElse(null);
+
+ isCommitted = partitionOffset != null && partitionOffset >= metadata.offset();
+
+ if (!isCommitted) {
+ logger.info("Waiting for record {} to be committed. Partition offset: {}", metadata, partitionOffset);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+ }
+ } while (!isCommitted);
+ logger.info("Record {} was committed", metadata);
+ });
+
List receivedMessages = new ArrayList<>();
// Wait for PubSub+ to report messages - populate queue and topics if provided
@@ -188,6 +243,8 @@ void assertMessageReceived(String expectedSolaceQueue, String[] expectedSolaceTo
}
}
}
+
+ return receivedMessages;
}
////////////////////////////////////////////////////
@@ -214,7 +271,7 @@ void setUp() {
@ArgumentsSource(KafkaArgumentsProvider.class)
void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) {
kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps);
- messageToKafkaTest(kafkaContext.getProducer(), SOL_QUEUE, topics,
+ messageToKafkaTest(kafkaContext, SOL_QUEUE, topics,
// kafka key and value
"Key", "Hello TextMessageToTopicTest world!",
// additional checks
@@ -245,7 +302,7 @@ void setUp() {
@ArgumentsSource(KafkaArgumentsProvider.class)
void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) {
kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps);
- messageToKafkaTest(kafkaContext.getProducer(), SOL_QUEUE, topics,
+ messageToKafkaTest(kafkaContext, SOL_QUEUE, topics,
// kafka key and value
"Key", "Hello TextMessageToTopicTest world!",
// additional checks
@@ -276,7 +333,7 @@ void setUp() {
@ArgumentsSource(KafkaArgumentsProvider.class)
void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) {
kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps);
- messageToKafkaTest(kafkaContext.getProducer(), SOL_QUEUE, topics,
+ messageToKafkaTest(kafkaContext, SOL_QUEUE, topics,
// kafka key and value
"Destination", "Hello TextMessageToTopicTest world!",
// additional checks with expected values
@@ -308,7 +365,7 @@ void setUp() {
@ArgumentsSource(KafkaArgumentsProvider.class)
void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) {
kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps);
- messageToKafkaTest(kafkaContext.getProducer(), SOL_QUEUE, topics,
+ messageToKafkaTest(kafkaContext, SOL_QUEUE, topics,
// kafka key and value
"TestCorrelationId", "Hello TextMessageToTopicTest world!",
// additional checks with expected values
@@ -340,7 +397,7 @@ void setUp() {
@ArgumentsSource(KafkaArgumentsProvider.class)
void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) {
kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps);
- messageToKafkaTest(kafkaContext.getProducer(), SOL_QUEUE, topics,
+ messageToKafkaTest(kafkaContext, SOL_QUEUE, topics,
// kafka key and value
"TestCorrelationId", "Hello TextMessageToTopicTest world!",
// additional checks with expected values
@@ -359,7 +416,7 @@ class SinkDynamicDestinationMessageProcessorMessageProcessorTests {
@BeforeEach
void setUp() {
- connectorProps.setProperty("sol.record_processor_class", "com.solace.connector.kafka.connect.sink.recordprocessor.SolDynamicDestinationRecordProcessor");
+ connectorProps.setProperty("sol.record_processor_class", SolDynamicDestinationRecordProcessor.class.getName());
connectorProps.setProperty("sol.dynamic_destination", "true");
connectorProps.setProperty("sol.topics", String.join(", ", topics));
connectorProps.setProperty("sol.queue", SOL_QUEUE);
@@ -367,11 +424,14 @@ void setUp() {
@DisplayName("TextMessage-DynamicDestinationMessageProcessor-start")
- @ParameterizedTest
- @ArgumentsSource(KafkaArgumentsProvider.class)
- void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) {
+ @CartesianTest(name = "[{index}] transacted={0}, autoFlush={1}, kafka={2}")
+ void kafkaConsumerTextMessageToTopicTest(@Values(booleans = { true, false }) boolean transacted,
+ @Values(booleans = { true, false }) boolean autoFlush,
+ @KafkaSource KafkaContext kafkaContext) {
+ connectorProps.setProperty(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(transacted));
+ connectorProps.setProperty(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE, Integer.toString(autoFlush ? 1 : 200));
kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps);
- messageToKafkaTest(kafkaContext.getProducer(),
+ messageToKafkaTest(kafkaContext,
// expected list of delivery queue and topics
null, new String[] {"ctrl/bus/1234/start"},
// kafka key and value
@@ -381,11 +441,14 @@ void kafkaConsumerTextMessageToTopicTest(KafkaContext kafkaContext) {
}
@DisplayName("TextMessage-DynamicDestinationMessageProcessor-stop")
- @ParameterizedTest
- @ArgumentsSource(KafkaArgumentsProvider.class)
- void kafkaConsumerTextMessageToTopicTest2(KafkaContext kafkaContext) {
+ @CartesianTest(name = "[{index}] transacted={0}, autoFlush={1}, kafka={2}")
+ void kafkaConsumerTextMessageToTopicTest2(@Values(booleans = { true, false }) boolean transacted,
+ @Values(booleans = { true, false }) boolean autoFlush,
+ @KafkaSource KafkaContext kafkaContext) {
+ connectorProps.setProperty(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(transacted));
+ connectorProps.setProperty(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE, Integer.toString(autoFlush ? 1 : 200));
kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps);
- messageToKafkaTest(kafkaContext.getProducer(),
+ messageToKafkaTest(kafkaContext,
// expected list of delivery queue and topics
null, new String[] {"ctrl/bus/1234/stop"},
// kafka key and value
@@ -395,11 +458,14 @@ void kafkaConsumerTextMessageToTopicTest2(KafkaContext kafkaContext) {
}
@DisplayName("TextMessage-DynamicDestinationMessageProcessor-other")
- @ParameterizedTest
- @ArgumentsSource(KafkaArgumentsProvider.class)
- void kafkaConsumerTextMessageToTopicTest3(KafkaContext kafkaContext) {
+ @CartesianTest(name = "[{index}] transacted={0}, autoFlush={1}, kafka={2}")
+ void kafkaConsumerTextMessageToTopicTest3(@Values(booleans = { true, false }) boolean transacted,
+ @Values(booleans = { true, false }) boolean autoFlush,
+ @KafkaSource KafkaContext kafkaContext) {
+ connectorProps.setProperty(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(transacted));
+ connectorProps.setProperty(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE, Integer.toString(autoFlush ? 1 : 200));
kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps);
- messageToKafkaTest(kafkaContext.getProducer(),
+ messageToKafkaTest(kafkaContext,
// expected list of delivery queue and topics
null, new String[] {"comms/bus/1234"},
// kafka key and value
@@ -432,31 +498,133 @@ void testFailPubSubConnection(KafkaContext kafkaContext) {
}, () -> "Timed out waiting for connector to fail: " + GSON.toJson(connectorStatus.get()));
}
- @CartesianProductTest(name = "[{index}] autoFlush={0}, kafka={1}")
- @CartesianValueSource(booleans = { true, false })
- @KafkaArgumentSource
- void testCommitRollback(boolean autoFlush, KafkaContext kafkaContext,
- JCSMPSession jcsmpSession, SempV2Api sempV2Api,
+ @CartesianTest(name = "[{index}] dynamicDestination={0}, autoFlush={1}, kafka={2}")
+ void testRebalancedKafkaConsumers(@Values(booleans = { false, true }) boolean dynamicDestination,
+ @Values(booleans = { false, true }) boolean autoFlush,
+ @KafkaSource KafkaContext kafkaContext,
+ @JCSMPProxy ToxiproxyContext jcsmpProxyContext,
+ @ExecSvc(poolSize = 2, scheduled = true) ScheduledExecutorService executorService)
+ throws Exception {
+ String topicName;
+
+ if (dynamicDestination) {
+ topicName = "comms/bus/" + RandomStringUtils.randomAlphanumeric(4);
+ connectorProps.setProperty(SolaceSinkConstants.SOL_DYNAMIC_DESTINATION, Boolean.toString(true));
+ connectorProps.setProperty(SolaceSinkConstants.SOL_RECORD_PROCESSOR, SolDynamicDestinationRecordProcessor.class.getName());
+ connectorProps.setProperty(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE, Integer.toString(autoFlush ? 1 : 100));
+ } else {
+ topicName = SOL_ROOT_TOPIC + "/" + RandomStringUtils.randomAlphanumeric(100);
+ connectorProps.setProperty(SolaceSinkConstants.SOl_QUEUE, SOL_QUEUE);
+ connectorProps.setProperty(SolaceSinkConstants.SOL_TOPICS, topicName);
+ connectorProps.setProperty(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE, Integer.toString(autoFlush ? 2 : 100));
+ }
+
+ connectorProps.setProperty(SolaceSinkConstants.SOL_HOST, String.format("tcp://%s:%s",
+ jcsmpProxyContext.getDockerNetworkAlias(), jcsmpProxyContext.getProxy().getOriginalProxyPort()));
+ connectorProps.setProperty(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE, Boolean.toString(true));
+ connectorProps.setProperty(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(true));
+ connectorProps.setProperty(SolaceSinkConstants.SOL_CHANNEL_PROPERTY_reconnectRetries, Integer.toString(-1));
+ connectorProps.setProperty("errors.retry.timeout", Long.toString(-1));
+ connectorProps.setProperty("consumer.override.max.poll.interval.ms", "1000");
+
+ kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps);
+
+ Latency lag = jcsmpProxyContext.getProxy().toxics()
+ .latency("lag", ToxicDirection.UPSTREAM, TimeUnit.DAYS.toMillis(1));
+
+ clearReceivedMessages();
+ String recordValue = randomAlphanumeric(100);
+ Future recordMetadataFuture = executorService.schedule(() ->
+ sendMessagetoKafka(kafkaContext.getProducer(), randomAlphanumeric(100),
+ dynamicDestination ?
+ topicName.substring("comms/bus/".length()) + ":" + recordValue :
+ recordValue),
+ 5, TimeUnit.SECONDS);
+
+ logger.info("Waiting for Kafka poll interval to expire...");
+ WaitingConsumer logConsumer = new WaitingConsumer();
+ kafkaContext.getConnection().getConnectContainer().followOutput(logConsumer);
+ logConsumer.waitUntil(frame -> frame.getUtf8String().contains("consumer poll timeout has expired"),
+ 1, TimeUnit.MINUTES);
+ logConsumer.waitUntil(frame -> frame.getUtf8String().contains("Connection attempt failed to host"),
+ 1, TimeUnit.MINUTES);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(5));
+
+ assertEquals("RUNNING",
+ kafkaContext.getSolaceConnectorDeployment().getConnectorStatus()
+ .getAsJsonArray("tasks").get(0).getAsJsonObject().get("state").getAsString());
+
+ logger.info("Removing toxic {}", lag.getName());
+ lag.remove();
+
+ logger.info("Checking consumption of message");
+ assertMessageReceived(kafkaContext, dynamicDestination ? null : SOL_QUEUE,
+ new String[]{topicName},
+ recordMetadataFuture.get(30, TimeUnit.SECONDS),
+ ImmutableMap.of(AdditionalCheck.ATTACHMENTBYTEBUFFER, recordValue));
+
+ logger.info("Sending a new message to verify that Solace producers & Kafka consumers have recovered");
+ clearReceivedMessages();
+ String newRecordValue = randomAlphanumeric(100);
+ messageToKafkaTest(kafkaContext, dynamicDestination ? null : SOL_QUEUE,
+ new String[]{topicName},
+ randomAlphanumeric(100), dynamicDestination ?
+ topicName.substring("comms/bus/".length()) + ":" + newRecordValue :
+ newRecordValue,
+ ImmutableMap.of(AdditionalCheck.ATTACHMENTBYTEBUFFER, newRecordValue));
+ }
+
+ @CartesianTest(name = "[{index}] dynamicDestination={0}, autoFlush={1}, kafka={2}")
+ void testCommitRollback(@Values(booleans = { false, true }) boolean dynamicDestination,
+ @Values(booleans = { false, true }) boolean autoFlush,
+ @KafkaSource KafkaContext kafkaContext,
+ JCSMPSession jcsmpSession,
+ SempV2Api sempV2Api,
+ Queue queue,
@ExecSvc(poolSize = 2, scheduled = true) ScheduledExecutorService executorService)
throws Exception {
- Queue queue = JCSMPFactory.onlyInstance().createQueue(randomAlphanumeric(100));
+ String topicName;
+ if (dynamicDestination) {
+ topicName = "comms/bus/" + RandomStringUtils.randomAlphanumeric(4);
+ connectorProps.setProperty(SolaceSinkConstants.SOL_DYNAMIC_DESTINATION, Boolean.toString(true));
+ connectorProps.setProperty(SolaceSinkConstants.SOL_RECORD_PROCESSOR, SolDynamicDestinationRecordProcessor.class.getName());
+ connectorProps.setProperty(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE, Integer.toString(autoFlush ? 1 : 200));
+ } else {
+ topicName = RandomStringUtils.randomAlphanumeric(100);
+ connectorProps.setProperty(SolaceSinkConstants.SOl_QUEUE, queue.getName());
+ connectorProps.setProperty(SolaceSinkConstants.SOL_TOPICS, topicName);
+ connectorProps.setProperty(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE, Integer.toString(autoFlush ? 2 : 200));
+ }
+
+ connectorProps.setProperty(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE, Boolean.toString(true));
+ connectorProps.setProperty(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(true));
+ connectorProps.setProperty("errors.retry.timeout", Long.toString(-1));
+
+ sempV2Api.config().updateMsgVpnQueue(SOL_VPN, queue.getName(), new ConfigMsgVpnQueue().maxMsgSize(1),
+ null);
+ sempV2Api.config().createMsgVpnQueueSubscription(SOL_VPN, queue.getName(),
+ new ConfigMsgVpnQueueSubscription().subscriptionTopic(topicName), null);
+ assertTimeoutPreemptively(Duration.ofSeconds(30), () -> {
+ while (sempV2Api.monitor().getMsgVpnQueue(SOL_VPN, queue.getName(), null).getData()
+ .getMaxMsgSize() != 1) {
+ logger.info("Waiting for queue {} to have max message size of 1", queue.getName());
+ Thread.sleep(100);
+ }
+ });
try (TestSolaceQueueConsumer solaceConsumer1 = new TestSolaceQueueConsumer(jcsmpSession)) {
- EndpointProperties endpointProperties = new EndpointProperties();
- endpointProperties.setMaxMsgSize(1);
- solaceConsumer1.provisionQueue(queue.getName(), endpointProperties);
+ solaceConsumer1.setQueueName(queue.getName());
solaceConsumer1.start();
- connectorProps.setProperty(SolaceSinkConstants.SOl_QUEUE, queue.getName());
- connectorProps.setProperty(SolaceSinkConstants.SOL_QUEUE_MESSAGES_AUTOFLUSH_SIZE, Integer.toString(autoFlush ? 1 : 100));
- connectorProps.setProperty(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE, Boolean.toString(true));
- connectorProps.setProperty("errors.retry.timeout", Long.toString(-1));
kafkaContext.getSolaceConnectorDeployment().startConnector(connectorProps);
clearReceivedMessages();
String recordValue = randomAlphanumeric(100);
- Future recordMetadata = executorService.schedule(() ->
- sendMessagetoKafka(kafkaContext.getProducer(), randomAlphanumeric(100), recordValue),
+ Future recordMetadataFuture = executorService.schedule(() ->
+ sendMessagetoKafka(kafkaContext.getProducer(), randomAlphanumeric(100),
+ dynamicDestination ?
+ topicName.substring("comms/bus/".length()) + ":" + recordValue :
+ recordValue),
5, TimeUnit.SECONDS);
WaitingConsumer logConsumer = new WaitingConsumer();
@@ -472,15 +640,42 @@ void testCommitRollback(boolean autoFlush, KafkaContext kafkaContext,
}
Thread.sleep(5000);
- Assertions.assertEquals("RUNNING",
- kafkaContext.getSolaceConnectorDeployment().getConnectorStatus()
- .getAsJsonArray("tasks").get(0).getAsJsonObject().get("state").getAsString());
+ assertEquals("RUNNING", kafkaContext.getSolaceConnectorDeployment().getConnectorStatus()
+ .getAsJsonArray("tasks").get(0).getAsJsonObject().get("state").getAsString());
+
+ executorService.schedule(() -> {
+ logger.info("Restoring max message size for queue {}", queue.getName());
+ return sempV2Api.config().updateMsgVpnQueue(SOL_VPN, queue.getName(),
+ new ConfigMsgVpnQueue().maxMsgSize(10000000), null);
+ }, 5, TimeUnit.SECONDS);
+
+ logger.info("Waiting for Solace transaction to be committed");
+ logConsumer.waitUntil(frame -> frame.getUtf8String()
+ .contains("Committed Solace records for transaction"), 5, TimeUnit.MINUTES);
+
+ logger.info("Sending another message to Kafka since Kafka Connect won't commit existing messages " +
+ "after a retry until a new message is received");
+ sendMessagetoKafka(kafkaContext.getProducer(), randomAlphanumeric(100), randomAlphanumeric(100));
+
+ List receivedMsgDestinations = new ArrayList<>();
+ while (receivedMsgDestinations.size() < (dynamicDestination ? 1 : 2)) {
+ logger.info("Checking consumption of messages");
+ receivedMsgDestinations.addAll(assertMessageReceived(kafkaContext, queue.getName(), new String[0],
+ recordMetadataFuture.get(30, TimeUnit.SECONDS),
+ ImmutableMap.of(AdditionalCheck.ATTACHMENTBYTEBUFFER, recordValue))
+ .stream()
+ .map(BytesXMLMessage::getDestination)
+ .collect(Collectors.toList()));
+ }
- sempV2Api.config().updateMsgVpnQueue(SOL_VPN, queue.getName(), new ConfigMsgVpnQueue().maxMsgSize(10000000), null);
- assertMessageReceived(queue.getName(), new String[0], recordMetadata.get(30, TimeUnit.SECONDS),
- ImmutableMap.of(AdditionalCheck.ATTACHMENTBYTEBUFFER, recordValue));
- } finally {
- jcsmpSession.deprovision(queue, JCSMPSession.FLAG_IGNORE_DOES_NOT_EXIST);
+ if (dynamicDestination) {
+ assertThat(receivedMsgDestinations, hasSize(1));
+ assertThat(receivedMsgDestinations, hasItems(JCSMPFactory.onlyInstance().createTopic(topicName)));
+ } else {
+ assertThat(receivedMsgDestinations, hasSize(2));
+ assertThat(receivedMsgDestinations, hasItems(queue,
+ JCSMPFactory.onlyInstance().createTopic(topicName)));
+ }
}
}
}
diff --git a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/SolaceSinkTaskIT.java b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/SolaceSinkTaskIT.java
index c38b12a..8fc4286 100644
--- a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/SolaceSinkTaskIT.java
+++ b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/SolaceSinkTaskIT.java
@@ -1,26 +1,41 @@
package com.solace.connector.kafka.connect.sink.it;
import com.solace.connector.kafka.connect.sink.SolRecordProcessorIF;
+import com.solace.connector.kafka.connect.sink.SolSessionEventCallbackHandler;
import com.solace.connector.kafka.connect.sink.SolaceSinkConstants;
import com.solace.connector.kafka.connect.sink.SolaceSinkSender;
import com.solace.connector.kafka.connect.sink.SolaceSinkTask;
import com.solace.connector.kafka.connect.sink.it.util.extensions.NetworkPubSubPlusExtension;
import com.solace.connector.kafka.connect.sink.recordprocessor.SolDynamicDestinationRecordProcessor;
+import com.solace.connector.kafka.connect.sink.recordprocessor.SolSimpleRecordProcessor;
import com.solace.test.integration.junit.jupiter.extension.ExecutorServiceExtension;
import com.solace.test.integration.junit.jupiter.extension.ExecutorServiceExtension.ExecSvc;
import com.solace.test.integration.junit.jupiter.extension.LogCaptorExtension;
import com.solace.test.integration.junit.jupiter.extension.LogCaptorExtension.LogCaptor;
+import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension.JCSMPProxy;
+import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension.ToxiproxyContext;
import com.solace.test.integration.semp.v2.SempV2Api;
import com.solace.test.integration.semp.v2.config.model.ConfigMsgVpnClientProfile;
import com.solace.test.integration.semp.v2.config.model.ConfigMsgVpnClientUsername;
import com.solace.test.integration.semp.v2.config.model.ConfigMsgVpnQueue;
+import com.solace.test.integration.semp.v2.config.model.ConfigMsgVpnQueueSubscription;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.ClosedFacilityException;
+import com.solacesystems.jcsmp.ConsumerFlowProperties;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.FlowReceiver;
import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPProperties;
+import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.SDTException;
+import com.solacesystems.jcsmp.SessionEvent;
+import com.solacesystems.jcsmp.Topic;
+import com.solacesystems.jcsmp.XMLMessage;
import com.solacesystems.jcsmp.transaction.RollbackException;
+import eu.rekawek.toxiproxy.model.ToxicDirection;
+import eu.rekawek.toxiproxy.model.toxic.Latency;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -36,6 +51,8 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,8 +60,10 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -54,8 +73,10 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
@@ -155,14 +176,19 @@ public void testFailTransactedSessionCreation(SempV2Api sempV2Api, Queue queue)
new ConfigMsgVpnClientProfile().allowTransactedSessionsEnabled(false), null);
ConnectException thrown = assertThrows(ConnectException.class, () -> solaceSinkTask.start(connectorProperties));
- assertThat(thrown.getMessage(), containsString("Failed to create Transacted Session"));
assertThat(thrown.getCause(), instanceOf(JCSMPException.class));
assertThat(thrown.getCause().getMessage(), containsString("Router does not support transacted sessions"));
}
- @Test
- public void testSendToTopicThrowsJCSMPException() {
- connectorProperties.put(SolaceSinkConstants.SOL_TOPICS, RandomStringUtils.randomAlphanumeric(100));
+ @ParameterizedTest
+ @ValueSource(classes = {Queue.class, Topic.class})
+ public void testSendThrowsJCSMPException(Class destinationType, Queue queue) {
+ if (destinationType.isAssignableFrom(Queue.class)) {
+ connectorProperties.put(SolaceSinkConstants.SOl_QUEUE, queue.getName());
+ } else {
+ connectorProperties.put(SolaceSinkConstants.SOL_TOPICS, RandomStringUtils.randomAlphanumeric(100));
+ }
+
solaceSinkTask.start(connectorProperties);
SinkRecord sinkRecord = new SinkRecord(RandomStringUtils.randomAlphanumeric(100), 0,
@@ -173,37 +199,28 @@ public void testSendToTopicThrowsJCSMPException() {
ConnectException thrown = assertThrows(ConnectException.class, () -> solaceSinkTask.put(
Collections.singleton(sinkRecord)));
assertThat(thrown, instanceOf(RetriableException.class));
- assertThat(thrown.getMessage(), containsString("Received exception while sending message to topic"));
+ assertThat(thrown.getMessage(), containsString("Received exception while sending message to " +
+ (destinationType.isAssignableFrom(Queue.class) ? "queue" : "topic")));
assertThat(thrown.getCause(), instanceOf(ClosedFacilityException.class));
}
- @Test
- public void testSendToQueueThrowsJCSMPException(Queue queue) {
- connectorProperties.put(SolaceSinkConstants.SOl_QUEUE, queue.getName());
+ @ParameterizedTest(name = "[{index}] destinationType={0}")
+ @ValueSource(classes = {Queue.class, Topic.class})
+ public void testDynamicSendThrowsJCSMPException(Class destinationType, Queue queue) {
+ connectorProperties.put(SolaceSinkConstants.SOL_DYNAMIC_DESTINATION, Boolean.toString(true));
+ connectorProperties.put(SolaceSinkConstants.SOL_RECORD_PROCESSOR, DynamicDestinationTypeRecordProcessor.class
+ .getName());
solaceSinkTask.start(connectorProperties);
SinkRecord sinkRecord = new SinkRecord(RandomStringUtils.randomAlphanumeric(100), 0,
Schema.STRING_SCHEMA, RandomStringUtils.randomAlphanumeric(100),
Schema.BYTES_SCHEMA, RandomUtils.nextBytes(10), 0);
- solaceSinkTask.stop();
- ConnectException thrown = assertThrows(ConnectException.class, () -> solaceSinkTask.put(
- Collections.singleton(sinkRecord)));
- assertThat(thrown, instanceOf(RetriableException.class));
- assertThat(thrown.getMessage(), containsString("Received exception while sending message to queue"));
- assertThat(thrown.getCause(), instanceOf(ClosedFacilityException.class));
- }
-
- @Test
- public void testSendToDynamicTopicThrowsJCSMPException() {
- connectorProperties.put(SolaceSinkConstants.SOL_DYNAMIC_DESTINATION, "true");
- connectorProperties.put(SolaceSinkConstants.SOL_RECORD_PROCESSOR, SolDynamicDestinationRecordProcessor.class.getName());
- solaceSinkTask.start(connectorProperties);
-
- SinkRecord sinkRecord = new SinkRecord(RandomStringUtils.randomAlphanumeric(100), 0,
- Schema.STRING_SCHEMA, RandomStringUtils.randomAlphanumeric(100),
- Schema.BYTES_SCHEMA, String.format("%s %s", RandomStringUtils.randomAlphanumeric(4),
- RandomStringUtils.randomAlphanumeric(100)).getBytes(StandardCharsets.UTF_8), 0);
+ String dynamicDestinationName = destinationType.isAssignableFrom(Queue.class) ? queue.getName() :
+ RandomStringUtils.randomAlphanumeric(100);
+ sinkRecord.headers()
+ .addString(DynamicDestinationTypeRecordProcessor.HEADER_DYNAMIC_DESTINATION, dynamicDestinationName)
+ .addString(DynamicDestinationTypeRecordProcessor.HEADER_DYNAMIC_DESTINATION_TYPE, destinationType.getName());
solaceSinkTask.stop();
ConnectException thrown = assertThrows(ConnectException.class, () -> solaceSinkTask.put(
@@ -282,12 +299,20 @@ public void testRecordProcessorError(boolean ignoreRecordProcessorError,
}
}
- @Test
- public void testCommitRollback(SempV2Api sempV2Api, Queue queue) throws Exception {
+ @ParameterizedTest(name = "[{index}] autoFlush={0}")
+ @ValueSource(booleans = {false, true})
+ public void testCommitRollback(boolean autoFlush, SempV2Api sempV2Api, Queue queue) throws Exception {
connectorProperties.put(SolaceSinkConstants.SOl_QUEUE, queue.getName());
+ connectorProperties.put(SolaceSinkConstants.SOL_TOPICS, RandomStringUtils.randomAlphanumeric(100));
connectorProperties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE, Boolean.toString(true));
+ connectorProperties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(true));
+ if (autoFlush) {
+ connectorProperties.put(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE, Integer.toString(2));
+ }
String vpnName = connectorProperties.get(SolaceSinkConstants.SOL_VPN_NAME);
+ sempV2Api.config().createMsgVpnQueueSubscription(vpnName, queue.getName(), new ConfigMsgVpnQueueSubscription()
+ .subscriptionTopic(connectorProperties.get(SolaceSinkConstants.SOL_TOPICS)), null);
sempV2Api.config().updateMsgVpnQueue(vpnName, queue.getName(), new ConfigMsgVpnQueue().maxMsgSize(1), null);
assertTimeoutPreemptively(Duration.ofSeconds(20), () -> {
@@ -303,26 +328,50 @@ public void testCommitRollback(SempV2Api sempV2Api, Queue queue) throws Exceptio
SinkRecord sinkRecord = new SinkRecord(RandomStringUtils.randomAlphanumeric(100), 0,
Schema.STRING_SCHEMA, RandomStringUtils.randomAlphanumeric(100),
Schema.BYTES_SCHEMA, RandomUtils.nextBytes(10), 0);
- Map currentOffsets = Collections.singletonMap(
- new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition()),
- new OffsetAndMetadata(sinkRecord.kafkaOffset()));
- solaceSinkTask.put(Collections.singleton(sinkRecord));
- ConnectException thrown = assertThrows(ConnectException.class, () -> solaceSinkTask.flush(currentOffsets));
+ ConnectException thrown;
+ if (autoFlush) {
+ thrown = assertThrows(RetriableException.class, () -> solaceSinkTask.put(Collections.singleton(sinkRecord)));
+ } else {
+ Map currentOffsets = Collections.singletonMap(
+ new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition()),
+ new OffsetAndMetadata(sinkRecord.kafkaOffset()));
+ solaceSinkTask.put(Collections.singleton(sinkRecord));
+ thrown = assertThrows(ConnectException.class, () -> solaceSinkTask.flush(currentOffsets));
+ }
+
assertThat(thrown.getMessage(), containsString("Error in committing transaction"));
assertThat(thrown.getCause(), instanceOf(RollbackException.class));
assertThat(thrown.getCause().getMessage(), containsString("Document Is Too Large"));
+ assertEquals(2, sempV2Api.monitor().getMsgVpnQueue(vpnName, queue.getName(), null)
+ .getData().getMaxMsgSizeExceededDiscardedMsgCount());
}
- @Test
- public void testAutoFlushCommitRollback(SempV2Api sempV2Api, Queue queue) throws Exception {
- connectorProperties.put(SolaceSinkConstants.SOl_QUEUE, queue.getName());
+ @CartesianTest(name = "[{index}] destinationType={0}, autoFlush={1}")
+ public void testDynamicDestinationCommitRollback(
+ @Values(classes = {Queue.class, Topic.class}) Class destinationType,
+ @Values(booleans = {false, true}) boolean autoFlush,
+ SempV2Api sempV2Api,
+ Queue queue) throws Exception {
connectorProperties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE, Boolean.toString(true));
- connectorProperties.put(SolaceSinkConstants.SOL_QUEUE_MESSAGES_AUTOFLUSH_SIZE, Integer.toString(1));
+ connectorProperties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(true));
+ connectorProperties.put(SolaceSinkConstants.SOL_DYNAMIC_DESTINATION, Boolean.toString(true));
+ connectorProperties.put(SolaceSinkConstants.SOL_RECORD_PROCESSOR, DynamicDestinationTypeRecordProcessor.class
+ .getName());
+ if (autoFlush) {
+ connectorProperties.put(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE, Integer.toString(1));
+ }
+
+ String topicName = RandomStringUtils.randomAlphanumeric(100);
String vpnName = connectorProperties.get(SolaceSinkConstants.SOL_VPN_NAME);
- sempV2Api.config().updateMsgVpnQueue(vpnName, queue.getName(), new ConfigMsgVpnQueue().maxMsgSize(1), null);
+ if (destinationType.isAssignableFrom(Topic.class)) {
+ sempV2Api.config().createMsgVpnQueueSubscription(vpnName, queue.getName(),
+ new ConfigMsgVpnQueueSubscription().subscriptionTopic(topicName), null);
+ }
+
+ sempV2Api.config().updateMsgVpnQueue(vpnName, queue.getName(), new ConfigMsgVpnQueue().maxMsgSize(1), null);
assertTimeoutPreemptively(Duration.ofSeconds(20), () -> {
while (sempV2Api.monitor().getMsgVpnQueue(vpnName, queue.getName(), null).getData()
.getMaxMsgSize() != 1) {
@@ -337,10 +386,217 @@ public void testAutoFlushCommitRollback(SempV2Api sempV2Api, Queue queue) throws
Schema.STRING_SCHEMA, RandomStringUtils.randomAlphanumeric(100),
Schema.BYTES_SCHEMA, RandomUtils.nextBytes(10), 0);
- ConnectException thrown = assertThrows(RetriableException.class, () -> solaceSinkTask.put(Collections.singleton(sinkRecord)));
+ String dynamicDestinationName = destinationType.isAssignableFrom(Queue.class) ? queue.getName() : topicName;
+ sinkRecord.headers()
+ .addString(DynamicDestinationTypeRecordProcessor.HEADER_DYNAMIC_DESTINATION, dynamicDestinationName)
+ .addString(DynamicDestinationTypeRecordProcessor.HEADER_DYNAMIC_DESTINATION_TYPE, destinationType.getName());
+
+ ConnectException thrown;
+ if (autoFlush) {
+ thrown = assertThrows(RetriableException.class, () -> solaceSinkTask.put(Collections.singleton(sinkRecord)));
+ } else {
+ Map currentOffsets = Collections.singletonMap(
+ new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition()),
+ new OffsetAndMetadata(sinkRecord.kafkaOffset()));
+ solaceSinkTask.put(Collections.singleton(sinkRecord));
+ thrown = assertThrows(ConnectException.class, () -> solaceSinkTask.flush(currentOffsets));
+ }
+
assertThat(thrown.getMessage(), containsString("Error in committing transaction"));
assertThat(thrown.getCause(), instanceOf(RollbackException.class));
assertThat(thrown.getCause().getMessage(), containsString("Document Is Too Large"));
+ assertEquals(1, sempV2Api.monitor().getMsgVpnQueue(vpnName, queue.getName(), null)
+ .getData().getMaxMsgSizeExceededDiscardedMsgCount());
+ }
+
+ @ParameterizedTest(name = "[{index}] autoFlush={0}")
+ @ValueSource(booleans = {false, true})
+ public void testLongCommit(boolean autoFlush,
+ @JCSMPProxy JCSMPSession jcsmpSession,
+ SempV2Api sempV2Api,
+ Queue queue,
+ @JCSMPProxy ToxiproxyContext jcsmpProxyContext,
+ @ExecSvc ExecutorService executorService,
+ @LogCaptor(SolSessionEventCallbackHandler.class) BufferedReader logReader)
+ throws Exception {
+ connectorProperties.put(SolaceSinkConstants.SOL_HOST, (String) jcsmpSession.getProperty(JCSMPProperties.HOST));
+ connectorProperties.put(SolaceSinkConstants.SOl_QUEUE, queue.getName());
+ connectorProperties.put(SolaceSinkConstants.SOL_TOPICS, RandomStringUtils.randomAlphanumeric(100));
+ connectorProperties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE, Boolean.toString(true));
+ connectorProperties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(true));
+ connectorProperties.put(SolaceSinkConstants.SOL_CHANNEL_PROPERTY_reconnectRetries, Integer.toString(-1));
+
+ if (autoFlush) {
+ connectorProperties.put(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE, Integer.toString(2));
+ }
+
+ String vpnName = connectorProperties.get(SolaceSinkConstants.SOL_VPN_NAME);
+ sempV2Api.config().createMsgVpnQueueSubscription(vpnName, queue.getName(), new ConfigMsgVpnQueueSubscription()
+ .subscriptionTopic(connectorProperties.get(SolaceSinkConstants.SOL_TOPICS)), null);
+
+ solaceSinkTask.start(connectorProperties);
+
+ SinkRecord sinkRecord = new SinkRecord(RandomStringUtils.randomAlphanumeric(100), 0,
+ Schema.STRING_SCHEMA, RandomStringUtils.randomAlphanumeric(100),
+ Schema.BYTES_SCHEMA, RandomUtils.nextBytes(10), 0);
+ Map currentOffsets = Collections.singletonMap(
+ new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition()),
+ new OffsetAndMetadata(sinkRecord.kafkaOffset()));
+
+ logger.info("Cutting JCSMP upstream");
+ Latency lag = jcsmpProxyContext.getProxy().toxics()
+ .latency("lag", ToxicDirection.UPSTREAM, TimeUnit.HOURS.toMillis(1));
+
+ Future> future = executorService.submit(() -> {
+ String logLine;
+ do {
+ try {
+ logLine = logReader.readLine();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } while (!logLine.contains("Received Session Event " + SessionEvent.RECONNECTING));
+
+ try {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(5));
+ } catch (InterruptedException ignored) {}
+
+ try {
+ logger.info("Restoring JCSMP upstream");
+ lag.remove();
+ logger.info("JCSMP upstream restored");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertTimeoutPreemptively(Duration.ofMinutes(5), () -> {
+ solaceSinkTask.put(Collections.singleton(sinkRecord));
+ solaceSinkTask.flush(currentOffsets);
+ });
+ future.get(30, TimeUnit.SECONDS);
+
+ List receivedDestinations = new ArrayList<>();
+ ConsumerFlowProperties consumerFlowProperties = new ConsumerFlowProperties();
+ consumerFlowProperties.setEndpoint(queue);
+ consumerFlowProperties.setStartState(true);
+ FlowReceiver flow = jcsmpSession.createFlow(null, consumerFlowProperties);
+ try {
+ assertTimeoutPreemptively(Duration.ofSeconds(30), () -> {
+ while (receivedDestinations.size() < 2) {
+ logger.info("Receiving messages");
+ Optional.ofNullable(flow.receive())
+ .map(XMLMessage::getDestination)
+ .ifPresent(receivedDestinations::add);
+ }
+ });
+ } finally {
+ flow.close();
+ }
+
+ assertThat(receivedDestinations, hasItems(queue,
+ JCSMPFactory.onlyInstance().createTopic(connectorProperties.get(SolaceSinkConstants.SOL_TOPICS))));
+ }
+
+ @CartesianTest(name = "[{index}] destinationType={0}, autoFlush={1}")
+ public void testDynamicDestinationLongCommit(
+ @Values(classes = {Queue.class, Topic.class}) Class destinationType,
+ @Values(booleans = {false, true}) boolean autoFlush,
+ @JCSMPProxy JCSMPSession jcsmpSession,
+ SempV2Api sempV2Api,
+ Queue queue,
+ @JCSMPProxy ToxiproxyContext jcsmpProxyContext,
+ @ExecSvc ExecutorService executorService,
+ @LogCaptor(SolSessionEventCallbackHandler.class) BufferedReader logReader) throws Exception {
+ connectorProperties.put(SolaceSinkConstants.SOL_HOST, (String) jcsmpSession.getProperty(JCSMPProperties.HOST));
+ connectorProperties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE, Boolean.toString(true));
+ connectorProperties.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(true));
+ connectorProperties.put(SolaceSinkConstants.SOL_CHANNEL_PROPERTY_reconnectRetries, Integer.toString(-1));
+ connectorProperties.put(SolaceSinkConstants.SOL_DYNAMIC_DESTINATION, Boolean.toString(true));
+ connectorProperties.put(SolaceSinkConstants.SOL_RECORD_PROCESSOR, DynamicDestinationTypeRecordProcessor.class
+ .getName());
+
+ // Force transacted session to be created during connector-start.
+ connectorProperties.put(SolaceSinkConstants.SOL_TOPICS, RandomStringUtils.randomAlphanumeric(100));
+
+ if (autoFlush) {
+ connectorProperties.put(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE, Integer.toString(1));
+ }
+
+ String topicName = RandomStringUtils.randomAlphanumeric(100);
+ if (destinationType.isAssignableFrom(Topic.class)) {
+ sempV2Api.config().createMsgVpnQueueSubscription(connectorProperties.get(SolaceSinkConstants.SOL_VPN_NAME),
+ queue.getName(), new ConfigMsgVpnQueueSubscription()
+ .subscriptionTopic(topicName), null);
+ }
+
+ solaceSinkTask.start(connectorProperties);
+
+ SinkRecord sinkRecord = new SinkRecord(RandomStringUtils.randomAlphanumeric(100), 0,
+ Schema.STRING_SCHEMA, RandomStringUtils.randomAlphanumeric(100),
+ Schema.BYTES_SCHEMA, RandomUtils.nextBytes(10), 0);
+
+ String dynamicDestinationName = destinationType.isAssignableFrom(Queue.class) ? queue.getName() : topicName;
+ sinkRecord.headers()
+ .addString(DynamicDestinationTypeRecordProcessor.HEADER_DYNAMIC_DESTINATION, dynamicDestinationName)
+ .addString(DynamicDestinationTypeRecordProcessor.HEADER_DYNAMIC_DESTINATION_TYPE, destinationType.getName());
+
+ Map currentOffsets = Collections.singletonMap(
+ new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition()),
+ new OffsetAndMetadata(sinkRecord.kafkaOffset()));
+
+ logger.info("Cutting JCSMP upstream");
+ Latency lag = jcsmpProxyContext.getProxy().toxics()
+ .latency("lag", ToxicDirection.UPSTREAM, TimeUnit.HOURS.toMillis(1));
+
+ Future> future = executorService.submit(() -> {
+ String logLine;
+ do {
+ try {
+ logLine = logReader.readLine();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } while (!logLine.contains("Received Session Event " + SessionEvent.RECONNECTING));
+
+ try {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(5));
+ } catch (InterruptedException ignored) {}
+
+ try {
+ logger.info("Restoring JCSMP upstream");
+ lag.remove();
+ logger.info("JCSMP upstream restored");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertTimeoutPreemptively(Duration.ofMinutes(5), () -> {
+ solaceSinkTask.put(Collections.singleton(sinkRecord));
+ solaceSinkTask.flush(currentOffsets);
+ });
+ future.get(30, TimeUnit.SECONDS);
+
+ ConsumerFlowProperties consumerFlowProperties = new ConsumerFlowProperties();
+ consumerFlowProperties.setEndpoint(queue);
+ consumerFlowProperties.setStartState(true);
+ FlowReceiver flow = jcsmpSession.createFlow(null, consumerFlowProperties);
+ try {
+ assertTimeoutPreemptively(Duration.ofSeconds(30), () -> {
+ while (true) {
+ logger.info("Receiving message");
+ BytesXMLMessage receivedMessage = flow.receive();
+ if (receivedMessage != null) {
+ assertInstanceOf(destinationType, receivedMessage.getDestination());
+ assertEquals(dynamicDestinationName, receivedMessage.getDestination().getName());
+ break;
+ }
+ }
+ });
+ } finally {
+ flow.close();
+ }
}
public static class BadRecordProcessor implements SolRecordProcessorIF {
@@ -364,4 +620,31 @@ public BytesXMLMessage processRecord(String skey, SinkRecord record) {
return msg;
}
}
+
+ public static class DynamicDestinationTypeRecordProcessor extends SolSimpleRecordProcessor {
+ public static final String HEADER_DYNAMIC_DESTINATION = "dynamicDestination";
+ public static final String HEADER_DYNAMIC_DESTINATION_TYPE = "dynamicDestinationType";
+ private static final Logger logger = LoggerFactory.getLogger(DynamicDestinationTypeRecordProcessor.class);
+
+ @Override
+ public BytesXMLMessage processRecord(String skey, SinkRecord record) {
+ try {
+ String dynamicDestinationName = (String) record.headers().lastWithName(HEADER_DYNAMIC_DESTINATION)
+ .value();
+ Class> dynamicDestinationType = Class.forName((String) record.headers()
+ .lastWithName(HEADER_DYNAMIC_DESTINATION_TYPE).value());
+
+ Destination dynamicDestination = dynamicDestinationType.isAssignableFrom(Queue.class) ?
+ JCSMPFactory.onlyInstance().createQueue(dynamicDestinationName) :
+ JCSMPFactory.onlyInstance().createTopic(dynamicDestinationName);
+ logger.info("Parsed dynamic destination {} {}", dynamicDestinationType.getSimpleName(), dynamicDestination);
+
+ BytesXMLMessage msg = super.processRecord(skey, record);
+ msg.getProperties().putDestination("dynamicDestination", dynamicDestination);
+ return msg;
+ } catch (SDTException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
diff --git a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/ThrowingFunction.java b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/ThrowingFunction.java
new file mode 100644
index 0000000..dc590e7
--- /dev/null
+++ b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/ThrowingFunction.java
@@ -0,0 +1,16 @@
+package com.solace.connector.kafka.connect.sink.it.util;
+
+import java.util.function.Function;
+
+public interface ThrowingFunction extends Function {
+ @Override
+ default R apply(T t) {
+ try {
+ return acceptThrows(t);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ R acceptThrows(T t) throws Exception;
+}
diff --git a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/extensions/KafkaArgumentsProvider.java b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/extensions/KafkaArgumentsProvider.java
index 99f6531..9d9ab83 100644
--- a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/extensions/KafkaArgumentsProvider.java
+++ b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/extensions/KafkaArgumentsProvider.java
@@ -16,7 +16,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
-import org.junitpioneer.jupiter.CartesianAnnotationConsumer;
+import org.junitpioneer.jupiter.cartesian.CartesianArgumentsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
@@ -28,16 +28,29 @@
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import java.lang.reflect.Parameter;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
-public class KafkaArgumentsProvider implements ArgumentsProvider, CartesianAnnotationConsumer {
+public class KafkaArgumentsProvider implements ArgumentsProvider,
+ CartesianArgumentsProvider {
private static final Logger LOG = LoggerFactory.getLogger(KafkaArgumentsProvider.class);
@Override
public Stream extends Arguments> provideArguments(ExtensionContext context) {
+ return createKafkaContexts(context).map(Arguments::of);
+ }
+
+ @Override
+ public Stream provideArguments(ExtensionContext context, Parameter parameter) {
+ Objects.requireNonNull(parameter.getAnnotation(KafkaSource.class));
+ return createKafkaContexts(context);
+ }
+
+ private Stream createKafkaContexts(ExtensionContext context) {
KafkaConnection bitnamiCxn = context.getRoot()
.getStore(KafkaNamespace.BITNAMI.getNamespace())
.getOrComputeIfAbsent(BitnamiResource.class, c -> {
@@ -86,8 +99,8 @@ public Stream extends Arguments> provideArguments(ExtensionContext context) {
.getKafkaConnection();
return Stream.of(
- Arguments.of(createKafkaContext(bitnamiCxn, KafkaNamespace.BITNAMI, context)),
- Arguments.of(createKafkaContext(confluentCxn, KafkaNamespace.CONFLUENT, context))
+ createKafkaContext(bitnamiCxn, KafkaNamespace.BITNAMI, context),
+ createKafkaContext(confluentCxn, KafkaNamespace.CONFLUENT, context)
);
}
@@ -125,17 +138,10 @@ private KafkaContext createKafkaContext(KafkaConnection connection, KafkaNamespa
return new KafkaContext(namespace, connection, adminClient, connectorDeployment, producer);
}
- @Override
- public void accept(KafkaArgumentSource kafkaArgumentSource) {
-
- }
-
- @Target(ElementType.METHOD)
+ @Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@ArgumentsSource(KafkaArgumentsProvider.class)
- public @interface KafkaArgumentSource {
-
- }
+ public @interface KafkaSource {}
public static class AutoDeleteSolaceConnectorDeploymentAfterEach implements AfterEachCallback {
diff --git a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/testcontainers/BitnamiKafkaConnectContainer.java b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/testcontainers/BitnamiKafkaConnectContainer.java
index 5729c40..22433a7 100644
--- a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/testcontainers/BitnamiKafkaConnectContainer.java
+++ b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/testcontainers/BitnamiKafkaConnectContainer.java
@@ -1,6 +1,8 @@
package com.solace.connector.kafka.connect.sink.it.util.testcontainers;
import com.github.dockerjava.api.command.InspectContainerResponse;
+import com.solace.connector.kafka.connect.sink.SolProducerHandler;
+import com.solace.connector.kafka.connect.sink.SolaceSinkSender;
import com.solace.connector.kafka.connect.sink.SolaceSinkTask;
import com.solace.connector.kafka.connect.sink.it.Tools;
import org.testcontainers.containers.BindMode;
@@ -23,6 +25,8 @@ public class BitnamiKafkaConnectContainer extends GenericContainer zookeeperContainer;
@@ -77,14 +81,18 @@ protected void doStart() {
protected void containerIsStarting(InspectContainerResponse containerInfo) {
String command = "/bin/sh\n" +
"set -e\n" +
- "echo 'plugin.path=/opt/bitnami/kafka/jars' >> /opt/bitnami/kafka/config/connect-distributed.properties\n" +
- "echo 'rest.port=" + CONNECT_PORT + "' >> /opt/bitnami/kafka/config/connect-distributed.properties\n" +
- "echo 'log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG' >> /opt/bitnami/kafka/config/connect-log4j.properties\n" +
- "echo 'log4j.logger." + SolaceSinkTask.class.getName() + "=TRACE' >> /opt/bitnami/kafka/config/connect-log4j.properties\n" +
+ "echo 'plugin.path=/opt/bitnami/kafka/jars' >> " + PROPS_CONNECT + "\n" +
+ "echo 'rest.port=" + CONNECT_PORT + "' >> " + PROPS_CONNECT + "\n" +
+ "echo 'connector.client.config.override.policy=All' >> " + PROPS_CONNECT + "\n" +
+ "echo 'log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=TRACE' >> " + PROPS_LOG4J + "\n" +
+ "echo 'log4j.logger.com.solace.connector.kafka.connect.sink=DEBUG' >> " + PROPS_LOG4J + "\n" +
+ "echo 'log4j.logger." + SolProducerHandler.class.getName() + "=TRACE' >> " + PROPS_LOG4J + "\n" +
+ "echo 'log4j.logger." + SolaceSinkSender.class.getName() + "=TRACE' >> " + PROPS_LOG4J + "\n" +
+ "echo 'log4j.logger." + SolaceSinkTask.class.getName() + "=TRACE' >> " + PROPS_LOG4J + "\n" +
"export KAFKA_CFG_ADVERTISED_LISTENERS=" + advertisedListeners(containerInfo) + "\n" +
"/opt/bitnami/scripts/kafka/setup.sh\n" +
"/opt/bitnami/scripts/kafka/run.sh &\n" +
- "/opt/bitnami/kafka/bin/connect-distributed.sh /opt/bitnami/kafka/config/connect-distributed.properties\n";
+ "/opt/bitnami/kafka/bin/connect-distributed.sh " + PROPS_CONNECT + "\n";
copyFileToContainer(Transferable.of(command.getBytes(StandardCharsets.UTF_8), 0777), STARTER_SCRIPT);
super.containerIsStarting(containerInfo);
}
diff --git a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/testcontainers/ConfluentKafkaConnectContainer.java b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/testcontainers/ConfluentKafkaConnectContainer.java
index 04a8064..46b3f48 100644
--- a/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/testcontainers/ConfluentKafkaConnectContainer.java
+++ b/src/integrationTest/java/com/solace/connector/kafka/connect/sink/it/util/testcontainers/ConfluentKafkaConnectContainer.java
@@ -1,5 +1,8 @@
package com.solace.connector.kafka.connect.sink.it.util.testcontainers;
+import com.solace.connector.kafka.connect.sink.SolProducerHandler;
+import com.solace.connector.kafka.connect.sink.SolaceSinkSender;
+import com.solace.connector.kafka.connect.sink.SolaceSinkTask;
import com.solace.connector.kafka.connect.sink.it.Tools;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
@@ -50,7 +53,15 @@ public ConfluentKafkaConnectContainer(DockerImageName dockerImageName,
withEnv("CONNECT_INTERNAL_KEY_CONVERTER", "org.apache.kafka.connect.json.JsonConverter");
withEnv("CONNECT_INTERNAL_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter");
withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "localhost");
+ withEnv("CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY", "All");
withEnv("CONNECT_LOG4J_ROOT_LOGLEVEL", "INFO");
+ withEnv("CONNECT_LOG4J_LOGGERS", String.join(",",
+ String.join("=", "org.apache.kafka.connect.runtime.WorkerSinkTask", "TRACE"),
+ String.join("=", "com.solace.connector.kafka.connect.sink", "DEBUG"),
+ String.join("=", SolProducerHandler.class.getName(), "TRACE"),
+ String.join("=", SolaceSinkSender.class.getName(), "TRACE"),
+ String.join("=", SolaceSinkTask.class.getName(), "TRACE")
+ ));
withEnv("CONNECT_PLUGIN_PATH", "/usr/share/java,/etc/kafka-connect/jars");
withClasspathResourceMapping(Tools.getUnzippedConnectorDirName() + "/lib",
"/etc/kafka-connect/jars", BindMode.READ_ONLY);
diff --git a/src/integrationTest/resources/log4j2.xml b/src/integrationTest/resources/log4j2.xml
index 01eb664..0ff37d5 100644
--- a/src/integrationTest/resources/log4j2.xml
+++ b/src/integrationTest/resources/log4j2.xml
@@ -6,6 +6,9 @@
+
+
+
diff --git a/src/main/java/com/solace/connector/kafka/connect/sink/SolProducerHandler.java b/src/main/java/com/solace/connector/kafka/connect/sink/SolProducerHandler.java
new file mode 100644
index 0000000..41bf46f
--- /dev/null
+++ b/src/main/java/com/solace/connector/kafka/connect/sink/SolProducerHandler.java
@@ -0,0 +1,182 @@
+package com.solace.connector.kafka.connect.sink;
+
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.ProducerFlowProperties;
+import com.solacesystems.jcsmp.Queue;
+import com.solacesystems.jcsmp.XMLMessage;
+import com.solacesystems.jcsmp.XMLMessageProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class SolProducerHandler implements AutoCloseable {
+ private final SolaceSinkConnectorConfig config;
+ private final SolSessionHandler sessionHandler;
+ final XMLMessageProducer producer;
+ private final AtomicInteger txMsgCount = new AtomicInteger();
+ private final Runnable txAutoFlushCallback;
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ volatile XMLMessageProducer transactedProducer;
+ volatile XMLMessageProducer topicProducer;
+ volatile XMLMessageProducer queueProducer;
+
+ private static final Logger log = LoggerFactory.getLogger(SolProducerHandler.class);
+
+ public SolProducerHandler(final SolaceSinkConnectorConfig config,
+ final SolSessionHandler sessionHandler,
+ final Runnable txAutoFlushCallback) throws JCSMPException {
+ this.config = config;
+ this.sessionHandler = sessionHandler;
+ this.txAutoFlushCallback = txAutoFlushCallback;
+ this.producer = sessionHandler.getSession().getMessageProducer(new SolStreamingMessageCallbackHandler());
+
+ if (config.getString(SolaceSinkConstants.SOl_QUEUE) != null) {
+ // pre-init producer if queues are statically defined
+ initQueueProducer();
+ }
+
+ if (config.getTopics().length > 0) {
+ // pre-init producer if topics are statically defined
+ initTopicProducer();
+ }
+ }
+
+ public void send(final XMLMessage message, final Destination destination) throws JCSMPException {
+ if (destination instanceof Queue) {
+ if (queueProducer == null) {
+ initQueueProducer();
+ }
+ } else {
+ if (topicProducer == null) {
+ initTopicProducer();
+ }
+ }
+
+ Lock readLock = this.lock.readLock();
+ readLock.lock();
+ try {
+ if (destination instanceof Queue) {
+ message.setDeliveryMode(DeliveryMode.PERSISTENT);
+ queueProducer.send(message, destination);
+ if (config.getBoolean(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE)) {
+ autoFlush();
+ }
+ } else {
+ if (config.getBoolean(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS)) {
+ message.setDeliveryMode(DeliveryMode.PERSISTENT);
+ topicProducer.send(message, destination);
+ autoFlush();
+ } else {
+ message.setDeliveryMode(DeliveryMode.DIRECT);
+ topicProducer.send(message, destination);
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public AtomicInteger getTxMsgCount() {
+ return txMsgCount;
+ }
+
+ private void autoFlush() {
+ int txMsgCnt = txMsgCount.incrementAndGet();
+ log.trace("================ Count of TX message is now: {}", txMsgCnt);
+ if (txMsgCnt > config.getInt(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE)-1) {
+ txAutoFlushCallback.run();
+ }
+ }
+
+ private void initTopicProducer() throws JCSMPException {
+ Lock writeLock = this.lock.writeLock();
+ writeLock.lock();
+ try {
+ if (topicProducer == null) {
+ if (config.getBoolean(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS)) {
+ this.topicProducer = createTransactedProducer();
+ } else {
+ this.topicProducer = producer;
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private void initQueueProducer() throws JCSMPException {
+ Lock writeLock = this.lock.writeLock();
+ writeLock.lock();
+ try {
+ if (queueProducer == null) {
+ if (config.getBoolean(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE)) {
+ // Using transacted session for queue
+ queueProducer = createTransactedProducer();
+ } else {
+ // Not using transacted session for queue
+ queueProducer = sessionHandler.getSession().createProducer(createProducerFlowProperties(),
+ new SolStreamingMessageCallbackHandler(), new SolProducerEventCallbackHandler());
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private XMLMessageProducer createTransactedProducer() throws JCSMPException {
+ if (transactedProducer == null) {
+ Lock writeLock = this.lock.writeLock();
+ writeLock.lock();
+ try {
+ if (transactedProducer == null) {
+ sessionHandler.createTxSession();
+ transactedProducer = sessionHandler.getTxSession().createProducer(createProducerFlowProperties(),
+ new SolStreamingMessageCallbackHandler(), new SolProducerEventCallbackHandler());
+ log.info("================ txSession status: {}",
+ sessionHandler.getTxSession().getStatus().toString());
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ return transactedProducer;
+ }
+
+ private ProducerFlowProperties createProducerFlowProperties() {
+ ProducerFlowProperties flowProps = new ProducerFlowProperties();
+ flowProps.setAckEventMode(config.getString(SolaceSinkConstants.SOL_ACK_EVENT_MODE));
+ flowProps.setWindowSize(config.getInt(SolaceSinkConstants.SOL_PUBLISHER_WINDOW_SIZE));
+ return flowProps;
+ }
+
+ @Override
+ public void close() {
+ Lock writeLock = lock.writeLock();
+ writeLock.lock();
+ try {
+ if (queueProducer != null && !queueProducer.isClosed()) {
+ queueProducer.close();
+ }
+
+ if (topicProducer != null && !topicProducer.isClosed()) {
+ topicProducer.close();
+ }
+
+ if (transactedProducer != null && !transactedProducer.isClosed()) {
+ transactedProducer.close();
+ }
+
+ if (producer != null && !producer.isClosed()) {
+ producer.close();
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+}
diff --git a/src/main/java/com/solace/connector/kafka/connect/sink/SolSessionHandler.java b/src/main/java/com/solace/connector/kafka/connect/sink/SolSessionHandler.java
index 9ce427c..01b10b2 100644
--- a/src/main/java/com/solace/connector/kafka/connect/sink/SolSessionHandler.java
+++ b/src/main/java/com/solace/connector/kafka/connect/sink/SolSessionHandler.java
@@ -30,6 +30,8 @@
import java.util.Enumeration;
import java.util.Optional;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.common.config.types.Password;
import org.slf4j.Logger;
@@ -39,12 +41,13 @@
public class SolSessionHandler {
private static final Logger log = LoggerFactory.getLogger(SolSessionHandler.class);
- private SolaceSinkConnectorConfig lconfig;
+ private final SolaceSinkConnectorConfig lconfig;
final JCSMPProperties properties = new JCSMPProperties();
final JCSMPChannelProperties chanProperties = new JCSMPChannelProperties();
private JCSMPSession session = null;
- private TransactedSession txSession = null;
+ private volatile TransactedSession txSession = null;
+ private final Lock lock = new ReentrantLock();
public SolSessionHandler(SolaceSinkConnectorConfig lconfig) {
this.lconfig = lconfig;
@@ -164,7 +167,6 @@ public void configureSession() {
/**
* Create and connect JCSMPSession
- * @return
* @throws JCSMPException
*/
public void connectSession() throws JCSMPException {
@@ -180,11 +182,20 @@ public void connectSession() throws JCSMPException {
/**
* Create transacted session
- * @return TransactedSession
* @throws JCSMPException
*/
public void createTxSession() throws JCSMPException {
- txSession = session.createTransactedSession();
+ if (txSession == null) {
+ lock.lock();
+ try {
+ if (txSession == null) {
+ txSession = session.createTransactedSession();
+ log.info("Transacted Session has been created");
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
}
public JCSMPSession getSession() {
diff --git a/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkConnectorConfig.java b/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkConnectorConfig.java
index f6db799..8e86900 100644
--- a/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkConnectorConfig.java
+++ b/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkConnectorConfig.java
@@ -20,6 +20,7 @@
package com.solace.connector.kafka.connect.sink;
import java.util.Map;
+import java.util.Optional;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
@@ -118,7 +119,7 @@ public static ConfigDef solaceConfigDef() {
.define(SolaceSinkConstants.SOL_SUB_ACK_WINDOW_SIZE,
Type.INT, 255, Importance.LOW,
"The size of the sliding subscriber ACK window. The valid range is 1-255")
- .define(SolaceSinkConstants.SOL_QUEUE_MESSAGES_AUTOFLUSH_SIZE,
+ .define(SolaceSinkConstants.SOL_AUTOFLUSH_SIZE,
Type.INT, 200, Importance.LOW,
"Number of outstanding transacted messages before autoflush. Must be lower than "
+ "max PubSub+ transaction size (255). The valid range is 1-200")
@@ -137,6 +138,9 @@ public static ConfigDef solaceConfigDef() {
.define(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE,
Type.BOOLEAN, true, Importance.LOW,
"Specifies if writing messages to queue destination shall use transactions.")
+ .define(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS,
+ Type.BOOLEAN, false, Importance.LOW,
+ "When true, messages published to topics will use persistent delivery type using transactions.")
.define(SolaceSinkConstants.SOL_CHANNEL_PROPERTY_connectTimeoutInMillis,
Type.INT, 30000, Importance.MEDIUM,
"Timeout value (in ms) for creating an initial connection to Solace")
@@ -268,6 +272,12 @@ public static ConfigDef solaceConfigDef() {
}
+ public String[] getTopics() {
+ return Optional.ofNullable(getString(SolaceSinkConstants.SOL_TOPICS))
+ .map(s -> s.split(","))
+ .orElse(new String[0]);
+ }
+
public boolean isEmitKafkaRecordHeadersEnabled() {
return getBoolean(SolaceSinkConstants.SOL_EMIT_KAFKA_RECORD_HEADERS_ENABLED);
}
diff --git a/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkConstants.java b/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkConstants.java
index ad3fa5b..acf16a8 100644
--- a/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkConstants.java
+++ b/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkConstants.java
@@ -66,6 +66,7 @@ public class SolaceSinkConstants {
public static final String SOL_KRB_SERVICE_NAME = "sol.krb_service_name";
public static final String SOL_SSL_CONNECTION_DOWNGRADE_TO = "sol.ssl_connection_downgrade_to";
public static final String SOl_USE_TRANSACTIONS_FOR_QUEUE = "sol.use_transactions_for_queue";
+ public static final String SOl_USE_TRANSACTIONS_FOR_TOPICS = "sol.use_transactions_for_topics";
// Low Importance Solace TLS Protocol properties
// public static final String SOL_SSL_PROTOCOL = "sol.ssl_protocol";
@@ -114,7 +115,7 @@ public class SolaceSinkConstants {
// Low Importance Persistent Message Properties
public static final String SOL_SUB_ACK_WINDOW_SIZE = "sol.sub_ack_window_size";
public static final String SOL_PUB_ACK_WINDOW_SIZE = "sol.pub_ack_window_size";
- public static final String SOL_QUEUE_MESSAGES_AUTOFLUSH_SIZE = "sol.autoflush.size";
+ public static final String SOL_AUTOFLUSH_SIZE = "sol.autoflush.size";
public static final String SOL_SUB_ACK_TIME = "sol.sub_ack_time";
public static final String SOL_PUB_ACK_TIME = "sol.pub_ack_time";
public static final String SOL_SUB_ACK_WINDOW_THRESHOLD = "sol.sub_ack_window_threshold";
diff --git a/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkSender.java b/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkSender.java
index 85b789b..ab9d9ee 100644
--- a/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkSender.java
+++ b/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkSender.java
@@ -20,16 +20,13 @@
package com.solace.connector.kafka.connect.sink;
import com.solacesystems.jcsmp.BytesXMLMessage;
-import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.Destination;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
-import com.solacesystems.jcsmp.ProducerFlowProperties;
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.SDTException;
import com.solacesystems.jcsmp.SDTMap;
import com.solacesystems.jcsmp.Topic;
-import com.solacesystems.jcsmp.XMLMessageProducer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
@@ -43,21 +40,17 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
public class SolaceSinkSender {
private static final Logger log = LoggerFactory.getLogger(SolaceSinkSender.class);
private final SolaceSinkConnectorConfig sconfig;
- private final XMLMessageProducer topicProducer;
- private XMLMessageProducer queueProducer;
private final SolSessionHandler sessionHandler;
+ final SolProducerHandler producerHandler;
private final List topics = new ArrayList<>();
private Queue solQueue = null;
- private boolean useTxforQueue = false;
private final SolRecordProcessorIF processor;
private final String kafkaKey;
- private final AtomicInteger txMsgCounter = new AtomicInteger();
private final SolaceSinkTask sinkTask;
private final Map offsets = new HashMap<>();
@@ -65,53 +58,25 @@ public class SolaceSinkSender {
* Class that sends Solace Messages from Kafka Records.
* @param sconfig JCSMP Configuration
* @param sessionHandler SolSessionHandler
- * @param useTxforQueue
* @param sinkTask Connector Sink Task
* @throws JCSMPException
*/
public SolaceSinkSender(final SolaceSinkConnectorConfig sconfig,
final SolSessionHandler sessionHandler,
- final boolean useTxforQueue,
final SolaceSinkTask sinkTask) throws JCSMPException {
this.sconfig = sconfig;
this.sessionHandler = sessionHandler;
- this.useTxforQueue = useTxforQueue;
this.sinkTask = sinkTask;
this.kafkaKey = sconfig.getString(SolaceSinkConstants.SOL_KAFKA_MESSAGE_KEY);
- this.topicProducer = sessionHandler.getSession().getMessageProducer(new SolStreamingMessageCallbackHandler());
+ this.producerHandler = new SolProducerHandler(sconfig, sessionHandler, this::txAutoFlushHandler);
this.processor = sconfig.getConfiguredInstance(SolaceSinkConstants.SOL_RECORD_PROCESSOR, SolRecordProcessorIF.class);
- }
- /**
- * Generate PubSub+ topics from topic string
- */
- public void setupDestinationTopics() {
- String solaceTopics = sconfig.getString(SolaceSinkConstants.SOL_TOPICS);
- String[] stopics = solaceTopics.split(",");
- int counter = 0;
- while (stopics.length > counter) {
- topics.add(JCSMPFactory.onlyInstance().createTopic(stopics[counter].trim()));
- counter++;
+ for (String topic : sconfig.getTopics()) {
+ this.topics.add(JCSMPFactory.onlyInstance().createTopic(topic.trim()));
}
- }
- /**
- * Generate PubSub queue
- */
- public void setupDestinationQueue() throws JCSMPException {
- solQueue = JCSMPFactory.onlyInstance().createQueue(sconfig.getString(SolaceSinkConstants.SOl_QUEUE));
- ProducerFlowProperties flowProps = new ProducerFlowProperties();
- flowProps.setAckEventMode(sconfig.getString(SolaceSinkConstants.SOL_ACK_EVENT_MODE));
- flowProps.setWindowSize(sconfig.getInt(SolaceSinkConstants.SOL_PUBLISHER_WINDOW_SIZE));
- if (useTxforQueue) {
- // Using transacted session for queue
- queueProducer = sessionHandler.getTxSession().createProducer(flowProps, new SolStreamingMessageCallbackHandler(),
- new SolProducerEventCallbackHandler());
- log.info("================ txSession status: {}", sessionHandler.getTxSession().getStatus().toString());
- } else {
- // Not using transacted session for queue
- queueProducer = sessionHandler.getSession().createProducer(flowProps, new SolStreamingMessageCallbackHandler(),
- new SolProducerEventCallbackHandler());
+ if (sconfig.getString(SolaceSinkConstants.SOl_QUEUE) != null) {
+ solQueue = JCSMPFactory.onlyInstance().createQueue(sconfig.getString(SolaceSinkConstants.SOl_QUEUE));
}
}
@@ -125,8 +90,7 @@ public void sendRecord(SinkRecord record) {
message = processor.processRecord(kafkaKey, record);
offsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
new OffsetAndMetadata(record.kafkaOffset()));
- log.trace("================ Processed record details, topic: {}, Partition: {}, "
- + "Offset: {}", record.topic(),
+ log.trace("================ Processed record details, topic: {}, Partition: {}, Offset: {}", record.topic(),
record.kafkaPartition(), record.kafkaOffset());
} catch (Exception e) {
if (sconfig.getBoolean(SolaceSinkConstants.SOL_RECORD_PROCESSOR_IGNORE_ERROR)) {
@@ -160,7 +124,7 @@ public void sendRecord(SinkRecord record) {
}
}
try {
- topicProducer.send(message, dest);
+ producerHandler.send(message, dest);
} catch (IllegalArgumentException e) {
throw new ConnectException(String.format("Received exception while sending message to topic %s",
dest != null ? dest.getName() : null), e);
@@ -172,12 +136,7 @@ public void sendRecord(SinkRecord record) {
// Process when Dynamic destination is not set
if (solQueue != null) {
try {
- message.setDeliveryMode(DeliveryMode.PERSISTENT);
- queueProducer.send(message, solQueue);
- if (useTxforQueue) {
- txMsgCounter.getAndIncrement();
- log.trace("================ Count of TX message is now: {}", txMsgCounter.get());
- }
+ producerHandler.send(message, solQueue);
} catch (IllegalArgumentException e) {
throw new ConnectException(String.format("Received exception while sending message to queue %s",
solQueue.getName()), e);
@@ -187,34 +146,30 @@ public void sendRecord(SinkRecord record) {
}
}
if (topics.size() != 0 && message.getDestination() == null) {
- message.setDeliveryMode(DeliveryMode.DIRECT);
- int count = 0;
- while (topics.size() > count) {
+ for (Topic topic : topics) {
try {
- topicProducer.send(message, topics.get(count));
+ producerHandler.send(message, topic);
} catch (IllegalArgumentException e) {
throw new ConnectException(String.format("Received exception while sending message to topic %s",
- topics.get(count).getName()), e);
+ topic.getName()), e);
} catch (JCSMPException e) {
throw new RetriableException(String.format("Received exception while sending message to topic %s",
- topics.get(count).getName()), e);
+ topic.getName()), e);
}
- count++;
}
}
}
+ }
- // Solace limits transaction size to 255 messages so need to force commit
- if ( useTxforQueue && txMsgCounter.get() > sconfig.getInt(SolaceSinkConstants.SOL_QUEUE_MESSAGES_AUTOFLUSH_SIZE)-1 ) {
- log.debug("================ Queue transaction autoflush size reached, flushing offsets from connector");
- try {
- sinkTask.flush(offsets);
- } catch (ConnectException e) {
- if (e.getCause() instanceof JCSMPException) {
- throw new RetriableException(e.getMessage(), e.getCause());
- } else {
- throw e;
- }
+ private void txAutoFlushHandler() {
+ log.debug("================ Queue transaction autoflush size reached, flushing offsets from connector");
+ try {
+ sinkTask.flush(offsets);
+ } catch (ConnectException e) {
+ if (e.getCause() instanceof JCSMPException) {
+ throw new RetriableException(e.getMessage(), e.getCause());
+ } else {
+ throw e;
}
}
}
@@ -244,25 +199,18 @@ void mayEnrichUserPropertiesWithKafkaRecordHeaders(final SinkRecord record,
* Commit Solace and Kafka records.
*/
public synchronized void commit() throws JCSMPException {
- if (useTxforQueue) {
+ if (producerHandler.getTxMsgCount().getAndSet(0) > 0) {
sessionHandler.getTxSession().commit();
- txMsgCounter.set(0);
- log.debug("Comitted Solace records for transaction with status: {}",
+ log.debug("Committed Solace records for transaction with status: {}",
sessionHandler.getTxSession().getStatus().name());
}
}
/**
* Shutdown TXProducer and Topic Producer.
- * @return
*/
public void shutdown() {
- if (queueProducer != null) {
- queueProducer.close();
- }
- if (topicProducer != null) {
- topicProducer.close();
- }
+ producerHandler.close();
}
}
diff --git a/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkTask.java b/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkTask.java
index 5d56790..7dd3750 100644
--- a/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkTask.java
+++ b/src/main/java/com/solace/connector/kafka/connect/sink/SolaceSinkTask.java
@@ -38,7 +38,6 @@ public class SolaceSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(SolaceSinkTask.class);
private SolSessionHandler solSessionHandler;
private SolaceSinkSender solSender;
- private boolean useTxforQueue = false;
private SinkTaskContext context;
SolaceSinkConnectorConfig connectorConfig;
@@ -60,28 +59,8 @@ public void start(Map props) {
}
log.info("================ JCSMPSession Connected");
- if (connectorConfig.getString(SolaceSinkConstants.SOl_QUEUE) != null) {
- // Use transactions for queue destination
- useTxforQueue = connectorConfig.getBoolean(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE);
- if (useTxforQueue) {
- try {
- solSessionHandler.createTxSession();
- log.info("================ Transacted Session has been Created for PubSub+ queue destination");
- } catch (JCSMPException e) {
- throw new ConnectException("Failed to create Transacted Session for PubSub+ queue destination, " +
- "make sure transacted sessions are enabled", e);
- }
- }
- }
-
try {
- solSender = new SolaceSinkSender(connectorConfig, solSessionHandler, useTxforQueue, this);
- if (connectorConfig.getString(SolaceSinkConstants.SOL_TOPICS) != null) {
- solSender.setupDestinationTopics();
- }
- if (connectorConfig.getString(SolaceSinkConstants.SOl_QUEUE) != null) {
- solSender.setupDestinationQueue();
- }
+ solSender = new SolaceSinkSender(connectorConfig, solSessionHandler, this);
} catch (Exception e) {
throw new ConnectException("Failed to setup sender to PubSub+", e);
}
@@ -121,7 +100,7 @@ public synchronized void flush(Map currentOff
log.debug("Flushing up to topic {}, partition {} and offset {}", tp.topic(),
tp.partition(), om.offset());
}
- if (useTxforQueue) {
+ if (solSessionHandler.getTxSession() != null) {
try {
solSender.commit();
} catch (JCSMPException e) {
diff --git a/src/test/java/com/solace/connector/kafka/connect/sink/SolaceSinkConnectorConfigTest.java b/src/test/java/com/solace/connector/kafka/connect/sink/SolaceSinkConnectorConfigTest.java
index b51b222..06b00e9 100644
--- a/src/test/java/com/solace/connector/kafka/connect/sink/SolaceSinkConnectorConfigTest.java
+++ b/src/test/java/com/solace/connector/kafka/connect/sink/SolaceSinkConnectorConfigTest.java
@@ -8,9 +8,14 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.stream.IntStream;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.emptyArray;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -42,13 +47,32 @@ public void shouldReturnConfiguredSolRecordProcessorIFGivenConfigurableClass() {
SolaceSinkConnectorConfig config = new SolaceSinkConnectorConfig(configProps);
// THEN
- SolRecordProcessorIF processor = config.getConfiguredInstance(SolaceSinkConstants.SOL_RECORD_PROCESSOR, SolRecordProcessorIF.class);;
+ SolRecordProcessorIF processor = config.getConfiguredInstance(SolaceSinkConstants.SOL_RECORD_PROCESSOR, SolRecordProcessorIF.class);
assertNotNull(processor);
assertNotNull(((TestSolRecordProcessorIF)processor).configs);
assertEquals("dummy", ((TestSolRecordProcessorIF)processor).configs.get("processor.config"));
}
+ @Test
+ public void testSplitTopics() {
+ String[] topics = IntStream.range(0, 10)
+ .mapToObj(i -> RandomStringUtils.randomAlphanumeric(30))
+ .toArray(String[]::new);
+ SolaceSinkConnectorConfig config = new SolaceSinkConnectorConfig(
+ Collections.singletonMap(SolaceSinkConstants.SOL_TOPICS, String.join(",", topics)));
+ assertNotNull(config.getTopics());
+ assertArrayEquals(topics, config.getTopics());
+ }
+
+ @Test
+ public void testNullTopics() {
+ SolaceSinkConnectorConfig config = new SolaceSinkConnectorConfig(
+ Collections.singletonMap(SolaceSinkConstants.SOL_TOPICS, null));
+ assertNotNull(config.getTopics());
+ assertThat(config.getTopics(), emptyArray());
+ }
+
public static class TestSolRecordProcessorIF implements SolRecordProcessorIF {
Map configs;
diff --git a/src/test/java/com/solace/connector/kafka/connect/sink/SolaceSinkSenderTest.java b/src/test/java/com/solace/connector/kafka/connect/sink/SolaceSinkSenderTest.java
index ea71da9..d58d0ea 100644
--- a/src/test/java/com/solace/connector/kafka/connect/sink/SolaceSinkSenderTest.java
+++ b/src/test/java/com/solace/connector/kafka/connect/sink/SolaceSinkSenderTest.java
@@ -24,27 +24,37 @@
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.SDTMap;
+import com.solacesystems.jcsmp.transaction.RollbackException;
+import com.solacesystems.jcsmp.transaction.TransactedSession;
+import com.solacesystems.jcsmp.transaction.TransactionStatus;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
public class SolaceSinkSenderTest {
@Mock private SolSessionHandler mkSessionHandler;
@Mock private JCSMPSession mkJcsmpSession;
+ @Mock private TransactedSession mkTransactedSession;
@Mock private SolaceSinkTask mkSolaceSinkTask;
@Test
@@ -62,7 +72,6 @@ public void shouldAddKafkaRecordHeadersOnBytesXMLMessageWhenEnabled() throws JCS
final SolaceSinkSender sender = new SolaceSinkSender(
connectorConfig,
mkSessionHandler,
- false,
mkSolaceSinkTask
);
@@ -101,4 +110,52 @@ public void shouldAddKafkaRecordHeadersOnBytesXMLMessageWhenEnabled() throws JCS
assertEquals("val2", properties.getString("h2"));
assertEquals("val5", properties.getString("h3"));
}
-}
\ No newline at end of file
+
+ @ParameterizedTest(name = "[{index}] rollback={0}")
+ @ValueSource(booleans = {true, false})
+ public void testCommit(boolean rollback) throws Exception {
+ Mockito.when(mkSessionHandler.getSession()).thenReturn(mkJcsmpSession);
+ Mockito.when(mkSessionHandler.getTxSession()).thenReturn(mkTransactedSession);
+ Mockito.when(mkTransactedSession.getStatus()).thenReturn(TransactionStatus.ACTIVE);
+
+ if (rollback) {
+ Mockito.doThrow(new RollbackException("test-rollback")).when(mkTransactedSession).commit();
+ }
+
+ Map config = new HashMap<>();
+ config.put(SolaceSinkConstants.SOL_RECORD_PROCESSOR, SolSimpleRecordProcessor.class.getName());
+ config.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE, Boolean.toString(true));
+ config.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(true));
+
+ final SolaceSinkConnectorConfig connectorConfig = new SolaceSinkConnectorConfig(config);
+ final SolaceSinkSender sender = new SolaceSinkSender(connectorConfig, mkSessionHandler, mkSolaceSinkTask);
+
+ sender.producerHandler.getTxMsgCount().incrementAndGet();
+ if (rollback) {
+ assertThrows(RollbackException.class, sender::commit);
+ } else {
+ sender.commit();
+ }
+
+ Mockito.verify(mkTransactedSession, Mockito.times(1)).commit();
+ assertEquals(0, sender.producerHandler.getTxMsgCount().get());
+ }
+
+ @Test
+ public void testCommitNoMessages() throws Exception {
+ Mockito.when(mkSessionHandler.getSession()).thenReturn(mkJcsmpSession);
+ Mockito.when(mkSessionHandler.getTxSession()).thenReturn(mkTransactedSession);
+
+ Map config = new HashMap<>();
+ config.put(SolaceSinkConstants.SOL_RECORD_PROCESSOR, SolSimpleRecordProcessor.class.getName());
+ config.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_QUEUE, Boolean.toString(true));
+ config.put(SolaceSinkConstants.SOl_USE_TRANSACTIONS_FOR_TOPICS, Boolean.toString(true));
+
+ final SolaceSinkConnectorConfig connectorConfig = new SolaceSinkConnectorConfig(config);
+ final SolaceSinkSender sender = new SolaceSinkSender(connectorConfig, mkSessionHandler, mkSolaceSinkTask);
+ sender.commit();
+
+ Mockito.verify(mkTransactedSession, Mockito.times(0)).commit();
+ assertEquals(0, sender.producerHandler.getTxMsgCount().get());
+ }
+}
diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml
index 01eb664..0ff37d5 100644
--- a/src/test/resources/log4j2.xml
+++ b/src/test/resources/log4j2.xml
@@ -6,6 +6,9 @@
+
+
+