Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions #44

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

franz1981
Copy link
Contributor

@franz1981 franz1981 commented Oct 26, 2021

This implements an elastic per-connection thread pool that allows session completions to be consumed in order (as now) while saving create a dedicated thread per session.

In a shared connection use case, multiple sessions can benefit from re-using the same thread to handle their completions, reducing dramatically both wake-up cost and (native) memory usage.

In the case where each session is using its own connection, it would behave as the previous implementation.

@franz1981
Copy link
Contributor Author

@tabish121 @gemmellr I'm going to share 2 flamegraph to show how this PR change the way completions are processed with some shared connections and many sessions per-connection.

@franz1981
Copy link
Contributor Author

franz1981 commented Oct 26, 2021

This PR is still in draft because of:

  • no JIRA yet: still waiting maintainers feedbacks
  • add a configuration option to cap the max numbers of threads used in the elastic pool (maybe turning it to be just one too, in case users knows that the completion handler isn't going to perform any heavy work and can be shared among sessions on the same connection)
  • add a configuration option to configure keep alive time of the elastic pool

@franz1981
Copy link
Contributor Author

franz1981 commented Oct 26, 2021

These are the results of this change.
Running a benchmark with:

  • 10 non-durable producer sessions
  • 5 sessions per connection
  • 2 connections handled by 2 separate threads (ie each thread is handling exclusively 5 producer sessions using the same connection)
  • 100 bytes messages
  • max 10 in-flight completions per producer

With main:

 -> TEST        12,040  msg/sec
 -> TEST        12,340  msg/sec
 -> TEST        12,081  msg/sec
 -> TEST        12,367  msg/sec
 -> TEST        11,971  msg/sec
 -> TEST        12,323  msg/sec
 -> TEST        12,026  msg/sec
 -> TEST        12,267  msg/sec
 -> TEST        12,120  msg/sec
 -> TEST        12,223  msg/sec
 -> *   121,764 msg/sec

with this PR:

 -> TEST        15,695  msg/sec
 -> TEST        15,441  msg/sec
 -> TEST        15,681  msg/sec
 -> TEST        15,455  msg/sec
 -> TEST        15,704  msg/sec
 -> TEST        15,451  msg/sec
 -> TEST        15,683  msg/sec
 -> TEST        15,485  msg/sec
 -> TEST        15,676  msg/sec
 -> TEST        15,436  msg/sec
 -> *   155,713 msg/sec

TLDR

121,764 msg/sec vs 155,713 msg/sec

The behaviour is, for main:
image
in violet, the 10 completion threads (1 per session), consuming 40 * 10 samples = 400 samples -> 40% of a single core

this PR:
image
in violet, the 2 completion threads, (1 per connection), consuming 71 * 2 samples = 142 samples -> ~14% of a single core

The other threads perform nearly the same amount of work, meaning that sharing connections now save a considerable amount of CPU time and memory too

@franz1981 franz1981 force-pushed the completion_elastic_pool branch from 4d3c222 to 9c78965 Compare October 26, 2021 15:13
@franz1981 franz1981 changed the title TBD-JIRA per-connection elastic completion pool QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions Oct 26, 2021
@franz1981 franz1981 marked this pull request as ready for review October 26, 2021 15:17
@franz1981
Copy link
Contributor Author

There're still something to be considered while using this processing model in case JMS Sessions are used on different threads AND share the same connection:

  • on main the Netty thread handling the shared connection issue completion events vs exclusive single threaded thread pools
  • on this PR the Netty thread handling the shared connection issue completion events vs a shared thread pool

If the completions event processing is heavy weight, issuing completion events vs a shared thread pool shouldn't be the bottleneck, but each completion thread processing, while if completion event processing is light-weight, reusing the same completion thread(s), kept busy as much as possible, reduce the amount of context switches and native resources used.

@@ -71,6 +69,7 @@
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import io.netty.util.internal.PlatformDependent;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like it using Netty internal APIs.

Copy link
Contributor Author

@franz1981 franz1981 Oct 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a shortcut to save checking for Unsafe existence: JCTools queues are good performers (and mostly GC free) but need to decide if use the Atomic or Unsafe variant based on Unsafe class presence; using this Netty util class save checking it by ourself, but can be changed to perform some explicit check too

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I guess it maybe also depends on how likely it is to change. I dont particularly want to depend on or shade JCTools, but on the other hand this has been clearly marked as internal API which would typically mean dont use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case we can just use a CLQ, although it means creating a new Node for each completion event, that seems a waste to me given that we have a rare and nice single consumer use case here, but I understand that adding a JCTools dep just for this isn't that good the same...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I not particularly fond of using this internal Netty API which has not guarantee of being a stable public facing API

} finally {
completionThread.set(null);
}
} while (!completionTasks.isEmpty());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The queue used was explicitly single-consumer, to which its doc adds a "one thread!" note. This line will be potentially executable concurrently (with both itself, and with the poll in the loop) since the thread ref is nulled before the loop check, meaning a second (or more) thread can technically get into the loop as another completes and potentially also then exit it and have more than one at this point too. Can the queue handle that?

Copy link
Contributor Author

@franz1981 franz1981 Oct 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isEmpty is safe to be called by many threads and ensure that a subsequent (single threaded) poll won't return null in case false (we have a good coverage for this on JCTools)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So to be clear, the "one thread!" API doc is overstated, its happy with isEmpty() concurrent with itself and with polls, so long as the poll is done with one thread at a time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isEmpty can be called by any threads concurrently without restrictions, but poll has to be called in a single threaded fashion ie one thread "at time" or just the same thread

The use case of isEmpty is exactly for this type of duty cycle loop (very common in Akka actors mailboxes, that use JCTools), which can be executed by any thread, but serially, and is used to detect if it worth to "continue" draining the mailbox.

@@ -215,6 +259,17 @@ public void close() throws JMSException {
connectionConsumer.shutdown();
}

final ThreadPoolExecutor completionExecutor = this.completionExecutor;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for newline here, its all one unit.

// it can grow "unbounded" to serve multiple concurrent session completions:
// in reality it is bounded by the amount of concurrent completion requests
exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new LinkedTransferQueue<>(),
new QpidJMSThreadFactory("JmsConnection ["+ connectionInfo.getId() + "] completion dispatcher", connectionInfo.isUseDaemonThread()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These threads will all end up having the same name when operating even if there are many at once, and as they come and go over time, which could be confusing. Adding some kind of index might be an idea, though I guess that could also look odd if it just always increases if they come and go (and the thread factory would need changed to accommodate it, its aimed at single thread creation)

Previously they were named based on their individual session. It might be good to at least reinstate that name while operating for a given session?

I think I might be inclined to have core size be 1. The executor wont be created until it is needed during cleanup or if they use completions in which case it is needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously they were named based on their individual session. It might be good to at least reinstate that name while operating for a given session?

You're right, indeed it was nice to have a good name to find them both on stack trace and while profiling, let me think how to improve this

If the user doesn't use anymore JMS 2 features in same connection and close any existing session that use completions, I expect the pool to release any existing completion thread (at some point), but as you rightly said, it's a pretty weird an unhappy use case

if (!completionThread.compareAndSet(null, Thread.currentThread())) {
return;
}
try {

This comment was marked as outdated.

@gemmellr
Copy link
Member

So, one 'slight' problem with this hehe. It doesnt do at all what is described or I think both of us initially thought it would be doing. Digging into the executor behaviour, rather than it potentially being elastic up to n threads its instead binary where it has either 0 or 1 thread.

It only looks to create new threads if there are none, or if it is under the max number when it cant insert into the task queue, which it always can and so it only creates a first thread and then always uses that after (ignoring idle timeout). Essentially the current impl serializes all completions for all sessions on the connection onto 1 thread, and would halt completions across all sessions if the thread were ever used to do any kind of longer lasting / blocking work during any given completion callback, which is entirely allowed and so that isnt acceptable.

A search confirms this as expected executor behaviour. To have it behave otherwise you have to do stuff like play around with things like pretending the task queue queue is full, extending the rejected execution handler, or having it create core threads but allowing core threads to also timeout as well.

@franz1981
Copy link
Contributor Author

franz1981 commented Oct 27, 2021

Oops my bad, I should have used a synchronous queue instead of a linked blocking queue and it should behave as expected

I would run the bench again to be sure it still reuse the same thread

@franz1981 franz1981 force-pushed the completion_elastic_pool branch from 9c78965 to 6814114 Compare October 28, 2021 07:33
@franz1981
Copy link
Contributor Author

Oops my bad, I should have used a synchronous queue instead of a linked blocking queue and it should behave as expected

And I was wrong, despite the 0-capacity synchronous q should behave as expected, it seems to create more thread then expected, I'm going to change a bit the logic to see if it get better

@gemmellr
Copy link
Member

I dont believe it will ever work the way you want with a simple queue of any size. You have to add hoops to manipulate it into behaving that way. E.g by pretending the queue is full when you want it to create threads, and inserting into the queue (directly or via the rejected execution handler) when you dont want more threads.

The docs for ThreadPoolExecutor covers the built in behaviour: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ThreadPoolExecutor.html

@franz1981
Copy link
Contributor Author

franz1981 commented Oct 28, 2021

I dont believe it will ever work the way you want with a simple queue of any size. You have to add hoops to manipulate it into behaving that way

A synchrnous queue (that's not a real queue, actually, but more a randevouz point for threads handoff) works exactly like that, but I've made a mistake about how completions are submitted...doc says

Direct handoffs. A good default choice for a work queue is a SynchronousQueue 
that hands off tasks to threads without otherwise holding them. Here, an attempt 
to queue a task will fail if no threads are immediately available to run it, 
so a new thread will be constructed. 

Let me think about it a bit more and I see if this PR can be closed or just need to be refined to achieve the expected behaviour

but I've made a mistake about how completions are submitted

It means that if the Netty event loop the connection belong is going to submit completions for 2 different sessions (eg let's say 1 and 2) and session 1 is already processing completions:

  • incoming completions vs session 1 should add them to the session completion queue without bothering the thread pool, because there's already a running completion thread handling them
  • new session 2 completions won't find any completion thread awake and would submit the request to the thread pool
  • the synchronous queue is built to reject any offer if there is no thread idle/awaiting on take, that means that the thread pool create a new one to handle session 2 completions or reuse any existing one (without rejecting the task offer)

It means that under heavy load completions can land so fast that we still have 2 separate threads handling the 2 sessions completion, that's not exactly what I want

@gemmellr
Copy link
Member

Sure. However there is a fairly decent window that will happen even with 1 session with how it is done just now. Consider where it will often execute a second task for the same session, unless a previous already began, got back into the session, set the thread ref, and was still inside the inner poll loop before nulling the ref, while a next message drops. Especially since the worker nulls the thread ref before checking if the tasks are not-empty again and going back around the outer loop, where one of those other executions on another threads may have already got in, I think it likely even a single session will bounce between threads with all that happening. I absolutely expect 2 sessions would often use different threads at times for similar reasons since there will be multiple threads primed and waiting.

@franz1981
Copy link
Contributor Author

Yep, probably what I really wanted is to save these threads to be created as a whole and let the completions be handled directly on the event loop threads (or on some externally provided thread pool from user): the JMS 2 API seems to miss it, while it's a key point to save resources if the user is aware that processing completions is a lightweight op...any idea how it would be possible to achieve something like that?

@gemmellr
Copy link
Member

Using the event loop is out of the question, it runs the inner core and the callback is allowed to do most stuff on the outer connection that it services. Disaster waiting to happen.

I think optimising it by default may be more trouble than it is worth the more I think about it. Adding the option to e.g use a connection-wide pool of a given fixed size (and where the user is aware that blocking the callbacks in any significant way will cause cross-session completion starvation if they have more sessions) might be an...option. Or the factory has an extension mechanism that could be used to supply an actual pool.

@franz1981
Copy link
Contributor Author

Adding the option to e.g use a connection-wide pool of a given fixed size (and where the user is aware that blocking the callbacks in any significant way will cause cross-session completion starvation if they have more sessions) might be an...option. Or the factory has an extension mechanism that could be used to supply an actual pool.

+100

I like this: it can still be pretty valuable in constrained environment...let's speak/let me find an appropriate way to expose it and I'm opened to any suggestion here

@gemmellr
Copy link
Member

Alternatively, the connection already has an executor of its own (used for the exception listener etc to avoid blocking the event loop), an option could just be to use that for completions. For the factory extension bits follow JmsConnectionFactory.setExtension(String, BiFunction<Connection, URI, Object>).

@franz1981 franz1981 force-pushed the completion_elastic_pool branch from 6814114 to d84917c Compare October 28, 2021 13:58
@franz1981
Copy link
Contributor Author

franz1981 commented Oct 28, 2021

@gemmellr

I've pushed a change that allow using a single connection executor to handle all the sessions completions, although the processing algorithm has been changed to prevent starvation of completion tasks belonging to different sessions (assuming no blocking user code ;) )

I need to add tests if the idea seems good, let me know

For the factory extension bits follow JmsConnectionFactory.setExtension(String, BiFunction<Connection, URI, Object>).

Let me take a look, I'm still unhappy that the single session per connection use case doesn't have yet a proper solution here unless I'll use the JmsConnectionFactory to inject a thread pool that allow handling completions belonging to different session/connection...

@franz1981 franz1981 force-pushed the completion_elastic_pool branch from d84917c to 335bc17 Compare October 28, 2021 14:05
@franz1981
Copy link
Contributor Author

franz1981 commented Oct 28, 2021

@gemmellr I'm going to send a separate commit that handle this same issue in a different way, similar to what #45 has done
ie completions are handled by a shared fork join pool (capable of work stealing) that will be shutdown deterministically when no JmsSession(s) are opened anymore (and reopened on demand).

Using a ForkJoin executor:

  • enable work-stealing (and better load balancing, better explained below)
  • better handle contention while submitting from multiple threads

A different way to handle this should be to create a shared (reference counted) event loop group and assign a different event loop executor in round-robin to each session (that need completion processing), with some good points:

  • each session would have a single threaded executor (no need to have a separate concurrent queue to handle completions)
  • completion thread won't change during the session lifecycle

And an important drawback:

  • no runtime load balancing: if an executor is shared by many busy sessions (processing many completions), an unlucky session won't have any other Thread helping to process its completions, despite the other event loops are idle

@gemmellr
Copy link
Member

I'm not sure I like the idea of taking similar approach as 45, which I hadnt seen to look at yet (can only keep up with so many different large and invasive changes while trying to do other stuff). I havent even looked at this version yet.

I do specifically want to avoid a default case where users need to end up tweaking thread counts because an entirely legitimate blocking workload is drowning out other stuff happening though. Seems like there are 2 different routes for that to happen there (+based on seeing that happen elsewhere this is done).

@franz1981
Copy link
Contributor Author

franz1981 commented Oct 28, 2021

I do specifically want to avoid a default case where users need to end up tweaking thread counts because an entirely legitimate blocking workload is drowning out other stuff happening though. Seems like there are 2 different routes for that to happen there (+based on seeing that happen elsewhere this is done).

The default use case will remain the same as now ie one thread w its executor service per session, but we can still give the option for users that know that completions processing won't be blocking (or accept the trade-offs of slow completion processing) and they would be handled by a shared thread pool (configurable) that would auto-dispose when no JMS sessions requires it anymore. This would just keep the number of completion threads under control while fairly load balancing completion processing (FJ pool is pretty good at work-stealing) ie a blocking processing won't stop other completions processing until all completions threads are blocked.

@gemmellr
Copy link
Member

Ok. It seemed like you wanted to make it the default.

@franz1981
Copy link
Contributor Author

franz1981 commented Oct 28, 2021

Ok. It seemed like you wanted to make it the default.

I didn't said it clearly on the PR, but I don't want to break existing users code :)
#45 is using a ref counted approach that would GC the shared "resource" after all JMSSession that need it have been closed; I'm opened to other approaches, but it's not clear to me (yet) which class should decide the lifecycle of the shared completion thread pool, any idea is welcome eg connection factory finalization looks appealing (on Artemis JMS impl we do that), but I'm not a big fan of finalizers :P

@gemmellr
Copy link
Member

There will definitely be no finalizers :) I think the connection is the most likely choice for a shared thing enabled by config.

@franz1981
Copy link
Contributor Author

franz1981 commented Oct 28, 2021

Yep, but if people decide to use 100 connections, why they should have 100 separate completion thread pools?
I was thinking about a proper shared approach really that span across connections/sessions - sadly (JMS) connection factory doesn't have any close method :(

@gemmellr
Copy link
Member

I didnt say it would be per connection. You asked about lifecycle. Factories dont have any 'close'. Connections do. Hence..

@gemmellr
Copy link
Member

The current/second version isnt quite what I had in mind when it occurred to me the connection executor was already there. You made it a sort of hybrid still mostly like your first version, calling back into the session to process a task list there. I was essentially thinking of more what it already did but just playing with the executor used when firing things off. I expect you wanted the task list for 'aggregating efficiency' though, albeit that does then introduce fairness issues you had to account for a little, though they are still there. Since you dont like this approach anyway, lets see what your alternative is like.

@franz1981
Copy link
Contributor Author

franz1981 commented Nov 2, 2021

I've sent a change that it's introducing HolderSuppliers (bad name, so happy to receive feedbacks/suggestions) and SharedDisposable as a util that could be used on #45 to handle shared disposable/pre-configured reference counted resources.

Some points re this change:

  1. thanks to FJ work stealing having a thread busy/blocked processing completions isn't a big deal as long as other FJ threads can handle other sessions completions
  2. fairness isn't still "right": a perfectly fair system should just process a single completion and continue on the FJ pool, to guarantee other completions to get their chance to get executed; I've chosen to execute burst of completions to amortize the "virtual context switch" cost, improving locality, but in theory would be better to cap (time based) for how much time a burst of completions should keep a single thread busy (a good number could be the N * default timeslack_ns of linux ie N* 50 us)

It can now save creating a number of completion threads that is bounded to the number of sessions, just by adding completionThreads=N to the qpid url

This is one run of the previous benchmarks with completionThreads=2, in violet, the FJ pool threads in action, with 89 + 88 = 177 samples:
image

 -> TEST        16,959  msg/sec
 -> TEST        17,391  msg/sec
 -> TEST        16,986  msg/sec
 -> TEST        17,377  msg/sec
 -> TEST        16,997  msg/sec
 -> TEST        17,415  msg/sec
 -> TEST        16,978  msg/sec
 -> TEST        17,379  msg/sec
 -> TEST        16,987  msg/sec
 -> TEST        17,385  msg/sec
 -> *   171,857 msg/sec

@franz1981
Copy link
Contributor Author

I see that a version that just handle one completion task at time to guarantee FJ to handle fairness of completions isn't working that bad, really, but I need to perform some more tests on it, code-wise it means:

    private void processCompletions() {
        assert processCompletion.get();
        completionThread = Thread.currentThread();
        try {
            final Runnable completionTask = completionTasks.poll();
            if (completionTask != null) {
                try {
                    completionTask.run();
                } catch (Throwable t) {
                    LOG.debug("errored on processCompletions duty cycle", t);
                }
            }
        } finally {
            completionThread = null;
            processCompletion.set(false);
        }
        if (completionTasks.isEmpty()) {
            return;
        }
        // a racing asyncProcessCompletion has won: no need to fire a continuation
        if (!processCompletion.compareAndSet(false, true)) {
            return;
        }
        getCompletionExecutor().execute(this::processCompletions);
    }

@gemmellr
Copy link
Member

gemmellr commented Nov 2, 2021

I think single completion at a time is the way to go, at least by default, it just shouldn't really be a concern when using it whether the callback execution is fair or not.

Copy link
Member

@gemmellr gemmellr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My heads spinning after trying to look at this. I'm not at all sure it actually fully does what was intended / I understood you intended. If it does I've definitely missed something due to aforementioned spinning. It definitely seems overly elaborate either way.

Out of curiosity, have you tested what the impact is on the default (i.e existing) behaviour as well, rather than just the tweaked/configured behaviour?

@@ -151,6 +155,7 @@ public void run() {

this.connectionInfo = connectionInfo;
this.connectionInfo.setConnection(this);
this.completionExecutorService = this.connectionInfo.getCompletionExecutorServiceFactory().map(Supplier::get).orElse(null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling the field e.g 'sharedCompletionExecutorHolder' would make its use clearer.

Similarly the getCompletionExecutorServiceFactory feels a bit lengthy, and not reflective that it only covers the shared one, e.g getSharedCompletionExecutorFactory.

Actually, it feels like most of this line could be done up front, either in the ConnectionFactory or inside the ConnectionInfo, such that this line was a simple getSharedCompletionExecutorHolder that either returns null (as it will by default) or not.

@@ -106,7 +109,7 @@
private final AtomicBoolean started = new AtomicBoolean();
private final AtomicReference<Exception> failureCause = new AtomicReference<>();
private final JmsConnectionInfo connectionInfo;
private final ThreadPoolExecutor executor;
protected final ThreadPoolExecutor executor;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need exposed?

@@ -591,6 +613,14 @@ public long getConnectTimeout() {
return this.connectTimeout;
}

public void setCompletionThreads(final int completionThreads) {
Copy link
Member

@gemmellr gemmellr Nov 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd go with SharedCompletionThreads to emphasize its shared, given its both same and cross-connection sharing.

EDIT: actually, is it? I'm not seeing where it would actually share? A new SharedDisposable looks to be made afresh each time, and has no statics to share across connections, which would seem to mean its just a bigger per-connection pool.

Copy link
Contributor Author

@franz1981 franz1981 Nov 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EDIT: actually, is it? I'm not seeing where it would actually share?

The completion factory create a singleton instance of sharedRefCnt of ForkJoinPool that allows sharing the same FJ pool unless every connection that reference it get closed.
If that happen, the last one would dispose it, leaving incoming connections (if any) able to create a new one, similarly to the shared event loop group of #45

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I had forgotten that bit in the ConnectionFactory by the time my head spun around from all the other bits, so it is indeed doing per-factory sharing.

Its still not what I expected you to do, given it means that every factory would result in its own pool. Folks can have more than one factory, rather than just creating all connections on a single given factory, and from prior discussion it seems like you really cared about all the different connections sharing, so I rather expected you to do all-connection sharing when they were configured with this option.

That would introduce a similar issue to the other JIRA/PR, where people might have different configurations that then needs different pools here...but that seems like it would be easily solved in a pretty similar way, effectively just a map of different setups to match against.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would introduce a similar issue to the other JIRA/PR, where people might have different configurations that then needs different pools here...but that seems like it would be easily solved in a pretty similar way, effectively just a map of different setups to match against.

This can be done creating some "leaky" (by purpose) singleton shared map with per-configuration pool, but is it a valid use case?
Or the typical use case is to have a single factory per-application?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've seen both approaches. Neither is unusual. It really just depends what they are doing.

E.g if they are configuring ClientID's on the factory, they can necessarily only create 1 connection [at a time] with that factory, and might have more than one. Different frameworks, different config.

I would typically expect them to use the same config for options like this even if they do...but, really nothing requires they do that, and it may not always be in their control that they do, so it should work if they dont. Same as comments on the other PR, really.

Comment on lines 386 to 387
QpidJMSForkJoinWorkerThreadFactory fjThreadFactory = new QpidJMSForkJoinWorkerThreadFactory("completion thread pool", true);
completionExecutorServiceFactory = sharedRefCnt(() -> new ForkJoinPool(completionThreads, fjThreadFactory, null, false), ThreadPoolUtils::shutdown);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesnt feel like we need a supplier of a holder of an executor. Just a holder.

I assume thats to try and avoid creating it? Since it seems somewhat implicit that if you set the option to get this shared executor behaviour, then you actually want it and thus can be expected to need it, its not really clear to me its worth the extra mechanics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume thats to try and avoid creating it?

Given that we cannot rely on finalization of connection factory, I cannot pre-allocate it if there are no actual "users" ie connections. And I would like it to be correctly disposed and shutdown while every connection belonging to the connction factory got closed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant was it an attempt to avoid creating the pool until the completion executor is used. Not 'avoid creating it' in terms of pre-allocating it with the factory.

This bit is only used after the point createConnection has been called, and only if the option was set. I think its reasonable to assume the pool is to be used from then given its an explicitly set option, and I dont see anyone ever setting it unless they want the behaviour.

Given that, the mechanism still all seems rather overcomplicated. This feels like a relatively simple case, an 'if there is an existing pool, then use that, otherwise create one' check coupled with the opposing cleanup. One that should be relatively infrequently used. It seems like even a simple synchronized block with a count inside could do?

I'm also weighing this vast mechanism against against a change that enables supplying a pool via the factory extension mechanism, which would probably be something ridiculous like 5-10 lines in comparison.

Copy link
Contributor Author

@franz1981 franz1981 Nov 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that, the mechanism still all seems rather overcomplicated. This feels like a relatively simple case, an 'if there is an existing pool, then use that, otherwise create one' check coupled with the opposing cleanup. One that should be relatively infrequently used. It seems like even a simple synchronized block with a count inside could do?

Not sure, there's still a problem related disposing it:

  1. the shared/common pool should be allocated once and live forever?
  2. if the answer to 1 is no, how/what is going to trigger disposing it?

The mechanism I've implemented just handle this use-case using reference counting, but in order to do it, it requires someone to be the first owner while ensuring correct/deterministic release of resources that could cause the whole application/class-loader to leak

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, you would still keep track with the [connection] count and shut it down if it wasnt being used, as this does, but just without a massive amount of mechanics around it to over complicate things. It would have to be retained statically of course if wanting to share across factories...like I believe your other PR already does for example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, so you prefer a concurrent map per configuration statically referenced by each connection?
The map entries will still be sharedRefCnt pools, because I need to consider each connection to be able to concurrently ref/deref the shared pool (with N threads).
What I've implemented is already using a synchronization point while allocating the shared pool, but the concurrent mechanism is needed to ensure connections to never block while releasing, nor preventing incoming ones to acquire a new/existing pool

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I was thinking a super simple 'get me a pool' at connection creation that bumps a count and creates a pool if needed, and an 'Im not using the pool anymore' at connection closure, that if the pool is then unused, actually shuts it down. If anotehr connection needs a pool while that is happening, it creates a new one. I'm not seeing the need for all the complexity around that which this version adds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I was thinking a super simple 'get me a pool' at connection creation that bumps a count and creates a pool if needed

if the pool acquisition is always handled in a synchronized block, it would work, but this version I've implemented try to save blocking different connections vs the same lock if there's already an available pool.
If this is not a problem, then I can use a simpler version

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it wont happen at all by default, is a relatively infrequent thing, and should be pretty quick under synchronization, it seems like always synchronizing should be fine. Creating and destroying connections is already a very synchronous task with [multiple] remote trips, this doesn't seem like it would be making a significant difference at all.

Comment on lines +217 to +224
if (completionTasks.isEmpty()) {
return;
}
// a racing asyncProcessCompletion has won: no need to fire a continuation
if (!processCompletion.compareAndSet(false, true)) {
return;
}
getCompletionExecutor().execute(this::processCompletions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Newline before isEmpty, its on to a different unit of the work really.

I think it would be more readable to either...combine the two 'I can just stop now' checks into a single if(empty || CAS) { return }...or instead to gate the execute call inside the if(CAS) the same way it is done in the asyncProcessCompletion method below.

Comment on lines +1258 to +1259
Future<?> starter = exec.submit(() -> {
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave it on a single line, its clearer.

}

private void asyncProcessCompletion(final Runnable completionTask, final boolean ignoreSessionClosed) {
if (!ignoreSessionClosed) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think the session closed check thing works. The closed boolean is set true as soon as session shutdown starts, but its possible for successfull async completions to arrive before the closure completes, simply because they are async and so is the session close, so they may be in flight and arrive before the session closure. I expect this change would mean they get could be marked as failed when they shouldnt, as the completion task would get dropped instead of running and then the fallback 'session closed, fail all outstanding completions' task would run and say they failed.

Copy link
Contributor Author

@franz1981 franz1981 Nov 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expect this change would mean they get could be marked as failed when they shouldnt

I've implemented it intending to mimic (by inlining) the reject policy of the single threaded executor: I admit i didn't put much thoughts into this, but probably can be treated differently and it's maybe introducing a slightly different semantic, but I still don't see any harm; my expectation is that when session::shutdown is called, any already submitted completion should be handled (and that's visible in the new shutdown logic using CountDownLatch), but new submissions would be processed if sent after shutdown is initiated (unless part of the shutdown logic itself ie ignoreSessionClosed == true) and ignored otherwise.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ill admit I forgot about that bit, but yes it is a little different - that is something that only occurs later, after the executor is shut down in the existing non-shared-completion case, and is handling a case that probably wont occur, and also shouldnt matter if it did because something else already cleaned up anything related to it before killing the executor.

Here, the work would start being thrown away immediatelym even before the producer itself is necessarily notified its being considered closed, and long before the executor is being shut down (which in the regular non-shared-pool case would still happen), so it could actually be a quite significant change in behaviour. I think this introduces a hole that didnt exist before.

@franz1981 franz1981 force-pushed the completion_elastic_pool branch from 4f2b203 to 1fe1760 Compare February 28, 2022 16:37
@franz1981 franz1981 force-pushed the completion_elastic_pool branch from 1fe1760 to e5fa3cb Compare February 28, 2022 16:44
@gemmellr
Copy link
Member

gemmellr commented Mar 1, 2022

Having just read them all, the existing feedback and questions (e.g impact on default case) on the code and in the discussion essentially all still seems to apply now (even some of the now-'outdated' ones), given the changes remain essentially the same as back when they were made, just minus one holder and with some class/method renames.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants