-
Notifications
You must be signed in to change notification settings - Fork 20
/
message.go
421 lines (349 loc) · 9.93 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
// Package message provides functions for managing data used by conditions and transforms.
package message
import (
"bytes"
"encoding/json"
"fmt"
"math"
"strings"
"unicode/utf8"
"github.com/brexhq/substation/v2/internal/base64"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
type Flag int
const (
// metaKey is a prefix used to access the meta field in a Message.
metaKey = "meta "
// IsControl indicates that the message is a control message.
IsControl Flag = iota
// SkipNullValues indicates that null values should be ignored when processing the message.
SkipNullValues
// SkipMissingValues indicates that missing values should be ignored when processing the message.
SkipMissingValues
// SkipEmptyValues indicates that empty values should be ignored when processing the message.
SkipEmptyValues
)
// errSetRawInvalidValue is returned when setRaw receives an invalid interface type.
var errSetRawInvalidValue = fmt.Errorf("invalid value type")
// Message is the data structure that is handled by transforms and interpreted by
// conditions.
//
// Data in each message can be accessed and modified as JSON text or binary data:
// - JSON text is accessed using the GetValue, SetValue, and DeleteValue methods.
// - Binary data is accessed using the Data and SetData methods.
//
// Metadata is an additional data field that is meant to store information about the
// message, but can be used for any purpose. For JSON text, metadata is accessed using
// the GetValue, SetValue, and DeleteValue methods with a key prefixed with "meta" (e.g.
// "meta foo"). Binary metadata is accessed using the Metadata and SetMetadata methods.
//
// Messages can also be configured as "control messages." Control messages are used for flow
// control in Substation functions and applications, but can be used for any purpose depending
// on the needs of a transform or condition. These messages should not contain data or metadata.
type Message struct {
data []byte
meta []byte
// ctrl is a flag that indicates if the message is a control message.
//
// Control messages trigger special behavior in transforms and conditions.
ctrl bool
flags []Flag
}
// String returns the message data as a string.
func (m *Message) String() string {
return string(m.data)
}
// New returns a new Message.
func New(opts ...func(*Message)) *Message {
msg := &Message{}
for _, o := range opts {
o(msg)
}
return msg
}
// HasFlag returns true if the message contains a flag.
func (m *Message) HasFlag(i Flag) bool {
for _, f := range m.flags {
if f == i {
return true
}
}
return false
}
func (m *Message) SkipNullValues() *Message {
m.flags = append(m.flags, SkipNullValues)
return m
}
func (m *Message) SkipMissingValues() *Message {
m.flags = append(m.flags, SkipMissingValues)
return m
}
func (m *Message) SkipEmptyValues() *Message {
m.flags = append(m.flags, SkipEmptyValues)
return m
}
// AsControl sets the message as a control message.
func (m *Message) AsControl() *Message {
m.data = nil
m.meta = nil
m.ctrl = true
m.flags = append(m.flags, IsControl)
return m
}
// IsControl returns true if the message is a control message.
func (m *Message) IsControl() bool {
return m.ctrl
}
// Data returns the message data.
func (m *Message) Data() []byte {
if m.ctrl {
return nil
}
return m.data
}
// SetData sets the message data.
func (m *Message) SetData(data []byte) *Message {
if m.ctrl {
return m
}
m.data = data
return m
}
// Metadata returns the message metadata.
func (m *Message) Metadata() []byte {
if m.ctrl {
return nil
}
return m.meta
}
// SetMetadata sets the message metadata.
func (m *Message) SetMetadata(metadata []byte) *Message {
if m.ctrl {
return m
}
m.meta = metadata
return m
}
// GetValue returns a value from the message data or metadata.
//
// If the key is prefixed with "meta" (e.g. "meta foo"), then
// the value is retrieved from the metadata field, otherwise it
// is retrieved from the data field.
//
// This only works with JSON text. If the message data or metadata
// is not JSON text, then an empty value is returned.
func (m *Message) GetValue(key string) Value {
if strings.HasPrefix(key, metaKey) {
key = strings.TrimPrefix(key, metaKey)
key = strings.TrimSpace(key)
v := gjson.GetBytes(m.meta, key)
return Value{gjson: v}
}
key = strings.TrimSpace(key)
v := gjson.GetBytes(m.data, key)
return Value{gjson: v}
}
// SetValue sets a value in the message data or metadata.
//
// If the key is prefixed with "meta" (e.g. "meta foo"), then
// the value is placed into the metadata field, otherwise it
// is placed into the data field.
//
// This only works with JSON text. If the message data or metadata
// is not JSON text, then this method does nothing.
func (m *Message) SetValue(key string, value interface{}) error {
if strings.HasPrefix(key, metaKey) {
key = strings.TrimPrefix(key, metaKey)
key = strings.TrimSpace(key)
meta, err := setValue(m.meta, key, value)
if err != nil {
return err
}
m.meta = meta
return nil
}
key = strings.TrimSpace(key)
data, err := setValue(m.data, key, value)
if err != nil {
return err
}
m.data = data
return nil
}
// DeleteValue deletes a value in the message data or metadata.
//
// If the key is prefixed with "meta" (e.g. "meta foo"), then
// the value is removed from the metadata field, otherwise it
// is removed from the data field.
//
// This only works with JSON text. If the message data or metadata
// is not JSON text, then this method does nothing.
func (m *Message) DeleteValue(key string) error {
if strings.HasPrefix(key, metaKey) {
key = strings.TrimPrefix(key, metaKey)
key = strings.TrimSpace(key)
meta, err := deleteValue(m.meta, key)
if err != nil {
return err
}
m.meta = meta
return nil
}
data, err := deleteValue(m.data, key)
if err != nil {
return err
}
m.data = data
return nil
}
// Value is a wrapper around gjson.Result that provides a consistent interface
// for converting values from JSON text.
type Value struct {
gjson gjson.Result
}
// Value returns the value as an interface{}.
func (v Value) Value() any {
return v.gjson.Value()
}
// String returns the value as a string.
func (v Value) String() string {
return v.gjson.String()
}
// Bytes returns the value as a byte slice.
func (v Value) Bytes() []byte {
return []byte(v.gjson.String())
}
// Int returns the value as an int64.
func (v Value) Int() int64 {
return v.gjson.Int()
}
// Uint returns the value as a uint64.
func (v Value) Uint() uint64 {
return v.gjson.Uint()
}
// Float returns the value as a float64.
func (v Value) Float() float64 {
return v.gjson.Float()
}
// Bool returns the value as a bool.
func (v Value) Bool() bool {
return v.gjson.Bool()
}
// Array returns the value as a slice of Value.
func (v Value) Array() []Value {
var values []Value
for _, r := range v.gjson.Array() {
values = append(values, Value{gjson: r})
}
return values
}
// IsArray returns true if the value is an array.
func (v Value) IsArray() bool {
return v.gjson.IsArray()
}
// Map returns the value as a map of string to Value.
func (v Value) Map() map[string]Value {
values := make(map[string]Value)
for k, r := range v.gjson.Map() {
values[k] = Value{gjson: r}
}
return values
}
// IsObject returns true if the value is an object.
func (v Value) IsObject() bool {
return v.gjson.IsObject()
}
// Exists returns true if the value exists.
func (v Value) Exists() bool {
return v.gjson.Exists()
}
// IsNull returns true if the value is null.
func (v Value) IsNull() bool {
return v.gjson.Type == gjson.Null
}
// IsMissing returns true if the value is missing.
func (v Value) IsMissing() bool {
return !v.gjson.Exists()
}
// IsEmpty returns true if the value is an empty string,
// empty array, empty object, or null.
func (v Value) IsEmpty() bool {
if v.IsArray() {
return len(v.Array()) == 0
}
if v.IsObject() {
return v.String() == "{}"
}
// This catches all other types, including strings and null.
return v.String() == ""
}
func deleteValue(json []byte, key string) ([]byte, error) {
b, err := sjson.DeleteBytes(json, key)
if err != nil {
return nil, err
}
return b, nil
}
// sjson.SetBytesOptions is not used because transform benchmarks perform better with
// sjson.SetBytes (allocating a new byte slice). This may change if transforms are
// refactored.
func setValue(json []byte, key string, value interface{}) ([]byte, error) {
if validJSON(value) {
return setRaw(json, key, value)
}
switch v := value.(type) {
case []byte:
if utf8.Valid(v) {
return sjson.SetBytes(json, key, v)
} else {
return sjson.SetBytes(json, key, base64.Encode(v))
}
case Value:
// JSON number values can lose precision if not read with the right encoding.
// Determine if the value is an integer by checking if floating poit truncation has no
// affect of the value.
if v.gjson.Type == gjson.Number {
if v.Float() == math.Trunc(v.Float()) {
return sjson.SetBytes(json, key, v.Int())
}
return sjson.SetBytes(json, key, v.Float())
}
return sjson.SetBytes(json, key, v.Value())
default:
return sjson.SetBytes(json, key, v)
}
}
// sjson.SetRawBytesOptions is not used because transform benchmarks perform better with
// sjson.SetRawBytes (allocating a new byte slice). This may change if transforms are
// refactored.
func setRaw(json []byte, key string, value interface{}) ([]byte, error) {
switch v := value.(type) {
case []byte:
return sjson.SetRawBytes(json, key, v)
case string:
return sjson.SetRawBytes(json, key, []byte(v))
case Value:
return sjson.SetRawBytes(json, key, v.Bytes())
default:
return nil, errSetRawInvalidValue
}
}
func validJSON(data interface{}) bool {
switch v := data.(type) {
case []byte:
if !bytes.HasPrefix(v, []byte(`{`)) && !bytes.HasPrefix(v, []byte(`[`)) {
return false
}
return json.Valid(v)
case string:
if !strings.HasPrefix(v, `{`) && !strings.HasPrefix(v, `[`) {
return false
}
return json.Valid([]byte(v))
case Value:
return validJSON(v.String())
default:
return false
}
}