Skip to content

Commit

Permalink
[chore][testbed] add filelog as a part of test cases (#36603)
Browse files Browse the repository at this point in the history
Recently, we added memory limiter test cases for OTLP. 
This PR updates the cases to work with filelog sender as well.

---------

Co-authored-by: Christos Markou <[email protected]>
  • Loading branch information
VihasMakwana and ChrsMark authored Dec 18, 2024
1 parent 2f0339a commit 8959710
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 27 deletions.
71 changes: 48 additions & 23 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,41 +371,66 @@ func TestLargeFileOnce(t *testing.T) {
}

func TestMemoryLimiterHit(t *testing.T) {
otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
otlpreceiver.WithRetry(`
tests := []struct {
name string
sender func() testbed.DataSender
receiver func() testbed.DataReceiver
}{
{
name: "otlp",
sender: func() testbed.DataSender {
return testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t))
},
},
{
name: "filelog",
sender: func() testbed.DataSender {
return datasenders.NewFileLogWriter().WithRetry(`
retry_on_failure:
enabled: true
`)
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
otlpreceiver.WithRetry(`
retry_on_failure:
enabled: true
max_interval: 5s
`)
otlpreceiver.WithQueue(`
otlpreceiver.WithQueue(`
sending_queue:
enabled: true
queue_size: 100000
num_consumers: 20
enabled: true
queue_size: 100000
num_consumers: 20
`)
otlpreceiver.WithTimeout(`
otlpreceiver.WithTimeout(`
timeout: 0s
`)
processors := []ProcessorNameAndConfigBody{
{
Name: "memory_limiter",
Body: `
processors := []ProcessorNameAndConfigBody{
{
Name: "memory_limiter",
Body: `
memory_limiter:
check_interval: 1s
limit_mib: 300
spike_limit_mib: 150
`,
},
},
}
ScenarioMemoryLimiterHit(
t,
test.sender(),
otlpreceiver,
testbed.LoadOptions{
DataItemsPerSecond: 100000,
ItemsPerBatch: 1000,
Parallel: 1,
MaxDelay: 20 * time.Second,
},
performanceResultsSummary, 100, processors)
})
}
ScenarioMemoryLimiterHit(
t,
testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)),
otlpreceiver,
testbed.LoadOptions{
DataItemsPerSecond: 100000,
ItemsPerBatch: 1000,
Parallel: 1,
MaxDelay: 20 * time.Second,
},
performanceResultsSummary, 100, processors)
}
34 changes: 30 additions & 4 deletions testbed/tests/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/rand"
"path"
"path/filepath"
"regexp"
"testing"
"time"

Expand All @@ -22,7 +23,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

var performanceResultsSummary testbed.TestResultsSummary = &testbed.PerformanceResults{}
var (
batchRegex = regexp.MustCompile(` batch_index=(\S+) `)
itemRegex = regexp.MustCompile(` item_index=(\S+) `)
performanceResultsSummary testbed.TestResultsSummary = &testbed.PerformanceResults{}
)

type ProcessorNameAndConfigBody struct {
Name string
Expand Down Expand Up @@ -658,9 +663,8 @@ func getLogsID(logToRetry []plog.Logs) []string {
logRecord := logElement.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
for index := 0; index < logRecord.Len(); index++ {
logObj := logRecord.At(index)
itemIndex, _ := logObj.Attributes().Get("item_index")
batchIndex, _ := logObj.Attributes().Get("batch_index")
result = append(result, fmt.Sprintf("%s%s", batchIndex.AsString(), itemIndex.AsString()))
itemIndex, batchIndex := extractIDFromLog(logObj)
result = append(result, fmt.Sprintf("%s%s", batchIndex, itemIndex))
}
}
return result
Expand All @@ -684,3 +688,25 @@ func allElementsExistInSlice(slice1, slice2 []string) bool {

return true
}

// in case of filelog receiver, the batch_index and item_index are a part of log body.
// we use regex to extract them
func extractIDFromLog(log plog.LogRecord) (string, string) {
var batch, item string
match := batchRegex.FindStringSubmatch(log.Body().AsString())
if len(match) == 2 {
batch = match[0]
}
match = itemRegex.FindStringSubmatch(log.Body().AsString())
if len(match) == 2 {
batch = match[0]
}
// in case of otlp receiver, batch_index and item_index are part of attributes.
if batchIndex, ok := log.Attributes().Get("batch_index"); ok {
batch = batchIndex.AsString()
}
if itemIndex, ok := log.Attributes().Get("item_index"); ok {
item = itemIndex.AsString()
}
return batch, item
}

0 comments on commit 8959710

Please sign in to comment.