Skip to content

Commit

Permalink
Fix flatten and flaky tests (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma authored Jan 15, 2019
1 parent 408faa8 commit 26c332b
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 7 deletions.
10 changes: 4 additions & 6 deletions supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,14 @@ func TestTimedSupervisor_GlobalCommitSourceError(t *testing.T) {
inner.On("Close").Return(nil)

called := false
supervisor := streams.NewTimedSupervisor(inner, 1, func(err error) {
supervisor := streams.NewTimedSupervisor(inner, 5 * time.Millisecond, func(err error) {
assert.Equal(t, "error", err.Error())
called = true
})
_ = supervisor.Start()
defer supervisor.Close()

time.Sleep(time.Millisecond)
time.Sleep(6 * time.Millisecond)

inner.AssertCalled(t, "Commit", nil)
assert.True(t, called, "Expected error function to be called")
Expand Down Expand Up @@ -388,15 +388,13 @@ func TestTimedSupervisor_ManualCommitSkipsTimedCommit(t *testing.T) {
inner.On("Commit", caller).Return(nil)
inner.On("Close").Return(nil)

supervisor := streams.NewTimedSupervisor(inner, 5*time.Millisecond, nil)
supervisor := streams.NewTimedSupervisor(inner, 10*time.Millisecond, nil)
_ = supervisor.Start()
defer supervisor.Close()

time.Sleep(2 * time.Millisecond)

_ = supervisor.Commit(caller)

time.Sleep(4 * time.Millisecond)
time.Sleep(11 * time.Millisecond)

inner.AssertNumberOfCalls(t, "Commit", 1)
}
Expand Down
26 changes: 26 additions & 0 deletions topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,22 @@ func flattenNodeTree(roots map[Source]Node) []Node {
}
}

// In asymmetric trees, our dependencies can be out of order,
// which will cause errors. In order to ratify this, we check
// that not dependency appears higher in the list than us.
for i := 0; i < len(nodes); i++ {
node := nodes[i]
for _, child := range node.Children() {
pos := indexOf(child, nodes)
if pos < i {
temp := nodes[pos]
nodes[pos] = nodes[i]
nodes[i] = temp
i = pos
}
}
}

return nodes
}

Expand All @@ -236,3 +252,13 @@ func contains(n Node, nodes []Node) bool {

return false
}

func indexOf(n Node, nodes []Node) int {
for i, node := range nodes {
if node == n {
return i
}
}

return -1
}
98 changes: 97 additions & 1 deletion topology_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,72 @@ func TestFlattenNodeTree(t *testing.T) {
assert.Equal(t, []Node{node3, node4, node5, node6, node7}, nodes)
}

func TestFlattenNodeTree_HandlesComplexTrees(t *testing.T) {
node11 := &testNode{
name: "node11",
processor: &testProcessor{},
}
node10 := &testNode{
name: "node10",
processor: &testProcessor{},
}
node9 := &testNode{
name: "node9",
children: []Node{node10, node11},
processor: &testProcessor{},
}
node8 := &testNode{
name: "node8",
children: []Node{node9},
processor: &testProcessor{},
}
node7 := &testNode{
name: "node7",
children: []Node{node8},
processor: &testProcessor{},
}
node6 := &testNode{
name: "node6",
children: []Node{node7},
processor: &testProcessor{},
}
node5 := &testNode{
name: "node5",
children: []Node{node6},
processor: &testProcessor{},
}
node4 := &testNode{
name: "node4",
children: []Node{node8},
processor: &testProcessor{},
}
node2 := &testNode{
name: "node2",
children: []Node{node4},
}
node3 := &testNode{
name: "node3",
children: []Node{node5},
processor: &testProcessor{},
}
node1 := &testNode{
name: "node1",
children: []Node{node3},
}

nodes := flattenNodeTree(map[Source]Node{
testSource(1): node1,
testSource(2): node2,
})

// Deal with the random access of maps
if nodes[0] == node4 {
assert.Equal(t, []Node{node4, node3, node5, node6, node7, node8, node9, node11, node10}, nodes)
} else {
assert.Equal(t, []Node{node3, node4, node5, node6, node7, node8, node9, node10, node11}, nodes)
}
}

func TestReverse(t *testing.T) {
node1 := &testNode{}
node2 := &testNode{}
Expand Down Expand Up @@ -129,14 +195,44 @@ func TestContains(t *testing.T) {
}
}

func TestIndexOf(t *testing.T) {
node1 := &testNode{}
node2 := &testNode{}
node3 := &testNode{}
node4 := &testNode{}

tests := []struct {
node Node
nodes []Node
index int
}{
{
node: node1,
nodes: []Node{node1, node2, node3},
index: 0,
},
{
node: node4,
nodes: []Node{node1, node2, node3},
index: -1,
},
}

for _, tt := range tests {
i := indexOf(tt.node, tt.nodes)

assert.Equal(t, tt.index, i)
}
}

type testNode struct {
name string
children []Node
processor Processor
}

func (t *testNode) Name() string {
return ""
return t.name
}

func (t *testNode) AddChild(n Node) {
Expand Down

0 comments on commit 26c332b

Please sign in to comment.