-
Notifications
You must be signed in to change notification settings - Fork 0
/
events_bstates.go
458 lines (424 loc) · 15.5 KB
/
events_bstates.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
package idefixgo
import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/jaracil/ei"
be "github.com/nayarsystems/bstates"
"github.com/nayarsystems/idefix-go/eval"
"github.com/nayarsystems/idefix-go/messages"
)
// GetBstatesParams holds the options accepted by GetBstates.
//
// When UID is set, a single event is fetched by its UID and the query fields
// (Domain, Since, Cid, AddressFilter) are not used; MetaFilter, Timeout,
// ForceTsField and the RawTsField* options still apply to that event.
type GetBstatesParams struct {
	// UID selects a single event by its unique identifier.
	UID string
	// Domain is the domain whose events are queried.
	Domain string
	// Since is passed to the events query as its starting point in time.
	Since time.Time
	// Limit is passed to the events query and also caps the number of blobs
	// added by one call; zero means no cap on the client side.
	Limit uint
	// Cid is the continuation id returned by a previous call; GetBstates
	// overwrites it with the new continuation id so p can be reused for paging.
	Cid string
	// AddressFilter is passed to the events query to restrict the addresses.
	AddressFilter string
	// MetaFilter is evaluated against each event's metadata; events that do
	// not satisfy it are skipped.
	MetaFilter eval.CompiledExpr
	// Timeout bounds each request made to fetch events.
	Timeout time.Duration
	// ForceTsField names the field used to timestamp states. When empty, the
	// first field decoded as a Unix timestamp in milliseconds is used. When set,
	// that decoded field is preferred and, failing that, a raw field with this name.
	ForceTsField string
	// RawTsFieldYearOffset is the year (January 1st, UTC) used as epoch for a
	// raw ForceTsField value; zero means the Unix epoch.
	RawTsFieldYearOffset uint
	// RawTsFieldFactor is the multiplier that converts a raw ForceTsField value
	// into milliseconds relative to RawTsFieldYearOffset.
	RawTsFieldFactor float32
}
// Bstate is a single decoded device state together with the instant it refers to.
// The state format is described at https://github.com/nayarsystems/bstates.
type Bstate struct {
	// Timestamp comes from the schema's timestamp field when one was found;
	// otherwise it is the zero time.
	Timestamp time.Time
	// State holds the decoded field values.
	State *be.State
}
// BstatesBlob is the bstates payload of one event: the encoded state queue as
// received, plus the individual states decoded from it.
type BstatesBlob struct {
	// UID is the unique identifier of the event the blob was taken from.
	UID string
	// Timestamp is the timestamp of that event.
	Timestamp time.Time
	// States are the states decoded from Raw, in the order the decoder returns them.
	States []*Bstate
	// Raw is the encoded (typically compressed) state queue exactly as received.
	Raw []byte
}
// BstatesSource groups the blobs of all events that share domain, address,
// schema and metadata, together with that common metadata and the settings
// used to timestamp their states.
type BstatesSource struct {
	// Meta is the metadata shared by every event in this source.
	Meta map[string]interface{}
	// MetaRaw is Meta encoded as JSON.
	MetaRaw string
	// Blobs holds one entry per event, in the order the events were processed.
	Blobs []*BstatesBlob

	// timestampField names the state field used to timestamp states ("" if none).
	timestampField string
	// timestampFieldYearOffset and timestampFieldFactor are only set when
	// timestampField is a raw (non-decoded) field; see getTimestampValue.
	timestampFieldYearOffset int
	timestampFieldFactor     float32
}
// GetBstatesResult holds the bstates retrieved by GetBstates as nested maps keyed by
//
//	domain -> address -> schema id -> meta hash -> *BstatesSource
//
// where the meta hash is the standard base64 encoding of the SHA-256 of the
// JSON-encoded event metadata. Events with the same domain, address, schema and
// metadata therefore end up in the same BstatesSource.
type GetBstatesResult = map[string]map[string]map[string]map[string]*BstatesSource
// GetSchema asks the Idefix service for the bstates schema identified by hash.
//
// The request goes to "schemas.get" with Check disabled and waits at most timeout
// for an answer. On success the decoded response is returned. On failure the
// response is nil and the error from the call is returned unchanged.
func (c *Client) GetSchema(hash string, timeout time.Duration) (*messages.SchemaGetResponseMsg, error) {
	request := &messages.Message{
		To: "schemas.get",
		Data: &messages.SchemaGetMsg{
			Hash:  hash,
			Check: false,
		},
	}
	var response messages.SchemaGetResponseMsg
	if err := c.Call2("idefix", request, &response, timeout); err != nil {
		return nil, err
	}
	return &response, nil
}
// GetBstates retrieves bstates events from the cloud and merges their decoded
// states into stateMap, which must be a non-nil map.
//
// If p.UID is set, only the event with that UID is fetched. An error is returned
// if the event cannot be fetched or decoded. If it yields no bstates blob, the
// error is "not a bstates based event". The returned continuation id is empty.
//
// Otherwise one page of events is queried using p. The new continuation id is
// returned and also stored in p.Cid, so the same p can be passed again to fetch
// the next page.
//
// It returns the number of blobs added to stateMap, the continuation id and any error.
func GetBstates(ic *Client, p *GetBstatesParams, stateMap GetBstatesResult) (totalBlobs uint, cid string, err error) {
	if p.UID == "" {
		// p.Cid is updated in place so callers can page by reusing p.
		totalBlobs, p.Cid, err = getBstates(ic, p, stateMap)
		return totalBlobs, p.Cid, err
	}

	res, err := ic.GetEventByUID(p.UID, p.Timeout)
	if err != nil {
		return 0, "", err
	}
	totalBlobs, err = fillStateMap(ic, []*messages.Event{&res.Event}, p, stateMap)
	if err != nil {
		// Report the real decoding failure instead of masking it below.
		return totalBlobs, "", err
	}
	if totalBlobs == 0 {
		return 0, "", fmt.Errorf("not a bstates based event")
	}
	return totalBlobs, "", nil
}
// getBstates queries one page of events described by p and merges the bstates
// found in it into stateMap. It returns the number of blobs added, the
// continuation id reported by the query, and any error. When the query itself
// fails, the count is 0 and the continuation id is empty.
func getBstates(ic *Client, p *GetBstatesParams, stateMap GetBstatesResult) (numblobs uint, cid string, err error) {
	page, err := ic.GetEvents(p.Domain, p.AddressFilter, p.Since, p.Limit, p.Cid, p.Timeout)
	if err != nil {
		return 0, "", err
	}
	numblobs, err = fillStateMap(ic, page.Events, p, stateMap)
	return numblobs, page.ContinuationID, err
}
// fillStateMap decodes the bstates blobs carried by events and merges them into
// stateMap (which must be non-nil), indexed as described by GetBstatesResult.
//
// An event is skipped when:
//   - its metadata does not satisfy p.MetaFilter, or the filter cannot be evaluated;
//   - its type does not carry a bstates schema id;
//   - its payload does not hold a valid blob (a message is printed);
//   - its schema cannot be fetched, with a 1 s timeout, or parsed (a message is printed);
//   - its metadata cannot be JSON encoded (a message is printed).
//
// Processing stops with an error as soon as a blob fails to decode into states.
// It stops without error once p.Limit blobs have been added; a zero Limit means
// no limit. It returns the number of blobs added.
func fillStateMap(ic *Client, events []*messages.Event, p *GetBstatesParams, stateMap GetBstatesResult) (numblobs uint, err error) {
	for _, e := range events {
		// An invalid filter expression is treated as "no match".
		if match, _ := evalMeta(e.Meta, p.MetaFilter); !match {
			continue
		}
		schemaId, serr := messages.BstatesParseSchemaIdFromType(e.Type)
		if serr != nil {
			continue
		}
		blob, perr := extractBstatesBlob(e.Payload)
		if perr != nil {
			// Malformed payloads are reported and skipped instead of being
			// decoded as an empty blob.
			fmt.Println(perr)
			continue
		}
		schema, rerr := resolveBstatesSchema(ic, schemaId)
		if rerr != nil {
			fmt.Println(rerr)
			continue
		}
		stateSource, merr := bstatesSourceFor(stateMap, e, schemaId)
		if merr != nil {
			fmt.Println(merr)
			continue
		}

		tsField, isRaw := bstatesTimestampField(schema, p.ForceTsField)
		if isRaw {
			// Raw fields need an epoch and a factor to become Unix milliseconds.
			stateSource.timestampFieldYearOffset = int(p.RawTsFieldYearOffset)
			stateSource.timestampFieldFactor = p.RawTsFieldFactor
		}
		stateSource.timestampField = tsField

		var beBlob *BstatesBlob
		beBlob, err = createBlobInfo(stateSource, schema, e.UID, e.Timestamp, blob)
		if err != nil {
			return numblobs, err
		}
		stateSource.Blobs = append(stateSource.Blobs, beBlob)
		numblobs++
		if numblobs == p.Limit {
			return numblobs, nil
		}
	}
	return numblobs, nil
}

// extractBstatesBlob returns the raw bstates blob held by an event payload, which
// is either a map whose "Data" field is a []byte or base64 string, or a base64
// string itself.
func extractBstatesBlob(payload interface{}) ([]byte, error) {
	switch v := payload.(type) {
	case map[string]interface{}:
		data, ok := v["Data"]
		if !ok {
			return nil, fmt.Errorf("no 'Data' field found")
		}
		switch d := data.(type) {
		case []byte:
			return d, nil
		case string:
			blob, err := base64.StdEncoding.DecodeString(d)
			if err != nil {
				return nil, fmt.Errorf("'Data' is a string but is not valid base64: %v", err)
			}
			return blob, nil
		default:
			return nil, fmt.Errorf("can't get a buffer from 'Data' field")
		}
	case string:
		blob, err := base64.StdEncoding.DecodeString(v)
		if err != nil {
			return nil, fmt.Errorf("payload is a string but is not a valid base64: %v", err)
		}
		return blob, nil
	default:
		return nil, fmt.Errorf("wrong payload format")
	}
}

// resolveBstatesSchema returns the schema with the given id, taken from the local
// cache when present or fetched from the service (and then cached) otherwise.
func resolveBstatesSchema(ic *Client, schemaId string) (*be.StateSchema, error) {
	if schema := getSchemaFromCache(schemaId); schema != nil {
		return schema, nil
	}
	schemaMsg, err := ic.GetSchema(schemaId, time.Second)
	if err != nil {
		return nil, fmt.Errorf("schema '%s' was not found: %v", schemaId, err)
	}
	schema := &be.StateSchema{}
	if err := schema.UnmarshalJSON([]byte(schemaMsg.Payload)); err != nil {
		return nil, fmt.Errorf("can't parse schema '%s': %v", schemaId, err)
	}
	saveSchemaOnCache(schemaId, schema)
	return schema, nil
}

// bstatesSourceFor returns the BstatesSource for event e under schemaId in
// stateMap, creating it and any missing intermediate map. The metadata is
// encoded before anything is inserted, so a failure leaves stateMap untouched.
func bstatesSourceFor(stateMap GetBstatesResult, e *messages.Event, schemaId string) (*BstatesSource, error) {
	metaRaw, err := json.Marshal(e.Meta)
	if err != nil {
		return nil, fmt.Errorf("can't get raw meta: %v", err)
	}
	metaSum := sha256.Sum256(metaRaw)
	metaHash := base64.StdEncoding.EncodeToString(metaSum[:])

	domainMap, ok := stateMap[e.Domain]
	if !ok {
		domainMap = map[string]map[string]map[string]*BstatesSource{}
		stateMap[e.Domain] = domainMap
	}
	addressMap, ok := domainMap[e.Address]
	if !ok {
		addressMap = map[string]map[string]*BstatesSource{}
		domainMap[e.Address] = addressMap
	}
	schemaMap, ok := addressMap[schemaId]
	if !ok {
		schemaMap = map[string]*BstatesSource{}
		addressMap[schemaId] = schemaMap
	}
	source, ok := schemaMap[metaHash]
	if !ok {
		source = &BstatesSource{Meta: e.Meta, MetaRaw: string(metaRaw)}
		schemaMap[metaHash] = source
	}
	return source, nil
}

// bstatesTimestampField picks the field used to timestamp the states of schema.
//
// Without forced, it is the first field decoded as a Unix timestamp in
// milliseconds. With forced, it is the decoded field of that name or, failing
// that, a raw field of that name (isRaw is then true). "" means no field.
func bstatesTimestampField(schema *be.StateSchema, forced string) (field string, isRaw bool) {
	for _, f := range schema.GetDecodedFields() {
		if f.Decoder.Name() != be.NumberToUnixTsMsDecoderType {
			continue
		}
		if forced == "" || forced == f.Name {
			return f.Name, false
		}
	}
	if forced == "" {
		return "", false
	}
	for _, f := range schema.GetFields() {
		if f.Name == forced {
			return f.Name, true
		}
	}
	return "", false
}
// BenchmarkBstates prints how the size of blob compares with the uncompressed
// size of bstates and with the sizes obtained by re-encoding the same states using
// the encoder pipelines "z" and "t:z" (described by the original author as gzip and
// transposed + gzip).
//
// The uncompressed size is the byte size of the first state times the number of
// states. Percentages are relative to that figure. Nothing is printed when bstates
// is empty. Printing stops at the first pipeline that fails to encode.
func BenchmarkBstates(blob *BstatesBlob, bstates []*Bstate) {
	if len(bstates) == 0 {
		return
	}
	states := make([]*be.State, len(bstates))
	for i, bs := range bstates {
		states[i] = bs.State
	}

	stateSize := states[0].GetByteSize()
	uncompressedSize := stateSize * len(states)
	fmt.Printf("states count: %d\n", len(states))
	fmt.Printf("state size (B): %d\n", stateSize)
	fmt.Printf("total states size (B): %d\n", uncompressedSize)
	fmt.Printf("received blob size (B): %d (%.2f %%)\n", len(blob.Raw), float32(len(blob.Raw))/float32(uncompressedSize)*100)

	for _, pipeline := range []string{"z", "t:z"} {
		size, err := getSizeUsingNewPipeline(states, pipeline)
		if err != nil {
			fmt.Printf("error: %v\n", err)
			return
		}
		fmt.Printf("blob size using pipeline \"%s\": %d (%.2f %%)\n", pipeline, size, float32(size)/float32(uncompressedSize)*100)
	}
}
// getSizeUsingNewPipeline returns the size in bytes of the blob produced by encoding
// states with a copy of their schema whose encoder pipeline is replaced by pipeline.
// All states are assumed to share the schema of states[0]. An error is returned for
// an empty slice or when any state cannot be re-encoded.
func getSizeUsingNewPipeline(states []*be.State, pipeline string) (size uint, err error) {
	if len(states) == 0 {
		return 0, fmt.Errorf("no states to encode")
	}
	schema, err := updateSchemaPipeline(states[0].GetSchema(), pipeline)
	if err != nil {
		return 0, err
	}
	// Every input state has a reference to the original schema that was used to encode it.
	// The same schema is used by the StateQueue since it needs to know the fields structure of every state
	// to generate a blob (multiple states encoded). The schema contains the pipeline used to encode the StateQueue.
	// We cannot use the input states to create a new StateQueue (blob) that uses the schema with the pipeline updated
	// since it would cause an error due to a schema hash mismatch between the input state's schemas and the new StateQueue
	// (despite having the same field structure).
	// TODO: bstates lib should allow the updates of a StateQueue using a state that has
	// compatible fields structure.
	// So, we need to create new states with the schema with the pipeline updated
	queue := be.CreateStateQueue(schema)
	for _, s := range states {
		raw, encErr := s.Encode()
		if encErr != nil {
			return 0, encErr
		}
		clone, createErr := be.CreateState(schema)
		if createErr != nil {
			return 0, createErr
		}
		// A failed decode would silently push a zeroed state and skew the size.
		if decErr := clone.Decode(raw); decErr != nil {
			return 0, fmt.Errorf("can't re-decode state for pipeline %q: %w", pipeline, decErr)
		}
		if pushErr := queue.Push(clone); pushErr != nil {
			return 0, pushErr
		}
	}
	blob, err := queue.Encode()
	if err != nil {
		return 0, err
	}
	return uint(len(blob)), nil
}
// updateSchemaPipeline builds a new schema equal to schema except for its
// "encoderPipeline", which is set to pipeline. The copy is made by round-tripping
// the schema's map form through JSON. The input schema is not modified through the
// returned value.
func updateSchemaPipeline(schema *be.StateSchema, pipeline string) (newSchema *be.StateSchema, err error) {
	fields := schema.ToMsi()
	fields["encoderPipeline"] = pipeline

	encoded, marshalErr := json.Marshal(fields)
	if marshalErr != nil {
		return nil, fmt.Errorf("fix me: can't create new schema: %v", marshalErr)
	}
	decoded := &be.StateSchema{}
	if unmarshalErr := json.Unmarshal(encoded, &decoded); unmarshalErr != nil {
		return nil, fmt.Errorf("fix me: can't decode new schema: %v", unmarshalErr)
	}
	return decoded, nil
}
// getTimestampValue reads the numeric value of tsField from state s and turns it
// into a time.Time.
//
// The value is multiplied by tsFactor to obtain milliseconds and added to an epoch:
// January 1st of tsYearOffset (UTC) when tsYearOffset is non-zero, the Unix epoch
// otherwise. A zero tsFactor is treated as 1 (the value is already in milliseconds).
// This keeps plain Unix-ms fields working. It also stops an unset factor from
// collapsing every state onto the epoch date. An error is returned when the field
// is missing or not numeric.
func getTimestampValue(s *be.State, tsField string, tsYearOffset int, tsFactor float32) (time.Time, error) {
	rawValue, err := s.Get(tsField)
	if err != nil {
		return time.Time{}, fmt.Errorf("fix me: cannot get '%s' value: %w", tsField, err)
	}
	value, err := ei.N(rawValue).Float64()
	if err != nil {
		return time.Time{}, fmt.Errorf("fix me: cannot get '%s' value: %w", tsField, err)
	}
	factor := float64(tsFactor)
	if factor == 0 {
		factor = 1
	}
	var epochMs int64
	if tsYearOffset != 0 {
		epochMs = time.Date(tsYearOffset, time.January, 1, 0, 0, 0, 0, time.UTC).UnixMilli()
	}
	return time.UnixMilli(epochMs + int64(value*factor)), nil
}
// evalMeta reports whether meta satisfies the compiled filter expression expr.
//
// The expression is evaluated against a shallow copy of meta. Any result other
// than eval.ResOK counts as "no match". Invalid expressions, invalid operations
// and type mismatches additionally return an error describing the problem.
func evalMeta(meta map[string]interface{}, expr eval.CompiledExpr) (bool, error) {
	env := make(map[string]interface{}, len(meta))
	for key, value := range meta {
		env[key] = value
	}
	switch result := eval.EvalCompiled(expr, env); result.Res {
	case eval.ResOK:
		return true, nil
	case eval.ResInvalidExpr, eval.ResInvalidOp, eval.ResTypeMismatch:
		return false, fmt.Errorf("filter expression error (%d): %s", result.Res, result.Iden)
	default:
		return false, nil
	}
}
// GetDeltaStates extracts the State of every Bstate in in, keeping their order,
// and returns the delta representation computed by be.GetDeltaMsiStates for
// them: one map per state, suitable for tracking changes between states over time.
func GetDeltaStates(in []*Bstate) ([]map[string]interface{}, error) {
	var states []*be.State
	for i := range in {
		states = append(states, in[i].State)
	}
	return be.GetDeltaMsiStates(states)
}
// createBlobInfo decodes raw as a StateQueue encoded with schema. It wraps the
// resulting states in a BstatesBlob tagged with the event uid and timestamp ts.
//
// When source.timestampField is set, each state's Timestamp is read from that
// field (see getTimestampValue). Otherwise it is left as the zero time. An error is
// returned if raw cannot be decoded or a state's timestamp cannot be read; the
// decode errors wrap the underlying bstates error.
func createBlobInfo(source *BstatesSource, schema *be.StateSchema, uid string, ts time.Time, raw []byte) (res *BstatesBlob, err error) {
	queue := be.CreateStateQueue(schema)
	if err := queue.Decode(raw); err != nil {
		return nil, fmt.Errorf("can't decode event: %w", err)
	}
	states, err := queue.GetStates()
	if err != nil {
		return nil, fmt.Errorf("can't decode event: %w", err)
	}

	blob := &BstatesBlob{
		Timestamp: ts,
		UID:       uid,
		Raw:       raw,
	}
	for _, s := range states {
		bstate := &Bstate{State: s}
		if source.timestampField != "" {
			// Distinct names avoid shadowing the ts parameter and the err result.
			stateTs, tsErr := getTimestampValue(s, source.timestampField, source.timestampFieldYearOffset, source.timestampFieldFactor)
			if tsErr != nil {
				return nil, tsErr
			}
			bstate.Timestamp = stateTs
		}
		blob.States = append(blob.States, bstate)
	}
	return blob, nil
}
var (
	// schemasMap caches parsed schemas by schema id so they need not be fetched
	// again. No eviction is visible in this file.
	schemasMap = map[string]*be.StateSchema{}
	// schemasMapMutex guards every access to schemasMap.
	schemasMapMutex sync.Mutex
)

// getSchemaFromCache returns the cached schema for schemaId, or nil if absent.
func getSchemaFromCache(schemaId string) *be.StateSchema {
	schemasMapMutex.Lock()
	cached := schemasMap[schemaId]
	schemasMapMutex.Unlock()
	return cached
}

// saveSchemaOnCache stores schema under schemaId, replacing any previous entry.
func saveSchemaOnCache(schemaId string, schema *be.StateSchema) {
	schemasMapMutex.Lock()
	schemasMap[schemaId] = schema
	schemasMapMutex.Unlock()
}