-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlocal_mq.go
73 lines (62 loc) · 1.32 KB
/
local_mq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package emitter
import (
	"errors"
	"strconv"
	"sync"

	"github.com/gone-io/gone"
	"github.com/gone-io/gone/goner/tracer"
)
// NewLocalMQ builds an empty in-process message queue and returns it together
// with the goner id (IdGoneEmitterMq) it is meant to be registered under.
// The queue's channels are not allocated here; that happens in Start.
func NewLocalMQ() (gone.Angel, gone.GonerId) {
	mq := new(localMq)
	return mq, IdGoneEmitterMq
}
// msgWrap is the envelope placed on the local queue's channel: the payload and
// headers taken from the sent message plus the id assigned to it by Send.
type msgWrap struct {
	// id is the sequence number assigned when the message was sent.
	id int64

	// content is the message body as returned by MQMsg.GetBody.
	content []byte

	// headers are the message headers as returned by MQMsg.GetHeaders.
	headers Headers
}
// localMq is an in-process message queue backed by a buffered channel.
// Send enqueues messages; a single goroutine started by Start hands each one
// to the subscriber registered through Consumer.
type localMq struct {
	gone.Flag
	gone.Logger `gone:"gone-logger"`
	tracer      tracer.Tracer `gone:"gone-tracer"`

	buf        chan msgWrap  // pending messages; allocated in Start
	sub        Subscriber    // handler invoked for every received message
	stopSignal chan struct{} // closed by Stop to end the receive loop

	incrMu sync.Mutex // guards incr so concurrent Send calls get distinct ids
	incr   int64      // last message id handed out by Send
}

// Send assigns the next sequence id to msg, enqueues it for the receive loop
// and returns that id (in decimal) as the only element of msgIds.
//
// Send blocks while the buffer is full. Once Stop has closed the stop signal
// it returns an error instead of enqueueing, because nothing would drain the
// buffer any more. Send must not be called before Start: until then the
// channels are nil and the send would block forever.
func (q *localMq) Send(msg MQMsg) (msgIds []string, err error) {
	// Check the stop signal on its own first so that a stopped queue is
	// rejected deterministically even when the buffer still has room.
	select {
	case <-q.stopSignal:
		return nil, errors.New("local mq is stopped")
	default:
	}

	// Take the id into a local while holding the lock; re-reading q.incr
	// later could observe an id that belongs to another concurrent Send.
	q.incrMu.Lock()
	q.incr++
	id := q.incr
	q.incrMu.Unlock()

	wrapped := msgWrap{
		id:      id,
		content: msg.GetBody(),
		headers: msg.GetHeaders(),
	}

	select {
	case q.buf <- wrapped:
	case <-q.stopSignal:
		// Stop happened while we were waiting for buffer space.
		return nil, errors.New("local mq is stopped")
	}

	return []string{strconv.FormatInt(id, 10)}, nil
}
// Consumer registers sub as the handler that the receive loop calls for each
// queued message. A later call replaces the previously registered handler.
func (q *localMq) Consumer(sub Subscriber) {
	q.sub = sub
}
// receiveMsg is the consumer loop run by the goroutine that Start launches.
// It takes messages off the buffer one at a time, stamps the message id into
// the headers under MsgId and passes the message to the registered
// subscriber. Subscriber errors are logged as warnings and do not stop the
// loop. The loop returns when the stop signal channel is closed.
//
// The whole loop runs inside tracer.RecoverSetTraceId.
// NOTE(review): presumably that helper recovers panics; if so, a panic in the
// subscriber ends this loop for good — confirm against the tracer package.
func (q *localMq) receiveMsg() {
	q.tracer.RecoverSetTraceId("", func() {
		for {
			select {
			case m := <-q.buf:
				// Without a subscriber the call below would be a nil
				// function call and panic; drop the message instead.
				if q.sub == nil {
					q.Warnf("msg %d dropped: no subscriber registered", m.id)
					continue
				}

				// GetHeaders may have returned nil; assigning into a nil
				// map panics, so allocate one before stamping the id.
				if m.headers == nil {
					m.headers = make(Headers)
				}
				m.headers[MsgId] = strconv.FormatInt(m.id, 10)

				if err := q.sub(NewMQMsg(m.content, m.headers)); err != nil {
					q.Warnf("msg deal failed for err: %v", err)
				}
			case <-q.stopSignal:
				return
			}
		}
	})
}
// Start allocates the queue's channels and launches the receive loop in its
// own goroutine. The message buffer holds up to 8 pending messages; the stop
// signal channel is the one Stop closes to end the loop. It always returns nil.
func (q *localMq) Start(_ gone.Cemetery) error {
	const bufSize = 8

	q.stopSignal = make(chan struct{})
	q.buf = make(chan msgWrap, bufSize)

	go q.receiveMsg()
	return nil
}
// Stop ends the receive loop by closing the stop signal channel. It always
// returns nil.
//
// Stop is safe to call before Start and safe to call more than once in
// sequence: closing a nil or already-closed channel would panic, so both
// cases are turned into no-ops.
func (q *localMq) Stop(gone.Cemetery) error {
	// Start has not run yet, so there is no loop to stop.
	if q.stopSignal == nil {
		return nil
	}

	select {
	case <-q.stopSignal:
		// Nothing ever sends on this channel, so a ready receive means it
		// has already been closed by an earlier Stop.
	default:
		close(q.stopSignal)
	}
	return nil
}