Skip to content

Commit

Permalink
QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number …
Browse files Browse the repository at this point in the history
…of sessions
  • Loading branch information
franz1981 committed Oct 28, 2021
1 parent 0cf4998 commit 6814114
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -107,6 +113,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
private final AtomicReference<Exception> failureCause = new AtomicReference<>();
private final JmsConnectionInfo connectionInfo;
private final ThreadPoolExecutor executor;
private volatile ThreadPoolExecutor completionExecutor;

private ExceptionListener exceptionListener;
private JmsMessageFactory messageFactory;
Expand Down Expand Up @@ -183,6 +190,44 @@ JmsConnection connect() throws JMSException {
return this;
}

ExecutorService getCompletionExecutor() {
ThreadPoolExecutor exec = completionExecutor;
if (exec == null) {
synchronized (this) {
exec = completionExecutor;
if (exec == null) {
// it can grow "unbounded" to serve multiple concurrent session completions:
// in reality it is bounded by the amount of concurrent completion requests
exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<>(),
new QpidJMSThreadFactory("JmsConnection ["+ connectionInfo.getId() + "] completion dispatcher", connectionInfo.isUseDaemonThread()));
exec.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy() {

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// Completely ignore the task if the connection has closed.
if (!closed.get()) {
LOG.trace("Task {} rejected from executor: {}", r, e);
super.rejectedExecution(r, e);
}
}
});
// Ensure work thread is fully up before allowing other threads
// to attempt to execute on this instance.
Future<?> starter = exec.submit(() -> {});
try {
starter.get();
} catch (InterruptedException | ExecutionException e) {
LOG.trace("Completion Executor starter task failed: {}", e.getMessage());
}

completionExecutor = exec;
}
}
}

return exec;
}

@Override
public void close() throws JMSException {
boolean interrupted = Thread.interrupted();
Expand Down Expand Up @@ -215,6 +260,17 @@ public void close() throws JMSException {
connectionConsumer.shutdown();
}

final ThreadPoolExecutor completionExecutor = this.completionExecutor;

if (completionExecutor != null) {
completionExecutor.shutdown();
try {
completionExecutor.awaitTermination(connectionInfo.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Connection close awaiting session send completions was interrupted");
}
}

if (isConnected() && !isFailed()) {
ProviderFuture request = provider.newProviderFuture();
requests.put(request, request);
Expand Down
97 changes: 53 additions & 44 deletions qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -71,6 +69,7 @@
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import io.netty.util.internal.PlatformDependent;
import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
Expand Down Expand Up @@ -125,17 +124,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
private final JmsSessionInfo sessionInfo;
private final ReentrantLock sendLock = new ReentrantLock();
private volatile ThreadPoolExecutor deliveryExecutor;
private volatile ThreadPoolExecutor completionExcecutor;
private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
private final AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
private boolean deliveryThreadCheckEnabled = true;
private AtomicReference<Thread> completionThread = new AtomicReference<Thread>();
private final AtomicReference<Thread> completionThread = new AtomicReference<Thread>();

private final AtomicLong consumerIdGenerator = new AtomicLong();
private final AtomicLong producerIdGenerator = new AtomicLong();
private JmsTransactionContext transactionContext;
private boolean sessionRecovered;
private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
private final Deque<SendCompletion> asyncSendQueue = new ConcurrentLinkedDeque<SendCompletion>();
private final java.util.Queue<Runnable> completionTasks = PlatformDependent.newMpscQueue();

protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
this.connection = connection;
Expand Down Expand Up @@ -194,6 +193,42 @@ public void onPendingFailure(ProviderException cause) {
}
}

private void processCompletions() {
do {
if (!completionThread.compareAndSet(null, Thread.currentThread())) {
return;
}
try {
Runnable completionTask;
while ((completionTask = completionTasks.poll()) != null) {
try {
completionTask.run();
} catch (Throwable t) {
LOG.debug("errored on processCompletions duty cycle", t);
}
}
} finally {
completionThread.set(null);
}
} while (!completionTasks.isEmpty());
}

private void asyncProcessCompletion(final Runnable completionTask) {
asyncProcessCompletion(completionTask, false);
}

private void asyncProcessCompletion(final Runnable completionTask, final boolean ignoreSessionClosed) {
if (!ignoreSessionClosed) {
if (closed.get()) {
return;
}
}
completionTasks.add(completionTask);
if (completionThread.get() == null) {
connection.getCompletionExecutor().execute(this::processCompletions);
}
}

int acknowledgementMode() {
return acknowledgementMode;
}
Expand Down Expand Up @@ -372,15 +407,14 @@ protected boolean shutdown(Throwable cause) throws JMSException {
if (cause == null) {
cause = new JMSException("Session closed remotely before message transfer result was notified");
}

getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
getCompletionExecutor().shutdown();
}

try {
getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was interrupted");
asyncProcessCompletion(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)), true);
final CountDownLatch completed = new CountDownLatch(1);
try {
asyncProcessCompletion(completed::countDown, true);
completed.await(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was interrupted");
}
}

try {
Expand Down Expand Up @@ -447,7 +481,7 @@ JmsMessageProducer producerClosed(JmsProducerInfo resource, Throwable cause) {

try {
if (producer != null) {
getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(
asyncProcessCompletion(new FailOrCompleteAsyncCompletionsTask(
producer.getProducerId(), JmsExceptionSupport.create(cause)));
producer.shutdown(cause);
}
Expand Down Expand Up @@ -1194,31 +1228,6 @@ Executor getDispatcherExecutor() {
return exec;
}

private ExecutorService getCompletionExecutor() {
ThreadPoolExecutor exec = completionExcecutor;
if (exec == null) {
synchronized (sessionInfo) {
exec = completionExcecutor;
if (exec == null) {
exec = createExecutor("completion dispatcher", completionThread);

// Ensure work thread is fully up before allowing other threads
// to attempt to execute on this instance.
Future<?> starter = exec.submit(() -> {});
try {
starter.get();
} catch (InterruptedException | ExecutionException e) {
LOG.trace("Completion Executor starter task failed: {}", e.getMessage());
}

completionExcecutor = exec;
}
}
}

return exec;
}

private ThreadPoolExecutor createExecutor(final String threadNameSuffix, AtomicReference<Thread> threadTracker) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new QpidJMSThreadFactory("JmsSession ["+ sessionInfo.getId() + "] " + threadNameSuffix, true, threadTracker));
Expand Down Expand Up @@ -1391,19 +1400,19 @@ public void onInboundMessage(JmsInboundMessageDispatch envelope) {
}

protected void onCompletedMessageSend(final JmsOutboundMessageDispatch envelope) {
getCompletionExecutor().execute(new AsyncCompletionTask(envelope));
asyncProcessCompletion(new AsyncCompletionTask(envelope));
}

protected void onFailedMessageSend(final JmsOutboundMessageDispatch envelope, final Throwable cause) {
getCompletionExecutor().execute(new AsyncCompletionTask(envelope, cause));
asyncProcessCompletion(new AsyncCompletionTask(envelope, cause));
}

protected void onConnectionInterrupted() {
transactionContext.onConnectionInterrupted();

// TODO - Synthesize a better exception
JMSException failureCause = new JMSException("Send failed due to connection loss");
getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(failureCause));
asyncProcessCompletion(new FailOrCompleteAsyncCompletionsTask(failureCause));

for (JmsMessageProducer producer : producers.values()) {
producer.onConnectionInterrupted();
Expand Down

0 comments on commit 6814114

Please sign in to comment.