Skip to content

Commit

Permalink
QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number …
Browse files Browse the repository at this point in the history
…of sessions
  • Loading branch information
franz1981 committed Feb 28, 2022
1 parent 0cf4998 commit e5fa3cb
Show file tree
Hide file tree
Showing 7 changed files with 376 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
Expand Down Expand Up @@ -83,6 +85,7 @@
import org.apache.qpid.jms.provider.ProviderSynchronization;
import org.apache.qpid.jms.tracing.JmsTracer;
import org.apache.qpid.jms.util.FifoMessageQueue;
import org.apache.qpid.jms.util.RefPool;
import org.apache.qpid.jms.util.MessageQueue;
import org.apache.qpid.jms.util.PriorityMessageQueue;
import org.apache.qpid.jms.util.QpidJMSThreadFactory;
Expand All @@ -106,7 +109,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
private final AtomicBoolean started = new AtomicBoolean();
private final AtomicReference<Exception> failureCause = new AtomicReference<>();
private final JmsConnectionInfo connectionInfo;
private final ThreadPoolExecutor executor;
protected final ThreadPoolExecutor executor;

private ExceptionListener exceptionListener;
private JmsMessageFactory messageFactory;
Expand All @@ -119,6 +122,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
private final AtomicLong transactionIdGenerator = new AtomicLong();
private final AtomicLong connectionConsumerIdGenerator = new AtomicLong();
private final Map<AsyncResult, AsyncResult> requests = new ConcurrentHashMap<>();
private final RefPool.Ref<ExecutorService> completionExecutorService;

protected JmsConnection(final JmsConnectionInfo connectionInfo, Provider provider) throws JMSException {

Expand All @@ -127,7 +131,7 @@ protected JmsConnection(final JmsConnectionInfo connectionInfo, Provider provide
// will also serve as a means of preventing JVM shutdown should a client application
// not have it's own mechanism for doing so if the configuration specifies that the
// Connection create this thread as a non-daemon thread.
executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedTransferQueue<>(),
new QpidJMSThreadFactory("QpidJMS Connection Executor: " + connectionInfo.getId(), connectionInfo.isUseDaemonThread()));

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
Expand All @@ -151,6 +155,7 @@ public void run() {

this.connectionInfo = connectionInfo;
this.connectionInfo.setConnection(this);
this.completionExecutorService = this.connectionInfo.getCompletionExecutorServiceFactory().map(Supplier::get).orElse(null);
}

JmsConnection connect() throws JMSException {
Expand Down Expand Up @@ -211,6 +216,10 @@ public void close() throws JMSException {
session.shutdown();
}

if (completionExecutorService != null) {
completionExecutorService.close();
}

for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
connectionConsumer.shutdown();
}
Expand Down Expand Up @@ -266,6 +275,10 @@ public void close() throws JMSException {
}
}

RefPool.Ref<ExecutorService> getCompletionExecutorService() {
return completionExecutorService;
}

/**
* Called to free all Connection resources.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.security.PrivilegedAction;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.function.BiFunction;
import java.util.function.Supplier;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
Expand Down Expand Up @@ -55,13 +58,18 @@
import org.apache.qpid.jms.tracing.JmsNoOpTracer;
import org.apache.qpid.jms.tracing.JmsTracer;
import org.apache.qpid.jms.tracing.JmsTracerFactory;
import org.apache.qpid.jms.util.RefPool.Ref;
import org.apache.qpid.jms.util.IdGenerator;
import org.apache.qpid.jms.util.PropertyUtil;
import org.apache.qpid.jms.util.QpidJMSForkJoinWorkerThreadFactory;
import org.apache.qpid.jms.util.ThreadPoolUtils;
import org.apache.qpid.jms.util.URISupport;
import org.apache.qpid.jms.util.URISupport.CompositeData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.qpid.jms.util.RefPool.shared;

/**
* JMS ConnectionFactory Implementation.
*/
Expand Down Expand Up @@ -102,14 +110,15 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT;
private int completionThreads = JmsConnectionInfo.DEFAULT_COMPLETION_THREADS;
private IdGenerator clientIdGenerator;
private String clientIDPrefix;
private IdGenerator connectionIdGenerator;
private String connectionIDPrefix;
private ExceptionListener exceptionListener;
private String tracing;
private JmsTracer tracer;

private Supplier<Ref<ExecutorService>> completionExecutorServiceFactory;
private JmsPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy();
private JmsRedeliveryPolicy redeliveryPolicy = new JmsDefaultRedeliveryPolicy();
private JmsPresettlePolicy presettlePolicy = new JmsDefaultPresettlePolicy();
Expand Down Expand Up @@ -282,7 +291,7 @@ protected JmsConnectionInfo configureConnectionInfo(String username, String pass
implicitTracer = JmsTracerFactory.create(remoteURI, tracing);
connectionInfo.setTracer(implicitTracer);
}

connectionInfo.setCompletionExecutorServiceFactory(getCompletionExecutorServiceFactory());
// Set properties to make additional configuration changes
PropertyUtil.setProperties(connectionInfo, properties);

Expand Down Expand Up @@ -368,6 +377,19 @@ protected static URI createURI(String name) {
return null;
}

protected Supplier<Ref<ExecutorService>> getCompletionExecutorServiceFactory() {
if (this.completionThreads == 0) {
return null;
}
synchronized (this) {
if (completionExecutorServiceFactory == null) {
QpidJMSForkJoinWorkerThreadFactory fjThreadFactory = new QpidJMSForkJoinWorkerThreadFactory("completion thread pool", true);
completionExecutorServiceFactory = shared(() -> new ForkJoinPool(completionThreads, fjThreadFactory, null, false), ThreadPoolUtils::shutdown);
}
return completionExecutorServiceFactory;
}
}

protected synchronized IdGenerator getConnectionIdGenerator() {
if (connectionIdGenerator == null) {
if (connectionIDPrefix != null) {
Expand Down Expand Up @@ -591,6 +613,14 @@ public long getConnectTimeout() {
return this.connectTimeout;
}

public void setCompletionThreads(final int completionThreads) {
this.completionThreads = completionThreads;
}

public int getCompletionThreads() {
return completionThreads;
}

/**
* Sets the timeout value used to control how long a client will wait for a successful
* connection to the remote peer to be established before considering the attempt to
Expand Down
96 changes: 74 additions & 22 deletions qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -125,17 +127,19 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
private final JmsSessionInfo sessionInfo;
private final ReentrantLock sendLock = new ReentrantLock();
private volatile ThreadPoolExecutor deliveryExecutor;
private volatile ThreadPoolExecutor completionExcecutor;
private volatile ExecutorService completionExecutor;
private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
private boolean deliveryThreadCheckEnabled = true;
private AtomicReference<Thread> completionThread = new AtomicReference<Thread>();
private volatile Thread completionThread = null;
private final AtomicBoolean processCompletion = new AtomicBoolean();

private final AtomicLong consumerIdGenerator = new AtomicLong();
private final AtomicLong producerIdGenerator = new AtomicLong();
private JmsTransactionContext transactionContext;
private boolean sessionRecovered;
private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
private final Deque<SendCompletion> asyncSendQueue = new ConcurrentLinkedDeque<SendCompletion>();
private final ConcurrentLinkedQueue<Runnable> completionTasks = new ConcurrentLinkedQueue<>();

protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
this.connection = connection;
Expand Down Expand Up @@ -194,6 +198,48 @@ public void onPendingFailure(ProviderException cause) {
}
}

private void processCompletions() {
assert processCompletion.get();
completionThread = Thread.currentThread();
try {
final Runnable completionTask = completionTasks.poll();
if (completionTask != null) {
try {
completionTask.run();
} catch (Throwable t) {
LOG.debug("errored on processCompletions duty cycle", t);
}
}
} finally {
completionThread = null;
processCompletion.set(false);
}
if (completionTasks.isEmpty()) {
return;
}
// a racing asyncProcessCompletion has won: no need to fire a continuation
if (!processCompletion.compareAndSet(false, true)) {
return;
}
getCompletionExecutor().execute(this::processCompletions);
}

private void asyncProcessCompletion(final Runnable completionTask) {
asyncProcessCompletion(completionTask, false);
}

private void asyncProcessCompletion(final Runnable completionTask, final boolean ignoreSessionClosed) {
if (!ignoreSessionClosed) {
if (closed.get()) {
return;
}
}
completionTasks.add(completionTask);
if (processCompletion.compareAndSet(false, true)) {
getCompletionExecutor().execute(this::processCompletions);
}
}

int acknowledgementMode() {
return acknowledgementMode;
}
Expand Down Expand Up @@ -372,15 +418,17 @@ protected boolean shutdown(Throwable cause) throws JMSException {
if (cause == null) {
cause = new JMSException("Session closed remotely before message transfer result was notified");
}

getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
getCompletionExecutor().shutdown();
}

try {
getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was interrupted");
asyncProcessCompletion(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)), true);
final CountDownLatch completed = new CountDownLatch(1);
try {
asyncProcessCompletion(completed::countDown, true);
completed.await(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was interrupted");
}
if (connection.getCompletionExecutorService() == null) {
getCompletionExecutor().shutdown();
}
}

try {
Expand Down Expand Up @@ -447,7 +495,7 @@ JmsMessageProducer producerClosed(JmsProducerInfo resource, Throwable cause) {

try {
if (producer != null) {
getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(
asyncProcessCompletion(new FailOrCompleteAsyncCompletionsTask(
producer.getProducerId(), JmsExceptionSupport.create(cause)));
producer.shutdown(cause);
}
Expand Down Expand Up @@ -1195,23 +1243,27 @@ Executor getDispatcherExecutor() {
}

private ExecutorService getCompletionExecutor() {
ThreadPoolExecutor exec = completionExcecutor;
ExecutorService exec = completionExecutor;
if (exec == null) {
synchronized (sessionInfo) {
exec = completionExcecutor;
exec = completionExecutor;
if (exec == null) {
exec = createExecutor("completion dispatcher", completionThread);

if (connection.getCompletionExecutorService() != null) {
exec = connection.getCompletionExecutorService().ref();
} else {
exec = createExecutor("completion dispatcher", null);
}
// Ensure work thread is fully up before allowing other threads
// to attempt to execute on this instance.
Future<?> starter = exec.submit(() -> {});
Future<?> starter = exec.submit(() -> {
});
try {
starter.get();
} catch (InterruptedException | ExecutionException e) {
LOG.trace("Completion Executor starter task failed: {}", e.getMessage());
}

completionExcecutor = exec;
completionExecutor = exec;
}
}
}
Expand Down Expand Up @@ -1303,7 +1355,7 @@ void checkIsDeliveryThread() throws JMSException {
}

void checkIsCompletionThread() throws JMSException {
if (Thread.currentThread().equals(completionThread.get())) {
if (Thread.currentThread().equals(completionThread)) {
throw new IllegalStateException("Illegal invocation from CompletionListener callback");
}
}
Expand Down Expand Up @@ -1391,19 +1443,19 @@ public void onInboundMessage(JmsInboundMessageDispatch envelope) {
}

protected void onCompletedMessageSend(final JmsOutboundMessageDispatch envelope) {
getCompletionExecutor().execute(new AsyncCompletionTask(envelope));
asyncProcessCompletion(new AsyncCompletionTask(envelope));
}

protected void onFailedMessageSend(final JmsOutboundMessageDispatch envelope, final Throwable cause) {
getCompletionExecutor().execute(new AsyncCompletionTask(envelope, cause));
asyncProcessCompletion(new AsyncCompletionTask(envelope, cause));
}

protected void onConnectionInterrupted() {
transactionContext.onConnectionInterrupted();

// TODO - Synthesize a better exception
JMSException failureCause = new JMSException("Send failed due to connection loss");
getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(failureCause));
asyncProcessCompletion(new FailOrCompleteAsyncCompletionsTask(failureCause));

for (JmsMessageProducer producer : producers.values()) {
producer.onConnectionInterrupted();
Expand Down
Loading

0 comments on commit e5fa3cb

Please sign in to comment.