diff --git a/.chloggen/fix-router-random-statements.yaml b/.chloggen/fix-router-random-statements.yaml new file mode 100644 index 000000000000..57b8d4a4be24 --- /dev/null +++ b/.chloggen/fix-router-random-statements.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: routingprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix OTTL statement not eval in order + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34860] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/routingprocessor/logs.go b/processor/routingprocessor/logs.go index 2a613e1cfdaa..686b4e6cc28e 100644 --- a/processor/routingprocessor/logs.go +++ b/processor/routingprocessor/logs.go @@ -117,21 +117,21 @@ func (p *logProcessor) route(ctx context.Context, l plog.Logs) error { ) matchCount := len(p.router.routes) - for key, route := range p.router.routes { + for _, route := range p.router.routes { _, isMatch, err := route.statement.Execute(ctx, ltx) if err != nil { if p.config.ErrorMode == ottl.PropagateError { return err } p.group("", groups, p.router.defaultExporters, rlogs) - p.recordNonRoutedResourceLogs(ctx, key, rlogs) + p.recordNonRoutedResourceLogs(ctx, route.key, rlogs) continue } if !isMatch { matchCount-- continue } - p.group(key, groups, route.exporters, rlogs) + p.group(route.key, groups, route.exporters, rlogs) } if matchCount == 0 { @@ -152,14 +152,14 @@ func (p *logProcessor) group( key string, groups map[string]logsGroup, exporters []exporter.Logs, - spans plog.ResourceLogs, + logs plog.ResourceLogs, ) { group, ok := groups[key] if !ok { group.logs = plog.NewLogs() group.exporters = exporters } - spans.CopyTo(group.logs.ResourceLogs().AppendEmpty()) + logs.CopyTo(group.logs.ResourceLogs().AppendEmpty()) groups[key] = group } diff --git a/processor/routingprocessor/logs_test.go b/processor/routingprocessor/logs_test.go index 45145ac99bb6..6bc8810cb5b4 100644 --- a/processor/routingprocessor/logs_test.go +++ b/processor/routingprocessor/logs_test.go @@ -5,6 +5,7 @@ package routingprocessor import ( "context" + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -128,6 +129,63 @@ func TestLogs_RoutingWorks_Context(t *testing.T) { }) } +func TestLogs_RoutingWorks_Context_Ordered(t *testing.T) { + defaultExp := &mockLogsExporter{} + lExpFirst := &mockLogsExporter{} + lExpSecond := &mockLogsExporter{} + lExpThird := &mockLogsExporter{} + + host := newMockHost(map[pipeline.Signal]map[component.ID]component.Component{ + pipeline.SignalLogs: { + component.MustNewID("otlp"): defaultExp, + component.MustNewIDWithName("otlp", "first"): lExpFirst, + component.MustNewIDWithName("otlp", "second"): lExpSecond, + component.MustNewIDWithName("otlp", "third"): lExpThird, + }, + }) + + exp, err := newLogProcessor(noopTelemetrySettings, &Config{ + FromAttribute: "X-Tenant", + AttributeSource: contextAttributeSource, + DefaultExporters: []string{"otlp"}, + Table: []RoutingTableItem{ + { + Value: "order-second", + Exporters: []string{"otlp/second"}, + }, + { + Value: "order-first", + Exporters: []string{"otlp/first"}, + }, + { + Value: "order-third", + Exporters: []string{"otlp/third"}, + }, + }, + }) + require.NoError(t, err) + require.NoError(t, exp.Start(context.Background(), host)) + + for i := 1; i <= 5; i++ { + t.Run(fmt.Sprintf("run %d time", i), func(t *testing.T) { + l := plog.NewLogs() + ll := l.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + ll.AppendEmpty().Body().SetStr("this is a log") + + assert.NoError(t, exp.ConsumeLogs( + metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{ + "X-Tenant": "order-third", + })), + l, + )) + assert.Empty(t, defaultExp.AllLogs()) + assert.Empty(t, lExpFirst.AllLogs()) + assert.Empty(t, lExpSecond.AllLogs()) + assert.Len(t, lExpThird.AllLogs(), i, "log should only be routed to lExpThird exporter") + }) + } +} + func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) { defaultExp := &mockLogsExporter{} lExp := &mockLogsExporter{} @@ -183,6 +241,59 @@ func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) { }) } +func TestLogs_RoutingWorks_ResourceAttribute_Ordered(t *testing.T) { + defaultExp := &mockLogsExporter{} + lExpFirst := &mockLogsExporter{} + lExpSecond := &mockLogsExporter{} + lExpThird := &mockLogsExporter{} + + host := newMockHost(map[pipeline.Signal]map[component.ID]component.Component{ + pipeline.SignalLogs: { + component.MustNewID("otlp"): defaultExp, + component.MustNewIDWithName("otlp", "first"): lExpFirst, + component.MustNewIDWithName("otlp", "second"): lExpSecond, + component.MustNewIDWithName("otlp", "third"): lExpThird, + }, + }) + + exp, err := newLogProcessor(noopTelemetrySettings, &Config{ + FromAttribute: "X-Tenant", + AttributeSource: resourceAttributeSource, + DefaultExporters: []string{"otlp"}, + Table: []RoutingTableItem{ + { + Value: "order-second", + Exporters: []string{"otlp/second"}, + }, + { + Value: "order-first", + Exporters: []string{"otlp/first"}, + }, + { + Value: "order-third", + Exporters: []string{"otlp/third"}, + }, + }, + }) + require.NoError(t, err) + require.NoError(t, exp.Start(context.Background(), host)) + + for i := 1; i <= 5; i++ { + t.Run(fmt.Sprintf("run %d time", i), func(t *testing.T) { + l := plog.NewLogs() + rl := l.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("X-Tenant", "order-third") + rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("this is a log") + + assert.NoError(t, exp.ConsumeLogs(context.Background(), l)) + assert.Empty(t, defaultExp.AllLogs()) + assert.Empty(t, lExpFirst.AllLogs()) + assert.Empty(t, lExpSecond.AllLogs()) + assert.Len(t, lExpThird.AllLogs(), i, "log should only be routed to lExpThird exporter") + }) + } +} + func TestLogs_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T) { defaultExp := &mockLogsExporter{} lExp := &mockLogsExporter{} @@ -401,6 +512,117 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) { }) } +func TestLogs_RoutingWorks_ResourceAttribute_WithOTTL_Ordered(t *testing.T) { + defaultExp := &mockLogsExporter{} + lExpFirst := &mockLogsExporter{} + lExpSecond := &mockLogsExporter{} + + host := newMockHost(map[pipeline.Signal]map[component.ID]component.Component{ + pipeline.SignalLogs: { + component.MustNewID("otlp"): defaultExp, + component.MustNewIDWithName("otlp", "first"): lExpFirst, + component.MustNewIDWithName("otlp", "second"): lExpSecond, + }, + }) + + exp, err := newLogProcessor(noopTelemetrySettings, &Config{ + FromAttribute: "__otel_enabled__", + AttributeSource: resourceAttributeSource, + DefaultExporters: []string{"otlp"}, + Table: []RoutingTableItem{ + { + Statement: `route() where resource.attributes["non-matching"] != nil`, + Exporters: []string{"otlp/first"}, + }, + { + Statement: `route() where resource.attributes["non-matching"] == "true"`, + Exporters: []string{"otlp/first"}, + }, + { + Statement: `route() where resource.attributes["matching"] == "true"`, + Exporters: []string{"otlp/second"}, + }, + }, + }) + require.NoError(t, err) + require.NoError(t, exp.Start(context.Background(), host)) + + for i := 1; i <= 5; i++ { + t.Run(fmt.Sprintf("run %d time", i), func(t *testing.T) { + l := plog.NewLogs() + rl := l.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("matching", "true") + rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("this is a log") + + assert.NoError(t, exp.ConsumeLogs(context.Background(), l)) + + assert.Empty(t, defaultExp.AllLogs()) + assert.Empty(t, lExpFirst.AllLogs()) + + // we expect Statement eval in order and log should only be routed to lExpSecond exporter + assert.Len(t, lExpSecond.AllLogs(), i, "log should only be routed to lExpSecond exporter") + }) + } +} + +/* +# before +BenchmarkLogs_Routing +BenchmarkLogs_Routing-8 18951 63102 ns/op + +# after +BenchmarkLogs_Routing +BenchmarkLogs_Routing-8 21171 57387 ns/op +*/ +func BenchmarkLogs_Routing(t *testing.B) { + defaultExp := &mockLogsExporter{} + lExpFirst := &mockLogsExporter{} + lExpSecond := &mockLogsExporter{} + + host := newMockHost(map[pipeline.Signal]map[component.ID]component.Component{ + pipeline.SignalLogs: { + component.MustNewID("otlp"): defaultExp, + component.MustNewIDWithName("otlp", "first"): lExpFirst, + component.MustNewIDWithName("otlp", "second"): lExpSecond, + }, + }) + + exp, err := newLogProcessor(noopTelemetrySettings, &Config{ + FromAttribute: "__otel_enabled__", + AttributeSource: resourceAttributeSource, + DefaultExporters: []string{"otlp"}, + Table: []RoutingTableItem{ + { + Statement: `route() where resource.attributes["non-matching"] != nil`, + Exporters: []string{"otlp/first"}, + }, + { + Statement: `route() where resource.attributes["non-matching"] == "true"`, + Exporters: []string{"otlp/first"}, + }, + { + Statement: `route() where resource.attributes["matching"] == "true"`, + Exporters: []string{"otlp/second"}, + }, + }, + }) + require.NoError(t, err) + require.NoError(t, exp.Start(context.Background(), host)) + + mockLogs := plog.NewLogs() + for i := 0; i < 100; i++ { + rl := mockLogs.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("matching", "true") + } + + for i := 0; i < t.N; i++ { + l := plog.NewLogs() + mockLogs.CopyTo(l) + + assert.NoError(t, exp.ConsumeLogs(context.Background(), l)) + } +} + // see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/26462 func TestLogsAttributeWithOTTLDoesNotCauseCrash(t *testing.T) { // prepare diff --git a/processor/routingprocessor/metrics.go b/processor/routingprocessor/metrics.go index a82203205ac9..0570b7a4a678 100644 --- a/processor/routingprocessor/metrics.go +++ b/processor/routingprocessor/metrics.go @@ -115,7 +115,7 @@ func (p *metricsProcessor) route(ctx context.Context, tm pmetric.Metrics) error ) matchCount := len(p.router.routes) - for key, route := range p.router.routes { + for _, route := range p.router.routes { _, isMatch, err := route.statement.Execute(ctx, mtx) if err != nil { if p.config.ErrorMode == ottl.PropagateError { @@ -129,7 +129,7 @@ func (p *metricsProcessor) route(ctx context.Context, tm pmetric.Metrics) error matchCount-- continue } - p.group(key, groups, route.exporters, rmetrics) + p.group(route.key, groups, route.exporters, rmetrics) } if matchCount == 0 { diff --git a/processor/routingprocessor/router.go b/processor/routingprocessor/router.go index 8a5f6d6806db..9b20554a6021 100644 --- a/processor/routingprocessor/router.go +++ b/processor/routingprocessor/router.go @@ -26,7 +26,7 @@ type router[E component.Component, K any] struct { table []RoutingTableItem defaultExporters []E - routes map[string]routingItem[E, K] + routes []routingItem[E, K] } // newRouter creates a new router instance with its type parameter constrained @@ -44,12 +44,11 @@ func newRouter[E component.Component, K any]( table: table, defaultExporterIDs: defaultExporterIDs, - - routes: make(map[string]routingItem[E, K]), } } type routingItem[E component.Component, K any] struct { + key string exporters []E statement *ottl.Statement[K] } @@ -90,17 +89,14 @@ func (r *router[E, K]) registerDefaultExporters(available map[component.ID]compo // registerRouteExporters registers route exporters using the provided // available exporters map to check if they were available. func (r *router[E, K]) registerRouteExporters(available map[component.ID]component.Component) error { + var routes []routingItem[E, K] for _, item := range r.table { statement, err := r.getStatementFrom(item) if err != nil { return err } - route, ok := r.routes[key(item)] - if !ok { - route.statement = statement - } - + var exporters []E for _, name := range item.Exporters { e, err := r.extractExporter(name, available) if errors.Is(err, errExporterNotFound) { @@ -109,10 +105,16 @@ func (r *router[E, K]) registerRouteExporters(available map[component.ID]compone if err != nil { return err } - route.exporters = append(route.exporters, e) + exporters = append(exporters, e) } - r.routes[key(item)] = route + routes = append(routes, routingItem[E, K]{ + key: key(item), + exporters: exporters, + statement: statement, + }) } + r.routes = routes + return nil } @@ -171,9 +173,11 @@ func (r *router[E, K]) extractExporter(name string, available map[component.ID]c } func (r *router[E, K]) getExporters(key string) []E { - e, ok := r.routes[key] - if !ok { - return r.defaultExporters + for _, route := range r.routes { + if route.key == key { + return route.exporters + } } - return e.exporters + + return r.defaultExporters } diff --git a/processor/routingprocessor/traces.go b/processor/routingprocessor/traces.go index 7c00e0cf65f3..930e364c3c29 100644 --- a/processor/routingprocessor/traces.go +++ b/processor/routingprocessor/traces.go @@ -114,21 +114,21 @@ func (p *tracesProcessor) route(ctx context.Context, t ptrace.Traces) error { ) matchCount := len(p.router.routes) - for key, route := range p.router.routes { + for _, route := range p.router.routes { _, isMatch, err := route.statement.Execute(ctx, stx) if err != nil { if p.config.ErrorMode == ottl.PropagateError { return err } p.group("", groups, p.router.defaultExporters, rspans) - p.recordNonRoutedResourceSpans(ctx, key, rspans) + p.recordNonRoutedResourceSpans(ctx, route.key, rspans) continue } if !isMatch { matchCount-- continue } - p.group(key, groups, route.exporters, rspans) + p.group(route.key, groups, route.exporters, rspans) } if matchCount == 0 {