package zkafka

//go:generate mockgen -package mock_confluent --destination=./mocks/confluent/kafka_consumer.go . KafkaConsumer
//go:generate mockgen --package=mock_zkafka --destination=./mocks/mock_reader.go . Reader

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

//// go:generate mockgen -destination=./mocks/mock_metrics.go -source=reader.go

// Reader is the interface satisfied by KReader; it abstracts the consumption of messages
// from subscribed kafka topics.
type Reader interface {
Read(ctx context.Context) (*Message, error)
Close() error
}

// compile-time check that KReader satisfies the Reader interface
var _ Reader = (*KReader)(nil)

// KafkaConsumer is the subset of the confluent-kafka-go consumer API relied upon by KReader.
type KafkaConsumer interface {
SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) error
ReadMessage(timeout time.Duration) (*kafka.Message, error)
Commit() ([]kafka.TopicPartition, error)
StoreOffsets(offsets []kafka.TopicPartition) (storedOffsets []kafka.TopicPartition, err error)
Close() error
Assignment() (partitions []kafka.TopicPartition, err error)
AssignmentLost() bool
}

// compile-time check that the confluent-kafka-go consumer satisfies KafkaConsumer
var _ KafkaConsumer = (*kafka.Consumer)(nil)
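
// The interface above exists so the confluent consumer can be mocked in tests via the
// go:generate mockgen directives at the top of this file. A minimal, illustrative test
// sketch follows; it assumes standard mockgen output (a NewMockKafkaConsumer constructor
// in the generated mock_confluent package) and is not part of this file's API:
//
//	func TestRead(t *testing.T) {
//		ctrl := gomock.NewController(t)
//		consumer := mock_confluent.NewMockKafkaConsumer(ctrl)
//		consumer.EXPECT().SubscribeTopics(gomock.Any(), gomock.Any()).Return(nil)
//		consumer.EXPECT().ReadMessage(gomock.Any()).
//			Return(&kafka.Message{Value: []byte("hello")}, nil)
//		// a KReader can then be constructed with a consumerProvider that returns this mock
//	}
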
// KReader is a Reader implementation which allows for the subscription to multiple topics. It provides methods
// for consuming messages from its subscribed topics and assigned partitions.
type KReader struct {
consumer KafkaConsumer
topicConfig ConsumerTopicConfig
isClosed bool
formatter kFormatter
logger Logger
lifecycle LifecycleHooks
once sync.Once
tCommitMgr *topicCommitMgr
}

// readerArgs collects the dependencies and configuration needed to construct a KReader.
type readerArgs struct {
cfg Config
cCfg ConsumerTopicConfig
consumerProvider confluentConsumerProvider
f kFormatter
l Logger
prefix string
hooks LifecycleHooks
opts []ReaderOption
}

// newReader creates a new KReader based on the provided configuration
func newReader(args readerArgs) (*KReader, error) {
conf := args.cfg
topicConfig := args.cCfg
prefix := args.prefix
provider := args.consumerProvider
formatter := args.f
logger := args.l
confluentConfig, err := makeConsumerConfig(conf, topicConfig, prefix)
if err != nil {
return nil, err
}
consumer, err := provider(confluentConfig)
if err != nil {
return nil, err
}
r := &KReader{
consumer: consumer,
topicConfig: topicConfig,
formatter: formatter,
logger: logger,
lifecycle: args.hooks,
tCommitMgr: newTopicCommitMgr(),
}
s := ReaderSettings{}
for _, opt := range args.opts {
opt(&s)
}
if s.formatter != nil {
r.formatter = s.formatter
}
return r, nil
}

// Read consumes a single message at a time. It blocks until a message is returned or a
// non-fatal error occurs, in which case a nil message (and nil error) is returned.
func (r *KReader) Read(ctx context.Context) (*Message, error) {
r.once.Do(func() {
rebalanceCb := r.getRebalanceCb()
err := r.consumer.SubscribeTopics(r.topicConfig.topics(), rebalanceCb)
if err != nil {
r.logger.Errorw(ctx, "Failed to subscribe to topic", "topics", r.topicConfig.topics(), "error", err)
r.isClosed = true
}
})
if r.isClosed {
return nil, errors.New("reader closed")
}
kmsg, err := r.consumer.ReadMessage(time.Duration(*r.topicConfig.ReadTimeoutMillis) * time.Millisecond)
if err != nil {
var v kafka.Error
switch {
case errors.As(err, &v):
			// timeouts occur routinely (the assigned partitions may simply have no new activity)
			// and are surfaced to the caller as a nil message with a nil error
if v.Code() == kafka.ErrTimedOut {
return nil, nil
}
if v.IsRetriable() {
r.logger.Debugw(ctx, "Retryable error occurred", "topics", r.topicConfig.topics(), "error", v)
return nil, nil
}
return nil, fmt.Errorf("failed to read kafka message: %w", err)
}
return nil, fmt.Errorf("failed to read kafka message: %w", err)
}
if kmsg == nil {
return nil, nil
}
return r.mapMessage(ctx, *kmsg), nil
}
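
// A usage sketch (illustrative, not part of this file's API): a polling loop honoring Read's
// contract, where a nil message with a nil error means "nothing to process yet". The reader
// variable and the process helper are hypothetical:
//
//	for {
//		msg, err := reader.Read(ctx)
//		if err != nil {
//			return err // fatal read error; stop consuming
//		}
//		if msg == nil {
//			continue // timeout or retriable error; poll again
//		}
//		process(msg)
//	}
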
// Close terminates the consumer. This will gracefully unsubscribe
// the consumer from the kafka topic (which includes properly
// revoking the assigned partitions)
func (r *KReader) Close() error {
if r.isClosed {
return nil
}
r.isClosed = true
err := r.consumer.Close()
if err != nil {
return fmt.Errorf("failed to close kafka reader: %w", err)
}
return nil
}

// Assignments returns the current partition assignments for the kafka consumer
func (r *KReader) Assignments(_ context.Context) ([]Assignment, error) {
assignments, err := r.consumer.Assignment()
if err != nil {
return nil, fmt.Errorf("failed to get assignments: %w", err)
}
topicPartitions := make([]Assignment, 0, len(assignments))
for _, tp := range assignments {
if tp.Topic == nil {
continue
}
topicPartitions = append(topicPartitions, Assignment{
Partition: tp.Partition,
Topic: *tp.Topic,
})
}
return topicPartitions, nil
}
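
// For example (illustrative), a caller might snapshot the current assignments for logging:
//
//	assignments, err := reader.Assignments(ctx)
//	if err != nil {
//		return err
//	}
//	for _, a := range assignments {
//		log.Printf("assigned %s[%d]", a.Topic, a.Partition)
//	}
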
// removeInWork removes the given offset from its topic's in-work commit tracking.
func (r *KReader) removeInWork(offset kafka.TopicPartition) {
topicName := getTopicName(offset.Topic)
c := r.tCommitMgr.get(topicName)
c.RemoveInWork(offset)
}

// mapMessage is responsible for mapping the confluent kafka.Message to a zkafka.Message.
func (r *KReader) mapMessage(_ context.Context, msg kafka.Message) *Message {
headerMap := headers(msg)
topicName := getTopicName(msg.TopicPartition.Topic)
c := r.tCommitMgr.get(topicName)
c.PushInWork(msg.TopicPartition)
partition := msg.TopicPartition.Partition
offset := int64(msg.TopicPartition.Offset)
return &Message{
Key: string(msg.Key),
isKeyNil: msg.Key == nil,
Headers: headerMap,
Partition: partition,
Topic: topicName,
GroupID: r.topicConfig.GroupID,
Offset: offset,
topicPartition: msg.TopicPartition,
TimeStamp: msg.Timestamp,
doneFunc: func(ctx context.Context) {
c.PushCompleted(msg.TopicPartition)
commitOffset := c.TryPop(ctx, partition)
if commitOffset == nil {
r.logger.Debugw(ctx, "Message complete, but can't commit yet", "topicName", topicName, "groupID", r.topicConfig.GroupID, "partition", partition, "offset", offset)
return
}
if commitOffset.Error != nil {
r.logger.Errorw(ctx, "Message complete, but can't commit because of error", "commitOffset", commitOffset)
return
}
if commitOffset.Offset < 0 {
r.logger.Errorw(ctx, "Message complete, but can't commit because offset < 0", "commitOffset", commitOffset)
return
}
			// the stored offset must reference the next message to be read, not the one just
			// processed, hence the increment below. See:
			// https://github.com/confluentinc/confluent-kafka-go/v2/blob/master/kafka/consumer.go#L297
			// https://github.com/confluentinc/confluent-kafka-go/v2/issues/656
			commitOffset.Offset++
			_, err := r.consumer.StoreOffsets([]kafka.TopicPartition{*commitOffset})
			if err != nil {
				r.logger.Errorw(ctx, "Error storing offsets", "topicName", topicName, "groupID", r.topicConfig.GroupID, "partition", partition, "offset", offset, "error", err)
				return
			}
			r.logger.Debugw(ctx, "Stored offsets", "offset", commitOffset, "groupID", r.topicConfig.GroupID)
},
value: msg.Value,
fmt: r.formatter,
schema: r.topicConfig.SchemaRegistry.Deserialization.Schema,
}
}
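
// Commit-ordering sketch (illustrative; assumes TryPop only releases an offset once all earlier
// in-work offsets on the partition have completed, per the "can't commit yet" branch above):
// suppose offsets 5, 6, and 7 of one partition are in work. If 6 and 7 complete first, TryPop
// returns nil and nothing is stored; once 5 completes, TryPop releases the run up through 7,
// and offset 8 (7 incremented by one) is stored as the next offset to read.
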
// getRebalanceCb returns a callback which can be used during rebalances.
// It previously attempted to do one final, explicit commit of stored offsets. This was unnecessary
// per the maintainer of librdkafka (https://github.com/confluentinc/librdkafka/issues/1829#issuecomment-393427324):
// with enable.auto.commit=true (which this library uses), offsets are committed at the configured
// interval, during close, and finally during rebalance.
//
// We do, however, want to let in-flight work complete before allowing a rebalance, so we poll the
// in-work heap for up to 10 seconds.
//
// This is part of the commit management strategy per the guidance at
// https://docs.confluent.io/platform/current/clients/consumer.html#offset-management
// (commit when partitions are revoked).
func (r *KReader) getRebalanceCb() kafka.RebalanceCb {
ctx := context.Background()
rebalanceCb := func(_ *kafka.Consumer, event kafka.Event) error {
switch e := event.(type) {
case kafka.AssignedPartitions:
r.logger.Infow(ctx, "Assigned partitions event received", "event", e, "topics", r.topicConfig.topics(), "groupID", r.topicConfig.GroupID)
case kafka.RevokedPartitions:
r.logger.Infow(ctx, "Revoked partitions event received", "event", e, "topics", r.topicConfig.topics(), "groupID", r.topicConfig.GroupID)
// Usually, the rebalance callback for `RevokedPartitions` is called
// just before the partitions are revoked. We can be certain that a
// partition being revoked is not yet owned by any other consumer.
// This way, logic like storing any pending offsets or committing
// offsets can be handled.
// However, there can be cases where the assignment is lost
// involuntarily. In this case, the partition might already be owned
// by another consumer, and operations including committing
			// offsets may not work. (This part of the check comes from the confluent-kafka-go consumer
			// rebalance example: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/consumer_rebalance_example/consumer_rebalance_example.go)
if r.consumer.AssignmentLost() {
r.logger.Infow(ctx, "Assignment lost prior to revoke (possibly because client was closed)", "event", e, "topics", r.topicConfig.topics(), "groupID", r.topicConfig.GroupID)
return nil
}
			// We're going to try to finish processing the in-work messages.
			// We'll do this by checking the commit manager. Once the RevokedPartitions event is emitted,
			// subsequent ReadMessage() calls from the consumer return nil, allowing the commit manager to be whittled down.
ctxNew, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
for {
var inWorkCount int64 = 0
for _, t := range getTopics(e.Partitions) {
cmtMgr := r.tCommitMgr.get(t)
inWorkCount += cmtMgr.InWorkCount()
}
if inWorkCount == 0 {
break
}
if ctxNew.Err() != nil {
r.logger.Warnw(ctx, "Incomplete inwork drain during revokedPartitions event", "event", e, "inWorkCount", inWorkCount, "error", ctxNew.Err(), "topics", r.topicConfig.topics(), "groupID", r.topicConfig.GroupID)
break
}
				// since we're polling the commit manager, pause briefly to avoid a busy loop
time.Sleep(time.Microsecond * 1)
}
}
return nil
}
return rebalanceCb
}

// getTopics returns the unique topic names present in the given partitions.
func getTopics(partitions []kafka.TopicPartition) []string {
uniqueTopics := map[string]struct{}{}
for _, p := range partitions {
if p.Topic == nil {
continue
}
uniqueTopics[*p.Topic] = struct{}{}
}
topics := make([]string, 0, len(uniqueTopics))
for t := range uniqueTopics {
topics = append(topics, t)
}
return topics
}

// getTopicName safely dereferences the given topic name, returning the empty string when nil.
func getTopicName(topicName *string) string {
topic := ""
if topicName != nil {
topic = *topicName
}
return topic
}

// ReaderSettings holds overrides applied by ReaderOptions during reader construction.
type ReaderSettings struct {
formatter kFormatter
}

// ReaderOption is a function that modifies the KReader configuration
type ReaderOption func(*ReaderSettings)

// RFormatterOption sets the formatter for this reader
func RFormatterOption(formatter Formatter) ReaderOption {
return func(s *ReaderSettings) {
if formatter != nil {
s.formatter = zfmtShim{F: formatter}
}
}
}
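
// Usage sketch (illustrative): supplying a custom formatter when constructing a reader. Here
// myFormatter is a hypothetical Formatter implementation, and the options are assumed to be
// threaded into newReader via readerArgs.opts:
//
//	args := readerArgs{
//		// ...other fields elided...
//		opts: []ReaderOption{RFormatterOption(myFormatter)},
//	}
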
// TopicPartition identifies a partition and an offset within a topic.
type TopicPartition struct {
Partition int32
Offset int64
}

// Assignment identifies a topic partition currently assigned to the consumer.
type Assignment struct {
Partition int32
Topic string
}