Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/filter] Add ottl to filter processor #16369

Merged
merged 6 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/fp-add-ottl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filterprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add ability to filter spans, span events, metrics, datapoints, and logs via OTTL conditions

# One or more tracking issues related to the change
issues: [16369]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
61 changes: 58 additions & 3 deletions processor/filterprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@

The filter processor can be configured to include or exclude:

- logs, based on resource attributes using the `strict` or `regexp` match types
- metrics based on metric name in the case of the `strict` or `regexp` match types,
- Logs, based on OTTL conditions or resource attributes using the `strict` or `regexp` match types
- Metrics based on OTTL Conditions or metric name in the case of the `strict` or `regexp` match types,
or based on other metric attributes in the case of the `expr` match type.
Please refer to [config.go](./config.go) for the config spec.
- Spans based on span names, and resource attributes, all with full regex support
- Data points based on OTTL conditions
- Spans based on OTTL conditions or span names and resource attributes, all with full regex support
- Span Events based on OTTL conditions.

For OTTL conditions configuration see [OTTL](#ottl). For all other options, continue reading.

It takes a pipeline type, of which `logs` `metrics`, and `traces` are supported, followed
by an action:
Expand Down Expand Up @@ -283,6 +287,57 @@ processors:
Value: (localhost|127.0.0.1)
```

## OTTL
The [OpenTelemetry Transformation Language](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md) is a language for interacting with telemetry within the collector in generic ways.
The filterprocessor can be configured to use OTTL conditions to determine when to drop telemetry.
If any condition is met, the telemetry is dropped.
Each configuration option corresponds with a different type of telemetry and OTTL Context.
See the table below for details on each context and the fields it exposes.

| Config | OTTL Context |
|---------------------|------------------------------------------------------------------------------------------------------------------------------------|
| `spans.span` | [Span](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/contexts/ottlspan/README.md) |
| `spans.spanevent` | [SpanEvent](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/contexts/ottlspanevent/README.md) |
| `metrics.metric` | [Metric](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/contexts/ottlmetric/README.md) |
| `metrics.datapoint` | [DataPoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/contexts/ottldatapoint/README.md) |
| `logs.log` | [Log](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/contexts/ottllog/README.md) |

The OTTL allows the use of `and`, `or`, and `()` in conditions.
See [OTTL Boolean Expressions](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md#boolean-expressions) for more details.

For conditions that apply to the same signal, such as spans and span events, if the "higher" level telemetry matches a condition and is dropped, the "lower" level condition will not be checked.
This means that if a span is dropped but a span event condition was defined, the span event condition will not be checked.
The same relationship applies to metrics and datapoints.

If all span events for a span are dropped, the span will be left intact.
If all datapoints for a metric are dropped, the metric will also be dropped.

### OTTL Examples

```yaml
processors:
filter:
traces:
span:
- 'attributes["container.name"] == "app_container_1"'
- 'resource.attributes["host.name"] == "localhost"'
- 'name == "app_3"'
spanevent:
- 'attributes["grpc"] == true'
- 'IsMatch(name, ".*grpc.*") == true'
metrics:
metric:
- 'name == "my.metric" and attributes["my_label"] == "abc123"'
- 'type == METRIC_DATA_TYPE_HISTOGRAM'
datapoint:
- 'metric.type == METRIC_DATA_TYPE_SUMMARY'
- 'resource.attributes["service.name"] == "my_service_name"'
logs:
log_record:
- 'IsMatch(body, ".*password.*") == true'
- 'severity_number < SEVERITY_NUMBER_WARN'
```

[alpha]:https://github.com/open-telemetry/opentelemetry-collector#alpha
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[core]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol
89 changes: 83 additions & 6 deletions processor/filterprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filtermetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset/regexp"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor/internal/common"
)

// Config defines configuration for Resource processor.
Expand All @@ -39,6 +46,8 @@ type Config struct {
Logs LogFilters `mapstructure:"logs"`

Spans SpanFilters `mapstructure:"spans"`

Traces TraceFilters `mapstructure:"traces"`
}

// MetricFilters filters by Metric properties.
Expand All @@ -55,6 +64,16 @@ type MetricFilters struct {

// RegexpConfig specifies options for the Regexp match type
RegexpConfig *regexp.Config `mapstructure:"regexp"`

// MetricConditions is a list of OTTL conditions for an ottlmetric context.
// If any condition resolves to true, the metric will be dropped.
// Supports `and`, `or`, and `()`
MetricConditions []string `mapstructure:"metric"`

// DataPointConditions is a list of OTTL conditions for an ottldatapoint context.
// If any condition resolves to true, the datapoint will be dropped.
// Supports `and`, `or`, and `()`
DataPointConditions []string `mapstructure:"datapoint"`
}

// SpanFilters filters by Span attributes and various other fields, Regexp config is per matcher
Expand All @@ -70,6 +89,19 @@ type SpanFilters struct {
Exclude *filterconfig.MatchProperties `mapstructure:"exclude"`
}

// TraceFilters filters by OTTL conditions
type TraceFilters struct {
// SpanConditions is a list of OTTL conditions for an ottlspan context.
// If any condition resolves to true, the span will be dropped.
// Supports `and`, `or`, and `()`
SpanConditions []string `mapstructure:"span"`

// SpanEventConditions is a list of OTTL conditions for an ottlspanevent context.
// If any condition resolves to true, the span event will be dropped.
// Supports `and`, `or`, and `()`
SpanEventConditions []string `mapstructure:"spanevent"`
}

// LogFilters filters by Log properties.
type LogFilters struct {
// Include match properties describe logs that should be included in the Collector Service pipeline,
Expand All @@ -80,6 +112,11 @@ type LogFilters struct {
// all other logs should be included.
// If both Include and Exclude are specified, Include filtering occurs first.
Exclude *LogMatchProperties `mapstructure:"exclude"`

// LogConditions is a list of OTTL conditions for an ottllog context.
// If any condition resolves to true, the log event will be dropped.
// Supports `and`, `or`, and `()`
LogConditions []string `mapstructure:"log_record"`
}

// LogMatchType specifies the strategy for matching against `plog.Log`s.
Expand Down Expand Up @@ -254,15 +291,55 @@ var _ component.ProcessorConfig = (*Config)(nil)

// Validate checks if the processor configuration is valid
func (cfg *Config) Validate() error {
var err error
if (cfg.Traces.SpanConditions != nil || cfg.Traces.SpanEventConditions != nil) && (cfg.Spans.Include != nil || cfg.Spans.Exclude != nil) {
return fmt.Errorf("cannot use ottl conditions and include/exclude for spans at the same time")
}
if (cfg.Metrics.MetricConditions != nil || cfg.Metrics.DataPointConditions != nil) && (cfg.Metrics.Include != nil || cfg.Metrics.Exclude != nil) {
return fmt.Errorf("cannot use ottl conditions and include/exclude for metrics at the same time")
}
if cfg.Logs.LogConditions != nil && (cfg.Logs.Include != nil || cfg.Logs.Exclude != nil) {
return fmt.Errorf("cannot use ottl conditions and include/exclude for logs at the same time")
}

var errors error

if cfg.Traces.SpanConditions != nil {
spanp := ottlspan.NewParser(common.Functions[ottlspan.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := spanp.ParseStatements(common.PrepareConditionForParsing(cfg.Traces.SpanConditions))
errors = multierr.Append(errors, err)
}

if cfg.Traces.SpanEventConditions != nil {
spaneventp := ottlspanevent.NewParser(common.Functions[ottlspanevent.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := spaneventp.ParseStatements(common.PrepareConditionForParsing(cfg.Traces.SpanEventConditions))
errors = multierr.Append(errors, err)
}

if cfg.Metrics.MetricConditions != nil {
metricp := ottlmetric.NewParser(common.Functions[ottlmetric.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := metricp.ParseStatements(common.PrepareConditionForParsing(cfg.Metrics.MetricConditions))
errors = multierr.Append(errors, err)
}

if cfg.Metrics.DataPointConditions != nil {
datapointp := ottldatapoint.NewParser(common.Functions[ottldatapoint.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := datapointp.ParseStatements(common.PrepareConditionForParsing(cfg.Metrics.DataPointConditions))
errors = multierr.Append(errors, err)
}

if cfg.Logs.LogConditions != nil {
logp := ottllog.NewParser(common.Functions[ottllog.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := logp.ParseStatements(common.PrepareConditionForParsing(cfg.Logs.LogConditions))
errors = multierr.Append(errors, err)
}

if cfg.Logs.Include != nil {
err = multierr.Append(err, cfg.Logs.Include.validate())
if cfg.Logs.LogConditions != nil && cfg.Logs.Include != nil {
errors = multierr.Append(errors, cfg.Logs.Include.validate())
}

if cfg.Logs.Exclude != nil {
err = multierr.Append(err, cfg.Logs.Exclude.validate())
if cfg.Logs.LogConditions != nil && cfg.Logs.Exclude != nil {
errors = multierr.Append(errors, cfg.Logs.Exclude.validate())
}

return err
return errors
}
102 changes: 102 additions & 0 deletions processor/filterprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,3 +841,105 @@ func TestLogSeverity_severityValidate(t *testing.T) {
})
}
}

func TestLoadingConfigOTTL(t *testing.T) {

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config_ottl.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
expected *Config
errorMessage string
}{
{
id: component.NewIDWithName("filter", "ottl"),
expected: &Config{
ProcessorSettings: config.NewProcessorSettings(component.NewID(typeStr)),
Traces: TraceFilters{
SpanConditions: []string{
`attributes["test"] == "pass"`,
},
SpanEventConditions: []string{
`attributes["test"] == "pass"`,
},
},
Metrics: MetricFilters{
MetricConditions: []string{
`name == "pass"`,
},
DataPointConditions: []string{
`attributes["test"] == "pass"`,
},
},
Logs: LogFilters{
LogConditions: []string{
`attributes["test"] == "pass"`,
},
},
},
},
{
id: component.NewIDWithName("filter", "multiline"),
expected: &Config{
ProcessorSettings: config.NewProcessorSettings(component.NewID(typeStr)),
Traces: TraceFilters{
SpanConditions: []string{
`attributes["test"] == "pass"`,
`attributes["test"] == "also pass"`,
},
},
},
},
{
id: component.NewIDWithName(typeStr, "spans_mix_config"),
errorMessage: "cannot use ottl conditions and include/exclude for spans at the same time",
},
{
id: component.NewIDWithName(typeStr, "metrics_mix_config"),
errorMessage: "cannot use ottl conditions and include/exclude for metrics at the same time",
},
{
id: component.NewIDWithName(typeStr, "logs_mix_config"),
errorMessage: "cannot use ottl conditions and include/exclude for logs at the same time",
},
{
id: component.NewIDWithName(typeStr, "bad_syntax_span"),
errorMessage: "1:24: unexpected token \"[\" (expected <opcomparison> Value)",
},
{
id: component.NewIDWithName(typeStr, "bad_syntax_spanevent"),
errorMessage: "1:24: unexpected token \"[\" (expected <opcomparison> Value)",
},
{
id: component.NewIDWithName(typeStr, "bad_syntax_metric"),
errorMessage: "1:33: unexpected token \"[\" (expected <opcomparison> Value)",
},
{
id: component.NewIDWithName(typeStr, "bad_syntax_datapoint"),
errorMessage: "1:24: unexpected token \"[\" (expected <opcomparison> Value)",
},
{
id: component.NewIDWithName(typeStr, "bad_syntax_log"),
errorMessage: "1:24: unexpected token \"[\" (expected <opcomparison> Value)",
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalProcessorConfig(sub, cfg))

if tt.expected == nil {
assert.EqualError(t, component.ValidateConfig(cfg), tt.errorMessage)
} else {
assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg)
}
})
}
}
2 changes: 1 addition & 1 deletion processor/filterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func createLogsProcessor(
set,
cfg,
nextConsumer,
fp.ProcessLogs,
fp.processLogs,
processorhelper.WithCapabilities(processorCapabilities))
}

Expand Down
10 changes: 8 additions & 2 deletions processor/filterprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.18

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.64.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.64.0
github.com/stretchr/testify v1.8.1
go.opentelemetry.io/collector v0.64.2-0.20221119033128-e3509cd9f772
go.opentelemetry.io/collector/component v0.0.0-20221119033128-e3509cd9f772
Expand All @@ -14,28 +15,31 @@ require (
)

require (
github.com/alecthomas/participle/v2 v2.0.0-beta.5 // indirect
github.com/antonmedv/expr v1.9.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf v1.4.4 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.6.1 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/featuregate v0.0.0-20221119033128-e3509cd9f772 // indirect
go.opentelemetry.io/collector/semconv v0.64.2-0.20221119033128-e3509cd9f772 // indirect
go.opentelemetry.io/otel v1.11.1 // indirect
go.opentelemetry.io/otel/metric v0.33.0 // indirect
go.opentelemetry.io/otel/trace v1.11.1 // indirect
go.uber.org/atomic v1.10.0 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/text v0.4.0 // indirect
Expand All @@ -46,3 +50,5 @@ require (
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl => ../../pkg/ottl
Loading