forked from twitchscience/kinsumer
-
Notifications
You must be signed in to change notification settings - Fork 1
/
checkpoints_test.go
158 lines (130 loc) · 4.87 KB
/
checkpoints_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright (c) 2016 Twitch Interactive.
package kinsumer
import (
"testing"
"time"
"github.com/twitchscience/kinsumer/mocks"
)
func TestCheckpointer(t *testing.T) {
table := "checkpoints"
mock := mocks.NewMockDynamo([]string{table})
stats := &NoopStatReceiver{}
cp, err := capture("shard", table, mock, "ownerName", "ownerId", 3*time.Minute, stats)
// Initially, we expect that there is no record, so our new record should have no sequence number
if err != nil {
t.Errorf("current 1 err=%q", err)
}
if cp == nil {
t.Errorf("Should always be able to capture the shard if there is no entry in dynamo")
}
if cp.sequenceNumber != "" {
t.Errorf("sequence number should initially be an empty string")
}
// Update the sequence number. This shouldn't cause any external request.
mocks.AssertNoRequestsMade(t, mock.(*mocks.MockDynamo), "update(seq1)", func() {
cp.update("seq1")
})
// Now actually commit.
mocks.AssertRequestMade(t, mock.(*mocks.MockDynamo), "commit(seq1)", func() {
if _, err = cp.commit(50 * time.Millisecond); err != nil {
t.Errorf("commit seq1 err=%q", err)
}
})
// Call update, but keep the same sequence number
cp.update("seq1")
// Since the sequence number hasn't changed, committing shouldn't make a request.
mocks.AssertNoRequestsMade(t, mock.(*mocks.MockDynamo), "commit unchanged sequence number", func() {
if _, err = cp.commit(50 * time.Millisecond); err != nil {
t.Errorf("commit unchanged err=%q", err)
}
})
// Call update again with a new value
cp.update("seq2")
// committing should trigger a request
mocks.AssertRequestMade(t, mock.(*mocks.MockDynamo), "commit(seq2)", func() {
if _, err = cp.commit(50 * time.Millisecond); err != nil {
t.Errorf("commit seq2 err=%q", err)
}
})
// Call update with a new value twice in a row
cp.update("seq3")
cp.update("seq3")
// This should still trigger an update
mocks.AssertRequestMade(t, mock.(*mocks.MockDynamo), "commit(seq3)", func() {
if _, err = cp.commit(50 * time.Millisecond); err != nil {
t.Errorf("commit seq3 err=%q", err)
}
})
// Try to get another checkpointer for this shard, should not succeed but not error
cp2, err := capture("shard", table, mock, "differentOwner", "differentOwnerId", 3*time.Minute, stats)
if err != nil {
t.Errorf("cp2 first attempt err=%q", err)
}
if cp2 != nil {
t.Errorf("Should not be able to steal shard")
}
cp.update("lastseq")
// release should trigger an update
mocks.AssertRequestMade(t, mock.(*mocks.MockDynamo), "cp.release()", func() {
if err = cp.release(); err != nil {
t.Errorf("release err=%q", err)
}
})
//TODO: Test fails because dynamo mock does not handle replacing records in put, need to resolve that
/*
// Now that we have released the shard, we should be able to grab it
cp2, err = newCheckpointer(aws.String("shard"), table, mock, "differentOwner", "differentOwnerId", 3*time.Minute)
if err != nil {
t.Errorf("cp2 second attempt err=%q", err)
}
if cp2 == nil {
t.Errorf("The shard should be ours!")
}
if cp2.sequenceNumber != "lastseq" {
t.Errorf("Release should have committed `lastseq` but new checkpointer got %s!", cp2.sequenceNumber)
}
*/
}
func TestCheckpointer2(t *testing.T) {
table := "checkpoints"
mock := mocks.NewMockDynamo([]string{table})
stats := &NoopStatReceiver{}
cp, err := capture("shard", table, mock, "ownerName", "ownerId", 3*time.Minute, stats)
// Initially, we expect that there is no record, so our new record should have no sequence number
if err != nil {
t.Errorf("current 1 err=%q", err)
}
if cp == nil {
t.Errorf("Should always be able to capture the shard if there is no entry in dynamo")
}
if cp.sequenceNumber != "" {
t.Errorf("sequence number should initially be an empty string")
}
// Update the sequence number. This shouldn't cause any external request.
mocks.AssertNoRequestsMade(t, mock.(*mocks.MockDynamo), "update(seq1)", func() {
cp.update("seq1")
})
// Now actually commit.
mocks.AssertRequestMade(t, mock.(*mocks.MockDynamo), "commit(seq1)", func() {
if _, err = cp.commit(50 * time.Millisecond); err != nil {
t.Errorf("commit seq1 err=%q", err)
}
})
// Call update, but keep the same sequence number
cp.update("seq1")
// Since the sequence number hasn't changed, committing shouldn't make a request.
mocks.AssertNoRequestsMade(t, mock.(*mocks.MockDynamo), "commit unchanged sequence number", func() {
if _, err = cp.commit(50 * time.Millisecond); err != nil {
t.Errorf("commit unchanged err=%q", err)
}
})
// Set cp information to mimic conditions to retain the shard with no data.
cp.maxAgeForClientRecord = 1 * time.Second
cp.lastRecordPassed = time.Now().Add(-501 * time.Millisecond)
// committing should trigger a request
mocks.AssertRequestMade(t, mock.(*mocks.MockDynamo), "commit(seq2)", func() {
if _, err = cp.commit(501 * time.Millisecond); err != nil {
t.Errorf("commit seq2 err=%q", err)
}
})
}