From d84917c525aece7498566a0eff8558ebfa73826a Mon Sep 17 00:00:00 2001 From: franz1981 Date: Mon, 25 Oct 2021 16:48:23 +0200 Subject: [PATCH] QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions --- .../org/apache/qpid/jms/JmsConnection.java | 18 ++- .../java/org/apache/qpid/jms/JmsSession.java | 103 +++++++++++++++--- .../qpid/jms/meta/JmsConnectionInfo.java | 10 ++ 3 files changed, 112 insertions(+), 19 deletions(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 4a1edfa3a..65fc10e31 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -22,7 +22,13 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -106,7 +112,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection private final AtomicBoolean started = new AtomicBoolean(); private final AtomicReference failureCause = new AtomicReference<>(); private final JmsConnectionInfo connectionInfo; - private final ThreadPoolExecutor executor; + protected final ThreadPoolExecutor executor; private ExceptionListener exceptionListener; private JmsMessageFactory messageFactory; @@ -127,7 +133,7 @@ protected JmsConnection(final JmsConnectionInfo connectionInfo, Provider provide // will also serve as a means of preventing JVM shutdown should a client application // not have it's own mechanism for doing so if the configuration specifies that the // Connection create this thread as a non-daemon thread. - executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue(), + executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedTransferQueue<>(), new QpidJMSThreadFactory("QpidJMS Connection Executor: " + connectionInfo.getId(), connectionInfo.isUseDaemonThread())); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); @@ -1140,6 +1146,14 @@ void setMessageFactory(JmsMessageFactory factory) { messageFactory = factory; } + public void setUseConnectionCompletionHandler(boolean value) { + connectionInfo.setUseConnectionCompletionHandler(value); + } + + public boolean isUseConnectionCompletionHandler() { + return connectionInfo.isUseConnectionCompletionHandler(); + } + public boolean isForceAsyncAcks() { return connectionInfo.isForceAsyncAcks(); } diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 4907d1694..d69ecb0cc 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -71,6 +72,7 @@ import javax.jms.TopicSession; import javax.jms.TopicSubscriber; +import io.netty.util.internal.PlatformDependent; import org.apache.qpid.jms.exceptions.JmsConnectionFailedException; import org.apache.qpid.jms.exceptions.JmsExceptionSupport; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; @@ -128,7 +130,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe private volatile ThreadPoolExecutor completionExcecutor; private AtomicReference deliveryThread = new AtomicReference(); private boolean deliveryThreadCheckEnabled = true; - private AtomicReference completionThread = new AtomicReference(); + private volatile Thread completionThread = null; + private final AtomicBoolean processCompletion = new AtomicBoolean(); private final AtomicLong consumerIdGenerator = new AtomicLong(); private final AtomicLong producerIdGenerator = new AtomicLong(); @@ -136,6 +139,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe private boolean sessionRecovered; private final AtomicReference failureCause = new AtomicReference<>(); private final Deque asyncSendQueue = new ConcurrentLinkedDeque(); + private final java.util.Queue completionTasks = PlatformDependent.newMpscQueue(); protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException { this.connection = connection; @@ -194,6 +198,57 @@ public void onPendingFailure(ProviderException cause) { } } + private void processCompletions() { + for (; ; ) { + assert processCompletion.get(); + completionThread = Thread.currentThread(); + try { + Runnable completionTask; + while ((completionTask = completionTasks.poll()) != null) { + try { + completionTask.run(); + } catch (Throwable t) { + LOG.debug("errored on processCompletions duty cycle", t); + } + } + } finally { + completionThread = null; + processCompletion.set(false); + } + if (completionTasks.isEmpty()) { + return; + } + // a racing asyncProcessCompletion has won: no need to fire a countinuation + if (!processCompletion.compareAndSet(false, true)) { + return; + } + if (!connection.isUseConnectionCompletionHandler()) { + // an exclusive per-session executor doesn't need to guarantee fair completion processing + // and can just keep on processing completion on this same thread + continue; + } + // guarantees completions belonging to other sessions to be processed + getCompletionExecutor().execute(this::processCompletions); + return; + } + } + + private void asyncProcessCompletion(final Runnable completionTask) { + asyncProcessCompletion(completionTask, false); + } + + private void asyncProcessCompletion(final Runnable completionTask, final boolean ignoreSessionClosed) { + if (!ignoreSessionClosed) { + if (closed.get()) { + return; + } + } + completionTasks.add(completionTask); + if (processCompletion.compareAndSet(false, true)) { + getCompletionExecutor().execute(this::processCompletions); + } + } + int acknowledgementMode() { return acknowledgementMode; } @@ -372,15 +427,25 @@ protected boolean shutdown(Throwable cause) throws JMSException { if (cause == null) { cause = new JMSException("Session closed remotely before message transfer result was notified"); } - - getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause))); - getCompletionExecutor().shutdown(); + asyncProcessCompletion(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)), true); + if (!connection.isUseConnectionCompletionHandler()) { + getCompletionExecutor().shutdown(); + } } - - try { - getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.trace("Session close awaiting send completions was interrupted"); + if (connection.isUseConnectionCompletionHandler()) { + final CountDownLatch completed = new CountDownLatch(1); + try { + asyncProcessCompletion(completed::countDown, true); + completed.await(connection.getCloseTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.trace("Session close awaiting send completions was interrupted"); + } + } else { + try { + getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.trace("Session close awaiting send completions was interrupted"); + } } try { @@ -447,7 +512,7 @@ JmsMessageProducer producerClosed(JmsProducerInfo resource, Throwable cause) { try { if (producer != null) { - getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask( + asyncProcessCompletion(new FailOrCompleteAsyncCompletionsTask( producer.getProducerId(), JmsExceptionSupport.create(cause))); producer.shutdown(cause); } @@ -1200,11 +1265,15 @@ private ExecutorService getCompletionExecutor() { synchronized (sessionInfo) { exec = completionExcecutor; if (exec == null) { - exec = createExecutor("completion dispatcher", completionThread); - + if (connection.isUseConnectionCompletionHandler()) { + exec = connection.executor; + } else { + exec = createExecutor("completion dispatcher", null); + } // Ensure work thread is fully up before allowing other threads // to attempt to execute on this instance. - Future starter = exec.submit(() -> {}); + Future starter = exec.submit(() -> { + }); try { starter.get(); } catch (InterruptedException | ExecutionException e) { @@ -1303,7 +1372,7 @@ void checkIsDeliveryThread() throws JMSException { } void checkIsCompletionThread() throws JMSException { - if (Thread.currentThread().equals(completionThread.get())) { + if (Thread.currentThread().equals(completionThread)) { throw new IllegalStateException("Illegal invocation from CompletionListener callback"); } } @@ -1391,11 +1460,11 @@ public void onInboundMessage(JmsInboundMessageDispatch envelope) { } protected void onCompletedMessageSend(final JmsOutboundMessageDispatch envelope) { - getCompletionExecutor().execute(new AsyncCompletionTask(envelope)); + asyncProcessCompletion(new AsyncCompletionTask(envelope)); } protected void onFailedMessageSend(final JmsOutboundMessageDispatch envelope, final Throwable cause) { - getCompletionExecutor().execute(new AsyncCompletionTask(envelope, cause)); + asyncProcessCompletion(new AsyncCompletionTask(envelope, cause)); } protected void onConnectionInterrupted() { @@ -1403,7 +1472,7 @@ protected void onConnectionInterrupted() { // TODO - Synthesize a better exception JMSException failureCause = new JMSException("Send failed due to connection loss"); - getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(failureCause)); + asyncProcessCompletion(new FailOrCompleteAsyncCompletionsTask(failureCause)); for (JmsMessageProducer producer : producers.values()) { producer.onConnectionInterrupted(); diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java index 0dc0dc78a..9bdcee322 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java @@ -50,6 +50,7 @@ public final class JmsConnectionInfo extends JmsAbstractResource implements Comp public static final long DEFAULT_CLOSE_TIMEOUT = 60000; public static final long DEFAULT_SEND_TIMEOUT = INFINITE; public static final long DEFAULT_REQUEST_TIMEOUT = INFINITE; + public static final boolean DEFAULT_USE_CONNECTION_COMPLETION_EXECUTOR = false; private final JmsConnectionId connectionId; private final EnumMap> extensionMap = new EnumMap<>(JmsConnectionExtensions.class); @@ -78,6 +79,7 @@ public final class JmsConnectionInfo extends JmsAbstractResource implements Comp private long requestTimeout = DEFAULT_REQUEST_TIMEOUT; private long connectTimeout = DEFAULT_CONNECT_TIMEOUT; private long closeTimeout = DEFAULT_CLOSE_TIMEOUT; + private boolean useConnectionCompletionHandler; private String queuePrefix = null; private String topicPrefix = null; @@ -259,6 +261,14 @@ public void setRequestTimeout(long requestTimeout) { this.requestTimeout = requestTimeout; } + public void setUseConnectionCompletionHandler(boolean value) { + this.useConnectionCompletionHandler = value; + } + + public boolean isUseConnectionCompletionHandler() { + return useConnectionCompletionHandler; + } + public boolean isLocalMessagePriority() { return localMessagePriority; }