diff --git a/test/replaytests/branch_workflow.go b/test/replaytests/branch_workflow.go index 380660fa7..99b189763 100644 --- a/test/replaytests/branch_workflow.go +++ b/test/replaytests/branch_workflow.go @@ -72,7 +72,7 @@ func sampleBranchWorkflow2(ctx workflow.Context) error { } ctx = workflow.WithActivityOptions(ctx, ao) - for i := 1; i <= 4; i++ { + for i := 1; i <= 2; i++ { activityInput := fmt.Sprintf("branch %d of 4", i) future := workflow.ExecuteActivity(ctx, sampleActivity, activityInput) futures = append(futures, future) diff --git a/test/replaytests/continue_as_new.json b/test/replaytests/continue_as_new.json new file mode 100644 index 000000000..ef403b847 --- /dev/null +++ b/test/replaytests/continue_as_new.json @@ -0,0 +1,90 @@ +[ + { + "eventId": 1, + "timestamp": 1699856700704442400, + "eventType": "WorkflowExecutionStarted", + "version": 4, + "taskId": 882931375, + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "fx.SimpleSignalWorkflow" + }, + "taskList": { + "name": "fx-worker" + }, + "executionStartToCloseTimeoutSeconds": 600, + "taskStartToCloseTimeoutSeconds": 10, + "continuedExecutionRunId": "a664f402-bfe9-4739-945c-9cbc637548f1", + "initiator": "CronSchedule", + "continuedFailureReason": "cadenceInternal:Timeout START_TO_CLOSE", + "originalExecutionRunId": "d0baf930-6a83-4740-b773-71aaa696eed1", + "firstExecutionRunId": "e85fa1b9-8899-40ce-8af9-7e0f93ed7ae5", + "firstScheduleTimeNano": "2023-05-22T15:45:26.535595761-07:00", + "cronSchedule": "* * * * *", + "firstDecisionTaskBackoffSeconds": 60, + "PartitionConfig": { + "isolation-group": "dca11" + } + } + }, + { + "eventId": 2, + "timestamp": 1699856760713586608, + "eventType": "DecisionTaskScheduled", + "version": 4, + "taskId": 882931383, + "decisionTaskScheduledEventAttributes": { + "taskList": { + "name": "fx-worker" + }, + "startToCloseTimeoutSeconds": 10 + } + }, + { + "eventId": 3, + "timestamp": 1699856760741837021, + "eventType": "DecisionTaskStarted", + "version": 4, + "taskId": 882931387, + "decisionTaskStartedEventAttributes": { + "scheduledEventId": 2, + "identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4", + "requestId": "bb0ee926-13d1-4af4-9f9c-51433333ad04" + } + }, + { + "eventId": 4, + "timestamp": 1699856760773459755, + "eventType": "DecisionTaskCompleted", + "version": 4, + "taskId": 882931391, + "decisionTaskCompletedEventAttributes": { + "scheduledEventId": 2, + "startedEventId": 3, + "identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4", + "binaryChecksum": "uDeploy:dc3e318b30a49e8bb88f462a50fe3a01dd210a3a" + } + }, + { + "eventId": 5, + "timestamp": 1699857360713649962, + "eventType": "WorkflowExecutionContinuedAsNew", + "version": 4, + "taskId": 882931394, + "workflowExecutionContinuedAsNewEventAttributes": { + "newExecutionRunId": "06c2468c-2d2d-44f7-ac7a-ff3c383f6e90", + "workflowType": { + "name": "fx.SimpleSignalWorkflow" + }, + "taskList": { + "name": "fx-worker" + }, + "executionStartToCloseTimeoutSeconds": 600, + "taskStartToCloseTimeoutSeconds": 10, + "decisionTaskCompletedEventId": -23, + "backoffStartIntervalInSeconds": 60, + "initiator": "CronSchedule", + "failureReason": "cadenceInternal:Timeout START_TO_CLOSE" + } + } +] diff --git a/test/replaytests/continue_as_new_wf.go b/test/replaytests/continue_as_new_wf.go new file mode 100644 index 000000000..a09c83b89 --- /dev/null +++ b/test/replaytests/continue_as_new_wf.go @@ -0,0 +1,26 @@ +package replaytests + +import ( + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +// ContinueAsNewWorkflow is a sample Cadence workflows that can receive a signal +func ContinueAsNewWorkflow(ctx workflow.Context) error { + selector := workflow.NewSelector(ctx) + var signalResult string + signalName := "helloWorldSignal" + for { + signalChan := workflow.GetSignalChannel(ctx, signalName) + selector.AddReceive(signalChan, func(c workflow.Channel, more bool) { + c.Receive(ctx, &signalResult) + workflow.GetLogger(ctx).Info("Received age signalResult from signal!", zap.String("signal", signalName), zap.String("value", signalResult)) + }) + workflow.GetLogger(ctx).Info("Waiting for signal on channel.. " + signalName) + // Wait for signal + selector.Select(ctx) + if signalResult == "kill" { + return nil + } + } +} diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index 382f1c4d2..5522ac951 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -151,6 +151,17 @@ func TestBranchWorkflowWithExtraBranch(t *testing.T) { assert.ErrorContains(t, err, "nondeterministic workflow") } +// TestSequentialStepsWorkflow replays a history with 2 sequential activity calls and runs it against new version of the workflow code which only calls 1 activity. +// This should be considered as non-determinism error. +func TestSequentialStepsWorkflow(t *testing.T) { + replayer := worker.NewWorkflowReplayer() + + replayer.RegisterWorkflowWithOptions(replayerHelloWorldWorkflow, workflow.RegisterOptions{Name: "fx.ReplayerHelloWorldWorkflow"}) + replayer.RegisterActivityWithOptions(replayerHelloWorldActivity, activity.RegisterOptions{Name: "replayerhello"}) + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "sequential.json") + assert.NoError(t, err) +} + func TestParallel(t *testing.T) { replayer := worker.NewWorkflowReplayer() @@ -170,3 +181,13 @@ func TestParallel2(t *testing.T) { err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch2.json") require.NoError(t, err) } + +// Runs a history which ends with WorkflowExecutionContinuedAsNew. Replay fails because of the additional checks done +// for continue as new case by replayWorkflowHistory(). +// This should not have any error because it's a valid continue as new case. +func TestContinueAsNew(t *testing.T) { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflowWithOptions(ContinueAsNewWorkflow, workflow.RegisterOptions{Name: "fx.SimpleSignalWorkflow"}) + err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "continue_as_new.json") + assert.ErrorContains(t, err, "missing replay decision for WorkflowExecutionContinuedAsNew") +} diff --git a/test/replaytests/sequential.json b/test/replaytests/sequential.json new file mode 100644 index 000000000..0c6f787c7 --- /dev/null +++ b/test/replaytests/sequential.json @@ -0,0 +1,231 @@ +[ + { + "eventId": 1, + "timestamp": 1697648630382933224, + "eventType": "WorkflowExecutionStarted", + "taskId": 3145798, + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "fx.ReplayerHelloWorldWorkflow" + }, + "taskList": { + "name": "fx-worker" + }, + "input": "eyJNZXNzYWdlIjoiaGVsbG8gcmVwbGF5ZXIifQ==", + "executionStartToCloseTimeoutSeconds": 60, + "taskStartToCloseTimeoutSeconds": 60, + "originalExecutionRunId": "dadbe958-4159-4762-88e7-6b352b4cccff", + "identity": "cadence-cli@taylan-trial", + "firstExecutionRunId": "dadbe958-4159-4762-88e7-6b352b4cccff", + "firstDecisionTaskBackoffSeconds": 0, + "PartitionConfig": null + } + }, + { + "eventId": 2, + "timestamp": 1697648630382957815, + "eventType": "DecisionTaskScheduled", + "taskId": 3145799, + "decisionTaskScheduledEventAttributes": { + "taskList": { + "name": "fx-worker" + }, + "startToCloseTimeoutSeconds": 60 + } + }, + { + "eventId": 3, + "timestamp": 1697648630401121943, + "eventType": "DecisionTaskStarted", + "taskId": 3145804, + "decisionTaskStartedEventAttributes": { + "scheduledEventId": 2, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "requestId": "20ca16c3-b13e-4c72-85f9-bd79384d430e" + } + }, + { + "eventId": 4, + "timestamp": 1697648630412706912, + "eventType": "DecisionTaskCompleted", + "taskId": 3145807, + "decisionTaskCompletedEventAttributes": { + "scheduledEventId": 2, + "startedEventId": 3, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "binaryChecksum": "uDeploy:" + } + }, + { + "eventId": 5, + "timestamp": 1697648630412753262, + "eventType": "ActivityTaskScheduled", + "taskId": 3145808, + "activityTaskScheduledEventAttributes": { + "activityId": "0", + "activityType": { + "name": "replayerhello" + }, + "taskList": { + "name": "fx-worker" + }, + "input": "eyJNZXNzYWdlIjoiaGVsbG8gcmVwbGF5ZXIifQo=", + "scheduleToCloseTimeoutSeconds": 60, + "scheduleToStartTimeoutSeconds": 60, + "startToCloseTimeoutSeconds": 60, + "heartbeatTimeoutSeconds": 0, + "decisionTaskCompletedEventId": 4, + "header": {} + } + }, + { + "eventId": 6, + "timestamp": 1697648630412771832, + "eventType": "ActivityTaskStarted", + "taskId": 3145809, + "activityTaskStartedEventAttributes": { + "scheduledEventId": 5, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "requestId": "6adca0b2-bd1e-4877-ac44-c3fdf45784c6", + "lastFailureReason": "" + } + }, + { + "eventId": 7, + "timestamp": 1697648630422944301, + "eventType": "ActivityTaskCompleted", + "taskId": 3145812, + "activityTaskCompletedEventAttributes": { + "result": "IkhlbGxvLCBoZWxsbyByZXBsYXllciEiCg==", + "scheduledEventId": 5, + "startedEventId": 6, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401" + } + }, + { + "eventId": 8, + "timestamp": 1697648630422957221, + "eventType": "DecisionTaskScheduled", + "taskId": 3145814, + "decisionTaskScheduledEventAttributes": { + "taskList": { + "name": "taylan-trial:4037e064-7565-4716-8169-e97eb2449f32" + }, + "startToCloseTimeoutSeconds": 60 + } + }, + { + "eventId": 9, + "timestamp": 1697648630432583300, + "eventType": "DecisionTaskStarted", + "taskId": 3145818, + "decisionTaskStartedEventAttributes": { + "scheduledEventId": 8, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "requestId": "6eea9a40-e738-4a9b-9de3-30cc30af3597" + } + }, + { + "eventId": 10, + "timestamp": 1697648630442893019, + "eventType": "DecisionTaskCompleted", + "taskId": 3145821, + "decisionTaskCompletedEventAttributes": { + "scheduledEventId": 8, + "startedEventId": 9, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "binaryChecksum": "uDeploy:" + } + }, + { + "eventId": 11, + "timestamp": 1697648630442935059, + "eventType": "ActivityTaskScheduled", + "taskId": 3145822, + "activityTaskScheduledEventAttributes": { + "activityId": "1", + "activityType": { + "name": "replayerhello" + }, + "taskList": { + "name": "fx-worker" + }, + "input": "eyJNZXNzYWdlIjoiaGVsbG8gcmVwbGF5ZXIifQo=", + "scheduleToCloseTimeoutSeconds": 60, + "scheduleToStartTimeoutSeconds": 60, + "startToCloseTimeoutSeconds": 60, + "heartbeatTimeoutSeconds": 0, + "decisionTaskCompletedEventId": 10, + "header": {} + } + }, + { + "eventId": 12, + "timestamp": 1697648630442953489, + "eventType": "ActivityTaskStarted", + "taskId": 3145823, + "activityTaskStartedEventAttributes": { + "scheduledEventId": 11, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "requestId": "f44f0ef6-b40a-479f-96e2-3a5093433996", + "lastFailureReason": "" + } + }, + { + "eventId": 13, + "timestamp": 1697648630451389698, + "eventType": "ActivityTaskCompleted", + "taskId": 3145826, + "activityTaskCompletedEventAttributes": { + "result": "IkhlbGxvLCBoZWxsbyByZXBsYXllciEiCg==", + "scheduledEventId": 11, + "startedEventId": 12, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401" + } + }, + { + "eventId": 14, + "timestamp": 1697648630451401018, + "eventType": "DecisionTaskScheduled", + "taskId": 3145828, + "decisionTaskScheduledEventAttributes": { + "taskList": { + "name": "taylan-trial:4037e064-7565-4716-8169-e97eb2449f32" + }, + "startToCloseTimeoutSeconds": 60 + } + }, + { + "eventId": 15, + "timestamp": 1697648630460950187, + "eventType": "DecisionTaskStarted", + "taskId": 3145832, + "decisionTaskStartedEventAttributes": { + "scheduledEventId": 14, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "requestId": "51123563-8c56-442a-9583-5f6b2ae46643" + } + }, + { + "eventId": 16, + "timestamp": 1697648630471749886, + "eventType": "DecisionTaskCompleted", + "taskId": 3145835, + "decisionTaskCompletedEventAttributes": { + "scheduledEventId": 14, + "startedEventId": 15, + "identity": "2126702@taylan-trial@fx-worker@b25ed4e0-0864-41ed-843d-29265e3c6401", + "binaryChecksum": "uDeploy:" + } + }, + { + "eventId": 17, + "timestamp": 1697648630471777786, + "eventType": "WorkflowExecutionCompleted", + "taskId": 3145836, + "workflowExecutionCompletedEventAttributes": { + "decisionTaskCompletedEventId": 16 + } + } +] + diff --git a/test/replaytests/sequential_workflow.go b/test/replaytests/sequential_workflow.go new file mode 100644 index 000000000..be1de74f7 --- /dev/null +++ b/test/replaytests/sequential_workflow.go @@ -0,0 +1,76 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package replaytests + +import ( + "context" + "fmt" + "time" + + "go.uber.org/cadence/activity" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" +) + +type replayerSampleMessage struct { + Message string +} + +// replayerHelloWorldWorkflow is a sample workflow that runs replayerHelloWorldActivity. +// In the previous version it was running the activity twice, sequentially. +// History of a past execution is in sequential.json +// Corresponding unit test covers the scenario that new workflow's history records is subset of previous version's history. +// +// v1: wf started -> call activity -> call activity -> wf complete +// v2: wf started -> call activity -> wf complete +// +// The v2 clearly has determinism issues and should be considered as non-determism error for replay tests. +func replayerHelloWorldWorkflow(ctx workflow.Context, inputMsg *replayerSampleMessage) error { + ao := workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + } + logger := workflow.GetLogger(ctx) + logger.Info("executing replayerHelloWorldWorkflow") + ctx = workflow.WithActivityOptions(ctx, ao) + + count := 1 + for i := 0; i < count; i++ { + var greeting string + err := workflow.ExecuteActivity(ctx, replayerHelloWorldActivity, inputMsg).Get(ctx, &greeting) + if err != nil { + logger.Error("replayerHelloWorldActivity is broken ", zap.Error(err)) + return err + } + + logger.Sugar().Infof("replayerHelloWorldWorkflow is greeting to you %dth time -> ", i, greeting) + } + + return nil +} + +// replayerHelloWorldActivity takes a sample message input and return a greeting to the caller. +func replayerHelloWorldActivity(ctx context.Context, inputMsg *replayerSampleMessage) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("executing replayerHelloWorldActivity") + + return fmt.Sprintf("Hello, %s!", inputMsg.Message), nil +}