Skip to content

Commit

Permalink
Add replay test case for problematic continue-as-new case
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir committed Nov 16, 2023
1 parent b26326f commit ff4f4d1
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 0 deletions.
90 changes: 90 additions & 0 deletions test/replaytests/continue_as_new.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
[
{
"eventId": 1,
"timestamp": 1699856700704442400,
"eventType": "WorkflowExecutionStarted",
"version": 4,
"taskId": 882931375,
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "fx.SimpleSignalWorkflow"
},
"taskList": {
"name": "fx-worker"
},
"executionStartToCloseTimeoutSeconds": 600,
"taskStartToCloseTimeoutSeconds": 10,
"continuedExecutionRunId": "a664f402-bfe9-4739-945c-9cbc637548f1",
"initiator": "CronSchedule",
"continuedFailureReason": "cadenceInternal:Timeout START_TO_CLOSE",
"originalExecutionRunId": "d0baf930-6a83-4740-b773-71aaa696eed1",
"firstExecutionRunId": "e85fa1b9-8899-40ce-8af9-7e0f93ed7ae5",
"firstScheduleTimeNano": "2023-05-22T15:45:26.535595761-07:00",
"cronSchedule": "* * * * *",
"firstDecisionTaskBackoffSeconds": 60,
"PartitionConfig": {
"isolation-group": "dca11"
}
}
},
{
"eventId": 2,
"timestamp": 1699856760713586608,
"eventType": "DecisionTaskScheduled",
"version": 4,
"taskId": 882931383,
"decisionTaskScheduledEventAttributes": {
"taskList": {
"name": "fx-worker"
},
"startToCloseTimeoutSeconds": 10
}
},
{
"eventId": 3,
"timestamp": 1699856760741837021,
"eventType": "DecisionTaskStarted",
"version": 4,
"taskId": 882931387,
"decisionTaskStartedEventAttributes": {
"scheduledEventId": 2,
"identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4",
"requestId": "bb0ee926-13d1-4af4-9f9c-51433333ad04"
}
},
{
"eventId": 4,
"timestamp": 1699856760773459755,
"eventType": "DecisionTaskCompleted",
"version": 4,
"taskId": 882931391,
"decisionTaskCompletedEventAttributes": {
"scheduledEventId": 2,
"startedEventId": 3,
"identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4",
"binaryChecksum": "uDeploy:dc3e318b30a49e8bb88f462a50fe3a01dd210a3a"
}
},
{
"eventId": 5,
"timestamp": 1699857360713649962,
"eventType": "WorkflowExecutionContinuedAsNew",
"version": 4,
"taskId": 882931394,
"workflowExecutionContinuedAsNewEventAttributes": {
"newExecutionRunId": "06c2468c-2d2d-44f7-ac7a-ff3c383f6e90",
"workflowType": {
"name": "fx.SimpleSignalWorkflow"
},
"taskList": {
"name": "fx-worker"
},
"executionStartToCloseTimeoutSeconds": 600,
"taskStartToCloseTimeoutSeconds": 10,
"decisionTaskCompletedEventId": -23,
"backoffStartIntervalInSeconds": 60,
"initiator": "CronSchedule",
"failureReason": "cadenceInternal:Timeout START_TO_CLOSE"
}
}
]
26 changes: 26 additions & 0 deletions test/replaytests/continue_as_new_wf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package replaytests

import (
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
)

// ContinueAsNewWorkflow is a sample Cadence workflows that can receive a signal
func ContinueAsNewWorkflow(ctx workflow.Context) error {
selector := workflow.NewSelector(ctx)
var signalResult string
signalName := "helloWorldSignal"
for {
signalChan := workflow.GetSignalChannel(ctx, signalName)
selector.AddReceive(signalChan, func(c workflow.Channel, more bool) {
c.Receive(ctx, &signalResult)
workflow.GetLogger(ctx).Info("Received age signalResult from signal!", zap.String("signal", signalName), zap.String("value", signalResult))
})
workflow.GetLogger(ctx).Info("Waiting for signal on channel.. " + signalName)
// Wait for signal
selector.Select(ctx)
if signalResult == "kill" {
return nil
}
}
}
10 changes: 10 additions & 0 deletions test/replaytests/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,13 @@ func TestSimpleParallelWorkflowWithMissingActivityCall(t *testing.T) {
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch2.json")
assert.ErrorContains(t, err, "nondeterministic workflow: missing replay decision")
}

// Runs a history which ends with WorkflowExecutionContinuedAsNew. Replay fails because of the additional checks done
// for continue as new case by replayWorkflowHistory().
// This should not have any error because it's a valid continue as new case.
func TestContinueAsNew(t *testing.T) {
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflowWithOptions(ContinueAsNewWorkflow, workflow.RegisterOptions{Name: "fx.SimpleSignalWorkflow"})
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "continue_as_new.json")
assert.ErrorContains(t, err, "replay workflow doesn't return the same result as the last event")
}

0 comments on commit ff4f4d1

Please sign in to comment.