diff --git a/pkg/api/add_events/add_events.go b/pkg/api/add_events/add_events.go index 4e20431..0f13d5e 100644 --- a/pkg/api/add_events/add_events.go +++ b/pkg/api/add_events/add_events.go @@ -30,6 +30,7 @@ const ( AttrBundleKey = "bundle_key" AttrServerHost = "serverHost" AttrOrigServerHost = "__origServerHost" + AttrLogFile = "logfile" ) type ( diff --git a/pkg/api/add_events/event_bundle.go b/pkg/api/add_events/event_bundle.go index cba2ebf..47477ee 100644 --- a/pkg/api/add_events/event_bundle.go +++ b/pkg/api/add_events/event_bundle.go @@ -16,12 +16,6 @@ package add_events -import ( - "crypto/md5" - "encoding/hex" - "fmt" -) - // EventBundle represents a single DataSet event wrapper structure (see https://app.scalyr.com/help/api#addEvents) // Event - Zero or more events (log messages) to upload. // Thread - Optional. Lets you create a readable name for each thread in Event. @@ -32,24 +26,3 @@ type EventBundle struct { Thread *Thread Log *Log } - -func (bundle *EventBundle) Key(groupBy []string) string { - // construct key - key := "" - for _, k := range groupBy { - val, ok := bundle.Event.Attrs[k] - if ok { - key += fmt.Sprintf("%s:%s", k, val) - } - } - - // use md5 to shorten the key - hash := md5.Sum([]byte(key)) - bundleKey := hex.EncodeToString(hash[:]) - - // add the key as attribute - bundle.Event.Attrs[AttrBundleKey] = bundleKey - - // return the key - return bundleKey -} diff --git a/pkg/api/add_events/event_bundle_test.go b/pkg/api/add_events/event_bundle_test.go index 06e9d90..4abbf39 100644 --- a/pkg/api/add_events/event_bundle_test.go +++ b/pkg/api/add_events/event_bundle_test.go @@ -15,40 +15,3 @@ */ package add_events - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestEventBundle(t *testing.T) { - event := &Event{ - Thread: "5", - Sev: 3, - Ts: "0", - Attrs: map[string]interface{}{ - "foo": "a", - "bar": "b", - "baz": "a", - }, - } - bundle := &EventBundle{Event: event} - - keyFoo := bundle.Key([]string{"foo"}) - keyBar := bundle.Key([]string{"bar"}) - keyBaz := bundle.Key([]string{"baz"}) - keyNotThere1 := bundle.Key([]string{"notThere1"}) - keyNotThere2 := bundle.Key([]string{"notThere2"}) - - assert.Equal(t, "ef9faec68698672038857b2647429002", keyFoo) - assert.Equal(t, "55a2f7ebf2af8927837c599131d32d07", keyBar) - assert.Equal(t, "6dd515483537f552fd5fa604cd60f0d9", keyBaz) - assert.Equal(t, "d41d8cd98f00b204e9800998ecf8427e", keyNotThere1) - assert.Equal(t, "d41d8cd98f00b204e9800998ecf8427e", keyNotThere2) - - // although the value is same, key should be different because attributes differ - assert.NotEqual(t, keyBaz, keyFoo) - // non-existing attributes should have the same key - assert.Equal(t, keyNotThere1, keyNotThere2) -} diff --git a/pkg/client/add_events.go b/pkg/client/add_events.go index 89e38fe..ec93861 100644 --- a/pkg/client/add_events.go +++ b/pkg/client/add_events.go @@ -17,6 +17,8 @@ package client import ( + "crypto/md5" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -46,12 +48,56 @@ type EventWithMeta struct { SessionInfo add_events.SessionInfo } -func NewEventWithMeta(bundle *add_events.EventBundle, groupBy []string) EventWithMeta { +func NewEventWithMeta( + bundle *add_events.EventBundle, + groupBy []string, + serverHost string, +) EventWithMeta { + // initialise + key := "" info := make(add_events.SessionInfo) + // if event's ServerHost is set, use it + if len(bundle.Event.ServerHost) > 0 { + bundle.Event.Attrs[add_events.AttrOrigServerHost] = bundle.Event.ServerHost + } else { + // if ServerHost is set in config, use it + if len(serverHost) > 0 { + bundle.Event.Attrs[add_events.AttrOrigServerHost] = serverHost + } else { + // if somebody is using library directly and forget to set Event.ServerHost, + // lets check the attribute + attrHost, ok := bundle.Event.Attrs[add_events.AttrServerHost] + if ok { + // it's in attributes, so lets used this value + bundle.Event.Attrs[add_events.AttrOrigServerHost] = attrHost + } + } + } + delete(bundle.Event.Attrs, add_events.AttrServerHost) + + // iterate over attributes and build structures + for _, k := range groupBy { + val, ok := bundle.Event.Attrs[k] + key += k + ":" + if ok { + key += fmt.Sprintf("%s", val) + + // move to session info and remove from attributes + info[k] = val + delete(bundle.Event.Attrs, k) + } + key += "___DELIM___" + } + + // use md5 to shorten the key + hash := md5.Sum([]byte(key)) + bundleKey := hex.EncodeToString(hash[:]) + info[add_events.AttrBundleKey] = bundleKey + return EventWithMeta{ EventBundle: bundle, - Key: bundle.Key(groupBy), + Key: bundleKey, SessionInfo: info, } } @@ -71,12 +117,12 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error // store there information about the host bundlesWithMeta := make(map[string][]EventWithMeta) for _, bundle := range bundles { - bWM := NewEventWithMeta(bundle, client.Config.BufferSettings.GroupBy) + bWM := NewEventWithMeta(bundle, client.Config.BufferSettings.GroupBy, client.serverHost) list, found := bundlesWithMeta[bWM.Key] if !found { bundlesWithMeta[bWM.Key] = []EventWithMeta{bWM} } else { - _ = append(list, bWM) + bundlesWithMeta[bWM.Key] = append(list, bWM) } } @@ -111,22 +157,6 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error return nil } -// fixServerHostsInBundle fills the attribute __origServerHost for the event -// and removes the attribute serverHost. This is needed to properly associate -// incoming events with the correct host -func (client *DataSetClient) fixServerHostsInBundle(bundle *add_events.EventBundle) { - delete(bundle.Event.Attrs, add_events.AttrServerHost) - - // set the attribute __origServerHost to the event's ServerHost - if len(bundle.Event.ServerHost) > 0 { - bundle.Event.Attrs[add_events.AttrOrigServerHost] = bundle.Event.ServerHost - return - } - - // as fallback use the value set to the client - bundle.Event.Attrs[add_events.AttrOrigServerHost] = client.serverHost -} - func (client *DataSetClient) newEventBundleSubscriberRoutine(key string) { ch := client.eventBundlePerKeyTopic.Sub(key) client.eventBundleSubscriptionChannels[key] = ch @@ -184,7 +214,6 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte bundle, ok := msg.(EventWithMeta) if ok { buf := getBuffer(key) - client.fixServerHostsInBundle(bundle.EventBundle) added, err := buf.AddBundle(bundle.EventBundle) if err != nil { diff --git a/pkg/client/add_events_test.go b/pkg/client/add_events_test.go index 85484e8..9a3ed20 100644 --- a/pkg/client/add_events_test.go +++ b/pkg/client/add_events_test.go @@ -65,16 +65,21 @@ func extract(req *http.Request) (add_events.AddEventsRequest, error) { type ( tAttr = add_events.EventAttrs + tInfo = add_events.SessionInfo tEvent struct { attrs tAttr serverHost string } + tBundle struct { + attrs tAttr + info tInfo + } ) const attributeKey = "key" // byKey implement sort.Interface - https://pkg.go.dev/sort#Interface -type byKey [][]tAttr +type byKey [][]tBundle func (s byKey) Len() int { return len(s) @@ -87,12 +92,125 @@ func (s byKey) Swap(i, j int) { // Less returns true if ith element is nil or it's string representation // is before jth element. func (s byKey) Less(i, j int) bool { - if s[i][0][attributeKey] == nil { + if s[i][0].attrs[attributeKey] == nil && s[j][0].attrs[attributeKey] == nil { + if s[i][0].info[attributeKey] == nil { + return true + } else if s[j][0].info[attributeKey] == nil { + return false + } else { + return s[i][0].info[attributeKey].(string) < s[j][0].info[attributeKey].(string) + } + } else if s[i][0].attrs[attributeKey] == nil { return true - } else if s[j][0][attributeKey] == nil { + } else if s[j][0].attrs[attributeKey] == nil { return false } else { - return s[i][0][attributeKey].(string) < s[j][0][attributeKey].(string) + return s[i][0].attrs[attributeKey].(string) < s[j][0].attrs[attributeKey].(string) + } +} + +func TestNewEventWithMeta(t *testing.T) { + k1 := "k1" + k2 := "k2" + k3 := "k3" + k4 := "k4" + k5 := "k5" + v1 := "v1" + v2 := "v2" + v3 := "v3" + + tests := []struct { + name string + groupBy []string + expAttrs add_events.EventAttrs + expSessionInfo add_events.SessionInfo + }{ + // when no grouping is used, then attributes are kept + { + name: "empty group by", + groupBy: []string{}, + expAttrs: add_events.EventAttrs{ + k1: v1, k2: v2, k3: v3, + }, + expSessionInfo: add_events.SessionInfo{ + add_events.AttrBundleKey: "d41d8cd98f00b204e9800998ecf8427e", + }, + }, + + // group by not specified attribute - 1 + { + name: "group by unused attribute - 1", + groupBy: []string{k4}, + expAttrs: add_events.EventAttrs{ + k1: v1, k2: v2, k3: v3, + }, + expSessionInfo: add_events.SessionInfo{ + add_events.AttrBundleKey: "746ea36093d470e90a9c2fbf07c8ed17", + }, + }, + + // group by not specified attribute - 2 + { + name: "group by unused attribute - 2", + groupBy: []string{k5}, + expAttrs: add_events.EventAttrs{ + k1: v1, k2: v2, k3: v3, + }, + expSessionInfo: add_events.SessionInfo{ + add_events.AttrBundleKey: "d9b675aac82295ce36dad72278888308", + }, + }, + + // group by two attributes + { + name: "group by two attributes - 1", + groupBy: []string{k1, k2}, + expAttrs: add_events.EventAttrs{ + k3: v3, + }, + expSessionInfo: add_events.SessionInfo{ + add_events.AttrBundleKey: "4410d57b15f30fb22e92fc3e2338f288", + k1: v1, k2: v2, + }, + }, + + // group by two attributes - swapped + { + name: "group by two attributes - 2", + groupBy: []string{k2, k1}, + expAttrs: add_events.EventAttrs{ + k3: v3, + }, + expSessionInfo: add_events.SessionInfo{ + add_events.AttrBundleKey: "ce28b69e77b27f012501095cb343e2ae", + k1: v1, k2: v2, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(*testing.T) { + event := &add_events.Event{ + Thread: "5", + Sev: 3, + Ts: "0", + Attrs: map[string]interface{}{ + k1: v1, + k2: v2, + k3: v3, + }, + } + + eWM := NewEventWithMeta( + &add_events.EventBundle{Event: event}, + tt.groupBy, + "serverHost", + ) + + tt.expAttrs[add_events.AttrOrigServerHost] = "serverHost" + assert.Equal(t, tt.expAttrs, eWM.EventBundle.Event.Attrs) + assert.Equal(t, tt.expSessionInfo, eWM.SessionInfo) + }) } } @@ -357,30 +475,26 @@ func TestAddEventsLargeEvent(t *testing.T) { } } expectedLengths := map[string]int{ - add_events.AttrBundleKey: 32, - add_events.AttrOrigServerHost: 3, - "0": 990000, - "7": 995000, - "2": 999000, - "5": 999900, - "4": 1000000, - "3": 1000100, - "6": 241661, + "0": 990000, + "7": 995000, + "2": 999000, + "5": 999900, + "4": 1000000, + "3": 1000100, + "6": 241699, } expectedAttrs := map[string]interface{}{ - add_events.AttrBundleKey: "d41d8cd98f00b204e9800998ecf8427e", - add_events.AttrOrigServerHost: "foo", - "0": strings.Repeat("0", expectedLengths["0"]), - "7": strings.Repeat("7", expectedLengths["7"]), - "2": strings.Repeat("2", expectedLengths["2"]), - "5": strings.Repeat("5", expectedLengths["5"]), - "4": strings.Repeat("4", expectedLengths["4"]), - "3": strings.Repeat("3", expectedLengths["3"]), - "6": strings.Repeat("6", expectedLengths["6"]), + "0": strings.Repeat("0", expectedLengths["0"]), + "7": strings.Repeat("7", expectedLengths["7"]), + "2": strings.Repeat("2", expectedLengths["2"]), + "5": strings.Repeat("5", expectedLengths["5"]), + "4": strings.Repeat("4", expectedLengths["4"]), + "3": strings.Repeat("3", expectedLengths["3"]), + "6": strings.Repeat("6", expectedLengths["6"]), } - assert.Equal(t, wasLengths, expectedLengths) - assert.Equal(t, wasAttrs, expectedAttrs, wasAttrs) + assert.Equal(t, expectedLengths, wasLengths) + assert.Equal(t, expectedAttrs, wasAttrs, wasAttrs) wasSuccessful.Store(true) payload, err := json.Marshal(map[string]interface{}{ @@ -426,9 +540,9 @@ func TestAddEventsLargeEvent(t *testing.T) { assert.Equal(t, 1.0, stats.Buffers.SuccessRate()) assert.Equal(t, 1.0, stats.Transfer.SuccessRate()) assert.Equal(t, uint64(2), stats.Transfer.BuffersProcessed()) - assert.Equal(t, uint64(0x5f006d), stats.Transfer.BytesSent()) - assert.Equal(t, uint64(0x5f006d), stats.Transfer.BytesAccepted()) - assert.Equal(t, 3113014.5, stats.Transfer.AvgBufferBytes()) + assert.Equal(t, uint64(0x5f0073), stats.Transfer.BytesSent()) + assert.Equal(t, uint64(0x5f0073), stats.Transfer.BytesAccepted()) + assert.Equal(t, 3113017.5, stats.Transfer.AvgBufferBytes()) } func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) { @@ -447,6 +561,8 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) { assert.Equal(t, len(cer.Events), 1) wasAttrs := (cer.Events)[0].Attrs + wasSessionInfo := cer.SessionInfo + // if attributes were not modified, then we // should update test, so they are modified assert.NotEqual(t, wasAttrs, originalAttrs) @@ -458,24 +574,26 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) { } } expectedLengths := map[string]int{ - add_events.AttrBundleKey: 32, - add_events.AttrOrigServerHost: 3, - "0": 990000, - "7": 995000, - "2": 999000, - "5": 6, + "0": 990000, + "7": 995000, + "2": 999000, + "5": 6, } - expectedAttrs := map[string]interface{}{ - add_events.AttrBundleKey: "d41d8cd98f00b204e9800998ecf8427e", + expectedAttrs := add_events.EventAttrs{ + "0": strings.Repeat("\"", expectedLengths["0"]), + "7": strings.Repeat("\"", expectedLengths["7"]), + "2": strings.Repeat("\"", expectedLengths["2"]), + "5": strings.Repeat("\"", expectedLengths["5"]), + } + expectedSessionInfo := add_events.SessionInfo{ add_events.AttrOrigServerHost: "foo", - "0": strings.Repeat("\"", expectedLengths["0"]), - "7": strings.Repeat("\"", expectedLengths["7"]), - "2": strings.Repeat("\"", expectedLengths["2"]), - "5": strings.Repeat("\"", expectedLengths["5"]), + add_events.AttrBundleKey: "b412b72ae4ad4a7c136ea140dc070d62", } - assert.Equal(t, wasLengths, expectedLengths) - assert.Equal(t, wasAttrs, expectedAttrs) + + assert.Equal(t, expectedLengths, wasLengths) + assert.Equal(t, expectedAttrs, wasAttrs) + assert.Equal(t, expectedSessionInfo, *wasSessionInfo) wasSuccessful.Store(true) payload, err := json.Marshal(map[string]interface{}{ @@ -760,7 +878,7 @@ func TestAddEventsServerHostLogic(t *testing.T) { name string events []tEvent groupBy []string - expCalls [][]tAttr + expCalls [][]tBundle }{ // when nothing is specified, there is just once call { @@ -773,10 +891,16 @@ func TestAddEventsServerHostLogic(t *testing.T) { attrs: tAttr{key: ev2Value}, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: configServerHost}, - {key: ev2Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrOrigServerHost: configServerHost}, + }, + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrOrigServerHost: configServerHost}, + }, }, }, }, @@ -793,10 +917,17 @@ func TestAddEventsServerHostLogic(t *testing.T) { attrs: tAttr{key: ev2Value}, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: configServerHost}, - {key: ev2Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrOrigServerHost: configServerHost}, + }, + + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrOrigServerHost: configServerHost}, + }, }, }, }, @@ -813,10 +944,18 @@ func TestAddEventsServerHostLogic(t *testing.T) { attrs: tAttr{key: ev2Value}, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: ev1ServerHost}, - {key: ev2Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrOrigServerHost: ev1ServerHost}, + }, + }, + { + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrOrigServerHost: configServerHost}, + }, }, }, }, @@ -833,10 +972,18 @@ func TestAddEventsServerHostLogic(t *testing.T) { attrs: tAttr{key: ev2Value}, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ + { + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrOrigServerHost: ev1ServerHost}, + }, + }, { - {key: ev1Value, add_events.AttrOrigServerHost: ev1ServerHost}, - {key: ev2Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrOrigServerHost: configServerHost}, + }, }, }, }, @@ -853,10 +1000,18 @@ func TestAddEventsServerHostLogic(t *testing.T) { attrs: tAttr{key: ev2Value}, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ + { + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrOrigServerHost: ev3ServerHost}, + }, + }, { - {key: ev1Value, add_events.AttrOrigServerHost: ev3ServerHost}, - {key: ev2Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrOrigServerHost: configServerHost}, + }, }, }, }, @@ -874,12 +1029,18 @@ func TestAddEventsServerHostLogic(t *testing.T) { }, }, groupBy: []string{add_events.AttrServerHost}, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: ev3ServerHost}, + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrOrigServerHost: ev3ServerHost}, + }, }, { - {key: ev2Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrOrigServerHost: configServerHost}, + }, }, }, }, @@ -910,13 +1071,36 @@ func TestAddEventsServerHostLogic(t *testing.T) { }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ + { + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrOrigServerHost: ev1ServerHost}, + }, + }, + { + { + attrs: tAttr{key: ev2Value}, + info: tInfo{add_events.AttrOrigServerHost: ev2ServerHost}, + }, + }, + { + { + attrs: tAttr{key: ev3Value}, + info: tInfo{add_events.AttrOrigServerHost: ev3ServerHost}, + }, + }, + { + { + attrs: tAttr{key: ev4Value}, + info: tInfo{add_events.AttrOrigServerHost: ev4ServerHost}, + }, + }, { - {key: ev1Value, add_events.AttrOrigServerHost: ev1ServerHost}, - {key: ev2Value, add_events.AttrOrigServerHost: ev2ServerHost}, - {key: ev3Value, add_events.AttrOrigServerHost: ev3ServerHost}, - {key: ev4Value, add_events.AttrOrigServerHost: ev4ServerHost}, - {key: ev5Value, add_events.AttrOrigServerHost: ev5ServerHost}, + { + attrs: tAttr{key: ev5Value}, + info: tInfo{add_events.AttrOrigServerHost: ev5ServerHost}, + }, }, }, }, @@ -947,21 +1131,36 @@ func TestAddEventsServerHostLogic(t *testing.T) { }, }, groupBy: []string{key}, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: ev1ServerHost}, + { + attrs: tAttr{}, + info: tInfo{key: ev1Value, add_events.AttrOrigServerHost: ev1ServerHost}, + }, }, { - {key: ev2Value, add_events.AttrOrigServerHost: ev2ServerHost}, + { + attrs: tAttr{}, + info: tInfo{key: ev2Value, add_events.AttrOrigServerHost: ev2ServerHost}, + }, }, { - {key: ev3Value, add_events.AttrOrigServerHost: ev3ServerHost}, + { + attrs: tAttr{}, + info: tInfo{key: ev3Value, add_events.AttrOrigServerHost: ev3ServerHost}, + }, }, { - {key: ev4Value, add_events.AttrOrigServerHost: ev4ServerHost}, + { + attrs: tAttr{}, + info: tInfo{key: ev4Value, add_events.AttrOrigServerHost: ev4ServerHost}, + }, }, { - {key: ev5Value, add_events.AttrOrigServerHost: ev5ServerHost}, + { + attrs: tAttr{}, + info: tInfo{key: ev5Value, add_events.AttrOrigServerHost: ev5ServerHost}, + }, }, }, }, @@ -979,16 +1178,19 @@ func TestAddEventsServerHostLogic(t *testing.T) { serverHost: ev3ServerHost, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: ev3ServerHost}, + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrOrigServerHost: ev3ServerHost}, + }, }, }, }, // serverHost from the config wins { - name: "serverHost from event.serverHost wins", + name: "serverHost from config wins", events: []tEvent{ { attrs: tAttr{ @@ -998,28 +1200,31 @@ func TestAddEventsServerHostLogic(t *testing.T) { }, }, }, - expCalls: [][]tAttr{ + expCalls: [][]tBundle{ { - {key: ev1Value, add_events.AttrOrigServerHost: configServerHost}, + { + attrs: tAttr{key: ev1Value}, + info: tInfo{add_events.AttrOrigServerHost: configServerHost}, + }, }, }, }, } - extractAttrs := func(events []*add_events.Event) []tAttr { - attrs := make([]map[string]interface{}, 0) - for _, ev := range events { - delete(ev.Attrs, add_events.AttrBundleKey) - attrs = append(attrs, ev.Attrs) + extractBundles := func(req add_events.AddEventsRequest) []tBundle { + bundles := make([]tBundle, 0) + delete(*req.SessionInfo, add_events.AttrBundleKey) + for _, ev := range req.Events { + bundles = append(bundles, tBundle{attrs: ev.Attrs, info: *req.SessionInfo}) } - return attrs + return bundles } for _, tt := range tests { t.Run(tt.name, func(*testing.T) { numCalls := atomic.Int32{} lock := sync.Mutex{} - calls := make([][]tAttr, 0) + calls := make([][]tBundle, 0) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { numCalls.Add(1) cer, err := extract(req) @@ -1027,7 +1232,7 @@ func TestAddEventsServerHostLogic(t *testing.T) { assert.Nil(t, err, "Error reading request: %v", err) lock.Lock() - calls = append(calls, extractAttrs(cer.Events)) + calls = append(calls, extractBundles(cer)) lock.Unlock() payload, err := json.Marshal(map[string]interface{}{ @@ -1084,6 +1289,127 @@ func TestAddEventsServerHostLogic(t *testing.T) { } } +func TestAddEventsGroupBy(t *testing.T) { + k1 := "k1" + k2 := "k2" + k3 := "k3" + k4 := "k4" + v1 := "v1" + v2 := "v2" + v3 := "v3" + + tests := []struct { + name string + groupBy []string + expAttrs add_events.EventAttrs + expSessionInfo add_events.SessionInfo + }{ + // when no grouping is used, then attributes are kept + { + name: "empty group by", + groupBy: []string{}, + expAttrs: add_events.EventAttrs{ + k1: v1, k2: v2, k3: v3, + }, + expSessionInfo: add_events.SessionInfo{ + add_events.AttrBundleKey: "ab1857c850d76129c895bc9d6ac48b7e", + }, + }, + + // group by not specified attribute + { + name: "group by unused attribute", + groupBy: []string{k4}, + expAttrs: add_events.EventAttrs{ + k1: v1, k2: v2, k3: v3, + }, + expSessionInfo: add_events.SessionInfo{ + add_events.AttrBundleKey: "17e9bed12fd281a419b1d600e1fcbf5f", + }, + }, + + // group by two attributes + { + name: "group by two attributes", + groupBy: []string{k1, k2}, + expAttrs: add_events.EventAttrs{ + k3: v3, + }, + expSessionInfo: add_events.SessionInfo{ + add_events.AttrBundleKey: "2b35b78fe48c09ba264d6c1759fee8c0", + k1: v1, k2: v2, + }, + }, + + // group by two attributes + { + name: "group by two attributes - swapped", + groupBy: []string{k2, k1}, + expAttrs: add_events.EventAttrs{ + k3: v3, + }, + expSessionInfo: add_events.SessionInfo{ + add_events.AttrBundleKey: "7368ff158ad3ed01a668701e1795e727", + k1: v1, k2: v2, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(*testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + cer, err := extract(req) + + assert.Nil(t, err, "Error reading request: %v", err) + + assert.Equal(t, tt.expAttrs, cer.Events[0].Attrs, tt.name) + tt.expSessionInfo[add_events.AttrOrigServerHost] = "serverHost" + assert.Equal(t, tt.expSessionInfo, *cer.SessionInfo, tt.name) + + payload, err := json.Marshal(map[string]interface{}{ + "status": "success", + "bytesCharged": 42, + }) + assert.NoError(t, err) + l, err := w.Write(payload) + assert.Greater(t, l, 1) + assert.NoError(t, err) + })) + defer server.Close() + + config := newDataSetConfig( + server.URL, + *newBufferSettings( + buffer_config.WithGroupBy(tt.groupBy), + buffer_config.WithRetryMaxElapsedTime(10*RetryBase), + buffer_config.WithRetryInitialInterval(RetryBase), + buffer_config.WithRetryMaxInterval(RetryBase), + ), + server_host_config.NewDefaultDataSetServerHostSettings(), + ) + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()), nil) + require.Nil(t, err) + + bundles := []*add_events.EventBundle{ + { + Event: &add_events.Event{ + Thread: "5", + Sev: 3, + Ts: "1", + Attrs: add_events.EventAttrs{k1: v1, k2: v2, k3: v3}, + ServerHost: "serverHost", + }, + }, + } + + err = sc.AddEvents(bundles) + assert.Nil(t, err) + err = sc.Shutdown() + assert.Nil(t, err) + }) + } +} + func mockServerDefaultPayload(t *testing.T, statusCode int) *httptest.Server { payload, _ := json.Marshal(map[string]interface{}{ "status": "success", diff --git a/pkg/client/client.go b/pkg/client/client.go index a18ffe4..5ca8caf 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -122,7 +122,7 @@ func NewClient(cfg *config.DataSetConfig, client *http.Client, logger *zap.Logge // update group by, so that logs from the same host // belong to the same session - addServerHostIntoGroupBy(cfg) + adjustGroupByWithSpecialAttributes(cfg) serverHost, err := getServerHost(cfg.ServerHostSettings) if err != nil { @@ -227,13 +227,18 @@ func getServerHost(settings server_host_config.DataSetServerHostSettings) (strin return os.Hostname() } -// addServerHostIntoGroupBy adds attributes that indicate from which machine -// the logs are into the groupBy attribute, so that they are part of the same session -func addServerHostIntoGroupBy(cfg *config.DataSetConfig) { +// adjustGroupByWithSpecialAttributes adds attributes that have special meaning in the UI +// serverHost and logfile are used in the drop-down, so they have to be part of the +// SessionInfo. +func adjustGroupByWithSpecialAttributes(cfg *config.DataSetConfig) { groupBy := cfg.BufferSettings.GroupBy + if !slices.Contains(groupBy, add_events.AttrLogFile) { + groupBy = append(groupBy, add_events.AttrLogFile) + } if !slices.Contains(groupBy, add_events.AttrOrigServerHost) { groupBy = append(groupBy, add_events.AttrOrigServerHost) } + cfg.BufferSettings.GroupBy = groupBy }