Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/transform] Add drop function to the transform processor #16297

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/tp-drop.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: transformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds a `drop` function to the transform processor that allows dropping of telemetry.

# One or more tracking issues related to the change
issues: [16297]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
59 changes: 53 additions & 6 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ The transform processor modifies telemetry based on configuration using the [Ope
For each signal type, the processor takes a list of statements associated to a [Context type](#contexts) and executes the statements against the incoming telemetry in the order specified in the config.
Each statement can access and transform telemetry using functions and allow the use of a condition to help decide whether the function should be executed.

**Tables of Contents**
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
- [Config](#config)
- [Example](#example)
- [Grammar](#grammar)
- [Contexts](#contexts)
- [Supported Functions](#supported-functions)
- [Contributing](#contributing)
- [Warnings](#warnings)

## Config

The transform processor allows configuring multiple context statements for traces, metrics, and logs.
Expand Down Expand Up @@ -105,11 +114,11 @@ The contexts allow the OTTL to interact with the underlying telemetry data in it

- [Resource Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlresource)
- [Scope Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlscope)
- [Span Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlspan) <!-- markdown-link-check-disable-line -->
- [Span Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlspan)
- [SpanEvent Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlspanevent)
- [Metric Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottlmetric)
- [DataPoint Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottldatapoint) <!-- markdown-link-check-disable-line -->
- [Log Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottllog) <!-- markdown-link-check-disable-line -->
- [DataPoint Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottldatapoint)
- [Log Context](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/contexts/ottllog)

Each context allows transformation of its type of telemetry.
For example, statements associated to a `resource` context will be able to transform the resource's `attributes` and `dropped_attributes_count`.
Expand Down Expand Up @@ -152,17 +161,55 @@ This is because contexts are nested: the efficiency comes because higher-level c
## Supported functions:

Since the transform processor utilizes the OTTL's contexts for Traces, Metrics, and Logs, it is able to utilize functions that expect pdata in addition to any common functions. These common functions can be used for any signal.
<!-- markdown-link-check-disable-next-line -->

- [OTTL Functions](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/ottl/ottlfuncs)

In addition to OTTL functions, the processor defines its own functions to help with transformations specific to this processor:

**Common functions**
- [drop](#drop)

**Metrics only functions**
- [convert_sum_to_gauge](#convert_sum_to_gauge)
- [convert_gauge_to_sum](#convert_gauge_to_sum)
- [convert_summary_count_val_to_sum](#convert_summary_count_val_to_sum)
- [convert_summary_sum_val_to_sum](#convert_summary_sum_val_to_sum)

## drop

`drop()`

Drops the specified [Context's](#contexts) telemetry from the collector. Best used with a condition, otherwise all the Context's telemetry would be dropped.
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved

If a Metric ever has an empty DataPoints slice, the metric will also be dropped.
If a Scope ever has an empty Span/Metric/Log slice the scope will also be dropped.
If a Resource ever has an empty Scope slice the resource will also be dropped.
An empty SpanEvents slice will not cause a Span to be dropped.

**Be very careful when dropping telemetry**. Dropping telemetry can result in [unsound transformations or orphaned telemetry](#warnings).
Drop does not attempt to reconcile any issues such as orphaned spans/logs or re-aggregation of metric datapoints.

Examples:
```yaml
- context: datapoint
statements:
# drops any datapoint that has an attribute named "test" with a value of "pass".
- drop() where attributes["test"] == "pass"
```
```yaml
- context: resource
statements:
# drops any resource (and therefore all its scopes and spans/metrics/logs)
# that has an attribute named "test" with a value of "pass".
- drop() where attributes["test"] == "pass"
```
```yaml
- context: spanevent
statements:
# drops any span event that has a name of "drop-me"
- drop() where name == "drop-me"
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
```

## convert_sum_to_gauge

`convert_sum_to_gauge()`
Expand Down Expand Up @@ -241,8 +288,8 @@ The transform processor's implementation of the [OpenTelemetry Transformation La

- [Unsound Transformations](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#unsound-transformations): Several Metric-only functions allow you to transform one metric data type to another or create new metrics from an existing metrics. Transformations between metric data types are not defined in the [metrics data model](https://github.com/open-telemetry/opentelemetry-specification/blob/main//specification/metrics/data-model.md). These functions have the expectation that you understand the incoming data and know that it can be meaningfully converted to a new metric data type or can meaningfully be used to create new metrics.
- Although the OTTL allows the `set` function to be used with `metric.data_type`, its implementation in the transform processor is NOOP. To modify a data type you must use a function specific to that purpose.
- [Identity Conflict](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#identity-conflict): Transformation of metrics have the potential to affect the identity of a metric leading to an Identity Crisis. Be especially cautious when transforming metric name and when reducing/changing existing attributes. Adding new attributes is safe.
- [Orphaned Telemetry](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#orphaned-telemetry): The processor allows you to modify `span_id`, `trace_id`, and `parent_span_id` for traces and `span_id`, and `trace_id` logs. Modifying these fields could lead to orphaned spans or logs.
- [Identity Conflict](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#identity-conflict): Transformation of metrics have the potential to affect the identity of a metric leading to an Identity Crisis. Be especially cautious when transforming metric name, when reducing/changing existing attributes, and when dropping datapoints. Adding new attributes is safe.
- [Orphaned Telemetry](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#orphaned-telemetry): The processor allows you to modify `span_id`, `trace_id`, and `parent_span_id` for traces and `span_id`, and `trace_id` logs. It also allows dropping spans and logs. Modifying these fields or dropping spans/logs could lead to orphaned spans or logs.

[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
2 changes: 1 addition & 1 deletion processor/transformprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/stretchr/testify v1.8.1
go.opentelemetry.io/collector v0.64.2-0.20221115155901-1550938c18fd
go.opentelemetry.io/collector/pdata v0.64.2-0.20221115155901-1550938c18fd
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.23.0
)

Expand All @@ -33,7 +34,6 @@ require (
go.opentelemetry.io/otel/metric v0.33.0 // indirect
go.opentelemetry.io/otel/trace v1.11.1 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sys v0.2.0 // indirect
Expand Down
27 changes: 27 additions & 0 deletions processor/transformprocessor/internal/common/func_drop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"

import (
"context"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
)

func drop[K any]() (ottl.ExprFunc[K], error) {
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
return func(ctx context.Context, tCtx K) (interface{}, error) {
return shouldRemove(true), nil
}, nil
}
30 changes: 30 additions & 0 deletions processor/transformprocessor/internal/common/func_drop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func Test_drop(t *testing.T) {
exprFunc, err := drop[any]()
assert.NoError(t, err)
result, err := exprFunc(context.Background(), nil)
assert.NoError(t, err)
assert.Equal(t, shouldRemove(true), result)
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func Functions[K any]() map[string]interface{} {
"replace_all_patterns": ottlfuncs.ReplaceAllPatterns[K],
"delete_key": ottlfuncs.DeleteKey[K],
"delete_matching_keys": ottlfuncs.DeleteMatchingKeys[K],
"drop": drop[K],
}
}

Expand Down
33 changes: 17 additions & 16 deletions processor/transformprocessor/internal/common/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
Expand All @@ -38,23 +39,23 @@ func (l logStatements) Capabilities() consumer.Capabilities {
}

func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
for i := 0; i < ld.ResourceLogs().Len(); i++ {
rlogs := ld.ResourceLogs().At(i)
for j := 0; j < rlogs.ScopeLogs().Len(); j++ {
slogs := rlogs.ScopeLogs().At(j)
logs := slogs.LogRecords()
for k := 0; k < logs.Len(); k++ {
tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource())
for _, statement := range l {
_, _, err := statement.Execute(ctx, tCtx)
if err != nil {
return err
}
var errors error
ld.ResourceLogs().RemoveIf(func(rlogs plog.ResourceLogs) bool {
rlogs.ScopeLogs().RemoveIf(func(slogs plog.ScopeLogs) bool {
slogs.LogRecords().RemoveIf(func(logRecord plog.LogRecord) bool {
tCtx := ottllog.NewTransformContext(logRecord, slogs.Scope(), rlogs.Resource())
remove, err := executeStatements(ctx, tCtx, l)
if err != nil {
errors = multierr.Append(errors, err)
return false
}
}
}
}
return nil
return bool(remove)
})
return slogs.LogRecords().Len() == 0
})
return rlogs.ScopeLogs().Len() == 0
})
return errors
}

type LogParserCollection struct {
Expand Down
Loading