diff --git a/GVRf/Framework/src/org/gearvrf/utility/GrowBeforeQueueThreadPoolExecutor.java b/GVRf/Framework/src/org/gearvrf/utility/GrowBeforeQueueThreadPoolExecutor.java new file mode 100755 index 000000000..b971752a4 --- /dev/null +++ b/GVRf/Framework/src/org/gearvrf/utility/GrowBeforeQueueThreadPoolExecutor.java @@ -0,0 +1,71 @@ +package org.gearvrf.utility; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor { + + private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors(); + + private int userSpecifiedCorePoolSize; + private int taskCount; + private Object syncObj = new Object(); + + public GrowBeforeQueueThreadPoolExecutor(final String prefix) { + this( + /* core size */Math.min(2, NUM_CPUS), + /* max size */Math.max(Math.min(2, NUM_CPUS), 2 * NUM_CPUS), + /* idle timeout */60, TimeUnit.SECONDS, + new LinkedBlockingQueue(), + new ThreadFactory() { + private final AtomicInteger threadNumber = new AtomicInteger(1); + + @Override + public final Thread newThread(final Runnable r) { + return new Thread(r, prefix + "-" + threadNumber.getAndIncrement()); + } + }); + } + + /*package*/ GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory) { + + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + userSpecifiedCorePoolSize = corePoolSize; + } + + @Override + public void execute(Runnable runnable) { + synchronized (syncObj) { + taskCount++; + setCorePoolSizeToTaskCountWithinBounds(); + } + super.execute(runnable); + } + + @Override + protected void afterExecute(Runnable runnable, Throwable throwable) { + super.afterExecute(runnable, throwable); + synchronized (syncObj) { + taskCount--; + setCorePoolSizeToTaskCountWithinBounds(); + } + } + + private void setCorePoolSizeToTaskCountWithinBounds() { + int threads = taskCount; + if (threads < userSpecifiedCorePoolSize) { + threads = userSpecifiedCorePoolSize; + } + if (threads > getMaximumPoolSize()) { + threads = getMaximumPoolSize(); + } + setCorePoolSize(threads); + } +} diff --git a/GVRf/Framework/src/org/gearvrf/utility/Threads.java b/GVRf/Framework/src/org/gearvrf/utility/Threads.java index 2bdcfcb9b..89861539f 100755 --- a/GVRf/Framework/src/org/gearvrf/utility/Threads.java +++ b/GVRf/Framework/src/org/gearvrf/utility/Threads.java @@ -70,7 +70,7 @@ public abstract class Threads { /** * Execute a Runnable on a thread pool thread - * + * * @param priority * The thread priority. Be careful with this!
* Note: Use Thread.MIN_PRIORITY..Thread.MAX_PRIORITY @@ -81,7 +81,7 @@ public abstract class Threads { * - by using spawn (and, hence the thread pool) there is at * least a chance that you will be reusing a thread, thus saving * teardown/startup costs. - * + * * @return A Future that lets you wait for thread completion, if * necessary */ @@ -127,13 +127,13 @@ public void run() { * Additionally, spawnHigh() - which should be used very sparingly - starts * a background task that runs at a higher priority than other background * tasks, though still lower than the GUI.) - * + * * @param threadProc * The code to run. It doesn't matter if this code never returns * - by using spawn (and, hence the thread pool) there is at * least a chance that you will be reusing a thread, thus saving * teardown/startup costs. - * + * * @return A Future that lets you wait for thread completion, if * necessary */ @@ -150,13 +150,13 @@ public static Future spawn(final Runnable threadProc) { * the lowest possible priority. Additionally, spawnHigh() - which should be * used very sparingly - starts a background task that runs at a higher * priority than other background tasks, though still lower than the GUI.) - * + * * @param threadProc * The code to run. It doesn't matter if this code never returns * - by using spawn (and, hence the thread pool) there is at * least a chance that you will be reusing a thread, thus saving * teardown/startup costs. - * + * * @return A Future that lets you wait for thread completion, if * necessary */ @@ -173,13 +173,13 @@ public static Future spawnLow(final Runnable threadProc) { * priority. Additionally, spawnHigh() - which should be used very sparingly * - starts a background task that runs at a higher priority than other * background tasks, though still lower than the GUI.) - * + * * @param threadProc * The code to run. It doesn't matter if this code never returns * - by using spawn (and, hence the thread pool) there is at * least a chance that you will be reusing a thread, thus saving * teardown/startup costs. - * + * * @return A Future that lets you wait for thread completion, if * necessary */ @@ -201,13 +201,13 @@ public static Future spawnIdle(final Runnable threadProc) { * to write the downsized Bitmap to cache. Meanwhile, the new save thread * has higher priority than other load threads, so the new Bitmap doesn't * stay in memory for a long time. - * + * * @param threadProc * The code to run. It doesn't matter if this code never returns * - by using spawn (and, hence the thread pool) there is at * least a chance that you will be reusing a thread, thus saving * teardown/startup costs. - * + * * @return A Future that lets you wait for thread completion, if * necessary */ @@ -220,7 +220,7 @@ public static Future spawnHigh(final Runnable threadProc) { * priority. This allows you to start a background thread, and later do a * join() (i.e., block until the thread is done) * to get the thread's output. - * + * * @param * The type-parameter of the Callable and the returned Future * @param callable @@ -232,14 +232,14 @@ public static Future spawn(Callable callable) { return threadPool.submit(callable); } - private static ExecutorService threadPool = Executors.newCachedThreadPool(); + private static ExecutorService threadPool = new GrowBeforeQueueThreadPoolExecutor("gvrf"); /** * By default, the spawn() methods use their own * {@link Executors#newCachedThreadPool()}. This method allows you to use a * different thread pool, if the cached thread pool provides the wrong * semantics or if you want to use a single thread pool for the whole app. - * + * *

* Note that calling this method will have no effect on any threads * started on the existing thread pool; they will run to completion in a @@ -260,7 +260,7 @@ public static void setThreadPool(ExecutorService newThreadPool) { * The spawn() methods use a {@link Executors#newCachedThreadPool()}; this * method provides access to that thread pool, so that other parts of the * app can use the pool without having to call one of the spwan() methods. - * + * *

* Note that any prior call to {@link #setThreadPool(ExecutorService)} will * affect the result of this method! @@ -284,7 +284,7 @@ public interface Cancelable extends Runnable { /** * Manages pool of pending threads, deciding which should run next. - * + * * Used in conjunction with a ThreadLimiter, which handles synchronization: * implementations generally don't need to pay attention to thread safety. */ @@ -294,7 +294,7 @@ public interface ThreadPolicyProvider { /** * Are there any thread procs in the pool? - * + * * May use {@link Cancelable#stillWanted()} to trim pool. * */ boolean isEmpty(); @@ -307,7 +307,7 @@ public interface ThreadPolicyProvider { /** * Make threadProc the next in the pool. - * + * *

    *
  • If threadProc is already in the pool, move it to the front, so * get() returns it next. @@ -320,10 +320,10 @@ public interface ThreadPolicyProvider { /** * Limits the number of (thread pool) threads running at any one time. - * + * * A stand-alone class, as different subsystems may have different limits, * based on the resources their threads consume. - * + * * Each subsystem that instantiates a thread limiter must supply an * implementation of {@link ThreadPolicyProvider}, or use the standard * {@link LifoThreadPolicyProvider}. @@ -351,7 +351,7 @@ public static class ThreadLimiter { /** * Create a new thread limiter, with a * {@link #DEFAULT_SLOW_THREAD_TIMEOUT default slow-thread timeout}. - * + * * @param maxThreads * Maximum number of threads that can run at once. * @param policy @@ -365,7 +365,7 @@ public ThreadLimiter(int maxThreads, /** * Create a new thread limiter. - * + * * @param maxThreads * Maximum number of threads that can run at once. * @param policy @@ -395,7 +395,7 @@ public ThreadLimiter(int maxThreads, /** * Run a thread proc, on a thread from the system thread pool. - * + * * Thread will run immediately, if possible. If the maximum number of * threads are already running, the thread proc will be added to a pool, * and scheduling will be determined by the ThreadPolicyProvider. @@ -580,7 +580,7 @@ private void setRunnable(Runnable runnable) { /** * Remove this thread from the timeout queue - * + * * @return {@code true} if the thread was still in the queue; * {@code false} if the thread has timed out */ @@ -623,7 +623,7 @@ public int compareTo(ThreadTimeouts other) { * Provides LIFO thread policy management: Most recently added thread proc * will run next. Appropriate for image galleries, say, where most recent * request is most likely to be visible. - * + * * Never cancels a request! */ public static class LifoThreadPolicyProvider implements @@ -693,7 +693,7 @@ public static long threadId() { * This is an assertion method that can be used by a thread to confirm that * the thread isn't already holding lock for an object, before acquiring a * lock - * + * * @param object * object to test for lock * @param name