forked from twitchscience/kinsumer
-
Notifications
You must be signed in to change notification settings - Fork 1
/
checkpoints.go
339 lines (295 loc) · 10.7 KB
/
checkpoints.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
// Copyright (c) 2016 Twitch Interactive
package kinsumer
import (
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
)
// checkpointer tracks this client's ownership of, and progress through, the
// row for a single shard in the DynamoDB checkpoint table.
//
// Concurrency: commit, update, updateFunc and finish serialize on mutex.
// Methods that do not take mutex are not safe to call concurrently with
// methods that modify the checkpointer.
type checkpointer struct {
shardID string // value of the row's "Shard" key attribute
tableName string // DynamoDB table holding the checkpoint rows
dynamodb dynamodbiface.DynamoDBAPI // client used for every checkpoint read/write
sequenceNumber string // most recent value passed to update; "" until one is recorded
ownerName string // written to the row's OwnerName attribute
ownerID string // written to OwnerID; conditional writes require the row's OwnerID to match it
maxAgeForClientRecord time.Duration // age after which an owner's LastUpdate counts as expired; half of it paces idle heartbeats in commit
stats StatReceiver // Checkpoint() is reported after writes that carry a sequence number
captured bool // set by capture; cleared by a successful release
dirty bool // sequenceNumber changed since the last successful commit
mutex sync.Mutex // guards the state touched by commit, update, updateFunc and finish
finished bool // set by finish
finalSequenceNumber string // value passed to finish; "" if nothing was ever read from the shard
updateSequencer chan struct{} // closed once the most recently issued updateFunc callback has run; chains callback ordering
lastUpdate int64 // LastUpdate (UnixNano) of this client's last successful write
commitIntervalCounter time.Duration // sum of commitFrequency over idle commit calls since the last write attempt
lastRecordPassed time.Time // when a record was last handed to the user (not assigned in this chunk; TODO confirm where it is set)
}
// checkpointRecord is the Go shape of one row in the checkpoint table. It is
// converted to and from DynamoDB items with dynamodbattribute, and the
// condition/update expressions in this file refer to its field names, so the
// field names are the stored attribute names and must not be renamed.
type checkpointRecord struct {
Shard string // table key: the shard ID this row tracks
SequenceNumber *string // last read sequence number, null if the shard has never been consumed
LastUpdate int64 // timestamp of last commit/ownership change
OwnerName *string // uuid of owning client, null if the shard is unowned
Finished *int64 // timestamp of when the shard was fully consumed, null if it's active
// Columns added to the table that are never used for decision making in the
// library, rather they are useful for manual troubleshooting
OwnerID *string
LastUpdateRFC string
FinishedRFC *string
}
// capture makes a single, non-blocking attempt to take ownership of the
// checkpoint row for shardID in tableName.
//
// The row is read with a consistent GetItem. If it already names an owner
// (which may be this client) whose LastUpdate is newer than
// now-maxAgeForClientRecord, capture gives up. Otherwise it rewrites the row
// with this client as owner and a fresh LastUpdate. The PutItem is
// conditional, so if a competing client claims the row first, that client wins.
//
// It returns (checkpointer, nil) on success, (nil, nil) when the shard is held
// by someone else or another client won the race, and (nil, err) for any other
// failure.
func capture(
	shardID string,
	tableName string,
	db dynamodbiface.DynamoDBAPI,
	ownerName string,
	ownerID string,
	maxAgeForClientRecord time.Duration,
	stats StatReceiver) (*checkpointer, error) {

	// Owners whose LastUpdate is at or before this instant are considered gone.
	cutoff := time.Now().Add(-maxAgeForClientRecord).UnixNano()

	got, err := db.GetItem(&dynamodb.GetItemInput{
		TableName:      aws.String(tableName),
		ConsistentRead: aws.Bool(true),
		Key: map[string]*dynamodb.AttributeValue{
			"Shard": {S: aws.String(shardID)},
		},
	})
	if err != nil {
		return nil, fmt.Errorf("error calling GetItem on shard checkpoint: %v", err)
	}

	// When no row exists yet this leaves rec zero-valued, i.e. unowned.
	var rec checkpointRecord
	if err = dynamodbattribute.UnmarshalMap(got.Item, &rec); err != nil {
		return nil, err
	}
	if rec.OwnerID != nil && rec.LastUpdate > cutoff {
		// Someone (possibly us) holds an unexpired claim on the shard.
		return nil, nil
	}

	now := time.Now()
	rec.Shard = shardID // the row may not have existed yet
	rec.OwnerID, rec.OwnerName = &ownerID, &ownerName
	rec.LastUpdate, rec.LastUpdateRFC = now.UnixNano(), now.UTC().Format(time.RFC1123Z)

	item, err := dynamodbattribute.MarshalMap(rec)
	if err != nil {
		return nil, err
	}
	condVals, err := dynamodbattribute.MarshalMap(map[string]interface{}{
		":cutoff":   aws.Int64(cutoff),
		":nullType": aws.String("NULL"),
	})
	if err != nil {
		return nil, err
	}

	_, err = db.PutItem(&dynamodb.PutItemInput{
		TableName: aws.String(tableName),
		Item:      item,
		// A row that never existed has no OwnerID, while a marshaled
		// checkpointRecord with a nil OwnerID stores it as the NULL type.
		// Both count as unowned, as does an owner whose heartbeat expired.
		ConditionExpression: aws.String(
			"attribute_not_exists(OwnerID) OR attribute_type(OwnerID, :nullType) OR LastUpdate <= :cutoff"),
		ExpressionAttributeValues: condVals,
	})
	if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == conditionalFail {
		// Another client claimed the row between our read and our write.
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	return &checkpointer{
		shardID:               shardID,
		tableName:             tableName,
		dynamodb:              db,
		ownerName:             ownerName,
		ownerID:               ownerID,
		stats:                 stats,
		sequenceNumber:        aws.StringValue(rec.SequenceNumber),
		maxAgeForClientRecord: maxAgeForClientRecord,
		captured:              true,
		lastUpdate:            rec.LastUpdate,
	}, nil
}
// commit writes the current checkpoint (sequence number, ownership and a fresh
// LastUpdate heartbeat) to DynamoDB. The write only succeeds if this client is
// still the recorded owner.
//
// commitFrequency is added to commitIntervalCounter on each call that has
// nothing new to persist (not dirty and not finished). Such a call writes only
// when both conditions below hold; otherwise it returns (false, nil) without
// touching the table:
//   - the counter has reached maxAgeForClientRecord/2, and
//   - no record has been handed to the user within the last
//     maxAgeForClientRecord/2.
//
// It returns true if this write set Finished because the user consumed the
// shard up to its final sequence number. Once that has happened, the
// checkpointer should be released and never captured again.
//
// If the ownership condition fails and our own last write is older than
// maxAgeForClientRecord, commit returns (false, nil): the shard is assumed to
// have been taken over. Any other failure is returned as an error.
func (cp *checkpointer) commit(commitFrequency time.Duration) (bool, error) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	halfMaxAge := cp.maxAgeForClientRecord / 2
	if !cp.dirty && !cp.finished {
		cp.commitIntervalCounter += commitFrequency
		// While records are flowing to the user, wait for a real sequence
		// number instead of writing a bare heartbeat. Otherwise heartbeat
		// every maxAgeForClientRecord/2.
		if time.Since(cp.lastRecordPassed) < halfMaxAge || cp.commitIntervalCounter < halfMaxAge {
			return false, nil
		}
	}
	// A write is being attempted, so restart the idle heartbeat interval.
	cp.commitIntervalCounter = 0

	now := time.Now()
	nowRFC := now.UTC().Format(time.RFC1123Z)
	rec := checkpointRecord{
		Shard:         cp.shardID,
		LastUpdate:    now.UnixNano(),
		LastUpdateRFC: nowRFC,
		OwnerID:       &cp.ownerID,
		OwnerName:     &cp.ownerName,
	}
	// Empty strings may not be passed to dynamo, so leave SequenceNumber nil
	// to unset it when nothing has been consumed yet.
	if cp.sequenceNumber != "" {
		rec.SequenceNumber = &cp.sequenceNumber
	}

	markFinished := cp.finished && (cp.finalSequenceNumber == "" || cp.sequenceNumber == cp.finalSequenceNumber)
	if markFinished {
		rec.Finished = aws.Int64(now.UnixNano())
		rec.FinishedRFC = aws.String(nowRFC)
	}

	item, err := dynamodbattribute.MarshalMap(&rec)
	if err != nil {
		return false, err
	}
	condVals, err := dynamodbattribute.MarshalMap(map[string]interface{}{
		":ownerID": aws.String(cp.ownerID),
	})
	if err != nil {
		return false, err
	}

	_, err = cp.dynamodb.PutItem(&dynamodb.PutItemInput{
		TableName:                 aws.String(cp.tableName),
		Item:                      item,
		ConditionExpression:       aws.String("OwnerID = :ownerID"),
		ExpressionAttributeValues: condVals,
	})
	if err != nil {
		awsErr, isAWS := err.(awserr.Error)
		lostOwnership := isAWS && awsErr.Code() == conditionalFail &&
			cp.lastUpdate < time.Now().Add(-cp.maxAgeForClientRecord).UnixNano()
		if lostOwnership {
			return false, nil
		}
		return false, fmt.Errorf("error committing checkpoint: %s", err)
	}

	cp.lastUpdate = rec.LastUpdate // keep our copy of LastUpdate in sync with the table
	if rec.SequenceNumber != nil {
		cp.stats.Checkpoint()
	}
	cp.dirty = false
	return markFinished, nil
}
// release gives up this client's ownership of the shard's checkpoint row so
// that another client can capture it.
//
// It issues one UpdateItem that only succeeds while OwnerID equals this
// client's ownerID. The update does the following:
//   - removes OwnerID and OwnerName,
//   - refreshes LastUpdate and LastUpdateRFC,
//   - stores the current sequence number.
//
// On success it reports a checkpoint to stats (when a sequence number has been
// recorded) and clears captured.
//
// If the condition fails and our own last successful write is older than
// maxAgeForClientRecord, another client is assumed to have legitimately taken
// the shard and nil is returned. Any other failure is returned as an error.
func (cp *checkpointer) release() error {
	// Hold the mutex like commit/update/finish do. The callbacks returned by
	// updateFunc can call update (which writes sequenceNumber) at any time, so
	// reading sequenceNumber here without the lock would be a data race.
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	now := time.Now()
	attrVals, err := dynamodbattribute.MarshalMap(map[string]interface{}{
		":ownerID":        aws.String(cp.ownerID),
		":sequenceNumber": aws.String(cp.sequenceNumber),
		":lastUpdate":     aws.Int64(now.UnixNano()),
		":lastUpdateRFC":  aws.String(now.UTC().Format(time.RFC1123Z)),
	})
	if err != nil {
		return err
	}
	if _, err = cp.dynamodb.UpdateItem(&dynamodb.UpdateItemInput{
		TableName: aws.String(cp.tableName),
		Key: map[string]*dynamodb.AttributeValue{
			"Shard": {S: aws.String(cp.shardID)},
		},
		UpdateExpression: aws.String("REMOVE OwnerID, OwnerName " +
			"SET LastUpdate = :lastUpdate, LastUpdateRFC = :lastUpdateRFC, " +
			"SequenceNumber = :sequenceNumber"),
		ConditionExpression:       aws.String("OwnerID = :ownerID"),
		ExpressionAttributeValues: attrVals,
	}); err != nil {
		if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == conditionalFail && cp.lastUpdate < time.Now().Add(-cp.maxAgeForClientRecord).UnixNano() {
			// The conditional check failed and our record has expired, so
			// assume another client has legitimately seized the shard.
			return nil
		}
		return fmt.Errorf("error releasing checkpoint: %s", err)
	}
	if cp.sequenceNumber != "" {
		cp.stats.Checkpoint()
	}
	cp.captured = false
	return nil
}
// update records sequenceNumber as the latest consumed position. The
// checkpointer becomes dirty (needing a commit) only if the value actually
// changed; an already-dirty checkpointer stays dirty.
func (cp *checkpointer) update(sequenceNumber string) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	if sequenceNumber != cp.sequenceNumber {
		cp.dirty = true
	}
	cp.sequenceNumber = sequenceNumber
}
// updateFunc returns a callback that, when invoked, records sequenceNumber via
// update. Callbacks take effect strictly in the order updateFunc was called.
// Each callback blocks until the callback created before it has run, then
// unblocks the one created after it. Calling a callback more than once is
// harmless: only the first invocation does anything.
func (cp *checkpointer) updateFunc(sequenceNumber string) func() {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	prev := cp.updateSequencer
	if prev == nil {
		// No earlier callback exists, so start from an already-closed channel
		// and let the first callback proceed immediately.
		prev = make(chan struct{})
		close(prev)
	}
	next := make(chan struct{})
	cp.updateSequencer = next

	var once sync.Once
	return func() {
		once.Do(func() {
			<-prev // wait until every earlier callback has run
			cp.update(sequenceNumber)
			close(next) // release the callback created after this one
		})
	}
}
// finish records sequenceNumber as the last one in the shard and flags the
// checkpointer as finished. A later commit then marks the row Finished once
// the checkpoint reaches that sequence number. Pass "" if nothing was ever
// read from the shard.
func (cp *checkpointer) finish(sequenceNumber string) {
	cp.mutex.Lock()
	cp.finished = true
	cp.finalSequenceNumber = sequenceNumber
	cp.mutex.Unlock()
}
// loadCheckpoints reads every row of the checkpoint table with a consistent
// scan and returns the rows keyed by their Shard attribute. If a row fails to
// unmarshal, scanning stops and that error is returned in preference to any
// error from the scan itself.
func loadCheckpoints(db dynamodbiface.DynamoDBAPI, tableName string) (map[string]*checkpointRecord, error) {
	byShard := make(map[string]*checkpointRecord)
	var decodeErr error

	scanErr := db.ScanPages(&dynamodb.ScanInput{
		TableName:      aws.String(tableName),
		ConsistentRead: aws.Bool(true),
	}, func(page *dynamodb.ScanOutput, lastPage bool) bool {
		for _, item := range page.Items {
			rec := new(checkpointRecord)
			if decodeErr = dynamodbattribute.UnmarshalMap(item, rec); decodeErr != nil {
				return false // stop paging; decodeErr is reported below
			}
			byShard[rec.Shard] = rec
		}
		return !lastPage
	})

	switch {
	case decodeErr != nil:
		return nil, decodeErr
	case scanErr != nil:
		return nil, scanErr
	}
	return byShard, nil
}