-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.go
114 lines (87 loc) · 3.44 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package kafka
import (
	"context"
	"fmt"
	//"encoding/json"
	//"fmt"
	//"errors"

	//"github.com/spf13/cast"
	"github.com/go-kit/kit/endpoint"
	//"github.com/go-kit/kit/log"
	//"github.com/go-kit/kit/transport"
	"github.com/Shopify/sarama"
)
// ConsumerGroupHandler is the interface of a consumer group as returned by
// NewConsumerGroup. It embeds sarama.ConsumerGroupHandler and adds lifecycle
// methods. The semantics of those methods are defined by the implementations,
// which are not visible in this file; the notes below are assumptions.
type ConsumerGroupHandler interface {
sarama.ConsumerGroupHandler
// WaitReady presumably blocks until the consumer is ready — confirm against the implementation.
WaitReady()
// Reset presumably re-arms the ready state — confirm against the implementation.
Reset()
// Start presumably begins consuming — confirm against the implementation.
Start()
// Close releases the consumer group and reports any error from doing so.
Close() error
}
// ConsumerSessionMessage bundles a Kafka message to consume together with a
// sarama consumer group session.
type ConsumerSessionMessage struct {
// Session is the sarama consumer group session (presumably the one the
// message was claimed on — confirm against the code that builds this value).
Session sarama.ConsumerGroupSession
// Message is the Kafka message to consume.
Message *sarama.ConsumerMessage
}
// DecodeRequestFunc decodes a message received by the consumer into a request
// object. It receives a context and the raw input and returns the decoded
// request or an error.
// NOTE(review): the concrete type of the input is not fixed here — presumably a
// *ConsumerSessionMessage; confirm against the callers.
type DecodeRequestFunc func(context.Context, interface{}) (request interface{}, err error)
// EncodeResponseFunc encodes a response value into the bytes of a message to
// be published to "response" topics. It returns the encoded payload or an error.
type EncodeResponseFunc func(context.Context, interface{}) ([]byte, error)
// ProduceResponseFunc sends a response message to an output topic through the
// given ProducerHandler, after EncodeResponseFunc has been invoked. It returns
// an error if the message could not be produced.
type ProduceResponseFunc func(context.Context, interface{}, ProducerHandler) error
//type ConsumeRequestFunc func(context.Context, *ConsumerSessionMessage) (interface{}, error)
// ConsumerMsgHandler contains the functions used to decode request messages,
// encode response messages, and send response messages to output topics.
// Instances are normally built with NewConsumerMsgHandler.
type ConsumerMsgHandler struct {
// Endpoint is the go-kit endpoint associated with this handler.
Endpoint endpoint.Endpoint
// Decode turns a consumed message into a request object.
Decode DecodeRequestFunc
// Encode turns a response into the payload to publish.
Encode EncodeResponseFunc
// Before holds functions added via ConsumerMsgHandlerBefore.
Before []BeforeFunc
// After holds functions added via ConsumerMsgHandlerAfter.
After []AfterFunc
// Finalizer holds functions added via ConsumerMsgHandlerFinalizer.
Finalizer []FinalizerFunc
//Consume ConsumeRequestFunc
// Produce sends the response message to an output topic.
Produce ProduceResponseFunc
}
// ConsumerMsgHandlers maps a topic name to the handler responsible for
// messages of that topic.
type ConsumerMsgHandlers map[string]*ConsumerMsgHandler
// ConsumerMsgOption is a functional option that configures a
// ConsumerMsgHandler; options are applied by NewConsumerMsgHandler.
type ConsumerMsgOption func(*ConsumerMsgHandler)
// NewConsumerGroup creates a new consumer group and returns its handler.
//
// The implementation is selected by the optional "kind" entry of config, which
// must be a string when present and defaults to "multi-async". "multi-async"
// is currently the only supported kind; it delegates to NewMultiAsyncCG with
// all arguments passed through unchanged.
//
// An error is returned when "kind" is not a string or names an unsupported
// kind, so callers never receive a nil handler together with a nil error.
func NewConsumerGroup(ctx context.Context, client sarama.Client, config map[string]interface{}, producer ProducerHandler, handlers ConsumerMsgHandlers) (ConsumerGroupHandler, error) {
	kind := "multi-async"
	if v, ok := config["kind"]; ok {
		// Use the comma-ok assertion: a mistyped config value must surface as
		// an error rather than a runtime panic.
		s, isString := v.(string)
		if !isString {
			return nil, fmt.Errorf("kafka: consumer group config \"kind\" must be a string, got %T", v)
		}
		kind = s
	}

	switch kind {
	case "multi-async":
		return NewMultiAsyncCG(ctx, client, config, producer, handlers)
	default:
		return nil, fmt.Errorf("kafka: unsupported consumer group kind %q", kind)
	}
}
// NewConsumerMsgHandler builds a ConsumerMsgHandler from the given endpoint,
// decode, encode and produce functions, then applies each option to it in the
// order supplied. The configured handler is returned.
func NewConsumerMsgHandler(e endpoint.Endpoint, dec DecodeRequestFunc, enc EncodeResponseFunc, p ProduceResponseFunc, options ...ConsumerMsgOption) *ConsumerMsgHandler {
	handler := new(ConsumerMsgHandler)
	handler.Endpoint = e
	handler.Decode = dec
	handler.Encode = enc
	handler.Produce = p

	// Options run after the mandatory fields are set so they can see them.
	for i := range options {
		options[i](handler)
	}

	return handler
}
// ConsumerMsgHandlerBefore returns an option that appends the given functions
// to the handler's Before list. Those functions are meant to be executed on
// the request object before the request is decoded.
func ConsumerMsgHandlerBefore(before ...BeforeFunc) ConsumerMsgOption {
	apply := func(handler *ConsumerMsgHandler) {
		handler.Before = append(handler.Before, before...)
	}
	return apply
}
// ConsumerMsgHandlerAfter returns an option that appends the given functions
// to the handler's After list. Those functions are meant to be executed on the
// consumer reply after the endpoint is invoked, but before anything is
// published to the reply.
func ConsumerMsgHandlerAfter(after ...AfterFunc) ConsumerMsgOption {
	apply := func(handler *ConsumerMsgHandler) {
		handler.After = append(handler.After, after...)
	}
	return apply
}
// ConsumerMsgHandlerFinalizer returns an option that appends the given
// functions to the handler's Finalizer list. Those functions are meant to be
// executed on the consumer when the handling function exits (deferred).
func ConsumerMsgHandlerFinalizer(finalizer ...FinalizerFunc) ConsumerMsgOption {
	apply := func(handler *ConsumerMsgHandler) {
		handler.Finalizer = append(handler.Finalizer, finalizer...)
	}
	return apply
}