Skip to content

Commit

Permalink
add executor service param - minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Semernitskaya committed Sep 13, 2023
1 parent d20b264 commit 78dd557
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Predicate;

import static java.util.stream.Collectors.toList;
Expand All @@ -32,7 +31,7 @@ public final class FailsafePlugin implements Plugin {
private final ExecutorService executorService;

public FailsafePlugin() {
this(vec(), vec(), ForkJoinPool.commonPool());
this(vec(), vec(), null);
}

public FailsafePlugin withPolicy(final Policy<ClientHttpResponse> policy) {
Expand Down Expand Up @@ -64,11 +63,15 @@ public RequestExecution aroundAsync(final RequestExecution execution) {

if (policies.isEmpty()) {
return execution.execute(arguments);
} else if (executorService != null) {
return Failsafe.with(select(arguments))
// TODO: need threads count validation? see with method doc
.with(executorService) // according to https://failsafe.dev/async-execution/#custom-schedulers
.getStageAsync(decorate(execution, arguments));
} else {
return Failsafe.with(select(arguments))
.getStageAsync(decorate(execution, arguments));
}

return Failsafe.with(select(arguments))
.with(executorService) // according to https://jodah.net/failsafe/schedulers
.getStageAsync(decorate(execution, arguments));
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -44,11 +44,12 @@ final class FailsafePluginCustomExecutorTest {

public static class CountingExecutorService extends ThreadPoolExecutor {
AtomicInteger counter = new AtomicInteger();
public CountingExecutorService (int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {

public CountingExecutorService(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory());
}
Expand All @@ -59,8 +60,11 @@ void resetCounter() {

@Override
public void execute(@NotNull Runnable command) {
counter.incrementAndGet();
super.execute(command);
super.execute(() -> {
log.info("Failsafe executor runnable");
counter.incrementAndGet();
command.run();
});
}
}

Expand All @@ -69,12 +73,12 @@ public void execute(@NotNull Runnable command) {
private final CloseableHttpClient client = HttpClientBuilder.create().build();
private final ApacheClientHttpRequestFactory factory = new ApacheClientHttpRequestFactory(client);

private final CountingExecutorService countingExecutor = new CountingExecutorService(1, 1,
private final CountingExecutorService countingExecutor = new CountingExecutorService(3, 3,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());

private final Http unit = Http.builder()
.executor(newSingleThreadExecutor())
.executor(newFixedThreadPool(2))
.requestFactory(factory)
.baseUrl(getBaseUrl(server))
.converter(createJsonConverter())
Expand Down Expand Up @@ -103,17 +107,20 @@ void tearDown() throws IOException {
}

@Test
void shouldUseCustomExecutor() {
void shouldUseCustomExecutor() throws InterruptedException {
server.enqueue(emptyMockResponse());

int invocationCount = 5;
IntStream.range(0, invocationCount).forEach(i -> {
var futures = IntStream.range(0, invocationCount).mapToObj(i -> {
server.enqueue(emptyMockResponse());

unit.get("/foo")
.call(pass())
return unit.get("/foo")
.call(pass());
}).toList();

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.join();
});

verify(server, invocationCount, "/foo");
assertEquals(invocationCount, countingExecutor.counter.get());
}
Expand Down

0 comments on commit 78dd557

Please sign in to comment.