Merge pull request #44 from SolaceDev/stage-2.2.0
* Add `sol.use_transactions_for_topics` config option
* Support transactions when `sol.dynamic_destination = true`
* Support publishing to queues when `sol.dynamic_destination = true`
* Add tasks to release the connector to Maven Central
* Upgrade log4j to 2.16.0 in tests to fix CVE-2021-45046 risks
Nephery authored Dec 17, 2021
2 parents b4d72d8 + 364dd3c commit a02fda4
Showing 24 changed files with 1,781 additions and 257 deletions.
7 changes: 5 additions & 2 deletions .gitignore
@@ -7,7 +7,7 @@
/bin/

# Java related
target/**
/target/
*.jar
*.war
*.ear
@@ -45,4 +45,7 @@ local.properties
*.ipr

# Unzipped test connector
src/integrationTest/resources/pubsubplus-connector-kafka*/
src/integrationTest/resources/pubsubplus-connector-kafka*/

# Local testing
solace.properties
71 changes: 49 additions & 22 deletions README.md
@@ -258,17 +258,25 @@ A value of 0 results in the replay of the entire Kafka Topic. A positive value r

We recommend using PubSub+ Topics if high throughput is required and the Kafka Topic is configured for high performance. Message duplication and loss mimic the underlying reliability and QoS configured for the Kafka topic.

By default, messages are published to topics using direct messaging. To publish persistent messages to topics using a local transaction, set `sol.use_transactions_for_topics` to `true`. See [Sending with Local Transactions](#sending-with-local-transactions) for more info.

#### Sending to PubSub+ Queue

When Kafka record reliability is critical, we recommend configuring the Sink Connector to send records to the Event Mesh using PubSub+ queues, at the cost of reduced throughput.

A PubSub+ queue guarantees order of delivery, provides High Availability and Disaster Recovery (depending on the setup of the PubSub+ brokers), and returns an acknowledgment to the connector once the event is stored on all HA and DR members and flushed to disk. This is a stronger guarantee than Kafka provides, even with idempotent delivery.

The connector uses local transactions to deliver to the queue by default. The transaction is committed if messages are flushed by Kafka Connect (see below how to tune flush interval) or the outstanding messages size reaches the `sol.autoflush.size` (default 200) configuration.
The connector uses local transactions to deliver to the queue by default. See [Sending with Local Transactions](#sending-with-local-transactions) for more info.

Note that generally one connector can send to only one queue.

##### Recovery from Kafka Connect API or Kafka Broker Failure
#### Sending with Local Transactions

By default, only sending to a queue uses local transactions. To use the transacted session to send persistent messages to topics, set `sol.use_transactions_for_topics` to `true`.

The transaction is committed when messages are flushed by Kafka Connect (see [how to tune the flush interval below](#recovery-from-kafka-connect-api-or-kafka-broker-failure)) or when the number of outstanding messages reaches the `sol.autoflush.size` configuration (default 200).
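As an illustration, a sink connector configuration fragment that enables transacted publishing to topics and lowers the auto-flush threshold might look like the following sketch (the two property names come from this README; the values are examples only):

```properties
# Publish persistent messages to topics inside a local transaction
sol.use_transactions_for_topics=true
# Commit the transaction once 100 records are outstanding (default: 200)
sol.autoflush.size=100
```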

#### Recovery from Kafka Connect API or Kafka Broker Failure

Operators are expected to monitor their connector for failures, since errors cause it to stop. If the connector has stopped, the operator must explicitly restart it once the error condition has been resolved.

@@ -329,44 +337,63 @@ Kerberos has some very specific requirements to operate correctly. Some addition

JDK 8 or higher is required for this project.

First, clone this GitHub repo:
```shell
git clone https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink.git
cd pubsubplus-connector-kafka-sink
```

Then run the build script:
```shell
./gradlew clean build
```
1. First, clone this GitHub repo:
   ```shell
   git clone https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink.git
   cd pubsubplus-connector-kafka-sink
   ```
2. Install the test support module:
   ```shell
   git submodule update --init --recursive
   cd solace-integration-test-support
   ./mvnw clean install -DskipTests
   cd ..
   ```
3. Run the build script:
   ```shell
   ./gradlew clean build
   ```

This script creates artifacts in the `build` directory, including the deployable, packaged PubSub+ Sink Connector archives under `build/distributions`.

### Test the Project

An integration test suite is also included. It spins up a Docker-based deployment environment that includes a PubSub+ event broker, ZooKeeper, a Kafka broker, and Kafka Connect, then deploys the connector to Kafka Connect and runs end-to-end tests.

1. Install the test support module:
   ```shell
   git submodule update --init --recursive
   cd solace-integration-test-support
   ./mvnw clean install -DskipTests
   cd ..
   ```
2. Run the tests:
1. Run the tests:
   ```shell
   ./gradlew clean test integrationTest
   ```

### Build a New Record Processor

The processing of a Kafka record to create a PubSub+ message is handled by an interface defined in [`SolRecordProcessorIF.java`](/src/main/java/com/solace/connector/kafka/connect/sink/SolRecordProcessorIF.java). This is a simple interface that creates a PubSub+ message from a Kafka record. This project includes three examples of classes that implement this interface:
The processing of a Kafka record to create a PubSub+ message is handled by [`SolRecordProcessorIF`](/src/main/java/com/solace/connector/kafka/connect/sink/SolRecordProcessorIF.java). This is a simple interface that creates a PubSub+ message from a Kafka record.

To get started, add the following dependency to your project:

**Maven**
```xml
<dependency>
  <groupId>com.solace.connector.kafka.connect</groupId>
  <artifactId>pubsubplus-connector-kafka-sink</artifactId>
  <version>2.2.0</version>
</dependency>
```

**Gradle**
```groovy
compile "com.solace.connector.kafka.connect:pubsubplus-connector-kafka-sink:2.2.0"
```

Now you can implement your custom `SolRecordProcessorIF`.
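The sketch below shows the general shape of a custom processor. It is a minimal example, assuming the interface's single method has the same shape as in the bundled example processors — `processRecord(String skey, SinkRecord record)` returning a JCSMP `BytesXMLMessage` — and the package and class names are hypothetical:

```java
package com.example.kafka; // hypothetical package

import java.nio.charset.StandardCharsets;

import com.solace.connector.kafka.connect.sink.SolRecordProcessorIF;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.JCSMPFactory;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical processor that forwards the Kafka record value as the message payload
public class PlainTextRecordProcessor implements SolRecordProcessorIF {

    @Override
    public BytesXMLMessage processRecord(String skey, SinkRecord record) {
        // Create an empty PubSub+ message via the JCSMP factory
        BytesXMLMessage msg = JCSMPFactory.onlyInstance().createMessage(BytesXMLMessage.class);

        // Copy the Kafka record value into the message body (null-safe)
        Object value = record.value();
        byte[] payload = (value == null)
                ? new byte[0]
                : value.toString().getBytes(StandardCharsets.UTF_8);
        msg.writeAttachment(payload);
        return msg;
    }
}
```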

For reference, this project includes three examples which you can use as starting points for implementing your own custom record processors:

* [SolSimpleRecordProcessor](/src/main/java/com/solace/connector/kafka/connect/sink/recordprocessor/SolSimpleRecordProcessor.java)
* [SolSimpleKeyedRecordProcessor](/src/main/java/com/solace/connector/kafka/connect/sink/recordprocessor/SolSimpleKeyedRecordProcessor.java)
* [SolDynamicDestinationRecordProcessor](/src/main/java/com/solace/connector/kafka/connect/sink/recordprocessor/SolDynamicDestinationRecordProcessor.java)

You can use these examples as starting points for implementing your own custom record processors.
Once you've built the jar file for your custom record processor project, place it into the same directory as this connector, and update the connector's `sol.record_processor_class` config to point to the class of your new record processor.
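For example, if your jar contains the hypothetical class from the sketch above, the connector configuration would reference it like this:

```properties
# Fully-qualified name of your custom record processor (hypothetical class)
sol.record_processor_class=com.example.kafka.PlainTextRecordProcessor
```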

More information on Kafka sink connector development can be found here:
- [Apache Kafka Connect](https://kafka.apache.org/documentation/)
112 changes: 107 additions & 5 deletions build.gradle
@@ -1,17 +1,23 @@
import com.github.spotbugs.snom.SpotBugsTask
import io.github.gradlenexus.publishplugin.InitializeNexusStagingRepository

plugins {
    id 'java'
    id 'distribution'
    id 'jacoco'
    id 'maven-publish'
    id 'pmd'
    id 'signing'
    id 'com.github.spotbugs' version '4.7.6'
    id 'io.github.gradle-nexus.publish-plugin' version '1.1.0'
    id 'org.gradle.test-retry' version '1.3.1'
    id 'org.unbroken-dome.test-sets' version '2.2.1'
}

ext {
    kafkaVersion = '2.8.1'
    solaceJavaAPIVersion = '10.12.0'
    solaceJavaAPIVersion = '10.12.1'
    isSnapshot = project.version.endsWith('-SNAPSHOT')
}

repositories {
@@ -33,14 +39,15 @@ testSets {

dependencies {
    integrationTestImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
    integrationTestImplementation 'org.junit-pioneer:junit-pioneer:1.4.2'
    integrationTestImplementation 'org.junit-pioneer:junit-pioneer:1.5.0'
    integrationTestImplementation 'org.mockito:mockito-junit-jupiter:3.12.4'
    integrationTestImplementation 'org.testcontainers:testcontainers:1.16.0'
    integrationTestImplementation 'org.testcontainers:junit-jupiter:1.16.0'
    integrationTestImplementation 'org.testcontainers:kafka:1.16.0'
    integrationTestImplementation 'com.solace.test.integration:pubsubplus-junit-jupiter:0.5.0'
    integrationTestImplementation 'org.testcontainers:toxiproxy:1.16.0'
    integrationTestImplementation 'com.solace.test.integration:pubsubplus-junit-jupiter:0.7.1'
    integrationTestImplementation 'org.slf4j:slf4j-api:1.7.32'
    integrationTestImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.14.1'
    integrationTestImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.16.0'
    integrationTestImplementation 'org.apache.commons:commons-configuration2:2.6'
    integrationTestImplementation 'commons-beanutils:commons-beanutils:1.9.4'
    integrationTestImplementation 'com.google.code.gson:gson:2.3.1'
@@ -50,7 +57,7 @@ dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter:5.8.1'
    testImplementation 'org.mockito:mockito-junit-jupiter:3.12.4'
    testImplementation 'org.hamcrest:hamcrest-all:1.3'
    testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.14.1'
    testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.16.0'
    compile "org.apache.kafka:connect-api:$kafkaVersion"
    compile "com.solacesystems:sol-jcsmp:$solaceJavaAPIVersion"
}
@@ -96,6 +103,10 @@ project.integrationTest {
    useJUnitPlatform()
    outputs.upToDateWhen { false }
    dependsOn prepDistForIntegrationTesting
    shouldRunAfter test
    retry {
        maxRetries = 3
    }
    afterSuite { desc, result ->
        if (!desc.parent)
            println("${result.resultType} " +
@@ -181,6 +192,11 @@ project.compileJava {
    dependsOn generateJava
}

java {
    withJavadocJar()
    withSourcesJar()
}

distributions {
    main {
        contents {
@@ -197,3 +213,89 @@ }
        }
    }
}

publishing {
    publications {
        maven(MavenPublication) {
            from components.java
            pom {
                name = "Solace PubSub+ Connector for Kafka: Sink"
                description = "The Solace/Kafka adapter consumes Kafka topic records and streams them to the PubSub+ Event Mesh as topic and/or queue data events."
                url = "https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink"
                packaging = "jar"
                licenses {
                    license {
                        name = "Apache License, Version 2.0"
                        url = "https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink/blob/master/LICENSE"
                        distribution = "repo"
                    }
                }
                organization {
                    name = "Solace"
                    url = "https://www.solace.com"
                }
                developers {
                    developer {
                        name = "Support for Solace"
                        email = "[email protected]"
                        organization = "Solace"
                        organizationUrl = "http://solace.community"
                    }
                }
                scm {
                    connection = "scm:git:git://github.com/SolaceProducts/pubsubplus-connector-kafka-sink.git"
                    developerConnection = "scm:git:[email protected]:SolaceProducts/pubsubplus-connector-kafka-sink.git"
                    url = "https://github.com/SolaceProducts/pubsubplus-connector-kafka-sink.git"
                }
            }
        }
    }
    repositories {
        maven {
            def releasesUrl = uri('http://apps-jenkins:9090/nexus/content/repositories/releases')
            def snapshotRepositoryUrl = uri('http://apps-jenkins:9090/nexus/content/repositories/snapshots')
            url = isSnapshot ? snapshotRepositoryUrl : releasesUrl
            name = 'internal'
            credentials {
                username = project.properties[name + "Username"]
                password = project.properties[name + "Password"]
            }
        }
    }
}

nexusPublishing {
    repositories {
        sonatype {
            nexusUrl = uri('https://oss.sonatype.org/service/local/')
            snapshotRepositoryUrl = uri('https://oss.sonatype.org/content/repositories/snapshots')
            // gets credentials from project.properties["sonatypeUsername"] project.properties["sonatypePassword"]
        }
    }
}

signing {
    required {
        !isSnapshot
    }
    useGpgCmd()
    sign publishing.publications.maven
}

tasks.withType(Sign) {
    onlyIf {
        gradle.taskGraph.allTasks.any { task ->
            task.name.startsWith("publish") && task.name.contains('Sonatype')
        }
    }
    shouldRunAfter test, integrationTest
}

tasks.withType(InitializeNexusStagingRepository).configureEach {
    dependsOn test, integrationTest
    shouldRunAfter tasks.withType(Sign)
}

tasks.withType(PublishToMavenRepository).configureEach {
    dependsOn test, integrationTest
}
