Skip to content

Commit

Permalink
Adding WorkflowTaskOptions and use it instead of TaskOptions (dapr#1200)
Browse files Browse the repository at this point in the history
  • Loading branch information
artur-ciocanu authored Jan 30, 2025
1 parent be0e56b commit be5530f
Show file tree
Hide file tree
Showing 19 changed files with 410 additions and 168 deletions.
4 changes: 2 additions & 2 deletions sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
import io.dapr.workflows.saga.SagaCompensationException;
import io.dapr.workflows.saga.SagaOption;
import io.dapr.workflows.saga.SagaOptions;

/**
* Common interface for workflow implementations.
Expand Down Expand Up @@ -74,7 +74,7 @@ default boolean isSagaEnabled() {
*
* @return saga configuration
*/
default SagaOption getSagaOption() {
default SagaOptions getSagaOption() {
// by default, saga is disabled
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package io.dapr.workflows;

public interface WorkflowActivityContext {

String getName();

<T> T getInput(Class<T> targetType);

}
39 changes: 19 additions & 20 deletions sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskFailedException;
import com.microsoft.durabletask.TaskOptions;
import io.dapr.workflows.saga.SagaContext;
import org.slf4j.Logger;

Expand Down Expand Up @@ -153,24 +152,24 @@ default <V> Task<V> waitForExternalEvent(String name, Class<V> dataType) {
* @param <V> the expected type of the activity output
* @return a new {@link Task} that completes when the activity completes or fails
*/
<V> Task<V> callActivity(String name, Object input, TaskOptions options, Class<V> returnType);
<V> Task<V> callActivity(String name, Object input, WorkflowTaskOptions options, Class<V> returnType);

/**
* Asynchronously invokes an activity by name and returns a new {@link Task} that completes when the activity
* completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a complete description.
* completes. See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description.
*
* @param name the name of the activity to call
* @return a new {@link Task} that completes when the activity completes or fails
* @see #callActivity(String, Object, TaskOptions, Class)
* @see #callActivity(String, Object, WorkflowTaskOptions, Class)
*/
default Task<Void> callActivity(String name) {
return this.callActivity(name, null, null, Void.class);
}

/**
* Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task}
* that completes when the activity completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a
* complete description.
* that completes when the activity completes.
* See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description.
*
* @param name the name of the activity to call
* @param input the serializable input to pass to the activity
Expand All @@ -183,7 +182,7 @@ default Task<Void> callActivity(String name, Object input) {
/**
* Asynchronously invokes an activity by name and returns a new {@link Task} that completes when the activity
* completes. If the activity completes successfully, the returned {@code Task}'s value will be the activity's
* output. See {@link #callActivity(String, Object, TaskOptions, Class)} for a complete description.
* output. See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description.
*
* @param name the name of the activity to call
* @param returnType the expected class type of the activity output
Expand All @@ -197,8 +196,8 @@ default <V> Task<V> callActivity(String name, Class<V> returnType) {
/**
* Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task}
* that completes when the activity completes.If the activity completes successfully, the returned {@code Task}'s
* value will be the activity's output. See {@link #callActivity(String, Object, TaskOptions, Class)} for a
* complete description.
* value will be the activity's output.
* See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description.
*
* @param name the name of the activity to call
* @param input the serializable input to pass to the activity
Expand All @@ -212,15 +211,15 @@ default <V> Task<V> callActivity(String name, Object input, Class<V> returnType)

/**
* Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task}
* that completes when the activity completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a
* complete description.
* that completes when the activity completes.
* See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description.
*
* @param name the name of the activity to call
* @param input the serializable input to pass to the activity
* @param options additional options that control the execution and processing of the activity
* @return a new {@link Task} that completes when the activity completes or fails
*/
default Task<Void> callActivity(String name, Object input, TaskOptions options) {
default Task<Void> callActivity(String name, Object input, WorkflowTaskOptions options) {
return this.callActivity(name, input, options, Void.class);
}

Expand Down Expand Up @@ -367,11 +366,11 @@ default Task<Void> createTimer(ZonedDateTime zonedDateTime) {
* Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes
* when the child-workflow completes.
*
* <p>See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
* <p>See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @return a new {@link Task} that completes when the child-workflow completes or fails
* @see #callChildWorkflow(String, Object, String, TaskOptions, Class)
* @see #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)
*/
default Task<Void> callChildWorkflow(String name) {
return this.callChildWorkflow(name, null);
Expand All @@ -381,7 +380,7 @@ default Task<Void> callChildWorkflow(String name) {
* Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes
* when the child-workflow completes.
*
* <p>See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
* <p>See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the child-workflow
Expand All @@ -395,7 +394,7 @@ default Task<Void> callChildWorkflow(String name, Object input) {
* Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes
* when the child-workflow completes.
*
* <p>See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
* <p>See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the child-workflow
Expand All @@ -411,7 +410,7 @@ default <V> Task<V> callChildWorkflow(String name, Object input, Class<V> return
* Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes
* when the child-workflow completes.
*
* <p>See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
* <p>See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the child-workflow
Expand All @@ -428,15 +427,15 @@ default <V> Task<V> callChildWorkflow(String name, Object input, String instance
* Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes
* when the child-workflow completes.
*
* <p>See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description.
* <p>See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description.
*
* @param name the name of the workflow to invoke
* @param input the serializable input to send to the child-workflow
* @param instanceID the unique ID of the child-workflow
* @param options additional options that control the execution and processing of the activity
* @return a new {@link Task} that completes when the child-workflow completes or fails
*/
default Task<Void> callChildWorkflow(String name, Object input, String instanceID, TaskOptions options) {
default Task<Void> callChildWorkflow(String name, Object input, String instanceID, WorkflowTaskOptions options) {
return this.callChildWorkflow(name, input, instanceID, options, Void.class);
}

Expand Down Expand Up @@ -478,7 +477,7 @@ default Task<Void> callChildWorkflow(String name, Object input, String instanceI
<V> Task<V> callChildWorkflow(String name,
@Nullable Object input,
@Nullable String instanceID,
@Nullable TaskOptions options,
@Nullable WorkflowTaskOptions options,
Class<V> returnType);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@

@FunctionalInterface
public interface WorkflowStub {

void run(WorkflowContext ctx);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.workflows;

public class WorkflowTaskOptions {

private final WorkflowTaskRetryPolicy retryPolicy;

public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
}

public WorkflowTaskRetryPolicy getRetryPolicy() {
return retryPolicy;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.workflows;

import javax.annotation.Nullable;

import java.time.Duration;

public final class WorkflowTaskRetryPolicy {

private final Integer maxNumberOfAttempts;
private final Duration firstRetryInterval;
private final Double backoffCoefficient;
private final Duration maxRetryInterval;
private final Duration retryTimeout;

/**
* Constructor for WorkflowTaskRetryPolicy.
* @param maxNumberOfAttempts Maximum number of attempts to retry the workflow.
* @param firstRetryInterval Interval to wait before the first retry.
* @param backoffCoefficient Coefficient to increase the retry interval.
* @param maxRetryInterval Maximum interval to wait between retries.
* @param retryTimeout Timeout for the whole retry process.
*/
public WorkflowTaskRetryPolicy(
Integer maxNumberOfAttempts,
Duration firstRetryInterval,
Double backoffCoefficient,
Duration maxRetryInterval,
Duration retryTimeout
) {
this.maxNumberOfAttempts = maxNumberOfAttempts;
this.firstRetryInterval = firstRetryInterval;
this.backoffCoefficient = backoffCoefficient;
this.maxRetryInterval = maxRetryInterval;
this.retryTimeout = retryTimeout;
}

public int getMaxNumberOfAttempts() {
return maxNumberOfAttempts;
}

public Duration getFirstRetryInterval() {
return firstRetryInterval;
}

public double getBackoffCoefficient() {
return backoffCoefficient;
}

public Duration getMaxRetryInterval() {
return maxRetryInterval;
}

public Duration getRetryTimeout() {
return retryTimeout;
}

public static Builder newBuilder() {
return new Builder();
}

public static class Builder {

private Integer maxNumberOfAttempts;
private Duration firstRetryInterval;
private Double backoffCoefficient = 1.0;
private Duration maxRetryInterval;
private Duration retryTimeout;

private Builder() {
}

/**
* Build the WorkflowTaskRetryPolicy.
* @return WorkflowTaskRetryPolicy
*/
public WorkflowTaskRetryPolicy build() {
return new WorkflowTaskRetryPolicy(
this.maxNumberOfAttempts,
this.firstRetryInterval,
this.backoffCoefficient,
this.maxRetryInterval,
this.retryTimeout
);
}

/**
* Set the maximum number of attempts to retry the workflow.
* @param maxNumberOfAttempts Maximum number
* @return This builder
*/
public Builder setMaxNumberOfAttempts(int maxNumberOfAttempts) {
if (maxNumberOfAttempts <= 0) {
throw new IllegalArgumentException("The value for maxNumberOfAttempts must be greater than zero.");
}

this.maxNumberOfAttempts = maxNumberOfAttempts;

return this;
}

/**
* Set the interval to wait before the first retry.
* @param firstRetryInterval Interval
* @return This builder
*/
public Builder setFirstRetryInterval(Duration firstRetryInterval) {
if (firstRetryInterval == null) {
throw new IllegalArgumentException("firstRetryInterval cannot be null.");
}
if (firstRetryInterval.isZero() || firstRetryInterval.isNegative()) {
throw new IllegalArgumentException("The value for firstRetryInterval must be greater than zero.");
}

this.firstRetryInterval = firstRetryInterval;

return this;
}

/**
* Set the backoff coefficient.
* @param backoffCoefficient Double value
* @return This builder
*/
public Builder setBackoffCoefficient(double backoffCoefficient) {
if (backoffCoefficient < 1.0) {
throw new IllegalArgumentException("The value for backoffCoefficient must be greater or equal to 1.0.");
}

this.backoffCoefficient = backoffCoefficient;

return this;
}

/**
* Set the maximum interval to wait between retries.
* @param maxRetryInterval Maximum interval
* @return This builder
*/
public Builder setMaxRetryInterval(@Nullable Duration maxRetryInterval) {
if (maxRetryInterval != null && maxRetryInterval.compareTo(this.firstRetryInterval) < 0) {
throw new IllegalArgumentException(
"The value for maxRetryInterval must be greater than or equal to the value for firstRetryInterval.");
}

this.maxRetryInterval = maxRetryInterval;

return this;
}

/**
* Set the maximum retry timeout.
* @param retryTimeout Maximum retry timeout
* @return This builder
*/
public Builder setRetryTimeout(Duration retryTimeout) {
if (retryTimeout != null && retryTimeout.compareTo(this.firstRetryInterval) < 0) {
throw new IllegalArgumentException(
"The value for retryTimeout must be greater than or equal to the value for firstRetryInterval.");
}

this.retryTimeout = retryTimeout;

return this;
}
}

}
Loading

0 comments on commit be5530f

Please sign in to comment.