diff --git a/processor/routingprocessor/logs_test.go b/processor/routingprocessor/logs_test.go index c539af6d6c9d..158597561e80 100644 --- a/processor/routingprocessor/logs_test.go +++ b/processor/routingprocessor/logs_test.go @@ -135,8 +135,8 @@ func TestLogs_RoutingWorks_Context_Ordered(t *testing.T) { lExpSecond := &mockLogsExporter{} lExpThird := &mockLogsExporter{} - host := newMockHost(map[component.DataType]map[component.ID]component.Component{ - component.DataTypeLogs: { + host := newMockHost(map[pipeline.Signal]map[component.ID]component.Component{ + pipeline.SignalLogs: { component.MustNewID("otlp"): defaultExp, component.MustNewIDWithName("otlp", "first"): lExpFirst, component.MustNewIDWithName("otlp", "second"): lExpSecond, @@ -247,8 +247,8 @@ func TestLogs_RoutingWorks_ResourceAttribute_Ordered(t *testing.T) { lExpSecond := &mockLogsExporter{} lExpThird := &mockLogsExporter{} - host := newMockHost(map[component.DataType]map[component.ID]component.Component{ - component.DataTypeLogs: { + host := newMockHost(map[pipeline.Signal]map[component.ID]component.Component{ + pipeline.SignalLogs: { component.MustNewID("otlp"): defaultExp, component.MustNewIDWithName("otlp", "first"): lExpFirst, component.MustNewIDWithName("otlp", "second"): lExpSecond, @@ -517,8 +517,8 @@ func TestLogs_RoutingWorks_ResourceAttribute_WithOTTL_Ordered(t *testing.T) { lExpFirst := &mockLogsExporter{} lExpSecond := &mockLogsExporter{} - host := newMockHost(map[component.DataType]map[component.ID]component.Component{ - component.DataTypeLogs: { + host := newMockHost(map[pipeline.Signal]map[component.ID]component.Component{ + pipeline.SignalLogs: { component.MustNewID("otlp"): defaultExp, component.MustNewIDWithName("otlp", "first"): lExpFirst, component.MustNewIDWithName("otlp", "second"): lExpSecond, @@ -531,15 +531,15 @@ func TestLogs_RoutingWorks_ResourceAttribute_WithOTTL_Ordered(t *testing.T) { DefaultExporters: []string{"otlp"}, Table: []RoutingTableItem{ { - Statement: `route() where resource.attributes["__otel_enabled__"] == nil`, + Statement: `route() where resource.attributes["non-matching"] != nil`, Exporters: []string{"otlp/first"}, }, { - Statement: `delete_key(resource.attributes, "__otel_enabled__") where resource.attributes["__otel_enabled__"] == "true"`, + Statement: `route() where resource.attributes["non-matching"] == "true"`, Exporters: []string{"otlp/first"}, }, { - Statement: `delete_key(resource.attributes, "__otel_enabled__") where resource.attributes["__otel_enabled__"] == "false"`, + Statement: `route() where resource.attributes["matching"] == "true"`, Exporters: []string{"otlp/second"}, }, }, @@ -551,7 +551,7 @@ func TestLogs_RoutingWorks_ResourceAttribute_WithOTTL_Ordered(t *testing.T) { t.Run(fmt.Sprintf("run %d time", i), func(t *testing.T) { l := plog.NewLogs() rl := l.ResourceLogs().AppendEmpty() - rl.Resource().Attributes().PutStr("__otel_enabled__", "false") + rl.Resource().Attributes().PutStr("matching", "true") rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("this is a log") assert.NoError(t, exp.ConsumeLogs(context.Background(), l)) @@ -565,6 +565,55 @@ func TestLogs_RoutingWorks_ResourceAttribute_WithOTTL_Ordered(t *testing.T) { } } +func BenchmarkLogs_Routing(t *testing.B) { + defaultExp := &mockLogsExporter{} + lExpFirst := &mockLogsExporter{} + lExpSecond := &mockLogsExporter{} + + host := newMockHost(map[pipeline.Signal]map[component.ID]component.Component{ + pipeline.SignalLogs: { + component.MustNewID("otlp"): defaultExp, + component.MustNewIDWithName("otlp", "first"): lExpFirst, + component.MustNewIDWithName("otlp", "second"): lExpSecond, + }, + }) + + exp, err := newLogProcessor(noopTelemetrySettings, &Config{ + FromAttribute: "__otel_enabled__", + AttributeSource: resourceAttributeSource, + DefaultExporters: []string{"otlp"}, + Table: []RoutingTableItem{ + { + Statement: `route() where resource.attributes["non-matching"] != nil`, + Exporters: []string{"otlp/first"}, + }, + { + Statement: `route() where resource.attributes["non-matching"] == "true"`, + Exporters: []string{"otlp/first"}, + }, + { + Statement: `route() where resource.attributes["matching"] == "true"`, + Exporters: []string{"otlp/second"}, + }, + }, + }) + require.NoError(t, err) + require.NoError(t, exp.Start(context.Background(), host)) + + mockLogs := plog.NewLogs() + for i := 0; i < 100; i++ { + rl := mockLogs.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("matching", "true") + } + + for i := 0; i < t.N; i++ { + l := plog.NewLogs() + mockLogs.CopyTo(l) + + assert.NoError(t, exp.ConsumeLogs(context.Background(), l)) + } +} + // see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/26462 func TestLogsAttributeWithOTTLDoesNotCauseCrash(t *testing.T) { // prepare