Skip to content

Commit

Permalink
[connector/routing] Fix issue where conditions were not deduplicated …
Browse files Browse the repository at this point in the history
…properly (#35962)
  • Loading branch information
djaglowski authored Oct 23, 2024
1 parent 468908c commit a41298f
Show file tree
Hide file tree
Showing 25 changed files with 1,749 additions and 23 deletions.
27 changes: 27 additions & 0 deletions .chloggen/routing-tests-golden.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: connector/routing

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix detection of duplicate conditions in routing table.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35962]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
8 changes: 4 additions & 4 deletions connector/routingconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestLoadConfig(t *testing.T) {
expected component.Config
}{
{
configPath: "config_traces.yaml",
configPath: filepath.Join("testdata", "config", "traces.yaml"),
id: component.NewIDWithName(metadata.Type, ""),
expected: &Config{
DefaultPipelines: []pipeline.ID{
Expand All @@ -49,7 +49,7 @@ func TestLoadConfig(t *testing.T) {
},
},
{
configPath: "config_metrics.yaml",
configPath: filepath.Join("testdata", "config", "metrics.yaml"),
id: component.NewIDWithName(metadata.Type, ""),
expected: &Config{
DefaultPipelines: []pipeline.ID{
Expand All @@ -74,7 +74,7 @@ func TestLoadConfig(t *testing.T) {
},
},
{
configPath: "config_logs.yaml",
configPath: filepath.Join("testdata", "config", "logs.yaml"),
id: component.NewIDWithName(metadata.Type, ""),
expected: &Config{
DefaultPipelines: []pipeline.ID{
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestLoadConfig(t *testing.T) {

for _, tt := range testcases {
t.Run(tt.configPath, func(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", tt.configPath))
cm, err := confmaptest.LoadConf(tt.configPath)
require.NoError(t, err)

factory := NewFactory()
Expand Down
92 changes: 92 additions & 0 deletions connector/routingconnector/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@ package routingconnector // import "github.com/open-telemetry/opentelemetry-coll

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pipeline"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
)

func TestLogsRegisterConsumersForValidRoute(t *testing.T) {
Expand Down Expand Up @@ -465,3 +472,88 @@ func TestLogsConnectorCapabilities(t *testing.T) {
require.NoError(t, err)
assert.False(t, conn.Capabilities().MutatesData)
}

func TestLogsConnectorDetailed(t *testing.T) {
testCases := []string{
filepath.Join("testdata", "logs", "resource_context", "all_match_first_only"),
filepath.Join("testdata", "logs", "resource_context", "all_match_last_only"),
filepath.Join("testdata", "logs", "resource_context", "all_match_once"),
filepath.Join("testdata", "logs", "resource_context", "each_matches_one"),
filepath.Join("testdata", "logs", "resource_context", "match_none_with_default"),
filepath.Join("testdata", "logs", "resource_context", "match_none_without_default"),
}

for _, tt := range testCases {
t.Run(tt, func(t *testing.T) {

cm, err := confmaptest.LoadConf(filepath.Join(tt, "config.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
sub, err := cm.Sub("routing")
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))
require.NoError(t, component.ValidateConfig(cfg))

var sinkDefault, sink0, sink1 consumertest.LogsSink
router := connector.NewLogsRouter(map[pipeline.ID]consumer.Logs{
pipeline.NewIDWithName(pipeline.SignalLogs, "default"): &sinkDefault,
pipeline.NewIDWithName(pipeline.SignalLogs, "0"): &sink0,
pipeline.NewIDWithName(pipeline.SignalLogs, "1"): &sink1,
})

conn, err := factory.CreateLogsToLogs(
context.Background(),
connectortest.NewNopSettings(),
cfg,
router.(consumer.Logs),
)
require.NoError(t, err)

var expected0, expected1, expectedDefault *plog.Logs
if expected, readErr := golden.ReadLogs(filepath.Join(tt, "sink_0.yaml")); readErr == nil {
expected0 = &expected
} else if !os.IsNotExist(readErr) {
t.Fatalf("Error reading sink_0.yaml: %v", readErr)
}

if expected, readErr := golden.ReadLogs(filepath.Join(tt, "sink_1.yaml")); readErr == nil {
expected1 = &expected
} else if !os.IsNotExist(readErr) {
t.Fatalf("Error reading sink_1.yaml: %v", readErr)
}

if expected, readErr := golden.ReadLogs(filepath.Join(tt, "sink_default.yaml")); readErr == nil {
expectedDefault = &expected
} else if !os.IsNotExist(readErr) {
t.Fatalf("Error reading sink_default.yaml: %v", readErr)
}

input, readErr := golden.ReadLogs(filepath.Join(tt, "input.yaml"))
require.NoError(t, readErr)

require.NoError(t, conn.ConsumeLogs(context.Background(), input))

if expected0 == nil {
assert.Empty(t, sink0.AllLogs(), "sink0 should be empty")
} else {
require.Len(t, sink0.AllLogs(), 1, "sink0 should have one plog.Logs")
assert.NoError(t, plogtest.CompareLogs(*expected0, sink0.AllLogs()[0]), "sink0 has unexpected result")
}

if expected1 == nil {
assert.Empty(t, sink1.AllLogs(), "sink1 should be empty")
} else {
require.Len(t, sink1.AllLogs(), 1, "sink1 should have one plog.Logs")
assert.NoError(t, plogtest.CompareLogs(*expected1, sink1.AllLogs()[0]), "sink1 has unexpected result")
}

if expectedDefault == nil {
assert.Empty(t, sinkDefault.AllLogs(), "sinkDefault should be empty")
} else {
require.Len(t, sinkDefault.AllLogs(), 1, "sinkDefault should have one plog.Logs")
assert.NoError(t, plogtest.CompareLogs(*expectedDefault, sinkDefault.AllLogs()[0]), "sinkDefault has unexpected result")
}
})
}
}
32 changes: 13 additions & 19 deletions connector/routingconnector/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func (r *router[C]) registerConsumers(defaultPipelineIDs []pipeline.ID) error {
return err
}

r.normalizeConditions()

// register pipelines for each route
err = r.registerRouteConsumers()
if err != nil {
Expand All @@ -109,11 +111,21 @@ func (r *router[C]) registerDefaultConsumer(pipelineIDs []pipeline.ID) error {
return nil
}

// convert conditions to statements
func (r *router[C]) normalizeConditions() {
for i := range r.table {
item := &r.table[i]
if item.Condition != "" {
item.Statement = fmt.Sprintf("route() where %s", item.Condition)
}
}
}

// registerRouteConsumers registers a consumer for the pipelines configured
// for each route
func (r *router[C]) registerRouteConsumers() error {
for _, item := range r.table {
statement, err := r.getStatementFrom(item)
statement, err := r.parser.ParseStatement(item.Statement)
if err != nil {
return err
}
Expand Down Expand Up @@ -144,24 +156,6 @@ func (r *router[C]) registerRouteConsumers() error {
return nil
}

// getStatementFrom builds a routing OTTL statement from the provided
// routing table entry configuration. If the routing table entry configuration
// does not contain a valid OTTL statement then nil is returned.
func (r *router[C]) getStatementFrom(item RoutingTableItem) (*ottl.Statement[ottlresource.TransformContext], error) {
var statement *ottl.Statement[ottlresource.TransformContext]
if item.Condition != "" {
item.Statement = fmt.Sprintf("route() where %s", item.Condition)
}
if item.Statement != "" {
var err error
statement, err = r.parser.ParseStatement(item.Statement)
if err != nil {
return statement, err
}
}
return statement, nil
}

func key(entry RoutingTableItem) string {
return entry.Statement
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
routing:
default_pipelines:
- logs/default
table:
- condition: attributes["resourceName"] != nil
pipelines:
- logs/0
- condition: attributes["resourceName"] == "resourceY"
pipelines:
- logs/1
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
resourceLogs:
- resource:
attributes:
- key: resourceName
value:
stringValue: resourceA
- key: resourceNameAgain
value:
stringValue: resourceA
schemaUrl: https://opentelemetry.io/schemas/1.6.1
scopeLogs:
- attributes:
- key: scopeName
value:
stringValue: scopeA
- key: scopeNameAgain
value:
stringValue: scopeA
logRecords:
- attributes:
- key: logName
value:
stringValue: logA
- key: logNameAgain
value:
stringValue: logA
body:
stringValue: logA
- attributes:
- key: logName
value:
stringValue: logB
- key: logNameAgain
value:
stringValue: logB
body:
stringValue: logB
schemaUrl: https://opentelemetry.io/schemas/1.6.1
scope:
name: scopeA
version: v0.1.0
- attributes:
- key: scopeName
value:
stringValue: scopeB
- key: scopeNameAgain
value:
stringValue: scopeB
logRecords:
- attributes:
- key: logName
value:
stringValue: logA
- key: logNameAgain
value:
stringValue: logA
body:
stringValue: logA
- attributes:
- key: logName
value:
stringValue: logB
- key: logNameAgain
value:
stringValue: logB
body:
stringValue: logB
schemaUrl: https://opentelemetry.io/schemas/1.6.1
scope:
name: scopeB
version: v0.1.0
- resource:
attributes:
- key: resourceName
value:
stringValue: resourceB
- key: resourceNameAgain
value:
stringValue: resourceB
schemaUrl: https://opentelemetry.io/schemas/1.6.1
scopeLogs:
- attributes:
- key: scopeName
value:
stringValue: scopeA
- key: scopeNameAgain
value:
stringValue: scopeA
logRecords:
- attributes:
- key: logName
value:
stringValue: logA
- key: logNameAgain
value:
stringValue: logA
body:
stringValue: logA
- attributes:
- key: logName
value:
stringValue: logB
- key: logNameAgain
value:
stringValue: logB
body:
stringValue: logB
schemaUrl: https://opentelemetry.io/schemas/1.6.1
scope:
name: scopeA
version: v0.1.0
- attributes:
- key: scopeName
value:
stringValue: scopeB
- key: scopeNameAgain
value:
stringValue: scopeB
logRecords:
- attributes:
- key: logName
value:
stringValue: logA
- key: logNameAgain
value:
stringValue: logA
body:
stringValue: logA
- attributes:
- key: logName
value:
stringValue: logB
- key: logNameAgain
value:
stringValue: logB
body:
stringValue: logB
schemaUrl: https://opentelemetry.io/schemas/1.6.1
scope:
name: scopeB
version: v0.1.0
Loading

0 comments on commit a41298f

Please sign in to comment.