-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproducer.go
138 lines (108 loc) · 3.54 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package kafka
import (
//"encoding/json"
//"fmt"
//"time"
"context"
"errors"
"github.com/Shopify/sarama"
log "github.com/uthng/golog"
)
var (
// ErrProducerTopicMsgHandlerNotFound indicates that no message handler is found for the given topic
ErrProducerTopicMsgHandlerNotFound = errors.New("producer handler for topic not found")
)
// ProducerHandler is an interface for Producer
type ProducerHandler interface {
Produce(msg interface{}, topic string) error
Close() error
}
type asyncProducer struct {
p sarama.AsyncProducer
ctx context.Context
handlers ProducerMsgHandlers
}
// EncodeRequestFunc decodes mesages received by the consumer
type EncodeRequestFunc func(context.Context, interface{}) ([]byte, error)
// EncodeResponseFunc encodes response message to publish to "response" topics
//type DecodeResponseFunc func(context.Context, []byte, interface{}) error
//type ProduceResponseFunc func(context.Context, interface{}, *AsyncProducer) error
//type ConsumeRequestFunc func(context.Context, *ProducerSessionMessage) (interface{}, error)
// ProducerMsgHandler represents a kafka message handler
type ProducerMsgHandler struct {
Encode EncodeRequestFunc
Before []BeforeFunc
After []AfterFunc
}
// ProducerMsgHandlers is a map between topic & message handler
type ProducerMsgHandlers map[string]*ProducerMsgHandler
// ProducerMsgOption is an option function
type ProducerMsgOption func(*ProducerMsgHandler)
// NewProducer creates an instance sarama async producer
func NewAsyncProducer(ctx context.Context, client sarama.Client, handlers ProducerMsgHandlers) (ProducerHandler, error) {
producer, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
return nil, err
}
return &asyncProducer{
p: producer,
ctx: ctx,
handlers: handlers,
}, nil
}
// Produce sends message to a specified topic
func (p *asyncProducer) Produce(request interface{}, topic string) error {
var err error
handler, ok := p.handlers[topic]
if !ok {
log.Errorw("Producer handler for not found", "topic", topic)
return ErrProducerTopicMsgHandlerNotFound
}
var msg []byte
if handler.Encode != nil {
msg, err = handler.Encode(p.ctx, request)
if err != nil {
return err
}
} else {
msg = request.([]byte)
}
select {
case p.p.Input() <- &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(msg),
}:
log.Infow("Kafka producer sending message...", "type", "async", "topic", topic, "msg", msg)
case err := <-p.p.Errors():
log.Errorw("Kafka producer failed to send message", "type", "async", "topic", topic, "msg", msg, "err", err)
return err
}
return nil
}
// Close teminates the instance
func (p *asyncProducer) Close() error {
if p != nil {
return p.p.Close()
}
return nil
}
// NewProducerMsgHandler creates a new consumer message handler
func NewProducerMsgHandler(enc EncodeRequestFunc, options ...ProducerMsgOption) *ProducerMsgHandler {
h := &ProducerMsgHandler{
Encode: enc,
}
for _, option := range options {
option(h)
}
return h
}
// ProducerMsgHandlerBefore functions are executed on the publisher request object before the
// request is decoded.
func ProducerMsgHandlerBefore(before ...BeforeFunc) ProducerMsgOption {
return func(h *ProducerMsgHandler) { h.Before = append(h.Before, before...) }
}
// ProducerMsgHandlerAfter functions are executed on the subscriber reply after the
// endpoint is invoked, but before anything is published to the reply.
func ProducerMsgHandlerAfter(after ...AfterFunc) ProducerMsgOption {
return func(h *ProducerMsgHandler) { h.After = append(h.After, after...) }
}