Skip to content

Commit

Permalink
DSET-4559: Move grouped attributes to session info
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-majlis-s1 committed Nov 16, 2023
1 parent b473f82 commit 965a4dc
Show file tree
Hide file tree
Showing 6 changed files with 471 additions and 174 deletions.
1 change: 1 addition & 0 deletions pkg/api/add_events/add_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
AttrBundleKey = "bundle_key"
AttrServerHost = "serverHost"
AttrOrigServerHost = "__origServerHost"
AttrLogFile = "logfile"
)

type (
Expand Down
27 changes: 0 additions & 27 deletions pkg/api/add_events/event_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@

package add_events

import (
"crypto/md5"
"encoding/hex"
"fmt"
)

// EventBundle represents a single DataSet event wrapper structure (see https://app.scalyr.com/help/api#addEvents)
// Event - Zero or more events (log messages) to upload.
// Thread - Optional. Lets you create a readable name for each thread in Event.
Expand All @@ -32,24 +26,3 @@ type EventBundle struct {
Thread *Thread
Log *Log
}

func (bundle *EventBundle) Key(groupBy []string) string {
// construct key
key := ""
for _, k := range groupBy {
val, ok := bundle.Event.Attrs[k]
if ok {
key += fmt.Sprintf("%s:%s", k, val)
}
}

// use md5 to shorten the key
hash := md5.Sum([]byte(key))
bundleKey := hex.EncodeToString(hash[:])

// add the key as attribute
bundle.Event.Attrs[AttrBundleKey] = bundleKey

// return the key
return bundleKey
}
37 changes: 0 additions & 37 deletions pkg/api/add_events/event_bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,3 @@
*/

package add_events

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestEventBundle(t *testing.T) {
event := &Event{
Thread: "5",
Sev: 3,
Ts: "0",
Attrs: map[string]interface{}{
"foo": "a",
"bar": "b",
"baz": "a",
},
}
bundle := &EventBundle{Event: event}

keyFoo := bundle.Key([]string{"foo"})
keyBar := bundle.Key([]string{"bar"})
keyBaz := bundle.Key([]string{"baz"})
keyNotThere1 := bundle.Key([]string{"notThere1"})
keyNotThere2 := bundle.Key([]string{"notThere2"})

assert.Equal(t, "ef9faec68698672038857b2647429002", keyFoo)
assert.Equal(t, "55a2f7ebf2af8927837c599131d32d07", keyBar)
assert.Equal(t, "6dd515483537f552fd5fa604cd60f0d9", keyBaz)
assert.Equal(t, "d41d8cd98f00b204e9800998ecf8427e", keyNotThere1)
assert.Equal(t, "d41d8cd98f00b204e9800998ecf8427e", keyNotThere2)

// although the value is same, key should be different because attributes differ
assert.NotEqual(t, keyBaz, keyFoo)
// non-existing attributes should have the same key
assert.Equal(t, keyNotThere1, keyNotThere2)
}
71 changes: 50 additions & 21 deletions pkg/client/add_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package client

import (
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -46,12 +48,56 @@ type EventWithMeta struct {
SessionInfo add_events.SessionInfo
}

func NewEventWithMeta(bundle *add_events.EventBundle, groupBy []string) EventWithMeta {
func NewEventWithMeta(
bundle *add_events.EventBundle,
groupBy []string,
serverHost string,
) EventWithMeta {
// initialise
key := ""
info := make(add_events.SessionInfo)

// if event's ServerHost is set, use it
if len(bundle.Event.ServerHost) > 0 {
bundle.Event.Attrs[add_events.AttrOrigServerHost] = bundle.Event.ServerHost
} else {
// if ServerHost is set in config, use it
if len(serverHost) > 0 {
bundle.Event.Attrs[add_events.AttrOrigServerHost] = serverHost
} else {
// if somebody is using library directly and forget to set Event.ServerHost,
// lets check the attribute
attrHost, ok := bundle.Event.Attrs[add_events.AttrServerHost]
if ok {
// it's in attributes, so lets used this value
bundle.Event.Attrs[add_events.AttrOrigServerHost] = attrHost
}
}
}
delete(bundle.Event.Attrs, add_events.AttrServerHost)

// iterate over attributes and build structures
for _, k := range groupBy {
val, ok := bundle.Event.Attrs[k]
key += k + ":"
if ok {
key += fmt.Sprintf("%s", val)

// move to session info and remove from attributes
info[k] = val
delete(bundle.Event.Attrs, k)
}
key += "___DELIM___"
}

// use md5 to shorten the key
hash := md5.Sum([]byte(key))
bundleKey := hex.EncodeToString(hash[:])
info[add_events.AttrBundleKey] = bundleKey

return EventWithMeta{
EventBundle: bundle,
Key: bundle.Key(groupBy),
Key: bundleKey,
SessionInfo: info,
}
}
Expand All @@ -71,12 +117,12 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error
// store there information about the host
bundlesWithMeta := make(map[string][]EventWithMeta)
for _, bundle := range bundles {
bWM := NewEventWithMeta(bundle, client.Config.BufferSettings.GroupBy)
bWM := NewEventWithMeta(bundle, client.Config.BufferSettings.GroupBy, client.serverHost)
list, found := bundlesWithMeta[bWM.Key]
if !found {
bundlesWithMeta[bWM.Key] = []EventWithMeta{bWM}
} else {
_ = append(list, bWM)
bundlesWithMeta[bWM.Key] = append(list, bWM)
}
}

Expand Down Expand Up @@ -111,22 +157,6 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error
return nil
}

// fixServerHostsInBundle fills the attribute __origServerHost for the event
// and removes the attribute serverHost. This is needed to properly associate
// incoming events with the correct host
func (client *DataSetClient) fixServerHostsInBundle(bundle *add_events.EventBundle) {
delete(bundle.Event.Attrs, add_events.AttrServerHost)

// set the attribute __origServerHost to the event's ServerHost
if len(bundle.Event.ServerHost) > 0 {
bundle.Event.Attrs[add_events.AttrOrigServerHost] = bundle.Event.ServerHost
return
}

// as fallback use the value set to the client
bundle.Event.Attrs[add_events.AttrOrigServerHost] = client.serverHost
}

func (client *DataSetClient) newEventBundleSubscriberRoutine(key string) {
ch := client.eventBundlePerKeyTopic.Sub(key)
client.eventBundleSubscriptionChannels[key] = ch
Expand Down Expand Up @@ -184,7 +214,6 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte
bundle, ok := msg.(EventWithMeta)
if ok {
buf := getBuffer(key)
client.fixServerHostsInBundle(bundle.EventBundle)

added, err := buf.AddBundle(bundle.EventBundle)
if err != nil {
Expand Down
Loading

0 comments on commit 965a4dc

Please sign in to comment.