Skip to content

Commit

Permalink
QPIDJMS-552 just process single completion task on processCompletions
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Nov 2, 2021
1 parent 89c026a commit aba2e7f
Showing 1 changed file with 19 additions and 28 deletions.
47 changes: 19 additions & 28 deletions qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,38 +199,29 @@ public void onPendingFailure(ProviderException cause) {
}

private void processCompletions() {
for (; ; ) {
assert processCompletion.get();
completionThread = Thread.currentThread();
try {
Runnable completionTask;
while ((completionTask = completionTasks.poll()) != null) {
try {
completionTask.run();
} catch (Throwable t) {
LOG.debug("errored on processCompletions duty cycle", t);
}
assert processCompletion.get();
completionThread = Thread.currentThread();
try {
final Runnable completionTask = completionTasks.poll();
if (completionTask != null) {
try {
completionTask.run();
} catch (Throwable t) {
LOG.debug("errored on processCompletions duty cycle", t);
}
} finally {
completionThread = null;
processCompletion.set(false);
}
if (completionTasks.isEmpty()) {
return;
}
// a racing asyncProcessCompletion has won: no need to fire a continuation
if (!processCompletion.compareAndSet(false, true)) {
return;
}
if (connection.getCompletionExecutorService() == null) {
// an exclusive per-session executor doesn't need to guarantee fair completion processing
// and can just keep on processing completion on this same thread
continue;
}
// guarantees completions belonging to other sessions to be processed
getCompletionExecutor().execute(this::processCompletions);
} finally {
completionThread = null;
processCompletion.set(false);
}
if (completionTasks.isEmpty()) {
return;
}
// a racing asyncProcessCompletion has won: no need to fire a continuation
if (!processCompletion.compareAndSet(false, true)) {
return;
}
getCompletionExecutor().execute(this::processCompletions);
}

private void asyncProcessCompletion(final Runnable completionTask) {
Expand Down

0 comments on commit aba2e7f

Please sign in to comment.