diff --git a/components/nexusoperations/statemachine_test.go b/components/nexusoperations/statemachine_test.go index 7002ba0c26f..38dea1fc859 100644 --- a/components/nexusoperations/statemachine_test.go +++ b/components/nexusoperations/statemachine_test.go @@ -86,10 +86,13 @@ func TestAddChild(t *testing.T) { } child, err := nexusoperations.AddChild(root, "test-id", event, []byte("token"), false) require.NoError(t, err) - oap := root.Outputs() - require.Equal(t, 1, len(oap)) - require.Equal(t, 1, len(oap[0].Outputs)) - tc.assertTasks(t, oap[0].Outputs[0].Tasks) + opLog, err := root.Outputs() + require.NoError(t, err) + require.Equal(t, 1, len(opLog)) + transitionOp, ok := opLog[0].(hsm.TransitionOperation) + require.True(t, ok) + tc.assertTasks(t, transitionOp.Output.Tasks) + op, err := hsm.MachineData[nexusoperations.Operation](child) require.NoError(t, err) require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, op.State()) @@ -190,11 +193,13 @@ func TestRetry(t *testing.T) { RetryPolicy: backoff.NewExponentialRetryPolicy(time.Second), }) })) - oap := node.Outputs() - require.Equal(t, 1, len(oap)) - require.Equal(t, 1, len(oap[0].Outputs)) - require.Equal(t, 1, len(oap[0].Outputs[0].Tasks)) - boTask := oap[0].Outputs[0].Tasks[0].(nexusoperations.BackoffTask) // nolint:revive + opLog, err := node.Parent.Outputs() + require.NoError(t, err) + require.Equal(t, 1, len(opLog)) + transitionOp, ok := opLog[0].(hsm.TransitionOperation) + require.True(t, ok) + require.Equal(t, 1, len(transitionOp.Output.Tasks)) + boTask := transitionOp.Output.Tasks[0].(nexusoperations.BackoffTask) // nolint:revive op, err := hsm.MachineData[nexusoperations.Operation](node) require.NoError(t, err) require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, op.State()) @@ -208,11 +213,13 @@ func TestRetry(t *testing.T) { Node: node, }) })) - oap = node.Outputs() - require.Equal(t, 1, len(oap)) - require.Equal(t, 1, len(oap[0].Outputs)) - require.Equal(t, 1, len(oap[0].Outputs[0].Tasks)) - invocationTask := oap[0].Outputs[0].Tasks[0].(nexusoperations.InvocationTask) // nolint:revive + opLog, err = node.Parent.Outputs() + require.NoError(t, err) + require.Equal(t, 1, len(opLog)) + transitionOp, ok = opLog[0].(hsm.TransitionOperation) + require.True(t, ok) + require.Equal(t, 1, len(transitionOp.Output.Tasks)) + invocationTask := transitionOp.Output.Tasks[0].(nexusoperations.InvocationTask) // nolint:revive require.Equal(t, "endpoint", invocationTask.EndpointName) op, err = hsm.MachineData[nexusoperations.Operation](node) require.NoError(t, err) @@ -315,10 +322,12 @@ func TestCompleteFromAttempt(t *testing.T) { require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { return tc.transition(node, op) })) - oap := node.Outputs() - require.Equal(t, 1, len(oap)) - require.Equal(t, 1, len(oap[0].Outputs)) - require.Equal(t, 0, len(oap[0].Outputs[0].Tasks)) + opLog, err := node.Parent.Outputs() + require.NoError(t, err) + require.Equal(t, 1, len(opLog)) + transitionOp, ok := opLog[0].(hsm.TransitionOperation) + require.True(t, ok, "expected TransitionOperation") + require.Empty(t, transitionOp.Output.Tasks) op, err := hsm.MachineData[nexusoperations.Operation](node) require.NoError(t, err) tc.assertState(t, op) @@ -431,10 +440,13 @@ func TestCompleteExternally(t *testing.T) { require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { return tc.transition(node, op) })) - oap := node.Outputs() - require.Equal(t, 1, len(oap)) - require.Equal(t, 1, len(oap[0].Outputs)) - require.Equal(t, 0, len(oap[0].Outputs[0].Tasks)) + opLog, err := node.Parent.Outputs() + require.NoError(t, err) + require.Equal(t, 1, len(opLog)) + transitionOp, ok := opLog[0].(hsm.TransitionOperation) + require.True(t, ok) + require.Empty(t, transitionOp.Output.Tasks) + op, err := hsm.MachineData[nexusoperations.Operation](node) require.NoError(t, err) tc.assertState(t, op) @@ -583,9 +595,13 @@ func TestCancelationBeforeStarted(t *testing.T) { require.NoError(t, hsm.MachineTransition(root, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { return op.Cancel(root, time.Now()) })) - require.Len(t, root.Outputs(), 1) - require.Len(t, root.Outputs()[0].Outputs[0].Tasks, 1) - require.Equal(t, nexusoperations.TaskTypeInvocation, root.Outputs()[0].Outputs[0].Tasks[0].Type()) + opLog, err := root.Parent.Outputs() + require.NoError(t, err) + require.Len(t, opLog, 2) + transitionOp, ok := opLog[0].(hsm.TransitionOperation) + require.True(t, ok) + require.Len(t, transitionOp.Output.Tasks, 1) + require.Equal(t, nexusoperations.TaskTypeInvocation, transitionOp.Output.Tasks[0].Type()) node, err := root.Child([]hsm.Key{nexusoperations.CancelationMachineKey}) require.NoError(t, err) @@ -604,10 +620,18 @@ func TestCancelationBeforeStarted(t *testing.T) { }, }) })) - require.Len(t, root.Outputs(), 2) - require.Len(t, root.Outputs()[0].Outputs[0].Tasks, 0) - require.Len(t, root.Outputs()[1].Outputs[0].Tasks, 1) - require.Equal(t, nexusoperations.TaskTypeCancelation, root.Outputs()[1].Outputs[0].Tasks[0].Type()) + opLog, err = root.Parent.Outputs() + require.NoError(t, err) + require.Len(t, opLog, 2) + + firstOp, ok := opLog[0].(hsm.TransitionOperation) + require.True(t, ok) + require.Len(t, firstOp.Output.Tasks, 1) + require.Equal(t, nexusoperations.TaskTypeCancelation, firstOp.Output.Tasks[0].Type()) + + secondOp, ok := opLog[1].(hsm.TransitionOperation) + require.True(t, ok) + require.Empty(t, secondOp.Output.Tasks) node, err = root.Child([]hsm.Key{nexusoperations.CancelationMachineKey}) require.NoError(t, err) diff --git a/service/history/hsm/tree.go b/service/history/hsm/tree.go index 86d1f52ab2b..3f7b2e71fff 100644 --- a/service/history/hsm/tree.go +++ b/service/history/hsm/tree.go @@ -28,6 +28,7 @@ import ( "fmt" "reflect" "slices" + "strings" enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" @@ -61,6 +62,11 @@ type Key struct { ID string } +// String returns a human-readable representation of a Key +func (k Key) String() string { + return fmt.Sprintf("%s:%s", k.Type, k.ID) +} + // StateMachineDefinition provides type information and a serializer for a state machine. type StateMachineDefinition interface { Type() string @@ -90,20 +96,79 @@ type cachedMachine struct { deleted bool } -type OperationLog struct { - // All transitions, including regular state changes and deletions - TransitionsByPath map[string][]TransitionOutputWithCount - // Track topmost deleted paths to filter outputs efficiently - DeletedPaths [][]Key +// formatPath returns a human-readable representation of a path. +func formatPath(path []Key) string { + if len(path) == 0 { + return "" + } + var parts []string + for _, key := range path { + parts = append(parts, key.String()) + } + return strings.Join(parts, "/") } -func (o *OperationLog) IsDeleted(path []Key) bool { - for _, deletedPath := range o.DeletedPaths { - if isPathPrefix(deletedPath, path) { - return true +// Operation represents a state change in the hierarchical state machine tree. +// Each operation is associated with a path in the tree and provides information +// about what occurred at that location. +type Operation interface { + // Path returns the full path to the node where this operation occurred. + Path() []Key + mustImplementOperation() +} + +// DeleteOperation represents the deletion of a node in the tree. +type DeleteOperation struct { + // path is the full path to the deleted node. + path []Key +} + +func (d DeleteOperation) Path() []Key { return d.path } + +func (DeleteOperation) mustImplementOperation() {} + +// String returns a human-readable representation of a DeleteOperation. +func (d DeleteOperation) String() string { + return fmt.Sprintf("Delete(%s)", formatPath(d.path)) +} + +// TransitionOperation represents a state transition that occurred at a specific +// node in the tree. +type TransitionOperation struct { + // path is the full path to the node where the transition occurred. + path []Key + // Output contains the transition output and associated metadata. + Output TransitionOutputWithCount +} + +func (t TransitionOperation) Path() []Key { return t.path } + +func (TransitionOperation) mustImplementOperation() {} + +// String returns a human-readable representation of a TransitionOperation. +func (t TransitionOperation) String() string { + return fmt.Sprintf("Transition(%s)[count=%d]", formatPath(t.path), t.Output.TransitionCount) +} + +// OperationLog represents an ordered sequence of operations that have occurred in the tree. Operations are ordered +// chronologically. +type OperationLog []Operation + +// String returns a human-readable representation of an OperationLog. +func (ol OperationLog) String() string { + var ops []string + for _, op := range ol { + switch o := op.(type) { + case DeleteOperation: + ops = append(ops, o.String()) + case TransitionOperation: + ops = append(ops, o.String()) + default: + // Fallback for unknown operation types + ops = append(ops, fmt.Sprintf("%T(%v)", op, op.Path())) } } - return false + return fmt.Sprintf("[%s]", strings.Join(ops, ", ")) } // NodeBackend is a concrete implementation to support interacting with the underlying platform. @@ -141,7 +206,7 @@ type Node struct { persistence *persistencespb.StateMachineNode definition StateMachineDefinition backend NodeBackend - opLog *OperationLog + opLog OperationLog } // NewRoot creates a new root [Node]. @@ -178,9 +243,7 @@ func NewRoot( children: make(map[Key]*Node), }, backend: backend, - opLog: &OperationLog{ - TransitionsByPath: make(map[string][]TransitionOutputWithCount), - }, + opLog: make(OperationLog, 0), }, nil } @@ -201,10 +264,6 @@ type TransitionOutputWithCount struct { TransitionOutput TransitionCount int64 } -type PathAndOutputs struct { - Path []Key - Outputs []TransitionOutputWithCount -} func (n *Node) Path() []Key { if n.Parent == nil { @@ -213,40 +272,24 @@ func (n *Node) Path() []Key { return append(n.Parent.Path(), n.Key) } -// Outputs returns all outputs produced by transitions on this tree. -func (n *Node) Outputs() []PathAndOutputs { - root := n.root() - currentPath := n.Path() - - if root.opLog.IsDeleted(currentPath) { - return nil - } - - var paos []PathAndOutputs - pathKey := fmt.Sprint(currentPath) - if transitions := root.opLog.TransitionsByPath[pathKey]; len(transitions) > 0 { - paos = append(paos, PathAndOutputs{ - Path: currentPath, - Outputs: transitions, - }) +// Outputs returns a compacted operation log from the root state machine. The operation log maintains a sequence of +// state changes, while compaction ensures operations are properly filtered when portions of the state machine tree are +// deleted. For details on compaction rules, see OperationLog.compact(). +// This method must be called on the root node only. +func (n *Node) Outputs() (OperationLog, error) { + if n.Parent != nil { + return nil, fmt.Errorf("can only be called from root node") } - for _, child := range n.cache.children { - paos = append(paos, child.Outputs()...) - } - - return paos + compacted := n.opLog.compact() + return compacted, nil } // ClearTransactionState resets all transition outputs in the tree. // This should be called at the end of every transaction where the transitions are performed to avoid emitting duplicate // transition outputs. func (n *Node) ClearTransactionState() { - root := n.root() - if root.opLog != nil { - root.opLog.DeletedPaths = nil - root.opLog.TransitionsByPath = make(map[string][]TransitionOutputWithCount) - } + n.root().opLog = nil n.cache.dirty = false for _, child := range n.cache.children { @@ -375,7 +418,8 @@ func (n *Node) AddChild(key Key, data any) (*Node, error) { return node, nil } -// DeleteChild deletes an immediate child node and all its descendants from the tree. +// DeleteChild marks a child node and all its descendants as deleted, removing them from the cache. No transitions will +// be allowed after deleting a child. func (n *Node) DeleteChild(key Key) error { if n.cache.deleted { return fmt.Errorf("%w: cannot delete from deleted node: %v", ErrStateMachineInvalidState, n.Key) @@ -394,9 +438,10 @@ func (n *Node) DeleteChild(key Key) error { return err } - // Record deletion root := n.root() - root.opLog.DeletedPaths = append(root.opLog.DeletedPaths, child.Path()) + root.opLog = append(root.opLog, DeleteOperation{ + path: child.Path(), + }) // Remove from persistence and cache machinesMap := n.persistence.Children[key.Type] @@ -582,10 +627,12 @@ func MachineTransition[T any](n *Node, transitionFn func(T) (TransitionOutput, e n.cache.dirty = true root := n.root() - pathKey := fmt.Sprint(n.Path()) - root.opLog.TransitionsByPath[pathKey] = append(root.opLog.TransitionsByPath[pathKey], TransitionOutputWithCount{ - TransitionOutput: output, - TransitionCount: n.persistence.TransitionCount, + root.opLog = append(root.opLog, TransitionOperation{ + path: n.Path(), + Output: TransitionOutputWithCount{ + TransitionOutput: output, + TransitionCount: n.persistence.TransitionCount, + }, }) return nil @@ -700,6 +747,40 @@ func (n *Node) root() *Node { return root } +// compact filters the operation log based on deletion status. For any operation path: +// - If any parent in the path is deleted, the operation is excluded +// - If the target of the operation is deleted, only its DeleteOperation is kept +// - Otherwise, the operation is included +func (ol OperationLog) compact() OperationLog { + if len(ol) == 0 { + return ol + } + + root := newOpNode(Key{}) + for _, op := range ol { + node := root.getOrCreateNode(op.Path()) + if _, ok := op.(DeleteOperation); ok { + node.isDeleted = true + } + } + + return root.collect(ol) +} + +// getOrCreateNode traverses/creates path and returns the final node. +func (n *opNode) getOrCreateNode(path []Key) *opNode { + current := n + for _, key := range path { + next, exists := current.children[key] + if !exists { + next = newOpNode(key) + current.children[key] = next + } + current = next + } + return current +} + func isPathPrefix(prefix, path []Key) bool { if len(prefix) > len(path) { return false @@ -707,3 +788,59 @@ func isPathPrefix(prefix, path []Key) bool { return slices.Equal(prefix, path[:len(prefix)]) } + +// opNode represents a node in the operation tree, tracking deletion status. +type opNode struct { + key Key + children map[Key]*opNode + isDeleted bool +} + +// newOpNode creates a new operation tree node with the given key. +func newOpNode(key Key) *opNode { + return &opNode{ + key: key, + children: make(map[Key]*opNode), + } +} + +// collect returns an ordered subset of the input operation log based on the deletion status tracked in this operation +// tree. The original chronological order of operations is preserved. The status of each node in the path (not just the +// operation's target) determines whether the operation is included in the result. +func (n *opNode) collect(oplog OperationLog) OperationLog { + var result OperationLog + + for _, op := range oplog { + path := op.Path() + current := n + + var isAncestorDeleted bool + + // Traverse the path to the target node, checking deletion status + for i, key := range path { + child, exists := current.children[key] + if !exists { + panic("path must exist in tree") + } + + if child.isDeleted { + isAncestorDeleted = true + if i == len(path)-1 { + if deleteOp, ok := op.(DeleteOperation); ok { + result = append(result, deleteOp) + } + } + break + } + current = child + } + + if isAncestorDeleted { + continue + } + + result = append(result, op) + } + + return result +} diff --git a/service/history/hsm/tree_test.go b/service/history/hsm/tree_test.go index 6bffc4eb6c3..0bf5399f99d 100644 --- a/service/history/hsm/tree_test.go +++ b/service/history/hsm/tree_test.go @@ -25,6 +25,7 @@ package hsm_test import ( "context" "fmt" + "slices" "sort" "testing" @@ -80,7 +81,9 @@ func TestNode_MaintainsCachedData(t *testing.T) { require.NoError(t, err) require.False(t, root.Dirty()) - require.Equal(t, 0, len(root.Outputs())) + opLog, err := root.Outputs() + require.NoError(t, err) + require.Equal(t, 0, len(opLog)) err = hsm.MachineTransition(root, func(d *hsmtest.Data) (hsm.TransitionOutput, error) { d.SetState(hsmtest.State2) @@ -91,10 +94,14 @@ func TestNode_MaintainsCachedData(t *testing.T) { // Our data variable is a pointer to the cache. require.Equal(t, hsmtest.State2, v1.State()) - require.NoError(t, err) require.True(t, root.Dirty()) - require.Equal(t, 1, len(root.Outputs())) - require.Equal(t, []hsm.Key{}, root.Outputs()[0].Path) + opLog, err = root.Outputs() + require.NoError(t, err) + require.Equal(t, 1, len(opLog)) + + transOp, ok := opLog[0].(hsm.TransitionOperation) + require.True(t, ok) + require.Equal(t, []hsm.Key{}, transOp.Path()) } func TestNode_MaintainsChildCache(t *testing.T) { @@ -143,9 +150,14 @@ func TestNode_MaintainsChildCache(t *testing.T) { }) require.NoError(t, err) require.True(t, root.Dirty()) // Should now be dirty again. - require.Equal(t, 1, len(root.Outputs())) - require.Equal(t, int64(1), root.Outputs()[0].Outputs[0].TransitionCount) - require.Equal(t, []hsm.Key{key}, root.Outputs()[0].Path) + + opLog, err := root.Outputs() + require.NoError(t, err) + require.Equal(t, 1, len(opLog)) + transOp, ok := opLog[0].(hsm.TransitionOperation) + require.True(t, ok) + require.Equal(t, int64(1), transOp.Output.TransitionCount) + require.Equal(t, []hsm.Key{key}, transOp.Path()) // Cache when loaded from persistence. path := []hsm.Key{{Type: def1.Type(), ID: "persisted"}, {Type: def1.Type(), ID: "persisted-child"}} @@ -384,12 +396,13 @@ func TestNode_Sync(t *testing.T) { protorequire.ProtoEqual(t, incomingNode.InternalRepr().LastUpdateVersionedTransition, currentNode.InternalRepr().LastUpdateVersionedTransition) require.Equal(t, currentNodeTransitionCount+1, currentNode.InternalRepr().TransitionCount) - paos := currentNode.Outputs() - require.Len(t, paos, 1) - pao := paos[0] - require.Equal(t, currentNode.Path(), pao.Path) - require.Len(t, pao.Outputs, 1) - require.Len(t, pao.Outputs[0].Tasks, 2) + opLog, err := currentNode.Outputs() + require.NoError(t, err) + require.Len(t, opLog, 1) + transOp, ok := opLog[0].(hsm.TransitionOperation) + require.True(t, ok) + require.Equal(t, currentNode.Path(), transOp.Path()) + require.Len(t, transOp.Output.Tasks, 2) }) } } @@ -498,7 +511,7 @@ func TestNode_DeleteChild(t *testing.T) { }) require.NoError(t, err) - err = l1.DeleteChild(hsm.Key{Type: def1.Type(), ID: "l2"}) + err = l1.DeleteChild(l2.Key) require.NoError(t, err) err = hsm.MachineTransition(l2, func(d *hsmtest.Data) (hsm.TransitionOutput, error) { @@ -506,18 +519,24 @@ func TestNode_DeleteChild(t *testing.T) { }) require.ErrorIs(t, err, hsm.ErrStateMachineInvalidState) - l2Outputs := l2.Outputs() - require.Empty(t, l2Outputs) + _, err = l2.Outputs() + require.Error(t, err) + + opLog, err := root.Outputs() + require.NoError(t, err) + require.Len(t, opLog, 1) // After compaction, only the delete operation remains + _, ok := opLog[0].(hsm.DeleteOperation) + require.True(t, ok) // Cannot delete non-existent or already deleted nodes err = l1.DeleteChild(hsm.Key{Type: def1.Type(), ID: "nonexistent"}) require.ErrorIs(t, err, hsm.ErrStateMachineNotFound) - err = l1.DeleteChild(hsm.Key{Type: def1.Type(), ID: "l2"}) + err = l1.DeleteChild(l2.Key) require.ErrorIs(t, err, hsm.ErrStateMachineNotFound) } -func TestOperationLog_IsDeleted(t *testing.T) { +func TestNode_PreservesUnrelatedOperations(t *testing.T) { root, err := hsm.NewRoot(reg, def1.Type(), hsmtest.NewData(hsmtest.State1), make(map[string]*persistencespb.StateMachineMap), &backend{}) require.NoError(t, err) @@ -540,16 +559,27 @@ func TestOperationLog_IsDeleted(t *testing.T) { }) require.NoError(t, err) - err = l1.DeleteChild(hsm.Key{Type: def1.Type(), ID: "l2"}) + err = l1.DeleteChild(l2.Key) require.NoError(t, err) - l2Outputs := l2.Outputs() - require.Empty(t, l2Outputs) + opLog, err := root.Outputs() + require.NoError(t, err) - // Non-deleted sibling should still have outputs - siblingOutputs := l2_sibling.Outputs() - require.Len(t, siblingOutputs, 1) - require.Equal(t, l2_sibling.Path(), siblingOutputs[0].Path) + foundDelete := slices.ContainsFunc(opLog, func(op hsm.Operation) bool { + if delOp, ok := op.(hsm.DeleteOperation); ok { + return slices.Equal(delOp.Path(), l2.Path()) + } + return false + }) + require.True(t, foundDelete, "should find l2's delete operation") + + foundTransition := slices.ContainsFunc(opLog, func(op hsm.Operation) bool { + if transOp, ok := op.(hsm.TransitionOperation); ok { + return slices.Equal(transOp.Path(), l2_sibling.Path()) + } + return false + }) + require.True(t, foundTransition, "should find l2_sibling's transition") } func TestNode_OutputsWithDeletion(t *testing.T) { @@ -573,23 +603,28 @@ func TestNode_OutputsWithDeletion(t *testing.T) { }) require.NoError(t, err) - rootOutputs := root.Outputs() - require.Len(t, rootOutputs, 2) // root and l2 transitions - - // Delete l1 (and by extension l2) - err = root.DeleteChild(hsm.Key{Type: def1.Type(), ID: "l1"}) + err = root.DeleteChild(l1.Key) require.NoError(t, err) - rootOutputs = root.Outputs() - require.Len(t, rootOutputs, 1) - require.Equal(t, []hsm.Key{}, rootOutputs[0].Path) - - // Deleted nodes have no outputs - l1Outputs := l1.Outputs() - require.Empty(t, l1Outputs) + outputs, err := root.Outputs() + require.NoError(t, err) + require.Len(t, outputs, 2) // root's transition and l1's deletion (l2's operations excluded due to l1 deletion) - l2Outputs := l2.Outputs() - require.Empty(t, l2Outputs) + var foundTransition, foundL1Deletion bool + for _, op := range outputs { + switch o := op.(type) { + case hsm.TransitionOperation: + if slices.Equal(o.Path(), []hsm.Key{}) { // root's path + foundTransition = true + } + case hsm.DeleteOperation: + if slices.Equal(o.Path(), l1.Path()) { + foundL1Deletion = true + } + } + } + require.True(t, foundTransition, "should have root's transition") + require.True(t, foundL1Deletion, "should have l1's deletion") } func TestNode_ClearTransactionState(t *testing.T) { @@ -605,16 +640,18 @@ func TestNode_ClearTransactionState(t *testing.T) { }) require.NoError(t, err) - err = root.DeleteChild(hsm.Key{Type: def1.Type(), ID: "l1"}) + err = root.DeleteChild(l1.Key) require.NoError(t, err) - outputs := root.Outputs() - require.NotEmpty(t, outputs) + opLog, err := root.Outputs() + require.NoError(t, err) + require.NotEmpty(t, opLog) root.ClearTransactionState() - outputs = root.Outputs() - require.Empty(t, outputs) + opLog, err = root.Outputs() + require.NoError(t, err) + require.Empty(t, opLog) require.False(t, root.Dirty()) err = hsm.MachineTransition(l1, func(d *hsmtest.Data) (hsm.TransitionOutput, error) { @@ -649,15 +686,25 @@ func TestNode_DeleteDeepHierarchy(t *testing.T) { err = nodes[1].DeleteChild(hsm.Key{Type: def1.Type(), ID: "node2"}) require.NoError(t, err) - // Verify outputs at each level - for i, node := range nodes { - outputs := node.Outputs() - if i <= 1 { // Above deletion - require.NotEmpty(t, outputs) - } else { // At or below deletion - require.Empty(t, outputs) + opLog, err := root.Outputs() + require.NoError(t, err) + require.NotEmpty(t, opLog) + + // Count transitions and deletions + var transitionCount, deletionCount int + for _, op := range opLog { + switch o := op.(type) { + case hsm.TransitionOperation: + transitionCount++ + pathLen := len(o.Path()) + require.True(t, pathLen <= 3, "should not see transitions for deleted nodes") + case hsm.DeleteOperation: + deletionCount++ } } + + require.Equal(t, 2, transitionCount, "should see transitions for nodes above deletion") + require.Equal(t, 1, deletionCount, "should see one deletion operation") } func TestNode_MixedOperationsBeforeDeletion(t *testing.T) { @@ -675,15 +722,38 @@ func TestNode_MixedOperationsBeforeDeletion(t *testing.T) { require.NoError(t, err) } - outputs := l1.Outputs() - require.Len(t, outputs, 1) - require.Len(t, outputs[0].Outputs, 3) + // Count transition operations for l1 from root's outputs + opLog, err := root.Outputs() + require.NoError(t, err) + transitionCount := 0 + for _, op := range opLog { + if transOp, ok := op.(hsm.TransitionOperation); ok { + if slices.Equal(transOp.Path(), l1.Path()) { + transitionCount++ + } + } + } + require.Equal(t, 3, transitionCount, "should see all transitions before deletion") - err = root.DeleteChild(hsm.Key{Type: def1.Type(), ID: "l1"}) + err = root.DeleteChild(l1.Key) require.NoError(t, err) - outputs = l1.Outputs() - require.Empty(t, outputs) + opLog, err = root.Outputs() + require.NoError(t, err) + require.NotEmpty(t, opLog) + + var foundDelete bool + for _, op := range opLog { + if delOp, ok := op.(hsm.DeleteOperation); ok { + if slices.Equal(delOp.Path(), l1.Path()) { + foundDelete = true + } + } + if transOp, ok := op.(hsm.TransitionOperation); ok { + require.NotEqual(t, l1.Path(), transOp.Path(), "should not see transitions for deleted node") + } + } + require.True(t, foundDelete, "should find deletion operation") } func TestNode_MultipleDeletedPaths(t *testing.T) { @@ -712,16 +782,219 @@ func TestNode_MultipleDeletedPaths(t *testing.T) { }) require.NoError(t, err) - err = branch1.DeleteChild(hsm.Key{Type: def1.Type(), ID: "b1child"}) + err = branch1.DeleteChild(b1child.Key) + require.NoError(t, err) + err = branch2.DeleteChild(b2child.Key) + require.NoError(t, err) + + outputs, err := root.Outputs() + require.NoError(t, err) + + // Verify both deletion operations exist + var foundB1ChildDel, foundB2ChildDel bool + for _, op := range outputs { + if del, ok := op.(hsm.DeleteOperation); ok { + if slices.Equal(del.Path(), b1child.Path()) { + foundB1ChildDel = true + } + if slices.Equal(del.Path(), b2child.Path()) { + foundB2ChildDel = true + } + } + } + require.True(t, foundB1ChildDel, "should find b1child deletion") + require.True(t, foundB2ChildDel, "should find b2child deletion") +} + +func TestNode_PathPrefixEdgeCases(t *testing.T) { + root, err := hsm.NewRoot(reg, def1.Type(), hsmtest.NewData(hsmtest.State1), make(map[string]*persistencespb.StateMachineMap), &backend{}) + require.NoError(t, err) + + // Test similar paths at different levels: + // root + // ├── node1 + // │ └── child1 + // │ └── node1 (same name as parent) + // ├── node10 + // └── node11 + + node1, err := root.AddChild(hsm.Key{Type: def1.Type(), ID: "node1"}, hsmtest.NewData(hsmtest.State1)) + require.NoError(t, err) + node10, err := root.AddChild(hsm.Key{Type: def1.Type(), ID: "node10"}, hsmtest.NewData(hsmtest.State1)) + require.NoError(t, err) + node11, err := root.AddChild(hsm.Key{Type: def1.Type(), ID: "node11"}, hsmtest.NewData(hsmtest.State1)) + require.NoError(t, err) + + // Add a path that reuses "node1" at a deeper level + child1, err := node1.AddChild(hsm.Key{Type: def1.Type(), ID: "child1"}, hsmtest.NewData(hsmtest.State1)) require.NoError(t, err) - err = branch2.DeleteChild(hsm.Key{Type: def1.Type(), ID: "b2child"}) + deepNode1, err := child1.AddChild(hsm.Key{Type: def1.Type(), ID: "node1"}, hsmtest.NewData(hsmtest.State1)) require.NoError(t, err) - require.Empty(t, b1child.Outputs()) - require.Empty(t, b2child.Outputs()) + // Add transitions to all nodes + for _, node := range []*hsm.Node{node1, node10, node11, child1, deepNode1} { + err = hsm.MachineTransition(node, func(d *hsmtest.Data) (hsm.TransitionOutput, error) { + d.SetState(hsmtest.State2) + return hsm.TransitionOutput{}, nil + }) + require.NoError(t, err) + } + + err = root.DeleteChild(node1.Key) + require.NoError(t, err) + + // Verify operations + opLog, err := root.Outputs() + require.NoError(t, err) + + var transitions, deletes [][]hsm.Key + for _, op := range opLog { + switch o := op.(type) { + case hsm.TransitionOperation: + transitions = append(transitions, o.Path()) + case hsm.DeleteOperation: + deletes = append(deletes, o.Path()) + } + } - branch1Outputs := branch1.Outputs() - require.NotEmpty(t, branch1Outputs) - branch2Outputs := branch2.Outputs() - require.NotEmpty(t, branch2Outputs) + // Should see: + // - Delete operation for node1 and its subtree + // - Transitions for node10 and node11 + // - No transitions from node1's subtree + require.Len(t, deletes, 1, "should have one delete operation") + require.Equal(t, node1.Path(), deletes[0], "delete should be for top-level node1") + + // Only node10 and node11 transitions should remain + require.Len(t, transitions, 2, "should have only node10 and node11 transitions") + for _, transition := range transitions { + require.True(t, slices.Equal(transition, node10.Path()) || slices.Equal(transition, node11.Path()), + "remaining transitions should only be for node10 or node11") + } + + // Verify root operations are preserved + err = hsm.MachineTransition(root, func(d *hsmtest.Data) (hsm.TransitionOutput, error) { + d.SetState(hsmtest.State2) + return hsm.TransitionOutput{}, nil + }) + require.NoError(t, err) + + opLog, err = root.Outputs() + require.NoError(t, err) + + // Find root's transition + foundRootTransition := false + for _, op := range opLog { + if transOp, ok := op.(hsm.TransitionOperation); ok { + if len(transOp.Path()) == 0 { + foundRootTransition = true + break + } + } + } + require.True(t, foundRootTransition, "root's transition should be preserved") +} + +func TestNode_ComplexHierarchicalDeletions(t *testing.T) { + root, err := hsm.NewRoot(reg, def1.Type(), hsmtest.NewData(hsmtest.State1), make(map[string]*persistencespb.StateMachineMap), &backend{}) + require.NoError(t, err) + + // Create a three-level hierarchy with siblings + parent, err := root.AddChild(hsm.Key{Type: def1.Type(), ID: "parent"}, hsmtest.NewData(hsmtest.State1)) + require.NoError(t, err) + + child1, err := parent.AddChild(hsm.Key{Type: def1.Type(), ID: "child1"}, hsmtest.NewData(hsmtest.State1)) + require.NoError(t, err) + child2, err := parent.AddChild(hsm.Key{Type: def1.Type(), ID: "child2"}, hsmtest.NewData(hsmtest.State1)) + require.NoError(t, err) + + grandchild1, err := child1.AddChild(hsm.Key{Type: def1.Type(), ID: "grandchild1"}, hsmtest.NewData(hsmtest.State1)) + require.NoError(t, err) + grandchild2, err := child2.AddChild(hsm.Key{Type: def1.Type(), ID: "grandchild2"}, hsmtest.NewData(hsmtest.State1)) + require.NoError(t, err) + + // Add operations to all nodes + for _, node := range []*hsm.Node{parent, child1, child2, grandchild1, grandchild2} { + err = hsm.MachineTransition(node, func(d *hsmtest.Data) (hsm.TransitionOutput, error) { + d.SetState(hsmtest.State2) + return hsm.TransitionOutput{}, nil + }) + require.NoError(t, err) + } + + // Delete parent while children have pending operations + err = root.DeleteChild(parent.Key) + require.NoError(t, err) + + // Verify only parent's delete operation remains, all child operations are removed + opLog, err := root.Outputs() + require.NoError(t, err) + + require.Len(t, opLog, 1, "should only see parent's delete operation") + deleteOp, ok := opLog[0].(hsm.DeleteOperation) + require.True(t, ok) + require.Equal(t, parent.Path(), deleteOp.Path()) +} + +func TestNode_CompactionOrderPreservation(t *testing.T) { + root, err := hsm.NewRoot(reg, def1.Type(), hsmtest.NewData(hsmtest.State1), make(map[string]*persistencespb.StateMachineMap), &backend{}) + require.NoError(t, err) + + node1, err := root.AddChild(hsm.Key{Type: def1.Type(), ID: "node1"}, hsmtest.NewData(hsmtest.State1)) + require.NoError(t, err) + node2, err := root.AddChild(hsm.Key{Type: def1.Type(), ID: "node2"}, hsmtest.NewData(hsmtest.State1)) + require.NoError(t, err) + + // Record sequence of operations: + // 1. Transition on node1 + // 2. Transition on node2 + // 3. Delete node1 (should remove node1's transition) + // 4. Another transition on node2 + + err = hsm.MachineTransition(node1, func(d *hsmtest.Data) (hsm.TransitionOutput, error) { + return hsm.TransitionOutput{}, nil + }) + require.NoError(t, err) + + err = hsm.MachineTransition(node2, func(d *hsmtest.Data) (hsm.TransitionOutput, error) { + return hsm.TransitionOutput{}, nil + }) + require.NoError(t, err) + + err = root.DeleteChild(node1.Key) + require.NoError(t, err) + + err = hsm.MachineTransition(node2, func(d *hsmtest.Data) (hsm.TransitionOutput, error) { + return hsm.TransitionOutput{}, nil + }) + require.NoError(t, err) + + // After compaction: + // - node1's transition should be gone + // - delete operation should remain + // - node2's transitions should remain in original order + opLog, err := root.Outputs() + require.NoError(t, err) + + var ops []string + for _, op := range opLog { + switch o := op.(type) { + case hsm.TransitionOperation: + if slices.Equal(o.Path(), node2.Path()) { + ops = append(ops, "node2_transition") + } else if slices.Equal(o.Path(), node1.Path()) { + ops = append(ops, "node1_transition") + } + case hsm.DeleteOperation: + if slices.Equal(o.Path(), node1.Path()) { + ops = append(ops, "node1_delete") + } + } + } + + expected := []string{ + "node2_transition", + "node1_delete", + "node2_transition", + } + require.Equal(t, expected, ops, "operations should maintain chronological order after compaction") } diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index c1fda3c87a0..74892e9c291 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -296,26 +296,34 @@ func (r *TaskGeneratorImpl) GenerateDirtySubStateMachineTasks( stateMachineRegistry *hsm.Registry, ) error { tree := r.mutableState.HSM() - for _, pao := range tree.Outputs() { - node, err := tree.Child(pao.Path) + opLog, err := tree.Outputs() + if err != nil { + return err + } + for _, op := range opLog { + transitionOp, ok := op.(hsm.TransitionOperation) + if !ok { + continue + } + + node, err := tree.Child(transitionOp.Path()) if err != nil { return err } - for _, output := range pao.Outputs { - for _, task := range output.Tasks { - // since this method is called after transition history is updated for the current transition, - // we can safely call generateSubStateMachineTask which sets MutableStateVersionedTransition - // to the last versioned transition in StateMachineRef - if err := generateSubStateMachineTask( - r.mutableState, - stateMachineRegistry, - node, - pao.Path, - output.TransitionCount, - task, - ); err != nil { - return err - } + + for _, task := range transitionOp.Output.Tasks { + // since this method is called after transition history is updated for the current transition, + // we can safely call generateSubStateMachineTask which sets MutableStateVersionedTransition + // to the last versioned transition in StateMachineRef + if err := generateSubStateMachineTask( + r.mutableState, + stateMachineRegistry, + node, + transitionOp.Path(), + transitionOp.Output.TransitionCount, + task, + ); err != nil { + return err } } }