Skip to content

Commit

Permalink
Using GrowBeforeQueueThreadPoolExecutor as the threadPool
Browse files Browse the repository at this point in the history
  • Loading branch information
xcaostagit committed Sep 15, 2015
1 parent 4a34d89 commit fcf7f36
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.gearvrf.utility;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {

private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();

private int userSpecifiedCorePoolSize;
private int taskCount;
private Object syncObj = new Object();

public GrowBeforeQueueThreadPoolExecutor(final String prefix) {
this(
/* core size */Math.min(2, NUM_CPUS),
/* max size */Math.max(Math.min(2, NUM_CPUS), 2 * NUM_CPUS),
/* idle timeout */60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);

@Override
public final Thread newThread(final Runnable r) {
return new Thread(r, prefix + "-" + threadNumber.getAndIncrement());
}
});
}

/*package*/ GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
userSpecifiedCorePoolSize = corePoolSize;
}

@Override
public void execute(Runnable runnable) {
synchronized (syncObj) {
taskCount++;
setCorePoolSizeToTaskCountWithinBounds();
}
super.execute(runnable);
}

@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
super.afterExecute(runnable, throwable);
synchronized (syncObj) {
taskCount--;
setCorePoolSizeToTaskCountWithinBounds();
}
}

private void setCorePoolSizeToTaskCountWithinBounds() {
int threads = taskCount;
if (threads < userSpecifiedCorePoolSize) {
threads = userSpecifiedCorePoolSize;
}
if (threads > getMaximumPoolSize()) {
threads = getMaximumPoolSize();
}
setCorePoolSize(threads);
}
}
50 changes: 25 additions & 25 deletions GVRf/Framework/src/org/gearvrf/utility/Threads.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public abstract class Threads {

/**
* Execute a Runnable on a thread pool thread
*
*
* @param priority
* The thread priority. Be careful with this! <blockquote>
* <b>Note:</b> Use Thread.MIN_PRIORITY..Thread.MAX_PRIORITY
Expand All @@ -81,7 +81,7 @@ public abstract class Threads {
* - by using spawn (and, hence the thread pool) there is at
* least a chance that you will be reusing a thread, thus saving
* teardown/startup costs.
*
*
* @return A Future<?> that lets you wait for thread completion, if
* necessary
*/
Expand Down Expand Up @@ -127,13 +127,13 @@ public void run() {
* Additionally, spawnHigh() - which should be used very sparingly - starts
* a background task that runs at a higher priority than other background
* tasks, though still lower than the GUI.)
*
*
* @param threadProc
* The code to run. It doesn't matter if this code never returns
* - by using spawn (and, hence the thread pool) there is at
* least a chance that you will be reusing a thread, thus saving
* teardown/startup costs.
*
*
* @return A Future<?> that lets you wait for thread completion, if
* necessary
*/
Expand All @@ -150,13 +150,13 @@ public static Future<?> spawn(final Runnable threadProc) {
* the lowest possible priority. Additionally, spawnHigh() - which should be
* used very sparingly - starts a background task that runs at a higher
* priority than other background tasks, though still lower than the GUI.)
*
*
* @param threadProc
* The code to run. It doesn't matter if this code never returns
* - by using spawn (and, hence the thread pool) there is at
* least a chance that you will be reusing a thread, thus saving
* teardown/startup costs.
*
*
* @return A Future<?> that lets you wait for thread completion, if
* necessary
*/
Expand All @@ -173,13 +173,13 @@ public static Future<?> spawnLow(final Runnable threadProc) {
* priority. Additionally, spawnHigh() - which should be used very sparingly
* - starts a background task that runs at a higher priority than other
* background tasks, though still lower than the GUI.)
*
*
* @param threadProc
* The code to run. It doesn't matter if this code never returns
* - by using spawn (and, hence the thread pool) there is at
* least a chance that you will be reusing a thread, thus saving
* teardown/startup costs.
*
*
* @return A Future<?> that lets you wait for thread completion, if
* necessary
*/
Expand All @@ -201,13 +201,13 @@ public static Future<?> spawnIdle(final Runnable threadProc) {
* to write the downsized Bitmap to cache. Meanwhile, the new save thread
* has higher priority than other load threads, so the new Bitmap doesn't
* stay in memory for a long time.
*
*
* @param threadProc
* The code to run. It doesn't matter if this code never returns
* - by using spawn (and, hence the thread pool) there is at
* least a chance that you will be reusing a thread, thus saving
* teardown/startup costs.
*
*
* @return A Future<?> that lets you wait for thread completion, if
* necessary
*/
Expand All @@ -220,7 +220,7 @@ public static Future<?> spawnHigh(final Runnable threadProc) {
* priority. This allows you to start a background thread, and later do a
* <code><i>join()</i></code> (<i>i.e.</i>, block until the thread is done)
* to get the thread's output.
*
*
* @param <T>
* The type-parameter of the Callable and the returned Future
* @param callable
Expand All @@ -232,14 +232,14 @@ public static <T> Future<T> spawn(Callable<T> callable) {
return threadPool.submit(callable);
}

private static ExecutorService threadPool = Executors.newCachedThreadPool();
private static ExecutorService threadPool = new GrowBeforeQueueThreadPoolExecutor("gvrf");

/**
* By default, the spawn() methods use their own
* {@link Executors#newCachedThreadPool()}. This method allows you to use a
* different thread pool, if the cached thread pool provides the wrong
* semantics or if you want to use a single thread pool for the whole app.
*
*
* <p>
* Note that calling this method will have <em>no effect</em> on any threads
* started on the existing thread pool; they will run to completion in a
Expand All @@ -260,7 +260,7 @@ public static void setThreadPool(ExecutorService newThreadPool) {
* The spawn() methods use a {@link Executors#newCachedThreadPool()}; this
* method provides access to that thread pool, so that other parts of the
* app can use the pool without having to call one of the spwan() methods.
*
*
* <p>
* Note that any prior call to {@link #setThreadPool(ExecutorService)} will
* affect the result of this method!
Expand All @@ -284,7 +284,7 @@ public interface Cancelable extends Runnable {

/**
* Manages pool of pending threads, deciding which should run next.
*
*
* Used in conjunction with a ThreadLimiter, which handles synchronization:
* implementations generally don't need to pay attention to thread safety.
*/
Expand All @@ -294,7 +294,7 @@ public interface ThreadPolicyProvider<CANCELABLE extends Cancelable> {

/**
* Are there any thread procs in the pool?
*
*
* May use {@link Cancelable#stillWanted()} to trim pool.
* */
boolean isEmpty();
Expand All @@ -307,7 +307,7 @@ public interface ThreadPolicyProvider<CANCELABLE extends Cancelable> {

/**
* Make threadProc the next in the pool.
*
*
* <ul>
* <li>If threadProc is already in the pool, move it to the front, so
* get() returns it next.
Expand All @@ -320,10 +320,10 @@ public interface ThreadPolicyProvider<CANCELABLE extends Cancelable> {

/**
* Limits the number of (thread pool) threads running at any one time.
*
*
* A stand-alone class, as different subsystems may have different limits,
* based on the resources their threads consume.
*
*
* Each subsystem that instantiates a thread limiter must supply an
* implementation of {@link ThreadPolicyProvider}, or use the standard
* {@link LifoThreadPolicyProvider}.
Expand Down Expand Up @@ -351,7 +351,7 @@ public static class ThreadLimiter<CANCELABLE extends Cancelable> {
/**
* Create a new thread limiter, with a
* {@link #DEFAULT_SLOW_THREAD_TIMEOUT default slow-thread timeout}.
*
*
* @param maxThreads
* Maximum number of threads that can run at once.
* @param policy
Expand All @@ -365,7 +365,7 @@ public ThreadLimiter(int maxThreads,

/**
* Create a new thread limiter.
*
*
* @param maxThreads
* Maximum number of threads that can run at once.
* @param policy
Expand Down Expand Up @@ -395,7 +395,7 @@ public ThreadLimiter(int maxThreads,

/**
* Run a thread proc, on a thread from the system thread pool.
*
*
* Thread will run immediately, if possible. If the maximum number of
* threads are already running, the thread proc will be added to a pool,
* and scheduling will be determined by the ThreadPolicyProvider.
Expand Down Expand Up @@ -580,7 +580,7 @@ private void setRunnable(Runnable runnable) {

/**
* Remove this thread from the timeout queue
*
*
* @return {@code true} if the thread was still in the queue;
* {@code false} if the thread has timed out
*/
Expand Down Expand Up @@ -623,7 +623,7 @@ public int compareTo(ThreadTimeouts other) {
* Provides LIFO thread policy management: Most recently added thread proc
* will run next. Appropriate for image galleries, say, where most recent
* request is most likely to be visible.
*
*
* Never cancels a request!
*/
public static class LifoThreadPolicyProvider implements
Expand Down Expand Up @@ -693,7 +693,7 @@ public static long threadId() {
* This is an assertion method that can be used by a thread to confirm that
* the thread isn't already holding lock for an object, before acquiring a
* lock
*
*
* @param object
* object to test for lock
* @param name
Expand Down

0 comments on commit fcf7f36

Please sign in to comment.