-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlistener.go
192 lines (161 loc) · 4.97 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package convoy_cli
import (
"encoding/json"
"github.com/frain-dev/convoy-cli/net"
"github.com/frain-dev/convoy-cli/util"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
"io"
"net/http"
"net/url"
"os"
"os/signal"
"time"
)
const (
// Time allowed to write a message to the server.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the server.
pongWait = 10 * time.Second
// Send pings to server with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)
type Listener struct {
done chan interface{} // Channel to indicate that the receiverHandler is done
interrupt chan os.Signal // Channel to listen for interrupt signal to terminate gracefully
c *Config
}
func NewListener(c *Config) *Listener {
return &Listener{
c: c,
done: make(chan interface{}),
interrupt: make(chan os.Signal),
}
}
func (l *Listener) Listen(listenRequest *ListenRequest, hostInfo *url.URL) {
signal.Notify(l.interrupt, os.Interrupt) // Notify the interrupt channel for SIGINT
body, err := json.Marshal(listenRequest)
if err != nil {
log.Fatal("Error marshalling json:", err)
}
url := url.URL{
Scheme: "ws",
Host: hostInfo.Host,
Path: "/stream/listen",
}
conn, response, err := websocket.DefaultDialer.Dial(url.String(), http.Header{
"Authorization": []string{"Bearer " + l.c.ActiveApiKey},
"Body": []string{string(body)},
})
if err != nil {
if response != nil {
buf, e := io.ReadAll(response.Body)
if e != nil {
log.Fatal("Error parsing request body", e)
}
defer response.Body.Close()
log.WithError(err).Fatalln("websocket dialer failed with response: ", string(buf))
}
log.Fatal(err)
}
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
defer conn.Close()
if !util.IsStringEmpty(listenRequest.Since) {
// Send a message to the server to resend unsuccessful events to the device
err := conn.WriteMessage(websocket.TextMessage, []byte(listenRequest.Since))
if err != nil {
log.WithError(err).Errorln("an error occurred sending 'since' message")
}
}
go l.HandleMessage(conn, listenRequest.ForwardTo)
l.PingUntilInterrupt(conn)
}
func (l *Listener) PingUntilInterrupt(conn *websocket.Conn) {
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
// Our main loop for the client
// We send our relevant packets here
for {
select {
case <-ticker.C:
err := conn.SetWriteDeadline(time.Now().Add(writeWait))
if err != nil {
log.WithError(err).Errorln("failed to set write deadline")
}
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
log.WithError(err).Errorln("failed to set write ping message")
return
}
case <-l.interrupt:
// We received a SIGINT (Ctrl + C). Terminate gracefully...
log.Println("Received SIGINT interrupt signal. Closing all pending connections")
// Send a message to set the device to offline
err := conn.WriteMessage(websocket.TextMessage, []byte("disconnect"))
if err != nil {
log.WithError(err).Errorln("error during closing websocket")
return
}
// Close our websocket connection
err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.WithError(err).Errorln("error during closing websocket")
return
}
select {
case <-l.done:
log.Println("Receiver Channel Closed! Exiting....")
case <-time.After(time.Duration(1) * time.Second):
log.Println("Timeout in closing receiving channel. Exiting....")
}
return
}
}
}
func (l *Listener) HandleMessage(connection *websocket.Conn, url string) {
defer close(l.done)
for {
_, msg, err := connection.ReadMessage()
if err != nil {
if !websocket.IsUnexpectedCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure) {
return
}
log.Error("an error occurred in the receive handler:", err)
return
}
var event CLIEvent
err = json.Unmarshal(msg, &event)
if err != nil {
log.Error("an error occurred in unmarshalling json:", err)
continue
}
// send request to the recipient
d, err := net.NewDispatcher(time.Second*10, "")
if err != nil {
log.Error("an error occurred while forwarding the event", err)
continue
}
res, err := d.ForwardCliEvent(url, http.MethodPost, event.Data, event.Headers)
if err != nil {
log.Error("an error occurred while forwarding the event", err)
continue
}
// set the event delivery status to Success when we successfully forward the event
ack := &AckEventDelivery{UID: event.UID}
mb, err := json.Marshal(ack)
if err != nil {
log.Error("an error occurred in marshalling json:", err)
continue
}
// write an ack message back to the connection here
err = connection.WriteMessage(websocket.TextMessage, mb)
if err != nil {
log.Error("an error occurred while acknowledging the event", err)
}
log.Println(string(res.Body))
}
}