Skip to content

Commit

Permalink
QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number …
Browse files Browse the repository at this point in the history
…of sessions
  • Loading branch information
franz1981 committed Oct 28, 2021
1 parent 0cf4998 commit 335bc17
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -106,7 +112,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
private final AtomicBoolean started = new AtomicBoolean();
private final AtomicReference<Exception> failureCause = new AtomicReference<>();
private final JmsConnectionInfo connectionInfo;
private final ThreadPoolExecutor executor;
protected final ThreadPoolExecutor executor;

private ExceptionListener exceptionListener;
private JmsMessageFactory messageFactory;
Expand All @@ -127,7 +133,7 @@ protected JmsConnection(final JmsConnectionInfo connectionInfo, Provider provide
// will also serve as a means of preventing JVM shutdown should a client application
// not have it's own mechanism for doing so if the configuration specifies that the
// Connection create this thread as a non-daemon thread.
executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedTransferQueue<>(),
new QpidJMSThreadFactory("QpidJMS Connection Executor: " + connectionInfo.getId(), connectionInfo.isUseDaemonThread()));

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
Expand Down Expand Up @@ -1140,6 +1146,14 @@ void setMessageFactory(JmsMessageFactory factory) {
messageFactory = factory;
}

public void setUseConnectionCompletionHandler(boolean value) {
connectionInfo.setUseConnectionCompletionHandler(value);
}

public boolean isUseConnectionCompletionHandler() {
return connectionInfo.isUseConnectionCompletionHandler();
}

public boolean isForceAsyncAcks() {
return connectionInfo.isForceAsyncAcks();
}
Expand Down
103 changes: 86 additions & 17 deletions qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -71,6 +72,7 @@
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import io.netty.util.internal.PlatformDependent;
import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
Expand Down Expand Up @@ -128,14 +130,16 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
private volatile ThreadPoolExecutor completionExcecutor;
private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
private boolean deliveryThreadCheckEnabled = true;
private AtomicReference<Thread> completionThread = new AtomicReference<Thread>();
private volatile Thread completionThread = null;
private final AtomicBoolean processCompletion = new AtomicBoolean();

private final AtomicLong consumerIdGenerator = new AtomicLong();
private final AtomicLong producerIdGenerator = new AtomicLong();
private JmsTransactionContext transactionContext;
private boolean sessionRecovered;
private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
private final Deque<SendCompletion> asyncSendQueue = new ConcurrentLinkedDeque<SendCompletion>();
private final java.util.Queue<Runnable> completionTasks = PlatformDependent.newMpscQueue();

protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
this.connection = connection;
Expand Down Expand Up @@ -194,6 +198,57 @@ public void onPendingFailure(ProviderException cause) {
}
}

private void processCompletions() {
for (; ; ) {
assert processCompletion.get();
completionThread = Thread.currentThread();
try {
Runnable completionTask;
while ((completionTask = completionTasks.poll()) != null) {
try {
completionTask.run();
} catch (Throwable t) {
LOG.debug("errored on processCompletions duty cycle", t);
}
}
} finally {
completionThread = null;
processCompletion.set(false);
}
if (completionTasks.isEmpty()) {
return;
}
// a racing asyncProcessCompletion has won: no need to fire a continuation
if (!processCompletion.compareAndSet(false, true)) {
return;
}
if (!connection.isUseConnectionCompletionHandler()) {
// an exclusive per-session executor doesn't need to guarantee fair completion processing
// and can just keep on processing completion on this same thread
continue;
}
// guarantees completions belonging to other sessions to be processed
getCompletionExecutor().execute(this::processCompletions);
return;
}
}

private void asyncProcessCompletion(final Runnable completionTask) {
asyncProcessCompletion(completionTask, false);
}

private void asyncProcessCompletion(final Runnable completionTask, final boolean ignoreSessionClosed) {
if (!ignoreSessionClosed) {
if (closed.get()) {
return;
}
}
completionTasks.add(completionTask);
if (processCompletion.compareAndSet(false, true)) {
getCompletionExecutor().execute(this::processCompletions);
}
}

int acknowledgementMode() {
return acknowledgementMode;
}
Expand Down Expand Up @@ -372,15 +427,25 @@ protected boolean shutdown(Throwable cause) throws JMSException {
if (cause == null) {
cause = new JMSException("Session closed remotely before message transfer result was notified");
}

getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
getCompletionExecutor().shutdown();
asyncProcessCompletion(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)), true);
if (!connection.isUseConnectionCompletionHandler()) {
getCompletionExecutor().shutdown();
}
}

try {
getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was interrupted");
if (connection.isUseConnectionCompletionHandler()) {
final CountDownLatch completed = new CountDownLatch(1);
try {
asyncProcessCompletion(completed::countDown, true);
completed.await(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was interrupted");
}
} else {
try {
getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was interrupted");
}
}

try {
Expand Down Expand Up @@ -447,7 +512,7 @@ JmsMessageProducer producerClosed(JmsProducerInfo resource, Throwable cause) {

try {
if (producer != null) {
getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(
asyncProcessCompletion(new FailOrCompleteAsyncCompletionsTask(
producer.getProducerId(), JmsExceptionSupport.create(cause)));
producer.shutdown(cause);
}
Expand Down Expand Up @@ -1200,11 +1265,15 @@ private ExecutorService getCompletionExecutor() {
synchronized (sessionInfo) {
exec = completionExcecutor;
if (exec == null) {
exec = createExecutor("completion dispatcher", completionThread);

if (connection.isUseConnectionCompletionHandler()) {
exec = connection.executor;
} else {
exec = createExecutor("completion dispatcher", null);
}
// Ensure work thread is fully up before allowing other threads
// to attempt to execute on this instance.
Future<?> starter = exec.submit(() -> {});
Future<?> starter = exec.submit(() -> {
});
try {
starter.get();
} catch (InterruptedException | ExecutionException e) {
Expand Down Expand Up @@ -1303,7 +1372,7 @@ void checkIsDeliveryThread() throws JMSException {
}

void checkIsCompletionThread() throws JMSException {
if (Thread.currentThread().equals(completionThread.get())) {
if (Thread.currentThread().equals(completionThread)) {
throw new IllegalStateException("Illegal invocation from CompletionListener callback");
}
}
Expand Down Expand Up @@ -1391,19 +1460,19 @@ public void onInboundMessage(JmsInboundMessageDispatch envelope) {
}

protected void onCompletedMessageSend(final JmsOutboundMessageDispatch envelope) {
getCompletionExecutor().execute(new AsyncCompletionTask(envelope));
asyncProcessCompletion(new AsyncCompletionTask(envelope));
}

protected void onFailedMessageSend(final JmsOutboundMessageDispatch envelope, final Throwable cause) {
getCompletionExecutor().execute(new AsyncCompletionTask(envelope, cause));
asyncProcessCompletion(new AsyncCompletionTask(envelope, cause));
}

protected void onConnectionInterrupted() {
transactionContext.onConnectionInterrupted();

// TODO - Synthesize a better exception
JMSException failureCause = new JMSException("Send failed due to connection loss");
getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(failureCause));
asyncProcessCompletion(new FailOrCompleteAsyncCompletionsTask(failureCause));

for (JmsMessageProducer producer : producers.values()) {
producer.onConnectionInterrupted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public final class JmsConnectionInfo extends JmsAbstractResource implements Comp
public static final long DEFAULT_CLOSE_TIMEOUT = 60000;
public static final long DEFAULT_SEND_TIMEOUT = INFINITE;
public static final long DEFAULT_REQUEST_TIMEOUT = INFINITE;
public static final boolean DEFAULT_USE_CONNECTION_COMPLETION_EXECUTOR = false;

private final JmsConnectionId connectionId;
private final EnumMap<JmsConnectionExtensions, BiFunction<Connection, URI, Object>> extensionMap = new EnumMap<>(JmsConnectionExtensions.class);
Expand Down Expand Up @@ -78,6 +79,7 @@ public final class JmsConnectionInfo extends JmsAbstractResource implements Comp
private long requestTimeout = DEFAULT_REQUEST_TIMEOUT;
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
private boolean useConnectionCompletionHandler;
private String queuePrefix = null;
private String topicPrefix = null;

Expand Down Expand Up @@ -259,6 +261,14 @@ public void setRequestTimeout(long requestTimeout) {
this.requestTimeout = requestTimeout;
}

public void setUseConnectionCompletionHandler(boolean value) {
this.useConnectionCompletionHandler = value;
}

public boolean isUseConnectionCompletionHandler() {
return useConnectionCompletionHandler;
}

public boolean isLocalMessagePriority() {
return localMessagePriority;
}
Expand Down

0 comments on commit 335bc17

Please sign in to comment.