diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index fd811acdf03..27a8c4cce11 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -533,6 +533,8 @@ const ( BackfillHistoryEventsTaskScope = "BackfillHistoryEventsTask" // VerifyVersionedTransitionTaskScope is the scope used by verify versioned transition task processing VerifyVersionedTransitionTaskScope = "VerifyVersionedTransitionTask" + // SyncVersionedTransitionTaskScope is the scope used by sync versioned transition task processing + SyncVersionedTransitionTaskScope = "SyncVersionedTransitionTask" // SyncWatermarkScope is the scope used by closed workflow task replication processing SyncWatermarkScope = "SyncWatermark" // NoopTaskScope is the scope used by noop task diff --git a/service/history/ndc_standby_task_util.go b/service/history/ndc_standby_task_util.go index 6e6d03021f1..11a1bec9457 100644 --- a/service/history/ndc_standby_task_util.go +++ b/service/history/ndc_standby_task_util.go @@ -29,13 +29,16 @@ import ( "errors" "time" + commonpb "go.temporal.io/api/common/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/server/api/adminservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" taskqueuespb "go.temporal.io/server/api/taskqueue/v1" + "go.temporal.io/server/client" + "go.temporal.io/server/common" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/namespace" - "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/workflow" @@ -97,58 +100,71 @@ func standbyTimerTaskPostActionTaskDiscarded( return consts.ErrTaskDiscarded } -type ( - historyResendInfo struct { - - // used by NDC - lastEventID int64 - lastEventVersion int64 +func isWorkflowExistOnSource( + ctx context.Context, + taskInfo tasks.Task, + logger log.Logger, + currentCluster string, + clientBean client.Bean, + registry namespace.Registry, +) bool { + workflowKey := taskWorkflowKey(taskInfo) + remoteClusterName, err := getRemoteClusterName( + currentCluster, + registry, + workflowKey.GetNamespaceID(), + ) + if err != nil { + return true } + remoteAdminClient, err := clientBean.GetRemoteAdminClient(remoteClusterName) + if err != nil { + return true + } + _, err = remoteAdminClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{ + Namespace: workflowKey.GetNamespaceID(), + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowKey.GetWorkflowID(), + RunId: workflowKey.GetRunID(), + }, + }) + if err != nil { + if common.IsNotFoundError(err) { + return false + } + logger.Error("Error describe mutable state from remote.", + tag.WorkflowNamespaceID(workflowKey.GetNamespaceID()), + tag.WorkflowID(workflowKey.GetWorkflowID()), + tag.WorkflowRunID(workflowKey.GetRunID()), + tag.ClusterName(remoteClusterName), + tag.Error(err)) + } + return true +} +type ( executionTimerPostActionInfo struct { - *historyResendInfo - currentRunID string } activityTaskPostActionInfo struct { - *historyResendInfo - taskQueue string activityTaskScheduleToStartTimeout time.Duration versionDirective *taskqueuespb.TaskVersionDirective } workflowTaskPostActionInfo struct { - *historyResendInfo - workflowTaskScheduleToStartTimeout time.Duration taskqueue *taskqueuepb.TaskQueue versionDirective *taskqueuespb.TaskVersionDirective } ) -func newHistoryResendInfo( - lastEventID int64, - lastEventVersion int64, -) *historyResendInfo { - return &historyResendInfo{ - lastEventID: lastEventID, - lastEventVersion: lastEventVersion, - } -} - func newExecutionTimerPostActionInfo( mutableState workflow.MutableState, ) (*executionTimerPostActionInfo, error) { - resendInfo, err := getHistoryResendInfo(mutableState) - if err != nil { - return nil, err - } - return &executionTimerPostActionInfo{ - historyResendInfo: resendInfo, - currentRunID: mutableState.GetExecutionState().RunId, + currentRunID: mutableState.GetExecutionState().RunId, }, nil } @@ -156,15 +172,9 @@ func newActivityTaskPostActionInfo( mutableState workflow.MutableState, activityInfo *persistencespb.ActivityInfo, ) (*activityTaskPostActionInfo, error) { - resendInfo, err := getHistoryResendInfo(mutableState) - if err != nil { - return nil, err - } - directive := MakeDirectiveForActivityTask(mutableState, activityInfo) return &activityTaskPostActionInfo{ - historyResendInfo: resendInfo, activityTaskScheduleToStartTimeout: activityInfo.ScheduleToStartTimeout.AsDuration(), versionDirective: directive, }, nil @@ -176,15 +186,9 @@ func newActivityRetryTimePostActionInfo( activityScheduleToStartTimeout time.Duration, activityInfo *persistencespb.ActivityInfo, ) (*activityTaskPostActionInfo, error) { - resendInfo, err := getHistoryResendInfo(mutableState) - if err != nil { - return nil, err - } - directive := MakeDirectiveForActivityTask(mutableState, activityInfo) return &activityTaskPostActionInfo{ - historyResendInfo: resendInfo, taskQueue: taskQueue, activityTaskScheduleToStartTimeout: activityScheduleToStartTimeout, versionDirective: directive, @@ -196,59 +200,30 @@ func newWorkflowTaskPostActionInfo( workflowTaskScheduleToStartTimeout time.Duration, taskqueue *taskqueuepb.TaskQueue, ) (*workflowTaskPostActionInfo, error) { - resendInfo, err := getHistoryResendInfo(mutableState) - if err != nil { - return nil, err - } - directive := MakeDirectiveForWorkflowTask(mutableState) return &workflowTaskPostActionInfo{ - historyResendInfo: resendInfo, workflowTaskScheduleToStartTimeout: workflowTaskScheduleToStartTimeout, taskqueue: taskqueue, versionDirective: directive, }, nil } -func getHistoryResendInfo( - mutableState workflow.MutableState, -) (*historyResendInfo, error) { - - currentBranch, err := versionhistory.GetCurrentVersionHistory(mutableState.GetExecutionInfo().GetVersionHistories()) - if err != nil { - return nil, err - } - lastItem, err := versionhistory.GetLastVersionHistoryItem(currentBranch) - if err != nil { - return nil, err - } - return newHistoryResendInfo(lastItem.GetEventId(), lastItem.GetVersion()), nil -} - func getStandbyPostActionFn( taskInfo tasks.Task, standbyNow standbyCurrentTimeFn, - standbyTaskMissingEventsResendDelay time.Duration, standbyTaskMissingEventsDiscardDelay time.Duration, - fetchHistoryStandbyPostActionFn standbyPostActionFn, discardTaskStandbyPostActionFn standbyPostActionFn, ) standbyPostActionFn { // this is for task retry, use machine time now := standbyNow() taskTime := taskInfo.GetVisibilityTime() - resendTime := taskTime.Add(standbyTaskMissingEventsResendDelay) discardTime := taskTime.Add(standbyTaskMissingEventsDiscardDelay) // now < task start time + StandbyTaskMissingEventsResendDelay - if now.Before(resendTime) { - return standbyTaskPostActionNoOp - } - - // task start time + StandbyTaskMissingEventsResendDelay <= now < task start time + StandbyTaskMissingEventsResendDelay if now.Before(discardTime) { - return fetchHistoryStandbyPostActionFn + return standbyTaskPostActionNoOp } // task start time + StandbyTaskMissingEventsResendDelay <= now diff --git a/service/history/replication/executable_sync_versioned_transition_task.go b/service/history/replication/executable_sync_versioned_transition_task.go index 5fbbd49532f..6057665130f 100644 --- a/service/history/replication/executable_sync_versioned_transition_task.go +++ b/service/history/replication/executable_sync_versioned_transition_task.go @@ -66,7 +66,7 @@ func NewExecutableSyncVersionedTransitionTask( ExecutableTask: NewExecutableTask( processToolBox, taskID, - metrics.VerifyVersionedTransitionTaskScope, + metrics.SyncVersionedTransitionTaskScope, taskCreationTime, time.Now().UTC(), sourceClusterName, @@ -102,7 +102,7 @@ func (e *ExecutableSyncVersionedTransitionTask) Execute() error { ) metrics.ReplicationTasksSkipped.With(e.MetricsHandler).Record( 1, - metrics.OperationTag(metrics.VerifyVersionedTransitionTaskScope), + metrics.OperationTag(metrics.SyncVersionedTransitionTaskScope), metrics.NamespaceTag(namespaceName), ) return nil diff --git a/service/history/timer_queue_factory.go b/service/history/timer_queue_factory.go index 3352c66cf3e..98a98c60fc9 100644 --- a/service/history/timer_queue_factory.go +++ b/service/history/timer_queue_factory.go @@ -25,22 +25,14 @@ package history import ( - "context" - - historypb "go.temporal.io/api/history/v1" - historyspb "go.temporal.io/server/api/history/v1" "go.temporal.io/server/client" - "go.temporal.io/server/common/definition" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" - "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/resource" - "go.temporal.io/server/common/xdc" deletemanager "go.temporal.io/server/service/history/deletemanager" "go.temporal.io/server/service/history/queues" - "go.temporal.io/server/service/history/replication/eventhandler" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" wcache "go.temporal.io/server/service/history/workflow/cache" @@ -146,68 +138,11 @@ func (f *timerQueueFactory) CreateQueue( f.Config, f.MatchingRawClient, ) - eventImporter := eventhandler.NewEventImporter( - f.RemoteHistoryFetcher, - func(ctx context.Context, namespaceID namespace.ID, workflowID string) (shard.Engine, error) { - return shardContext.GetEngine(ctx) - }, - f.Serializer, - logger, - ) - resendHandler := eventhandler.NewResendHandler( - f.NamespaceRegistry, - f.ClientBean, - f.Serializer, - f.ClusterMetadata, - func(ctx context.Context, namespaceID namespace.ID, workflowID string) (shard.Engine, error) { - return shardContext.GetEngine(ctx) - }, - f.RemoteHistoryFetcher, - eventImporter, - logger, - f.Config, - ) standbyExecutor := newTimerQueueStandbyTaskExecutor( shardContext, workflowCache, workflowDeleteManager, - xdc.NewNDCHistoryResender( - shardContext.GetNamespaceRegistry(), - f.ClientBean, - func( - ctx context.Context, - sourceClusterName string, - namespaceId namespace.ID, - workflowId string, - runId string, - events [][]*historypb.HistoryEvent, - versionHistory []*historyspb.VersionHistoryItem, - ) error { - engine, err := shardContext.GetEngine(ctx) - if err != nil { - return err - } - return engine.ReplicateHistoryEvents( - ctx, - definition.WorkflowKey{ - NamespaceID: namespaceId.String(), - WorkflowID: workflowId, - RunID: runId, - }, - nil, - versionHistory, - events, - nil, - "", - ) - }, - shardContext.GetPayloadSerializer(), - f.Config.StandbyTaskReReplicationContextTimeout, - logger, - f.Config, - ), - resendHandler, f.MatchingRawClient, logger, f.MetricsHandler, @@ -217,6 +152,7 @@ func (f *timerQueueFactory) CreateQueue( // we have the option of revert to old behavior currentClusterName, f.Config, + f.ClientBean, ) executor := queues.NewActiveStandbyExecutor( diff --git a/service/history/timer_queue_standby_task_executor.go b/service/history/timer_queue_standby_task_executor.go index e0578d96000..74c7f60b3ed 100644 --- a/service/history/timer_queue_standby_task_executor.go +++ b/service/history/timer_queue_standby_task_executor.go @@ -36,19 +36,17 @@ import ( taskqueuepb "go.temporal.io/api/taskqueue/v1" enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/client" "go.temporal.io/server/common" "go.temporal.io/server/common/log" - "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/resource" - "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/deletemanager" "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/queues" - "go.temporal.io/server/service/history/replication/eventhandler" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/vclock" @@ -60,10 +58,8 @@ import ( type ( timerQueueStandbyTaskExecutor struct { *timerQueueTaskExecutorBase - - clusterName string - nDCHistoryResender xdc.NDCHistoryResender - resendHandler eventhandler.ResendHandler + clusterName string + clientBean client.Bean } ) @@ -71,13 +67,12 @@ func newTimerQueueStandbyTaskExecutor( shard shard.Context, workflowCache wcache.Cache, workflowDeleteManager deletemanager.DeleteManager, - nDCHistoryResender xdc.NDCHistoryResender, - resendHandler eventhandler.ResendHandler, matchingRawClient resource.MatchingRawClient, logger log.Logger, metricProvider metrics.Handler, clusterName string, config *configs.Config, + clientBean client.Bean, ) queues.Executor { return &timerQueueStandbyTaskExecutor{ timerQueueTaskExecutorBase: newTimerQueueTaskExecutorBase( @@ -90,9 +85,8 @@ func newTimerQueueStandbyTaskExecutor( config, false, ), - clusterName: clusterName, - nDCHistoryResender: nDCHistoryResender, - resendHandler: resendHandler, + clusterName: clusterName, + clientBean: clientBean, } } @@ -166,7 +160,7 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask( timerTask.GetVisibilityTime(), timerSequenceID.Timestamp, ) { - return getHistoryResendInfo(mutableState) + return &struct{}{}, nil } // Since the user timers are already sorted, then if there is one timer which is not expired, // all user timers after that timer are not expired. @@ -182,10 +176,8 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask( getStandbyPostActionFn( timerTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()), - t.fetchHistoryFromRemote, - standbyTimerTaskPostActionTaskDiscarded, + t.checkWorkflowStillExistOnSourceBeforeDiscard, ), ) } @@ -227,7 +219,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( timerTask.GetVisibilityTime(), timerSequenceID.Timestamp, ) { - return getHistoryResendInfo(mutableState) + return &struct{}{}, nil } // Since the activity timers are already sorted, then if there is one timer which is not expired, // all activity timers after that timer are not expired. @@ -288,10 +280,8 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( getStandbyPostActionFn( timerTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()), - t.fetchHistoryFromRemote, - standbyTimerTaskPostActionTaskDiscarded, + t.checkWorkflowStillExistOnSourceBeforeDiscard, ), ) } @@ -337,9 +327,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityRetryTimerTask( getStandbyPostActionFn( task, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(task.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(task.GetType()), - t.fetchHistoryFromRemote, t.pushActivity, ), ) @@ -372,7 +360,7 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowTaskTimeoutTask( return nil, err } - return getHistoryResendInfo(mutableState) + return &struct{}{}, nil } return t.processTimer( @@ -382,10 +370,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowTaskTimeoutTask( getStandbyPostActionFn( timerTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()), - t.fetchHistoryFromRemote, - standbyTimerTaskPostActionTaskDiscarded, + t.checkWorkflowStillExistOnSourceBeforeDiscard, ), ) } @@ -416,7 +402,7 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowBackoffTimerTask( // standby cluster should just call ack manager to retry this task // since we are stilling waiting for the first WorkflowTaskScheduledEvent to be replicated from active side. - return getHistoryResendInfo(mutableState) + return &struct{}{}, nil } return t.processTimer( @@ -426,10 +412,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowBackoffTimerTask( getStandbyPostActionFn( timerTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()), - t.fetchHistoryFromRemote, - standbyTimerTaskPostActionTaskDiscarded, + t.checkWorkflowStillExistOnSourceBeforeDiscard, ), ) } @@ -453,7 +437,7 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowRunTimeoutTask( return nil, err } - return getHistoryResendInfo(mutableState) + return &struct{}{}, nil } return t.processTimer( @@ -463,10 +447,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowRunTimeoutTask( getStandbyPostActionFn( timerTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()), - t.fetchHistoryFromRemote, - standbyTimerTaskPostActionTaskDiscarded, + t.checkWorkflowStillExistOnSourceBeforeDiscard, ), ) } @@ -500,10 +482,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowExecutionTimeoutTask( getStandbyPostActionFn( timerTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()), - t.fetchHistoryFromRemote, - standbyTimerTaskPostActionTaskDiscarded, + t.checkWorkflowStillExistOnSourceBeforeDiscard, ), ) } @@ -531,7 +511,7 @@ func (t *timerQueueStandbyTaskExecutor) executeStateMachineTimerTask( if err != nil { if errors.Is(err, consts.ErrTaskRetry) { // This handles the ErrTaskRetry error returned by executeStateMachineTimers. - return getHistoryResendInfo(mutableState) + return &struct{}{}, nil } return nil, err } @@ -560,10 +540,8 @@ func (t *timerQueueStandbyTaskExecutor) executeStateMachineTimerTask( getStandbyPostActionFn( timerTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()), - t.fetchHistoryFromRemote, - standbyTimerTaskPostActionTaskDiscarded, + t.checkWorkflowStillExistOnSourceBeforeDiscard, ), ) } @@ -622,99 +600,6 @@ func (t *timerQueueStandbyTaskExecutor) processTimer( return postActionFn(ctx, timerTask, historyResendInfo, t.logger) } -func (t *timerQueueStandbyTaskExecutor) fetchHistoryFromRemote( - ctx context.Context, - taskInfo tasks.Task, - postActionInfo interface{}, - logger log.Logger, -) error { - workflowKey := taskWorkflowKey(taskInfo) - - var resendInfo *historyResendInfo - switch postActionInfo := postActionInfo.(type) { - case nil: - return nil - case *historyResendInfo: - resendInfo = postActionInfo - case *executionTimerPostActionInfo: - resendInfo = postActionInfo.historyResendInfo - workflowKey.RunID = postActionInfo.currentRunID - case *activityTaskPostActionInfo: - resendInfo = postActionInfo.historyResendInfo - default: - logger.Fatal("unknown post action info for fetching remote history", tag.Value(postActionInfo)) - } - - remoteClusterName, err := getRemoteClusterName( - t.currentClusterName, - t.registry, - workflowKey.GetNamespaceID(), - ) - if err != nil { - return err - } - - scope := t.metricsHandler.WithTags(metrics.OperationTag(metrics.HistoryRereplicationByTimerTaskScope)) - metrics.ClientRequests.With(scope).Record(1) - startTime := time.Now() - defer func() { metrics.ClientLatency.With(scope).Record(time.Since(startTime)) }() - - if resendInfo.lastEventID == common.EmptyEventID || resendInfo.lastEventVersion == common.EmptyVersion { - t.logger.Error("Error re-replicating history from remote: timerQueueStandbyProcessor encountered empty historyResendInfo.", - tag.ShardID(t.shardContext.GetShardID()), - tag.WorkflowNamespaceID(workflowKey.GetNamespaceID()), - tag.WorkflowID(workflowKey.GetWorkflowID()), - tag.WorkflowRunID(workflowKey.GetRunID()), - tag.ClusterName(remoteClusterName)) - - return consts.ErrTaskRetry - } - - // NOTE: history resend may take long time and its timeout is currently - // controlled by a separate dynamicconfig config: StandbyTaskReReplicationContextTimeout - if t.config.EnableReplicateLocalGeneratedEvent() { - err = t.resendHandler.ResendHistoryEvents( - ctx, - remoteClusterName, - namespace.ID(workflowKey.GetNamespaceID()), - workflowKey.GetWorkflowID(), - workflowKey.GetRunID(), - resendInfo.lastEventID, - resendInfo.lastEventVersion, - common.EmptyEventID, - common.EmptyVersion, - ) - } else { - err = t.nDCHistoryResender.SendSingleWorkflowHistory( - ctx, - remoteClusterName, - namespace.ID(workflowKey.GetNamespaceID()), - workflowKey.GetWorkflowID(), - workflowKey.GetRunID(), - resendInfo.lastEventID, - resendInfo.lastEventVersion, - common.EmptyEventID, - common.EmptyVersion, - ) - } - if err != nil { - if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound { - // Don't log NamespaceNotFound error because it is valid case, and return error to stop retrying. - return err - } - t.logger.Error("Error re-replicating history from remote.", - tag.ShardID(t.shardContext.GetShardID()), - tag.WorkflowNamespaceID(workflowKey.GetNamespaceID()), - tag.WorkflowID(workflowKey.GetWorkflowID()), - tag.WorkflowRunID(workflowKey.GetRunID()), - tag.ClusterName(remoteClusterName), - tag.Error(err)) - } - - // Return retryable error, so task processing will retry. - return consts.ErrTaskRetry -} - func (t *timerQueueStandbyTaskExecutor) pushActivity( ctx context.Context, task tasks.Task, @@ -770,3 +655,18 @@ func (t *timerQueueStandbyTaskExecutor) pushActivity( func (t *timerQueueStandbyTaskExecutor) getCurrentTime() time.Time { return t.shardContext.GetCurrentTime(t.clusterName) } + +func (t *timerQueueStandbyTaskExecutor) checkWorkflowStillExistOnSourceBeforeDiscard( + ctx context.Context, + taskInfo tasks.Task, + postActionInfo interface{}, + logger log.Logger, +) error { + if postActionInfo == nil { + return nil + } + if !isWorkflowExistOnSource(ctx, taskInfo, logger, t.clusterName, t.clientBean, t.shardContext.GetNamespaceRegistry()) { + return standbyTimerTaskPostActionTaskDiscarded(ctx, taskInfo, nil, logger) + } + return standbyTimerTaskPostActionTaskDiscarded(ctx, taskInfo, postActionInfo, logger) +} diff --git a/service/history/timer_queue_standby_task_executor_test.go b/service/history/timer_queue_standby_task_executor_test.go index a90be1dd1f5..09972290c3d 100644 --- a/service/history/timer_queue_standby_task_executor_test.go +++ b/service/history/timer_queue_standby_task_executor_test.go @@ -44,6 +44,7 @@ import ( "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/api/matchingservicemock/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/client" "go.temporal.io/server/common" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" @@ -56,7 +57,6 @@ import ( "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/testing/protorequire" "go.temporal.io/server/common/worker_versioning" - "go.temporal.io/server/common/xdc" "go.temporal.io/server/components/dummy" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" @@ -64,7 +64,6 @@ import ( "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/queues" - "go.temporal.io/server/service/history/replication/eventhandler" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" @@ -82,18 +81,16 @@ type ( *require.Assertions protorequire.ProtoAssertions - controller *gomock.Controller - mockExecutionMgr *persistence.MockExecutionManager - mockShard *shard.ContextTest - mockTxProcessor *queues.MockQueue - mockTimerProcessor *queues.MockQueue - mockNamespaceCache *namespace.MockRegistry - mockClusterMetadata *cluster.MockMetadata - mockAdminClient *adminservicemock.MockAdminServiceClient - mockNDCHistoryResender *xdc.MockNDCHistoryResender - mockResendHandler *eventhandler.MockResendHandler - mockDeleteManager *deletemanager.MockDeleteManager - mockMatchingClient *matchingservicemock.MockMatchingServiceClient + controller *gomock.Controller + mockExecutionMgr *persistence.MockExecutionManager + mockShard *shard.ContextTest + mockTxProcessor *queues.MockQueue + mockTimerProcessor *queues.MockQueue + mockNamespaceCache *namespace.MockRegistry + mockClusterMetadata *cluster.MockMetadata + mockAdminClient *adminservicemock.MockAdminServiceClient + mockDeleteManager *deletemanager.MockDeleteManager + mockMatchingClient *matchingservicemock.MockMatchingServiceClient config *configs.Config workflowCache wcache.Cache @@ -106,6 +103,7 @@ type ( timeSource *clock.EventTimeSource fetchHistoryDuration time.Duration discardDuration time.Duration + clientBean *client.MockBean timerQueueStandbyTaskExecutor *timerQueueStandbyTaskExecutor } @@ -134,14 +132,13 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() { s.discardDuration = time.Minute * 30 s.controller = gomock.NewController(s.T()) - s.mockNDCHistoryResender = xdc.NewMockNDCHistoryResender(s.controller) s.mockTxProcessor = queues.NewMockQueue(s.controller) s.mockTimerProcessor = queues.NewMockQueue(s.controller) s.mockTxProcessor.EXPECT().Category().Return(tasks.CategoryTransfer).AnyTimes() s.mockTimerProcessor.EXPECT().Category().Return(tasks.CategoryTimer).AnyTimes() s.mockTxProcessor.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes() s.mockTimerProcessor.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes() - s.mockResendHandler = eventhandler.NewMockResendHandler(s.controller) + s.clientBean = client.NewMockBean(s.controller) s.mockShard = shard.NewTestContextWithTimeSource( s.controller, @@ -204,13 +201,12 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() { s.mockShard, s.workflowCache, s.mockDeleteManager, - s.mockNDCHistoryResender, - s.mockResendHandler, s.mockMatchingClient, s.logger, metrics.NoopMetricsHandler, s.clusterName, s.config, + s.clientBean, ).(*timerQueueStandbyTaskExecutor) } @@ -258,7 +254,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Pending timerID := "timer" timerTimeout := 2 * time.Second event, _ = addTimerStartedEvent(mutableState, event.GetEventId(), timerID, timerTimeout) - nextEventID := event.GetEventId() timerSequence := workflow.NewTimerSequence(mutableState) mutableState.InsertTasks[tasks.CategoryTimer] = nil @@ -286,17 +281,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Pending s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration)) - s.mockNDCHistoryResender.EXPECT().SendSingleWorkflowHistory( - gomock.Any(), - s.clusterName, - namespace.ID(timerTask.NamespaceID), - timerTask.WorkflowID, - timerTask.RunID, - nextEventID, - s.version, - int64(0), - int64(0), - ).Return(nil) resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) @@ -475,7 +459,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Pending( timerTimeout := 2 * time.Second scheduledEvent, _ := addActivityTaskScheduledEvent(mutableState, event.GetEventId(), activityID, activityType, taskqueue, nil, timerTimeout, timerTimeout, timerTimeout, timerTimeout) - nextEventID := scheduledEvent.GetEventId() timerSequence := workflow.NewTimerSequence(mutableState) mutableState.InsertTasks[tasks.CategoryTimer] = nil @@ -505,17 +488,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Pending( s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration)) - s.mockNDCHistoryResender.EXPECT().SendSingleWorkflowHistory( - gomock.Any(), - s.clusterName, - namespace.ID(timerTask.NamespaceID), - timerTask.WorkflowID, - timerTask.RunID, - nextEventID, - s.version, - int64(0), - int64(0), - ).Return(nil) resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) @@ -814,7 +786,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowTaskTimeout_Pend startedEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) // Flush buffered events so real IDs get assigned mutableState.FlushBufferedEvents() - nextEventID := startedEvent.GetEventId() timerTask := &tasks.WorkflowTaskTimeoutTask{ WorkflowKey: definition.NewWorkflowKey( @@ -838,17 +809,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowTaskTimeout_Pend s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration)) - s.mockNDCHistoryResender.EXPECT().SendSingleWorkflowHistory( - gomock.Any(), - s.clusterName, - namespace.ID(timerTask.NamespaceID), - timerTask.WorkflowID, - timerTask.RunID, - nextEventID, - s.version, - int64(0), - int64(0), - ).Return(nil) resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) @@ -963,7 +923,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowBackoffTimer_Pen // Flush buffered events so real IDs get assigned mutableState.FlushBufferedEvents() - nextEventID := event.GetEventId() timerTask := &tasks.WorkflowBackoffTimerTask{ WorkflowKey: definition.NewWorkflowKey( @@ -985,17 +944,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowBackoffTimer_Pen s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) s.mockShard.SetCurrentTime(s.clusterName, time.Now().UTC().Add(s.fetchHistoryDuration)) - s.mockNDCHistoryResender.EXPECT().SendSingleWorkflowHistory( - gomock.Any(), - s.clusterName, - namespace.ID(timerTask.NamespaceID), - timerTask.WorkflowID, - timerTask.RunID, - nextEventID, - s.version, - int64(0), - int64(0), - ).Return(nil) resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) @@ -1084,7 +1032,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowRunTimeout_Pendi completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") // Flush buffered events so real IDs get assigned mutableState.FlushBufferedEvents() - nextEventID := completionEvent.GetEventId() timerTask := &tasks.WorkflowRunTimeoutTask{ WorkflowKey: definition.NewWorkflowKey( @@ -1106,17 +1053,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowRunTimeout_Pendi s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration)) - s.mockNDCHistoryResender.EXPECT().SendSingleWorkflowHistory( - gomock.Any(), - s.clusterName, - namespace.ID(timerTask.NamespaceID), - timerTask.WorkflowID, - timerTask.RunID, - nextEventID, - s.version, - int64(0), - int64(0), - ).Return(nil) resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) @@ -1206,7 +1142,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowExecutionTimeout // Flush buffered events so real IDs get assigned mutableState.FlushBufferedEvents() - nextEventID := startedEvent.GetEventId() timerTask := &tasks.WorkflowExecutionTimeoutTask{ NamespaceID: s.namespaceID.String(), @@ -1235,17 +1170,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowExecutionTimeout s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration)) - s.mockNDCHistoryResender.EXPECT().SendSingleWorkflowHistory( - gomock.Any(), - s.clusterName, - namespace.ID(timerTask.NamespaceID), - timerTask.WorkflowID, - execution.RunId, - nextEventID, - s.version, - int64(0), - int64(0), - ).Return(nil) + resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) @@ -1586,17 +1511,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityRetryTimer_Pendi // resend history post action s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration)) - s.mockNDCHistoryResender.EXPECT().SendSingleWorkflowHistory( - gomock.Any(), - s.clusterName, - s.namespaceID, - execution.WorkflowId, - execution.RunId, - scheduledEvent.GetEventId(), - s.version, - int64(0), - int64(0), - ).Return(nil) + resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) @@ -1731,13 +1646,12 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteStateMachineTimerTask_Ex s.mockShard, mockCache, s.mockDeleteManager, - s.mockNDCHistoryResender, - s.mockResendHandler, s.mockMatchingClient, s.logger, metrics.NoopMetricsHandler, s.clusterName, s.config, + s.clientBean, ).(*timerQueueStandbyTaskExecutor) err = timerQueueStandbyTaskExecutor.executeStateMachineTimerTask(context.Background(), task) @@ -1839,13 +1753,12 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteStateMachineTimerTask_Va s.mockShard, mockCache, s.mockDeleteManager, - s.mockNDCHistoryResender, - s.mockResendHandler, s.mockMatchingClient, s.logger, metrics.NoopMetricsHandler, s.clusterName, s.config, + s.clientBean, ).(*timerQueueStandbyTaskExecutor) err = timerQueueStandbyTaskExecutor.executeStateMachineTimerTask(context.Background(), task) @@ -1942,13 +1855,12 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteStateMachineTimerTask_St s.mockShard, mockCache, s.mockDeleteManager, - s.mockNDCHistoryResender, - s.mockResendHandler, s.mockMatchingClient, s.logger, metrics.NoopMetricsHandler, s.clusterName, s.config, + s.clientBean, ).(*timerQueueStandbyTaskExecutor) err = timerQueueStandbyTaskExecutor.executeStateMachineTimerTask(context.Background(), task) diff --git a/service/history/transfer_queue_factory.go b/service/history/transfer_queue_factory.go index f8fcc70c12e..bdf054d63e3 100644 --- a/service/history/transfer_queue_factory.go +++ b/service/history/transfer_queue_factory.go @@ -25,22 +25,14 @@ package history import ( - "context" - - historypb "go.temporal.io/api/history/v1" - historyspb "go.temporal.io/server/api/history/v1" "go.temporal.io/server/client" - "go.temporal.io/server/common/definition" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" - "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/resource" "go.temporal.io/server/common/sdk" - "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/queues" - "go.temporal.io/server/service/history/replication/eventhandler" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" wcache "go.temporal.io/server/service/history/workflow/cache" @@ -144,73 +136,17 @@ func (f *transferQueueFactory) CreateQueue( f.MatchingRawClient, f.VisibilityManager, ) - eventImporter := eventhandler.NewEventImporter( - f.RemoteHistoryFetcher, - func(ctx context.Context, namespaceID namespace.ID, workflowID string) (shard.Engine, error) { - return shardContext.GetEngine(ctx) - }, - f.Serializer, - logger, - ) - resendHandler := eventhandler.NewResendHandler( - f.NamespaceRegistry, - f.ClientBean, - f.Serializer, - f.ClusterMetadata, - func(ctx context.Context, namespaceID namespace.ID, workflowID string) (shard.Engine, error) { - return shardContext.GetEngine(ctx) - }, - f.RemoteHistoryFetcher, - eventImporter, - logger, - f.Config, - ) standbyExecutor := newTransferQueueStandbyTaskExecutor( shardContext, workflowCache, - xdc.NewNDCHistoryResender( - f.NamespaceRegistry, - f.ClientBean, - func( - ctx context.Context, - sourceClusterName string, - namespaceId namespace.ID, - workflowId string, - runId string, - events [][]*historypb.HistoryEvent, - versionHistory []*historyspb.VersionHistoryItem, - ) error { - engine, err := shardContext.GetEngine(ctx) - if err != nil { - return err - } - return engine.ReplicateHistoryEvents( - ctx, - definition.WorkflowKey{ - NamespaceID: namespaceId.String(), - WorkflowID: workflowId, - RunID: runId, - }, - nil, - versionHistory, - events, - nil, - "", - ) - }, - shardContext.GetPayloadSerializer(), - f.Config.StandbyTaskReReplicationContextTimeout, - logger, - f.Config, - ), - resendHandler, logger, f.MetricsHandler, currentClusterName, f.HistoryRawClient, f.MatchingRawClient, f.VisibilityManager, + f.ClientBean, ) executor := queues.NewActiveStandbyExecutor( diff --git a/service/history/transfer_queue_standby_task_executor.go b/service/history/transfer_queue_standby_task_executor.go index 5d9aa486bc9..7946d355797 100644 --- a/service/history/transfer_queue_standby_task_executor.go +++ b/service/history/transfer_queue_standby_task_executor.go @@ -35,18 +35,16 @@ import ( "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/client" "go.temporal.io/server/common" "go.temporal.io/server/common/log" - "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/resource" - "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/ndc" "go.temporal.io/server/service/history/queues" - "go.temporal.io/server/service/history/replication/eventhandler" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/workflow" @@ -62,9 +60,8 @@ type ( transferQueueStandbyTaskExecutor struct { *transferQueueTaskExecutorBase - clusterName string - nDCHistoryResender xdc.NDCHistoryResender // Deprecated, will delete once eventhandler.ResendHandler feature is fully launched - resendHandler eventhandler.ResendHandler + clusterName string + clientBean client.Bean } verificationErr struct { @@ -76,14 +73,13 @@ type ( func newTransferQueueStandbyTaskExecutor( shard shard.Context, workflowCache wcache.Cache, - nDCHistoryResender xdc.NDCHistoryResender, - resendHandler eventhandler.ResendHandler, logger log.Logger, metricProvider metrics.Handler, clusterName string, historyRawClient resource.HistoryRawClient, matchingRawClient resource.MatchingRawClient, visibilityManager manager.VisibilityManager, + clientBean client.Bean, ) queues.Executor { return &transferQueueStandbyTaskExecutor{ transferQueueTaskExecutorBase: newTransferQueueTaskExecutorBase( @@ -95,9 +91,8 @@ func newTransferQueueStandbyTaskExecutor( matchingRawClient, visibilityManager, ), - clusterName: clusterName, - nDCHistoryResender: nDCHistoryResender, - resendHandler: resendHandler, + clusterName: clusterName, + clientBean: clientBean, } } @@ -175,9 +170,7 @@ func (t *transferQueueStandbyTaskExecutor) processActivityTask( getStandbyPostActionFn( transferTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()), - t.fetchHistoryFromRemote, t.pushActivity, ), ) @@ -228,9 +221,7 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask( getStandbyPostActionFn( transferTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()), - t.fetchHistoryFromRemote, t.pushWorkflowTask, ), ) @@ -316,10 +307,8 @@ func (t *transferQueueStandbyTaskExecutor) processCloseExecution( getStandbyPostActionFn( transferTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()), - standbyTaskPostActionNoOp, - standbyTransferTaskPostActionTaskDiscarded, + t.checkWorkflowStillExistOnSourceBeforeDiscard, ), ) } @@ -340,7 +329,7 @@ func (t *transferQueueStandbyTaskExecutor) processCancelExecution( return nil, err } - return getHistoryResendInfo(mutableState) + return &struct{}{}, nil } return t.processTransfer( @@ -351,10 +340,8 @@ func (t *transferQueueStandbyTaskExecutor) processCancelExecution( getStandbyPostActionFn( transferTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()), - t.fetchHistoryFromRemote, - standbyTransferTaskPostActionTaskDiscarded, + t.checkWorkflowStillExistOnSourceBeforeDiscard, ), ) } @@ -375,7 +362,7 @@ func (t *transferQueueStandbyTaskExecutor) processSignalExecution( return nil, err } - return getHistoryResendInfo(mutableState) + return &struct{}{}, nil } return t.processTransfer( @@ -386,10 +373,8 @@ func (t *transferQueueStandbyTaskExecutor) processSignalExecution( getStandbyPostActionFn( transferTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()), - t.fetchHistoryFromRemote, - standbyTransferTaskPostActionTaskDiscarded, + t.checkWorkflowStillExistOnSourceBeforeDiscard, ), ) } @@ -424,11 +409,7 @@ func (t *transferQueueStandbyTaskExecutor) processStartChildExecution( } if !childStarted { - historyResendInfo, err := getHistoryResendInfo(mutableState) - if err != nil { - return nil, err - } - return historyResendInfo, nil + return &struct{}{}, nil } _, err = t.historyRawClient.VerifyFirstWorkflowTaskScheduled(ctx, &historyservice.VerifyFirstWorkflowTaskScheduledRequest{ @@ -468,10 +449,8 @@ func (t *transferQueueStandbyTaskExecutor) processStartChildExecution( getStandbyPostActionFn( transferTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()), t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()), - t.startChildExecutionResendPostAction, - standbyTransferTaskPostActionTaskDiscarded, + t.checkWorkflowStillExistOnSourceBeforeDiscard, ), ) } @@ -570,121 +549,29 @@ func (t *transferQueueStandbyTaskExecutor) pushWorkflowTask( ) } -func (t *transferQueueStandbyTaskExecutor) startChildExecutionResendPostAction( - ctx context.Context, - taskInfo tasks.Task, - postActionInfo interface{}, - log log.Logger, -) error { - if postActionInfo == nil { - return nil - } +func (t *transferQueueStandbyTaskExecutor) getCurrentTime() time.Time { + return t.shardContext.GetCurrentTime(t.clusterName) +} - if historyResendInfo, ok := postActionInfo.(*historyResendInfo); ok { - return t.fetchHistoryFromRemote(ctx, taskInfo, historyResendInfo, log) - } +func (e *verificationErr) Error() string { + return fmt.Sprintf("%v: %v", e.msg, e.err.Error()) +} - return standbyTaskPostActionNoOp(ctx, taskInfo, postActionInfo, log) +func (e *verificationErr) Unwrap() error { + return e.err } -func (t *transferQueueStandbyTaskExecutor) fetchHistoryFromRemote( +func (t *transferQueueStandbyTaskExecutor) checkWorkflowStillExistOnSourceBeforeDiscard( ctx context.Context, taskInfo tasks.Task, postActionInfo interface{}, logger log.Logger, ) error { - var resendInfo *historyResendInfo - switch postActionInfo := postActionInfo.(type) { - case nil: + if postActionInfo == nil { return nil - case *historyResendInfo: - resendInfo = postActionInfo - case *activityTaskPostActionInfo: - resendInfo = postActionInfo.historyResendInfo - case *workflowTaskPostActionInfo: - resendInfo = postActionInfo.historyResendInfo - default: - logger.Fatal("unknown post action info for fetching remote history", tag.Value(postActionInfo)) - } - - remoteClusterName, err := getRemoteClusterName( - t.currentClusterName, - t.registry, - taskInfo.GetNamespaceID(), - ) - if err != nil { - return err - } - - scope := t.metricHandler.WithTags(metrics.OperationTag(metrics.HistoryRereplicationByTransferTaskScope)) - metrics.ClientRequests.With(scope).Record(1) - startTime := time.Now().UTC() - defer func() { metrics.ClientLatency.With(scope).Record(time.Since(startTime)) }() - - if resendInfo.lastEventID == common.EmptyEventID || resendInfo.lastEventVersion == common.EmptyVersion { - t.logger.Error("Error re-replicating history from remote: transferQueueStandbyProcessor encountered empty historyResendInfo.", - tag.ShardID(t.shardContext.GetShardID()), - tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()), - tag.WorkflowID(taskInfo.GetWorkflowID()), - tag.WorkflowRunID(taskInfo.GetRunID()), - tag.SourceCluster(remoteClusterName)) - - return consts.ErrTaskRetry - } - - // NOTE: history resend may take long time and its timeout is currently - // controlled by a separate dynamicconfig config: StandbyTaskReReplicationContextTimeout - if t.config.EnableReplicateLocalGeneratedEvent() { - err = t.resendHandler.ResendHistoryEvents( - ctx, - remoteClusterName, - namespace.ID(taskInfo.GetNamespaceID()), - taskInfo.GetWorkflowID(), - taskInfo.GetRunID(), - resendInfo.lastEventID, - resendInfo.lastEventVersion, - common.EmptyEventID, - common.EmptyVersion, - ) - } else { - err = t.nDCHistoryResender.SendSingleWorkflowHistory( - ctx, - remoteClusterName, - namespace.ID(taskInfo.GetNamespaceID()), - taskInfo.GetWorkflowID(), - taskInfo.GetRunID(), - resendInfo.lastEventID, - resendInfo.lastEventVersion, - 0, - 0, - ) } - if err != nil { - if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound { - // Don't log NamespaceNotFound error because it is valid case, and return error to stop retrying. - return err - } - t.logger.Error("Error re-replicating history from remote.", - tag.ShardID(t.shardContext.GetShardID()), - tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()), - tag.WorkflowID(taskInfo.GetWorkflowID()), - tag.WorkflowRunID(taskInfo.GetRunID()), - tag.SourceCluster(remoteClusterName), - tag.Error(err)) + if !isWorkflowExistOnSource(ctx, taskInfo, logger, t.clusterName, t.clientBean, t.shardContext.GetNamespaceRegistry()) { + return standbyTransferTaskPostActionTaskDiscarded(ctx, taskInfo, nil, logger) } - - // Return retryable error, so task processing will retry. - return consts.ErrTaskRetry -} - -func (t *transferQueueStandbyTaskExecutor) getCurrentTime() time.Time { - return t.shardContext.GetCurrentTime(t.clusterName) -} - -func (e *verificationErr) Error() string { - return fmt.Sprintf("%v: %v", e.msg, e.err.Error()) -} - -func (e *verificationErr) Unwrap() error { - return e.err + return standbyTransferTaskPostActionTaskDiscarded(ctx, taskInfo, postActionInfo, logger) } diff --git a/service/history/transfer_queue_standby_task_executor_test.go b/service/history/transfer_queue_standby_task_executor_test.go index 0a4b85db70b..a96129bda47 100644 --- a/service/history/transfer_queue_standby_task_executor_test.go +++ b/service/history/transfer_queue_standby_task_executor_test.go @@ -46,6 +46,7 @@ import ( "go.temporal.io/server/api/matchingservicemock/v1" persistencespb "go.temporal.io/server/api/persistence/v1" workflowspb "go.temporal.io/server/api/workflow/v1" + "go.temporal.io/server/client" "go.temporal.io/server/common" "go.temporal.io/server/common/archiver" "go.temporal.io/server/common/archiver/provider" @@ -60,12 +61,10 @@ import ( "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/testing/protomock" - "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/queues" - "go.temporal.io/server/service/history/replication/eventhandler" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" @@ -81,15 +80,13 @@ type ( suite.Suite *require.Assertions - controller *gomock.Controller - mockShard *shard.ContextTest - mockNamespaceCache *namespace.MockRegistry - mockClusterMetadata *cluster.MockMetadata - mockAdminClient *adminservicemock.MockAdminServiceClient - mockNDCHistoryResender *xdc.MockNDCHistoryResender - resendHandler *eventhandler.MockResendHandler - mockHistoryClient *historyservicemock.MockHistoryServiceClient - mockMatchingClient *matchingservicemock.MockMatchingServiceClient + controller *gomock.Controller + mockShard *shard.ContextTest + mockNamespaceCache *namespace.MockRegistry + mockClusterMetadata *cluster.MockMetadata + mockAdminClient *adminservicemock.MockAdminServiceClient + mockHistoryClient *historyservicemock.MockHistoryServiceClient + mockMatchingClient *matchingservicemock.MockMatchingServiceClient mockExecutionMgr *persistence.MockExecutionManager mockArchivalMetadata archiver.MetadataMock @@ -109,6 +106,7 @@ type ( transferQueueStandbyTaskExecutor *transferQueueStandbyTaskExecutor mockSearchAttributesProvider *searchattribute.MockProvider mockVisibilityManager *manager.MockVisibilityManager + clientBean *client.MockBean } ) @@ -134,7 +132,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) SetupTest() { s.discardDuration = time.Minute * 30 s.controller = gomock.NewController(s.T()) - s.mockNDCHistoryResender = xdc.NewMockNDCHistoryResender(s.controller) s.mockShard = shard.NewTestContextWithTimeSource( s.controller, &persistencespb.ShardInfo{ @@ -189,13 +186,12 @@ func (s *transferQueueStandbyTaskExecutorSuite) SetupTest() { s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(s.clusterName).AnyTimes() - + s.clientBean = client.NewMockBean(s.controller) s.workflowCache = wcache.NewHostLevelCache(s.mockShard.GetConfig(), s.mockShard.GetLogger(), metrics.NoopMetricsHandler) s.logger = s.mockShard.GetLogger() s.mockArchivalMetadata.SetHistoryEnabledByDefault() s.mockArchivalMetadata.SetVisibilityEnabledByDefault() - s.resendHandler = eventhandler.NewMockResendHandler(s.controller) h := &historyEngineImpl{ currentClusterName: s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(), @@ -212,14 +208,13 @@ func (s *transferQueueStandbyTaskExecutorSuite) SetupTest() { s.transferQueueStandbyTaskExecutor = newTransferQueueStandbyTaskExecutor( s.mockShard, s.workflowCache, - s.mockNDCHistoryResender, - s.resendHandler, s.logger, metrics.NoopMetricsHandler, s.clusterName, s.mockShard.Resource.HistoryClient, s.mockShard.Resource.MatchingClient, s.mockVisibilityManager, + s.clientBean, ).(*transferQueueStandbyTaskExecutor) } @@ -286,17 +281,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessActivityTask_Pending( // resend history post action s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.fetchHistoryDuration)) - s.mockNDCHistoryResender.EXPECT().SendSingleWorkflowHistory( - gomock.Any(), - s.clusterName, - namespace.ID(transferTask.NamespaceID), - transferTask.WorkflowID, - transferTask.RunID, - event.GetEventId(), - s.version, - int64(0), - int64(0), - ).Return(nil) resp = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) @@ -421,17 +405,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessWorkflowTask_Pending( // resend history post action s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.fetchHistoryDuration)) - s.mockNDCHistoryResender.EXPECT().SendSingleWorkflowHistory( - gomock.Any(), - s.clusterName, - namespace.ID(transferTask.NamespaceID), - transferTask.WorkflowID, - transferTask.RunID, - wt.ScheduledEventID, - s.version, - int64(0), - int64(0), - ).Return(nil) resp = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) @@ -701,7 +674,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCancelExecution_Pendi taskID := s.mustGenerateTaskID() event, _ = addRequestCancelInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), tests.TargetNamespace, tests.TargetNamespaceID, targetExecution.GetWorkflowId(), targetExecution.GetRunId()) - nextEventID := event.GetEventId() now := time.Now().UTC() transferTask := &tasks.CancelExecutionTask{ @@ -728,17 +700,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCancelExecution_Pendi s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.fetchHistoryDuration)) - s.mockNDCHistoryResender.EXPECT().SendSingleWorkflowHistory( - gomock.Any(), - s.clusterName, - namespace.ID(transferTask.NamespaceID), - transferTask.WorkflowID, - transferTask.RunID, - nextEventID, - s.version, - int64(0), - int64(0), - ).Return(nil) resp = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) @@ -850,7 +811,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessSignalExecution_Pendi taskID := s.mustGenerateTaskID() event, _ = addRequestSignalInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), tests.TargetNamespace, tests.TargetNamespaceID, targetExecution.GetWorkflowId(), targetExecution.GetRunId(), signalName, nil, "", nil) - nextEventID := event.GetEventId() now := time.Now().UTC() transferTask := &tasks.SignalExecutionTask{ @@ -876,17 +836,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessSignalExecution_Pendi s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.fetchHistoryDuration)) - s.mockNDCHistoryResender.EXPECT().SendSingleWorkflowHistory( - gomock.Any(), - s.clusterName, - namespace.ID(transferTask.NamespaceID), - transferTask.WorkflowID, - transferTask.RunID, - nextEventID, - s.version, - int64(0), - int64(0), - ).Return(nil) resp = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) @@ -998,7 +947,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_P taskID := s.mustGenerateTaskID() event, _ = addStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_ABANDON) - nextEventID := event.GetEventId() now := time.Now().UTC() transferTask := &tasks.StartChildExecutionTask{ @@ -1023,17 +971,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_P s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) s.mockShard.SetCurrentTime(s.clusterName, now.Add(s.fetchHistoryDuration)) - s.mockNDCHistoryResender.EXPECT().SendSingleWorkflowHistory( - gomock.Any(), - s.clusterName, - namespace.ID(transferTask.NamespaceID), - transferTask.WorkflowID, - transferTask.RunID, - nextEventID, - s.version, - int64(0), - int64(0), - ).Return(nil) resp = s.transferQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(transferTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr)