diff --git a/internal/analytics/batched/data.go b/internal/analytics/batched/data.go index ddd58ed4d..09a271536 100644 --- a/internal/analytics/batched/data.go +++ b/internal/analytics/batched/data.go @@ -1,6 +1,17 @@ package batched -import "sync" +import ( + "sync" +) + +const ( + + // Segment limits the API calls to 32kB per request: https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/ + // We save 2kB (2048 characters) for general metadata. The rest of 30kB we can spend for sending source event details. + // Average event details size is 300 characters. So in theory we could include 30*1024/300=102.4 events. + // As the plugin name and additional labels don't have fixed size, we limit the number of events to 75 to be on the safe side. + maxEventDetailsCount = 75 +) // Data is a struct that holds data for batched reporting type Data struct { @@ -47,11 +58,14 @@ func (d *Data) AddSourceEvent(in SourceEvent) { d.mutex.Lock() defer d.mutex.Unlock() + d.heartbeatProperties.EventsCount++ + key := in.PluginName sourceProps := d.heartbeatProperties.Sources[key] - sourceProps.Events = append(sourceProps.Events, in) - sourceProps.EventsCount = len(sourceProps.Events) - + sourceProps.EventsCount++ + if d.heartbeatProperties.EventsCount <= maxEventDetailsCount { + // save event details only if we didn't exceed the limit + sourceProps.Events = append(sourceProps.Events, in) + } d.heartbeatProperties.Sources[key] = sourceProps - d.heartbeatProperties.EventsCount++ } diff --git a/internal/analytics/batched/data_test.go b/internal/analytics/batched/data_test.go index 079d56504..485b2e1f5 100644 --- a/internal/analytics/batched/data_test.go +++ b/internal/analytics/batched/data_test.go @@ -96,3 +96,64 @@ func TestData(t *testing.T) { assert.Equal(t, 0, data.heartbeatProperties.EventsCount) assert.Len(t, data.heartbeatProperties.Sources, 0) } + +func TestData_EventDetailsLimit(t *testing.T) { + // given + data := NewData(1) + addEvent1Count := 50 + addEvent2Count := 70 + addEvent3Count := 30 + + totalCount := addEvent1Count + addEvent2Count + addEvent3Count + expectedKubernetesEventCount := addEvent1Count + addEvent3Count + expectedKubernetesEventDetailsLen := addEvent1Count + expectedArgoCDEventCount := addEvent2Count + + kubernetesPlugin := "botkube/kubernetes" + argoCDPlugin := "botkube/argocd" + + // when + for i := 0; i < addEvent1Count; i++ { + data.AddSourceEvent(SourceEvent{ + IntegrationType: config.BotIntegrationType, + Platform: config.DiscordCommPlatformIntegration, + PluginName: kubernetesPlugin, + AnonymizedEventFields: map[string]any{ + "foo": "bar", + }, + Success: true, + }) + } + + for i := 0; i < addEvent2Count; i++ { + data.AddSourceEvent(SourceEvent{ + IntegrationType: config.BotIntegrationType, + Platform: config.CloudSlackCommPlatformIntegration, + PluginName: argoCDPlugin, + AnonymizedEventFields: nil, + Success: true, + }) + } + + for i := 0; i < addEvent3Count; i++ { + data.AddSourceEvent(SourceEvent{ + IntegrationType: config.SinkIntegrationType, + Platform: config.ElasticsearchCommPlatformIntegration, + PluginName: kubernetesPlugin, + AnonymizedEventFields: map[string]any{ + "foo": "bar", + }, + Success: true, + }) + } + + // then + assert.Equal(t, totalCount, data.heartbeatProperties.EventsCount) + assert.Len(t, data.heartbeatProperties.Sources, 2) + + assert.Equal(t, expectedKubernetesEventCount, data.heartbeatProperties.Sources[kubernetesPlugin].EventsCount) + assert.Len(t, data.heartbeatProperties.Sources[kubernetesPlugin].Events, expectedKubernetesEventDetailsLen) + + assert.Equal(t, expectedArgoCDEventCount, data.heartbeatProperties.Sources[argoCDPlugin].EventsCount) + assert.Len(t, data.heartbeatProperties.Sources[argoCDPlugin].Events, maxEventDetailsCount-addEvent1Count) +} diff --git a/internal/analytics/segment_reporter.go b/internal/analytics/segment_reporter.go index b213ab7f6..2317999ee 100644 --- a/internal/analytics/segment_reporter.go +++ b/internal/analytics/segment_reporter.go @@ -200,8 +200,7 @@ func (r *SegmentReporter) Run(ctx context.Context) error { case <-ticker.C: err := r.reportHeartbeatEvent() if err != nil { - wrappedErr := fmt.Errorf("while reporting heartbeat event: %w", err) - r.log.Error(wrappedErr.Error()) + r.log.WithError(err).Error("Failed to report heartbeat event") r.batchedData.IncrementTimeWindowInHours() continue }