forked from xmidt-org/talaria
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patheventDispatcher.go
318 lines (274 loc) · 10.1 KB
/
eventDispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
// SPDX-FileCopyrightText: 2017 Comcast Cable Communications Management, LLC
// SPDX-License-Identifier: Apache-2.0
package main
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"runtime/debug"
"time"
"go.uber.org/zap"
"github.com/prometheus/client_golang/prometheus"
"github.com/xmidt-org/webpa-common/v2/device"
"github.com/xmidt-org/webpa-common/v2/event"
// nolint:staticcheck
"github.com/xmidt-org/wrp-go/v3"
)
// Sentinel errors produced while dispatching device events. They are wrapped or
// returned by the dispatch helpers in this file and are matched with errors.Is
// in getDroppedMessageReason to pick a metric reason label.
var (
// ErrorEncodingFailed is wrapped by encodeAndDispatchEvent when the WRP
// encoder fails to encode a message.
ErrorEncodingFailed = errors.New("encoding failed")
// ErrorNoEndpointConfiguredForEvent is wrapped by dispatchEvent when the
// event map lookup finds no endpoints for the event type.
ErrorNoEndpointConfiguredForEvent = errors.New("no endpoints configured for event")
// ErrorMalformedHttpRequest is wrapped by newRequest when http.NewRequest
// rejects the method or URL.
ErrorMalformedHttpRequest = errors.New("malformed http request")
// ErrorUnroutableDestination is returned by routeMessageReceivedEvent when
// the destination locator's scheme is neither the event nor the dns scheme.
ErrorUnroutableDestination = errors.New("unroutable destination")
// ErrorUnsupportedEvent is set by OnDeviceEvent for any event type other
// than Connect, Disconnect, or MessageReceived.
ErrorUnsupportedEvent = errors.New("unsupported event")
)
// eventDispatcher is an internal Dispatcher implementation that turns device
// events into HTTP requests and queues them, wrapped in outboundEnvelopes, on
// the channel returned by NewEventDispatcher. That channel may be used to spawn
// one or more workers to process the envelopes.
type eventDispatcher struct {
// logger receives diagnostic and dropped-message log entries.
logger *zap.Logger
// urlFilter is applied by dispatchTo to dns-scheme destinations before a request is built.
urlFilter URLFilter
// method is the HTTP method used for every outbound request.
method string
// timeout bounds the context attached to each queued request.
timeout time.Duration
// authorizationKey, when non-empty, is sent as a Basic Authorization header value.
authorizationKey string
// source is passed to newOnlineMessage/newOfflineMessage when building connect/disconnect events.
source string
// eventMap is looked up by event type (falling back to DefaultEventType) to find endpoints.
eventMap event.MultiMap
// queueSize is incremented when an envelope is queued and decremented again if queuing fails.
queueSize prometheus.Gauge
// droppedMessages counts events that could not be dispatched, labelled by scheme, code and reason.
droppedMessages CounterVec
// outboundEvents counts handled events, labelled by scheme, reason and outcome.
outboundEvents CounterVec
// outbounds is the send side of the envelope queue.
outbounds chan<- outboundEnvelope
}
// NewEventDispatcher builds an eventDispatcher from the supplied measures and
// Outbounder configuration. It returns the dispatcher together with the receive
// side of its envelope queue; that channel may be handed to one or more workers
// that process the envelopes.
//
// If urlFilter is nil, one is created from o with NewURLFilter. An error is
// returned when either that filter or the event map cannot be created.
func NewEventDispatcher(om OutboundMeasures, o *Outbounder, urlFilter URLFilter) (Dispatcher, <-chan outboundEnvelope, error) {
	if urlFilter == nil {
		defaultFilter, err := NewURLFilter(o)
		if err != nil {
			return nil, nil, err
		}
		urlFilter = defaultFilter
	}

	queue := make(chan outboundEnvelope, o.outboundQueueSize())
	log := o.logger()

	events, err := o.eventMap()
	if err != nil {
		return nil, nil, err
	}
	log.Info("eventMap created", zap.Any("eventMap", events))

	dispatcher := &eventDispatcher{
		logger:           log,
		urlFilter:        urlFilter,
		method:           o.method(),
		timeout:          o.requestTimeout(),
		authorizationKey: o.authKey(),
		eventMap:         events,
		queueSize:        om.QueueSize,
		source:           o.source(),
		droppedMessages:  om.DroppedMessages,
		outboundEvents:   om.OutboundEvents,
		outbounds:        queue,
	}

	return dispatcher, queue, nil
}
// OnDeviceEvent is the device.Listener function that processes outbound events.
//
// Connect and Disconnect events are converted into online/offline WRP messages,
// encoded as msgpack and dispatched under the event scheme. MessageReceived
// events are routed according to their destination. Any other event type is
// treated as ErrorUnsupportedEvent. A nil event is logged and ignored.
//
// On normal completion the outboundEvents counter is incremented once with a
// success or failure outcome; failures additionally increment droppedMessages.
// A panic raised while handling the event is recovered, and for the three
// handled event types it is recorded as a dropped message with the panic reason.
func (d *eventDispatcher) OnDeviceEvent(event *device.Event) {
	// TODO improve how we test dispatchEvent & dispatchTo
	var (
		err    error
		scheme = unknown
	)

	defer func() {
		r := recover()
		if r == nil {
			return
		}

		d.logger.Debug("stacktrace from panic", zap.String("stacktrace", string(debug.Stack())), zap.Any("panic", r))

		// Only the event types this dispatcher actually handles are counted as drops.
		if t := event.Type; t != device.Connect && t != device.Disconnect && t != device.MessageReceived {
			return
		}

		d.logger.Error("Dropped message, event not sent", zap.String(schemeLabel, scheme), zap.String(codeLabel, messageDroppedCode), zap.String(reasonLabel, panicReason), zap.Any("panic", r))
		d.droppedMessages.With(prometheus.Labels{schemeLabel: scheme, codeLabel: messageDroppedCode, reasonLabel: panicReason}).Add(1.0)
		d.outboundEvents.With(prometheus.Labels{schemeLabel: scheme, reasonLabel: panicReason, outcomeLabel: failureOutcome}).Add(1.0)
	}()

	if event == nil {
		d.logger.Error("Error nil event")
		return
	}

	switch event.Type {
	case device.Connect:
		scheme = wrp.SchemeEvent
		eventType, message := newOnlineMessage(d.source, event.Device)
		if _, err = d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message); err != nil {
			d.logger.Error("Error dispatching online event", zap.Any("eventType", eventType), zap.Any("destination", message.Destination), zap.Error(err))
		}

	case device.Disconnect:
		scheme = wrp.SchemeEvent
		eventType, message := newOfflineMessage(d.source, event.Device)
		if _, err = d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message); err != nil {
			d.logger.Error("Error dispatching offline event", zap.Any("eventType", eventType), zap.Any("destination", message.Destination), zap.Error(err))
		}

	case device.MessageReceived:
		if scheme, err = d.routeMessageReceivedEvent(event); err != nil {
			// failures are always reported under the unknown scheme label
			scheme = unknown
		}

	default:
		err = ErrorUnsupportedEvent
	}

	// Success: a single outbound event counted, nothing dropped.
	if err == nil {
		d.outboundEvents.With(prometheus.Labels{schemeLabel: scheme, reasonLabel: noErrReason, outcomeLabel: successOutcome}).Add(1.0)
		return
	}

	// Failure: log (unsupported events only at debug level), then count the drop.
	reason := getDroppedMessageReason(err)
	failureLabels := prometheus.Labels{schemeLabel: scheme, reasonLabel: reason, outcomeLabel: failureOutcome}
	if errors.Is(err, ErrorUnsupportedEvent) {
		d.logger.Debug("Dropped message, event not sent", zap.String(schemeLabel, scheme), zap.String(codeLabel, messageDroppedCode), zap.String(reasonLabel, reason), zap.Error(err))
	} else {
		d.logger.Error("Dropped message, event not sent", zap.String(schemeLabel, scheme), zap.String(codeLabel, messageDroppedCode), zap.String(reasonLabel, reason), zap.Error(err))
	}

	d.droppedMessages.With(prometheus.Labels{schemeLabel: scheme, codeLabel: messageDroppedCode, reasonLabel: reason}).Add(1.0)
	d.outboundEvents.With(failureLabels).Add(1.0)
}
// routeMessageReceivedEvent dispatches the contents of a MessageReceived event
// according to the scheme of its WRP destination locator: event-scheme
// destinations go to the configured event endpoints, dns-scheme destinations go
// to the URL embedded in the locator.
//
// It returns the scheme that was used for routing. The returned scheme is empty
// when the message is not routable or its destination cannot be parsed, and
// unknown (with ErrorUnroutableDestination) when the scheme is not supported.
func (d *eventDispatcher) routeMessageReceivedEvent(event *device.Event) (scheme string, err error) {
	routable, isRoutable := event.Message.(wrp.Routable)
	if !isRoutable {
		return "", errors.New("wrp event message is not routable")
	}

	destination := routable.To()
	contentType := event.Format.ContentType()

	locator, parseErr := wrp.ParseLocator(destination)
	if parseErr != nil {
		return "", parseErr
	}

	switch scheme = locator.Scheme; scheme {
	case wrp.SchemeEvent:
		// for event locators the authority is the event type
		_, err = d.dispatchEvent(locator.Authority, contentType, event.Contents)

	case wrp.SchemeDNS:
		// `l.Authority + l.Ignored` is used because incoming dns events are expected to have
		// the format `dns:some_url` or dns:some_scheme://some_url.
		_, err = d.dispatchTo(locator.Authority+locator.Ignored, contentType, event.Contents)

	default:
		scheme, err = unknown, ErrorUnroutableDestination
	}

	if err != nil {
		d.logger.Error("Error dispatching event", zap.String(schemeLabel, scheme), zap.Any("destination", destination), zap.Error(err))
	}

	return scheme, err
}
// send wraps the given request in an outboundEnvelope together with a context
// derived from parent that times out after d.timeout, then attempts to place the
// envelope on the outbounds channel.
//
// The enqueue is non-blocking: if the channel cannot accept the envelope
// immediately, the message is dropped, the timeout context is canceled so its
// resources are released right away, and ErrOutboundQueueFull is returned.
// On success nil is returned and the envelope carries the cancel function with it.
func (d *eventDispatcher) send(parent context.Context, request *http.Request) error {
	// increment the queue size first, so that we always keep a positive queue size
	d.queueSize.Add(1.0)
	ctx, cancel := context.WithTimeout(parent, d.timeout)

	select {
	case d.outbounds <- outboundEnvelope{request.WithContext(ctx), cancel}:
		// cancel now travels with the envelope; presumably the consumer of the
		// channel invokes it once the request completes (not visible here).
		return nil
	default:
		// The envelope never reached the queue, so no one else holds cancel.
		// Call it here; otherwise the context and its timer would linger until
		// d.timeout elapses.
		cancel()
		d.queueSize.Add(-1.0) // the message never made it to the queue
		return ErrOutboundQueueFull
	}
}
// newRequest builds an HTTP request for this eventDispatcher using its
// configured method, the given url and body, and the supplied Content-Type.
// When an authorization key is configured it is attached as a Basic
// Authorization header. A failure from http.NewRequest is wrapped in
// ErrorMalformedHttpRequest.
func (d *eventDispatcher) newRequest(url, contentType string, body io.Reader) (*http.Request, error) {
	request, err := http.NewRequest(d.method, url, body)
	if err != nil {
		return request, fmt.Errorf("%w: %s", ErrorMalformedHttpRequest, err)
	}

	header := request.Header
	header.Set("Content-Type", contentType)

	// TODO: Need to work out how to handle authorization better, without basic auth
	if d.authorizationKey != "" {
		header.Set("Authorization", "Basic "+d.authorizationKey)
	}

	return request, nil
}
// dispatchEvent queues one request per endpoint configured for eventType
// (falling back to DefaultEventType in the event map lookup), each carrying
// contents with the given contentType.
//
// The returned string is the URL most recently attempted, or unknown when no
// request was built. Processing stops at the first endpoint whose request cannot
// be created or queued, and that error is returned. When the lookup finds no
// endpoints, an error wrapping ErrorNoEndpointConfiguredForEvent is returned.
func (d *eventDispatcher) dispatchEvent(eventType, contentType string, contents []byte) (string, error) {
	endpoints, found := d.eventMap.Get(eventType, DefaultEventType)
	if !found {
		// allow no endpoints, but log an error since this means that we're dropping
		// traffic explicitly because of configuration
		return unknown, fmt.Errorf("%w: %s", ErrorNoEndpointConfiguredForEvent, eventType)
	}

	// every request queued from here is tagged with the event scheme
	ctx := context.WithValue(context.Background(), schemeContextKey{}, wrp.SchemeEvent)

	lastURL := unknown
	for _, endpoint := range endpoints {
		request, err := d.newRequest(endpoint, contentType, bytes.NewReader(contents))
		if err != nil {
			return endpoint, err
		}

		// report the URL as the request sees it
		lastURL = request.URL.String()
		if err = d.send(ctx, request); err != nil {
			return lastURL, err
		}
	}

	return lastURL, nil
}
// encodeAndDispatchEvent encodes message in the given WRP format and hands the
// resulting bytes to dispatchEvent for eventType, using the format's content
// type. If encoding fails, unknown is returned as the URL together with an
// error wrapping ErrorEncodingFailed; otherwise the result of dispatchEvent is
// returned unchanged.
func (d *eventDispatcher) encodeAndDispatchEvent(eventType string, format wrp.Format, message *wrp.Message) (string, error) {
	var contents []byte

	if err := wrp.NewEncoderBytes(&contents, format).Encode(message); err != nil {
		return unknown, fmt.Errorf("%w; %s", ErrorEncodingFailed, err)
	}

	return d.dispatchEvent(eventType, format.ContentType(), contents)
}
// dispatchTo passes the unfiltered URL through d.urlFilter, builds a request for
// the filtered URL carrying contents with the given contentType, and queues it
// tagged with the dns scheme.
//
// On a filter or request-construction error, the filter's URL result is
// returned alongside the error. Otherwise the request's URL string is returned
// together with the outcome of queuing the request.
func (d *eventDispatcher) dispatchTo(unfiltered string, contentType string, contents []byte) (string, error) {
	filtered, err := d.urlFilter.Filter(unfiltered)
	if err != nil {
		return filtered, err
	}

	request, err := d.newRequest(filtered, contentType, bytes.NewReader(contents))
	if err != nil {
		return filtered, err
	}

	target := request.URL.String()
	ctx := context.WithValue(context.Background(), schemeContextKey{}, wrp.SchemeDNS)

	return target, d.send(ctx, request)
}
// getDroppedMessageReason maps an error to the reason label used when recording
// a dropped message. A nil error maps to noErrReason; each sentinel error known
// to the dispatcher maps to its dedicated reason; anything else is delegated to
// getDoErrReason.
func getDroppedMessageReason(err error) string {
	switch {
	case err == nil:
		return noErrReason
	case errors.Is(err, ErrorEncodingFailed):
		return encodeErrReason
	case errors.Is(err, ErrorNoEndpointConfiguredForEvent):
		return noEndpointConfiguredForEventReason
	case errors.Is(err, ErrorURLSchemeNotAllowed):
		return urlSchemeNotAllowedReason
	case errors.Is(err, ErrorMalformedHttpRequest):
		return malformedHTTPRequestReason
	case errors.Is(err, ErrOutboundQueueFull):
		return fullQueueReason
	case errors.Is(err, ErrorUnroutableDestination):
		return unroutableDestinationReason
	case errors.Is(err, ErrorUnsupportedEvent):
		return notSupportedEventReason
	default:
		// check for http `Do` related errors
		return getDoErrReason(err)
	}
}