From d2bc6322c9d411734e9288850eb5b83149fa06fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bal=C3=A1zs=20Czoma?= Date: Thu, 24 Sep 2020 09:54:34 -0400 Subject: [PATCH] Perf fixes (#22) * Fixed tight spin lock at poll which can result in 100% CPU if no new messages * Fixed leak in processor object use * Updated travis to also publish to local gh-pages if on SolaceDev * Fine-tuned spin turns for poll --- .travis.yml | 16 ++++++++++ gradle.properties | 2 +- .../connect/source/SolaceSourceTask.java | 32 +++++++++++-------- .../kafka/connect/source/VersionUtil.java | 2 +- 4 files changed, 37 insertions(+), 15 deletions(-) diff --git a/.travis.yml b/.travis.yml index bf18541..ddcf547 100644 --- a/.travis.yml +++ b/.travis.yml @@ -27,4 +27,20 @@ after_success: git remote add origin-pages https://${GH_TOKEN}@github.com/SolaceProducts/pubsubplus-connector-kafka-source.git > /dev/null 2>&1; git push --quiet --set-upstream origin-pages gh-pages; echo "Updated and pushed GH pages!"; + elif [ "$TRAVIS_PULL_REQUEST" = "false" ] && ! [ $(echo $TRAVIS_REPO_SLUG | grep "SolaceProducts") ]; then + git config --global user.email "travis@travis-ci.org"; + git config --global user.name "travis-ci"; + mkdir gh-pages; # Now update gh-pages + git clone --quiet --branch=gh-pages https://${GH_TOKEN}@github.com/$TRAVIS_REPO_SLUG gh-pages > /dev/null 2>&1; + rm gh-pages/downloads/pubsubplus-connector-kafka-source* + mv build/distributions/pubsubplus-connector-kafka-source* gh-pages/downloads + cd gh-pages; + pushd downloads + cp index.template index.html; FILENAME=`find . | grep *.zip | cut -d'/' -f2 | sed 's/.\{4\}$//'`; sed -i "s/CONNECTOR_NAME/$FILENAME/g" index.html; + popd; + git add -f .; + git commit -m "Latest connector distribution on successful travis build $TRAVIS_BUILD_NUMBER auto-pushed to gh-pages"; + git remote add origin-pages https://${GH_TOKEN}@github.com/$TRAVIS_REPO_SLUG.git > /dev/null 2>&1; + git push --quiet --set-upstream origin-pages gh-pages; + echo "Updated and pushed GH pages!"; fi diff --git a/gradle.properties b/gradle.properties index 516314f..fb7cb53 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=2.0.1 \ No newline at end of file +version=2.0.2 \ No newline at end of file diff --git a/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceTask.java b/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceTask.java index c51acba..3132569 100644 --- a/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceTask.java +++ b/src/main/java/com/solace/connector/kafka/connect/source/SolaceSourceTask.java @@ -24,19 +24,12 @@ import com.solacesystems.jcsmp.JCSMPException; import com.solacesystems.jcsmp.JCSMPProperties; import com.solacesystems.jcsmp.JCSMPSession; -import com.solacesystems.jcsmp.JCSMPSessionStats; -import com.solacesystems.jcsmp.statistics.StatType; - import java.util.ArrayList; import java.util.Collections; -import java.util.Enumeration; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.slf4j.Logger; @@ -59,7 +52,7 @@ public class SolaceSourceTask extends SourceTask { // implements XMLMessageListe String skafkaTopic; SolaceSourceTopicListener topicListener = null; SolaceSourceQueueConsumer queueConsumer = null; - + private int spinTurns = 0; private volatile boolean shuttingDown = false; // private Class cProcessor; @@ -74,6 +67,16 @@ public String version() { public void start(Map props) { connectorConfig = new SolaceSourceConnectorConfig(props); + try { + processor = connectorConfig + .getConfiguredInstance(SolaceSourceConstants + .SOL_MESSAGE_PROCESSOR, SolMessageProcessorIF.class); + } catch (Exception e) { + log.info( + "================ Encountered exception in creating the message processor." + + " Cause: {}, Stacktrace: {} ", + e.getCause(), e.getStackTrace()); + } skafkaTopic = connectorConfig.getString(SolaceSourceConstants.KAFKA_TOPIC); solSessionHandler = new SolSessionHandler(connectorConfig); try { @@ -106,9 +109,15 @@ public void start(Map props) { public synchronized List poll() throws InterruptedException { if (shuttingDown || ingressMessages.size() == 0) { - return null; // Nothing to do, return control + spinTurns++; + if (spinTurns > 100) { + spinTurns = 0; + Thread.sleep(1); + } + return null; // Nothing to do, return control } // There is at least one message to process + spinTurns = 0; // init spinTurns again List records = new ArrayList<>(); int processedInIhisBatch = 0; int count = 0; @@ -116,10 +125,7 @@ public synchronized List poll() throws InterruptedException { while (count < arraySize) { BytesXMLMessage msg = ingressMessages.take(); try { - processor = connectorConfig - .getConfiguredInstance(SolaceSourceConstants - .SOL_MESSAGE_PROCESSOR, SolMessageProcessorIF.class) - .process(connectorConfig.getString(SolaceSourceConstants.SOL_KAFKA_MESSAGE_KEY), msg); + processor.process(connectorConfig.getString(SolaceSourceConstants.SOL_KAFKA_MESSAGE_KEY), msg); } catch (Exception e) { log.info( "================ Encountered exception in message processing....discarded." diff --git a/src/main/java/com/solace/connector/kafka/connect/source/VersionUtil.java b/src/main/java/com/solace/connector/kafka/connect/source/VersionUtil.java index 33c4c8b..258df9b 100644 --- a/src/main/java/com/solace/connector/kafka/connect/source/VersionUtil.java +++ b/src/main/java/com/solace/connector/kafka/connect/source/VersionUtil.java @@ -7,7 +7,7 @@ public class VersionUtil { */ public static String getVersion() { - return "2.0.1"; + return "2.0.2"; }