Skip to content
This repository has been archived by the owner on Jun 15, 2023. It is now read-only.

Commit

Permalink
Issue: #14: Fix some stacktraces in Dispatcher tests
Browse files Browse the repository at this point in the history
  • Loading branch information
magnet committed Oct 16, 2017
1 parent dc491fe commit 3c06918
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,27 @@ public final class DetachedDispatcherImpl implements DetachedDispatcher {
private final ExecutionContextManager executionContextManager;
private final Dispatcher dispatcher;

public DetachedDispatcherImpl(Dispatcher dispatcher, ExecutionContextManager executionContextManager, String name, int minThreads,
int maxThreads) {
public DetachedDispatcherImpl(Dispatcher dispatcher, ExecutionContextManager executionContextManager, String name,
int minThreads, int maxThreads) {
this.dispatcher = dispatcher;
this.executionContextManager = executionContextManager;
LinkedBlockingDeque<Runnable> queue = new LinkedBlockingDeque<>();

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(minThreads, maxThreads, 10, TimeUnit.SECONDS, queue,
new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d").setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d")
.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {

@Override
public void uncaughtException(Thread t, Throwable e) {
LOGGER.error("Uncaught error in thread {}", t, e);
}
}).build(), new RejectedExecutionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOGGER.error("Uncaught error in thread {}", t, e);
}
}).build(),
new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// this will block if the queue is full, which is unlikely since it is virtually unbounded...!
// this will block if the queue is full, which is
// unlikely since it is virtually
// unbounded...! (rethink this, unbounded queues &
// blocking are evil!)
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Expand All @@ -60,28 +65,38 @@ public void execute(Runnable task, boolean inheritExecutionContext) {

@Override
public <T> CancelablePromise<T> dispatch(Callable<T> task, boolean inheritExecutionContext) {
CancelablePromise<T> dispatchedPromise = Dispatchers.dispatch(executor, executionContextManager, task, inheritExecutionContext);
CancelablePromise<T> dispatchedPromise = Dispatchers.dispatch(executor, executionContextManager, task,
inheritExecutionContext);

// Bridge the promise resolution callback back to the non-blocking dispatcher!
// Bridge the promise resolution callback back to the non-blocking
// dispatcher!
Deferred<T> deferred = new Deferred<>();
dispatchedPromise.onResolve(() -> {
dispatcher.execute(() -> {
deferred.resolveWith(dispatchedPromise);
} , inheritExecutionContext);
});

return new DelegatingCancelablePromise<T>(deferred.getPromise()) {
DelegatingCancelablePromise<T> cancelablePromise = new DelegatingCancelablePromise<T>(deferred.getPromise()) {
@Override
public boolean cancel(String reason, boolean tryToInterrupt) {
return dispatchedPromise.cancel(reason, tryToInterrupt);
}
};

dispatchedPromise.onResolve(() -> {
try {
dispatcher.execute(() -> {
deferred.resolveWith(dispatchedPromise);
}, inheritExecutionContext);
} catch (RejectedExecutionException rej) {
LOGGER.warn(
"Cannot bridge promise: the main non-blocking dispatcher is rejecting tasks (shutdown?)");
}
});

return cancelablePromise;
}

@Override
public <T> List<CancelablePromise<T>> dispatchAll(List<? extends Callable<T>> tasks, boolean inheritExecutionContext) {
return Dispatchers.dispatchAll(task -> dispatch(task, inheritExecutionContext), tasks,
inheritExecutionContext);
public <T> List<CancelablePromise<T>> dispatchAll(List<? extends Callable<T>> tasks,
boolean inheritExecutionContext) {
return Dispatchers.dispatchAll(task -> dispatch(task, inheritExecutionContext), tasks, inheritExecutionContext);
}

@Override
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/io/primeval/codex/internal/DispatcherImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,9 @@ public void uncaughtException(Thread t, Throwable e) {
public void deactivate() {
if (executor != null) {
executor.shutdown();
executor = null;
}
if (scheduler != null) {
scheduler.shutdown();
scheduler = null;
}
}

Expand Down
57 changes: 31 additions & 26 deletions src/main/java/io/primeval/codex/internal/Dispatchers.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

public final class Dispatchers {



static <T> void execute(Executor executor, ExecutionContextManager executionContextManager, Runnable task,
boolean inheritExecutionContext) {

Expand All @@ -32,33 +30,40 @@ static <T> void execute(Executor executor, ExecutionContextManager executionCont

}

static <T> CancelablePromise<T> dispatch(Executor executor, ExecutionContextManager executionContextManager, Callable<T> task,
boolean inheritExecutionContext) {
CancelableDeferred<T> deferred = new CancelableDeferred<>();
CancelationSupport cancelationSupport = new CancelationSupport(deferred);
static <T> CancelablePromise<T> dispatch(Executor executor, ExecutionContextManager executionContextManager,
Callable<T> task, boolean inheritExecutionContext) {

ExecutionContextSwitch executionContextSwitch = inheritExecutionContext ? executionContextManager.onDispatch()
: ExecutionContextManagerImpl.NOOP_EXECUTION_CONTEXT_SWITCH;
try {
CancelableDeferred<T> deferred = new CancelableDeferred<>();
CancelationSupport cancelationSupport = new CancelationSupport(deferred);

executor.execute(() -> {
// If canceled before start, we never run.
if (deferred.canceled) {
return;
}
// Start to get the interruption thingy.
try {
cancelationSupport.start();
executionContextSwitch.apply();
T call = task.call();
deferred.resolve(call);
} catch (Exception e) {
deferred.fail(e);
} finally {
cancelationSupport.stop();
ExecutionContextSwitch executionContextSwitch = inheritExecutionContext
? executionContextManager.onDispatch()
: ExecutionContextManagerImpl.NOOP_EXECUTION_CONTEXT_SWITCH;

executor.execute(() -> {
// If canceled before start, we never run.
if (deferred.canceled) {
return;
}
// Start to get the interruption thingy.
try {
cancelationSupport.start();
executionContextSwitch.apply();
T call = task.call();
deferred.resolve(call);
} catch (Exception e) {
deferred.fail(e);
} finally {
cancelationSupport.stop();
executionContextSwitch.unapply();
}
});
return new DispatchedPromise<T>(deferred.getPromise(), cancelationSupport);
}
});
return new DispatchedPromise<T>(deferred.getPromise(), cancelationSupport);

} catch (Throwable e) {
return CancelablePromise.failed(e);
}
}

static <T> List<CancelablePromise<T>> dispatchAll(Function<Callable<T>, CancelablePromise<T>> dispatchingFunction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public static void classSetUp() throws Exception {
}

@AfterClass
public static void classTearDown() {
public static void classTearDown() throws InterruptedException {
Thread.sleep(1500L);
dispatcher.deactivate();
}

Expand Down
22 changes: 22 additions & 0 deletions src/test/java/io/primeval/codex/promise/PromiseHelperTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.primeval.codex.promise;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.Promises;

public class PromiseHelperTest {

@Test
public void mapFallible() throws InterruptedException {

Promise<Integer> promise = PromiseHelper.mapFallible(Promises.resolved("foo"), s -> {
if ("foo".equals(s)) {
throw new Exception();
}
return 32;
});
Assertions.assertThat(promise.getFailure().getClass()).isSameAs(Exception.class);
}

}

0 comments on commit 3c06918

Please sign in to comment.