From cc027e06f69a4dc4dbcd92cd1ad59daf5fddbaec Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Mon, 2 Dec 2024 13:06:21 -0800 Subject: [PATCH 1/3] Nexus callback and task validiation fixes --- components/callbacks/executors.go | 105 ++++++++++++++--------- components/callbacks/hsm_invocation.go | 10 +-- components/callbacks/nexus_invocation.go | 24 +++--- components/callbacks/tasks.go | 31 ++++++- components/nexusoperations/tasks.go | 29 +++++-- service/history/hsm/tasks.go | 2 +- 6 files changed, 133 insertions(+), 68 deletions(-) diff --git a/components/callbacks/executors.go b/components/callbacks/executors.go index 23a2ea3f687..d9f522c89d9 100644 --- a/components/callbacks/executors.go +++ b/components/callbacks/executors.go @@ -58,37 +58,65 @@ func RegisterExecutor( ) } -type ( - TaskExecutorOptions struct { - fx.In - - Config *Config - NamespaceRegistry namespace.Registry - MetricsHandler metrics.Handler - Logger log.Logger - HTTPCallerProvider HTTPCallerProvider - HistoryClient resource.HistoryClient - } +type TaskExecutorOptions struct { + fx.In + + Config *Config + NamespaceRegistry namespace.Registry + MetricsHandler metrics.Handler + Logger log.Logger + HTTPCallerProvider HTTPCallerProvider + HistoryClient resource.HistoryClient +} - taskExecutor struct { - TaskExecutorOptions - } +type taskExecutor struct { + TaskExecutorOptions +} - invocationResult int +// invocationResult is a marker for the callbackInvokable.Invoke result to indicate to the executor how to handle the +// invocation outcome. +type invocationResult interface { + error() error +} - callbackInvokable interface { - // Invoke executes the callback logic and returns a result, and the error to be logged in the state machine. - Invoke(ctx context.Context, ns *namespace.Namespace, e taskExecutor, task InvocationTask) (invocationResult, error) - // WrapError provides each variant the opportunity to return a different error up the call stack than the one logged. - WrapError(result invocationResult, err error) error - } -) +// invocationResultFail marks an invocation as successful. +type invocationResultOK struct{} -const ( - ok invocationResult = iota - retry - failed -) +func (invocationResultOK) mustImplementInvocationResult() {} + +func (invocationResultOK) error() error { + return nil +} + +// invocationResultFail marks an invocation as permanently failed. +type invocationResultFail struct { + err error +} + +func (invocationResultFail) mustImplementInvocationResult() {} + +func (r invocationResultFail) error() error { + return r.err +} + +// invocationResultFail marks an invocation as failed with the intent to retry. +type invocationResultRetry struct { + err error +} + +func (invocationResultRetry) mustImplementInvocationResult() {} + +func (r invocationResultRetry) error() error { + return r.err +} + +type callbackInvokable interface { + // Invoke executes the callback logic and returns the invocation result. + Invoke(ctx context.Context, ns *namespace.Namespace, e taskExecutor, task InvocationTask) invocationResult + // WrapError provides each variant the opportunity to wrap the error returned by the task executor for, e.g. to + // trigger the circuit breaker. + WrapError(result invocationResult, err error) error +} func (e taskExecutor) executeInvocationTask( ctx context.Context, @@ -112,13 +140,9 @@ func (e taskExecutor) executeInvocationTask( ) defer cancel() - result, err := invokable.Invoke(callCtx, ns, e, task) - - saveErr := e.saveResult(callCtx, env, ref, result, err) - if saveErr != nil { - return saveErr - } - return invokable.WrapError(result, err) + result := invokable.Invoke(callCtx, ns, e, task) + saveErr := e.saveResult(callCtx, env, ref, result) + return invokable.WrapError(result, saveErr) } func (e taskExecutor) loadInvocationArgs( @@ -174,25 +198,24 @@ func (e taskExecutor) saveResult( env hsm.Environment, ref hsm.Ref, result invocationResult, - callErr error, ) error { return env.Access(ctx, ref, hsm.AccessWrite, func(node *hsm.Node) error { return hsm.MachineTransition(node, func(callback Callback) (hsm.TransitionOutput, error) { - switch result { - case ok: + switch result.(type) { + case invocationResultOK: return TransitionSucceeded.Apply(callback, EventSucceeded{ Time: env.Now(), }) - case retry: + case invocationResultRetry: return TransitionAttemptFailed.Apply(callback, EventAttemptFailed{ Time: env.Now(), - Err: callErr, + Err: result.error(), RetryPolicy: e.Config.RetryPolicy(), }) - case failed: + case invocationResultFail: return TransitionFailed.Apply(callback, EventFailed{ Time: env.Now(), - Err: callErr, + Err: result.error(), }) default: return hsm.TransitionOutput{}, queues.NewUnprocessableTaskError(fmt.Sprintf("unrecognized callback result %v", result)) diff --git a/components/callbacks/hsm_invocation.go b/components/callbacks/hsm_invocation.go index 8f15f6e444c..c2a9813eaf9 100644 --- a/components/callbacks/hsm_invocation.go +++ b/components/callbacks/hsm_invocation.go @@ -77,11 +77,11 @@ func (s hsmInvocation) WrapError(invocationResult, error) error { return nil } -func (s hsmInvocation) Invoke(ctx context.Context, ns *namespace.Namespace, e taskExecutor, task InvocationTask) (invocationResult, error) { +func (s hsmInvocation) Invoke(ctx context.Context, ns *namespace.Namespace, e taskExecutor, task InvocationTask) invocationResult { // TODO(Tianyu): Will this ever be too big for an RPC call? callbackArgSerialized, err := s.callbackArg.Marshal() if err != nil { - return failed, fmt.Errorf("failed to serialize completion event: %v", err) + return invocationResultFail{fmt.Errorf("failed to serialize completion event: %v", err)} } request := historyservice.InvokeStateMachineMethodRequest{ @@ -107,9 +107,9 @@ func (s hsmInvocation) Invoke(ctx context.Context, ns *namespace.Namespace, e ta if err != nil { e.Logger.Error("Callback request failed", tag.Error(err)) if isRetryableRpcResponse(err) { - return retry, err + return invocationResultRetry{err} } - return failed, err + return invocationResultFail{err} } - return ok, nil + return invocationResultOK{} } diff --git a/components/callbacks/nexus_invocation.go b/components/callbacks/nexus_invocation.go index 118fff4f4ec..341bce12c93 100644 --- a/components/callbacks/nexus_invocation.go +++ b/components/callbacks/nexus_invocation.go @@ -67,22 +67,18 @@ func outcomeTag(callCtx context.Context, response *http.Response, callErr error) } func (n nexusInvocation) WrapError(result invocationResult, err error) error { - // If the request permanently failed there is no need to raise the error - if result == failed { - return nil + if failure, ok := result.(invocationResultRetry); ok { + return queues.NewDestinationDownError(failure.err.Error(), err) } - if err != nil { - return queues.NewDestinationDownError(err.Error(), err) - } - return nil + return err } -func (n nexusInvocation) Invoke(ctx context.Context, ns *namespace.Namespace, e taskExecutor, task InvocationTask) (invocationResult, error) { +func (n nexusInvocation) Invoke(ctx context.Context, ns *namespace.Namespace, e taskExecutor, task InvocationTask) invocationResult { request, err := nexus.NewCompletionHTTPRequest(ctx, n.nexus.Url, n.completion) if err != nil { - return failed, queues.NewUnprocessableTaskError( + return invocationResultFail{queues.NewUnprocessableTaskError( fmt.Sprintf("failed to construct Nexus request: %v", err), - ) + )} } if request.Header == nil { request.Header = make(http.Header) @@ -116,17 +112,17 @@ func (n nexusInvocation) Invoke(ctx context.Context, ns *namespace.Namespace, e if err != nil { e.Logger.Error("Callback request failed with error", tag.Error(err)) - return retry, err + return invocationResultRetry{err} } if response.StatusCode >= 200 && response.StatusCode < 300 { - return ok, nil + return invocationResultOK{} } retryable := isRetryableHTTPResponse(response) err = fmt.Errorf("request failed with: %v", response.Status) e.Logger.Error("Callback request failed", tag.Error(err), tag.NewStringTag("status", response.Status), tag.NewBoolTag("retryable", retryable)) if retryable { - return retry, err + return invocationResultRetry{err} } - return failed, err + return invocationResultFail{err} } diff --git a/components/callbacks/tasks.go b/components/callbacks/tasks.go index f3db938d13a..5e1dcf6d01e 100644 --- a/components/callbacks/tasks.go +++ b/components/callbacks/tasks.go @@ -23,9 +23,12 @@ package callbacks import ( + "fmt" "time" + enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/hsm" ) @@ -59,7 +62,19 @@ func (t InvocationTask) Deadline() time.Time { } func (InvocationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { - return hsm.ValidateNotTransitioned(ref, node) + cb, err := hsm.MachineData[Callback](node) + if err != nil { + return err + } + if cb.State() != enumsspb.CALLBACK_STATE_SCHEDULED { + return fmt.Errorf( + "%w: %w: expected a machine in SCHEDULED state, got %v", + consts.ErrStaleReference, + hsm.ErrInvalidTransition, + cb.State(), + ) + } + return nil } type InvocationTaskSerializer struct{} @@ -91,7 +106,19 @@ func (BackoffTask) Destination() string { } func (BackoffTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { - return hsm.ValidateNotTransitioned(ref, node) + cb, err := hsm.MachineData[Callback](node) + if err != nil { + return err + } + if cb.State() != enumsspb.CALLBACK_STATE_BACKING_OFF { + return fmt.Errorf( + "%w: %w: expected a machine in BACKING_OFF state, got %v", + consts.ErrStaleReference, + hsm.ErrInvalidTransition, + cb.State(), + ) + } + return nil } type BackoffTaskSerializer struct{} diff --git a/components/nexusoperations/tasks.go b/components/nexusoperations/tasks.go index 37701034cce..e77b275f2fe 100644 --- a/components/nexusoperations/tasks.go +++ b/components/nexusoperations/tasks.go @@ -26,7 +26,8 @@ import ( "fmt" "time" - enumspb "go.temporal.io/server/api/enums/v1" + enumspb "go.temporal.io/api/enums/v1" + enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/hsm" @@ -114,7 +115,7 @@ func (InvocationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.No if err != nil { return err } - if op.State() != enumspb.NEXUS_OPERATION_STATE_SCHEDULED { + if op.State() != enumsspb.NEXUS_OPERATION_STATE_SCHEDULED { return fmt.Errorf( "%w: operation is not in Scheduled state, current state: %v", consts.ErrStaleReference, @@ -160,7 +161,7 @@ func (t BackoffTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) if err != nil { return err } - if op.State() != enumspb.NEXUS_OPERATION_STATE_BACKING_OFF { + if op.State() != enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF { return fmt.Errorf( "%w: operation is not in BackingOff state, current state: %v", consts.ErrStaleReference, @@ -199,9 +200,18 @@ func (t CancelationTask) Destination() string { } func (CancelationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { - if err := hsm.ValidateNotTransitioned(ref, node); err != nil { + c, err := hsm.MachineData[Cancelation](node) + if err != nil { return err } + if c.State() != enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED { + return fmt.Errorf( + "%w: %w: expected a machine in SCHEDULED state, got %v", + consts.ErrStaleReference, + hsm.ErrInvalidTransition, + c.State(), + ) + } return node.CheckRunning() } @@ -234,9 +244,18 @@ func (CancelationBackoffTask) Destination() string { } func (CancelationBackoffTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { - if err := hsm.ValidateNotTransitioned(ref, node); err != nil { + c, err := hsm.MachineData[Cancelation](node) + if err != nil { return err } + if c.State() != enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF { + return fmt.Errorf( + "%w: %w: expected a machine in BACKING_OFF state, got %v", + consts.ErrStaleReference, + hsm.ErrInvalidTransition, + c.State(), + ) + } return node.CheckRunning() } diff --git a/service/history/hsm/tasks.go b/service/history/hsm/tasks.go index fa2a2a3e5fa..00aaca44f24 100644 --- a/service/history/hsm/tasks.go +++ b/service/history/hsm/tasks.go @@ -78,7 +78,7 @@ type TaskSerializer interface { // generated. func ValidateNotTransitioned(ref *persistencespb.StateMachineRef, node *Node) error { if ref.MachineTransitionCount != node.InternalRepr().TransitionCount { - return fmt.Errorf("%w: state machine transitions != ref transitions", consts.ErrStaleReference) + return fmt.Errorf("%w: state machine transitions (%d) != ref transitions (%d)", consts.ErrStaleReference, node.InternalRepr().TransitionCount, ref.MachineTransitionCount) } return nil } From 2e8fdedec342ad6ec5c3ba7789bc686506ce52ef Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Mon, 2 Dec 2024 13:51:09 -0800 Subject: [PATCH 2/3] Remove MachineTransitionCount tests --- .../history/statemachine_environment_test.go | 47 ------------------- 1 file changed, 47 deletions(-) diff --git a/service/history/statemachine_environment_test.go b/service/history/statemachine_environment_test.go index 14ac87b4c33..2897cfcdd76 100644 --- a/service/history/statemachine_environment_test.go +++ b/service/history/statemachine_environment_test.go @@ -240,17 +240,6 @@ func TestValidateStateMachineRef(t *testing.T) { require.ErrorIs(t, err, consts.ErrStaleReference) }, }, - { - name: "WithoutTransitionHistory/MachineTransitionInequality", - enableTransitionHistory: false, - mutateRef: func(ref *hsm.Ref) { - ref.StateMachineRef.MachineTransitionCount++ - }, - mutateNode: func(node *hsm.Node) {}, - assertOutcome: func(t *testing.T, err error) { - require.ErrorIs(t, err, consts.ErrStaleReference) - }, - }, { name: "WithTransitionHistory/Valid", enableTransitionHistory: true, @@ -271,42 +260,6 @@ func TestValidateStateMachineRef(t *testing.T) { require.NoError(t, err) }, }, - { - name: "WithoutTransitionHistory/NodeRebuilt/MachineTransitionInequality", - enableTransitionHistory: true, - mutateRef: func(ref *hsm.Ref) { - // this validates we fallback to the validation logic without transition history - ref.StateMachineRef.MachineTransitionCount++ - }, - mutateNode: func(node *hsm.Node) { - initialVersionedTransition := node.InternalRepr().InitialVersionedTransition - node.InternalRepr().InitialVersionedTransition = &persistencespb.VersionedTransition{ - NamespaceFailoverVersion: initialVersionedTransition.NamespaceFailoverVersion, - TransitionCount: 0, // transition history disabled when re-creating the node - } - }, - assertOutcome: func(t *testing.T, err error) { - require.ErrorIs(t, err, consts.ErrStaleReference) - }, - }, - { - name: "WithoutTransitionHistory/NodeTransitioned/MachineTransitionInequality", - enableTransitionHistory: true, - mutateRef: func(ref *hsm.Ref) { - // this validates we fallback to the validation logic without transition history - ref.StateMachineRef.MachineTransitionCount++ - }, - mutateNode: func(node *hsm.Node) { - lastUpdateVersionedTransition := node.InternalRepr().LastUpdateVersionedTransition - node.InternalRepr().InitialVersionedTransition = &persistencespb.VersionedTransition{ - NamespaceFailoverVersion: lastUpdateVersionedTransition.NamespaceFailoverVersion, - TransitionCount: 0, // transition history disabled when node transitioned. - } - }, - assertOutcome: func(t *testing.T, err error) { - require.ErrorIs(t, err, consts.ErrStaleReference) - }, - }, } for _, tc := range cases { tc := tc From 1e1c1f7adf1271febeef0d7f53a66782c450c750 Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 3 Dec 2024 11:36:38 -0800 Subject: [PATCH 3/3] Address review comments --- components/callbacks/executors.go | 4 +- components/callbacks/hsm_invocation.go | 2 +- components/callbacks/tasks.go | 30 +-------------- components/nexusoperations/tasks.go | 52 +++----------------------- service/history/hsm/tasks.go | 19 ++++++++++ 5 files changed, 31 insertions(+), 76 deletions(-) diff --git a/components/callbacks/executors.go b/components/callbacks/executors.go index d9f522c89d9..f28af2a4a5c 100644 --- a/components/callbacks/executors.go +++ b/components/callbacks/executors.go @@ -76,6 +76,8 @@ type taskExecutor struct { // invocationResult is a marker for the callbackInvokable.Invoke result to indicate to the executor how to handle the // invocation outcome. type invocationResult interface { + // A marker for all possible implementations. + mustImplementInvocationResult() error() error } @@ -99,7 +101,7 @@ func (r invocationResultFail) error() error { return r.err } -// invocationResultFail marks an invocation as failed with the intent to retry. +// invocationResultRetry marks an invocation as failed with the intent to retry. type invocationResultRetry struct { err error } diff --git a/components/callbacks/hsm_invocation.go b/components/callbacks/hsm_invocation.go index c2a9813eaf9..080ce0f5ea8 100644 --- a/components/callbacks/hsm_invocation.go +++ b/components/callbacks/hsm_invocation.go @@ -81,7 +81,7 @@ func (s hsmInvocation) Invoke(ctx context.Context, ns *namespace.Namespace, e ta // TODO(Tianyu): Will this ever be too big for an RPC call? callbackArgSerialized, err := s.callbackArg.Marshal() if err != nil { - return invocationResultFail{fmt.Errorf("failed to serialize completion event: %v", err)} + return invocationResultFail{fmt.Errorf("failed to serialize completion event: %w", err)} } request := historyservice.InvokeStateMachineMethodRequest{ diff --git a/components/callbacks/tasks.go b/components/callbacks/tasks.go index 5e1dcf6d01e..a9cb5d73042 100644 --- a/components/callbacks/tasks.go +++ b/components/callbacks/tasks.go @@ -23,12 +23,10 @@ package callbacks import ( - "fmt" "time" enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/hsm" ) @@ -62,19 +60,7 @@ func (t InvocationTask) Deadline() time.Time { } func (InvocationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { - cb, err := hsm.MachineData[Callback](node) - if err != nil { - return err - } - if cb.State() != enumsspb.CALLBACK_STATE_SCHEDULED { - return fmt.Errorf( - "%w: %w: expected a machine in SCHEDULED state, got %v", - consts.ErrStaleReference, - hsm.ErrInvalidTransition, - cb.State(), - ) - } - return nil + return hsm.ValidateState[enumsspb.CallbackState, Callback](node, enumsspb.CALLBACK_STATE_SCHEDULED) } type InvocationTaskSerializer struct{} @@ -106,19 +92,7 @@ func (BackoffTask) Destination() string { } func (BackoffTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { - cb, err := hsm.MachineData[Callback](node) - if err != nil { - return err - } - if cb.State() != enumsspb.CALLBACK_STATE_BACKING_OFF { - return fmt.Errorf( - "%w: %w: expected a machine in BACKING_OFF state, got %v", - consts.ErrStaleReference, - hsm.ErrInvalidTransition, - cb.State(), - ) - } - return nil + return hsm.ValidateState[enumsspb.CallbackState, Callback](node, enumsspb.CALLBACK_STATE_BACKING_OFF) } type BackoffTaskSerializer struct{} diff --git a/components/nexusoperations/tasks.go b/components/nexusoperations/tasks.go index e77b275f2fe..f8bd7ca06c1 100644 --- a/components/nexusoperations/tasks.go +++ b/components/nexusoperations/tasks.go @@ -111,18 +111,7 @@ func (InvocationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.No if err := node.CheckRunning(); err != nil { return err } - op, err := hsm.MachineData[Operation](node) - if err != nil { - return err - } - if op.State() != enumsspb.NEXUS_OPERATION_STATE_SCHEDULED { - return fmt.Errorf( - "%w: operation is not in Scheduled state, current state: %v", - consts.ErrStaleReference, - op.State(), - ) - } - return nil + return hsm.ValidateState[enumsspb.NexusOperationState, Operation](node, enumsspb.NEXUS_OPERATION_STATE_SCHEDULED) } type InvocationTaskSerializer struct{} @@ -157,18 +146,7 @@ func (t BackoffTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) if err := node.CheckRunning(); err != nil { return err } - op, err := hsm.MachineData[Operation](node) - if err != nil { - return err - } - if op.State() != enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF { - return fmt.Errorf( - "%w: operation is not in BackingOff state, current state: %v", - consts.ErrStaleReference, - op.State(), - ) - } - return nil + return hsm.ValidateState[enumsspb.NexusOperationState, Operation](node, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF) } type BackoffTaskSerializer struct{} @@ -200,19 +178,10 @@ func (t CancelationTask) Destination() string { } func (CancelationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { - c, err := hsm.MachineData[Cancelation](node) - if err != nil { + if err := node.CheckRunning(); err != nil { return err } - if c.State() != enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED { - return fmt.Errorf( - "%w: %w: expected a machine in SCHEDULED state, got %v", - consts.ErrStaleReference, - hsm.ErrInvalidTransition, - c.State(), - ) - } - return node.CheckRunning() + return hsm.ValidateState[enumspb.NexusOperationCancellationState, Cancelation](node, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED) } type CancelationTaskSerializer struct{} @@ -244,19 +213,10 @@ func (CancelationBackoffTask) Destination() string { } func (CancelationBackoffTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { - c, err := hsm.MachineData[Cancelation](node) - if err != nil { + if err := node.CheckRunning(); err != nil { return err } - if c.State() != enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF { - return fmt.Errorf( - "%w: %w: expected a machine in BACKING_OFF state, got %v", - consts.ErrStaleReference, - hsm.ErrInvalidTransition, - c.State(), - ) - } - return node.CheckRunning() + return hsm.ValidateState[enumspb.NexusOperationCancellationState, Cancelation](node, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF) } type CancelationBackoffTaskSerializer struct{} diff --git a/service/history/hsm/tasks.go b/service/history/hsm/tasks.go index 00aaca44f24..53fcd31ca42 100644 --- a/service/history/hsm/tasks.go +++ b/service/history/hsm/tasks.go @@ -82,3 +82,22 @@ func ValidateNotTransitioned(ref *persistencespb.StateMachineRef, node *Node) er } return nil } + +// ValidateState returns a [consts.ErrStaleReference] if the machine is not in the expected state. +func ValidateState[S comparable, T StateMachine[S]](node *Node, expected S) error { + cb, err := MachineData[T](node) + if err != nil { + return err + } + if cb.State() != expected { + return fmt.Errorf( + "%w: %w: expected a %s machine in %v state, got %v", + consts.ErrStaleReference, + ErrInvalidTransition, + node.Key.ID, + expected, + cb.State(), + ) + } + return nil +}