diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go index f46e2abd04f2..b98c3493b22e 100644 --- a/testbed/tests/log_test.go +++ b/testbed/tests/log_test.go @@ -371,41 +371,66 @@ func TestLargeFileOnce(t *testing.T) { } func TestMemoryLimiterHit(t *testing.T) { - otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) - otlpreceiver.WithRetry(` + tests := []struct { + name string + sender func() testbed.DataSender + receiver func() testbed.DataReceiver + }{ + { + name: "otlp", + sender: func() testbed.DataSender { + return testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)) + }, + }, + { + name: "filelog", + sender: func() testbed.DataSender { + return datasenders.NewFileLogWriter().WithRetry(` + retry_on_failure: + enabled: true +`) + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) + otlpreceiver.WithRetry(` retry_on_failure: enabled: true max_interval: 5s `) - otlpreceiver.WithQueue(` + otlpreceiver.WithQueue(` sending_queue: - enabled: true - queue_size: 100000 - num_consumers: 20 + enabled: true + queue_size: 100000 + num_consumers: 20 `) - otlpreceiver.WithTimeout(` + otlpreceiver.WithTimeout(` timeout: 0s `) - processors := []ProcessorNameAndConfigBody{ - { - Name: "memory_limiter", - Body: ` + processors := []ProcessorNameAndConfigBody{ + { + Name: "memory_limiter", + Body: ` memory_limiter: check_interval: 1s limit_mib: 300 spike_limit_mib: 150 `, - }, + }, + } + ScenarioMemoryLimiterHit( + t, + test.sender(), + otlpreceiver, + testbed.LoadOptions{ + DataItemsPerSecond: 100000, + ItemsPerBatch: 1000, + Parallel: 1, + MaxDelay: 20 * time.Second, + }, + performanceResultsSummary, 100, processors) + }) } - ScenarioMemoryLimiterHit( - t, - testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)), - otlpreceiver, - testbed.LoadOptions{ - DataItemsPerSecond: 100000, - ItemsPerBatch: 1000, - Parallel: 1, - MaxDelay: 20 * time.Second, - }, - performanceResultsSummary, 100, processors) } diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index cdb897c3b260..a5aa6fc84e96 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -11,6 +11,7 @@ import ( "math/rand" "path" "path/filepath" + "regexp" "testing" "time" @@ -22,7 +23,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" ) -var performanceResultsSummary testbed.TestResultsSummary = &testbed.PerformanceResults{} +var ( + batchRegex = regexp.MustCompile(` batch_index=(\S+) `) + itemRegex = regexp.MustCompile(` item_index=(\S+) `) + performanceResultsSummary testbed.TestResultsSummary = &testbed.PerformanceResults{} +) type ProcessorNameAndConfigBody struct { Name string @@ -658,9 +663,8 @@ func getLogsID(logToRetry []plog.Logs) []string { logRecord := logElement.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords() for index := 0; index < logRecord.Len(); index++ { logObj := logRecord.At(index) - itemIndex, _ := logObj.Attributes().Get("item_index") - batchIndex, _ := logObj.Attributes().Get("batch_index") - result = append(result, fmt.Sprintf("%s%s", batchIndex.AsString(), itemIndex.AsString())) + itemIndex, batchIndex := extractIDFromLog(logObj) + result = append(result, fmt.Sprintf("%s%s", batchIndex, itemIndex)) } } return result @@ -684,3 +688,25 @@ func allElementsExistInSlice(slice1, slice2 []string) bool { return true } + +// in case of filelog receiver, the batch_index and item_index are a part of log body. +// we use regex to extract them +func extractIDFromLog(log plog.LogRecord) (string, string) { + var batch, item string + match := batchRegex.FindStringSubmatch(log.Body().AsString()) + if len(match) == 2 { + batch = match[0] + } + match = itemRegex.FindStringSubmatch(log.Body().AsString()) + if len(match) == 2 { + batch = match[0] + } + // in case of otlp receiver, batch_index and item_index are part of attributes. + if batchIndex, ok := log.Attributes().Get("batch_index"); ok { + batch = batchIndex.AsString() + } + if itemIndex, ok := log.Attributes().Get("item_index"); ok { + item = itemIndex.AsString() + } + return batch, item +}