forked from bsm/sarama-cluster
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoffsets.go
69 lines (57 loc) · 2.04 KB
/
offsets.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package cluster
import (
"sync"
"github.com/Shopify/sarama"
)
// OffsetStash allows to accumulate offsets and
// mark them as processed in a bulk
type OffsetStash struct {
offsets map[topicPartition]offsetInfo
mu sync.Mutex
}
// NewOffsetStash inits a blank stash
func NewOffsetStash() *OffsetStash {
return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)}
}
// MarkOffset stashes the provided message offset
func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
}
// MarkPartitionOffset stashes the offset for the provided topic/partition combination
func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
s.mu.Lock()
defer s.mu.Unlock()
key := topicPartition{Topic: topic, Partition: partition}
if info := s.offsets[key]; offset >= info.Offset {
info.Offset = offset
info.Metadata = metadata
s.offsets[key] = info
}
}
// ResetPartitionOffset stashes the offset for the provided topic/partition combination.
// Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets
func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
s.mu.Lock()
defer s.mu.Unlock()
key := topicPartition{Topic: topic, Partition: partition}
if info := s.offsets[key]; offset <= info.Offset {
info.Offset = offset
info.Metadata = metadata
s.offsets[key] = info
}
}
// ResetOffset stashes the provided message offset
// See ResetPartitionOffset for explanation
func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
}
// Offsets returns the latest stashed offsets by topic-partition
func (s *OffsetStash) Offsets() map[string]int64 {
s.mu.Lock()
defer s.mu.Unlock()
res := make(map[string]int64, len(s.offsets))
for tp, info := range s.offsets {
res[tp.String()] = info.Offset
}
return res
}