diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java b/sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java index 0b1a85f02b..8cb4750aeb 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java @@ -16,7 +16,7 @@ import com.microsoft.durabletask.interruption.ContinueAsNewInterruption; import com.microsoft.durabletask.interruption.OrchestratorBlockedException; import io.dapr.workflows.saga.SagaCompensationException; -import io.dapr.workflows.saga.SagaOption; +import io.dapr.workflows.saga.SagaOptions; /** * Common interface for workflow implementations. @@ -74,7 +74,7 @@ default boolean isSagaEnabled() { * * @return saga configuration */ - default SagaOption getSagaOption() { + default SagaOptions getSagaOption() { // by default, saga is disabled return null; } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java index 114c8b58e7..3fe5d88a23 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java @@ -14,7 +14,9 @@ package io.dapr.workflows; public interface WorkflowActivityContext { + String getName(); T getInput(Class targetType); + } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java index bc5d53186b..4156c5a592 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java @@ -17,7 +17,6 @@ import com.microsoft.durabletask.Task; import com.microsoft.durabletask.TaskCanceledException; import com.microsoft.durabletask.TaskFailedException; -import com.microsoft.durabletask.TaskOptions; import io.dapr.workflows.saga.SagaContext; import org.slf4j.Logger; @@ -153,15 +152,15 @@ default Task waitForExternalEvent(String name, Class dataType) { * @param the expected type of the activity output * @return a new {@link Task} that completes when the activity completes or fails */ - Task callActivity(String name, Object input, TaskOptions options, Class returnType); + Task callActivity(String name, Object input, WorkflowTaskOptions options, Class returnType); /** * Asynchronously invokes an activity by name and returns a new {@link Task} that completes when the activity - * completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a complete description. + * completes. See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description. * * @param name the name of the activity to call * @return a new {@link Task} that completes when the activity completes or fails - * @see #callActivity(String, Object, TaskOptions, Class) + * @see #callActivity(String, Object, WorkflowTaskOptions, Class) */ default Task callActivity(String name) { return this.callActivity(name, null, null, Void.class); @@ -169,8 +168,8 @@ default Task callActivity(String name) { /** * Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task} - * that completes when the activity completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a - * complete description. + * that completes when the activity completes. + * See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description. * * @param name the name of the activity to call * @param input the serializable input to pass to the activity @@ -183,7 +182,7 @@ default Task callActivity(String name, Object input) { /** * Asynchronously invokes an activity by name and returns a new {@link Task} that completes when the activity * completes. If the activity completes successfully, the returned {@code Task}'s value will be the activity's - * output. See {@link #callActivity(String, Object, TaskOptions, Class)} for a complete description. + * output. See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description. * * @param name the name of the activity to call * @param returnType the expected class type of the activity output @@ -197,8 +196,8 @@ default Task callActivity(String name, Class returnType) { /** * Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task} * that completes when the activity completes.If the activity completes successfully, the returned {@code Task}'s - * value will be the activity's output. See {@link #callActivity(String, Object, TaskOptions, Class)} for a - * complete description. + * value will be the activity's output. + * See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description. * * @param name the name of the activity to call * @param input the serializable input to pass to the activity @@ -212,15 +211,15 @@ default Task callActivity(String name, Object input, Class returnType) /** * Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task} - * that completes when the activity completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a - * complete description. + * that completes when the activity completes. + * See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description. * * @param name the name of the activity to call * @param input the serializable input to pass to the activity * @param options additional options that control the execution and processing of the activity * @return a new {@link Task} that completes when the activity completes or fails */ - default Task callActivity(String name, Object input, TaskOptions options) { + default Task callActivity(String name, Object input, WorkflowTaskOptions options) { return this.callActivity(name, input, options, Void.class); } @@ -367,11 +366,11 @@ default Task createTimer(ZonedDateTime zonedDateTime) { * Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes * when the child-workflow completes. * - *

See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description. + *

See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description. * * @param name the name of the workflow to invoke * @return a new {@link Task} that completes when the child-workflow completes or fails - * @see #callChildWorkflow(String, Object, String, TaskOptions, Class) + * @see #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class) */ default Task callChildWorkflow(String name) { return this.callChildWorkflow(name, null); @@ -381,7 +380,7 @@ default Task callChildWorkflow(String name) { * Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes * when the child-workflow completes. * - *

See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description. + *

See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description. * * @param name the name of the workflow to invoke * @param input the serializable input to send to the child-workflow @@ -395,7 +394,7 @@ default Task callChildWorkflow(String name, Object input) { * Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes * when the child-workflow completes. * - *

See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description. + *

See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description. * * @param name the name of the workflow to invoke * @param input the serializable input to send to the child-workflow @@ -411,7 +410,7 @@ default Task callChildWorkflow(String name, Object input, Class return * Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes * when the child-workflow completes. * - *

See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description. + *

See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description. * * @param name the name of the workflow to invoke * @param input the serializable input to send to the child-workflow @@ -428,7 +427,7 @@ default Task callChildWorkflow(String name, Object input, String instance * Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes * when the child-workflow completes. * - *

See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description. + *

See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description. * * @param name the name of the workflow to invoke * @param input the serializable input to send to the child-workflow @@ -436,7 +435,7 @@ default Task callChildWorkflow(String name, Object input, String instance * @param options additional options that control the execution and processing of the activity * @return a new {@link Task} that completes when the child-workflow completes or fails */ - default Task callChildWorkflow(String name, Object input, String instanceID, TaskOptions options) { + default Task callChildWorkflow(String name, Object input, String instanceID, WorkflowTaskOptions options) { return this.callChildWorkflow(name, input, instanceID, options, Void.class); } @@ -478,7 +477,7 @@ default Task callChildWorkflow(String name, Object input, String instanceI Task callChildWorkflow(String name, @Nullable Object input, @Nullable String instanceID, - @Nullable TaskOptions options, + @Nullable WorkflowTaskOptions options, Class returnType); /** diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowStub.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowStub.java index 6a109c626d..ed8963e2d2 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowStub.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowStub.java @@ -15,5 +15,7 @@ @FunctionalInterface public interface WorkflowStub { + void run(WorkflowContext ctx); + } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskOptions.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskOptions.java new file mode 100644 index 0000000000..4f3be9d7f9 --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskOptions.java @@ -0,0 +1,28 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows; + +public class WorkflowTaskOptions { + + private final WorkflowTaskRetryPolicy retryPolicy; + + public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + } + + public WorkflowTaskRetryPolicy getRetryPolicy() { + return retryPolicy; + } + +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryPolicy.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryPolicy.java new file mode 100644 index 0000000000..cc63274ee5 --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskRetryPolicy.java @@ -0,0 +1,180 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows; + +import javax.annotation.Nullable; + +import java.time.Duration; + +public final class WorkflowTaskRetryPolicy { + + private final Integer maxNumberOfAttempts; + private final Duration firstRetryInterval; + private final Double backoffCoefficient; + private final Duration maxRetryInterval; + private final Duration retryTimeout; + + /** + * Constructor for WorkflowTaskRetryPolicy. + * @param maxNumberOfAttempts Maximum number of attempts to retry the workflow. + * @param firstRetryInterval Interval to wait before the first retry. + * @param backoffCoefficient Coefficient to increase the retry interval. + * @param maxRetryInterval Maximum interval to wait between retries. + * @param retryTimeout Timeout for the whole retry process. + */ + public WorkflowTaskRetryPolicy( + Integer maxNumberOfAttempts, + Duration firstRetryInterval, + Double backoffCoefficient, + Duration maxRetryInterval, + Duration retryTimeout + ) { + this.maxNumberOfAttempts = maxNumberOfAttempts; + this.firstRetryInterval = firstRetryInterval; + this.backoffCoefficient = backoffCoefficient; + this.maxRetryInterval = maxRetryInterval; + this.retryTimeout = retryTimeout; + } + + public int getMaxNumberOfAttempts() { + return maxNumberOfAttempts; + } + + public Duration getFirstRetryInterval() { + return firstRetryInterval; + } + + public double getBackoffCoefficient() { + return backoffCoefficient; + } + + public Duration getMaxRetryInterval() { + return maxRetryInterval; + } + + public Duration getRetryTimeout() { + return retryTimeout; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + + private Integer maxNumberOfAttempts; + private Duration firstRetryInterval; + private Double backoffCoefficient = 1.0; + private Duration maxRetryInterval; + private Duration retryTimeout; + + private Builder() { + } + + /** + * Build the WorkflowTaskRetryPolicy. + * @return WorkflowTaskRetryPolicy + */ + public WorkflowTaskRetryPolicy build() { + return new WorkflowTaskRetryPolicy( + this.maxNumberOfAttempts, + this.firstRetryInterval, + this.backoffCoefficient, + this.maxRetryInterval, + this.retryTimeout + ); + } + + /** + * Set the maximum number of attempts to retry the workflow. + * @param maxNumberOfAttempts Maximum number + * @return This builder + */ + public Builder setMaxNumberOfAttempts(int maxNumberOfAttempts) { + if (maxNumberOfAttempts <= 0) { + throw new IllegalArgumentException("The value for maxNumberOfAttempts must be greater than zero."); + } + + this.maxNumberOfAttempts = maxNumberOfAttempts; + + return this; + } + + /** + * Set the interval to wait before the first retry. + * @param firstRetryInterval Interval + * @return This builder + */ + public Builder setFirstRetryInterval(Duration firstRetryInterval) { + if (firstRetryInterval == null) { + throw new IllegalArgumentException("firstRetryInterval cannot be null."); + } + if (firstRetryInterval.isZero() || firstRetryInterval.isNegative()) { + throw new IllegalArgumentException("The value for firstRetryInterval must be greater than zero."); + } + + this.firstRetryInterval = firstRetryInterval; + + return this; + } + + /** + * Set the backoff coefficient. + * @param backoffCoefficient Double value + * @return This builder + */ + public Builder setBackoffCoefficient(double backoffCoefficient) { + if (backoffCoefficient < 1.0) { + throw new IllegalArgumentException("The value for backoffCoefficient must be greater or equal to 1.0."); + } + + this.backoffCoefficient = backoffCoefficient; + + return this; + } + + /** + * Set the maximum interval to wait between retries. + * @param maxRetryInterval Maximum interval + * @return This builder + */ + public Builder setMaxRetryInterval(@Nullable Duration maxRetryInterval) { + if (maxRetryInterval != null && maxRetryInterval.compareTo(this.firstRetryInterval) < 0) { + throw new IllegalArgumentException( + "The value for maxRetryInterval must be greater than or equal to the value for firstRetryInterval."); + } + + this.maxRetryInterval = maxRetryInterval; + + return this; + } + + /** + * Set the maximum retry timeout. + * @param retryTimeout Maximum retry timeout + * @return This builder + */ + public Builder setRetryTimeout(Duration retryTimeout) { + if (retryTimeout != null && retryTimeout.compareTo(this.firstRetryInterval) < 0) { + throw new IllegalArgumentException( + "The value for retryTimeout must be greater than or equal to the value for firstRetryInterval."); + } + + this.retryTimeout = retryTimeout; + + return this; + } + } + +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java index 0b4c387aa3..cc1c0f9b1a 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java @@ -17,19 +17,12 @@ import com.microsoft.durabletask.DurableTaskGrpcClientBuilder; import com.microsoft.durabletask.OrchestrationMetadata; import com.microsoft.durabletask.PurgeResult; -import io.dapr.client.Headers; import io.dapr.config.Properties; import io.dapr.utils.NetworkUtils; import io.dapr.workflows.Workflow; import io.dapr.workflows.internal.ApiTokenClientInterceptor; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; import io.grpc.ClientInterceptor; -import io.grpc.ForwardingClientCall; import io.grpc.ManagedChannel; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; import javax.annotation.Nullable; @@ -42,6 +35,8 @@ */ public class DaprWorkflowClient implements AutoCloseable { + private static final ClientInterceptor WORKFLOW_INTERCEPTOR = new ApiTokenClientInterceptor(); + private DurableTaskClient innerClient; private ManagedChannel grpcChannel; @@ -137,7 +132,7 @@ public String scheduleNewWorkflow(Class clazz, Object in * @param options the options for the new workflow, including input, instance ID, etc. * @return the instanceId parameter value. */ - public String scheduleNewWorkflow(Class clazz, NewWorkflowOption options) { + public String scheduleNewWorkflow(Class clazz, NewWorkflowOptions options) { return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), options.getNewOrchestrationInstanceOptions()); } @@ -272,6 +267,6 @@ public void close() throws InterruptedException { } } - private static ClientInterceptor WORKFLOW_INTERCEPTOR = new ApiTokenClientInterceptor(); + } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/NewWorkflowOption.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/NewWorkflowOptions.java similarity index 86% rename from sdk-workflows/src/main/java/io/dapr/workflows/client/NewWorkflowOption.java rename to sdk-workflows/src/main/java/io/dapr/workflows/client/NewWorkflowOptions.java index d802c8f2c3..b83d4945a5 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/client/NewWorkflowOption.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/NewWorkflowOptions.java @@ -20,16 +20,16 @@ /** * Options for starting a new instance of a workflow. */ -public class NewWorkflowOption { +public class NewWorkflowOptions { private final NewOrchestrationInstanceOptions newOrchestrationInstanceOptions = new NewOrchestrationInstanceOptions(); /** * Sets the version of the workflow to start. * * @param version the user-defined version of workflow - * @return this {@link NewWorkflowOption} object + * @return this {@link NewWorkflowOptions} object */ - public NewWorkflowOption setVersion(String version) { + public NewWorkflowOptions setVersion(String version) { this.newOrchestrationInstanceOptions.setVersion(version); return this; } @@ -40,9 +40,9 @@ public NewWorkflowOption setVersion(String version) { *

If no instance ID is configured, the workflow will be created with a randomly generated instance ID. * * @param instanceId the ID of the new workflow - * @return this {@link NewWorkflowOption} object + * @return this {@link NewWorkflowOptions} object */ - public NewWorkflowOption setInstanceId(String instanceId) { + public NewWorkflowOptions setInstanceId(String instanceId) { this.newOrchestrationInstanceOptions.setInstanceId(instanceId); return this; } @@ -51,9 +51,9 @@ public NewWorkflowOption setInstanceId(String instanceId) { * Sets the input of the workflow to start. * * @param input the input of the new workflow - * @return this {@link NewWorkflowOption} object + * @return this {@link NewWorkflowOptions} object */ - public NewWorkflowOption setInput(Object input) { + public NewWorkflowOptions setInput(Object input) { this.newOrchestrationInstanceOptions.setInput(input); return this; } @@ -65,9 +65,9 @@ public NewWorkflowOption setInput(Object input) { * to start them at a specific time in the future. * * @param startTime the start time of the new workflow - * @return this {@link NewWorkflowOption} object + * @return this {@link NewWorkflowOptions} object */ - public NewWorkflowOption setStartTime(Instant startTime) { + public NewWorkflowOptions setStartTime(Instant startTime) { this.newOrchestrationInstanceOptions.setStartTime(startTime); return this; } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java index fea7ea3968..b843819ad6 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java @@ -14,11 +14,14 @@ package io.dapr.workflows.runtime; import com.microsoft.durabletask.CompositeTaskFailedException; +import com.microsoft.durabletask.RetryPolicy; import com.microsoft.durabletask.Task; import com.microsoft.durabletask.TaskCanceledException; import com.microsoft.durabletask.TaskOptions; import com.microsoft.durabletask.TaskOrchestrationContext; import io.dapr.workflows.WorkflowContext; +import io.dapr.workflows.WorkflowTaskOptions; +import io.dapr.workflows.WorkflowTaskRetryPolicy; import io.dapr.workflows.runtime.saga.DefaultSagaContext; import io.dapr.workflows.saga.Saga; import io.dapr.workflows.saga.SagaContext; @@ -149,7 +152,7 @@ public Task waitForExternalEvent(String name, Duration timeout, Class * before the event is received */ @Override - public Task waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException { + public Task waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException { return this.innerContext.waitForExternalEvent(name, timeout, Void.class); } @@ -165,7 +168,7 @@ public Task waitForExternalEvent(String name, Duration timeout) throws * @return a new {@link Task} that completes when the external event is received */ @Override - public Task waitForExternalEvent(String name) throws TaskCanceledException { + public Task waitForExternalEvent(String name) throws TaskCanceledException { return this.innerContext.waitForExternalEvent(name, null, Void.class); } @@ -177,8 +180,10 @@ public boolean isReplaying() { /** * {@inheritDoc} */ - public Task callActivity(String name, Object input, TaskOptions options, Class returnType) { - return this.innerContext.callActivity(name, input, options, returnType); + public Task callActivity(String name, Object input, WorkflowTaskOptions options, Class returnType) { + TaskOptions taskOptions = toTaskOptions(options); + + return this.innerContext.callActivity(name, input, taskOptions, returnType); } /** @@ -214,9 +219,10 @@ public T getInput(Class targetType) { */ @Override public Task callChildWorkflow(String name, @Nullable Object input, @Nullable String instanceID, - @Nullable TaskOptions options, Class returnType) { + @Nullable WorkflowTaskOptions options, Class returnType) { + TaskOptions taskOptions = toTaskOptions(options); - return this.innerContext.callSubOrchestrator(name, input, instanceID, options, returnType); + return this.innerContext.callSubOrchestrator(name, input, instanceID, taskOptions, returnType); } /** @@ -251,4 +257,21 @@ public SagaContext getSagaContext() { return new DefaultSagaContext(this.saga, this); } + + private static TaskOptions toTaskOptions(WorkflowTaskOptions options) { + if (options == null) { + return null; + } + + WorkflowTaskRetryPolicy workflowTaskRetryPolicy = options.getRetryPolicy(); + RetryPolicy retryPolicy = new RetryPolicy( + workflowTaskRetryPolicy.getMaxNumberOfAttempts(), + workflowTaskRetryPolicy.getFirstRetryInterval() + ); + + retryPolicy.setBackoffCoefficient(workflowTaskRetryPolicy.getBackoffCoefficient()); + retryPolicy.setRetryTimeout(workflowTaskRetryPolicy.getRetryTimeout()); + + return new TaskOptions(retryPolicy); + } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/saga/CompensationInformation.java b/sdk-workflows/src/main/java/io/dapr/workflows/saga/CompensationInformation.java index 3e0cb7d413..33a2f741d1 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/saga/CompensationInformation.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/saga/CompensationInformation.java @@ -13,7 +13,7 @@ package io.dapr.workflows.saga; -import com.microsoft.durabletask.TaskOptions; +import io.dapr.workflows.WorkflowTaskOptions; /** * Information for a compensation activity. @@ -21,7 +21,7 @@ class CompensationInformation { private final String compensationActivityClassName; private final Object compensationActivityInput; - private final TaskOptions taskOptions; + private final WorkflowTaskOptions options; /** * Constructor for a compensation information. @@ -30,13 +30,13 @@ class CompensationInformation { * compensation. * @param compensationActivityInput Input of the activity to do * compensation. - * @param taskOptions task options to set retry strategy + * @param options Task options to set retry strategy */ public CompensationInformation(String compensationActivityClassName, - Object compensationActivityInput, TaskOptions taskOptions) { + Object compensationActivityInput, WorkflowTaskOptions options) { this.compensationActivityClassName = compensationActivityClassName; this.compensationActivityInput = compensationActivityInput; - this.taskOptions = taskOptions; + this.options = options; } /** @@ -62,7 +62,7 @@ public Object getCompensationActivityInput() { * * @return task options, null if not set */ - public TaskOptions getTaskOptions() { - return taskOptions; + public WorkflowTaskOptions getExecutionOptions() { + return options; } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/saga/Saga.java b/sdk-workflows/src/main/java/io/dapr/workflows/saga/Saga.java index 56fc08f2d0..f02da10b47 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/saga/Saga.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/saga/Saga.java @@ -14,28 +14,28 @@ package io.dapr.workflows.saga; import com.microsoft.durabletask.Task; -import com.microsoft.durabletask.TaskOptions; import com.microsoft.durabletask.interruption.ContinueAsNewInterruption; import com.microsoft.durabletask.interruption.OrchestratorBlockedException; import io.dapr.workflows.WorkflowContext; +import io.dapr.workflows.WorkflowTaskOptions; import java.util.ArrayList; import java.util.List; public final class Saga { - private final SagaOption option; + private final SagaOptions options; private final List compensationActivities = new ArrayList<>(); /** * Build up a Saga with its options. * - * @param option Saga option. + * @param options Saga option. */ - public Saga(SagaOption option) { - if (option == null) { + public Saga(SagaOptions options) { + if (options == null) { throw new IllegalArgumentException("option is required and should not be null."); } - this.option = option; + this.options = options; } /** @@ -50,16 +50,16 @@ public void registerCompensation(String activityClassName, Object activityInput) /** * Register a compensation activity. - * + * * @param activityClassName name of the activity class * @param activityInput input of the activity to be compensated - * @param taskOptions task options to set retry strategy + * @param options task options to set retry strategy */ - public void registerCompensation(String activityClassName, Object activityInput, TaskOptions taskOptions) { + public void registerCompensation(String activityClassName, Object activityInput, WorkflowTaskOptions options) { if (activityClassName == null || activityClassName.isEmpty()) { throw new IllegalArgumentException("activityClassName is required and should not be null or empty."); } - this.compensationActivities.add(new CompensationInformation(activityClassName, activityInput, taskOptions)); + this.compensationActivities.add(new CompensationInformation(activityClassName, activityInput, options)); } /** @@ -72,7 +72,7 @@ public void compensate(WorkflowContext ctx) { // Special case: when parallel compensation is enabled and there is only one // compensation, we still // compensate sequentially. - if (option.isParallelCompensation() && compensationActivities.size() > 1) { + if (options.isParallelCompensation() && compensationActivities.size() > 1) { compensateInParallel(ctx); } else { compensateSequentially(ctx); @@ -109,7 +109,7 @@ private void compensateSequentially(WorkflowContext ctx) { sagaException.addSuppressed(e); } - if (!option.isContinueWithError()) { + if (!options.isContinueWithError()) { throw sagaException; } } @@ -124,6 +124,6 @@ private Task executeCompensateActivity(WorkflowContext ctx, CompensationIn throws SagaCompensationException { String activityClassName = info.getCompensationActivityClassName(); return ctx.callActivity(activityClassName, info.getCompensationActivityInput(), - info.getTaskOptions()); + info.getExecutionOptions()); } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaOption.java b/sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaOptions.java similarity index 91% rename from sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaOption.java rename to sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaOptions.java index f3c082f58c..8a7184b6df 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaOption.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaOptions.java @@ -16,12 +16,12 @@ /** * Saga option. */ -public final class SagaOption { +public final class SagaOptions { private final boolean parallelCompensation; private final int maxParallelThread; private final boolean continueWithError; - private SagaOption(boolean parallelCompensation, int maxParallelThread, boolean continueWithError) { + private SagaOptions(boolean parallelCompensation, int maxParallelThread, boolean continueWithError) { this.parallelCompensation = parallelCompensation; this.maxParallelThread = maxParallelThread; this.continueWithError = continueWithError; @@ -95,8 +95,8 @@ public Builder setContinueWithError(boolean continueWithError) { * Build Saga option. * @return Saga option */ - public SagaOption build() { - return new SagaOption(this.parallelCompensation, this.maxParallelThread, this.continueWithError); + public SagaOptions build() { + return new SagaOptions(this.parallelCompensation, this.maxParallelThread, this.continueWithError); } } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java index abbbae491e..32af9fc6f2 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java @@ -14,7 +14,6 @@ package io.dapr.workflows; import com.microsoft.durabletask.CompositeTaskFailedException; -import com.microsoft.durabletask.RetryPolicy; import com.microsoft.durabletask.Task; import com.microsoft.durabletask.TaskCanceledException; import com.microsoft.durabletask.TaskOptions; @@ -27,6 +26,7 @@ import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.slf4j.Logger; import java.time.Duration; @@ -35,9 +35,10 @@ import java.util.Arrays; import java.util.List; -import static org.junit.Assert.assertNotNull; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -86,17 +87,17 @@ public Task waitForExternalEvent(String name, Duration timeout, Class } @Override - public Task waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException { + public Task waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException { return null; } @Override - public Task waitForExternalEvent(String name) throws TaskCanceledException { + public Task waitForExternalEvent(String name) throws TaskCanceledException { return null; } @Override - public Task callActivity(String name, Object input, TaskOptions options, Class returnType) { + public Task callActivity(String name, Object input, WorkflowTaskOptions options, Class returnType) { return null; } @@ -127,7 +128,7 @@ public V getInput(Class targetType) { @Override public Task callChildWorkflow(String name, @Nullable Object input, @Nullable String instanceID, - @Nullable TaskOptions options, Class returnType) { + @Nullable WorkflowTaskOptions options, Class returnType) { return null; } @@ -190,15 +191,12 @@ public void callActivityTest() { @Test public void DaprWorkflowContextWithEmptyInnerContext() { - assertThrows(IllegalArgumentException.class, () -> { - context = new DefaultWorkflowContext(mockInnerContext, (Logger)null); - }); } + assertThrows(IllegalArgumentException.class, () -> + context = new DefaultWorkflowContext(mockInnerContext, (Logger)null)); } @Test public void DaprWorkflowContextWithEmptyLogger() { - assertThrows(IllegalArgumentException.class, () -> { - context = new DefaultWorkflowContext(null, (Logger)null); - }); + assertThrows(IllegalArgumentException.class, () -> context = new DefaultWorkflowContext(null, (Logger)null)); } @Test @@ -291,11 +289,28 @@ public void callChildWorkflowWithOptions() { String expectedName = "TestActivity"; String expectedInput = "TestInput"; String expectedInstanceId = "TestInstanceId"; - TaskOptions expectedOptions = new TaskOptions(new RetryPolicy(1, Duration.ofSeconds(10))); - - context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, expectedOptions, String.class); - verify(mockInnerContext, times(1)).callSubOrchestrator(expectedName, expectedInput, expectedInstanceId, - expectedOptions, String.class); + WorkflowTaskRetryPolicy retryPolicy = WorkflowTaskRetryPolicy.newBuilder() + .setMaxNumberOfAttempts(1) + .setFirstRetryInterval(Duration.ofSeconds(10)) + .build(); + WorkflowTaskOptions executionOptions = new WorkflowTaskOptions(retryPolicy); + ArgumentCaptor captor = ArgumentCaptor.forClass(TaskOptions.class); + + context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, executionOptions, String.class); + + verify(mockInnerContext, times(1)) + .callSubOrchestrator( + eq(expectedName), + eq(expectedInput), + eq(expectedInstanceId), + captor.capture(), + eq(String.class) + ); + + TaskOptions taskOptions = captor.getValue(); + + assertEquals(retryPolicy.getMaxNumberOfAttempts(), taskOptions.getRetryPolicy().getMaxNumberOfAttempts()); + assertEquals(retryPolicy.getFirstRetryInterval(), taskOptions.getRetryPolicy().getFirstRetryInterval()); } @Test @@ -326,14 +341,12 @@ public void getSagaContextTest_sagaEnabled() { WorkflowContext context = new DefaultWorkflowContext(mockInnerContext, saga); SagaContext sagaContext = context.getSagaContext(); - assertNotNull("SagaContext should not be null", sagaContext); + assertNotNull(sagaContext, "SagaContext should not be null"); } @Test public void getSagaContextTest_sagaDisabled() { WorkflowContext context = new DefaultWorkflowContext(mockInnerContext); - assertThrows(UnsupportedOperationException.class, () -> { - context.getSagaContext(); - }); + assertThrows(UnsupportedOperationException.class, context::getSagaContext); } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTest.java index 439827d3c1..f319709eca 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTest.java @@ -22,7 +22,7 @@ import io.dapr.workflows.saga.SagaCompensationException; import io.dapr.workflows.saga.SagaContext; -import io.dapr.workflows.saga.SagaOption; +import io.dapr.workflows.saga.SagaOptions; public class WorkflowTest { @@ -188,8 +188,8 @@ public WorkflowStub create() { } @Override - public SagaOption getSagaOption() { - return SagaOption.newBuilder() + public SagaOptions getSagaOption() { + return SagaOptions.newBuilder() .setParallelCompensation(false) .build(); } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java index cf366e6b8d..fdb0534181 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java @@ -111,13 +111,13 @@ public void scheduleNewWorkflowWithArgsNameInputInstance() { public void scheduleNewWorkflowWithNewWorkflowOption() { String expectedName = TestWorkflow.class.getCanonicalName(); Object expectedInput = new Object(); - NewWorkflowOption newWorkflowOption = new NewWorkflowOption(); - newWorkflowOption.setInput(expectedInput).setStartTime(Instant.now()); + NewWorkflowOptions newWorkflowOptions = new NewWorkflowOptions(); + newWorkflowOptions.setInput(expectedInput).setStartTime(Instant.now()); - client.scheduleNewWorkflow(TestWorkflow.class, newWorkflowOption); + client.scheduleNewWorkflow(TestWorkflow.class, newWorkflowOptions); verify(mockInnerClient, times(1)) - .scheduleNewOrchestrationInstance(expectedName, newWorkflowOption.getNewOrchestrationInstanceOptions()); + .scheduleNewOrchestrationInstance(expectedName, newWorkflowOptions.getNewOrchestrationInstanceOptions()); } @Test diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/client/NewWorkflowOptionTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/client/NewWorkflowOptionsTest.java similarity index 87% rename from sdk-workflows/src/test/java/io/dapr/workflows/client/NewWorkflowOptionTest.java rename to sdk-workflows/src/test/java/io/dapr/workflows/client/NewWorkflowOptionsTest.java index 78feb84f26..e1d10c68ce 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/client/NewWorkflowOptionTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/client/NewWorkflowOptionsTest.java @@ -5,11 +5,11 @@ import java.time.Instant; -public class NewWorkflowOptionTest { +public class NewWorkflowOptionsTest { @Test void testNewWorkflowOption() { - NewWorkflowOption workflowOption = new NewWorkflowOption(); + NewWorkflowOptions workflowOption = new NewWorkflowOptions(); String version = "v1"; String instanceId = "123"; Object input = new Object(); diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaIntegrationTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaIntegrationTest.java index 99342bbde1..0838aa1a3d 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaIntegrationTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaIntegrationTest.java @@ -51,7 +51,7 @@ public void testSaga_compensateInParallel() { } private boolean doExecuteWorkflowWithSaga(boolean parallelCompensation) { - SagaOption config = SagaOption.newBuilder() + SagaOptions config = SagaOptions.newBuilder() .setParallelCompensation(parallelCompensation) .setContinueWithError(true).build(); Saga saga = new Saga(config); diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaOptionTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaOptionsTest.java similarity index 78% rename from sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaOptionTest.java rename to sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaOptionsTest.java index 996f199dce..76c5388138 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaOptionTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaOptionsTest.java @@ -5,15 +5,15 @@ import org.junit.Test; -public class SagaOptionTest { +public class SagaOptionsTest { @Test public void testBuild() { - SagaOption.Builder builder = SagaOption.newBuilder(); + SagaOptions.Builder builder = SagaOptions.newBuilder(); builder.setParallelCompensation(true); builder.setMaxParallelThread(32); builder.setContinueWithError(false); - SagaOption option = builder.build(); + SagaOptions option = builder.build(); assertEquals(true, option.isParallelCompensation()); assertEquals(32, option.getMaxParallelThread()); @@ -22,8 +22,8 @@ public void testBuild() { @Test public void testBuild_default() { - SagaOption.Builder builder = SagaOption.newBuilder(); - SagaOption option = builder.build(); + SagaOptions.Builder builder = SagaOptions.newBuilder(); + SagaOptions option = builder.build(); assertEquals(false, option.isParallelCompensation()); assertEquals(16, option.getMaxParallelThread()); @@ -32,7 +32,7 @@ public void testBuild_default() { @Test public void testsetMaxParallelThread() { - SagaOption.Builder builder = SagaOption.newBuilder(); + SagaOptions.Builder builder = SagaOptions.newBuilder(); assertThrows(IllegalArgumentException.class, () -> { builder.setMaxParallelThread(0); diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaTest.java index 72df912025..8afa2eb10f 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaTest.java @@ -33,13 +33,13 @@ import java.util.concurrent.TimeUnit; import io.dapr.workflows.WorkflowActivityContext; +import io.dapr.workflows.WorkflowTaskOptions; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import com.microsoft.durabletask.Task; -import com.microsoft.durabletask.TaskOptions; import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowActivity; @@ -48,7 +48,7 @@ public class SagaTest { public static WorkflowContext createMockContext() { WorkflowContext workflowContext = mock(WorkflowContext.class); - when(workflowContext.callActivity(anyString(), any(), eq((TaskOptions) null))).thenAnswer(new ActivityAnswer()); + when(workflowContext.callActivity(anyString(), any(), eq((WorkflowTaskOptions) null))).thenAnswer(new ActivityAnswer()); when(workflowContext.allOf(anyList())).thenAnswer(new AllActivityAnswer()); return workflowContext; @@ -63,7 +63,7 @@ public void testSaga_IllegalArgument() { @Test public void testregisterCompensation() { - SagaOption config = SagaOption.newBuilder() + SagaOptions config = SagaOptions.newBuilder() .setParallelCompensation(false) .setContinueWithError(true).build(); Saga saga = new Saga(config); @@ -73,7 +73,7 @@ public void testregisterCompensation() { @Test public void testregisterCompensation_IllegalArgument() { - SagaOption config = SagaOption.newBuilder() + SagaOptions config = SagaOptions.newBuilder() .setParallelCompensation(false) .setContinueWithError(true).build(); Saga saga = new Saga(config); @@ -88,43 +88,43 @@ public void testregisterCompensation_IllegalArgument() { @Test public void testCompensateInParallel() { - MockCompentationActivity.compensateOrder.clear(); + MockCompensationActivity.compensateOrder.clear(); - SagaOption config = SagaOption.newBuilder() + SagaOptions config = SagaOptions.newBuilder() .setParallelCompensation(true).build(); Saga saga = new Saga(config); MockActivityInput input1 = new MockActivityInput(); input1.setOrder(1); - saga.registerCompensation(MockCompentationActivity.class.getName(), input1); + saga.registerCompensation(MockCompensationActivity.class.getName(), input1); MockActivityInput input2 = new MockActivityInput(); input2.setOrder(2); - saga.registerCompensation(MockCompentationActivity.class.getName(), input2); + saga.registerCompensation(MockCompensationActivity.class.getName(), input2); MockActivityInput input3 = new MockActivityInput(); input3.setOrder(3); - saga.registerCompensation(MockCompentationActivity.class.getName(), input3); + saga.registerCompensation(MockCompensationActivity.class.getName(), input3); saga.compensate(createMockContext()); - assertEquals(3, MockCompentationActivity.compensateOrder.size()); + assertEquals(3, MockCompensationActivity.compensateOrder.size()); } @Test public void testCompensateInParallel_exception_1failed() { - MockCompentationActivity.compensateOrder.clear(); + MockCompensationActivity.compensateOrder.clear(); - SagaOption config = SagaOption.newBuilder() + SagaOptions config = SagaOptions.newBuilder() .setParallelCompensation(true).build(); Saga saga = new Saga(config); MockActivityInput input1 = new MockActivityInput(); input1.setOrder(1); - saga.registerCompensation(MockCompentationActivity.class.getName(), input1); + saga.registerCompensation(MockCompensationActivity.class.getName(), input1); MockActivityInput input2 = new MockActivityInput(); input2.setOrder(2); input2.setThrowException(true); - saga.registerCompensation(MockCompentationActivity.class.getName(), input2); + saga.registerCompensation(MockCompensationActivity.class.getName(), input2); MockActivityInput input3 = new MockActivityInput(); input3.setOrder(3); - saga.registerCompensation(MockCompentationActivity.class.getName(), input3); + saga.registerCompensation(MockCompensationActivity.class.getName(), input3); SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { saga.compensate(createMockContext()); @@ -132,110 +132,110 @@ public void testCompensateInParallel_exception_1failed() { assertNotNull(exception.getCause()); // 3 compentation activities, 2 succeed, 1 failed assertEquals(0, exception.getSuppressed().length); - assertEquals(2, MockCompentationActivity.compensateOrder.size()); + assertEquals(2, MockCompensationActivity.compensateOrder.size()); } @Test public void testCompensateInParallel_exception_2failed() { - MockCompentationActivity.compensateOrder.clear(); + MockCompensationActivity.compensateOrder.clear(); - SagaOption config = SagaOption.newBuilder() + SagaOptions config = SagaOptions.newBuilder() .setParallelCompensation(true).build(); Saga saga = new Saga(config); MockActivityInput input1 = new MockActivityInput(); input1.setOrder(1); - saga.registerCompensation(MockCompentationActivity.class.getName(), input1); + saga.registerCompensation(MockCompensationActivity.class.getName(), input1); MockActivityInput input2 = new MockActivityInput(); input2.setOrder(2); input2.setThrowException(true); - saga.registerCompensation(MockCompentationActivity.class.getName(), input2); + saga.registerCompensation(MockCompensationActivity.class.getName(), input2); MockActivityInput input3 = new MockActivityInput(); input3.setOrder(3); input3.setThrowException(true); - saga.registerCompensation(MockCompentationActivity.class.getName(), input3); + saga.registerCompensation(MockCompensationActivity.class.getName(), input3); SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { saga.compensate(createMockContext()); }); assertNotNull(exception.getCause()); // 3 compentation activities, 1 succeed, 2 failed - assertEquals(1, MockCompentationActivity.compensateOrder.size()); + assertEquals(1, MockCompensationActivity.compensateOrder.size()); } @Test public void testCompensateInParallel_exception_3failed() { - MockCompentationActivity.compensateOrder.clear(); + MockCompensationActivity.compensateOrder.clear(); - SagaOption config = SagaOption.newBuilder() + SagaOptions config = SagaOptions.newBuilder() .setParallelCompensation(true).build(); Saga saga = new Saga(config); MockActivityInput input1 = new MockActivityInput(); input1.setOrder(1); input1.setThrowException(true); - saga.registerCompensation(MockCompentationActivity.class.getName(), input1); + saga.registerCompensation(MockCompensationActivity.class.getName(), input1); MockActivityInput input2 = new MockActivityInput(); input2.setOrder(2); input2.setThrowException(true); - saga.registerCompensation(MockCompentationActivity.class.getName(), input2); + saga.registerCompensation(MockCompensationActivity.class.getName(), input2); MockActivityInput input3 = new MockActivityInput(); input3.setOrder(3); input3.setThrowException(true); - saga.registerCompensation(MockCompentationActivity.class.getName(), input3); + saga.registerCompensation(MockCompensationActivity.class.getName(), input3); SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { saga.compensate(createMockContext()); }); assertNotNull(exception.getCause()); // 3 compentation activities, 0 succeed, 3 failed - assertEquals(0, MockCompentationActivity.compensateOrder.size()); + assertEquals(0, MockCompensationActivity.compensateOrder.size()); } @Test public void testCompensateSequentially() { - MockCompentationActivity.compensateOrder.clear(); + MockCompensationActivity.compensateOrder.clear(); - SagaOption config = SagaOption.newBuilder() + SagaOptions config = SagaOptions.newBuilder() .setParallelCompensation(false).build(); Saga saga = new Saga(config); MockActivityInput input1 = new MockActivityInput(); input1.setOrder(1); - saga.registerCompensation(MockCompentationActivity.class.getName(), input1); + saga.registerCompensation(MockCompensationActivity.class.getName(), input1); MockActivityInput input2 = new MockActivityInput(); input2.setOrder(2); - saga.registerCompensation(MockCompentationActivity.class.getName(), input2); + saga.registerCompensation(MockCompensationActivity.class.getName(), input2); MockActivityInput input3 = new MockActivityInput(); input3.setOrder(3); - saga.registerCompensation(MockCompentationActivity.class.getName(), input3); + saga.registerCompensation(MockCompensationActivity.class.getName(), input3); saga.compensate(createMockContext()); - assertEquals(3, MockCompentationActivity.compensateOrder.size()); + assertEquals(3, MockCompensationActivity.compensateOrder.size()); // the order should be 3 / 2 / 1 - assertEquals(Integer.valueOf(3), MockCompentationActivity.compensateOrder.get(0)); - assertEquals(Integer.valueOf(2), MockCompentationActivity.compensateOrder.get(1)); - assertEquals(Integer.valueOf(1), MockCompentationActivity.compensateOrder.get(2)); + assertEquals(Integer.valueOf(3), MockCompensationActivity.compensateOrder.get(0)); + assertEquals(Integer.valueOf(2), MockCompensationActivity.compensateOrder.get(1)); + assertEquals(Integer.valueOf(1), MockCompensationActivity.compensateOrder.get(2)); } @Test public void testCompensateSequentially_continueWithError() { - MockCompentationActivity.compensateOrder.clear(); + MockCompensationActivity.compensateOrder.clear(); - SagaOption config = SagaOption.newBuilder() + SagaOptions config = SagaOptions.newBuilder() .setParallelCompensation(false) .setContinueWithError(true) .build(); Saga saga = new Saga(config); MockActivityInput input1 = new MockActivityInput(); input1.setOrder(1); - saga.registerCompensation(MockCompentationActivity.class.getName(), input1); + saga.registerCompensation(MockCompensationActivity.class.getName(), input1); MockActivityInput input2 = new MockActivityInput(); input2.setOrder(2); input2.setThrowException(true); - saga.registerCompensation(MockCompentationActivity.class.getName(), input2); + saga.registerCompensation(MockCompensationActivity.class.getName(), input2); MockActivityInput input3 = new MockActivityInput(); input3.setOrder(3); - saga.registerCompensation(MockCompentationActivity.class.getName(), input3); + saga.registerCompensation(MockCompensationActivity.class.getName(), input3); SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { saga.compensate(createMockContext()); @@ -244,32 +244,32 @@ public void testCompensateSequentially_continueWithError() { assertEquals(0, exception.getSuppressed().length); // 3 compentation activities, 2 succeed, 1 failed - assertEquals(2, MockCompentationActivity.compensateOrder.size()); + assertEquals(2, MockCompensationActivity.compensateOrder.size()); // the order should be 3 / 1 - assertEquals(Integer.valueOf(3), MockCompentationActivity.compensateOrder.get(0)); - assertEquals(Integer.valueOf(1), MockCompentationActivity.compensateOrder.get(1)); + assertEquals(Integer.valueOf(3), MockCompensationActivity.compensateOrder.get(0)); + assertEquals(Integer.valueOf(1), MockCompensationActivity.compensateOrder.get(1)); } @Test public void testCompensateSequentially_continueWithError_suppressed() { - MockCompentationActivity.compensateOrder.clear(); + MockCompensationActivity.compensateOrder.clear(); - SagaOption config = SagaOption.newBuilder() + SagaOptions config = SagaOptions.newBuilder() .setParallelCompensation(false) .setContinueWithError(true) .build(); Saga saga = new Saga(config); MockActivityInput input1 = new MockActivityInput(); input1.setOrder(1); - saga.registerCompensation(MockCompentationActivity.class.getName(), input1); + saga.registerCompensation(MockCompensationActivity.class.getName(), input1); MockActivityInput input2 = new MockActivityInput(); input2.setOrder(2); input2.setThrowException(true); - saga.registerCompensation(MockCompentationActivity.class.getName(), input2); + saga.registerCompensation(MockCompensationActivity.class.getName(), input2); MockActivityInput input3 = new MockActivityInput(); input3.setOrder(3); input3.setThrowException(true); - saga.registerCompensation(MockCompentationActivity.class.getName(), input3); + saga.registerCompensation(MockCompensationActivity.class.getName(), input3); SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { saga.compensate(createMockContext()); @@ -278,30 +278,30 @@ public void testCompensateSequentially_continueWithError_suppressed() { assertEquals(1, exception.getSuppressed().length); // 3 compentation activities, 1 succeed, 2 failed - assertEquals(1, MockCompentationActivity.compensateOrder.size()); + assertEquals(1, MockCompensationActivity.compensateOrder.size()); // the order should be 3 / 1 - assertEquals(Integer.valueOf(1), MockCompentationActivity.compensateOrder.get(0)); + assertEquals(Integer.valueOf(1), MockCompensationActivity.compensateOrder.get(0)); } @Test public void testCompensateSequentially_notContinueWithError() { - MockCompentationActivity.compensateOrder.clear(); + MockCompensationActivity.compensateOrder.clear(); - SagaOption config = SagaOption.newBuilder() + SagaOptions config = SagaOptions.newBuilder() .setParallelCompensation(false) .setContinueWithError(false) .build(); Saga saga = new Saga(config); MockActivityInput input1 = new MockActivityInput(); input1.setOrder(1); - saga.registerCompensation(MockCompentationActivity.class.getName(), input1); + saga.registerCompensation(MockCompensationActivity.class.getName(), input1); MockActivityInput input2 = new MockActivityInput(); input2.setOrder(2); input2.setThrowException(true); - saga.registerCompensation(MockCompentationActivity.class.getName(), input2); + saga.registerCompensation(MockCompensationActivity.class.getName(), input2); MockActivityInput input3 = new MockActivityInput(); input3.setOrder(3); - saga.registerCompensation(MockCompentationActivity.class.getName(), input3); + saga.registerCompensation(MockCompensationActivity.class.getName(), input3); SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { saga.compensate(createMockContext()); @@ -310,9 +310,9 @@ public void testCompensateSequentially_notContinueWithError() { assertEquals(0, exception.getSuppressed().length); // 3 compentation activities, 1 succeed, 1 failed and not continue - assertEquals(1, MockCompentationActivity.compensateOrder.size()); + assertEquals(1, MockCompensationActivity.compensateOrder.size()); // the order should be 3 / 1 - assertEquals(Integer.valueOf(3), MockCompentationActivity.compensateOrder.get(0)); + assertEquals(Integer.valueOf(3), MockCompensationActivity.compensateOrder.get(0)); } public static class MockActivity implements WorkflowActivity { @@ -325,9 +325,9 @@ public Object run(WorkflowActivityContext ctx) { } } - public static class MockCompentationActivity implements WorkflowActivity { + public static class MockCompensationActivity implements WorkflowActivity { - private static List compensateOrder = Collections.synchronizedList(new ArrayList<>()); + private static final List compensateOrder = Collections.synchronizedList(new ArrayList<>()); @Override public Object run(WorkflowActivityContext ctx) {