Skip to content

Commit

Permalink
fix callbacks in describe and continue as new
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou committed Feb 28, 2025
1 parent 35bbf02 commit 86ff638
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import io.grpc.Deadline;
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.Callback;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
Expand All @@ -32,6 +33,7 @@
import io.temporal.api.nexus.v1.StartOperationResponse;
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
import io.temporal.api.workflowservice.v1.*;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -147,4 +149,6 @@ PollWorkflowExecutionUpdateResponse pollUpdateWorkflowExecution(
boolean isTerminalState();

boolean isRequestIdAttached(String requestId);

List<Callback> getCompletionCallbacks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,7 @@ private void processFailWorkflowExecution(
identity,
getExecutionId(),
workflow.getData().firstExecutionRunId,
this,
parent,
parentChildInitiatedEventId);
return;
Expand Down Expand Up @@ -1635,6 +1636,7 @@ private void startNewCronRun(
identity,
getExecutionId(),
workflow.getData().firstExecutionRunId,
this,
parent,
parentChildInitiatedEventId);
}
Expand Down Expand Up @@ -1692,6 +1694,7 @@ private void processContinueAsNewWorkflowExecution(
identity,
getExecutionId(),
workflow.getData().firstExecutionRunId,
this,
parent,
parentChildInitiatedEventId);
}
Expand Down Expand Up @@ -3153,7 +3156,7 @@ private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock()
.setParentExecution(p.getExecutionId().getExecution()));

List<CallbackInfo> callbacks =
this.startRequest.getCompletionCallbacksList().stream()
this.completionCallbacks.stream()
.map(TestWorkflowMutableStateImpl::constructCallbackInfo)
.collect(Collectors.toList());

Expand Down Expand Up @@ -3612,4 +3615,9 @@ private boolean isTerminalState(State workflowState) {
|| workflowState == State.TERMINATED
|| workflowState == State.CONTINUED_AS_NEW;
}

@Override
public List<Callback> getCompletionCallbacks() {
return completionCallbacks;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,7 @@ public String continueAsNew(
String identity,
ExecutionId continuedExecutionId,
String firstExecutionRunId,
TestWorkflowMutableState previousExecutionState,
Optional<TestWorkflowMutableState> parent,
OptionalLong parentChildInitiatedEventId) {
StartWorkflowExecutionRequest.Builder startRequestBuilder =
Expand All @@ -1507,9 +1508,9 @@ public String continueAsNew(
// if (previousRunStartRequest.hasRetryPolicy()) {
// startRequestBuilder.setRetryPolicy(previousRunStartRequest.getRetryPolicy());
// }
if (previousRunStartRequest.getCompletionCallbacksCount() > 0) {
if (!previousExecutionState.getCompletionCallbacks().isEmpty()) {
startRequestBuilder.addAllCompletionCallbacks(
previousRunStartRequest.getCompletionCallbacksList());
previousExecutionState.getCompletionCallbacks());
}
if (ca.hasRetryPolicy()) {
startRequestBuilder.setRetryPolicy(ca.getRetryPolicy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@

import static java.util.UUID.randomUUID;

import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.Callback;
import io.temporal.api.common.v1.Link;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.common.v1.WorkflowType;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.history.v1.WorkflowExecutionOptionsUpdatedEventAttributes;
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.api.workflow.v1.OnConflictOptions;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.StartWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.StartWorkflowExecutionResponse;
import io.temporal.client.*;
Expand Down Expand Up @@ -95,7 +100,20 @@ public void conflictPolicyUseExisting() {
StartWorkflowExecutionRequest request2 =
request1.toBuilder()
.setRequestId(newRequestId)
.setOnConflictOptions(OnConflictOptions.newBuilder().setAttachRequestId(true))
.setOnConflictOptions(
OnConflictOptions.newBuilder()
.setAttachRequestId(true)
.setAttachCompletionCallbacks(true)
.setAttachLinks(true))
.addCompletionCallbacks(
Callback.newBuilder()
.setInternal(
Callback.Internal.newBuilder()
.setData(ByteString.copyFromUtf8("some-random-callback-data"))))
.addLinks(
Link.newBuilder()
.setWorkflowEvent(
Link.WorkflowEvent.newBuilder().setWorkflowId("some-random-workflow-id")))
.build();

StartWorkflowExecutionResponse response2 =
Expand Down Expand Up @@ -133,8 +151,25 @@ public void conflictPolicyUseExisting() {
item.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED)
.collect(Collectors.toList());
Assert.assertEquals(1, events.size());
HistoryEvent event = events.get(0);
Assert.assertEquals(
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED, events.get(0).getEventType());
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED, event.getEventType());
WorkflowExecutionOptionsUpdatedEventAttributes attrs =
event.getWorkflowExecutionOptionsUpdatedEventAttributes();
Assert.assertEquals(newRequestId, attrs.getAttachedRequestId());
Assert.assertEquals(1, attrs.getAttachedCompletionCallbacksCount());
Assert.assertEquals(
"some-random-callback-data",
attrs.getAttachedCompletionCallbacks(0).getInternal().getData().toStringUtf8());
Assert.assertEquals(1, event.getLinksCount());
Assert.assertEquals(
"some-random-workflow-id", event.getLinks(0).getWorkflowEvent().getWorkflowId());

DescribeWorkflowAsserter asserter = describe(we);
Assert.assertEquals(1, asserter.getActual().getCallbacksCount());
Assert.assertEquals(
"some-random-callback-data",
asserter.getActual().getCallbacks(0).getCallback().getInternal().getData().toStringUtf8());
}

@Test
Expand Down Expand Up @@ -191,6 +226,27 @@ public void conflictPolicyFail() {
Assert.assertEquals(Status.Code.ALREADY_EXISTS, e.getStatus().getCode());
}

private DescribeWorkflowAsserter describe(WorkflowExecution execution) {
DescribeWorkflowAsserter result =
new DescribeWorkflowAsserter(
testWorkflowRule
.getWorkflowClient()
.getWorkflowServiceStubs()
.blockingStub()
.describeWorkflowExecution(
DescribeWorkflowExecutionRequest.newBuilder()
.setNamespace(
testWorkflowRule.getWorkflowClient().getOptions().getNamespace())
.setExecution(execution)
.build()));

// There are some assertions that we can always make...
return result
.assertExecutionId(execution)
.assertSaneTimestamps()
.assertTaskQueue(testWorkflowRule.getTaskQueue());
}

public static class SignalWorkflowImpl implements TestWorkflows.WorkflowWithSignal {
boolean unblock = false;

Expand Down

0 comments on commit 86ff638

Please sign in to comment.