-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream_sub.go
145 lines (124 loc) · 3.69 KB
/
stream_sub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package idefixgo
import (
"context"
"fmt"
"strings"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/jaracil/ei"
ie "github.com/nayarsystems/idefix-go/errors"
m "github.com/nayarsystems/idefix-go/messages"
"github.com/vmihailenco/msgpack/v5"
)
// SubscriberStream manages the state and operations for a subscription to a message stream.
type SubscriberStream struct {
ctx context.Context
cancel context.CancelCauseFunc
timeout time.Duration
c *Client
buffer chan *m.Message
topic string
address string
subId string
payloadOnly bool
}
// NewSubscriberStream creates a new SubscriberStream for the specified topic.
// It establishes a connection to the remote address and subscribes to the provided topic.
//
// The SubscriberStream allows the client to receive messages published to the specified topic.
// It can handle both payload-only messages and full message structures based on the
// value of the payloadOnly parameter.
func (c *Client) NewSubscriberStream(address string, topic string, capacity uint, payloadOnly bool, timeout time.Duration) (*SubscriberStream, error) {
s := &SubscriberStream{
address: address,
timeout: timeout,
topic: topic,
c: c,
buffer: make(chan *m.Message, capacity),
payloadOnly: payloadOnly,
}
s.ctx, s.cancel = context.WithCancelCause(c.ctx)
res := m.StreamCreateSubResMsg{}
err := s.c.Call2(address, &m.Message{To: m.TopicRemoteSubscribe, Data: m.StreamCreateMsg{
TargetTopic: topic,
Timeout: timeout,
PayloadOnly: s.payloadOnly,
}}, &res, time.Second*5)
if err != nil {
return nil, err
}
if res.StickyPayload != nil {
s.buffer <- &m.Message{To: s.topic, Data: res.StickyPayload}
}
pubtopic := fmt.Sprintf("%s/%s", m.MqttPublicPrefix, res.PublicTopic)
c.client.Subscribe(pubtopic, 0, s.receiveMessage)
s.subId = res.Id
go s.keepalive()
return s, nil
}
func (s *SubscriberStream) handleMsg(msg any) {
if s.payloadOnly {
s.buffer <- &m.Message{To: s.topic, Data: msg}
return
}
topic, err := ei.N(msg).M("s").String()
if err != nil {
topic = s.topic
}
payload, err := ei.N(msg).M("p").Raw()
if err != nil {
fmt.Println("Error getting payload", err)
return
}
s.buffer <- &m.Message{To: topic, Data: payload}
}
func (s *SubscriberStream) receiveMessage(client mqtt.Client, msg mqtt.Message) {
if strings.HasPrefix(msg.Topic(), m.MqttPublicPrefix+"/") {
var tmp any
err := msgpack.Unmarshal(msg.Payload(), &tmp)
if err != nil {
fmt.Println("Error unmarshalling message", err, msg.Payload())
return
}
s.handleMsg(tmp)
}
}
func (s *SubscriberStream) keepalive() {
t := time.NewTicker(s.timeout / 4)
defer s.Close()
for {
select {
case <-s.ctx.Done():
return
case <-t.C:
err := s.c.Call2(s.address, &m.Message{To: m.TopicRemoteSubscribe, Data: m.StreamCreateMsg{
Id: s.subId,
Timeout: s.timeout,
PayloadOnly: s.payloadOnly,
}}, nil, time.Second*5)
if err != nil && !ie.ErrTimeout.Is(err) {
s.cancel(err)
return
}
}
}
}
// Channel returns a read-only channel that streams messages from the subscriber.
func (s *SubscriberStream) Channel() <-chan *m.Message {
return s.buffer
}
// Context returns the context of a given SubscriberStream
func (s *SubscriberStream) Context() context.Context {
return s.ctx
}
// Close terminates the 'SubscriberStream' by canceling the stream and unsubscribing from the remote topic.
func (s *SubscriberStream) Close() error {
defer s.cancel(fmt.Errorf("closed by user"))
_, err := s.c.Call(s.address, &m.Message{To: m.TopicRemoteUnsubscribe, Data: m.StreamDeleteMsg{
Id: s.subId,
}}, time.Second*5)
if err != nil {
return err
}
return nil
}