Skip to content

Commit

Permalink
Limit event details
Browse files Browse the repository at this point in the history
  • Loading branch information
pkosiec committed Nov 17, 2023
1 parent bb8eafe commit c685f26
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 7 deletions.
24 changes: 19 additions & 5 deletions internal/analytics/batched/data.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
package batched

import "sync"
import (
"sync"
)

const (

// Segment limits the API calls to 32kB per request: https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/
// We save 2kB (2048 characters) for general metadata. The rest of 30kB we can spend for sending source event details.
// Average event details size is 300 characters. So in theory we could include 30*1024/300=102.4 events.
// As the plugin name and additional labels don't have fixed size, we limit the number of events to 75 to be on the safe side.
maxEventDetailsCount = 75
)

// Data is a struct that holds data for batched reporting
type Data struct {
Expand Down Expand Up @@ -47,11 +58,14 @@ func (d *Data) AddSourceEvent(in SourceEvent) {
d.mutex.Lock()
defer d.mutex.Unlock()

d.heartbeatProperties.EventsCount++

key := in.PluginName
sourceProps := d.heartbeatProperties.Sources[key]
sourceProps.Events = append(sourceProps.Events, in)
sourceProps.EventsCount = len(sourceProps.Events)

sourceProps.EventsCount++
if d.heartbeatProperties.EventsCount <= maxEventDetailsCount {
// save event details only if we didn't exceed the limit
sourceProps.Events = append(sourceProps.Events, in)
}
d.heartbeatProperties.Sources[key] = sourceProps
d.heartbeatProperties.EventsCount++
}
61 changes: 61 additions & 0 deletions internal/analytics/batched/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,64 @@ func TestData(t *testing.T) {
assert.Equal(t, 0, data.heartbeatProperties.EventsCount)
assert.Len(t, data.heartbeatProperties.Sources, 0)
}

func TestData_EventDetailsLimit(t *testing.T) {
// given
data := NewData(1)
addEvent1Count := 50
addEvent2Count := 70
addEvent3Count := 30

totalCount := addEvent1Count + addEvent2Count + addEvent3Count
expectedKubernetesEventCount := addEvent1Count + addEvent3Count
expectedKubernetesEventDetailsLen := addEvent1Count
expectedArgoCDEventCount := addEvent2Count

kubernetesPlugin := "botkube/kubernetes"
argoCDPlugin := "botkube/argocd"

// when
for i := 0; i < addEvent1Count; i++ {
data.AddSourceEvent(SourceEvent{
IntegrationType: config.BotIntegrationType,
Platform: config.DiscordCommPlatformIntegration,
PluginName: kubernetesPlugin,
AnonymizedEventFields: map[string]any{
"foo": "bar",
},
Success: true,
})
}

for i := 0; i < addEvent2Count; i++ {
data.AddSourceEvent(SourceEvent{
IntegrationType: config.BotIntegrationType,
Platform: config.CloudSlackCommPlatformIntegration,
PluginName: argoCDPlugin,
AnonymizedEventFields: nil,
Success: true,
})
}

for i := 0; i < addEvent3Count; i++ {
data.AddSourceEvent(SourceEvent{
IntegrationType: config.SinkIntegrationType,
Platform: config.ElasticsearchCommPlatformIntegration,
PluginName: kubernetesPlugin,
AnonymizedEventFields: map[string]any{
"foo": "bar",
},
Success: true,
})
}

// then
assert.Equal(t, totalCount, data.heartbeatProperties.EventsCount)
assert.Len(t, data.heartbeatProperties.Sources, 2)

assert.Equal(t, expectedKubernetesEventCount, data.heartbeatProperties.Sources[kubernetesPlugin].EventsCount)
assert.Len(t, data.heartbeatProperties.Sources[kubernetesPlugin].Events, expectedKubernetesEventDetailsLen)

assert.Equal(t, expectedArgoCDEventCount, data.heartbeatProperties.Sources[argoCDPlugin].EventsCount)
assert.Len(t, data.heartbeatProperties.Sources[argoCDPlugin].Events, maxEventDetailsCount-addEvent1Count)
}
3 changes: 1 addition & 2 deletions internal/analytics/segment_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,7 @@ func (r *SegmentReporter) Run(ctx context.Context) error {
case <-ticker.C:
err := r.reportHeartbeatEvent()
if err != nil {
wrappedErr := fmt.Errorf("while reporting heartbeat event: %w", err)
r.log.Error(wrappedErr.Error())
r.log.WithError(err).Error("Failed to report heartbeat event")
r.batchedData.IncrementTimeWindowInHours()
continue
}
Expand Down

0 comments on commit c685f26

Please sign in to comment.