Skip to content

Commit

Permalink
Fix some test regressions and some code simplifications (#4239)
Browse files Browse the repository at this point in the history
* Fixed a few FT regressions. Enable configuration of Single source cancellation and moved initialization of executors to the AfterDeploymentValidation phase.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Simplified uses of AnnotationType/AnnotationMethod by simply passing AnnotationMethod.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>
  • Loading branch information
spericas authored May 19, 2022
1 parent ae7978e commit 09be6c3
Show file tree
Hide file tree
Showing 20 changed files with 161 additions and 87 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,6 +52,7 @@ class Builder implements io.helidon.common.Builder<Bulkhead> {
private int limit = DEFAULT_LIMIT;
private int queueLength = DEFAULT_QUEUE_LENGTH;
private String name = "Bulkhead-" + System.identityHashCode(this);
private boolean cancelSource = true;

private Builder() {
}
Expand Down Expand Up @@ -108,6 +109,18 @@ public Builder name(String name) {
return this;
}

/**
* Policy to cancel any source stage if the value return by {@link Bulkhead#invoke}
* is cancelled. Default is {@code true}; mostly used by FT MP to change default.
*
* @param cancelSource cancel source policy
* @return updated builder instance
*/
public Builder cancelSource(boolean cancelSource) {
this.cancelSource = cancelSource;
return this;
}

int limit() {
return limit;
}
Expand All @@ -123,6 +136,10 @@ LazyValue<? extends ExecutorService> executor() {
String name() {
return name;
}

boolean cancelSource() {
return cancelSource;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@ class BulkheadImpl implements Bulkhead {
private final Queue<DelayedTask<?>> queue;
private final Semaphore inProgress;
private final String name;
private final boolean cancelSource;

private final AtomicLong concurrentExecutions = new AtomicLong(0L);
private final AtomicLong callsAccepted = new AtomicLong(0L);
Expand All @@ -47,6 +48,7 @@ class BulkheadImpl implements Bulkhead {
this.executor = builder.executor();
this.inProgress = new Semaphore(builder.limit(), true);
this.name = builder.name();
this.cancelSource = builder.cancelSource();

if (builder.queueLength() == 0) {
queue = new NoQueue();
Expand All @@ -62,7 +64,7 @@ public String name() {

@Override
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
return invokeTask(DelayedTask.createSingle(supplier));
return invokeTask(DelayedTask.createSingle(supplier, cancelSource));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -102,6 +102,7 @@ class Builder implements io.helidon.common.Builder<CircuitBreaker> {
private int volume = 10;
private LazyValue<? extends ScheduledExecutorService> executor = FaultTolerance.scheduledExecutor();
private String name = "CircuitBreaker-" + System.identityHashCode(this);
private boolean cancelSource = true;

private Builder() {
}
Expand Down Expand Up @@ -242,6 +243,18 @@ public Builder name(String name) {
return this;
}

/**
* Policy to cancel any source stage if the value return by {@link CircuitBreaker#invoke}
* is cancelled. Default is {@code true}; mostly used by FT MP to change default.
*
* @param cancelSource cancel source policy
* @return updated builder instance
*/
public Builder cancelSource(boolean cancelSource) {
this.cancelSource = cancelSource;
return this;
}

LazyValue<? extends ScheduledExecutorService> executor() {
return executor;
}
Expand Down Expand Up @@ -273,5 +286,9 @@ int volume() {
String name() {
return name;
}

boolean cancelSource() {
return cancelSource;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,6 +55,7 @@ class CircuitBreakerImpl implements CircuitBreaker {
private final AtomicReference<ScheduledFuture<Boolean>> schedule = new AtomicReference<>();
private final ErrorChecker errorChecker;
private final String name;
private final boolean cancelSource;

CircuitBreakerImpl(CircuitBreaker.Builder builder) {
this.delayMillis = builder.delay().toMillis();
Expand All @@ -63,6 +64,7 @@ class CircuitBreakerImpl implements CircuitBreaker {
this.executor = builder.executor();
this.errorChecker = ErrorChecker.create(builder.skipOn(), builder.applyOn());
this.name = builder.name();
this.cancelSource = builder.cancelSource();
}

@Override
Expand All @@ -77,7 +79,7 @@ public <T> Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier)

@Override
public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
return invokeTask(DelayedTask.createSingle(supplier));
return invokeTask(DelayedTask.createSingle(supplier, cancelSource));
}

private <U> U invokeTask(DelayedTask<U> task) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -105,6 +105,11 @@ private void completeMarker() {
}

static <T> DelayedTask<Single<T>> createSingle(Supplier<? extends CompletionStage<T>> supplier) {
return createSingle(supplier, true);
}

static <T> DelayedTask<Single<T>> createSingle(Supplier<? extends CompletionStage<T>> supplier,
boolean cancelSource) {
return new DelayedTask<>() {
// future we returned as a result of invoke command
private final LazyValue<CompletableFuture<T>> resultFuture = LazyValue.create(CompletableFuture::new);
Expand All @@ -119,7 +124,7 @@ public CompletionStage<Void> execute() {
}

CompletableFuture<T> future = resultFuture.get();
createDependency(result, future);
createDependency(result, future, cancelSource);
return result.thenRun(() -> {});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,26 @@ static Throwable cause(Throwable throwable) {
*
* @param source the source stage
* @param dependent the dependent future
* @param cancelSource cancel source if dependent is cancelled
* @param <T> type of result
*/
static <T> CompletableFuture<T> createDependency(CompletionStage<T> source, CompletableFuture<T> dependent) {
static <T> CompletableFuture<T> createDependency(CompletionStage<T> source,
CompletableFuture<T> dependent,
boolean cancelSource) {
source.whenComplete((o, t) -> {
if (t != null) {
dependent.completeExceptionally(t);
} else {
dependent.complete(o);
}
});
dependent.whenComplete((o, t) -> {
if (dependent.isCancelled()) {
source.toCompletableFuture().cancel(true);
}
});
if (cancelSource) {
dependent.whenComplete((o, t) -> {
if (dependent.isCancelled()) {
source.toCompletableFuture().cancel(true);
}
});
}
return dependent;
}

Expand Down
17 changes: 17 additions & 0 deletions fault-tolerance/src/main/java/io/helidon/faulttolerance/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class Builder implements io.helidon.common.Builder<Retry> {
private Duration overallTimeout = Duration.ofSeconds(1);
private LazyValue<? extends ScheduledExecutorService> scheduledExecutor = FaultTolerance.scheduledExecutor();
private String name = "Retry-" + System.identityHashCode(this);
private boolean cancelSource = true;

private Builder() {
}
Expand Down Expand Up @@ -178,6 +179,18 @@ public Builder name(String name) {
return this;
}

/**
* Policy to cancel any source stage if the value return by {@link Retry#invoke}
* is cancelled. Default is {@code true}; mostly used by FT MP to change default.
*
* @param cancelSource cancel source policy
* @return updated builder instance
*/
public Builder cancelSource(boolean cancelSource) {
this.cancelSource = cancelSource;
return this;
}

Set<Class<? extends Throwable>> applyOn() {
return applyOn;
}
Expand All @@ -201,6 +214,10 @@ LazyValue<? extends ScheduledExecutorService> scheduledExecutor() {
String name() {
return name;
}

boolean cancelSource() {
return cancelSource;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,13 +39,15 @@ class RetryImpl implements Retry {
private final Retry.RetryPolicy retryPolicy;
private final AtomicLong retryCounter = new AtomicLong(0L);
private final String name;
private final boolean cancelSource;

RetryImpl(Retry.Builder builder) {
this.scheduledExecutor = builder.scheduledExecutor();
this.errorChecker = ErrorChecker.create(builder.skipOn(), builder.applyOn());
this.maxTimeNanos = builder.overallTimeout().toNanos();
this.retryPolicy = builder.retryPolicy();
this.name = builder.name();
this.cancelSource = builder.cancelSource();
}

@Override
Expand Down Expand Up @@ -86,7 +88,7 @@ private <T> Single<T> retrySingle(RetryContext<? extends CompletionStage<T>> con
retryCounter.getAndIncrement();
}

DelayedTask<Single<T>> task = DelayedTask.createSingle(context.supplier);
DelayedTask<Single<T>> task = DelayedTask.createSingle(context.supplier, cancelSource);
if (delay == 0) {
task.execute();
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,6 +54,7 @@ class Builder implements io.helidon.common.Builder<Timeout> {
private LazyValue<? extends ScheduledExecutorService> executor = FaultTolerance.scheduledExecutor();
private boolean currentThread = false;
private String name = "Timeout-" + System.identityHashCode(this);
private boolean cancelSource = true;

private Builder() {
}
Expand Down Expand Up @@ -108,6 +109,17 @@ public Builder name(String name) {
return this;
}

/**
* Cancel source if destination stage is cancelled.
*
* @param cancelSource setting for cancel source, defaults (@code true}
* @return updated builder instance
*/
public Builder cancelSource(boolean cancelSource) {
this.cancelSource = cancelSource;
return this;
}

Duration timeout() {
return timeout;
}
Expand All @@ -123,5 +135,9 @@ boolean currentThread() {
String name() {
return name;
}

boolean cancelSource() {
return cancelSource;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,12 +39,14 @@ class TimeoutImpl implements Timeout {
private final LazyValue<? extends ScheduledExecutorService> executor;
private final boolean currentThread;
private final String name;
private final boolean cancelSource;

TimeoutImpl(Timeout.Builder builder) {
this.timeoutMillis = builder.timeout().toMillis();
this.executor = builder.executor();
this.currentThread = builder.currentThread();
this.name = builder.name();
this.cancelSource = builder.cancelSource();
}

@Override
Expand Down Expand Up @@ -100,7 +102,7 @@ public <T> Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
// Run invocation in current thread
Single<T> single = Single.create(supplier.get(), true);
callReturned.set(true);
createDependency(single, future);
createDependency(single, future, cancelSource);

// Clear interrupted flag here -- required for uninterruptible busy loops
Thread.interrupted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.Future;

import javax.enterprise.inject.spi.AnnotatedMethod;
import javax.enterprise.inject.spi.AnnotatedType;

import org.eclipse.microprofile.faulttolerance.Asynchronous;
import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException;
Expand All @@ -30,11 +29,10 @@ class AsynchronousAntn extends MethodAntn implements Asynchronous {
/**
* Constructor.
*
* @param annotatedType The annotated type.
* @param annotatedMethod The annotated method.
*/
AsynchronousAntn(AnnotatedType<?> annotatedType, AnnotatedMethod<?> annotatedMethod) {
super(annotatedType, annotatedMethod);
AsynchronousAntn(AnnotatedMethod<?> annotatedMethod) {
super(annotatedMethod);
}

@Override
Expand Down
Loading

0 comments on commit 09be6c3

Please sign in to comment.