Skip to content

Commit

Permalink
[testbed] Passing processor configs in a struct slice to preserve ord…
Browse files Browse the repository at this point in the history
…er (#35064)

**Description:** <Describe what has changed.>
Previously, the processor config was passed as a map, because of which
preserving the processor was not possible.
Changed the data type to a newly added struct slice so that the
processors are used in the supplied order.

**NOTE** that this change is breaking - if any of these changed
functions are directly used, they will break as existing usages supply
map.

**Link to tracking Issue:** <Issue number if applicable>
Resolves
#33003
  • Loading branch information
swamisriman authored Oct 28, 2024
1 parent f3029ea commit d66daa3
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 51 deletions.
29 changes: 29 additions & 0 deletions .chloggen/testbed_proc_order_fix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: testbed

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "`scenarios.createConfigYaml()` and `utils.CreateConfigYaml()` functions now take processor configs as a struct slice argument instead of `map[string]string`."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33003]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
- This is to preserve processor order. `ProcessorNameAndConfigBody` is the newly created struct.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
11 changes: 6 additions & 5 deletions testbed/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,14 @@ Generally, when designing a test for new exporter and receiver components, devel
},
...
}

processors := map[string]string{
"batch": `
batch:
processors := []ProcessorNameAndConfigBody{
{
Name: "batch",
Body: `
batch:
`,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Scenario10kItemsPerSecond(
Expand Down
9 changes: 6 additions & 3 deletions testbed/correctnesstests/connectors/correctness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ func TestMain(m *testing.M) {
}

func TestGoldenData(t *testing.T) {
processors := map[string]string{
"batch": `
processors := []correctnesstests.ProcessorNameAndConfigBody{
{
Name: "batch",
Body: `
batch:
send_batch_size: 1024
`,
},
}
sampleTest := correctnesstests.PipelineDef{
TestName: "test routing",
Expand All @@ -49,7 +52,7 @@ func testWithGoldenDataset(
receiver testbed.DataReceiver,
resourceSpec testbed.ResourceSpec,
connector testbed.DataConnector,
processors map[string]string,
processors []correctnesstests.ProcessorNameAndConfigBody,
) {
dataProvider := testbed.NewGoldenDataProvider(
"../../../internal/coreinternal/goldendataset/testdata/generated_pict_pairs_traces.txt",
Expand Down
9 changes: 6 additions & 3 deletions testbed/correctnesstests/traces/correctness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ func TestMain(m *testing.M) {
func TestTracingGoldenData(t *testing.T) {
tests, err := correctnesstests.LoadPictOutputPipelineDefs("testdata/generated_pict_pairs_traces_pipeline.txt")
require.NoError(t, err)
processors := map[string]string{
"batch": `
processors := []correctnesstests.ProcessorNameAndConfigBody{
{
Name: "batch",
Body: `
batch:
send_batch_size: 1024
`,
},
}
for _, test := range tests {
test.TestName = fmt.Sprintf("%s-%s", test.Receiver, test.Exporter)
Expand All @@ -46,7 +49,7 @@ func testWithTracingGoldenDataset(
sender testbed.DataSender,
receiver testbed.DataReceiver,
resourceSpec testbed.ResourceSpec,
processors map[string]string,
processors []correctnesstests.ProcessorNameAndConfigBody,
) {
dataProvider := testbed.NewGoldenDataProvider(
"../../../internal/coreinternal/goldendataset/testdata/generated_pict_pairs_traces.txt",
Expand Down
13 changes: 9 additions & 4 deletions testbed/correctnesstests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

type ProcessorNameAndConfigBody struct {
Name string
Body string
}

// CreateConfigYaml creates a yaml config for an otel collector given a testbed sender, testbed receiver, any
// processors, and a pipeline type. A collector created from the resulting yaml string should be able to talk
// the specified sender and receiver.
Expand All @@ -26,7 +31,7 @@ func CreateConfigYaml(
sender testbed.DataSender,
receiver testbed.DataReceiver,
connector testbed.DataConnector,
processors map[string]string,
processors []ProcessorNameAndConfigBody,
) string {

// Prepare extra processor config section and comma-separated list of extra processor
Expand All @@ -35,12 +40,12 @@ func CreateConfigYaml(
processorsList := ""
if len(processors) > 0 {
first := true
for name, cfg := range processors {
processorsSections += cfg + "\n"
for i := range processors {
processorsSections += processors[i].Body + "\n"
if !first {
processorsList += ","
}
processorsList += name
processorsList += processors[i].Name
first = false
}
}
Expand Down
7 changes: 5 additions & 2 deletions testbed/stabilitytests/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ import (
var (
contribPerfResultsSummary = &testbed.PerformanceResults{}
resourceCheckPeriod, _ = time.ParseDuration("1m")
processorsConfig = map[string]string{
"batch": `
processorsConfig = []scenarios.ProcessorNameAndConfigBody{
{
Name: "batch",
Body: `
batch:
`,
},
}
)

Expand Down
24 changes: 17 additions & 7 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,13 @@ func TestLog10kDPS(t *testing.T) {
},
}

processors := map[string]string{
"batch": `
processors := []ProcessorNameAndConfigBody{
{
Name: "batch",
Body: `
batch:
`,
},
}

for _, test := range tests {
Expand Down Expand Up @@ -282,10 +285,14 @@ func TestLogLargeFiles(t *testing.T) {
sleepSeconds: 200,
},
}
processors := map[string]string{
"batch": `
processors := []ProcessorNameAndConfigBody{
{
Name: "batch",
Body: `
batch:
`}
`,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ScenarioLong(
Expand All @@ -302,10 +309,13 @@ func TestLogLargeFiles(t *testing.T) {
}

func TestLargeFileOnce(t *testing.T) {
processors := map[string]string{
"batch": `
processors := []ProcessorNameAndConfigBody{
{
Name: "batch",
Body: `
batch:
`,
},
}
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions testbed/tests/resource_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func TestMetricResourceProcessor(t *testing.T) {
require.NoError(t, err)

agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))
processors := map[string]string{
"resource": test.resourceProcessorConfig,
processors := []ProcessorNameAndConfigBody{
{Name: "resource", Body: test.resourceProcessorConfig},
}
configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
configCleanup, err := agentProc.PrepareConfig(configStr)
Expand Down
29 changes: 17 additions & 12 deletions testbed/tests/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ var (
performanceResultsSummary testbed.TestResultsSummary = &testbed.PerformanceResults{}
)

type ProcessorNameAndConfigBody struct {
Name string
Body string
}

// createConfigYaml creates a collector config file that corresponds to the
// sender and receiver used in the test and returns the config file name.
// Map of processor names to their configs. Config is in YAML and must be
Expand All @@ -36,7 +41,7 @@ func createConfigYaml(
sender testbed.DataSender,
receiver testbed.DataReceiver,
resultDir string,
processors map[string]string,
processors []ProcessorNameAndConfigBody,
extensions map[string]string,
) string {

Expand All @@ -51,12 +56,12 @@ func createConfigYaml(
processorsList := ""
if len(processors) > 0 {
first := true
for name, cfg := range processors {
processorsSections += cfg + "\n"
for i := range processors {
processorsSections += processors[i].Body + "\n"
if !first {
processorsList += ","
}
processorsList += name
processorsList += processors[i].Name
first = false
}
}
Expand Down Expand Up @@ -133,7 +138,7 @@ func Scenario10kItemsPerSecond(
receiver testbed.DataReceiver,
resourceSpec testbed.ResourceSpec,
resultsSummary testbed.TestResultsSummary,
processors map[string]string,
processors []ProcessorNameAndConfigBody,
extensions map[string]string,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
Expand Down Expand Up @@ -191,7 +196,7 @@ func Scenario10kItemsPerSecondAlternateBackend(
backend testbed.DataReceiver,
resourceSpec testbed.ResourceSpec,
resultsSummary testbed.TestResultsSummary,
processors map[string]string,
processors []ProcessorNameAndConfigBody,
extensions map[string]string,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
Expand Down Expand Up @@ -261,7 +266,7 @@ func genRandByteString(length int) string {

// Scenario1kSPSWithAttrs runs a performance test at 1k sps with specified span attributes
// and test options.
func Scenario1kSPSWithAttrs(t *testing.T, args []string, tests []TestCase, processors map[string]string, extensions map[string]string) {
func Scenario1kSPSWithAttrs(t *testing.T, args []string, tests []TestCase, processors []ProcessorNameAndConfigBody, extensions map[string]string) {
for i := range tests {
test := tests[i]

Expand Down Expand Up @@ -317,8 +322,8 @@ func Scenario1kSPSWithAttrs(t *testing.T, args []string, tests []TestCase, proce
// Defines RAM usage range for defined processor type.
type processorConfig struct {
Name string
// map of processor types to their config YAML to use.
Processor map[string]string
// slice of processor structs with their names and config YAML to use.
Processor []ProcessorNameAndConfigBody
ExpectedMaxRAM uint32
ExpectedMinFinalRAM uint32
}
Expand Down Expand Up @@ -374,7 +379,7 @@ func ScenarioSendingQueuesFull(
resourceSpec testbed.ResourceSpec,
sleepTime int,
resultsSummary testbed.TestResultsSummary,
processors map[string]string,
processors []ProcessorNameAndConfigBody,
extensions map[string]string,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
Expand Down Expand Up @@ -456,7 +461,7 @@ func ScenarioSendingQueuesNotFull(
resourceSpec testbed.ResourceSpec,
sleepTime int,
resultsSummary testbed.TestResultsSummary,
processors map[string]string,
processors []ProcessorNameAndConfigBody,
extensions map[string]string,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
Expand Down Expand Up @@ -508,7 +513,7 @@ func ScenarioLong(
loadOptions testbed.LoadOptions,
resultsSummary testbed.TestResultsSummary,
sleepTime int,
processors map[string]string,
processors []ProcessorNameAndConfigBody,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)
Expand Down
Loading

0 comments on commit d66daa3

Please sign in to comment.